webknossos.dataset.mag_view

View Source
import logging
import os
import shutil
from argparse import Namespace
from os.path import join
from pathlib import Path
from typing import TYPE_CHECKING, Generator, List, Tuple, Union, cast
from uuid import uuid4

import numpy as np
from wkw import wkw

from webknossos.geometry import BoundingBox, Mag
from webknossos.utils import get_executor_for_args, wait_and_ensure_success

from .compress_utils import compress_file_job
from .properties import MagViewProperties

if TYPE_CHECKING:
    from .layer import (
        Layer,
    )

from .view import View

Vec3 = Union[Tuple[int, int, int], np.ndarray]


def _find_mag_path_on_disk(dataset_path: Path, layer_name: str, mag_name: str) -> Path:
    mag = Mag(mag_name)
    short_mag_file_path = dataset_path / layer_name / mag.to_layer_name()
    long_mag_file_path = dataset_path / layer_name / mag.to_long_layer_name()
    if os.path.exists(short_mag_file_path):
        return short_mag_file_path
    else:
        return long_mag_file_path


def _convert_mag1_offset(
    mag1_offset: Union[List, np.ndarray], target_mag: Mag
) -> np.ndarray:
    return np.array(mag1_offset) // target_mag.as_np()  # floor div


class MagView(View):
    """
    A `MagView` contains all information about the data of a single magnification of a `webknossos.dataset.layer.Layer`.
    `MagView` inherits from `webknossos.dataset.view.View`.
    Therefore, the main difference between them is that a `MagView` handles the whole magnification,
    whereas the `View` only handles a sub-region.

    A `MagView` can read/write outside the specified bounding box (unlike a normal `View`).
    If necessary, the properties are automatically updated (e.g. if the bounding box changed).
    This is possible because a `MagView` does have a reference to the `webknossos.dataset.layer.Layer`.

    The `global_offset` of a `MagView` is always `(0, 0, 0)` and its `size` is chosen so that the bounding box from the properties is fully inside this View.
    """

    def __init__(
        self,
        layer: "Layer",
        mag: Mag,
        block_len: int,
        file_len: int,
        block_type: int,
        create: bool = False,
    ) -> None:
        """
        Do not use this constructor manually. Instead use `webknossos.dataset.layer.Layer.add_mag()` to create a `MagView`.
        """
        header = wkw.Header(
            voxel_type=layer.dtype_per_channel,
            num_channels=layer.num_channels,
            version=1,
            block_len=block_len,
            file_len=file_len,
            block_type=block_type,
        )

        self._layer = layer
        self._mag = mag

        super().__init__(
            _find_mag_path_on_disk(
                self.layer.dataset.path, self.layer.name, mag.to_layer_name()
            ),
            header,
            cast(
                Tuple[int, int, int],
                tuple(self._mag_view_bounding_box_at_creation.bottomright),
            ),
            (0, 0, 0),
            False,
            False,
            None,
        )

        if create:
            wkw.Dataset.create(
                join(layer.dataset.path, layer.name, self.name), self.header
            )

    @property
    def layer(self) -> "Layer":
        return self._layer

    @property
    def _properties(self) -> MagViewProperties:
        return next(
            mag_property
            for mag_property in self.layer._properties.wkw_resolutions
            if Mag(mag_property.resolution).to_array() == self.mag.to_array()
        )

    @property
    def name(self) -> str:
        return self._mag.to_layer_name()

    @property
    def mag(self) -> Mag:
        return self._mag

    def write(self, data: np.ndarray, offset: Vec3 = (0, 0, 0)) -> None:
        """
        Writes the `data` at the specified `offset` to disk (like `webknossos.dataset.view.View.write()`).

        The `offset` refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)).
        If the data exceeds the original bounding box, the properties are updated.

        Note that writing compressed data which is not aligned with the blocks on disk may result in
        diminished performance, as full blocks will automatically be read to pad the write actions.
        """
        self._assert_valid_num_channels(data.shape)
        super().write(data, offset)
        current_offset_in_mag1 = self.layer.bounding_box.topleft
        current_size_in_mag1 = self.layer.bounding_box.size

        mag_np = self.mag.as_np()

        offset_in_mag1 = np.array(offset) * mag_np

        # The (-1, -1, -1) is for backwards compatibility because we used '(-1, -1, -1)' to indicate that there is no data written yet.
        new_offset_in_mag1 = (
            offset_in_mag1
            if tuple(current_offset_in_mag1) == (-1, -1, -1)
            or self.layer.bounding_box == BoundingBox((0, 0, 0), (0, 0, 0))
            else np.minimum(current_offset_in_mag1, offset_in_mag1)
        )

        old_end_offset_in_mag1 = current_offset_in_mag1 + current_size_in_mag1
        new_end_offset_in_mag1 = (np.array(offset) + np.array(data.shape[-3:])) * mag_np
        max_end_offset_in_mag1 = np.array(
            [old_end_offset_in_mag1, new_end_offset_in_mag1]
        ).max(axis=0)
        total_size_in_mag1 = max_end_offset_in_mag1 - np.array(new_offset_in_mag1)

        self._size = cast(
            Tuple[int, int, int],
            tuple(_convert_mag1_offset(max_end_offset_in_mag1, self.mag)),
        )  # The base view of a MagDataset always starts at (0, 0, 0)

        self.layer.bounding_box = BoundingBox(
            new_offset_in_mag1,
            total_size_in_mag1,
        )

    def get_view(
        self,
        offset: Vec3 = None,
        size: Vec3 = None,
        read_only: bool = None,
    ) -> View:
        """
        Returns a view that is limited to the specified bounding box.

        The `offset` refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)).
        The default value for `offset` is the offset that is specified in the properties.
        The default value for `size` is calculated so that the bounding box ends where the bounding box from the
        properties ends.
        Therefore, if both (`offset` and `size`) are not specified, then the bounding box of the view is equal to the
        bounding box specified in the properties.

        The `offset` and `size` may only exceed the bounding box from the properties, if `read_only` is set to `True`.

        If `read_only` is `True`, write operations are not allowed for the returned sub-view.

        Example:
        ```python
        # ...
        # Let 'mag1' be a `MagView` with offset (0, 0, 0) and size (100, 200, 300)

        # Properties are used to determine the default parameter
        view_with_bb_from_properties = mag1.get_view()

        # Sub-view where the specified bounding box is completely in the bounding box of the MagView
        sub_view1 = mag1.get_view(offset=(50, 60, 70), size=(10, 120, 230))

        # Fails because the specified view is not completely in the bounding box from the properties.
        sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)

        # Sub-view where the specified bounding box is NOT completely in the bounding box of the MagView.
        # This still works because `read_only=True`.
        sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)
        ```
        """

        bb = self.layer.bounding_box

        # The (-1, -1, -1) is for backwards compatibility because we used '(-1, -1, -1)' to indicate that there is no data written yet.
        if tuple(bb.topleft) == (-1, -1, -1):
            bb.topleft = np.array((0, 0, 0))

        bb = bb.align_with_mag(self.mag, ceil=True).in_mag(self.mag)

        view_offset = cast(
            Tuple[int, int, int],
            tuple(offset if offset is not None else tuple(bb.topleft)),
        )

        if size is None:
            size = cast(
                Tuple[int, int, int], tuple(bb.bottomright - np.array(view_offset))
            )

        assert bb.contains_bbox(BoundingBox(view_offset, size)) or read_only
        return super().get_view(
            view_offset,
            cast(Tuple[int, int, int], tuple(size)),
            read_only,
        )

    def _assert_valid_num_channels(self, write_data_shape: Tuple[int, ...]) -> None:
        num_channels = self.layer.num_channels
        if len(write_data_shape) == 3:
            assert (
                num_channels == 1
            ), f"The number of channels of the dataset ({num_channels}) does not match the number of channels of the passed data (1)"
        else:
            assert (
                num_channels == write_data_shape[0]
            ), f"The number of channels of the dataset ({num_channels}) does not match the number of channels of the passed data ({write_data_shape[0]})"

    def get_bounding_boxes_on_disk(
        self,
    ) -> Generator[Tuple[Tuple[int, int, int], Tuple[int, int, int]], None, None]:
        """
        Returns a bounding box for each file on disk.
        A bounding box is represented as a tuple of the offset and the size.

        This differs from the bounding box in the properties in two ways:
        - the bounding box in the properties is always specified in mag 1
        - the bounding box in the properties is an "overall" bounding box, which abstracts from the files on disk
        """
        cube_size = self._get_file_dimensions()
        was_opened = self._is_opened

        if not was_opened:
            self.open()  # opening the view is necessary to set the dataset

        assert self._dataset is not None
        for filename in self._dataset.list_files():
            file_path = Path(os.path.splitext(filename)[0]).relative_to(self._path)
            cube_index = _extract_file_index(file_path)
            cube_offset = [idx * size for idx, size in zip(cube_index, cube_size)]

            yield (cube_offset[0], cube_offset[1], cube_offset[2]), cube_size

        if not was_opened:
            self.close()

    def compress(
        self, target_path: Union[str, Path] = None, args: Namespace = None
    ) -> None:
        """
        Compresses the files on disk. This has consequences for writing data (see `write`).

        The data gets compressed inplace, if target_path is None.
        Otherwise it is written to target_path/layer_name/mag.
        """

        if target_path is not None:
            target_path = Path(target_path)

        uncompressed_full_path = (
            Path(self.layer.dataset.path) / self.layer.name / self.name
        )
        compressed_path = (
            target_path
            if target_path is not None
            else Path("{}.compress-{}".format(self.layer.dataset.path, uuid4()))
        )
        compressed_full_path = compressed_path / self.layer.name / self.name

        if compressed_full_path.exists():
            logging.error(
                "Target path '{}' already exists".format(compressed_full_path)
            )
            exit(1)

        logging.info(
            "Compressing mag {0} in '{1}'".format(
                self.name, str(uncompressed_full_path)
            )
        )

        was_opened = self._is_opened
        if not was_opened:
            self.open()  # opening the view is necessary to set the dataset
        assert self._dataset is not None

        # create empty wkw.Dataset
        self._dataset.compress(str(compressed_full_path))

        # compress all files to and move them to 'compressed_path'
        with get_executor_for_args(args) as executor:
            job_args = []
            for file in self._dataset.list_files():
                rel_file = Path(file).relative_to(self.layer.dataset.path)
                job_args.append((Path(file), compressed_path / rel_file))

            wait_and_ensure_success(
                executor.map_to_futures(compress_file_job, job_args)
            )

        logging.info("Mag {0} successfully compressed".format(self.name))

        if not was_opened:
            self.close()

        if target_path is None:
            shutil.rmtree(uncompressed_full_path)
            shutil.move(str(compressed_full_path), uncompressed_full_path)
            shutil.rmtree(compressed_path)

            # update the handle to the new dataset
            MagView.__init__(
                self,
                self.layer,
                self.mag,
                self.header.block_len,
                self.header.file_len,
                wkw.Header.BLOCK_TYPE_LZ4HC,
            )

    def _get_file_dimensions(self) -> Tuple[int, int, int]:
        return cast(Tuple[int, int, int], (self._properties.cube_length,) * 3)

    @property
    def _mag_view_bounding_box_at_creation(self) -> BoundingBox:
        return self.layer.bounding_box.align_with_mag(Mag(self.name), ceil=True).in_mag(
            Mag(self.name)
        )

    def __repr__(self) -> str:
        return repr(
            "MagView(name=%s, global_offset=%s, size=%s)"
            % (self.name, self.global_offset, self.size)
        )


