Source code for neuror.sanitize

'''Module for the sanitization of raw morphologies.'''
import logging
from functools import partial
from multiprocessing import Pool
from pathlib import Path
from typing import Dict, List, Tuple

import numpy as np
from morphio import MorphioError, SomaType, set_maximum_warnings
from morphio.mut import Morphology  # pylint: disable=import-error
from neurom.check import morphology_checks as mc
from neurom.check import CheckResult
from neurom.apps.annotate import annotate
from neurom import load_morphology
from tqdm import tqdm

from neuror.exceptions import CorruptedMorphology
from neuror.exceptions import ZeroLengthRootSection

L = logging.getLogger('neuror')
_ZERO_LENGTH = 1e-4


[docs]def iter_morphologies(folder): '''Recursively yield morphology files in folder and its sub-directories.''' return (path for path in folder.rglob('*') if path.suffix.lower() in {'.swc', '.h5', '.asc'})
[docs]def sanitize(input_neuron, output_path): '''Sanitize one morphology. - ensures it can be loaded with MorphIO - raises if the morphology has no soma or of invalid format - removes unifurcations - set negative diameters to zero - raises if the morphology has a neurite whose type changes along the way - removes segments with near zero lengths (shorter than 1e-4) Args: input_neuron (str|pathlib.Path|morphio.Morphology|morphio.mut.Morphology): input neuron output_path (str|pathlib.Path): output name ''' neuron = Morphology(input_neuron) if neuron.soma.type == SomaType.SOMA_UNDEFINED: # pylint: disable=no-member raise CorruptedMorphology(f'{input_neuron} has an invalid or no soma.') for section in neuron.iter(): section.diameters = np.clip(section.diameters, 0, None) for root in neuron.root_sections: # pylint: disable=not-an-iterable for section in root.iter(): if section.type != root.type: raise CorruptedMorphology(f'{input_neuron} has a neurite whose type changes along ' 'the way\n' f'Child section (id: {section.id}) has a different type ' f'({section.type}) than its parent (id: ' f'{section.parent.id}) (type: {section.parent.type})') try: sanitized_neuron = fix_non_zero_segments(neuron) except ZeroLengthRootSection as e: # reraise to attach the morphology path raise ZeroLengthRootSection(f"Failed morphology: {input_neuron}") from e sanitized_neuron.remove_unifurcations() sanitized_neuron.write(str(output_path))
[docs]def _sanitize_one(path, input_folder, output_folder): '''Function to be called by sanitize_all to catch all exceptions and return path if in error. Since :meth:`multiprocessing.pool.Pool.imap_unordered` only supports one argument, the argument is a tuple: (path, input_folder, output_folder). ''' relative_path = path.relative_to(input_folder) output_dir = output_folder / relative_path.parent if not output_dir.exists(): # exist_ok=True since there can be race conditions because of Pool output_dir.mkdir(parents=True, exist_ok=True) try: sanitize(path, output_dir / path.name) except (MorphioError, CorruptedMorphology): return str(path) else: return None
[docs]def sanitize_all(input_folder, output_folder, nprocesses=1): '''Sanitize all morphologies in input_folder and its sub-directories. See :func:`~neuror.sanitize.sanitize` for more information on the sanitization process. Args: input_folder (str|pathlib.Path): input neuron output_folder (str|pathlib.Path): output name .. note:: The sub-directory structure is maintained. ''' set_maximum_warnings(0) morphologies = list(iter_morphologies(Path(input_folder))) func = partial(_sanitize_one, input_folder=input_folder, output_folder=output_folder) if nprocesses == 1: results = map(func, morphologies) else: with Pool(nprocesses) as pool: results = list(pool.imap_unordered(func, morphologies, chunksize=100)) errored_paths = list(filter(None, tqdm(results, total=len(morphologies)))) if errored_paths: L.info('Files in error:') for path in errored_paths: L.info(path)
[docs]def fix_non_zero_segments(neuron, zero_length=_ZERO_LENGTH): '''Return a neuron with zero length segments removed Sections composed of a single zero length segment are deleted, where zero is parametrized by zero_length Args: neuron (str|pathlib.Path|morphio.Morphology|morphio.mut.Morphology): input neuron zero_length (float): smallest length of a segment Returns: morphio.mut.Morphology: a fixed morphology ''' neuron = Morphology(neuron) to_be_deleted = [] for section in neuron.iter(): points = section.points distances = np.linalg.norm(np.diff(points, axis=0), axis=1) distances[distances < zero_length] = 0 indices = np.append(0, np.nonzero(distances)[0] + 1) if len(indices) != len(points): section.points = section.points[indices] section.diameters = section.diameters[indices] if len(indices) < 2: to_be_deleted.append(section) L.debug("Sections to be deleted: %s", [s.id for s in to_be_deleted]) for section in to_be_deleted: if section.is_root: raise ZeroLengthRootSection( f"Morphology has root sections at the soma with zero length." f"\nZero length section points: {section.points}" ) neuron.delete_section(section, recursive=False) return neuron
[docs]def annotate_neurolucida(morph_path: str, checkers: Dict = None): """Annotate errors on a morphology in neurolucida format. Args: morph_path: absolute path to an ascii morphology checkers: dict of checker functons from neurom with function as keys and marker data in a dict as values, if None, default checkers are used Default checkers include: - fat ends - z-jumps - narrow start - dangling branch - multifurcation Returns: * annotations to append to .asc file * dict of error summary * dict of error markers """ if checkers is None: checkers = { mc.has_no_fat_ends: {"name": "fat end", "label": "Circle3", "color": "Blue"}, partial(mc.has_no_jumps, axis="z"): { "name": "zjump", "label": "Circle2", "color": "Green", }, mc.has_no_narrow_start: {"name": "narrow start", "label": "Circle1", "color": "Blue"}, mc.has_no_dangling_branch: {"name": "dangling", "label": "Circle6", "color": "Magenta"}, mc.has_multifurcation: { "name": "Multifurcation", "label": "Circle8", "color": "Yellow", }, } def _try(checker, neuron): """Try to apply a checker, returns True if exception raised, so the checker is bypassed.""" try: return checker(neuron) except Exception as e: # pylint: disable=broad-except L.exception("%s failed on %s", checker, morph_path) L.exception(e, exc_info=True) return CheckResult(True) neuron = load_morphology(morph_path) results = [_try(checker, neuron) for checker in checkers] markers = [ dict(setting, data=result.info) for result, setting in zip(results, checkers.values()) if not result.status ] summary = { setting["name"]: len(result.info) for result, setting in zip(results, checkers.values()) if result.info } return annotate(results, checkers.values()), summary, markers
[docs]def annotate_neurolucida_all( morph_paths: List[str], nprocesses: int = 1 ) -> Tuple[Dict, Dict, Dict]: """Annotate errors on a list of morphologies in neurolicida format. Args: morph_paths: list of str of paths to morphologies. nprocesses: number of processes to use for parallel computation Returns: * dict annotations to append to .asc file (morph_path as keys) * dict of dict of error summary (morph_path as keys) * dict of dict of markers (morph_path as keys) """ summaries, annotations, markers = {}, {}, {} with Pool(nprocesses) as pool: for morph_path, result in zip( morph_paths, pool.map(annotate_neurolucida, morph_paths) ): morph_path = str(morph_path) annotations[morph_path], summaries[morph_path], markers[morph_path] = result return annotations, summaries, markers
[docs]def fix_points_in_soma(morph: Morphology) -> bool: """Ensure section points are not inside the soma. Method: - for each root section, we check which points are inside the soma. - if all points of a root section are inside the soma, an exception is raised because it means that a bifurcation is located inside the soma, which is hard to automatically fix. - if there is at least 1 point inside the soma, a new point is defined to replace them. If this new point is too close to the first point outside the soma, the point is not added. Args: morph: the morphology Returns: ``True`` if at least one point was changed, else ``False``. """ changed = False for root_sec in morph.root_sections: sec_pts = root_sec.points in_soma = np.argwhere(morph.soma.overlaps(sec_pts)).flatten() if in_soma.size > 0: changed = True last_in_soma = in_soma[-1] if last_in_soma >= len(sec_pts) - 1: raise CorruptedMorphology("An entire section is located inside the soma") in_pt = sec_pts[last_in_soma] out_pt = sec_pts[last_in_soma + 1] vec = out_pt - in_pt new_pt = morph.soma.center + vec / np.linalg.norm(vec) * morph.soma.radius if np.linalg.norm(new_pt - root_sec.points[last_in_soma + 1]) <= _ZERO_LENGTH: new_sec_pts = root_sec.points[last_in_soma + 1:] last_in_soma += 1 else: new_sec_pts = np.concatenate([[new_pt], root_sec.points[last_in_soma + 1:]]) root_sec.points = new_sec_pts root_sec.diameters = root_sec.diameters[last_in_soma:] root_sec.perimeters = root_sec.perimeters[last_in_soma:] return changed