# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
File: prepare.py
This is the base prepare class shared by all dataset-specific prepare files.

support:
1. download and uncompress the file.
2. save the data as the above format.
3. read the preprocessed data into train.txt and val.txt

"""
import os
import os.path as osp
import sys
import nrrd
import time
import glob
import argparse
import zipfile
import collections
import numpy as np
import nibabel as nib
import SimpleITK as sitk
from tqdm import tqdm
import json

sys.path.append(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))

from medicalseg.utils import get_image_list
from tools.preprocess_utils import uncompressor, global_var, add_qform_sform


class Prep:
    """Base class that uncompresses, converts and indexes a medical dataset."""

    def __init__(self,
                 dataset_root="data/TemDataSet",
                 raw_dataset_dir="TemDataSet_seg_raw/",
                 images_dir="train_imgs",
                 labels_dir="train_labels",
                 phase_dir="phase0",
                 urls=None,
                 valid_suffix=("nii.gz", "nii.gz"),
                 filter_key=(None, None),
                 uncompress_params={"format": "zip",
                                    "num_files": 1},
                 images_dir_test=""):
        """
        Create preprocessor for medical dataset.
        Folder structure:
            dataset_root
            ├── raw_dataset_dir
            │   ├── image_dir
            │   ├── labels_dir
            │   ├── images_dir_test
            ├── phase_dir
            │   ├── images
            │   ├── labels
            │   ├── train_list.txt
            │   └── val_list.txt
            ├── archive_1.zip
            ├── archive_2.zip
            └── ... archives ...
        Args:
            dataset_root (str): Root directory; created if it does not exist.
            raw_dataset_dir (str): Sub-directory of dataset_root holding the
                uncompressed raw data. If it already exists, uncompressing is
                skipped.
            images_dir (str|tuple): Image folder(s) relative to the raw data
                directory. When a tuple is given, labels_dir is indexed in
                parallel with it.
            labels_dir (str|tuple): Label folder(s) relative to the raw data
                directory.
            phase_dir (str): Sub-directory of dataset_root that receives the
                converted "images" and "labels" folders.
            urls (dict): Urls to download dataset archive. Key will be used as archive name.
            valid_suffix(tuple): Only files with the assigned suffix will be considered.
                The first is the suffix for image, and the other is for label.
            filter_key(tuple): Only files containing the filter_key will be considered.
            uncompress_params (dict): Needs the keys "format" (archive
                extension to glob for) and "num_files" (expected number of
                archives). The default dict is only read, never mutated.
            images_dir_test (str): Test image folder relative to the raw data
                directory; an empty string disables test-set handling.
        """
        # combine all paths
        self.dataset_root = dataset_root
        self.phase_path = os.path.join(self.dataset_root, phase_dir)
        self.raw_data_path = os.path.join(self.dataset_root, raw_dataset_dir)
        self.dataset_json_path = os.path.join(
            self.raw_data_path,
            "dataset.json")  # save the dataset.json to raw path
        self.image_path = os.path.join(self.phase_path, "images")
        self.label_path = os.path.join(self.phase_path, "labels")

        os.makedirs(self.dataset_root, exist_ok=True)
        os.makedirs(self.phase_path, exist_ok=True)
        os.makedirs(self.image_path, exist_ok=True)
        os.makedirs(self.label_path, exist_ok=True)
        self.gpu_tag = "GPU" if global_var.get_value('USE_GPU') else "CPU"
        self.urls = urls

        if osp.exists(self.raw_data_path):
            print(
                f"raw_dataset_dir {self.raw_data_path} exists, skipping uncompress. To uncompress again, remove this directory"
            )
        else:
            self.uncompress_file(
                num_files=uncompress_params["num_files"],
                form=uncompress_params["format"])

        self.image_files_test = None
        if len(images_dir_test) != 0:
            # test image filter is the same as training image
            self.image_files_test = get_image_list(
                os.path.join(self.raw_data_path, images_dir_test),
                valid_suffix[0], filter_key[0])
            self.image_files_test.sort()
            self.image_path_test = os.path.join(self.phase_path,
                                                'images_test')
            os.makedirs(self.image_path_test, exist_ok=True)

        # Load the needed file with filter
        if isinstance(images_dir, tuple):
            self.image_files = []
            self.label_files = []
            # labels_dir is indexed (not zipped) so that a shorter labels_dir
            # still fails loudly instead of being silently truncated.
            for idx, image_dir in enumerate(images_dir):
                self.image_files += get_image_list(
                    os.path.join(self.raw_data_path, image_dir),
                    valid_suffix[0], filter_key[0])
                self.label_files += get_image_list(
                    os.path.join(self.raw_data_path, labels_dir[idx]),
                    valid_suffix[1], filter_key[1])
        else:
            self.image_files = get_image_list(
                os.path.join(self.raw_data_path, images_dir), valid_suffix[0],
                filter_key[0])
            self.label_files = get_image_list(
                os.path.join(self.raw_data_path, labels_dir), valid_suffix[1],
                filter_key[1])

        self.image_files.sort()
        self.label_files.sort()

    @staticmethod
    def _case_name(path):
        """Return the file name of `path` up to its first dot.

        Used both as the key into dataset.json and as the stem of saved
        files, so writer and reader always derive it the same way.
        """
        return osp.basename(path).split(".")[0]

    def uncompress_file(self, num_files, form):
        """
        Unzip all the archives found directly in the dataset root.

        Args:
            num_files (int): Number of archives expected in dataset_root.
            form (str): Archive extension to look for, e.g. "zip".

        Raises:
            AssertionError: If the number of archives found differs from
                num_files.
        """
        uncompress_tool = uncompressor(
            download_params=(self.urls, self.dataset_root, True))
        files = glob.glob(
            os.path.join(self.dataset_root, "*.{}".format(form)))

        assert len(
            files
        ) == num_files, "The file directory should include {} compressed files, but there is only {}".format(
            num_files, len(files))

        for f in files:
            # Each archive is extracted into a folder named after the archive.
            extract_path = os.path.join(self.raw_data_path,
                                        self._case_name(f))
            uncompress_tool._uncompress_file(
                f, extract_path, delete_file=False, print_progress=True)

    @staticmethod
    def load_medical_data(f):
        """
        Load data of different formats into a list of numpy arrays.

        Args:
            f (str): the complete path to the file that you want to load.

        Returns:
            list: one array per volume; a 4-D input yields one array per
            index of its last axis.

        Raises:
            RuntimeError: if a .raw file is passed instead of its .mhd header.
            NotImplementedError: if the file suffix is not supported.
        """
        filename = osp.basename(f).lower()
        images = []

        # validate nii.gz on lung and mri with correct spacing_resample
        if filename.endswith((".nii", ".nii.gz", ".dcm")):
            itkimage = sitk.ReadImage(f)
            if itkimage.GetDimension() == 4:
                # Extract every 3-D volume along the last axis; a size of 0
                # on that axis tells the filter to collapse it.
                slicer = sitk.ExtractImageFilter()
                s = list(itkimage.GetSize())
                s[-1] = 0
                slicer.SetSize(s)
                for slice_idx in range(itkimage.GetSize()[-1]):
                    slicer.SetIndex([0, 0, 0, slice_idx])
                    sitk_volume = slicer.Execute(itkimage)
                    images.append(sitk_volume)
                images = [sitk.DICOMOrient(img, 'LPS') for img in images]
                # NOTE(review): unlike the .mha/.mhd branch below, no axis
                # transpose is applied here - confirm the intended axis order.
                f_nps = [sitk.GetArrayFromImage(img) for img in images]
            else:
                f_nps = [nib.load(f).get_fdata(dtype=np.float32)]
        elif filename.endswith((".mha", ".mhd", ".nrrd")):
            # validate mhd on lung and mri with correct spacing_resample
            itkimage = sitk.DICOMOrient(sitk.ReadImage(f), 'LPS')
            f_np = sitk.GetArrayFromImage(itkimage)
            if f_np.ndim == 4:
                f_nps = [f_np[:, :, :, idx] for idx in range(f_np.shape[3])]
            else:
                f_nps = [f_np]
            # Move the first array axis to the end.
            f_nps = [np.transpose(f_np, [1, 2, 0]) for f_np in f_nps]
        elif filename.endswith(".raw"):
            raise RuntimeError(
                f"Received {f}. Please only provide path to .mhd file, not to .raw file"
            )
        else:
            raise NotImplementedError(
                "Unsupported medical image format: {}".format(f))

        return f_nps

    def load_save(self):
        """
        Preprocess files, transfer to the correct type, and save it to the directory.

        Reads dataset.json, runs every operation of the matching pipeline in
        `self.preprocess` on each volume, saves the result with `np.save` and
        writes the resampled spacing of training images back to dataset.json.

        NOTE(review): `self.preprocess` is not set in this class; it is
        presumably a dict provided by subclasses with the keys "images",
        "labels" and (when test images exist) "images_test" - confirm.
        """
        print(
            "Start convert images to numpy array using {}, please wait patiently"
            .format(self.gpu_tag))

        tic = time.time()
        with open(self.dataset_json_path, 'r', encoding='utf-8') as f:
            dataset_json_dict = json.load(f)

        if self.image_files_test:
            process_files = (self.image_files, self.label_files,
                             self.image_files_test)
            process_tuple = ("images", "labels", "images_test")
            save_tuple = (self.image_path, self.label_path,
                          self.image_path_test)
        else:
            process_files = (self.image_files, self.label_files)
            process_tuple = ("images", "labels")
            save_tuple = (self.image_path, self.label_path)

        for i, files in enumerate(process_files):
            pre = self.preprocess[process_tuple[i]]
            savepath = save_tuple[i]

            for f in tqdm(
                    files,
                    total=len(files),
                    desc="preprocessing the {}".format(process_tuple[i])):
                case_name = self._case_name(f)

                # Only training images have their spacing looked up.
                spacing = dataset_json_dict["training"][case_name][
                    "spacing"] if i == 0 else None
                # Reset per file so a pipeline without a "resample" op neither
                # raises on an unbound name nor reuses the previous file's value.
                new_spacing = None
                f_nps = Prep.load_medical_data(f)

                for volume_idx, f_np in enumerate(f_nps):
                    for op in pre:
                        if op.__name__ == "resample":
                            f_np, new_spacing = op(
                                f_np,
                                spacing=spacing)  # (960, 15, 960) if transpose
                        else:
                            f_np = op(f_np)

                    # NOTE(review): test images (i == 2) are also cast to
                    # int32 here - confirm that this is intended.
                    f_np = f_np.astype("float32") if i == 0 else f_np.astype(
                        "int32")
                    # Multi-volume files get a "-<index>" suffix per volume.
                    suffix = "" if len(f_nps) == 1 else f"-{volume_idx}"
                    np.save(os.path.join(savepath, case_name + suffix), f_np)

                if i == 0 and new_spacing is not None:
                    dataset_json_dict["training"][case_name][
                        "spacing_resample"] = new_spacing

        with open(self.dataset_json_path, 'w', encoding='utf-8') as f:
            json.dump(dataset_json_dict, f, ensure_ascii=False, indent=4)

        print("The preprocess time on {} is {}".format(self.gpu_tag,
                                                       time.time() - tic))

    def convert_path(self):
        """convert nii.gz file to numpy array in the right directory"""
        raise NotImplementedError

    def generate_txt(self):
        """generate the train_list.txt and val_list.txt"""
        raise NotImplementedError

    # TODO add data visualize method, such that data can be checked every time after preprocess.
    def visualize(self):
        """Placeholder for a visual check of preprocessed data; does nothing."""
        pass
        # imga = Image.fromarray(np.int8(imga))
        # # When the image to save is grayscale, its numpy shape is [1, h, w];
        # # it has to be changed from [1, h, w] to [h, w].
        # imgb = np.squeeze(imgb)

        # # imgb = Image.fromarray(np.int8(imgb))
        # plt.figure(figsize=(12, 6))
        # plt.subplot(1,2,1),plt.xticks([]),plt.yticks([]),plt.imshow(imga)
        # plt.subplot(1,2,2),plt.xticks([]),plt.yticks([]),plt.imshow(imgb)
        # plt.show()

    @staticmethod
    def write_txt(txt, image_names, label_names=None):
        """
        Write the image_names and label_names on the txt file like this:

        images/image_name labels/label_name
        ...

        or this when label is None.

        images/image_name
        ...

        Args:
            txt (str): path of the txt file to (over)write.
            image_names (list|tuple): image file names.
            label_names (list|tuple, optional): label file names, in the same
                order as image_names.
        """
        with open(txt, 'w') as f:
            for idx, image_name in enumerate(image_names):
                if label_names is not None:
                    string = "{} {}\n".format('images/' + image_name,
                                              'labels/' + label_names[idx])
                else:
                    string = "{}\n".format('images/' + image_name)

                f.write(string)

        print("successfully write to {}".format(txt))

    def split_files_txt(self, txt, image_files, label_files=None, split=None):
        """
        Split filenames and write the image names and label names on train.txt, val.txt or test.txt.
        Set the valset to 20% of images if all files need to be used in training.

        Args:
            txt(string): the path to the txt file, for example: "data/train.txt".
                Which subset is written depends on whether the path contains
                "train", "val" or "test" (checked in that order).
            image_files(list|tuple): the list of image names.
            label_files(list|tuple): the list of label names, order is corresponding with the image_files.
            split(float|int): Fraction (<= 1) of the dataset used in training.
                When None, 0.8 is used, or all files if label_files is None.

        Raises:
            RuntimeError: if split is greater than 1.
            NotImplementedError: if txt names none of train, val or test.
        """
        if split is None:
            if label_files is None:  # testset don't have
                split = len(image_files)
            else:
                split = int(0.8 * len(image_files))
        elif split <= 1:
            split = int(split * len(image_files))
        elif split > 1:
            # NOTE(review): any value above 1 is rejected, even one smaller
            # than the number of images - confirm whether counts should work.
            raise RuntimeError(
                "Only have {} images but required {} images in trainset".
                format(len(image_files), split))

        if "train" in txt:
            image_names = image_files[:split]
            label_names = label_files[:split]
        elif "val" in txt:
            # set the valset to 20% of images if all files need to be used in training
            if split == len(image_files):
                valsplit = int(0.8 * len(image_files))
                image_names = image_files[valsplit:]
                label_names = label_files[valsplit:]
            else:
                image_names = image_files[split:]
                label_names = label_files[split:]
        elif "test" in txt:
            self.write_txt(txt, image_files[:split])
            return
        else:
            raise NotImplementedError(
                "The txt split except for train.txt, val.txt and test.txt is not implemented yet."
            )

        self.write_txt(txt, image_names, label_names)

    @staticmethod
    def set_image_infor(image_name, infor_dict):
        """
        Read an image and store its geometry and value range in infor_dict.

        Args:
            image_name (str): path to the image file.
            infor_dict (dict): dict to fill; it is modified in place.

        Returns:
            dict: the same infor_dict with the keys "dim", "shape",
            "minmax_vals", "spacing", "origin" and "direction" set.
        """
        try:
            img_itk = sitk.ReadImage(image_name)
        except Exception:
            # On a failed read, run add_qform_sform on the file and retry
            # once; a second failure propagates to the caller.
            add_qform_sform(image_name)
            img_itk = sitk.ReadImage(image_name)
        infor_dict["dim"] = img_itk.GetDimension()
        img_npy = sitk.GetArrayFromImage(img_itk)
        infor_dict["shape"] = [img_npy.shape, ]
        # Stored as strings in the resulting dict.
        infor_dict["minmax_vals"] = [str(img_npy.min()), str(img_npy.max())]
        infor_dict["spacing"] = img_itk.GetSpacing()
        infor_dict["origin"] = img_itk.GetOrigin()
        infor_dict["direction"] = img_itk.GetDirection()
        return infor_dict

    def generate_dataset_json(self,
                              modalities,
                              labels,
                              dataset_name,
                              license_desc="hands off!",
                              dataset_description="",
                              dataset_reference="",
                              save_path=None):
        """
        Collect dataset metadata and per-file information into dataset.json.

        Nothing is written if the target file already exists.

        :param save_path: Directory in which dataset.json is written; when None, the
            path set in __init__ (inside the raw data directory) is used.
        :param modalities: tuple of strings with modality names. must be in the same order as the images (first entry
        corresponds to _0000.nii.gz, etc). Example: ('T1', 'T2', 'FLAIR').
        :param labels: dict with int->str (key->value) mapping the label IDs to label names. Note that 0 is always
        supposed to be background! Example: {0: 'background', 1: 'edema', 2: 'enhancing tumor'}
        :param dataset_name: The name of the dataset. Can be anything you want
        :param license_desc: license text stored under the "licence" key
        :param dataset_description: free-text description of the dataset
        :param dataset_reference: website of the dataset, if available
        :return: None; the dataset.json is saved to disk
        """
        if save_path is not None:
            self.dataset_json_path = os.path.join(
                save_path, "dataset.json")  # save the dataset.json to raw path

        if osp.exists(self.dataset_json_path):
            print(
                f"Dataset json exists, skipping. Delete file {self.dataset_json_path} to regenerate."
            )
            return

        if not self.dataset_json_path.endswith("dataset.json"):
            print(
                "WARNING: output file name is not dataset.json! This may be intentional or not. You decide. "
                "Proceeding anyways...")

        json_dict = {}
        json_dict['name'] = dataset_name
        json_dict['description'] = dataset_description
        json_dict['reference'] = dataset_reference
        json_dict['licence'] = license_desc
        json_dict['modality'] = {
            str(i): modalities[i]
            for i in range(len(modalities))
        }
        json_dict['labels'] = {str(i): labels[i] for i in labels.keys()}

        # set information of training and testing file
        json_dict['training'] = {}
        for i, image_name in enumerate(
                tqdm(
                    self.image_files,
                    total=len(self.image_files),
                    desc="Load train file information into dataset.json")):
            # Images and labels are paired by position in the sorted lists.
            infor_dict = {
                'image': image_name,
                "label": self.label_files[i]
            }  # nii.gz filename
            infor_dict = self.set_image_infor(image_name, infor_dict)
            json_dict['training'][self._case_name(image_name)] = infor_dict

        json_dict['test'] = {}
        if self.image_files_test:
            for image_name in tqdm(
                    self.image_files_test,
                    total=len(self.image_files_test),
                    desc="Load Test file information"):
                infor_dict = {'image': image_name}
                infor_dict = self.set_image_infor(image_name, infor_dict)
                json_dict['test'][self._case_name(image_name)] = infor_dict

        with open(self.dataset_json_path, 'w', encoding='utf-8') as f:
            json.dump(json_dict, f, ensure_ascii=False, indent=4)
        print("save dataset.json to {}".format(self.dataset_json_path))