def _extract_file_index(file_path: Path) -> Tuple[int, int, int]:
    zyx_index = [int(el[1:]) for el in file_path.parts]
    return zyx_index[2], zyx_index[1], zyx_index[0]
View Source
class MagView(View):
    """
    A `MagView` contains all information about the data of a single magnification of a `webknossos.dataset.layer.Layer`.
    `MagView` inherits from `webknossos.dataset.view.View`.
    Therefore, the main difference between them is that a `MagView` handles the whole magnification,
    whereas the `View` only handles a sub-region.

    A `MagView` can read/write outside the specified bounding box (unlike a normal `View`).
    If necessary, the properties are automatically updated (e.g. if the bounding box changed).
    This is possible because a `MagView` does have a reference to the `webknossos.dataset.layer.Layer`.

    The `global_offset` of a `MagView` is always `(0, 0, 0)` and its `size` is chosen so that the bounding box from the properties is fully inside this View.
    """

    def __init__(
        self,
        layer: "Layer",
        mag: Mag,
        block_len: int,
        file_len: int,
        block_type: int,
        create: bool = False,
    ) -> None:
        """
        Do not use this constructor manually. Instead use `webknossos.dataset.layer.Layer.add_mag()` to create a `MagView`.
        """
        header = wkw.Header(
            voxel_type=layer.dtype_per_channel,
            num_channels=layer.num_channels,
            version=1,
            block_len=block_len,
            file_len=file_len,
            block_type=block_type,
        )

        self._layer = layer
        self._mag = mag

        super().__init__(
            _find_mag_path_on_disk(
                self.layer.dataset.path, self.layer.name, mag.to_layer_name()
            ),
            header,
            cast(
                Tuple[int, int, int],
                tuple(self._mag_view_bounding_box_at_creation.bottomright),
            ),
            (0, 0, 0),
            False,
            False,
            None,
        )

        if create:
            wkw.Dataset.create(
                join(layer.dataset.path, layer.name, self.name), self.header
            )

    @property
    def layer(self) -> "Layer":
        return self._layer

    @property
    def _properties(self) -> MagViewProperties:
        return next(
            mag_property
            for mag_property in self.layer._properties.wkw_resolutions
            if Mag(mag_property.resolution).to_array() == self.mag.to_array()
        )

    @property
    def name(self) -> str:
        return self._mag.to_layer_name()

    @property
    def mag(self) -> Mag:
        return self._mag

    def write(self, data: np.ndarray, offset: Vec3 = (0, 0, 0)) -> None:
        """
        Writes the `data` at the specified `offset` to disk (like `webknossos.dataset.view.View.write()`).

        The `offset` refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)).
        If the data exceeds the original bounding box, the properties are updated.

        Note that writing compressed data which is not aligned with the blocks on disk may result in
        diminished performance, as full blocks will automatically be read to pad the write actions.
        """
        self._assert_valid_num_channels(data.shape)
        super().write(data, offset)
        current_offset_in_mag1 = self.layer.bounding_box.topleft
        current_size_in_mag1 = self.layer.bounding_box.size

        mag_np = self.mag.as_np()

        offset_in_mag1 = np.array(offset) * mag_np

        # The (-1, -1, -1) is for backwards compatibility because we used '(-1, -1, -1)' to indicate that there is no data written yet.
        new_offset_in_mag1 = (
            offset_in_mag1
            if tuple(current_offset_in_mag1) == (-1, -1, -1)
            or self.layer.bounding_box == BoundingBox((0, 0, 0), (0, 0, 0))
            else np.minimum(current_offset_in_mag1, offset_in_mag1)
        )

        old_end_offset_in_mag1 = current_offset_in_mag1 + current_size_in_mag1
        new_end_offset_in_mag1 = (np.array(offset) + np.array(data.shape[-3:])) * mag_np
        max_end_offset_in_mag1 = np.array(
            [old_end_offset_in_mag1, new_end_offset_in_mag1]
        ).max(axis=0)
        total_size_in_mag1 = max_end_offset_in_mag1 - np.array(new_offset_in_mag1)

        self._size = cast(
            Tuple[int, int, int],
            tuple(_convert_mag1_offset(max_end_offset_in_mag1, self.mag)),
        )  # The base view of a MagDataset always starts at (0, 0, 0)

        self.layer.bounding_box = BoundingBox(
            new_offset_in_mag1,
            total_size_in_mag1,
        )

    def get_view(
        self,
        offset: Vec3 = None,
        size: Vec3 = None,
        read_only: bool = None,
    ) -> View:
        """
        Returns a view that is limited to the specified bounding box.

        The `offset` refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)).
        The default value for `offset` is the offset that is specified in the properties.
        The default value for `size` is calculated so that the bounding box ends where the bounding box from the
        properties ends.
        Therefore, if both (`offset` and `size`) are not specified, then the bounding box of the view is equal to the
        bounding box specified in the properties.

        The `offset` and `size` may only exceed the bounding box from the properties, if `read_only` is set to `True`.

        If `read_only` is `True`, write operations are not allowed for the returned sub-view.

        Example:
        ```python
        # ...
        # Let 'mag1' be a `MagView` with offset (0, 0, 0) and size (100, 200, 300)

        # Properties are used to determine the default parameter
        view_with_bb_from_properties = mag1.get_view()

        # Sub-view where the specified bounding box is completely in the bounding box of the MagView
        sub_view1 = mag1.get_view(offset=(50, 60, 70), size=(10, 120, 230))

        # Fails because the specified view is not completely in the bounding box from the properties.
        sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)

        # Sub-view where the specified bounding box is NOT completely in the bounding box of the MagView.
        # This still works because `read_only=True`.
        sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)
        ```
        """

        bb = self.layer.bounding_box

        # The (-1, -1, -1) is for backwards compatibility because we used '(-1, -1, -1)' to indicate that there is no data written yet.
        if tuple(bb.topleft) == (-1, -1, -1):
            bb.topleft = np.array((0, 0, 0))

        bb = bb.align_with_mag(self.mag, ceil=True).in_mag(self.mag)

        view_offset = cast(
            Tuple[int, int, int],
            tuple(offset if offset is not None else tuple(bb.topleft)),
        )

        if size is None:
            size = cast(
                Tuple[int, int, int], tuple(bb.bottomright - np.array(view_offset))
            )

        assert bb.contains_bbox(BoundingBox(view_offset, size)) or read_only
        return super().get_view(
            view_offset,
            cast(Tuple[int, int, int], tuple(size)),
            read_only,
        )

    def _assert_valid_num_channels(self, write_data_shape: Tuple[int, ...]) -> None:
        num_channels = self.layer.num_channels
        if len(write_data_shape) == 3:
            assert (
                num_channels == 1
            ), f"The number of channels of the dataset ({num_channels}) does not match the number of channels of the passed data (1)"
        else:
            assert (
                num_channels == write_data_shape[0]
            ), f"The number of channels of the dataset ({num_channels}) does not match the number of channels of the passed data ({write_data_shape[0]})"

    def get_bounding_boxes_on_disk(
        self,
    ) -> Generator[Tuple[Tuple[int, int, int], Tuple[int, int, int]], None, None]:
        """
        Returns a bounding box for each file on disk.
        A bounding box is represented as a tuple of the offset and the size.

        This differs from the bounding box in the properties in two ways:
        - the bounding box in the properties is always specified in mag 1
        - the bounding box in the properties is an "overall" bounding box, which abstracts from the files on disk
        """
        cube_size = self._get_file_dimensions()
        was_opened = self._is_opened

        if not was_opened:
            self.open()  # opening the view is necessary to set the dataset

        assert self._dataset is not None
        for filename in self._dataset.list_files():
            file_path = Path(os.path.splitext(filename)[0]).relative_to(self._path)
            cube_index = _extract_file_index(file_path)
            cube_offset = [idx * size for idx, size in zip(cube_index, cube_size)]

            yield (cube_offset[0], cube_offset[1], cube_offset[2]), cube_size

        if not was_opened:
            self.close()

    def compress(
        self, target_path: Union[str, Path] = None, args: Namespace = None
    ) -> None:
        """
        Compresses the files on disk. This has consequences for writing data (see `write`).

        The data gets compressed inplace, if target_path is None.
        Otherwise it is written to target_path/layer_name/mag.
        """

        if target_path is not None:
            target_path = Path(target_path)

        uncompressed_full_path = (
            Path(self.layer.dataset.path) / self.layer.name / self.name
        )
        compressed_path = (
            target_path
            if target_path is not None
            else Path("{}.compress-{}".format(self.layer.dataset.path, uuid4()))
        )
        compressed_full_path = compressed_path / self.layer.name / self.name

        if compressed_full_path.exists():
            logging.error(
                "Target path '{}' already exists".format(compressed_full_path)
            )
            exit(1)

        logging.info(
            "Compressing mag {0} in '{1}'".format(
                self.name, str(uncompressed_full_path)
            )
        )

        was_opened = self._is_opened
        if not was_opened:
            self.open()  # opening the view is necessary to set the dataset
        assert self._dataset is not None

        # create empty wkw.Dataset
        self._dataset.compress(str(compressed_full_path))

        # compress all files to and move them to 'compressed_path'
        with get_executor_for_args(args) as executor:
            job_args = []
            for file in self._dataset.list_files():
                rel_file = Path(file).relative_to(self.layer.dataset.path)
                job_args.append((Path(file), compressed_path / rel_file))

            wait_and_ensure_success(
                executor.map_to_futures(compress_file_job, job_args)
            )

        logging.info("Mag {0} successfully compressed".format(self.name))

        if not was_opened:
            self.close()

        if target_path is None:
            shutil.rmtree(uncompressed_full_path)
            shutil.move(str(compressed_full_path), uncompressed_full_path)
            shutil.rmtree(compressed_path)

            # update the handle to the new dataset
            MagView.__init__(
                self,
                self.layer,
                self.mag,
                self.header.block_len,
                self.header.file_len,
                wkw.Header.BLOCK_TYPE_LZ4HC,
            )

    def _get_file_dimensions(self) -> Tuple[int, int, int]:
        return cast(Tuple[int, int, int], (self._properties.cube_length,) * 3)

    @property
    def _mag_view_bounding_box_at_creation(self) -> BoundingBox:
        return self.layer.bounding_box.align_with_mag(Mag(self.name), ceil=True).in_mag(
            Mag(self.name)
        )

    def __repr__(self) -> str:
        return repr(
            "MagView(name=%s, global_offset=%s, size=%s)"
            % (self.name, self.global_offset, self.size)
        )

