# -*- coding: utf-8 -*-
"""
preprocess
~~~~~~~~~~
this module contains all the common processing of OCT rawdata before reconstruct it
into images
"""
# below imports enables python 2 and 3 compatible codes
# requires python-future, install by `pip install future`
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from builtins import *
import numpy as np
# from scipy import signal
# from . import decors
[docs]def calc_dc(data, axis=-1, method='mean'):
if method == 'mean':
dc = data.mean(0)
elif method == 'median':
dc = np.median(data[10::10, :], 0)
return dc
[docs]def gen_calib(data, win=[], order=4, conf_file_path=[]):
'''
function gen_calib_coeff(data, win = [], file_path = [])
generate the calibration coeff data as non-linear array from 0 to 1
data: input data
win: window to filter the data for calcuation
file_path: file to save the calib_coeff
'''
data = data if (len(data.shape) == 1) else data.mean(0)
# plt.plot(data)
if len(win):
data = fft_filt_pass(data, win)
calib_data = np.unwrap(np.angle(hilbert(data)))
calib_data = (calib_data - calib_data[0]) / (calib_data[-1] - calib_data[0])
linear_x = np.linspace(0, 1, calib_data.size)
calib_coeff = np.polyfit(linear_x[50:-50], calib_data[50:-50], order)
if len(conf_file_path):
write_cfg_file(conf_file_path, 'SYS', 'Calibration', calib_coeff) # update the number in the oct.ini file
return calib_coeff
[docs]def gen_disp(data, win=[], conf_file_path=[]):
'''
function gen_disp_coeff(data, win = [], file_path = [])
generate the disp coeff (3rd and 2nd order) by mirror signal
data: input data
win: the window to filter data
update_cfg: update pypeline.ini
'''
data = data if (len(data.shape) == 1) else data.mean(0)
# plt.plot(data)
if len(win):
data = fft_filt_pass(data, win)
phase = np.unwrap(np.angle(hilbert(data)))
linear_x = np.linspace(0, 1, phase.size)
disp_coeff = np.polyfit(linear_x[50:-50], phase[50:-50], 3)[0:2]
# disp_data = np.polyval(np.append(disp_coeff, [-disp_coeff.sum, 0]), linear_x)
if len(conf_file_path):
write_cfg_file(conf_file_path, 'SYS', 'Dispersion', disp_coeff) # update the number in the oct.ini file
# if len(data_file_path):
# save_binary_data(disp_data, data_file_path)
return disp_coeff
[docs]def gen_disp2(data, win=[0, 1], conf_file_path=[]):
'''
function gen_disp_coeff2(data, win = [], file_path = [])
generate the disp coeff (3rd and 2nd order) iteratively
data: input data
win: the window to filter data
update_cfg: update pypeline.ini
'''
def img_entropy(disp_coeff, data, win):
nondisp_data = corr_dispersion(data, disp_coeff)
Image = abs(fftpack.fft(nondisp_data))
Image = Image[:, win[0]:win[1]]
Image /= Image.sum()
return -(Image * np.log(Image)).sum()
# select window to minimize the computation
if all(0 <= foo <= 1 for foo in win):
win = [data.shape[-1] * win[0] / 2, data.shape[-1] * win[1] / 2]
fmin(img_entropy, disp_coeff, data, win)
if len(conf_file_path):
write_cfg_file(conf_file_path, 'SYS', 'Dispersion', disp_coeff) # update the number in the oct.ini file
return disp_coeff
[docs]def corr_dispersion(data, disp_coeff):
'''
compensate dispersion using 3rd and 2nd order dispersionc oeff
'''
linear_x = np.linspace(0, 1, data.shape[-1]).astype(np.float32)
disp_coeff = np.polyval(np.append(disp_coeff, [-disp_coeff.sum(), 0]), linear_x)
nondisp_data = data * np.exp(-1j * disp_coeff) # the value of data changes, since data is immutable
return nondisp_data
[docs]def corr_phase_jit(data):
pass
[docs]def gen_fr_data(data, r2i_ratio, bandwidth=0.4):
'''
function gen_fr_data(data, r2i_ratio, bandwidth = 0.4)
generate the complex data for full range OCT reconstruction
data: input data
r2i_ratio: the real to imaginary ratio, tan(phi) where phi is phase modulation depth
bandwidth: the bandwidth to separate the real and imaginary part.
'''
data = data.T
alt_filter = np.array([1, -1] * data.shape[-1] / 2)
real_data = fft_filt_pass(data, [0, bandwidth * data.shape[-1] / 2])
imag_data = fft_filt_pass(data * alt_filter, [0, bandwidth * data.shape[-1] / 2])
fr_data = real_data + 1j * imag_data;
return fr_data
# def pre_proc_oct_data(data, calib_coeff=[], disp_coeff=[], reshape_win=[]):
# if len(calib_coeff):
# data = calibrate_data(data, calib_coeff, )
# if len(reshape_win):
# data *= eval('np.' + reshape_win + '('+str(data.shape[-1])+')')
# if len(disp_coeff):
# data = corr_dispersion(data, disp_coeff)
# return data
[docs]def recon_magnitude_img(data, full_range=0, fft_num=2048, scale='log'):
'''
function recon_magnitude_img(data, real_input = True, depth_dim = 1024, scale = 'log', disp_range = [0.5, 0])
reconstruct the amplitude image:
data: input data
full_range: if ture, use full range
depth_dim: pixel num in depth
scale: log or linear
disp_range: display range for contrast/dynamic range, relative value if within [0,1],
'''
img = fftpack.fft(data, fft_num)
# if np.any(np.imag(data)):
# img = fftpack.fft(data,fft_num)
# else:
# img = fftpack.rfft(data,fft_num)
if not full_range:
img = img[:, :fft_num / 2]
if scale == 'log':
img = 20 * np.log10(abs(img))
elif scale == 'linear':
img = abs(img)
return img
[docs]def recon_phase_img(data, thresh=0, full_range=0, fft_num=2048, filter_size=3):
'''
function recon_magnitude_img(data, real_input = True, depth_dim = 1024, scale = 'log', disp_range = [0.5, 0])
reconstruct the amplitude image:
data: input data
full_range: if ture, use full range
depth_dim: pixel num in depth
scale: log or linear
disp_range: display range for contrast/dynamic range, relative value if within [0,1],
'''
img = fftpack.fft(data, fft_num)
# if np.any(np.imag(data)):
# img = fftpack.fft(data,fft_num)
# else:
# img = fftpack.rfft(data,fft_num)
if not full_range:
img = img[:, :fft_num / 2]
img = img[:-1] * np.conjugate(img[1:])
if filter_size > 1:
img = ndimage.uniform_filter(img.real, filter_size) + 1j * ndimage.uniform_filter(img.imag, filter_size)
img = np.angle(np.append(img, [img[-1]], 0))
# img +=np.pi
# if all(0<=foo<=1 for foo in disp_range):
# mag_min = np.percentile(img, disp_range[0]*100.0)
# mag_max = np.percentile(img, 100.0-disp_range[1]*100.0)
# else:
# mag_min, mag_max = disp_range
# mag_min, mag_max = disp_range
# img = (img - mag_min) /(mag_max-mag_min)
# if img_format is'uint8':
# img = img * (img >= 0) *(img <= 1) + (img>1)
# img = np.uint8(img*255)
return img
[docs]def get_display_range(image, disp_range=np.array([50, 100])):
disp_range = [max(min(disp_range), 0), min(max(disp_range), 100)]
mag_min = np.percentile(image, disp_range[0])
mag_max = np.percentile(image, disp_range[1])
return np.array([mag_min, mag_max])
[docs]def rescale_range(image, disp_range, img_format='uint8'):
# re-scale
image = (image - disp_range[0]) / (disp_range[1] - disp_range[0])
image[image < 0] = 0
image[image > 1] = 1
if img_format is 'uint8':
image = np.uint8(image * 255)
else:
image = np.uint16(image * 65535)
return image
[docs]def get_psf(data, disp_coeff, calib_coeff):
data = calibrate_data(data, calib_coeff, 'cubic')
data = corr_dispersion(data, disp_coeff)
psf = recon_magnitude_img(data, 'log').mean(0)
# if any(win>1):
# psf = psf[win[0]:win[1]]
# else:
# psf = psf[win[0]*psf.size:win[1]*psf.size]
return psf
# test code
if __name__ == '__main__':
import matplotlib.pyplot as plt
# ref = read_binary_data('T:\\ZYtemp\\SoftwareDev\\PyOCT\\conf\\BT_SDOCT.ref')
calib_coeff = read_binary_data('T:\\ZYtemp\\SoftwareDev\\PyOCT\\conf\\BT_SDOCT.clb')
disp_coeff = read_binary_data('T:\\ZYtemp\\SoftwareDev\\PyOCT\\conf\\BT_SDOCT.dsp')
for i in range(1, 12, 1):
data = read_binary_data('T:\ZYtemp\\2012-05-30 benchtop sdoct\\signal roll off\\OCTImage' + "%.2d" % i + '.raw',
data_dim=1024, input_format='uint16')
ref = gen_ref(data, 'median')
data -= ref
# ref = read_binary_data(ref_source, data_dim, input_format = 'uint16').mean(0)
calied_data = calibrate_data(data, calib_coeff)
calied_data = calied_data * np.exp(-1j * disp_coeff) # remove disperison
mag_img = recon_magnitude_img(calied_data)
plt.figure(1)
plt.imshow(mag_img)
plt.ginput()
[docs]def adjust_phase():
pass