Skip to content

pipeline

Pipeline

Class for wrapping model instances

Source code in src/tcd_pipeline/pipeline.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
class Pipeline:
    """Class for wrapping model instances.

    Wraps a task-specific model runner (instance or semantic segmentation)
    behind a single predict/train/evaluate interface, driven by a
    Hydra/OmegaConf configuration.
    """

    # Loaded configuration; populated in __init__.
    config: Optional[DictConfig] = None

    def __init__(
        self,
        model_or_config: Optional[Union[str, DictConfig]] = None,
        options: Optional[list[str]] = None,
    ) -> None:
        """Initialise model pipeline. The simplest way to use this class is to
           specify a model e.g. "restor/tcd-segformer-mit-b0".

           You can also pass a generic configuration "instance" or "semantic",
           any other config name known to `load_config`, or a ready-made
           DictConfig.

        Args:
            model_or_config: Model name (repository ID), config name, or a
                pre-built DictConfig. Effectively required; the None default
                exists only so an explicit error can be raised.
            options: List of dotlist-style overrides (e.g. "model=x")
                passed to Hydra.

        Raises:
            ValueError: if model_or_config is None / an unsupported type, or
                if a known model has an unrecognised architecture.
        """

        # If we get a dict config
        if isinstance(model_or_config, DictConfig):
            self.config = model_or_config
            if options is not None:
                # Options are "key=value" strings, so use the dotlist merge;
                # merge_with expects config objects, not dotlist entries.
                self.config.merge_with_dotlist(options)

        elif isinstance(model_or_config, str):
            # Check if the input is a standard config (e.g. semantic/instance):
            if model_or_config in ["semantic", "instance"]:
                config = model_or_config
            # Or a known model from the zoo:
            elif model_or_config in known_models:
                model = known_models[model_or_config]

                if not options:
                    options = []

                # Map the architecture to a base config and add overrides.
                if "unet" in model or "segformer" in model:
                    config = "semantic"
                    options.append(f"model={model}")
                elif "rcnn" in model:
                    config = "instance"
                    options.append(f"model={model}")
                else:
                    raise ValueError("Unknown model type")

                options.append(f"model.weights={model_or_config}")
            # Otherwise just try and load it as a config name
            else:
                config = model_or_config

            logger.debug(
                f"Attempting to load config: {config} with overrides: {options}"
            )

            self.config = load_config(config, options)
        else:
            # Previously an unsupported value fell through silently, leaving
            # config = None and crashing later in _setup with AttributeError.
            raise ValueError(
                "model_or_config must be a model/config name (str) or a "
                f"DictConfig, got: {type(model_or_config).__name__}"
            )

        self.model = None
        self._setup()

    def _setup(self) -> None:
        """Set up the model runner. The primary aim of this function is to assess
        whether the model weights are:

        (1) A local checkpoint
        (2) A reference to an online checkpoint, hosted on HuggingFace
            (e.g. restor/tcd-segformer-mit-b1)

        First, the function will attempt to locate the weights file using an
        absolute path, then a path relative to the package root folder (for
        example a checkpoint folder stored within the repo root). If one of
        these paths is found, the config key is updated with the resolved
        path; otherwise the string is left untouched (assumed to be a
        HuggingFace repository ID).
        """

        weights = self.config.model.weights

        # Attempt to locate weights:
        # 1 Does the absolute path exist?
        if os.path.exists(os.path.abspath(weights)):
            self.config.model.weights = os.path.abspath(weights)
            logger.info(
                f"Found weights file at absolute path: {self.config.model.weights}"
            )
        else:
            # 2 Relative to the package folder (two levels up from this file)
            package_relative = os.path.join(
                os.path.dirname(__file__), "..", "..", weights
            )
            if os.path.exists(package_relative):
                self.config.model.weights = package_relative
                logger.info(
                    f"Found weights file relative to package install: {self.config.model.weights}"
                )

        task = self.config.model.task
        if task == "instance_segmentation":
            # Resolve the model config name to a YAML file shipped with the
            # package, normalising away any extension the user supplied.
            self.config.model.config = os.path.join(
                os.path.dirname(__file__),
                "config/model",
                os.path.splitext(self.config.model.config)[0] + ".yaml",
            )
            self.model = DetectronModel(self.config)
        elif task == "semantic_segmentation":
            # Lazy imports keep heavy model dependencies optional.
            if (
                self.config.model.name == "segformer"
                or "segformer" in self.config.model.weights
            ):
                from .models.segformer import Segformer

                self.model = Segformer(self.config)
            else:
                from .models.smp import SMPModel

                self.model = SMPModel(self.config)
        else:
            logger.error(f"Task: {task} is not yet implemented")

    def predict(
        self,
        image: Union[str, rasterio.DatasetReader],
        output: Optional[str] = None,
        **kwargs: Any,
    ) -> ProcessedResult:
        """Run prediction on an image

        If you want to predict over individual arrays/tensors, use the
        `model.predict` method directly.

        If you don't provide an output folder, one will be created in temporary
        system storage (tempfile.mkdtemp).

        Args:
            image (Union[str, DatasetReader]): Path to image, or rasterio image
            output Optional[str]: Path to output folder

        Returns:
            ProcessedResult: processed results from the model (e.g. merged tiles)
        """

        # NOTE(review): if the config already defines data.output, an explicit
        # `output` argument is silently ignored — confirm this is intended.
        if not self.config.data.output:
            if output:
                self.config.data.output = output
            else:
                import tempfile

                # If the file is open in w+ mode, we get a writer not a reader
                # (but we can still read)
                if isinstance(
                    image, (rasterio.io.DatasetReader, rasterio.io.DatasetWriter)
                ):
                    image_name = image.name
                else:
                    image_name = image

                self.config.data.output = tempfile.mkdtemp(
                    prefix=f"tcd_{os.path.splitext(os.path.basename(image_name))[0]}_"
                )

        logger.info(f"Saving results to {self.config.data.output}")

        if isinstance(image, str):
            image = rasterio.open(image)

        return self.model.predict_tiled(image, **kwargs)

    def train(self) -> Any:
        """Train the model using settings defined in the configuration file

        Dispatches on config.model.task; unrecognised tasks are silently
        ignored.

        Returns:
            None: the underlying trainers do not return a value.
        """
        from .models import train_instance, train_semantic

        if self.config.model.task == "instance_segmentation":
            train_instance.train(self.config)
        elif self.config.model.task == "semantic_segmentation":
            train_semantic.train(self.config)

    def evaluate(self, **kwargs) -> Any:
        """Evaluate the model

        Uses settings in the configuration file; keyword arguments are
        forwarded to the underlying model runner.
        """
        return self.model.evaluate(**kwargs)

