diff --git a/pyotb/__init__.py b/pyotb/__init__.py index 6164870c44c29fe2ec11e072412f98b38045cb7d..ac6264cb652a2f2e86a9a47d34a922ea8f58b8ba 100644 --- a/pyotb/__init__.py +++ b/pyotb/__init__.py @@ -2,19 +2,15 @@ """This module provides convenient python wrapping of otbApplications.""" __version__ = "1.6.0" -from .helpers import find_otb, logger, set_logger_level - -otb = find_otb() - +from .helpers import logger, set_logger_level from .apps import * - from .core import ( + App, Input, Output, get_nbchannels, get_pixel_type ) - from .functions import ( # pylint: disable=redefined-builtin all, any, diff --git a/pyotb/apps.py b/pyotb/apps.py index 9421b28d18ee3680e48afa047fd9c52bdf618634..bad5089cba57143f1cf36c3466478641cb8017c6 100644 --- a/pyotb/apps.py +++ b/pyotb/apps.py @@ -36,9 +36,9 @@ def get_available_applications(as_subprocess: bool = False) -> list[str]: cmd_args = [sys.executable, "-c", pycmd] try: params = {"env": env, "stdout": subprocess.PIPE, "stderr": subprocess.PIPE} - with subprocess.Popen(cmd_args, **params) as p: + with subprocess.Popen(cmd_args, **params) as process: logger.debug('Exec "%s \'%s\'"', ' '.join(cmd_args[:-1]), pycmd) - stdout, stderr = p.communicate() + stdout, stderr = process.communicate() stdout, stderr = stdout.decode(), stderr.decode() # ast.literal_eval is secure and will raise more handy Exceptions than eval from ast import literal_eval # pylint: disable=import-outside-toplevel diff --git a/pyotb/core.py b/pyotb/core.py index fb07a7bf910d87d6f4ca9cab00be783002e59252..cc3c96e869ce54518929e3ba4522bb6d157e9c99 100644 --- a/pyotb/core.py +++ b/pyotb/core.py @@ -6,6 +6,7 @@ from ast import literal_eval from pathlib import Path from time import perf_counter from typing import Any +from abc import ABC, abstractmethod import numpy as np import otbApplication as otb # pylint: disable=import-error @@ -13,66 +14,24 @@ import otbApplication as otb # pylint: disable=import-error from .helpers import logger -class OTBObject: - """Base class for all pyotb objects.""" +class RasterInterface(ABC): + """Abstraction of an image object.""" - def __init__(self, name: str, app: otb.Application, image_dic: dict = None): - """Constructor for an OTBObject. - - Args: - name: name of the object (e.g. "Slicer") - app: OTB application instance - image_dic: enables to keep a reference to image_dic. image_dic is a dictionary, such as - the result of self.app.ExportImage(). Use it when the app takes a numpy array as input. - See this related issue for why it is necessary to keep reference of object: - https://gitlab.orfeo-toolbox.org/orfeotoolbox/otb/-/issues/1824 - - """ - self.name = name - self.app = app - self.image_dic = image_dic - - self.parameters_keys = tuple(self.app.GetParametersKeys()) - self.all_param_types = {k: self.app.GetParameterType(k) for k in self.parameters_keys} - self.out_param_types = {k: v for k, v in self.all_param_types.items() - if v in (otb.ParameterType_OutputImage, - otb.ParameterType_OutputVectorData, - otb.ParameterType_OutputFilename)} - - self.exports_dic = {} - - def get_first_key(self, param_types: list[str]) -> str: - """Get the first output param key for specific file types.""" - for key, param_type in sorted(self.all_param_types.items()): - if param_type in param_types: - return key - return None - - @property - def key_input(self) -> str: - """Get the name of first input parameter, raster > vector > file.""" - return self.get_first_key(param_types=[otb.ParameterType_InputImage, - otb.ParameterType_InputImageList]) \ - or self.get_first_key(param_types=[otb.ParameterType_InputVectorData, - otb.ParameterType_InputVectorDataList]) \ - or self.get_first_key(param_types=[otb.ParameterType_InputFilename, - otb.ParameterType_InputFilenameList]) + app: otb.Application + exports_dic: dict @property - def key_input_image(self) -> str: - """Get the name of first input image parameter.""" - return self.get_first_key(param_types=[otb.ParameterType_InputImage, otb.ParameterType_InputImageList]) + @abstractmethod + def key_output_image(self): + """Returns the name of a parameter associated to an image. Property defined in App and Output.""" - @property - def key_output_image(self) -> str: - """Get the name of first output image parameter.""" - return self.get_first_key(param_types=[otb.ParameterType_OutputImage]) + @abstractmethod + def write(self): + """Write image, this is defined in App. Output will use App.write for a specific key.""" @property - def metadata(self): + def metadata(self) -> dict[str, (str, float, list[float])]: """Return first output image metadata dictionary.""" - if not self.key_output_image: - raise TypeError(f"{self.name}: this application has no raster output") return dict(self.app.GetMetadataDictionary(self.key_output_image)) @property @@ -83,50 +42,40 @@ class OTBObject: dtype: pixel type of the output image """ - if not self.key_output_image: - raise TypeError(f"{self.name}: this application has no raster output") enum = self.app.GetParameterOutputImagePixelType(self.key_output_image) return self.app.ConvertPixelTypeToNumpy(enum) @property - def shape(self) -> tuple(int): + def shape(self) -> tuple[int]: """Enables to retrieve the shape of a pyotb object using numpy convention. Returns: shape: (height, width, bands) """ - if not self.key_output_image: - raise TypeError(f"\"{self.name}\" has no raster output") width, height = self.app.GetImageSize(self.key_output_image) bands = self.app.GetImageNbBands(self.key_output_image) return height, width, bands @property - def transform(self) -> tuple(int): + def transform(self) -> tuple[int]: """Get image affine transform, rasterio style (see https://www.perrygeo.com/python-affine-transforms.html). Returns: transform: (X spacing, X offset, X origin, Y offset, Y spacing, Y origin) """ - if not self.key_output_image: - raise TypeError(f"{self.name}: this application has no raster output") spacing_x, spacing_y = self.app.GetImageSpacing(self.key_output_image) origin_x, origin_y = self.app.GetImageOrigin(self.key_output_image) # Shift image origin since OTB is giving coordinates of pixel center instead of corners origin_x, origin_y = origin_x - spacing_x / 2, origin_y - spacing_y / 2 return spacing_x, 0.0, origin_x, 0.0, spacing_y, origin_y - def get_infos(self): + def get_infos(self) -> dict[str, (str, float, list[float])]: """Return a dict output of ReadImageInfo for the first image output.""" - if not self.key_output_image: - raise TypeError(f"{self.name}: this application has no raster output") return App("ReadImageInfo", self, quiet=True).data - def get_statistics(self): + def get_statistics(self) -> dict[str, (str, float, list[float])]: """Return a dict output of ComputeImagesStatistics for the first image output.""" - if not self.key_output_image: - raise TypeError(f"{self.name}: this application has no raster output") return App("ComputeImagesStatistics", self, quiet=True).data def read_values_at_coords(self, row: int, col: int, bands: int = None) -> list[int | float] | int | float: @@ -151,7 +100,7 @@ class OTBObject: elif isinstance(bands, slice): channels = self.channels_list_from_slice(bands) elif not isinstance(bands, list): - raise TypeError(f"{self.name}: type '{type(bands)}' cannot be interpreted as a valid slicing") + raise TypeError(f"{self.app.GetName()}: type '{type(bands)}' cannot be interpreted as a valid slicing") if channels: app.app.Execute() app.set_parameters({"cl": [f"Channel{n + 1}" for n in channels]}) @@ -176,7 +125,7 @@ class OTBObject: return list(range(0, stop, step)) if start is None and stop is None: return list(range(0, nb_channels, step)) - raise ValueError(f"{self.name}: '{bands}' cannot be interpreted as valid slicing.") + raise ValueError(f"{self.app.GetName()}: '{bands}' cannot be interpreted as valid slicing.") def export(self, key: str = None, preserve_dtype: bool = True) -> dict[str, dict[str, np.ndarray]]: """Export a specific output image as numpy array and store it in object exports_dic. @@ -184,7 +133,7 @@ class OTBObject: Args: key: parameter key to export, if None then the default one will be used preserve_dtype: when set to True, the numpy array is converted to the same pixel type as - the OTBObject first output. Default is True + the App first output. Default is True Returns: the exported numpy array @@ -204,7 +153,7 @@ class OTBObject: Args: key: the output parameter name to export as numpy array preserve_dtype: when set to True, the numpy array is converted to the same pixel type as - the OTBObject first output. Default is True + the App first output. Default is True copy: whether to copy the output array, default is False required to True if preserve_dtype is False and the source app reference is lost @@ -227,14 +176,13 @@ class OTBObject: """ array = self.to_numpy(preserve_dtype=True, copy=False) - array = np.moveaxis(array, 2, 0) + height, width, count = array.shape proj = self.app.GetImageProjection(self.key_output_image) profile = { - 'crs': proj, 'dtype': array.dtype, - 'count': array.shape[0], 'height': array.shape[1], 'width': array.shape[2], - 'transform': self.transform + 'crs': proj, 'dtype': array.dtype, 'transform': self.transform, + 'count': count, 'height': height, 'width': width, } - return array, profile + return np.moveaxis(array, 2, 0), profile def xy_to_rowcol(self, x: float, y: float) -> tuple[int, int]: """Find (row, col) index using (x, y) projected coordinates - image CRS is expected. @@ -250,55 +198,7 @@ class OTBObject: row, col = (origin_y - y) / spacing_y, (x - origin_x) / spacing_x return abs(int(row)), int(col) - # Special functions - def __hash__(self): - """Override the default behaviour of the hash function. - - Returns: - self hash - - """ - return id(self) - - def __getitem__(self, key): - """Override the default __getitem__ behaviour. - - This function enables 2 things : - - access attributes like that : object['any_attribute'] - - slicing, i.e. selecting ROI/bands. For example, selecting first 3 bands: object[:, :, :3] - selecting bands 1, 2 & 5 : object[:, :, [0, 1, 4]] - selecting 1000x1000 subset : object[:1000, :1000] - - access pixel value(s) at a specified row, col index - - Args: - key: attribute key - - Returns: - attribute, pixel values or Slicer - - """ - # Accessing string attributes - if isinstance(key, str): - return self.__dict__.get(key) - # Accessing pixel value(s) using Y/X coordinates - if isinstance(key, tuple) and len(key) >= 2: - row, col = key[0], key[1] - if isinstance(row, int) and isinstance(col, int): - if row < 0 or col < 0: - raise ValueError(f"{self.name}: can't read pixel value at negative coordinates ({row}, {col})") - channels = None - if len(key) == 3: - channels = key[2] - return self.read_values_at_coords(row, col, channels) - # Slicing - if not isinstance(key, tuple) or (isinstance(key, tuple) and (len(key) < 2 or len(key) > 3)): - raise ValueError(f'"{key}"cannot be interpreted as valid slicing. Slicing should be 2D or 3D.') - if isinstance(key, tuple) and len(key) == 2: - # Adding a 3rd dimension - key = key + (slice(None, None, None),) - return Slicer(self, *key) - - def __add__(self, other: OTBObject | str | int | float) -> Operation: + def __add__(self, other: App | str | int | float) -> Operation: """Overrides the default addition and flavours it with BandMathX. Args: @@ -312,7 +212,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("+", self, other) - def __sub__(self, other: OTBObject | str | int | float) -> Operation: + def __sub__(self, other: App | str | int | float) -> Operation: """Overrides the default subtraction and flavours it with BandMathX. Args: @@ -326,7 +226,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("-", self, other) - def __mul__(self, other: OTBObject | str | int | float) -> Operation: + def __mul__(self, other: App | str | int | float) -> Operation: """Overrides the default subtraction and flavours it with BandMathX. Args: @@ -340,7 +240,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("*", self, other) - def __truediv__(self, other: OTBObject | str | int | float) -> Operation: + def __truediv__(self, other: App | str | int | float) -> Operation: """Overrides the default subtraction and flavours it with BandMathX. Args: @@ -354,7 +254,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("/", self, other) - def __radd__(self, other: OTBObject | str | int | float) -> Operation: + def __radd__(self, other: App | str | int | float) -> Operation: """Overrides the default reverse addition and flavours it with BandMathX. Args: @@ -368,7 +268,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("+", other, self) - def __rsub__(self, other: OTBObject | str | int | float) -> Operation: + def __rsub__(self, other: App | str | int | float) -> Operation: """Overrides the default subtraction and flavours it with BandMathX. Args: @@ -382,7 +282,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("-", other, self) - def __rmul__(self, other: OTBObject | str | int | float) -> Operation: + def __rmul__(self, other: App | str | int | float) -> Operation: """Overrides the default multiplication and flavours it with BandMathX. Args: @@ -396,7 +296,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return Operation("*", other, self) - def __rtruediv__(self, other: OTBObject | str | int | float) -> Operation: + def __rtruediv__(self, other: App | str | int | float) -> Operation: """Overrides the default division and flavours it with BandMathX. Args: @@ -419,7 +319,7 @@ class OTBObject: """ return Operation("abs", self) - def __ge__(self, other: OTBObject | str | int | float) -> Operation: + def __ge__(self, other: App | str | int | float) -> Operation: """Overrides the default greater or equal and flavours it with BandMathX. Args: @@ -433,7 +333,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation(">=", self, other) - def __le__(self, other: OTBObject | str | int | float) -> Operation: + def __le__(self, other: App | str | int | float) -> Operation: """Overrides the default less or equal and flavours it with BandMathX. Args: @@ -447,7 +347,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation("<=", self, other) - def __gt__(self, other: OTBObject | str | int | float) -> Operation: + def __gt__(self, other: App | str | int | float) -> Operation: """Overrides the default greater operator and flavours it with BandMathX. Args: @@ -461,7 +361,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation(">", self, other) - def __lt__(self, other: OTBObject | str | int | float) -> Operation: + def __lt__(self, other: App | str | int | float) -> Operation: """Overrides the default less operator and flavours it with BandMathX. Args: @@ -475,7 +375,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation("<", self, other) - def __eq__(self, other: OTBObject | str | int | float) -> Operation: + def __eq__(self, other: App | str | int | float) -> Operation: """Overrides the default eq operator and flavours it with BandMathX. Args: @@ -489,7 +389,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation("==", self, other) - def __ne__(self, other: OTBObject | str | int | float) -> Operation: + def __ne__(self, other: App | str | int | float) -> Operation: """Overrides the default different operator and flavours it with BandMathX. Args: @@ -503,7 +403,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation("!=", self, other) - def __or__(self, other: OTBObject | str | int | float) -> Operation: + def __or__(self, other: App | str | int | float) -> Operation: """Overrides the default or operator and flavours it with BandMathX. Args: @@ -517,7 +417,7 @@ class OTBObject: return NotImplemented # this enables to fallback on numpy emulation thanks to __array_ufunc__ return LogicalOperation("||", self, other) - def __and__(self, other: OTBObject | str | int | float) -> Operation: + def __and__(self, other: App | str | int | float) -> Operation: """Overrides the default and operator and flavours it with BandMathX. Args: @@ -543,7 +443,7 @@ class OTBObject: """ return self.to_numpy() - def __array_ufunc__(self, ufunc, method, *inputs, **kwargs) -> OTBObject: + def __array_ufunc__(self, ufunc, method, *inputs, **kwargs) -> App: """This is called whenever a numpy function is called on a pyotb object. Operation is performed in numpy, then imported back to pyotb with the same georeference as input. @@ -566,7 +466,7 @@ class OTBObject: for inp in inputs: if isinstance(inp, (float, int, np.ndarray, np.generic)): arrays.append(inp) - elif isinstance(inp, OTBObject): + elif isinstance(inp, App): if not inp.exports_dic: inp.export() image_dic = inp.exports_dic[inp.key_output_image] @@ -590,19 +490,18 @@ class OTBObject: return NotImplemented -class App(OTBObject): +class App(RasterInterface): """Base class that gathers common operations for any OTB application.""" - def __init__(self, otb_app_name: str, *args, frozen: bool = False, quiet: bool = False, image_dic: dict = None, - name: str = None, **kwargs): + def __init__(self, name: str, *args, frozen: bool = False, quiet: bool = False, image_dic: dict = None, **kwargs): """Common constructor for OTB applications. Handles in-memory connection between apps. Args: - otb_app_name: name of the OTB application, e.g. 'BandMath' + name: name of the app, e.g. 'BandMath' *args: used for passing application parameters. Can be : - dictionary containing key-arguments enumeration. Useful when a key is python-reserved (e.g. "in") or contains reserved characters such as a point (e.g."mode.extent.unit") - - string or OTBObject, useful when the user wants to specify the input "in" + - string, App or Output, useful when the user wants to specify the input "in" - list, useful when the user wants to specify the input list 'il' frozen: freeze OTB app in order to use execute() later and avoid blocking process during __init___ quiet: whether to print logs of the OTB app @@ -610,65 +509,77 @@ class App(OTBObject): the result of app.ExportImage(). Use it when the app takes a numpy array as input. See this related issue for why it is necessary to keep reference of object: https://gitlab.orfeo-toolbox.org/orfeotoolbox/otb/-/issues/1824 - name: override the application name + **kwargs: used for passing application parameters. e.g. il=['input1.tif', App_object2, App_object3.out], out='output.tif' """ + self.name = name self.frozen = frozen self.quiet = quiet - create = otb.Registry.CreateApplicationWithoutLogger if quiet else otb.Registry.CreateApplication - super().__init__(name=name or f"OTB Application {otb_app_name}", app=create(otb_app_name), image_dic=image_dic) - self.description = self.app.GetDocLongDescription() - - # Set parameters + self.image_dic = image_dic + self._time_start, self._time_end = 0, 0 + self.exports_dic = {} self.parameters = {} + # Initialize app, set parameters and execute if not frozen + create = otb.Registry.CreateApplicationWithoutLogger if quiet else otb.Registry.CreateApplication + self.app = create(name) + self.parameters_keys = tuple(self.app.GetParametersKeys()) + self._all_param_types = {k: self.app.GetParameterType(k) for k in self.parameters_keys} + types = (otb.ParameterType_OutputImage, otb.ParameterType_OutputVectorData, otb.ParameterType_OutputFilename) + self._out_param_types = {k: v for k, v in self._all_param_types.items() if v in types} if args or kwargs: self.set_parameters(*args, **kwargs) if not self.frozen: self.execute() - if any(key in self.parameters for key in self.out_param_types): + if any(key in self.parameters for key in self._out_param_types): self.flush() # auto flush if any output param was provided during app init - # Elapsed time - self.time_start, self.time_end = 0, 0 + def get_first_key(self, param_types: list[int]) -> str: + """Get the first output param key for specific file types.""" + for key, param_type in sorted(self._all_param_types.items()): + if param_type in param_types: + return key + return None @property - def elapsed_time(self): - """Get elapsed time between app init and end of exec or file writing.""" - return self.time_end - self.time_start + def key_input(self) -> str: + """Get the name of first input parameter, raster > vector > file.""" + return self.get_first_key([otb.ParameterType_InputImage, otb.ParameterType_InputImageList]) \ + or self.get_first_key([otb.ParameterType_InputVectorData, otb.ParameterType_InputVectorDataList]) \ + or self.get_first_key([otb.ParameterType_InputFilename, otb.ParameterType_InputFilenameList]) @property - def used_outputs(self) -> list[str]: - """List of used application outputs.""" - return [getattr(self, key) for key in self.out_param_types if key in self.parameters] + def key_input_image(self) -> str: + """Get the name of first input image parameter.""" + return self.get_first_key(param_types=[otb.ParameterType_InputImage, otb.ParameterType_InputImageList]) - def find_outputs(self) -> tuple[str]: - """Find output files on disk using path found in parameters. + @property + def key_output_image(self) -> str: + """Get the name of first output image parameter.""" + return self.get_first_key(param_types=[otb.ParameterType_OutputImage]) - Returns: - list of files found on disk + @property + def elapsed_time(self) -> float: + """Get elapsed time between app init and end of exec or file writing.""" + return self._time_end - self._time_start - """ - files, missing = [], [] - for out in self.used_outputs: - dest = files if out.exists() else missing - dest.append(str(out.filepath.absolute())) - for filename in missing: - logger.error("%s: execution seems to have failed, %s does not exist", self.name, filename) - return tuple(files) + @property + def used_outputs(self) -> list[str]: + """List of used application outputs.""" + return [getattr(self, key) for key in self._out_param_types if key in self.parameters] @property - def data(self): + def data(self) -> dict[str, float, list[float]]: """Expose app's output data values in a dictionary.""" - skip_keys = ("ram", "elev.default", "mapproj.utm.zone", "mapproj.utm.northhem") - skip_keys = skip_keys + tuple(self.out_param_types) + tuple(self.parameters) - keys = (k for k in self.parameters_keys if k not in skip_keys) - - def _check(v): - return not isinstance(v, otb.ApplicationProxy) and v not in ("", None, [], ()) - - return {str(k): self[k] for k in keys if _check(self[k])} + known_bad_keys = ("ram", "elev.default", "mapproj.utm.zone", "mapproj.utm.northhem") + skip_keys = known_bad_keys + tuple(self._out_param_types) + tuple(self.parameters) + data_dict = {} + for key in filter(lambda k: k not in skip_keys, self.parameters_keys): + value = self.__dict__.get(key) + if not isinstance(value, otb.ApplicationProxy) and value not in (None, "", [], ()): + data_dict[str(key)] = value + return data_dict def set_parameters(self, *args, **kwargs): """Set some parameters of the app. @@ -679,7 +590,7 @@ class App(OTBObject): Args: *args: Can be : - dictionary containing key-arguments enumeration. Useful when a key is python-reserved (e.g. "in") or contains reserved characters such as a point (e.g."mode.extent.unit") - - string or OTBObject, useful when the user implicitly wants to set the param "in" + - string or App, useful when the user implicitly wants to set the param "in" - list, useful when the user implicitly wants to set the param "il" **kwargs: keyword arguments e.g. il=['input1.tif', oApp_object2, App_object3.out], out='output.tif' @@ -692,7 +603,9 @@ class App(OTBObject): # Going through all arguments for key, obj in parameters.items(): if key not in self.parameters_keys: - raise KeyError(f'{self.name}: unknown parameter name "{key}"') + raise KeyError( + f"{self.name}: parameter '{key}' was not recognized. Available keys are {self.parameters_keys}" + ) # When the parameter expects a list, if needed, change the value to list if is_key_list(self, key) and not isinstance(obj, (list, tuple)): obj = [obj] @@ -712,6 +625,36 @@ class App(OTBObject): self.parameters.update({**parameters, **otb_params}) self.save_objects() + def propagate_dtype(self, target_key: str = None, dtype: int = None): + """Propagate a pixel type from main input to every outputs, or to a target output key only. + + With multiple inputs (if dtype is not provided), the type of the first input is considered. + With multiple outputs (if target_key is not provided), all outputs will be converted to the same pixel type. + + Args: + target_key: output param key to change pixel type + dtype: data type to use + + """ + if not dtype: + param = self.parameters.get(self.key_input_image) + if not param: + logger.warning("%s: could not propagate pixel type from inputs to output", self.name) + return + if isinstance(param, (list, tuple)): + param = param[0] # first image in "il" + try: + dtype = get_pixel_type(param) + except (TypeError, RuntimeError): + logger.warning('%s: unable to identify pixel type of key "%s"', self.name, param) + return + if target_key: + keys = [target_key] + else: + keys = [k for k, v in self._out_param_types.items() if v == otb.ParameterType_OutputImage] + for key in keys: + self.app.SetParameterOutputImagePixelType(key, dtype) + def save_objects(self): """Saving app parameters and outputs as attributes, so that they can be accessed with `obj.key`. @@ -727,7 +670,7 @@ class App(OTBObject): except RuntimeError: continue # this is when there is no value for key # Convert output param path to Output object - if key in self.out_param_types: + if key in self._out_param_types: value = Output(self, key, value) elif isinstance(value, str): try: @@ -740,13 +683,13 @@ class App(OTBObject): def execute(self): """Execute and write to disk if any output parameter has been set during init.""" logger.debug("%s: run execute() with parameters=%s", self.name, self.parameters) - self.time_start = perf_counter() + self._time_start = perf_counter() try: self.app.Execute() except (RuntimeError, FileNotFoundError) as e: raise Exception(f"{self.name}: error during during app execution") from e self.frozen = False - self.time_end = perf_counter() + self._time_end = perf_counter() logger.debug("%s: execution ended", self.name) self.save_objects() # this is required for apps like ReadImageInfo or ComputeImagesStatistics @@ -758,7 +701,7 @@ class App(OTBObject): except RuntimeError: logger.debug("%s: failed with WriteOutput, executing once again with ExecuteAndWriteOutput", self.name) self.app.ExecuteAndWriteOutput() - self.time_end = perf_counter() + self._time_end = perf_counter() def write(self, *args, filename_extension: str = "", pixel_type: dict[str, str] | str = None, preserve_dtype: bool = False, **kwargs): @@ -786,8 +729,8 @@ class App(OTBObject): kwargs.update(arg) elif isinstance(arg, str) and kwargs: logger.warning('%s: keyword arguments specified, ignoring argument "%s"', self.name, arg) - elif isinstance(arg, str) and self.key_output_image: - kwargs.update({self.key_output_image: arg}) + elif isinstance(arg, (str, Path)) and self.key_output_image: + kwargs.update({self.key_output_image: str(arg)}) # Append filename extension to filenames if filename_extension: @@ -795,7 +738,7 @@ class App(OTBObject): if not filename_extension.startswith("?"): filename_extension = "?" + filename_extension for key, value in kwargs.items(): - if self.out_param_types[key] == otb.ParameterType_OutputImage and '?' not in value: + if self._out_param_types[key] == otb.ParameterType_OutputImage and "?" not in value: kwargs[key] = value + filename_extension # Manage output pixel types @@ -805,7 +748,7 @@ class App(OTBObject): type_name = self.app.ConvertPixelTypeToNumpy(parse_pixel_type(pixel_type)) logger.debug('%s: output(s) will be written with type "%s"', self.name, type_name) for key in kwargs: - if self.out_param_types[key] == otb.ParameterType_OutputImage: + if self._out_param_types[key] == otb.ParameterType_OutputImage: dtypes[key] = parse_pixel_type(pixel_type) elif isinstance(pixel_type, dict): dtypes = {k: parse_pixel_type(v) for k, v in pixel_type.items()} @@ -814,43 +757,46 @@ class App(OTBObject): # Set parameters and flush to disk for key, output_filename in kwargs.items(): + if Path(output_filename).exists(): + logger.warning("%s: overwriting file %s", self.name, output_filename) if key in dtypes: self.propagate_dtype(key, dtypes[key]) self.set_parameters({key: output_filename}) self.flush() - def propagate_dtype(self, target_key: str = None, dtype: int = None): - """Propagate a pixel type from main input to every outputs, or to a target output key only. + def find_outputs(self) -> tuple[str]: + """Find output files on disk using path found in parameters. - With multiple inputs (if dtype is not provided), the type of the first input is considered. - With multiple outputs (if target_key is not provided), all outputs will be converted to the same pixel type. + Returns: + list of files found on disk - Args: - target_key: output param key to change pixel type - dtype: data type to use + """ + files, missing = [], [] + for out in self.used_outputs: + dest = files if out.exists() else missing + dest.append(str(out.filepath.absolute())) + for filename in missing: + logger.error("%s: execution seems to have failed, %s does not exist", self.name, filename) + return tuple(files) + + def summarize(self) -> dict[str, str | dict[str, Any]]: + """Serialize an object and its pipeline into a dictionary. + + Returns: + nested dictionary summarizing the pipeline """ - if not dtype: - param = self.parameters.get(self.key_input_image) - if not param: - logger.warning("%s: could not propagate pixel type from inputs to output", self.name) - return - if isinstance(param, (list, tuple)): - param = param[0] # first image in "il" - try: - dtype = get_pixel_type(param) - except (TypeError, RuntimeError): - logger.warning('%s: unable to identify pixel type of key "%s"', self.name, param) - return - if target_key: - keys = [target_key] - else: - keys = [k for k, v in self.out_param_types.items() if v == otb.ParameterType_OutputImage] - for key in keys: - self.app.SetParameterOutputImagePixelType(key, dtype) + parameters = self.parameters.copy() + for key, param in parameters.items(): + # In the following, we replace each parameter which is an App, with its summary. + if isinstance(param, App): # single parameter + parameters[key] = param.summarize() + elif isinstance(param, list): # parameter list + parameters[key] = [p.summarize() if isinstance(p, App) else p for p in param] + return {"name": self.app.GetName(), "parameters": parameters} # Private functions - def __parse_args(self, args: list[str | OTBObject | dict | list]) -> dict[str, Any]: + def __parse_args(self, args: list[str | App | dict | list]) -> dict[str, Any]: """Gather all input arguments in kwargs dict. Args: @@ -864,21 +810,17 @@ class App(OTBObject): for arg in args: if isinstance(arg, dict): kwargs.update(arg) - elif isinstance(arg, (str, OTBObject)) or isinstance(arg, list) and is_key_list(self, self.key_input): + elif isinstance(arg, (str, App)) or isinstance(arg, list) and is_key_list(self, self.key_input): kwargs.update({self.key_input: arg}) return kwargs - def __set_param(self, key: str, obj: list | tuple | OTBObject | otb.Application | list[Any]): + def __set_param(self, key: str, obj: list | tuple | App | otb.Application | list[Any]): """Set one parameter, decide which otb.Application method to use depending on target object.""" if obj is None or (isinstance(obj, (list, tuple)) and not obj): self.app.ClearValue(key) return - if key not in self.parameters_keys: - raise KeyError( - f"{self.name}: parameter '{key}' was not recognized. Available keys are {self.parameters_keys}" - ) # Single-parameter cases - if isinstance(obj, OTBObject): + if isinstance(obj, App): self.app.ConnectImage(key, obj.app, obj.key_output_image) elif isinstance(obj, otb.Application): # this is for backward comp with plain OTB self.app.ConnectImage(key, obj, get_out_images_param_keys(obj)[0]) @@ -890,7 +832,7 @@ class App(OTBObject): elif is_key_images_list(self, key): # To enable possible in-memory connections, we go through the list and set the parameters one by one for inp in obj: - if isinstance(inp, OTBObject): + if isinstance(inp, App): self.app.ConnectImage(key, inp.app, inp.key_output_image) elif isinstance(inp, otb.Application): # this is for backward comp with plain OTB self.app.ConnectImage(key, obj, get_out_images_param_keys(inp)[0]) @@ -901,23 +843,55 @@ class App(OTBObject): else: self.app.SetParameterValue(key, obj) - def summarize(self) -> dict: - """Serialize an object and its pipeline into a dictionary. + # Special functions + def __hash__(self) -> int: + """Override the default behaviour of the hash function. Returns: - nested dictionary summarizing the pipeline + self hash + + """ + return id(self) + + def __getitem__(self, key) -> Any | list[int | float] | int | float | Slicer: + """Override the default __getitem__ behaviour. + + This function enables 2 things : + - access attributes like that : object['any_attribute'] + - slicing, i.e. selecting ROI/bands. For example, selecting first 3 bands: object[:, :, :3] + selecting bands 1, 2 & 5 : object[:, :, [0, 1, 4]] + selecting 1000x1000 subset : object[:1000, :1000] + - access pixel value(s) at a specified row, col index + + Args: + key: attribute key + + Returns: + attribute, pixel values or Slicer """ - params = self.parameters - for k, p in params.items(): - # In the following, we replace each parameter which is an OTBObject, with its summary. - if isinstance(p, App): # single parameter - params[k] = p.summarize() - elif isinstance(p, list): # parameter list - params[k] = [pi.summarize() if isinstance(pi, App) else pi for pi in p] - return {"name": self.app.GetName(), "parameters": params} - - def __str__(self): + # Accessing string attributes + if isinstance(key, str): + return getattr(self, key) + # Accessing pixel value(s) using Y/X coordinates + if isinstance(key, tuple) and len(key) >= 2: + row, col = key[0], key[1] + if isinstance(row, int) and isinstance(col, int): + if row < 0 or col < 0: + raise ValueError(f"{self.name}: can't read pixel value at negative coordinates ({row}, {col})") + channels = None + if len(key) == 3: + channels = key[2] + return self.read_values_at_coords(row, col, channels) + # Slicing + if not isinstance(key, tuple) or (isinstance(key, tuple) and (len(key) < 2 or len(key) > 3)): + raise ValueError(f'"{key}"cannot be interpreted as valid slicing. Slicing should be 2D or 3D.') + if isinstance(key, tuple) and len(key) == 2: + # Adding a 3rd dimension + key = key + (slice(None, None, None),) + return Slicer(self, *key) + + def __str__(self) -> str: """Return a nice string representation with object id.""" return f"<pyotb.App {self.name} object id {id(self)}>" @@ -925,7 +899,7 @@ class App(OTBObject): class Slicer(App): """Slicer objects i.e. when we call something like raster[:, :, 2] from Python.""" - def __init__(self, obj: OTBObject | str, rows: int, cols: int, channels: int): + def __init__(self, obj: App | str, rows: int, cols: int, channels: int): """Create a slicer object, that can be used directly for writing or inside a BandMath. It contains : @@ -939,7 +913,8 @@ class Slicer(App): channels: channels, can be slicing, list or int """ - super().__init__("ExtractROI", {"in": obj, "mode": "extent"}, quiet=True, frozen=True, name="Slicer") + super().__init__("ExtractROI", obj, mode="extent", quiet=True, frozen=True) + self.name = "Slicer" self.rows, self.cols = rows, cols parameters = {} @@ -1017,7 +992,7 @@ class Operation(App): Args: operator: (str) one of +, -, *, /, >, <, >=, <=, ==, !=, &, |, abs, ? - *inputs: inputs. Can be OTBObject, filepath, int or float + *inputs: inputs. Can be App, filepath, int or float nb_bands: to specify the output nb of bands. Optional. Used only internally by pyotb.where name: override the Operation name @@ -1029,7 +1004,7 @@ class Operation(App): self.nb_channels = {} self.fake_exp_bands = [] self.logical_fake_exp_bands = [] - self.create_fake_exp(operator, inputs, nb_bands=nb_bands) + self.build_fake_expressions(operator, inputs, nb_bands=nb_bands) # Transforming images to the adequate im#, e.g. `input1` to "im1" # creating a dictionary that is like {str(input1): 'im1', 'image2.tif': 'im2', ...}. # NB: the keys of the dictionary are strings-only, instead of 'complex' objects, to enable easy serialization @@ -1045,19 +1020,19 @@ class Operation(App): # Getting unique image inputs, in the order im1, im2, im3 ... self.unique_inputs = [mapping_str_to_input[str_input] for str_input in sorted(self.im_dic, key=self.im_dic.get)] self.exp_bands, self.exp = self.get_real_exp(self.fake_exp_bands) + appname = "BandMath" if len(self.exp_bands) == 1 else "BandMathX" # Execute app - super().__init__("BandMath" if len(self.exp_bands) == 1 else "BandMathX", il=self.unique_inputs, - exp=self.exp, quiet=True, name=name or f'Operation exp="{self.exp}"') + super().__init__(appname, il=self.unique_inputs, exp=self.exp, quiet=True) + self.name = f'Operation exp="{self.exp}"' - def create_fake_exp(self, operator: str, inputs: list[OTBObject | str | int | float], - nb_bands: int = None): - """Create a 'fake' expression. + def build_fake_expressions(self, operator: str, inputs: list[App | str | int | float], nb_bands: int = None): + """Create a list of 'fake' expressions, one for each band. E.g for the operation input1 + input2, we create a fake expression that is like "str(input1) + str(input2)" Args: operator: (str) one of +, -, *, /, >, <, >=, <=, ==, !=, &, |, abs, ? - inputs: inputs. Can be OTBObject, filepath, int or float + inputs: inputs. Can be App, filepath, int or float nb_bands: to specify the output nb of bands. Optional. Used only internally by pyotb.where """ @@ -1074,33 +1049,25 @@ class Operation(App): else: nb_bands_list = [get_nbchannels(inp) for inp in inputs if not isinstance(inp, (float, int))] # check that all inputs have the same nb of bands - if len(nb_bands_list) > 1: - if not all(x == nb_bands_list[0] for x in nb_bands_list): - raise ValueError("All images do not have the same number of bands") + if len(nb_bands_list) > 1 and not all(x == nb_bands_list[0] for x in nb_bands_list): + raise ValueError("All images do not have the same number of bands") nb_bands = nb_bands_list[0] # Create a list of fake expressions, each item of the list corresponding to one band self.fake_exp_bands.clear() for i, band in enumerate(range(1, nb_bands + 1)): - fake_exps = [] + expressions = [] for k, inp in enumerate(inputs): # Generating the fake expression of the current input, # this is a special case for the condition of the ternary operator `cond ? x : y` if len(inputs) == 3 and k == 0: # When cond is monoband whereas the result is multiband, we expand the cond to multiband - if nb_bands != inp.shape[2]: - cond_band = 1 - else: - cond_band = band - fake_exp, corresponding_inputs, nb_channels = self.create_one_input_fake_exp( - inp, cond_band, keep_logical=True - ) + cond_band = 1 if nb_bands != inp.shape[2] else band + fake_exp, corresponding_inputs, nb_channels = self.make_fake_exp(inp, cond_band, keep_logical=True) else: # Any other input - fake_exp, corresponding_inputs, nb_channels = self.create_one_input_fake_exp( - inp, band, keep_logical=False - ) - fake_exps.append(fake_exp) + fake_exp, corresponding_inputs, nb_channels = self.make_fake_exp(inp, band, keep_logical=False) + expressions.append(fake_exp) # Reference the inputs and nb of channels (only on first pass in the loop to avoid duplicates) if i == 0 and corresponding_inputs and nb_channels: self.inputs.extend(corresponding_inputs) @@ -1108,16 +1075,16 @@ class Operation(App): # Generating the fake expression of the whole operation if len(inputs) == 1: # this is only for 'abs' - fake_exp = f"({operator}({fake_exps[0]}))" + fake_exp = f"({operator}({expressions[0]}))" elif len(inputs) == 2: # We create here the "fake" expression. For example, for a BandMathX expression such as '2 * im1 + im2', # the false expression stores the expression 2 * str(input1) + str(input2) - fake_exp = f"({fake_exps[0]} {operator} {fake_exps[1]})" + fake_exp = f"({expressions[0]} {operator} {expressions[1]})" elif len(inputs) == 3 and operator == "?": # this is only for ternary expression - fake_exp = f"({fake_exps[0]} ? {fake_exps[1]} : {fake_exps[2]})" + fake_exp = f"({expressions[0]} ? {expressions[1]} : {expressions[2]})" self.fake_exp_bands.append(fake_exp) - def get_real_exp(self, fake_exp_bands: str) -> tuple(list[str], str): + def get_real_exp(self, fake_exp_bands: str) -> tuple[list[str], str]: """Generates the BandMathX expression. Args: @@ -1137,13 +1104,11 @@ class Operation(App): one_band_exp = one_band_exp.replace(str(inp), self.im_dic[str(inp)]) exp_bands.append(one_band_exp) # Form the final expression (e.g. 'im1b1 + 1; im1b2 + 1') - exp = ";".join(exp_bands) - return exp_bands, exp + return exp_bands, ";".join(exp_bands) @staticmethod - def create_one_input_fake_exp(x: OTBObject | str, - band: int, keep_logical: bool = False) -> tuple(str, list[OTBObject], int): - """This an internal function, only to be used by `create_fake_exp`. + def make_fake_exp(x: App | str, band: int, keep_logical: bool = False) -> tuple[str, list[App], int]: + """This an internal function, only to be used by `build_fake_expressions`. Enable to create a fake expression just for one input and one band. @@ -1152,7 +1117,7 @@ class Operation(App): band: which band to consider (bands start at 1) keep_logical: whether to keep the logical expressions "as is" in case the input is a logical operation. ex: if True, for `input1 > input2`, returned fake expression is "str(input1) > str(input2)" - if False, for `input1 > input2`, returned fake exp is "str(input1) > str(input2) ? 1 : 0". + if False, for `input1 > input2`, returned fake exp is "str(input1) > str(input2) ? 1 : 0"] Default False Returns: @@ -1165,38 +1130,32 @@ class Operation(App): if isinstance(x, Slicer) and hasattr(x, "one_band_sliced"): if keep_logical and isinstance(x.input, LogicalOperation): fake_exp = x.input.logical_fake_exp_bands[x.one_band_sliced - 1] - inputs = x.input.inputs - nb_channels = x.input.nb_channels + inputs, nb_channels = x.input.inputs, x.input.nb_channels elif isinstance(x.input, Operation): # Keep only one band of the expression fake_exp = x.input.fake_exp_bands[x.one_band_sliced - 1] - inputs = x.input.inputs - nb_channels = x.input.nb_channels + inputs, nb_channels = x.input.inputs, x.input.nb_channels else: # Add the band number (e.g. replace '<pyotb.App object>' by '<pyotb.App object>b1') fake_exp = f"{x.input}b{x.one_band_sliced}" - inputs = [x.input] - nb_channels = {x.input: 1} + inputs, nb_channels = [x.input], {x.input: 1} # For LogicalOperation, we save almost the same attributes as an Operation elif keep_logical and isinstance(x, LogicalOperation): fake_exp = x.logical_fake_exp_bands[band - 1] - inputs = x.inputs - nb_channels = x.nb_channels + inputs, nb_channels = x.inputs, x.nb_channels elif isinstance(x, Operation): fake_exp = x.fake_exp_bands[band - 1] - inputs = x.inputs - nb_channels = x.nb_channels + inputs, nb_channels = x.inputs, x.nb_channels # For int or float input, we just need to save their value elif isinstance(x, (int, float)): fake_exp = str(x) - inputs = None - nb_channels = None + inputs, nb_channels = None, None # We go on with other inputs, i.e. pyotb objects, filepaths... else: - nb_channels = {x: get_nbchannels(x)} - inputs = [x] # Add the band number (e.g. replace '<pyotb.App object>' by '<pyotb.App object>b1') fake_exp = f"{x}b{band}" + inputs, nb_channels = [x], {x: get_nbchannels(x)} + return fake_exp, inputs, nb_channels def __str__(self) -> str: @@ -1224,15 +1183,15 @@ class LogicalOperation(Operation): super().__init__(operator, *inputs, nb_bands=nb_bands, name="LogicalOperation") self.logical_exp_bands, self.logical_exp = self.get_real_exp(self.logical_fake_exp_bands) - def create_fake_exp(self, operator: str, inputs: list[OTBObject | str | int | float], nb_bands: int = None): - """Create a 'fake' expression. + def build_fake_expressions(self, operator: str, inputs: list[App | str | int | float], nb_bands: int = None): + """Create a list of 'fake' expressions, one for each band. e.g for the operation input1 > input2, we create a fake expression that is like "str(input1) > str(input2) ? 1 : 0" and a logical fake expression that is like "str(input1) > str(input2)" Args: operator: str (one of >, <, >=, <=, ==, !=, &, |) - inputs: Can be OTBObject, filepath, int or float + inputs: Can be App, filepath, int or float nb_bands: to specify the output nb of bands. Optional. Used only internally by pyotb.where """ @@ -1242,24 +1201,22 @@ class LogicalOperation(Operation): else: nb_bands_list = [get_nbchannels(inp) for inp in inputs if not isinstance(inp, (float, int))] # check that all inputs have the same nb of bands - if len(nb_bands_list) > 1: - if not all(x == nb_bands_list[0] for x in nb_bands_list): - raise ValueError("All images do not have the same number of bands") + if len(nb_bands_list) > 1 and not all(x == nb_bands_list[0] for x in nb_bands_list): + raise ValueError("All images do not have the same number of bands") nb_bands = nb_bands_list[0] - # Create a list of fake exp, each item of the list corresponding to one band for i, band in enumerate(range(1, nb_bands + 1)): - fake_exps = [] + expressions = [] for inp in inputs: - fake_exp, corresp_inputs, nb_channels = super().create_one_input_fake_exp(inp, band, keep_logical=True) - fake_exps.append(fake_exp) + fake_exp, corresp_inputs, nb_channels = super().make_fake_exp(inp, band, keep_logical=True) + expressions.append(fake_exp) # Reference the inputs and nb of channels (only on first pass in the loop to avoid duplicates) if i == 0 and corresp_inputs and nb_channels: self.inputs.extend(corresp_inputs) self.nb_channels.update(nb_channels) # We create here the "fake" expression. For example, for a BandMathX expression such as 'im1 > im2', # the logical fake expression stores the expression "str(input1) > str(input2)" - logical_fake_exp = f"({fake_exps[0]} {operator} {fake_exps[1]})" + logical_fake_exp = f"({expressions[0]} {operator} {expressions[1]})" # We keep the logical expression, useful if later combined with other logical operations self.logical_fake_exp_bands.append(logical_fake_exp) # We create a valid BandMath expression, e.g. "str(input1) > str(input2) ? 1 : 0" @@ -1277,21 +1234,21 @@ class Input(App): path: Anything supported by GDAL (local file on the filesystem, remote resource e.g. /vsicurl/.., etc.) """ - self.path = path - super().__init__("ExtractROI", {"in": path}, frozen=True, name=f"Input from {path}") + super().__init__("ExtractROI", {"in": path}, frozen=True) + self.name = f"Input from {path}" + self.filepath = Path(path) self.propagate_dtype() self.execute() def __str__(self) -> str: """Return a nice string representation with file path.""" - return f"<pyotb.Input object from {self.path}>" + return f"<pyotb.Input object from {self.filepath}>" -class Output(OTBObject): +class Output(RasterInterface): """Object that behave like a pointer to a specific application output file.""" - def __init__(self, pyotb_app: OTBObject, # pylint: disable=super-init-not-called - param_key: str = None, filepath: str = None, mkdir: bool = True): + def __init__(self, pyotb_app: App, param_key: str = None, filepath: str = None, mkdir: bool = True): """Constructor for an Output object. Args: @@ -1301,39 +1258,48 @@ class Output(OTBObject): mkdir: create missing parent directories """ - super().__init__(name=f"Output {param_key} from {pyotb_app.name}", app=pyotb_app.app) + self.name = f"Output {param_key} from {pyotb_app.name}" self.parent_pyotb_app = pyotb_app # keep trace of parent app - self.param_key = param_key or super().key_output_image + self.app = pyotb_app.app + self.exports_dic = pyotb_app.exports_dic + self.param_key = param_key self.filepath = None if filepath: - if '?' in filepath: - filepath = filepath.split('?')[0] + if "?" in filepath: + filepath = filepath.split("?")[0] self.filepath = Path(filepath) if mkdir: self.make_parent_dirs() @property - def key_output_image(self): - """Overwrite OTBObject prop, in order to use Operation special methods with the right Output param_key.""" + def key_output_image(self) -> str: + """Force the right key to be used when accessing the RasterInterface.""" return self.param_key def exists(self) -> bool: """Check file exist.""" - assert self.filepath, "Filepath not set" + if self.filepath is None: + raise ValueError("Filepath is not set") return self.filepath.exists() def make_parent_dirs(self): """Create missing parent directories.""" - assert self.filepath, "Filepath not set" - if not self.filepath.parent.exists(): - self.filepath.parent.mkdir(parents=True) + if self.filepath is None: + raise ValueError("Filepath is not set") + self.filepath.parent.mkdir(parents=True, exist_ok=True) + + def write(self, filepath: None | str | Path = None, **kwargs): + """Write output to disk, filepath is not required if it was provided to parent App during init.""" + if filepath is None and self.filepath: + return self.parent_pyotb_app.write({self.key_output_image: self.filepath}, **kwargs) + return self.parent_pyotb_app.write({self.key_output_image: filepath}, **kwargs) def __str__(self) -> str: """Return a nice string representation with source app name and object id.""" return f"<pyotb.Output {self.name} object, id {id(self)}>" -def get_nbchannels(inp: str | OTBObject) -> int: +def get_nbchannels(inp: str | App) -> int: """Get the nb of bands of input image. Args: @@ -1343,7 +1309,7 @@ def get_nbchannels(inp: str | OTBObject) -> int: number of bands in image """ - if isinstance(inp, OTBObject): + if isinstance(inp, App): nb_channels = inp.shape[-1] else: # Executing the app, without printing its log @@ -1351,11 +1317,11 @@ def get_nbchannels(inp: str | OTBObject) -> int: info = App("ReadImageInfo", inp, quiet=True) nb_channels = info.app.GetParameterInt("numberbands") except Exception as e: # this happens when we pass a str that is not a filepath - raise TypeError(f'Could not get the number of channels of `{inp}`. Not a filepath or wrong filepath') from e + raise TypeError(f"Could not get the number of channels of '{inp}'. Not a filepath or wrong filepath") from e return nb_channels -def get_pixel_type(inp: str | OTBObject) -> str: +def get_pixel_type(inp: str | App) -> str: """Get the encoding of input image pixels. Args: @@ -1375,23 +1341,23 @@ def get_pixel_type(inp: str | OTBObject) -> str: if not datatype: raise TypeError(f"Unable to read pixel type of image {inp}") datatype_to_pixeltype = { - 'unsigned_char': 'uint8', - 'short': 'int16', - 'unsigned_short': 'uint16', - 'int': 'int32', - 'unsigned_int': 'uint32', - 'long': 'int32', - 'ulong': 'uint32', - 'float': 'float', - 'double': 'double' + "unsigned_char": "uint8", + "short": "int16", + "unsigned_short": "uint16", + "int": "int32", + "unsigned_int": "uint32", + "long": "int32", + "ulong": "uint32", + "float": "float", + "double": "double", } if datatype not in datatype_to_pixeltype: raise TypeError(f"Unknown data type `{datatype}`. Available ones: {datatype_to_pixeltype}") - pixel_type = getattr(otb, f'ImagePixelType_{datatype_to_pixeltype[datatype]}') - elif isinstance(inp, OTBObject): + pixel_type = getattr(otb, f"ImagePixelType_{datatype_to_pixeltype[datatype]}") + elif isinstance(inp, App): pixel_type = inp.app.GetParameterOutputImagePixelType(inp.key_output_image) else: - raise TypeError(f'Could not get the pixel type of {type(inp)} object {inp}') + raise TypeError(f"Could not get the pixel type of {type(inp)} object {inp}") return pixel_type @@ -1406,31 +1372,30 @@ def parse_pixel_type(pixel_type: str | int) -> int: """ if isinstance(pixel_type, str): # this correspond to 'uint8' etc... - return getattr(otb, f'ImagePixelType_{pixel_type}') + return getattr(otb, f"ImagePixelType_{pixel_type}") if isinstance(pixel_type, int): return pixel_type - raise ValueError(f'Bad pixel type specification ({pixel_type})') + raise ValueError(f"Bad pixel type specification ({pixel_type})") -def is_key_list(pyotb_app: OTBObject, key: str) -> bool: +def is_key_list(pyotb_app: App, key: str) -> bool: """Check if a key of the App is an input parameter list.""" - return pyotb_app.app.GetParameterType(key) in ( + types = ( otb.ParameterType_InputImageList, otb.ParameterType_StringList, otb.ParameterType_InputFilenameList, otb.ParameterType_ListView, otb.ParameterType_InputVectorDataList, ) + return pyotb_app.app.GetParameterType(key) in types -def is_key_images_list(pyotb_app: OTBObject, key: str) -> bool: +def is_key_images_list(pyotb_app: App, key: str) -> bool: """Check if a key of the App is an input parameter image list.""" - return pyotb_app.app.GetParameterType(key) in ( - otb.ParameterType_InputImageList, - otb.ParameterType_InputFilenameList - ) + types = (otb.ParameterType_InputImageList, otb.ParameterType_InputFilenameList) + return pyotb_app.app.GetParameterType(key) in types -def get_out_images_param_keys(app: OTBObject) -> list[str]: +def get_out_images_param_keys(app: App) -> list[str]: """Return every output parameter keys of an OTB app.""" return [key for key in app.GetParametersKeys() if app.GetParameterType(key) == otb.ParameterType_OutputImage] diff --git a/pyotb/functions.py b/pyotb/functions.py index d25610adb74bdb93d6353c48878eb5c877d95b10..d782a0eb2f3258fc8fa09850025d66238ce6fb80 100644 --- a/pyotb/functions.py +++ b/pyotb/functions.py @@ -9,11 +9,11 @@ import textwrap import subprocess from collections import Counter -from .core import OTBObject, App, Operation, LogicalOperation, Input, get_nbchannels +from .core import App, Operation, LogicalOperation, Input, get_nbchannels from .helpers import logger -def where(cond: OTBObject | str, x: OTBObject | str | int | float, y: OTBObject | str | int | float) -> Operation: +def where(cond: App | str, x: App | str | int | float, y: App | str | int | float) -> Operation: """Functionally similar to numpy.where. Where cond is True (!=0), returns x. Else returns y. Args: @@ -34,7 +34,6 @@ def where(cond: OTBObject | str, x: OTBObject | str | int | float, y: OTBObject x_nb_channels = get_nbchannels(x) if not isinstance(y, (int, float)): y_nb_channels = get_nbchannels(y) - if x_nb_channels and y_nb_channels: if x_nb_channels != y_nb_channels: raise ValueError('X and Y images do not have the same number of bands. ' @@ -42,44 +41,38 @@ def where(cond: OTBObject | str, x: OTBObject | str | int | float, y: OTBObject x_or_y_nb_channels = x_nb_channels if x_nb_channels else y_nb_channels cond_nb_channels = get_nbchannels(cond) - - # Get the number of bands of the result - if x_or_y_nb_channels: # if X or Y is a raster - out_nb_channels = x_or_y_nb_channels - else: # if only cond is a raster - out_nb_channels = cond_nb_channels - if cond_nb_channels != 1 and x_or_y_nb_channels and cond_nb_channels != x_or_y_nb_channels: raise ValueError('Condition and X&Y do not have the same number of bands. Condition has ' f'{cond_nb_channels} bands whereas X&Y have {x_or_y_nb_channels} bands') - # If needed, duplicate the single band binary mask to multiband to match the dimensions of x & y if cond_nb_channels == 1 and x_or_y_nb_channels and x_or_y_nb_channels != 1: logger.info('The condition has one channel whereas X/Y has/have %s channels. Expanding number' ' of channels of condition to match the number of channels of X/Y', x_or_y_nb_channels) - operation = Operation('?', cond, x, y, nb_bands=out_nb_channels) + # Get the number of bands of the result + if x_or_y_nb_channels: # if X or Y is a raster + out_nb_channels = x_or_y_nb_channels + else: # if only cond is a raster + out_nb_channels = cond_nb_channels - return operation + return Operation('?', cond, x, y, nb_bands=out_nb_channels) -def clip(a: OTBObject | str, a_min: OTBObject | str | int | float, a_max: OTBObject | str | int | float): +def clip(image: App | str, v_min: App | str | int | float, v_max: App | str | int | float): """Clip values of image in a range of values. Args: - a: input raster, can be filepath or any pyotb object - a_min: minimum value of the range - a_max: maximum value of the range + image: input raster, can be filepath or any pyotb object + v_min: minimum value of the range + v_max: maximum value of the range Returns: raster whose values are clipped in the range """ - if isinstance(a, str): - a = Input(a) - - res = where(a <= a_min, a_min, - where(a >= a_max, a_max, a)) + if isinstance(image, str): + image = Input(image) + res = where(image <= v_min, v_min, where(image >= v_max, v_max, image)) return res @@ -102,11 +95,9 @@ def all(*inputs): # pylint: disable=redefined-builtin # If necessary, flatten inputs if len(inputs) == 1 and isinstance(inputs[0], (list, tuple)): inputs = inputs[0] - # Add support for generator inputs (to have the same behavior as built-in `all` function) if isinstance(inputs, tuple) and len(inputs) == 1 and inspect.isgenerator(inputs[0]): inputs = list(inputs[0]) - # Transforming potential filepaths to pyotb objects inputs = [Input(inp) if isinstance(inp, str) else inp for inp in inputs] @@ -117,13 +108,11 @@ def all(*inputs): # pylint: disable=redefined-builtin res = inp[:, :, 0] else: res = (inp[:, :, 0] != 0) - for band in range(1, inp.shape[-1]): if isinstance(inp, LogicalOperation): res = res & inp[:, :, band] else: res = res & (inp[:, :, band] != 0) - # Checking that all images are True else: if isinstance(inputs[0], LogicalOperation): @@ -157,11 +146,9 @@ def any(*inputs): # pylint: disable=redefined-builtin # If necessary, flatten inputs if len(inputs) == 1 and isinstance(inputs[0], (list, tuple)): inputs = inputs[0] - # Add support for generator inputs (to have the same behavior as built-in `any` function) if isinstance(inputs, tuple) and len(inputs) == 1 and inspect.isgenerator(inputs[0]): inputs = list(inputs[0]) - # Transforming potential filepaths to pyotb objects inputs = [Input(inp) if isinstance(inp, str) else inp for inp in inputs] @@ -240,7 +227,6 @@ def run_tf_function(func): func_name = func.__name__ create_and_save_model_str = func_def_str - # Adding the instructions to create the model and save it to output dir create_and_save_model_str += textwrap.dedent(f""" import tensorflow as tf @@ -324,7 +310,7 @@ def run_tf_function(func): def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_rule: str = 'minimal', interpolator: str = 'nn', reference_window_input: dict = None, - reference_pixel_size_input: str = None) -> list[OTBObject]: + reference_pixel_size_input: str = None) -> list[App]: """Given several inputs, this function handles the potential resampling and cropping to same extent. WARNING: Not fully implemented / tested @@ -348,13 +334,12 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ inputs.extend(arg) else: inputs.append(arg) - # Getting metadatas of inputs metadatas = {} for inp in inputs: if isinstance(inp, str): # this is for filepaths metadata = Input(inp).app.GetImageMetaData('out') - elif isinstance(inp, OTBObject): + elif isinstance(inp, App): metadata = inp.app.GetImageMetaData(inp.output_param) else: raise TypeError(f"Wrong input : {inp}") @@ -362,7 +347,6 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ # Get a metadata of an arbitrary image. This is just to compare later with other images any_metadata = next(iter(metadatas.values())) - # Checking if all images have the same projection if not all(metadata['ProjectionRef'] == any_metadata['ProjectionRef'] for metadata in metadatas.values()): @@ -403,9 +387,8 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ # TODO : it is when the user wants the final bounding box to be the union of all bounding box # It should replace any 'outside' pixel by some NoData -> add `fillvalue` argument in the function - logger.info('Cropping all images to extent Upper Left (%s, %s), Lower Right (%s, %s)', ulx, uly, lrx, lry) - # Applying this bounding box to all inputs + logger.info('Cropping all images to extent Upper Left (%s, %s), Lower Right (%s, %s)', ulx, uly, lrx, lry) new_inputs = [] for inp in inputs: try: @@ -425,13 +408,11 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ logger.error('Cannot define the processing area for input %s: %s', inp, e) raise inputs = new_inputs - # Update metadatas metadatas = {input: input.app.GetImageMetaData('out') for input in inputs} # Get a metadata of an arbitrary image. This is just to compare later with other images any_metadata = next(iter(metadatas.values())) - # Handling different pixel sizes if not all(metadata['GeoTransform'][1] == any_metadata['GeoTransform'][1] and metadata['GeoTransform'][5] == any_metadata['GeoTransform'][5] @@ -449,9 +430,9 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ pass # TODO : when the user explicitly specify the pixel size -> add argument inside the function pixel_size = metadatas[reference_input]['GeoTransform'][1] - logger.info('Resampling all inputs to resolution: %s', pixel_size) # Perform resampling on inputs that do not comply with the target pixel size + logger.info('Resampling all inputs to resolution: %s', pixel_size) new_inputs = [] for inp in inputs: if metadatas[inp]['GeoTransform'][1] != pixel_size: @@ -460,18 +441,14 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ else: new_inputs.append(inp) inputs = new_inputs - - # Update metadatas metadatas = {inp: inp.app.GetImageMetaData('out') for inp in inputs} # Final superimposition to be sure to have the exact same image sizes - # Getting the sizes of images image_sizes = {} for inp in inputs: if isinstance(inp, str): inp = Input(inp) image_sizes[inp] = inp.shape[:2] - # Selecting the most frequent image size. It will be used as reference. most_common_image_size, _ = Counter(image_sizes.values()).most_common(1)[0] same_size_images = [inp for inp, image_size in image_sizes.items() if image_size == most_common_image_size] @@ -484,6 +461,5 @@ def define_processing_area(*args, window_rule: str = 'intersection', pixel_size_ new_inputs.append(superimposed) else: new_inputs.append(inp) - inputs = new_inputs - return inputs + return new_inputs diff --git a/pyotb/helpers.py b/pyotb/helpers.py index ebccc48e8f337d23cc6ce564e20d12b81218e072..03200a20798baadac6838c511f3de225e670f1c2 100644 --- a/pyotb/helpers.py +++ b/pyotb/helpers.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -"""This module provides some helpers to properly initialize pyotb.""" +"""This module helps to ensure we properly initialize pyotb: only in case OTB is found and apps are available.""" import os import sys import logging @@ -41,8 +41,9 @@ def find_otb(prefix: str = OTB_ROOT, scan: bool = True, scan_userdir: bool = Tru Path precedence : OTB_ROOT > python bindings directory OR search for releases installations : HOME - OR (for linux) : /opt/otbtf > /opt/otb > /usr/local > /usr - OR (for windows) : C:/Program Files + OR (for Linux) : /opt/otbtf > /opt/otb > /usr/local > /usr + OR (for MacOS) : ~/Applications + OR (for Windows) : C:/Program Files Args: prefix: prefix to search OTB in (Default value = OTB_ROOT) @@ -76,9 +77,9 @@ def find_otb(prefix: str = OTB_ROOT, scan: bool = True, scan_userdir: bool = Tru otb.Registry.SetApplicationPath(apps_path) return otb except ImportError as e: - PYTHONPATH = os.environ.get("PYTHONPATH") + pythonpath = os.environ.get("PYTHONPATH") if not scan: - raise SystemExit(f"Failed to import OTB with env PYTHONPATH={PYTHONPATH}") from e + raise SystemExit(f"Failed to import OTB with env PYTHONPATH={pythonpath}") from e # Else search system logger.info("Failed to import OTB. Searching for it...") prefix = __find_otb_root(scan_userdir) @@ -135,9 +136,9 @@ def set_environment(prefix: str): os.environ["OTB_APPLICATION_PATH"] = apps_path else: raise EnvironmentError("Can't find OTB applications directory") - os.environ["LC_NUMERIC"] = "C" os.environ["GDAL_DRIVER_PATH"] = "disable" + if (prefix / "share/gdal").exists(): # Local GDAL (OTB Superbuild, .run, .exe) gdal_data = str(prefix / "share/gdal") @@ -151,7 +152,6 @@ def set_environment(prefix: str): proj_lib = str(prefix / "share/proj") else: raise EnvironmentError(f"Can't find GDAL location with current OTB prefix '{prefix}' or in /usr") - os.environ["GDAL_DATA"] = gdal_data os.environ["PROJ_LIB"] = proj_lib @@ -259,15 +259,15 @@ def __find_otb_root(scan_userdir: bool = False): logger.info("Found %s", path.parent) prefix = path.parent.absolute() elif sys.platform == "darwin": - # TODO: find OTB in macOS - pass - + for path in (Path.home() / "Applications").glob("**/OTB-*/lib"): + logger.info("Found %s", path.parent) + prefix = path.parent.absolute() # If possible, use OTB found in user's HOME tree (this may take some time) if scan_userdir: - for path in Path().home().glob("**/OTB-*/lib"): + for path in Path.home().glob("**/OTB-*/lib"): logger.info("Found %s", path.parent) prefix = path.parent.absolute() - + # Return latest found prefix (and version), see precedence in function def find_otb() return prefix @@ -303,3 +303,7 @@ def __suggest_fix_import(error_message: str, prefix: str): " first use 'call otbenv.bat' then try to import pyotb once again") docs_link = "https://www.orfeo-toolbox.org/CookBook/Installation.html" logger.critical("You can verify installation requirements for your OS at %s", docs_link) + + +# Since helpers is the first module to be inititialized, this will prevent pyotb to run if OTB is not found +find_otb() diff --git a/pyproject.toml b/pyproject.toml index 9538fc11cbc8be7ff8478ee9afe2d311beb2acb5..845e218cfd7c2bf4b9b173fba8281d0b7dba838e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,16 +41,14 @@ version = {attr = "pyotb.__version__"} [tool.pylint] max-line-length = 120 +max-module-lines = 2000 +good-names = ["x", "y", "i", "j", "k", "e"] disable = [ "fixme", - "invalid-name", - "too-many-lines", "too-many-locals", "too-many-branches", "too-many-statements", - "too-many-public-methods", - "too-many-instance-attributes", - "wrong-import-position", + "too-many-instance-attributes" ] [tool.pydocstyle] diff --git a/tests/test_core.py b/tests/test_core.py index 086c26e64d8b5bd68247cfcb87050d238cc46c10..75dca37e4056ccf39b57247e095fe183f5b3d697 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -56,7 +56,8 @@ def test_nonraster_property(): def test_elapsed_time(): - assert pyotb.ReadImageInfo(INPUT).elapsed_time < 1 + assert 0 < pyotb.ReadImageInfo(INPUT).elapsed_time < 1 + # Other functions @@ -74,8 +75,15 @@ def test_xy_to_rowcol(): def test_write(): - INPUT.write("/tmp/missing_dir/test_write.tif") + INPUT.write("/tmp/test_write.tif") + assert INPUT.out.exists() + INPUT.out.filepath.unlink() + + +def test_output_write(): + INPUT.out.write("/tmp/test_output_write.tif") assert INPUT.out.exists() + INPUT.out.filepath.unlink() # Slicer diff --git a/tests/test_numpy.py b/tests/test_numpy.py index 0f424351430250d354bdc0905fe8bcbc639fda9a..d36dfb7c2c17e7504546c94821bb9c6f22c888df 100644 --- a/tests/test_numpy.py +++ b/tests/test_numpy.py @@ -1,18 +1,19 @@ import numpy as np import pyotb from tests_data import INPUT -import numpy as np - -import pyotb -from tests_data import INPUT def test_export(): INPUT.export() - assert "out" in INPUT.exports_dic - array = INPUT.exports_dic["out"]["array"] + array = INPUT.exports_dic[INPUT.key_output_image]["array"] assert isinstance(array, np.ndarray) assert array.dtype == "uint8" + del INPUT.exports_dic["out"] + + +def test_output_export(): + INPUT.out.export() + assert INPUT.out.key_output_image in INPUT.out.exports_dic def test_to_numpy(): @@ -43,7 +44,7 @@ def test_pixel_coords_otb_equals_numpy(): def test_add_noise_array(): white_noise = np.random.normal(0, 50, size=INPUT.shape) noisy_image = INPUT + white_noise - assert isinstance(noisy_image, pyotb.core.OTBObject) + assert isinstance(noisy_image, pyotb.core.App) assert noisy_image.shape == INPUT.shape diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 3acc354fd2f25cdd5e605a4cedd1e9d4aae9b4cc..76f9872285d9de60dae0744cf0d8fd10f042c5b0 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -103,8 +103,7 @@ PIPELINES, NAMES = make_pipelines_list() @pytest.mark.parametrize("pipe", PIPELINES, ids=NAMES) def test_pipeline_shape(pipe): - for i, app in enumerate(pipe): - print(app.shape) + for app in pipe: assert bool(app.shape) @@ -116,7 +115,7 @@ def test_pipeline_shape_nointermediate(pipe): @pytest.mark.parametrize("pipe", PIPELINES, ids=NAMES) def test_pipeline_shape_backward(pipe): - for i, app in enumerate(reversed(pipe)): + for app in reversed(pipe): assert bool(app.shape)