A MagView contains all information about the data of a single magnification of a webknossos.dataset.layer.Layer. MagView inherits from webknossos.dataset.view.View. Therefore, the main difference between them is that a MagView handles the whole magnification, whereas the View only handles a sub-region.

A MagView can read/write outside the specified bounding box (unlike a normal View). If necessary, the properties are automatically updated (e.g. if the bounding box changed). This is possible because a MagView does have a reference to the webknossos.dataset.layer.Layer.

The global_offset of a MagView is always (0, 0, 0) and its size is chosen so that the bounding box from the properties is fully inside this View.

#   MagView( layer: webknossos.dataset.layer.Layer, mag: webknossos.geometry.mag.Mag, block_len: int, file_len: int, block_type: int, create: bool = False )
View Source
    def __init__(
        self,
        layer: "Layer",
        mag: Mag,
        block_len: int,
        file_len: int,
        block_type: int,
        create: bool = False,
    ) -> None:
        """
        Do not use this constructor manually. Instead use `webknossos.dataset.layer.Layer.add_mag()` to create a `MagView`.
        """
        header = wkw.Header(
            voxel_type=layer.dtype_per_channel,
            num_channels=layer.num_channels,
            version=1,
            block_len=block_len,
            file_len=file_len,
            block_type=block_type,
        )

        self._layer = layer
        self._mag = mag

        super().__init__(
            _find_mag_path_on_disk(
                self.layer.dataset.path, self.layer.name, mag.to_layer_name()
            ),
            header,
            cast(
                Tuple[int, int, int],
                tuple(self._mag_view_bounding_box_at_creation.bottomright),
            ),
            (0, 0, 0),
            False,
            False,
            None,
        )

        if create:
            wkw.Dataset.create(
                join(layer.dataset.path, layer.name, self.name), self.header
            )