__init__(model_or_config=Union[dict, str, DictConfig], options=None)

Initialise model pipeline. The simplest way to use this class is to specify a model e.g. "restor/tcd-segformer-mit-b0".

You can also pass a generic configuration "instance" or "semantic" to either the model or config parameters.

Parameters:

Name Type Description Default
model_or_config Union[str, DictConfig]

Model name (repository ID) or config name

Union[dict, str, DictConfig]
options Optional[list[str]]

List of options passed to Hydra

None
Source code in src/tcd_pipeline/pipeline.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def __init__(
    self,
    model_or_config: Optional[Union[str, DictConfig]] = None,
    options: Optional[list[str]] = None,
) -> None:
    """Initialise model pipeline. The simplest way to use this class is to
       specify a model e.g. "restor/tcd-segformer-mit-b0".

       You can also pass a generic configuration "instance" or "semantic",
       any other config name known to `load_config`, or a ready-made
       DictConfig.

    Args:
        model_or_config: Model name (repository ID), config name, or a
            pre-built DictConfig. Effectively required; the None default
            exists only so an explicit error can be raised.
        options: List of dotlist-style overrides (e.g. "model=x")
            passed to Hydra.

    Raises:
        ValueError: if model_or_config is None / an unsupported type, or
            if a known model has an unrecognised architecture.
    """

    # If we get a dict config
    if isinstance(model_or_config, DictConfig):
        self.config = model_or_config
        if options is not None:
            # Options are "key=value" strings, so use the dotlist merge;
            # merge_with expects config objects, not dotlist entries.
            self.config.merge_with_dotlist(options)

    elif isinstance(model_or_config, str):
        # Check if the input is a standard config (e.g. semantic/instance):
        if model_or_config in ["semantic", "instance"]:
            config = model_or_config
        # Or a known model from the zoo:
        elif model_or_config in known_models:
            model = known_models[model_or_config]

            if not options:
                options = []

            # Map the architecture to a base config and add overrides.
            if "unet" in model or "segformer" in model:
                config = "semantic"
                options.append(f"model={model}")
            elif "rcnn" in model:
                config = "instance"
                options.append(f"model={model}")
            else:
                raise ValueError("Unknown model type")

            options.append(f"model.weights={model_or_config}")
        # Otherwise just try and load it as a config name
        else:
            config = model_or_config

        logger.debug(
            f"Attempting to load config: {config} with overrides: {options}"
        )

        self.config = load_config(config, options)
    else:
        # Previously an unsupported value fell through silently, leaving
        # config = None and crashing later in _setup with AttributeError.
        raise ValueError(
            "model_or_config must be a model/config name (str) or a "
            f"DictConfig, got: {type(model_or_config).__name__}"
        )

    self.model = None
    self._setup()

