import numpy as np
from dipy.align.imwarp import SymmetricDiffeomorphicRegistration
from dipy.align.metrics import CCMetric, EMMetric, SSDMetric
from dipy.align.imaffine import MutualInformationMetric, AffineRegistration, transform_centers_of_mass
from dipy.align.transforms import (
TranslationTransform2D, RigidTransform2D, AffineTransform2D,
TranslationTransform3D, RigidTransform3D, AffineTransform3D)
from dipy.align import center_of_mass
from dipy.align.vector_fields import (
warp_3d_nn,
warp_3d,
warp_2d_nn,
warp_2d,
invert_vector_field_fixed_point_3d,
invert_vector_field_fixed_point_2d,
)
from dipy.segment.mask import median_otsu as median_otsu_np
import dbdicom.extensions.vreg as vreg
[docs]
def align_center_of_mass_3d(moving, fixed):
# Get arrays for fixed and moving series
array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Coregister fixed and moving slice-by-slice
identity = np.eye(4)
array_moving, _ = center_of_mass(array_moving, array_fixed, static_affine=identity, moving_affine=identity)
# Create new series
moved = moving.new_sibling(suffix='aligned')
moved.set_array(array_moving, headers_moving, pixels_first=True)
return moved
[docs]
def coregister_translation_3d(moving, fixed):
# Get arrays for fixed and moving series
array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Set up coregistration
metric = MutualInformationMetric(nbins=32, sampling_proportion=None,)
affreg = AffineRegistration(
metric = metric,
level_iters = [10000, 1000, 100],
sigmas = [3.0, 1.0, 0.0],
factors = [4, 2, 1],
)
transform = TranslationTransform3D()
params0 = None
# Perform coregistration
moving.message('Performing coregistration..')
mapping = affreg.optimize(array_fixed, array_moving, transform, params0)
coregistered = mapping.transform(array_moving, 'linear')
# Save as DICOM
coreg = moving.new_sibling(suffix='translated')
coreg.set_array(coregistered, headers_moving, pixels_first=True)
return coreg
[docs]
def coregister_rigid_3d(moving, fixed):
# Get arrays for fixed and moving series
array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Setup coregistration
metric = MutualInformationMetric(nbins=32, sampling_proportion=None)
affreg = AffineRegistration(
metric = metric,
level_iters = [10000, 1000, 100],
sigmas = [3.0, 1.0, 0.0],
factors = [4, 2, 1],
)
transform = RigidTransform3D()
params0 = None
# Perform coregistration
moving.message('Performing coregistration..')
mapping = affreg.optimize(array_fixed, array_moving, transform, params0)
coregistered = mapping.transform(array_moving, 'linear')
# Save as DICOM
coreg = moving.new_sibling(suffix='rigid transform')
coreg.set_array(coregistered, headers_moving, pixels_first=True)
return coreg
[docs]
def coregister_affine_3d(moving, fixed):
# Get arrays for fixed and moving series
array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Setup coregistration
metric = MutualInformationMetric(nbins=32, sampling_proportion=None)
affreg = AffineRegistration(
metric = metric,
level_iters = [10000, 1000, 100],
sigmas = [3.0, 1.0, 0.0],
factors = [4, 2, 1],
)
transform = AffineTransform3D()
params0 = None
# Perform coregistration
moving.message('Performing coregistration..')
mapping = affreg.optimize(array_fixed, array_moving, transform, params0)
coregistered = mapping.transform(array_moving, 'linear')
# Save as DICOM
coreg = moving.new_sibling(suffix='rigid transform')
coreg.set_array(coregistered, headers_moving, pixels_first=True)
return coreg
[docs]
def align_center_of_mass_2d(moving, fixed):
# Get arrays for fixed and moving series
zaxis = 'SliceLocation'
array_moving = moving.pixel_values(zaxis)
array_fixed = vreg.pixel_values(fixed, zaxis, on=moving)
# Coregister fixed and moving slice-by-slice
id = np.eye(3)
for z in range(array_moving.shape[2]):
moving.progress(z+1, array_moving.shape[2], 'Performing coregistration..')
c_of_mass = transform_centers_of_mass(array_fixed[:,:,z], id, array_moving[:,:,z], id)
array_moving[:,:,z] = c_of_mass.transform(array_moving[:,:,z])
# Save as DICOM (new API)
coreg = moving.copy(SeriesDescription=moving.SeriesDescription + ' [coreg]')
coreg.set_pixel_values(array_moving, coords=moving.coords(zaxis))
return coreg
[docs]
def coregister_translation_2d(moving, fixed):
# # Get arrays for fixed and moving series (old API)
# array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
# array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Get arrays for fixed and moving series (new API)
#zaxis = ('SliceLocation',)
zaxis = 'SliceLocation'
array_moving = moving.pixel_values(zaxis)
array_fixed = vreg.pixel_values(fixed, zaxis, on=moving)
# Set up coregistration
metric = MutualInformationMetric(nbins=32, sampling_proportion=None)
affreg = AffineRegistration(
metric = metric,
level_iters = [10000, 1000, 100],
sigmas = [3.0, 1.0, 0.0],
factors = [4, 2, 1],
)
transform = TranslationTransform2D()
# Coregister fixed and moving slice-by-slice
for z in range(array_moving.shape[2]):
moving.progress(z+1, array_moving.shape[2], 'Performing coregistration..')
# Coregister slice
params0 = None
mapping = affreg.optimize(array_fixed[:,:,z], array_moving[:,:,z], transform, params0)
array_moving[:,:,z] = mapping.transform(array_moving[:,:,z], 'linear')
# # Save as DICOM (old API)
# coreg = moving.new_sibling(suffix= 'registered')
# coreg.set_array(array_moving, headers_moving, pixels_first=True)
# Save as DICOM (new API)
coreg = moving.copy(SeriesDescription=moving.SeriesDescription + ' [coreg]')
coreg.set_pixel_values(array_moving, coords=moving.coords(zaxis))
return coreg
[docs]
def coregister_rigid_2d(moving, fixed):
# Get arrays for fixed and moving series
array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Set up coregistration
metric = MutualInformationMetric(nbins=32, sampling_proportion=None)
affreg = AffineRegistration(
metric = metric,
level_iters = [10000, 1000, 100],
sigmas = [3.0, 1.0, 0.0],
factors = [4, 2, 1],
)
transform = RigidTransform2D()
# Coregister fixed and moving slice-by-slice
for z in range(array_moving.shape[2]):
moving.status.progress(z+1, array_moving.shape[2], 'Performing coregistration..')
# Coregister slice
params0 = None
mapping = affreg.optimize(array_fixed[:,:,z], array_moving[:,:,z], transform, params0)
array_moving[:,:,z] = mapping.transform(array_moving[:,:,z], 'linear')
# Save as DICOM
coreg = moving.new_sibling(suffix= 'registered')
coreg.set_array(array_moving, headers_moving, pixels_first=True)
return coreg
[docs]
def coregister_affine_2d(moving, fixed):
# Get arrays for fixed and moving series
array_moving, headers_moving = moving.array(sortby='SliceLocation', pixels_first=True, first_volume=True)
array_fixed = vreg.array(fixed, on=moving, sortby='SliceLocation', pixels_first=True, first_volume=True)
# Set up coregistration
metric = MutualInformationMetric(nbins=32, sampling_proportion=None)
affreg = AffineRegistration(
metric = metric,
level_iters = [10000, 1000, 100],
sigmas = [3.0, 1.0, 0.0],
factors = [4, 2, 1],
)
transform = AffineTransform2D()
# Coregister fixed and moving slice-by-slice
for z in range(array_moving.shape[2]):
moving.status.progress(z+1, array_moving.shape[2], 'Performing coregistration..')
# Coregister slice
params0 = None
mapping = affreg.optimize(array_fixed[:,:,z], array_moving[:,:,z], transform, params0)
array_moving[:,:,z] =mapping.transform(array_moving[:,:,z], 'linear')
# Save as DICOM
coreg = moving.new_sibling(suffix= 'registered')
coreg.set_array(array_moving, headers_moving, pixels_first=True)
return coreg
[docs]
def warp(image, deformation_field, **kwargs):
# Get arrays
array, headers = image.array('SliceLocation', pixels_first=True, first_volume=True)
array_deform = vreg.array(deformation_field, on=image, sortby='SliceLocation', pixels_first=True)
# Perform warping
array = _warp_array(array, array_deform, image.status, **kwargs)
# Create new series
warped = image.new_sibling(suffix='warped')
warped.set_array(array, headers, pixels_first=True)
return warped
### ARRAY functions
def _invert_deformation_field_array(array, status, max_iter=10, tolerance=0.1):
status.message('Inverting deformation field..')
dim = array.shape[-1]
d_world2grid = np.eye(dim+1)
spacing = np.ones(dim)
if dim==3:
return invert_vector_field_fixed_point_3d(array, d_world2grid, spacing, max_iter, tolerance)
elif dim==2:
nslices = array.shape[2]
for z in range(nslices):
status.progress(z+1, nslices, 'Inverting deformation field..')
array[:,:,z] = invert_vector_field_fixed_point_2d(array[:,:,z], d_world2grid, spacing, max_iter, tolerance)
return array
else:
msg = 'This series is not a deformation field.'
msg += 'A deformation field must have either 2 or 3 components.'
raise ValueError(msg)
def _warp_array(array, deform, status, interpolate=True):
status.message('Warping array..')
dim = deform.shape[-1]
if dim==3:
if interpolate:
return warp_3d(array, deform)
else:
return warp_3d_nn(array, deform)
elif dim==2:
nslices = deform.shape[2]
for z in range(nslices):
status.progress(z+1, nslices, 'Warping array..')
if interpolate:
array[:,:,z] = warp_2d(array[:,:,z], deform[:,:,z,:])
else:
array[:,:,z] = warp_2d_nn(array[:,:,z], deform[:,:,z,:])
return array
else:
msg = 'This series is not a deformation field.'
msg += 'A deformation field must have either 2 or 3 components.'
raise ValueError(msg)
def _coregister_arrays(fixed, moving, transformation='Symmetric Diffeomorphic', metric="Cross-Correlation"):
"""
Coregister two arrays and return coregistered + deformation field
"""
dim = fixed.ndim
# 3D registration does not seem to work with smaller slabs
# Exclude this case
if dim == 3:
if fixed.shape[-1] < 6:
msg = 'The 3D volume does not have enough slices for 3D registration. \n'
msg += 'Try 2D registration instead.'
raise ValueError(msg)
# Define the metric
if metric == "Cross-Correlation":
sigma_diff = 3.0 # Gaussian Kernel
radius = 4 # Window for local CC
metric = CCMetric(dim, sigma_diff, radius)
elif metric == 'Expectation-Maximization':
metric = EMMetric(dim, smooth=1.0)
elif metric == 'Sum of Squared Differences':
metric = SSDMetric(dim, smooth=4.0)
else:
msg = 'The metric ' + metric + ' is currently not implemented.'
raise ValueError(msg)
# Define the deformation model
if transformation == 'Symmetric Diffeomorphic':
level_iters = [200, 100, 50, 25]
sdr = SymmetricDiffeomorphicRegistration(metric, level_iters, inv_iter=50)
else:
msg = 'The transform ' + transformation + ' is currently not implemented.'
raise ValueError(msg)
# Perform the optimization, return a DiffeomorphicMap object
mapping = sdr.optimize(fixed, moving)
# Get forward deformation field
deformation_field = mapping.get_forward_field()
# Warp the moving image
warped_moving = mapping.transform(moving, 'linear')
return warped_moving, deformation_field