Do not use this constructor manually. Instead use webknossos.dataset.layer.Layer.add_mag() to create a MagView.

#   name: str
#   def write( self, data: numpy.ndarray, offset: Union[Tuple[int, int, int], numpy.ndarray] = (0, 0, 0) ) -> None:
View Source
    def write(self, data: np.ndarray, offset: Vec3 = (0, 0, 0)) -> None:
        """
        Writes the `data` at the specified `offset` to disk (like `webknossos.dataset.view.View.write()`).

        The `offset` refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)).
        If the data exceeds the original bounding box, the properties are updated.

        Note that writing compressed data which is not aligned with the blocks on disk may result in
        diminished performance, as full blocks will automatically be read to pad the write actions.
        """
        self._assert_valid_num_channels(data.shape)
        super().write(data, offset)
        current_offset_in_mag1 = self.layer.bounding_box.topleft
        current_size_in_mag1 = self.layer.bounding_box.size

        mag_np = self.mag.as_np()

        offset_in_mag1 = np.array(offset) * mag_np

        # The (-1, -1, -1) is for backwards compatibility because we used '(-1, -1, -1)' to indicate that there is no data written yet.
        new_offset_in_mag1 = (
            offset_in_mag1
            if tuple(current_offset_in_mag1) == (-1, -1, -1)
            or self.layer.bounding_box == BoundingBox((0, 0, 0), (0, 0, 0))
            else np.minimum(current_offset_in_mag1, offset_in_mag1)
        )

        old_end_offset_in_mag1 = current_offset_in_mag1 + current_size_in_mag1
        new_end_offset_in_mag1 = (np.array(offset) + np.array(data.shape[-3:])) * mag_np
        max_end_offset_in_mag1 = np.array(
            [old_end_offset_in_mag1, new_end_offset_in_mag1]
        ).max(axis=0)
        total_size_in_mag1 = max_end_offset_in_mag1 - np.array(new_offset_in_mag1)

        self._size = cast(
            Tuple[int, int, int],
            tuple(_convert_mag1_offset(max_end_offset_in_mag1, self.mag)),
        )  # The base view of a MagDataset always starts at (0, 0, 0)

        self.layer.bounding_box = BoundingBox(
            new_offset_in_mag1,
            total_size_in_mag1,
        )