evaluate(**kwargs)

Evaluate the model

Uses settings in the configuration file.

Source code in src/tcd_pipeline/pipeline.py
219
220
221
222
223
224
225
def evaluate(self, **kwargs) -> Any:
    """Evaluate the currently loaded model.

    All evaluation settings come from the configuration file; any keyword
    arguments are forwarded to the underlying model runner.
    """
    runner = self.model
    return runner.evaluate(**kwargs)

predict(image, output=None, **kwargs)

Run prediction on an image

If you want to predict over individual arrays/tensors, use the model.predict method directly.

If you don't provide an output folder, one will be created in temporary system storage (tempfile.mkdtemp).

Parameters:

Name Type Description Default
image Union[str, DatasetReader]

Path to image, or rasterio image

required
output Optional[str]

Path to output folder

None

Returns:

Name Type Description
ProcessedResult ProcessedResult

processed results from the model (e.g. merged tiles)

Source code in src/tcd_pipeline/pipeline.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
def predict(
    self,
    image: Union[str, rasterio.DatasetReader],
    output: str = None,
    **kwargs: Any,
) -> ProcessedResult:
    """Run tiled prediction over an image.

    To predict over individual arrays/tensors, use the `model.predict`
    method directly. If no output folder is configured or supplied, one is
    created in temporary system storage (tempfile.mkdtemp).

    Args:
        image (Union[str, DatasetReader]): Path to image, or rasterio image
        output Optional[str]: Path to output folder

    Returns:
        ProcessedResult: processed results from the model (e.g. merged tiles)
    """

    if not self.config.data.output:
        if output:
            self.config.data.output = output
        else:
            import tempfile

            # A file opened in w+ mode yields a writer rather than a reader,
            # but it can still be read from.
            is_dataset = isinstance(
                image, (rasterio.io.DatasetReader, rasterio.io.DatasetWriter)
            )
            image_name = image.name if is_dataset else image

            stem = os.path.splitext(os.path.basename(image_name))[0]
            self.config.data.output = tempfile.mkdtemp(prefix=f"tcd_{stem}_")

    logger.info(f"Saving results to {self.config.data.output}")

    if isinstance(image, str):
        image = rasterio.open(image)

    return self.model.predict_tiled(image, **kwargs)

train()

Train the model using settings defined in the configuration file

Returns:

Name Type Description
bool Any

Whether training was successful or not

Source code in src/tcd_pipeline/pipeline.py
206
207
208
209
210
211
212
213
214
215
216
217
def train(self) -> Any:
    """Train the model using settings defined in the configuration file

    Dispatches to the instance- or semantic-segmentation trainer based on
    the configured task; other task values are ignored.

    Returns:
        bool: Whether training was successful or not
    """
    from .models import train_instance, train_semantic

    task = self.config.model.task
    if task == "instance_segmentation":
        train_instance.train(self.config)
    elif task == "semantic_segmentation":
        train_semantic.train(self.config)