wkcuber.metadata
View Source
import json import os import re import wkw import logging import numpy as np import glob from argparse import ArgumentParser from glob import iglob from os import path, listdir from typing import Optional, Tuple, Iterable, Generator from .mag import Mag from typing import List from .utils import add_verbose_flag, setup_logging, add_scale_flag from pathlib import Path from os.path import basename, normpath def get_datasource_path(dataset_path: Path) -> Path: return dataset_path / "datasource-properties.json" def create_parser() -> ArgumentParser: parser = ArgumentParser() parser.add_argument("path", help="Directory containing the dataset.", type=Path) parser.add_argument("--name", "-n", help="Name of the dataset", default=None) parser.add_argument("--refresh", "-r", default=False, action="store_true") group = parser.add_mutually_exclusive_group() group.add_argument( "--compute_max_id", "-c", help="set to compute max id", default=False, action="store_true", ) group.add_argument("--max_id", help="set max id of segmentation.", default=0) add_scale_flag(parser, required=False) add_verbose_flag(parser) return parser def write_datasource_properties( dataset_path: Path, datasource_properties: dict ) -> None: datasource_properties_path = get_datasource_path(dataset_path) with open(datasource_properties_path, "wt") as datasource_properties_file: json.dump(datasource_properties, datasource_properties_file, indent=2) def read_datasource_properties(dataset_path: Path) -> dict: with open(get_datasource_path(dataset_path), "r") as datasource_properties_file: return json.load(datasource_properties_file) def write_webknossos_metadata( dataset_path: Path, name: str, scale: Tuple[float, float, float], max_id: int = 0, compute_max_id: bool = False, exact_bounding_box: Optional[dict] = None, view_configuration: Optional[dict] = None, ) -> None: """ Creates a datasource-properties.json file with the specified properties for the given dataset path. Common layers are detected automatically. """ if name == None: name = dataset_path.name # Generate a metadata file for webKnossos # Currently includes no source of information for team layers = list( detect_layers( dataset_path, max_id, compute_max_id, exact_bounding_box, view_configuration ) ) write_datasource_properties( dataset_path, { "id": {"name": name, "team": "<unknown>"}, "dataLayers": layers, "scale": scale, }, ) def refresh_metadata( wkw_path: Path, max_id: int = 0, compute_max_id: bool = False, exact_bounding_box: Optional[dict] = None, view_configuration: Optional[dict] = None, ) -> None: """ Updates the datasource-properties.json file for a given dataset. Use this method if you added (or removed) layers and/or changed magnifications for existing layers. Raises an exception if the datasource-properties.json file does not exist, yet. In this case, use write_webknossos_metadata instead. """ dataset_path = get_datasource_path(wkw_path) if not path.exists(dataset_path): raise Exception( "datasource-properties.json file could not be found. Please use write_webknossos_metadata to create it." ) datasource_properties = read_datasource_properties(wkw_path) existing_layers_dict = { layer["name"]: layer for layer in datasource_properties["dataLayers"] } new_layers = list( detect_layers( wkw_path, max_id, compute_max_id, exact_bounding_box, view_configuration ) ) # Merge the freshly read layers with the existing layer information, so that information, # such as bounding boxes, are not lost for existing layers. # For existing layers, only the resolutions will be updated. merged_layers = [] for new_layer in new_layers: layer_name = new_layer["name"] if layer_name in existing_layers_dict: existing_layer = existing_layers_dict[layer_name] # Update the resolutions existing_layer["wkwResolutions"] = new_layer["wkwResolutions"] merged_layers.append(existing_layer) else: merged_layers.append(new_layer) datasource_properties["dataLayers"] = merged_layers write_datasource_properties(wkw_path, datasource_properties) def convert_element_class_to_dtype(elementClass: str) -> np.dtype: default_dtype = np.uint8 if "uint" in elementClass else np.dtype(elementClass) conversion_map = { "float": np.float32, "double": np.float64, "uint8": np.uint8, "uint16": np.uint16, "uint32": np.uint32, "uint64": np.uint64, } return conversion_map.get(elementClass, default_dtype) def read_metadata_for_layer( wkw_path: Path, layer_name: str ) -> Tuple[dict, np.dtype, List[int], List[int]]: datasource_properties = read_datasource_properties(wkw_path) layers = datasource_properties["dataLayers"] layer_info = next(layer for layer in layers if layer["name"] == layer_name) dtype = convert_element_class_to_dtype(layer_info["elementClass"]) bounding_box = layer_info["boundingBox"] origin = bounding_box["topLeft"] bounding_box = [ bounding_box["width"], bounding_box["height"], bounding_box["depth"], ] return layer_info, dtype, bounding_box, origin def convert_dtype_to_element_class(dtype: np.dtype) -> str: element_class_to_dtype_map = { "float": np.float32, "double": np.float64, "uint8": np.uint8, "uint16": np.uint16, "uint32": np.uint32, "uint64": np.uint64, } conversion_map = {v: k for k, v in element_class_to_dtype_map.items()} return conversion_map.get(dtype, str(dtype)) def detect_mag_path( dataset_path: Path, layer: str, mag: Mag = Mag(1) ) -> Optional[Path]: layer_path = dataset_path / layer / str(mag) if layer_path.exists(): return layer_path layer_path = dataset_path / layer / mag.to_long_layer_name() if layer_path.exists(): return layer_path return None def detect_dtype(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> str: layer_path = detect_mag_path(dataset_path, layer, mag) if layer_path is not None: with wkw.Dataset.open(str(layer_path)) as dataset: voxel_size = dataset.header.voxel_type num_channels = dataset.header.num_channels if voxel_size == np.uint8 and num_channels > 1: return "uint" + str(8 * num_channels) else: return convert_dtype_to_element_class(voxel_size) raise RuntimeError( f"Failed to detect dtype (for {dataset_path}, {layer}, {mag}) because the layer_path is None" ) def detect_cubeLength(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> int: layer_path = detect_mag_path(dataset_path, layer, mag) if layer_path is not None: with wkw.Dataset.open(str(layer_path)) as dataset: return dataset.header.block_len * dataset.header.file_len raise RuntimeError( f"Failed to detect the cube length (for {dataset_path}, {layer}, {mag}) because the layer_path is None" ) def detect_bbox(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> Optional[dict]: # Detect the coarse bounding box of a dataset by iterating # over the WKW cubes layer_path = detect_mag_path(dataset_path, layer, mag) if layer_path is None: return None def list_files(layer_path: str) -> Iterable[str]: return iglob(path.join(layer_path, "*", "*", "*.wkw"), recursive=True) def parse_cube_file_name(filename: str) -> Tuple[int, int, int]: CUBE_REGEX = re.compile( fr"z(\d+){re.escape(os.path.sep)}y(\d+){re.escape(os.path.sep)}x(\d+)(\.wkw)$" ) m = CUBE_REGEX.search(filename) if m is not None: return int(m.group(3)), int(m.group(2)), int(m.group(1)) raise RuntimeError(f"Failed to parse cube file name from {filename}") def list_cubes(layer_path: str) -> Iterable[Tuple[int, int, int]]: return (parse_cube_file_name(f) for f in list_files(layer_path)) xs, ys, zs = list(zip(*list_cubes(str(layer_path)))) min_x, min_y, min_z = min(xs), min(ys), min(zs) max_x, max_y, max_z = max(xs), max(ys), max(zs) cubeLength = detect_cubeLength(dataset_path, layer, mag) if cubeLength is None: return None return { "topLeft": [min_x * cubeLength, min_y * cubeLength, min_z * cubeLength], "width": (1 + max_x - min_x) * cubeLength, "height": (1 + max_y - min_y) * cubeLength, "depth": (1 + max_z - min_z) * cubeLength, } def detect_resolutions(dataset_path: Path, layer: str) -> Generator: for mag in listdir(path.join(dataset_path, layer)): try: yield Mag(mag) except ValueError: logging.info("ignoring {} as resolution".format(mag)) def detect_standard_layer( dataset_path: Path, layer_name: str, exact_bounding_box: Optional[dict] = None, category: str = "color", layer_view_configuration: Optional[dict] = None, ) -> dict: # Perform metadata detection for well-known layers mags = list(detect_resolutions(dataset_path, layer_name)) mags = sorted(mags) assert len(mags) > 0, f"No resolutions found for {dataset_path}/{layer_name}" if exact_bounding_box is None: bbox = detect_bbox(dataset_path, layer_name, mags[0]) else: bbox = exact_bounding_box assert ( bbox is not None ), f"Could not detect bounding box for {dataset_path}/{layer_name}" # BB can be created manually # assert the presence/spelling of all attributes assert "width" in bbox, "Attribute `width` is missing/misspelled in bounding box" assert "height" in bbox, "Attribute `height` is missing/misspelled in bounding box" assert "depth" in bbox, "Attribute `depth` is missing/misspelled in bounding box" assert ( "topLeft" in bbox ), "Attribute `topLeft` is missing/misspelled in bounding box" resolutions = [ { "resolution": mag.to_array(), "cubeLength": detect_cubeLength(dataset_path, layer_name, mag), } for mag in mags ] resolutions = [r for r in resolutions if r["cubeLength"] is not None] assert len(resolutions) > 0, f"No resolutions found for {dataset_path}/{layer_name}" dtype = detect_dtype(dataset_path, layer_name, mags[0]) assert ( dtype is not None ), f"Data type could not be detected for {dataset_path}/{layer_name}" layer_info = { "dataFormat": "wkw", "name": layer_name, "category": category, "elementClass": dtype, "boundingBox": bbox, "wkwResolutions": list(resolutions), } if layer_view_configuration is not None: layer_info["defaultViewConfiguration"] = layer_view_configuration return layer_info def detect_mappings(dataset_path: Path, layer_name: str) -> List[str]: pattern = dataset_path / layer_name / "mappings" / "*.json" mapping_files = glob.glob(str(pattern)) mapping_file_names = [path.basename(mapping_file) for mapping_file in mapping_files] return mapping_file_names def detect_segmentation_layer( dataset_path: Path, layer_name: str, max_id: int, compute_max_id: bool = False, exact_bounding_box: dict = None, ) -> dict: layer_info = detect_standard_layer( dataset_path, layer_name, exact_bounding_box, category="segmentation" ) layer_info["mappings"] = detect_mappings(dataset_path, layer_name) layer_info["largestSegmentId"] = max_id if compute_max_id: logging.info("Computing max id of layer={}".format(layer_name)) # Computing the current largest segment id # This may take very long due to IO load layer_path = str(detect_mag_path(dataset_path, layer_name, Mag(1))) with wkw.Dataset.open(layer_path) as dataset: bbox = layer_info["boundingBox"] layer_info["largestSegmentId"] = int( np.max( dataset.read( bbox["topLeft"], [bbox["width"], bbox["height"], bbox["depth"]] ) ) ) logging.info( "Max id of layer={} is {}".format( layer_name, layer_info["largestSegmentId"] ) ) return layer_info def detect_layers( dataset_path: Path, max_id: int, compute_max_id: bool, exact_bounding_box: Optional[dict] = None, view_configuration: Optional[dict] = None, ) -> Iterable[dict]: if view_configuration is None: view_configuration = dict() # Detect metadata for well-known layers (i.e., color, prediction and segmentation) if path.exists(path.join(dataset_path, "color")): yield detect_standard_layer( dataset_path, "color", exact_bounding_box, layer_view_configuration=view_configuration.get("color"), ) if path.exists(path.join(dataset_path, "segmentation")): yield detect_segmentation_layer( dataset_path, "segmentation", max_id, compute_max_id, exact_bounding_box ) available_layer_names = set( [ basename(normpath(Path(x).parent.parent)) for x in glob.glob(path.join(dataset_path, "*/*/header.wkw")) ] ) for layer_name in available_layer_names: # color and segmentation are already checked explicitly to ensure downwards compatibility (some older datasets don't have the header.wkw file) if layer_name not in ["color", "segmentation"]: layer_info = None try: layer_info = detect_standard_layer( dataset_path, layer_name, exact_bounding_box, layer_view_configuration=view_configuration.get(layer_name), ) except Exception: pass if layer_info is not None: yield layer_info else: logging.warning(f"{layer_name} is not a WKW layer") if __name__ == "__main__": args = create_parser().parse_args() setup_logging(args) if not args.refresh: assert ( args.scale is not None ), "The scale has to be specified when creating metadata for a dataset." assert ( args.name is not None ), "Please provide a name via --name to create meta data." write_webknossos_metadata( args.path, args.name, args.scale, args.max_id, args.compute_max_id ) else: if args.name is not None: logging.warning( "The --name argument is ignored, since --refresh was provided." ) refresh_metadata(args.path, args.max_id, args.compute_max_id)
View Source
def get_datasource_path(dataset_path: Path) -> Path: return dataset_path / "datasource-properties.json"
View Source
def create_parser() -> ArgumentParser: parser = ArgumentParser() parser.add_argument("path", help="Directory containing the dataset.", type=Path) parser.add_argument("--name", "-n", help="Name of the dataset", default=None) parser.add_argument("--refresh", "-r", default=False, action="store_true") group = parser.add_mutually_exclusive_group() group.add_argument( "--compute_max_id", "-c", help="set to compute max id", default=False, action="store_true", ) group.add_argument("--max_id", help="set max id of segmentation.", default=0) add_scale_flag(parser, required=False) add_verbose_flag(parser) return parser
#  
def
write_datasource_properties(dataset_path: pathlib.Path, datasource_properties: dict) -> None:
View Source
def write_datasource_properties( dataset_path: Path, datasource_properties: dict ) -> None: datasource_properties_path = get_datasource_path(dataset_path) with open(datasource_properties_path, "wt") as datasource_properties_file: json.dump(datasource_properties, datasource_properties_file, indent=2)
View Source
def read_datasource_properties(dataset_path: Path) -> dict: with open(get_datasource_path(dataset_path), "r") as datasource_properties_file: return json.load(datasource_properties_file)
#  
def
write_webknossos_metadata(
dataset_path: pathlib.Path,
name: str,
scale: Tuple[float, float, float],
max_id: int = 0,
compute_max_id: bool = False,
exact_bounding_box: Union[dict, NoneType] = None,
view_configuration: Union[dict, NoneType] = None
) -> None:
View Source
def write_webknossos_metadata( dataset_path: Path, name: str, scale: Tuple[float, float, float], max_id: int = 0, compute_max_id: bool = False, exact_bounding_box: Optional[dict] = None, view_configuration: Optional[dict] = None, ) -> None: """ Creates a datasource-properties.json file with the specified properties for the given dataset path. Common layers are detected automatically. """ if name == None: name = dataset_path.name # Generate a metadata file for webKnossos # Currently includes no source of information for team layers = list( detect_layers( dataset_path, max_id, compute_max_id, exact_bounding_box, view_configuration ) ) write_datasource_properties( dataset_path, { "id": {"name": name, "team": "<unknown>"}, "dataLayers": layers, "scale": scale, }, )
Creates a datasource-properties.json file with the specified properties for the given dataset path. Common layers are detected automatically.
#  
def
refresh_metadata(
wkw_path: pathlib.Path,
max_id: int = 0,
compute_max_id: bool = False,
exact_bounding_box: Union[dict, NoneType] = None,
view_configuration: Union[dict, NoneType] = None
) -> None:
View Source
def refresh_metadata( wkw_path: Path, max_id: int = 0, compute_max_id: bool = False, exact_bounding_box: Optional[dict] = None, view_configuration: Optional[dict] = None, ) -> None: """ Updates the datasource-properties.json file for a given dataset. Use this method if you added (or removed) layers and/or changed magnifications for existing layers. Raises an exception if the datasource-properties.json file does not exist, yet. In this case, use write_webknossos_metadata instead. """ dataset_path = get_datasource_path(wkw_path) if not path.exists(dataset_path): raise Exception( "datasource-properties.json file could not be found. Please use write_webknossos_metadata to create it." ) datasource_properties = read_datasource_properties(wkw_path) existing_layers_dict = { layer["name"]: layer for layer in datasource_properties["dataLayers"] } new_layers = list( detect_layers( wkw_path, max_id, compute_max_id, exact_bounding_box, view_configuration ) ) # Merge the freshly read layers with the existing layer information, so that information, # such as bounding boxes, are not lost for existing layers. # For existing layers, only the resolutions will be updated. merged_layers = [] for new_layer in new_layers: layer_name = new_layer["name"] if layer_name in existing_layers_dict: existing_layer = existing_layers_dict[layer_name] # Update the resolutions existing_layer["wkwResolutions"] = new_layer["wkwResolutions"] merged_layers.append(existing_layer) else: merged_layers.append(new_layer) datasource_properties["dataLayers"] = merged_layers write_datasource_properties(wkw_path, datasource_properties)
Updates the datasource-properties.json file for a given dataset. Use this method if you added (or removed) layers and/or changed magnifications for existing layers.
Raises an exception if the datasource-properties.json file does not exist, yet. In this case, use write_webknossos_metadata instead.
View Source
def convert_element_class_to_dtype(elementClass: str) -> np.dtype: default_dtype = np.uint8 if "uint" in elementClass else np.dtype(elementClass) conversion_map = { "float": np.float32, "double": np.float64, "uint8": np.uint8, "uint16": np.uint16, "uint32": np.uint32, "uint64": np.uint64, } return conversion_map.get(elementClass, default_dtype)
#  
def
read_metadata_for_layer(
wkw_path: pathlib.Path,
layer_name: str
) -> Tuple[dict, numpy.dtype, List[int], List[int]]:
View Source
def read_metadata_for_layer( wkw_path: Path, layer_name: str ) -> Tuple[dict, np.dtype, List[int], List[int]]: datasource_properties = read_datasource_properties(wkw_path) layers = datasource_properties["dataLayers"] layer_info = next(layer for layer in layers if layer["name"] == layer_name) dtype = convert_element_class_to_dtype(layer_info["elementClass"]) bounding_box = layer_info["boundingBox"] origin = bounding_box["topLeft"] bounding_box = [ bounding_box["width"], bounding_box["height"], bounding_box["depth"], ] return layer_info, dtype, bounding_box, origin
View Source
def convert_dtype_to_element_class(dtype: np.dtype) -> str: element_class_to_dtype_map = { "float": np.float32, "double": np.float64, "uint8": np.uint8, "uint16": np.uint16, "uint32": np.uint32, "uint64": np.uint64, } conversion_map = {v: k for k, v in element_class_to_dtype_map.items()} return conversion_map.get(dtype, str(dtype))
#  
def
detect_mag_path(
dataset_path: pathlib.Path,
layer: str,
mag: webknossos.geometry.mag.Mag = <webknossos.geometry.mag.Mag object>
) -> Union[pathlib.Path, NoneType]:
View Source
def detect_mag_path( dataset_path: Path, layer: str, mag: Mag = Mag(1) ) -> Optional[Path]: layer_path = dataset_path / layer / str(mag) if layer_path.exists(): return layer_path layer_path = dataset_path / layer / mag.to_long_layer_name() if layer_path.exists(): return layer_path return None
#  
def
detect_dtype(
dataset_path: pathlib.Path,
layer: str,
mag: webknossos.geometry.mag.Mag = <webknossos.geometry.mag.Mag object>
) -> str:
View Source
def detect_dtype(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> str: layer_path = detect_mag_path(dataset_path, layer, mag) if layer_path is not None: with wkw.Dataset.open(str(layer_path)) as dataset: voxel_size = dataset.header.voxel_type num_channels = dataset.header.num_channels if voxel_size == np.uint8 and num_channels > 1: return "uint" + str(8 * num_channels) else: return convert_dtype_to_element_class(voxel_size) raise RuntimeError( f"Failed to detect dtype (for {dataset_path}, {layer}, {mag}) because the layer_path is None" )
#  
def
detect_cubeLength(
dataset_path: pathlib.Path,
layer: str,
mag: webknossos.geometry.mag.Mag = <webknossos.geometry.mag.Mag object>
) -> int:
View Source
def detect_cubeLength(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> int: layer_path = detect_mag_path(dataset_path, layer, mag) if layer_path is not None: with wkw.Dataset.open(str(layer_path)) as dataset: return dataset.header.block_len * dataset.header.file_len raise RuntimeError( f"Failed to detect the cube length (for {dataset_path}, {layer}, {mag}) because the layer_path is None" )
#  
def
detect_bbox(
dataset_path: pathlib.Path,
layer: str,
mag: webknossos.geometry.mag.Mag = <webknossos.geometry.mag.Mag object>
) -> Union[dict, NoneType]:
View Source
def detect_bbox(dataset_path: Path, layer: str, mag: Mag = Mag(1)) -> Optional[dict]: # Detect the coarse bounding box of a dataset by iterating # over the WKW cubes layer_path = detect_mag_path(dataset_path, layer, mag) if layer_path is None: return None def list_files(layer_path: str) -> Iterable[str]: return iglob(path.join(layer_path, "*", "*", "*.wkw"), recursive=True) def parse_cube_file_name(filename: str) -> Tuple[int, int, int]: CUBE_REGEX = re.compile( fr"z(\d+){re.escape(os.path.sep)}y(\d+){re.escape(os.path.sep)}x(\d+)(\.wkw)$" ) m = CUBE_REGEX.search(filename) if m is not None: return int(m.group(3)), int(m.group(2)), int(m.group(1)) raise RuntimeError(f"Failed to parse cube file name from {filename}") def list_cubes(layer_path: str) -> Iterable[Tuple[int, int, int]]: return (parse_cube_file_name(f) for f in list_files(layer_path)) xs, ys, zs = list(zip(*list_cubes(str(layer_path)))) min_x, min_y, min_z = min(xs), min(ys), min(zs) max_x, max_y, max_z = max(xs), max(ys), max(zs) cubeLength = detect_cubeLength(dataset_path, layer, mag) if cubeLength is None: return None return { "topLeft": [min_x * cubeLength, min_y * cubeLength, min_z * cubeLength], "width": (1 + max_x - min_x) * cubeLength, "height": (1 + max_y - min_y) * cubeLength, "depth": (1 + max_z - min_z) * cubeLength, }
View Source
def detect_resolutions(dataset_path: Path, layer: str) -> Generator: for mag in listdir(path.join(dataset_path, layer)): try: yield Mag(mag) except ValueError: logging.info("ignoring {} as resolution".format(mag))
#  
def
detect_standard_layer(
dataset_path: pathlib.Path,
layer_name: str,
exact_bounding_box: Union[dict, NoneType] = None,
category: str = 'color',
layer_view_configuration: Union[dict, NoneType] = None
) -> dict:
View Source
def detect_standard_layer( dataset_path: Path, layer_name: str, exact_bounding_box: Optional[dict] = None, category: str = "color", layer_view_configuration: Optional[dict] = None, ) -> dict: # Perform metadata detection for well-known layers mags = list(detect_resolutions(dataset_path, layer_name)) mags = sorted(mags) assert len(mags) > 0, f"No resolutions found for {dataset_path}/{layer_name}" if exact_bounding_box is None: bbox = detect_bbox(dataset_path, layer_name, mags[0]) else: bbox = exact_bounding_box assert ( bbox is not None ), f"Could not detect bounding box for {dataset_path}/{layer_name}" # BB can be created manually # assert the presence/spelling of all attributes assert "width" in bbox, "Attribute `width` is missing/misspelled in bounding box" assert "height" in bbox, "Attribute `height` is missing/misspelled in bounding box" assert "depth" in bbox, "Attribute `depth` is missing/misspelled in bounding box" assert ( "topLeft" in bbox ), "Attribute `topLeft` is missing/misspelled in bounding box" resolutions = [ { "resolution": mag.to_array(), "cubeLength": detect_cubeLength(dataset_path, layer_name, mag), } for mag in mags ] resolutions = [r for r in resolutions if r["cubeLength"] is not None] assert len(resolutions) > 0, f"No resolutions found for {dataset_path}/{layer_name}" dtype = detect_dtype(dataset_path, layer_name, mags[0]) assert ( dtype is not None ), f"Data type could not be detected for {dataset_path}/{layer_name}" layer_info = { "dataFormat": "wkw", "name": layer_name, "category": category, "elementClass": dtype, "boundingBox": bbox, "wkwResolutions": list(resolutions), } if layer_view_configuration is not None: layer_info["defaultViewConfiguration"] = layer_view_configuration return layer_info
View Source
def detect_mappings(dataset_path: Path, layer_name: str) -> List[str]: pattern = dataset_path / layer_name / "mappings" / "*.json" mapping_files = glob.glob(str(pattern)) mapping_file_names = [path.basename(mapping_file) for mapping_file in mapping_files] return mapping_file_names
#  
def
detect_segmentation_layer(
dataset_path: pathlib.Path,
layer_name: str,
max_id: int,
compute_max_id: bool = False,
exact_bounding_box: dict = None
) -> dict:
View Source
def detect_segmentation_layer( dataset_path: Path, layer_name: str, max_id: int, compute_max_id: bool = False, exact_bounding_box: dict = None, ) -> dict: layer_info = detect_standard_layer( dataset_path, layer_name, exact_bounding_box, category="segmentation" ) layer_info["mappings"] = detect_mappings(dataset_path, layer_name) layer_info["largestSegmentId"] = max_id if compute_max_id: logging.info("Computing max id of layer={}".format(layer_name)) # Computing the current largest segment id # This may take very long due to IO load layer_path = str(detect_mag_path(dataset_path, layer_name, Mag(1))) with wkw.Dataset.open(layer_path) as dataset: bbox = layer_info["boundingBox"] layer_info["largestSegmentId"] = int( np.max( dataset.read( bbox["topLeft"], [bbox["width"], bbox["height"], bbox["depth"]] ) ) ) logging.info( "Max id of layer={} is {}".format( layer_name, layer_info["largestSegmentId"] ) ) return layer_info
#  
def
detect_layers(
dataset_path: pathlib.Path,
max_id: int,
compute_max_id: bool,
exact_bounding_box: Union[dict, NoneType] = None,
view_configuration: Union[dict, NoneType] = None
) -> Iterable[dict]:
View Source
def detect_layers( dataset_path: Path, max_id: int, compute_max_id: bool, exact_bounding_box: Optional[dict] = None, view_configuration: Optional[dict] = None, ) -> Iterable[dict]: if view_configuration is None: view_configuration = dict() # Detect metadata for well-known layers (i.e., color, prediction and segmentation) if path.exists(path.join(dataset_path, "color")): yield detect_standard_layer( dataset_path, "color", exact_bounding_box, layer_view_configuration=view_configuration.get("color"), ) if path.exists(path.join(dataset_path, "segmentation")): yield detect_segmentation_layer( dataset_path, "segmentation", max_id, compute_max_id, exact_bounding_box ) available_layer_names = set( [ basename(normpath(Path(x).parent.parent)) for x in glob.glob(path.join(dataset_path, "*/*/header.wkw")) ] ) for layer_name in available_layer_names: # color and segmentation are already checked explicitly to ensure downwards compatibility (some older datasets don't have the header.wkw file) if layer_name not in ["color", "segmentation"]: layer_info = None try: layer_info = detect_standard_layer( dataset_path, layer_name, exact_bounding_box, layer_view_configuration=view_configuration.get(layer_name), ) except Exception: pass if layer_info is not None: yield layer_info else: logging.warning(f"{layer_name} is not a WKW layer")