Writes the data at the specified offset to disk (like webknossos.dataset.view.View.write()).

The offset refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)). If the data exceeds the original bounding box, the properties are updated.

Note that writing compressed data which is not aligned with the blocks on disk may result in diminished performance, as full blocks will automatically be read to pad the write actions.

#   def get_view( self, offset: Union[Tuple[int, int, int], numpy.ndarray] = None, size: Union[Tuple[int, int, int], numpy.ndarray] = None, read_only: bool = None ) -> webknossos.dataset.view.View:
View Source
    def get_view(
        self,
        offset: Vec3 = None,
        size: Vec3 = None,
        read_only: bool = None,
    ) -> View:
        """
        Returns a view that is limited to the specified bounding box.

        The `offset` refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)).
        The default value for `offset` is the offset that is specified in the properties.
        The default value for `size` is calculated so that the bounding box ends where the bounding box from the
        properties ends.
        Therefore, if both (`offset` and `size`) are not specified, then the bounding box of the view is equal to the
        bounding box specified in the properties.

        The `offset` and `size` may only exceed the bounding box from the properties, if `read_only` is set to `True`.

        If `read_only` is `True`, write operations are not allowed for the returned sub-view.

        Example:
        ```python
        # ...
        # Let 'mag1' be a `MagView` with offset (0, 0, 0) and size (100, 200, 300)

        # Properties are used to determine the default parameter
        view_with_bb_from_properties = mag1.get_view()

        # Sub-view where the specified bounding box is completely in the bounding box of the MagView
        sub_view1 = mag1.get_view(offset=(50, 60, 70), size=(10, 120, 230))

        # Fails because the specified view is not completely in the bounding box from the properties.
        sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)

        # Sub-view where the specified bounding box is NOT completely in the bounding box of the MagView.
        # This still works because `read_only=True`.
        sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)
        ```
        """

        bb = self.layer.bounding_box

        # The (-1, -1, -1) is for backwards compatibility because we used '(-1, -1, -1)' to indicate that there is no data written yet.
        if tuple(bb.topleft) == (-1, -1, -1):
            bb.topleft = np.array((0, 0, 0))

        bb = bb.align_with_mag(self.mag, ceil=True).in_mag(self.mag)

        view_offset = cast(
            Tuple[int, int, int],
            tuple(offset if offset is not None else tuple(bb.topleft)),
        )

        if size is None:
            size = cast(
                Tuple[int, int, int], tuple(bb.bottomright - np.array(view_offset))
            )

        assert bb.contains_bbox(BoundingBox(view_offset, size)) or read_only
        return super().get_view(
            view_offset,
            cast(Tuple[int, int, int], tuple(size)),
            read_only,
        )

