Commit 2cebb1a0 authored by mibaumgartner's avatar mibaumgartner
Browse files

preprocessing

parent ede95851
from nndet.preprocessing.crop import *
from nndet.preprocessing.preprocessor import *
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import shutil
import pickle
import numpy as np
from loguru import logger
from multiprocessing.pool import Pool
from pathlib import Path
from typing import List, Tuple, Sequence
from scipy.ndimage import binary_fill_holes
from nndet.io.paths import get_case_id_from_path
from nndet.io.load import load_case_from_list
def create_nonzero_mask(data: np.ndarray) -> np.ndarray:
    """
    Build a boolean foreground mask covering every voxel that is nonzero
    in at least one channel; interior holes are filled.

    Args:
        data (np.ndarray): input data [C, X, Y, Z] or [C, X, Y]

    Returns:
        np.ndarray: binary mask of the (hole-filled) nonzero region [X, Y, (Z)]
    """
    assert data.ndim in (3, 4), \
        "data must have shape (C, X, Y, Z) or shape (C, X, Y)"
    # a voxel is foreground if any channel is nonzero there
    any_channel_nonzero = np.any(data != 0, axis=0)
    # close enclosed background pockets inside the foreground
    return binary_fill_holes(any_channel_nonzero.astype(bool))
def get_bbox_from_mask(mask: np.ndarray, outside_value: int = 0) -> List[Tuple]:
    """
    Compute the tight bounding box around all entries differing from
    ``outside_value``.

    Args:
        mask (np.ndarray): mask [X, Y] or [X, Y, Z]
        outside_value (int): background value

    Returns:
        List[Tuple]: per-axis half-open intervals, e.g.
            [(dim0_min, dim0_max), (dim1_min, dim1_max), (dim2_min, dim2_max)]
            (the third entry is present only for 3d masks)
    """
    coords = (mask != outside_value).nonzero()
    # two spatial axes are always boxed; a third only for 3d input
    n_axes = 3 if len(coords) == 3 else 2
    return [(int(np.min(axis_idx)), int(np.max(axis_idx)) + 1)
            for axis_idx in coords[:n_axes]]
def crop_to_bbox_no_channels(image, bbox: Sequence[Sequence[int]]):
    """
    Crop a channel-less image to a bounding box (spatial dimensions only).

    Args:
        image (arraylike): 2d or 3d array
        bbox (Sequence[Sequence[int]]): interleaved bounding-box coordinates,
            e.g. ((x1, x2), (y1, y2), (z1, z2))

    Returns:
        arraylike: cropped array
    """
    slicer = []
    for low, high in bbox:
        slicer.append(slice(low, high))
    return image[tuple(slicer)]
def crop_to_bbox(data: np.ndarray, bbox: Sequence[Sequence[int]]):
    """
    Crop every channel of ``data`` to the given bounding box.

    Args:
        data (np.ndarray): 3d or 4d array [C, X, Y, (Z)]
        bbox (Sequence[Sequence[int]]): interleaved bounding-box coordinates,
            e.g. ((x1, x2), (y1, y2), (z1, z2))

    Returns:
        np.ndarray: cropped array (channel dimension preserved)
    """
    # the same spatial slicer applies to every channel
    spatial_slicer = tuple(slice(low, high) for low, high in bbox)
    return np.stack([channel[spatial_slicer] for channel in data])
def crop_to_nonzero(data, seg=None, nonzero_label=-1):
    """
    Crop data (and optionally the segmentation) to the nonzero region of data.

    Args:
        data (np.ndarray): data to crop [C, X, Y, (Z)]
        seg (np.ndarray): optional segmentation [1, X, Y, (Z)]
        nonzero_label (int): written into the segmentation map where only
            background was found

    Returns:
        np.ndarray: cropped data
        np.ndarray: cropped and filled (with nonzero_label) segmentation;
            if no segmentation was given, a map with ``nonzero_label`` outside
            the nonzero region and 0 inside it
        List[Tuple[int]]: bounding box of nonzero region
    """
    nonzero_mask = create_nonzero_mask(data)
    bbox = get_bbox_from_mask(nonzero_mask, 0)
    data = crop_to_bbox(data, bbox)
    # BUGFIX: previously ``seg`` was cropped unconditionally, which raised an
    # AttributeError for the documented default ``seg=None``.
    if seg is not None:
        seg = crop_to_bbox(seg, bbox)
    nonzero_mask = crop_to_bbox_no_channels(nonzero_mask, bbox)[None]
    if seg is not None:
        # mark background voxels (outside the nonzero mask) in the segmentation
        seg[(seg == 0) & (nonzero_mask == 0)] = nonzero_label
    else:
        # build a pseudo-segmentation: nonzero_label outside, 0 inside
        nonzero_mask = nonzero_mask.astype(np.int32)
        nonzero_mask[nonzero_mask == 0] = nonzero_label
        nonzero_mask[nonzero_mask > 0] = 0
        seg = nonzero_mask
    return data, seg, bbox
