Skip to content

processedresult

ProcessedResult

Bases: ABC

Source code in src/tcd_pipeline/result/processedresult.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class ProcessedResult(ABC):
    confidence_threshold: int = 0.5

    def set_threshold(self, new_threshold: int) -> None:
        """Sets the threshold of the ProcessedResult, also regenerates
        prediction masks

        Args:
            new_threshold (double): new confidence threshold
        """
        self.confidence_threshold = new_threshold
        self._generate_masks()

    def get_hardware_information(self) -> dict:
        """Returns the hardware information of the system

        Returns:
            dict: hardware information
        """

        self.hardware = {}

        self.hardware["physical_cores"] = psutil.cpu_count()
        self.hardware["logical_cores"] = psutil.cpu_count(logical=True)
        self.hardware["system_memory"] = psutil.virtual_memory().total

        try:
            self.hardware["cpu_frequency"] = psutil.cpu_freq().max
        except:
            pass

        # TODO use actual device?
        if torch.cuda.is_available():
            gpu_memory = torch.cuda.get_device_properties(0).total_memory
            self.hardware["gpu_model"] = torch.cuda.get_device_name(0)
            self.hardware["gpu_memory"] = gpu_memory

        return self.hardware

    @abstractmethod
    def visualise(self):
        pass

    @abstractmethod
    def _generate_masks(self):
        pass

    @abstractmethod
    def load(self):
        """
        Load in results from a model as cached (e.g. as a GeoTIFF or as a Shapefile).
        """
        pass

    def filter_geometry(self, geometry):
        pass

    def set_roi(
        self,
        shape: Union[dict, shapely.geometry.Polygon],
        crs: Optional[rasterio.crs.CRS] = None,
    ):
        """Filter by geometry, should be a simple Polygon i.e. a
        convex hull that defines the region of interest for analysis.

        Args:
            shape (dict): shape to filter the data
            crs: Coordinate reference system for the region, usually
                 assumed to be the CRS of the image

        """

        if crs is not None and crs != self.image.crs:
            logger.warning("Geometry CRS is not the same as the image CRS, warping")
            shape = rasterio.warp.transform_geom(crs, self.image.crs, shape)

        if not isinstance(shape, shapely.geometry.Polygon):
            shape = shapely.geometry.shape(shape)

        if not isinstance(shape, shapely.geometry.Polygon):
            logger.warning("Input shape is not a polygon, not applying filter")
            return

        self.valid_region = shapely.geometry.polygon.orient(shape)
        self._filter_roi()
        self._generate_masks()

    def _filter_roi(self):
        logger.warning("No filter function defined, so filtering by ROI has no effect")

    @property
    def num_valid_pixels(self) -> int:
        if self.valid_region is not None:
            return int(self.valid_region.area / (self.image.res[0] ** 2))
        else:
            return np.count_nonzero(self.image.read().mean(0) > 0)

    @property
    def canopy_cover(self) -> float:
        return np.count_nonzero(self.canopy_mask) / self.num_valid_pixels

    # TODO: Use windowed writing here instead
    def _save_mask(self, mask: npt.NDArray, output_path: str, binary=True):
        """Saves a mask array to a GeoTiff file

        Args:
            mask (np.array): mask to save
            output_path (str): path to save mask to
            image_path (str): path to source image, default None
            binary (bool): save a binary mask or not, default True

        """

        if self.image is not None:
            if self.valid_mask is not None:
                mask *= self.valid_mask
                out_transform = rasterio.windows.transform(
                    self.valid_window, self.image.transform
                )
            else:
                out_transform = self.image.transform

            out_meta = self.image.meta
            out_height = mask.shape[0]
            out_width = mask.shape[1]

            out_crs = self.image.crs
            out_meta.update(
                {
                    "driver": "GTiff",
                    "height": out_height,
                    "width": out_width,
                    "compress": "packbits",
                    "count": 1,
                    "nodata": 0,
                    "transform": out_transform,
                    "crs": out_crs,
                }
            )

            with rasterio.env.Env(GDAL_TIFF_INTERNAL_MASK=True):
                with rasterio.open(
                    output_path,
                    "w",
                    nbits=1 if binary else 8,
                    **out_meta,
                ) as dest:
                    dest.write(mask, indexes=1, masked=True)
        else:
            logger.warning(
                "No base image provided, output masks will not be georeferenced"
            )
            Image.fromarray(mask).save(output_path, compress="packbits")

get_hardware_information()

Returns the hardware information of the system

Returns:

Name Type Description
dict dict

hardware information

Source code in src/tcd_pipeline/result/processedresult.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def get_hardware_information(self) -> dict:
    """Returns the hardware information of the system

    Returns:
        dict: hardware information
    """

    self.hardware = {}

    self.hardware["physical_cores"] = psutil.cpu_count()
    self.hardware["logical_cores"] = psutil.cpu_count(logical=True)
    self.hardware["system_memory"] = psutil.virtual_memory().total

    try:
        self.hardware["cpu_frequency"] = psutil.cpu_freq().max
    except:
        pass

    # TODO use actual device?
    if torch.cuda.is_available():
        gpu_memory = torch.cuda.get_device_properties(0).total_memory
        self.hardware["gpu_model"] = torch.cuda.get_device_name(0)
        self.hardware["gpu_memory"] = gpu_memory

    return self.hardware

load() abstractmethod

Load in results from a model as cached (e.g. as a GeoTIFF or as a Shapefile).

Source code in src/tcd_pipeline/result/processedresult.py
66
67
68
69
70
71
@abstractmethod
def load(self):
    """
    Load in results from a model as cached (e.g. as a GeoTIFF or as a Shapefile).
    """
    pass

set_roi(shape, crs=None)

Filter by geometry, should be a simple Polygon i.e. a convex hull that defines the region of interest for analysis.

Parameters:

Name Type Description Default
shape dict

shape to filter the data

required
crs Optional[CRS]

Coordinate reference system for the region, usually assumed to be the CRS of the image

None
Source code in src/tcd_pipeline/result/processedresult.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def set_roi(
    self,
    shape: Union[dict, shapely.geometry.Polygon],
    crs: Optional[rasterio.crs.CRS] = None,
):
    """Filter by geometry, should be a simple Polygon i.e. a
    convex hull that defines the region of interest for analysis.

    Args:
        shape (dict): shape to filter the data
        crs: Coordinate reference system for the region, usually
             assumed to be the CRS of the image

    """

    if crs is not None and crs != self.image.crs:
        logger.warning("Geometry CRS is not the same as the image CRS, warping")
        shape = rasterio.warp.transform_geom(crs, self.image.crs, shape)

    if not isinstance(shape, shapely.geometry.Polygon):
        shape = shapely.geometry.shape(shape)

    if not isinstance(shape, shapely.geometry.Polygon):
        logger.warning("Input shape is not a polygon, not applying filter")
        return

    self.valid_region = shapely.geometry.polygon.orient(shape)
    self._filter_roi()
    self._generate_masks()

set_threshold(new_threshold)

Sets the threshold of the ProcessedResult, also regenerates prediction masks

Parameters:

Name Type Description Default
new_threshold double

new confidence threshold

required
Source code in src/tcd_pipeline/result/processedresult.py
22
23
24
25
26
27
28
29
30
def set_threshold(self, new_threshold: int) -> None:
    """Sets the threshold of the ProcessedResult, also regenerates
    prediction masks

    Args:
        new_threshold (double): new confidence threshold
    """
    self.confidence_threshold = new_threshold
    self._generate_masks()