Returns a view that is limited to the specified bounding box.

The offset refers to the absolute position, regardless of the offset in the properties (because the global_offset is set to (0, 0, 0)). The default value for offset is the offset that is specified in the properties. The default value for size is calculated so that the bounding box ends where the bounding box from the properties ends. Therefore, if both (offset and size) are not specified, then the bounding box of the view is equal to the bounding box specified in the properties.

The offset and size may only exceed the bounding box from the properties, if read_only is set to True.

If read_only is True, write operations are not allowed for the returned sub-view.

Example:

# ...
# Let 'mag1' be a `MagView` with offset (0, 0, 0) and size (100, 200, 300)

# Properties are used to determine the default parameter
view_with_bb_from_properties = mag1.get_view()

# Sub-view where the specified bounding box is completely in the bounding box of the MagView
sub_view1 = mag1.get_view(offset=(50, 60, 70), size=(10, 120, 230))

# Fails because the specified view is not completely in the bounding box from the properties.
sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)

# Sub-view where the specified bounding box is NOT completely in the bounding box of the MagView.
# This still works because `read_only=True`.
sub_view2 = mag1.get_view(offset=(50, 60, 70), size=(999, 120, 230), read_only=True)
#   def get_bounding_boxes_on_disk( self ) -> Generator[Tuple[Tuple[int, int, int], Tuple[int, int, int]], NoneType, NoneType]:
View Source
    def get_bounding_boxes_on_disk(
        self,
    ) -> Generator[Tuple[Tuple[int, int, int], Tuple[int, int, int]], None, None]:
        """
        Returns a bounding box for each file on disk.
        A bounding box is represented as a tuple of the offset and the size.

        This differs from the bounding box in the properties in two ways:
        - the bounding box in the properties is always specified in mag 1
        - the bounding box in the properties is an "overall" bounding box, which abstracts from the files on disk
        """
        cube_size = self._get_file_dimensions()
        was_opened = self._is_opened

        if not was_opened:
            self.open()  # opening the view is necessary to set the dataset

        assert self._dataset is not None
        for filename in self._dataset.list_files():
            file_path = Path(os.path.splitext(filename)[0]).relative_to(self._path)
            cube_index = _extract_file_index(file_path)
            cube_offset = [idx * size for idx, size in zip(cube_index, cube_size)]

            yield (cube_offset[0], cube_offset[1], cube_offset[2]), cube_size

        if not was_opened:
            self.close()