class ImageCropper(object):
    """
    Helper to crop cases to their joint nonzero region (shared across all
    modalities) and persist the cropped arrays plus per-case properties.
    """

    def __init__(self, num_processes: int, output_dir: Path = None):
        """
        Helper class to crop images to non zero region (must hold for all modalities)
        In the case of BRaTS and ISLES data this results in a significant reduction in image size

        Args:
            num_processes (int): number of processes to use for cropping
                (0 processes everything sequentially in the current process)
            output_dir (Path): path to output directory; may also be supplied
                later via :meth:`run_cropping`
        """
        self.output_dir = Path(output_dir) if output_dir is not None else None
        self.num_processes = num_processes
        self.maybe_init_output_dir()

    def maybe_init_output_dir(self) -> None:
        """Create `imagesTr`/`labelsTr` below the output dir if one is set."""
        if self.output_dir is not None:
            (self.output_dir / "imagesTr").mkdir(parents=True, exist_ok=True)
            (self.output_dir / "labelsTr").mkdir(parents=True, exist_ok=True)

    def run_cropping(self,
                     case_files: List[List[Path]],
                     overwrite_existing: bool = False,
                     output_dir: Path = None,
                     copy_gt_data: bool = True,
                     ) -> None:
        """
        Crops data to non zero region and saves them into output_dir
        Optional: also copies ground truth data

        Args:
            case_files (List[List[Path]]): list with all cases in the structure [Case[Case Files]];
                where case files are sorted to corresponding modalities (last file is the label file)
            overwrite_existing (bool): overwrite existing crops
            output_dir (Path): path to output directory (overrides the one
                given at construction time)
            copy_gt_data (bool): copies ground truth data to output directory
        """
        if output_dir is not None:
            self.output_dir = Path(output_dir)
            self.maybe_init_output_dir()
        if copy_gt_data:
            self.copy_gt_data(case_files)
        list_of_args = []
        for _i, case in enumerate(case_files):
            case_id = get_case_id_from_path(str(case[0]))
            # guard against file suffixes leaking into the case identifier
            assert not case_id.endswith(".gz") and not case_id.endswith(".nii")
            list_of_args.append((case, case_id, overwrite_existing))
        if self.num_processes == 0:
            # sequential fallback (useful for debugging)
            for a in list_of_args:
                self.process_data(*a)
        else:
            with Pool(processes=self.num_processes) as p:
                p.starmap(self.process_data, list_of_args)

    def copy_gt_data(self, case_files: List[List[Path]]) -> None:
        """
        Copy ground truth to output directory

        Assumes all label files live in the same directory (the parent of the
        first case's label file is copied wholesale).
        """
        output_dir_gt = self.output_dir / "labelsTr"
        # copytree requires a non-existing target, so remove any previous copy
        if output_dir_gt.is_dir():
            shutil.rmtree(output_dir_gt)
        source_dir_gt = case_files[0][-1].parent
        shutil.copytree(source_dir_gt, output_dir_gt)

    def process_data(self, case: List[Path], case_id: str, overwrite_existing: bool = False) -> None:
        """
        Extract nonzero region from all cases and create a single array where segmentation
        is located in the last channel and save as npz (saved in key `data`)
        Additional properties per case are saved inside a pkl file

        Args:
            case (List[Path]): list of paths to data and label (label is always at the last position
                and data is sorted after modalities)
            case_id (str): case identifier
            overwrite_existing (bool): overwrite existing data
        """
        try:
            logger.info(f"Processing case {case_id}")
            npz_exists = (self.output_dir / "imagesTr" / f"{case_id}.npz").is_file()
            pkl_exists = (self.output_dir / "imagesTr" / f"{case_id}.pkl").is_file()
            # only skip when BOTH artifacts are present and overwrite is off
            if not (npz_exists and pkl_exists) or overwrite_existing:
                data, seg, properties = self.load_crop_from_list_of_files(case[:-1], case[-1])
                # segmentation is appended as the last channel
                all_data = np.vstack((data, seg))
                np.savez_compressed(self.output_dir / "imagesTr" / f"{case_id}.npz", data=all_data)
                with open(self.output_dir / "imagesTr" / f"{case_id}.pkl", 'wb') as f:
                    pickle.dump(properties, f)
            else:
                logger.warning(f"Case {case_id} already exists and overwrite is deactivated")
        except Exception as e:
            # log which case failed, then re-raise so the failure is not hidden
            logger.info(f"exception in: {case_id}: {e}")
            raise e

    @staticmethod
    def load_crop_from_list_of_files(data_files: List[Path], seg_file: Path = None):
        """
        Load and crop form list of files

        Args:
            data_files (List[Path]): paths to data files
            seg_file (Path): pth to segmentation

        Returns:
            np.ndarray: cropped data
            np.ndarray: cropped (and filled segmentation: -1 where no forground exists) label
            dict: additional properties
                `original_size_of_raw_data`: original shape of data (correctly reordered)
                `original_spacing`: original spacing (correctly reordered)
                `list_of_data_files`: paths of data files
                `seg_file`: path to label file
                `itk_origin`: origin in world coordinates
                `itk_spacing`: spacing in world coordinates
                `itk_direction`: direction in world coordinates
                `crop_bbox`: List[Tuple[int]] cropped bounding box
                `classes`: present classes in segmentation
                `size_after_cropping`: size after cropping
        """
        data, seg, properties = load_case_from_list(data_files, seg_file)
        return ImageCropper.crop(data, properties, seg)

    @staticmethod
    def crop(data: np.ndarray, properties: dict, seg: np.ndarray = None):
        """
        Crop data and segmentation to non zero region

        Args:
            data (np.ndarray): data to crop [C, X, Y, Z]
            properties (dict): additional properties (mutated in place)
            seg (np.ndarray): segmentation [1, X, Y, Z]

        Returns:
            data (np.ndarray): data to crop [C, X, Y, Z]
            seg (np.ndarray): segmentation [1, X, Y, Z]
            properties (dict): newly added properties
                `crop_bbox`: List[Tuple[int]] cropped bounding box
                `classes`: present classes in segmentation (note: computed
                    before values < -1 are zeroed, so it may include them)
                `size_after_cropping`: size after cropping
        """
        shape_before = data.shape
        data, seg, bbox = crop_to_nonzero(data, seg, nonzero_label=-1)
        shape_after = data.shape
        logger.info(f"Shape before crop {shape_before}; after crop {shape_after}; "
                    f"spacing {np.array(properties['original_spacing'])}")
        properties["crop_bbox"] = bbox
        properties['classes'] = np.unique(seg)
        # -1 marks "outside nonzero region"; anything below is clamped to 0
        seg[seg < -1] = 0
        properties["size_after_cropping"] = data[0].shape
        return data, seg, properties
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import numpy as np
from os import PathLike
from loguru import logger
from abc import ABC, abstractmethod
from multiprocessing import Pool
from pathlib import Path
from typing import Dict, Sequence, List, Tuple, Union
from itertools import repeat
from nndet.io.transforms.instances import instances_to_boxes_np
from nndet.io.paths import get_case_ids_from_dir, get_case_id_from_path
from nndet.io.load import load_case_cropped, save_pickle
from nndet.preprocessing.resampling import resample_patient
from nndet.io.crop import ImageCropper
class AbstractPreprocessor(ABC):
    # identifier used to tag data produced by this preprocessor
    DATA_ID = "abstractdata"

    def __init__(self, **kwargs):
        """
        Base interface all preprocessors implement.

        Every keyword argument is attached to the instance as an attribute.
        """
        for name, value in kwargs.items():
            setattr(self, name, value)

    @abstractmethod
    def run(self,
            target_spacings: Sequence[Sequence[float]],
            identifiers: Sequence[str],
            cropped_data_dir: Path,
            preprocessed_output_dir: Path,
            num_processes: int,
            force_separate_z=None,
            ):
        """
        Execute preprocessing over the whole (cropped) training set.

        Args:
            target_spacings: target spacing for each case
            identifiers: identifier strings used to name the directory
            cropped_data_dir: source directory
            preprocessed_output_dir: target directory
            num_processes: number of processes used for preprocessing
            force_separate_z: force independent resampling of z direction
        """
        raise NotImplementedError

    @abstractmethod
    def run_test(self,
                 data_files,
                 target_spacing,
                 target_dir: PathLike,
                 ) -> None:
        """
        Preprocess a single test case and write the result to disk.

        Args:
            data_files: path to data files
            target_spacing: spacing to resample
            target_dir: directory to save data to
        """
        raise NotImplementedError

    @abstractmethod
    def preprocess_test_case(self,
                             data_files,
                             target_spacing,
                             seg_file=None,
                             force_separate_z=None,
                             ) -> Tuple[np.ndarray, np.ndarray, dict]:
        """
        Preprocess a single test case in memory.

        Args:
            data_files: path to data files
            target_spacing: spacing to resample
            seg_file: optional segmentation file
            force_separate_z: separate resampling in z direction

        Returns:
            np.ndarray: preprocessed data [C, dims]
            np.ndarray: preprocessed segmentation [1, dims]
            dict: updated properties
        """
        raise NotImplementedError