Returns a bounding box for each file on disk. A bounding box is represented as a tuple of the offset and the size.

This differs from the bounding box in the properties in two ways:

  • the bounding box in the properties is always specified in mag 1
  • the bounding box in the properties is an "overall" bounding box, which abstracts from the files on disk
#   def compress( self, target_path: Union[str, pathlib.Path] = None, args: argparse.Namespace = None ) -> None:
View Source
    def compress(
        self, target_path: Union[str, Path] = None, args: Namespace = None
    ) -> None:
        """
        Compresses the files on disk. This has consequences for writing data (see `write`).

        The data gets compressed inplace, if target_path is None.
        Otherwise it is written to target_path/layer_name/mag.
        """

        if target_path is not None:
            target_path = Path(target_path)

        uncompressed_full_path = (
            Path(self.layer.dataset.path) / self.layer.name / self.name
        )
        compressed_path = (
            target_path
            if target_path is not None
            else Path("{}.compress-{}".format(self.layer.dataset.path, uuid4()))
        )
        compressed_full_path = compressed_path / self.layer.name / self.name

        if compressed_full_path.exists():
            logging.error(
                "Target path '{}' already exists".format(compressed_full_path)
            )
            exit(1)

        logging.info(
            "Compressing mag {0} in '{1}'".format(
                self.name, str(uncompressed_full_path)
            )
        )

        was_opened = self._is_opened
        if not was_opened:
            self.open()  # opening the view is necessary to set the dataset
        assert self._dataset is not None

        # create empty wkw.Dataset
        self._dataset.compress(str(compressed_full_path))

        # compress all files to and move them to 'compressed_path'
        with get_executor_for_args(args) as executor:
            job_args = []
            for file in self._dataset.list_files():
                rel_file = Path(file).relative_to(self.layer.dataset.path)
                job_args.append((Path(file), compressed_path / rel_file))

            wait_and_ensure_success(
                executor.map_to_futures(compress_file_job, job_args)
            )

        logging.info("Mag {0} successfully compressed".format(self.name))

        if not was_opened:
            self.close()

        if target_path is None:
            shutil.rmtree(uncompressed_full_path)
            shutil.move(str(compressed_full_path), uncompressed_full_path)
            shutil.rmtree(compressed_path)

            # update the handle to the new dataset
            MagView.__init__(
                self,
                self.layer,
                self.mag,
                self.header.block_len,
                self.header.file_len,
                wkw.Header.BLOCK_TYPE_LZ4HC,
            )

Compresses the files on disk. This has consequences for writing data (see write).

The data gets compressed inplace, if target_path is None. Otherwise it is written to target_path/layer_name/mag.