class GenericPreprocessor:
    DATA_ID = "Generic"

    def __init__(self,
                 norm_scheme_per_modality: Dict[int, str],
                 use_mask_for_norm: Dict[int, bool],
                 transpose_forward: Sequence[int],
                 intensity_properties: Dict[int, Dict] = None,
                 resample_anisotropy_threshold: float = 3.,
                 ):
        """
        Preprocess data

        Args:
            norm_scheme_per_modality: integer index represents modality and string is
                one of `CT`, `CT2`, `CT3`, `raw`. Other modalities are treated
                with zero mean and unit std.
            use_mask_for_norm: only foreground values should be used for normalization
                (defined for each modality)
            transpose_forward: transpose input data
            intensity_properties: Intensity properties of foreground over the dataset.
                Evaluated statistics: `median`; `mean`; `std`; `min`; `max`;
                `percentile_99_5`; `percentile_00_5`
                `local_props`: contains a dict (with case ids) where statistics
                where computed per case
            resample_anisotropy_threshold: spacing-ratio threshold above which
                the z direction is resampled separately

        Overwrites:
            :self:`data_id`: unique identifier of GenericPreprocessor
        """
        self.resample_anisotropy_threshold = resample_anisotropy_threshold
        self.intensity_properties = intensity_properties
        self.transpose_forward = list(transpose_forward)
        self.use_mask_for_norm = use_mask_for_norm
        self.norm_scheme_per_modality = norm_scheme_per_modality
        # dispatch table: scheme name -> normalization function; anything not
        # listed falls back to `normalize_other` (zero mean, unit std)
        self.norm_schemes = {
            "CT": self.normalize_ct,
            "CT2": self.normalize_ct2,
            # BUGFIX: "CT3" previously dispatched to `normalize_ct`, which
            # left `normalize_ct3` unreachable.
            "CT3": self.normalize_ct3,
            "raw": self.no_norm,
        }

    def run(self,
            target_spacings: Sequence[Sequence[float]],
            identifiers: Sequence[str],
            cropped_data_dir: Path,
            preprocessed_output_dir: Path,
            num_processes: Union[int, Sequence[int]],
            overwrite: bool = False,
            ):
        """
        Run preprocessing

        Args:
            target_spacings: target spacing for each stage
            identifiers: identifier strings used to name the directory
            cropped_data_dir: source directory
            preprocessed_output_dir: target directory
            num_processes: number of processes used for preprocessing
                (an int applies to all stages, a sequence is per stage)
            overwrite: overwrite existing data
        """
        case_ids, num_processes = self.initialize_run(
            target_spacings=target_spacings,
            cropped_data_dir=cropped_data_dir,
            preprocessed_output_dir=preprocessed_output_dir,
            num_processes=num_processes,
        )
        for identifier, spacing, nump in zip(identifiers, target_spacings, num_processes):
            logger.info(f"+++ Preprocessing {identifier} +++")
            output_dir_stage = preprocessed_output_dir / identifier / "imagesTr"
            output_dir_stage.mkdir(parents=True, exist_ok=True)

            if not overwrite:
                # a case counts as done only if BOTH npz and pkl exist
                case_ids_npz_present = get_case_ids_from_dir(
                    output_dir_stage, remove_modality=False, pattern="*.npz")
                case_ids_pkl_present = get_case_ids_from_dir(
                    output_dir_stage, remove_modality=False, pattern="*.pkl")
                case_ids_present = list(set.intersection(set(case_ids_npz_present), set(case_ids_pkl_present)))
                logger.info(f"Skipping case ids which are already present {case_ids_present}")
                _case_ids = list(filter(lambda x: x not in case_ids_present, case_ids))
            else:
                _case_ids = case_ids

            logger.info(f"Running preprocessing on {_case_ids}")
            if nump == 0:
                # sequential fallback (useful for debugging)
                for _cid in _case_ids:
                    self.run_process(spacing, _cid, output_dir_stage, cropped_data_dir)
            else:
                with Pool(processes=nump) as p:
                    p.starmap(self.run_process,
                              zip(repeat(spacing),
                                  _case_ids,
                                  repeat(output_dir_stage),
                                  repeat(cropped_data_dir),
                                  ))

    def initialize_run(self,
                       target_spacings: Sequence[Sequence[float]],
                       cropped_data_dir: Path,
                       preprocessed_output_dir: Path,
                       num_processes: int,
                       ) -> Tuple[List[str], List[int]]:
        """
        Prepare preprocessing run

        Args:
            target_spacings: target spacings
            cropped_data_dir: source dir
            preprocessed_output_dir: target dir
            num_processes: number of processes (int or sequence per stage)

        Returns:
            List[str]: case ids from source dir
            List[int]: number of processes for each stage
        """
        logger.info("Initializing preprocessing")
        logger.info(f"Folder with cropped data: {cropped_data_dir}")
        logger.info(f"Folder for preprocessed data:{preprocessed_output_dir}")

        for key, modality in self.norm_scheme_per_modality.items():
            if modality in self.norm_schemes:
                logger.info(f"Found normalization scheme for {modality}")
            else:
                logger.info(f"No normalization scheme for {modality} using zero mean unit std.")

        preprocessed_output_dir.mkdir(parents=True, exist_ok=True)
        num_stages = len(target_spacings)
        # broadcast a single int to all stages
        if not isinstance(num_processes, Sequence):
            num_processes = [num_processes] * num_stages
        assert len(num_processes) == num_stages

        case_ids = get_case_ids_from_dir(
            cropped_data_dir, pattern="*.npz", remove_modality=False)
        return case_ids, num_processes

    def run_process(self,
                    target_spacing: Sequence[float],
                    case_id: str,
                    output_dir_stage: Path,
                    cropped_data_dir: Path,
                    ) -> None:
        """
        Process a single case
        Result is saved into :param:`output_dir_stage`

        Args:
            target_spacing: target spacing for processed case
            case_id: case identifier
            output_dir_stage: path to output directory
            cropped_data_dir: path to source directory
        """
        data, seg, properties = load_case_cropped(cropped_data_dir, case_id)
        # add a channel dimension so seg matches the [C, dims] layout
        seg = seg[None]

        data, seg, properties = self.apply_process(
            data, target_spacing, properties, seg)
        properties["use_nonzero_mask_for_norm"] = self.use_mask_for_norm

        data = data.astype(np.float32)
        seg = seg.astype(np.int32)

        candidates = self.compute_candidates(
            data=data,
            seg=seg,
            properties=properties,
        )

        logger.info(f"Saving: {case_id} into {output_dir_stage}.")
        np.savez_compressed(str(output_dir_stage / f"{case_id}.npz"),
                            data=data,
                            seg=seg,
                            )
        save_pickle(candidates, output_dir_stage / f"{case_id}_boxes.pkl")
        save_pickle(properties, output_dir_stage / f"{case_id}.pkl")

    def apply_process(self,
                      data: np.ndarray,
                      target_spacing: Sequence[float],
                      properties: dict,
                      seg: np.ndarray = None,
                      ) -> Tuple[np.ndarray, np.ndarray, dict]:
        """
        Applies all preprocessing steps to data and segmentation

        Args:
            data: input data
            target_spacing: target spacing (not! transposed)
            properties: dict with properties for preprocessing (mutated)
            seg: input segmentation

        Returns:
            np.ndarray: preprocessed data
            np.ndarray: preprocessed segmentation
            dict: updated properties
        """
        data, seg, original_spacing, target_spacing, before = self.transpose(
            data, seg, properties["original_spacing"], target_spacing)
        data, seg, after = self.resample(
            data, seg, original_spacing, target_spacing)

        if seg is not None:
            # values below -1 can appear after order-0 resampling artifacts
            seg[seg < -1] = 0

        properties["size_after_resampling"] = data[0].shape
        properties["spacing_after_resampling"] = after["spacing"]

        data = self.normalize(data, seg)
        return data, seg, properties

    def transpose(self,
                  data: np.ndarray,
                  seg: np.ndarray,
                  original_spacing: Sequence[float],
                  target_spacing: Sequence[float]) -> Tuple[
                      np.ndarray, np.ndarray, np.ndarray, np.ndarray, dict]:
        """
        Transpose data, segmentation and spacings

        Args:
            data: input data
            seg: input segmentation
            original_spacing: original spacing
            target_spacing: target spacing

        Returns:
            np.ndarray: transposed data
            np.ndarray: transposed segmentation
            np.ndarray: transposed original spacing
            np.ndarray: transposed target spacing
            dict: values for debugging
        """
        # channel axis stays first; only spatial axes are permuted
        data = data.transpose((0, *[i + 1 for i in self.transpose_forward]))
        seg = seg.transpose((0, *[i + 1 for i in self.transpose_forward]))
        _original_spacing = np.array(original_spacing)[self.transpose_forward]
        _target_spacing = np.array(target_spacing)[self.transpose_forward]
        before = {
            "spacing": original_spacing,
            "transpose": self.transpose_forward,
            "spacing_transposed": _original_spacing,
            # NOTE(review): key is missing a closing parenthesis; kept as-is
            # because it is a debug-only payload and may be read elsewhere
            "shape (transposed": data.shape,
        }
        return data, seg, _original_spacing, _target_spacing, before

    def resample(self,
                 data: np.ndarray,
                 seg: np.ndarray,
                 original_spacing: Sequence[float],
                 target_spacing: Sequence[float],
                 ) -> Tuple[np.ndarray, np.ndarray, dict]:
        """
        Resample data and segmentation to new spacing

        Args:
            data: input data
            seg: input segmentation
            original_spacing: original spacing
            target_spacing: target spacing

        Returns:
            np.ndarray: resampled data
            np.ndarray: resampled segmentation
            dict: properties after resampling
                `spacing`: spacing after resampling
                `shape (resampled)`: shape after resampling
        """
        original_spacing = np.array(original_spacing)
        target_spacing = np.array(target_spacing)

        # NaNs would propagate through interpolation; zero them first
        data[np.isnan(data)] = 0
        data, seg = resample_patient(data,
                                     seg,
                                     original_spacing,
                                     target_spacing,
                                     order_data=3,
                                     order_seg=0,
                                     force_separate_z=False,
                                     # order_z only applies when separate z
                                     # resampling is triggered by anisotropy
                                     order_z_data=9999,
                                     order_z_seg=9999,
                                     separate_z_anisotropy_threshold=
                                     self.resample_anisotropy_threshold,
                                     )
        after = {
            "spacing": target_spacing,
            "shape (resampled)": data.shape,
        }
        return data, seg, after

    def normalize(self, data: np.ndarray, seg: np.ndarray) -> np.ndarray:
        """
        Normalize data with correct scheme

        Args:
            data: input data [C, dims]
            seg: input segmentation [1, dims]

        Returns:
            np.ndarray: normalized data
        """
        assert len(self.norm_scheme_per_modality) == len(data), \
            f"norm_scheme_per_modality must have as many entries as data has modalities"
        assert len(self.use_mask_for_norm) == len(data), \
            f"use_mask_for_norm must have as many entries as data has modalities"

        for c in range(len(data)):
            scheme = self.norm_scheme_per_modality[c]
            # unknown schemes fall back to zero mean / unit std
            scheme_fn = self.norm_schemes.get(scheme, self.normalize_other)
            data = scheme_fn(data, seg, c, self.use_mask_for_norm)
        return data

    def normalize_ct(self,
                     data: np.ndarray,
                     seg: np.ndarray,
                     modality: int,
                     use_nonzero_mask: Dict[int, bool],
                     ) -> np.ndarray:
        """
        clip to lb and ub from train data foreground and use foreground mn and sd from training data
        (This uses the foreground mean and std!)

        Args:
            data: data to normalize [C, dims]
            seg: segmentation [C, dims]
            modality: current modality
            use_nonzero_mask: use non zero region for normalization and set all values
                outside to zero [C]

        Returns:
            np.ndarray: normalized data (only modality channel was changed)
        """
        assert self.intensity_properties is not None, \
            "ERROR: if there is a CT then we need intensity properties"
        mean_intensity = self.intensity_properties[modality]['mean']
        std_intensity = self.intensity_properties[modality]['std']
        lower_bound = self.intensity_properties[modality]['percentile_00_5']
        upper_bound = self.intensity_properties[modality]['percentile_99_5']
        data[modality] = np.clip(data[modality], lower_bound, upper_bound)
        data[modality] = (data[modality] - mean_intensity) / std_intensity
        if use_nonzero_mask[modality]:
            # seg < 0 marks the region outside the nonzero crop
            data[modality][seg[-1] < 0] = 0
        return data

    def normalize_ct2(self, data: np.ndarray, seg: np.ndarray, modality: int,
                      use_nonzero_mask: Dict[int, bool]) -> np.ndarray:
        """
        clip to lb and ub from train data foreground, use mn and sd from each case for normalization
        (This uses mean and std from whole case!)

        Args:
            data: data to normalize [C, dims]
            seg: segmentation [C, dims]
            modality: current modality
            use_nonzero_mask: use non zero region for normalization [C]

        Returns:
            np.ndarray: normalized data (only modality channel was changed)
        """
        assert self.intensity_properties is not None, \
            "ERROR: if there is a CT then we need intensity properties"
        lower_bound = self.intensity_properties[modality]['percentile_00_5']
        upper_bound = self.intensity_properties[modality]['percentile_99_5']
        # statistics are computed from in-bounds voxels only
        mask = (data[modality] > lower_bound) & (data[modality] < upper_bound)
        data[modality] = np.clip(data[modality], lower_bound, upper_bound)
        mn = data[modality][mask].mean()
        sd = data[modality][mask].std()
        data[modality] = (data[modality] - mn) / sd
        if use_nonzero_mask[modality]:
            data[modality][seg[-1] < 0] = 0
        return data

    def normalize_ct3(self,
                      data: np.ndarray,
                      seg: np.ndarray,
                      modality: int,
                      use_nonzero_mask: Dict[int, bool],
                      ) -> np.ndarray:
        """
        clip to lb and ub from train data foreground and use foreground mn
        and sd from training data (This uses the foreground mean and std!)
        Use this if channels are overloaded with spatial information (
        in case of CT)

        Args:
            data: data to normalize [C, dims]
            seg: segmentation [C, dims]
            modality: current modality
            use_nonzero_mask: use non zero region for normalization and set all values
                outside to zero [C]

        Returns:
            np.ndarray: normalized data (only modality channel was changed)
        """
        assert self.intensity_properties is not None, \
            "ERROR: if there is a CT then we need intensity properties"
        mean_intensity = np.mean([k["mean"] for k in self.intensity_properties.values()])
        # the intensity values are not independent but we do not have enough information here
        std_intensity = np.sqrt(np.sum([k["std"] ** 2 for k in self.intensity_properties.values()]))
        lower_bound = np.mean([k["percentile_00_5"] for k in self.intensity_properties.values()])
        upper_bound = np.mean([k["percentile_99_5"] for k in self.intensity_properties.values()])
        data[modality] = np.clip(data[modality], lower_bound, upper_bound)
        data[modality] = (data[modality] - mean_intensity) / std_intensity
        if use_nonzero_mask[modality]:
            data[modality][seg[-1] < 0] = 0
        return data

    def normalize_other(self, data: np.ndarray, seg: np.ndarray, modality: int,
                        use_nonzero_mask: Dict[int, bool]) -> np.ndarray:
        """
        Zero mean and unit std

        Args:
            data: data to normalize [C, dims]
            seg: segmentation [C, dims]
            modality: current modality
            use_nonzero_mask: use non zero region for normalization [C]

        Returns:
            np.ndarray: normalized data (only modality channel was changed)
        """
        if use_nonzero_mask[modality]:
            mask = seg[-1] >= 0
        else:
            mask = np.ones(seg.shape[1:], dtype=bool)
        data[modality][mask] = (data[modality][mask] - data[modality][mask].mean()) / \
                               (data[modality][mask].std() + 1e-8)
        data[modality][mask == 0] = 0
        return data

    def no_norm(self, data: np.ndarray, seg: np.ndarray, modality: int,
                use_nonzero_mask: Dict[int, bool]) -> np.ndarray:
        """
        No normalization only masking

        Args:
            data: data to normalize [C, dims]
            seg: segmentation [C, dims]
            modality: current modality
            use_nonzero_mask: use non zero region for normalization [C]

        Returns:
            np.ndarray: masked data (only modality channel was changed)
        """
        if use_nonzero_mask[modality]:
            mask = seg[-1] >= 0
        else:
            mask = np.ones(seg.shape[1:], dtype=bool)
        data[modality][mask == 0] = 0
        return data

    @staticmethod
    def compute_candidates(
            data: np.ndarray,
            seg: np.ndarray,
            properties: dict,
    ) -> dict:
        """
        Precompute candidate sampling positions for training
        This method computes the bounding boxes of each present
        instance which can be used to oversample foreground effectively.

        Args:
            data: data after resampling
            seg: instance segmentation after resampling
            properties: must contain `instances`, a mapping from instance
                id (str) to class label

        Returns:
            dict: `boxes` (per-instance bounding boxes), `instances`
                (instance ids > 0), `labels` (class label per instance)
        """
        dim = data.ndim - 1
        boxes = instances_to_boxes_np(seg[0], dim=dim)[0]
        instances = np.unique(seg)
        instances = instances[instances > 0].astype(np.int32)  # [N]
        instances = instances.tolist()
        instances_props = properties["instances"]
        labels = [int(instances_props[str(i)]) for i in instances]
        # either every instance has a box, or the case is empty
        assert (len(boxes) == len(instances)) or ((boxes.size == 0) and (len(instances) == 0))
        assert len(labels) == len(instances)
        return {
            "boxes": boxes,
            "instances": instances,
            "labels": labels,
        }

    def run_test(self,
                 data_files,
                 target_spacing,
                 target_dir: PathLike,
                 ) -> None:
        """
        Preprocess and save test data

        Args:
            data_files: path to data files
            target_spacing: spacing to resample
            target_dir: directory to save data to
        """
        target_dir = Path(target_dir)
        data, seg, properties = self.preprocess_test_case(
            data_files=data_files,
            target_spacing=target_spacing,
        )
        case_id = get_case_id_from_path(str(data_files[0]), remove_modality=True)
        np.savez_compressed(str(target_dir / f"{case_id}.npz"), data=data)
        # NOTE(review): `run_process` saves properties as "<case_id>.pkl";
        # here the ".pkl" suffix is missing — confirm whether `save_pickle`
        # appends it or whether downstream readers expect the bare name.
        save_pickle(properties, target_dir / f"{case_id}")

    def preprocess_test_case(self,
                             data_files,
                             target_spacing,
                             seg_file=None,
                             ) -> Tuple[np.ndarray, np.ndarray, dict]:
        """
        Preprocess a test file

        Args:
            data_files: path to data files
            target_spacing: spacing to resample
            seg_file: optional segmentation file

        Returns:
            np.ndarray: preprocessed data
            np.ndarray: preprocessed segmentation
            dict: updated properties
        """
        data, seg, properties = ImageCropper.load_crop_from_list_of_files(
            data_files, seg_file)
        data, seg, properties = self.apply_process(
            data=data,
            target_spacing=target_spacing,
            properties=properties,
            seg=seg,
        )
        return data.astype(np.float32), seg.astype(np.int32), properties
"""
Copyright 2020 Division of Medical Image Computing, German Cancer Research Center (DKFZ), Heidelberg, Germany
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import numpy as np
import nnunet.preprocessing.preprocessing as nn_preprocessing
def resize_segmentation(segmentation, new_shape, order=3, cval=0):
    """
    Resize a segmentation map without interpolation artifacts.

    Delegates to nnU-Net: the map is transformed to a one-hot encoding,
    resized per class (any skimage order is supported), and converted back to
    label form, which prevents artifacts such as [0, 0, 2] -> [0, 1, 2].
    """
    return nn_preprocessing.resize_segmentation(
        segmentation=segmentation,
        new_shape=new_shape,
        order=order,
        cval=cval,
    )
def get_do_separate_z(spacing, anisotropy_threshold: float = 3):
    """Return whether z should be resampled separately (delegates to nnU-Net)."""
    return nn_preprocessing.get_do_separate_z(
        spacing=spacing,
        anisotropy_threshold=anisotropy_threshold,
    )
def get_lowres_axis(new_spacing):
    """Return the anisotropic (low-resolution) axis (delegates to nnU-Net)."""
    return nn_preprocessing.get_lowres_axis(new_spacing=new_spacing)
def resample_patient(data,
                     seg,
                     original_spacing,
                     target_spacing,
                     order_data=3,
                     order_seg=0,
                     force_separate_z=False,
                     cval_data=0,
                     cval_seg=-1,
                     order_z_data=0,
                     order_z_seg=0,
                     separate_z_anisotropy_threshold: float = 3,
                     ):
    """
    Resample a patient's data and segmentation to a new spacing.

    Thin wrapper around nnU-Net's `resample_patient`; all arguments are
    forwarded unchanged.
    """
    forwarded = dict(
        data=data,
        seg=seg,
        original_spacing=original_spacing,
        target_spacing=target_spacing,
        order_data=order_data,
        order_seg=order_seg,
        force_separate_z=force_separate_z,
        cval_data=cval_data,
        cval_seg=cval_seg,
        order_z_data=order_z_data,
        order_z_seg=order_z_seg,
        separate_z_anisotropy_threshold=separate_z_anisotropy_threshold,
    )
    return nn_preprocessing.resample_patient(**forwarded)
def resample_data_or_seg(data, new_shape, is_seg, axis=None, order=3,
                         do_separate_z=False, cval=0, order_z=0) -> np.ndarray:
    """
    Resample data or segmentation (delegates to nnU-Net).

    Args:
        data: array to resample [C, dims]
        new_shape: define new dims (without channels)
        is_seg: changes the resampling strategy
        axis: anisotropic axis, different resampling order used here
        order: order of resampling along the isotropic axis
        do_separate_z: different resampling along z dimension
        cval: fill value used outside the input
        order_z: if separate z resampling is done then this is the order for
            resampling in z

    Returns:
        np.ndarray: resampled array
    """
    forwarded = dict(
        data=data,
        new_shape=new_shape,
        is_seg=is_seg,
        axis=axis,
        order=order,
        do_separate_z=do_separate_z,
        cval=cval,
        order_z=order_z,
    )
    return nn_preprocessing.resample_data_or_seg(**forwarded)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment