Skip to content

cafe.method.DynverseDockerBackend

cafe.method.DynverseDockerBackend

Bases: DockerBackend

DynverseDockerBackend: specific implementation of the abstract Backend class that runs methods from Dynverse Docker images.

Source code in cafe/method/fate_dynverse_docker_backend.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
class DynverseDockerBackend(DockerBackend):
    """DynverseDockerBackend: specific implementation of the abstract Backend class using Dynverse Docker images.

    The method's ``definition.yml`` is copied out of the image at construction time.
    ``run`` then writes inputs/priors/parameters to ``input.h5`` in a temporary
    directory that is mounted into the container at ``/ti``, runs the container, and
    reads ``output.h5`` back to attach the trajectory to the FateAnnData object.
    """

    def __init__(self, image_id: str = "dynverse/ti_comp1:v0.9.9.01"):
        """Initialize the DynverseDockerBackend class.

        Args:
            image_id (str, optional): docker image id of the dynverse method.
        """
        self.image_id = image_id
        # load_backend (defined in this class) loads the image and parses definition.yml
        self.load_backend()

    def load_backend(self):
        """Load the docker image and parse its ``definition.yml`` into ``self.definition``."""
        self._load_image()  # load image (inherited; not defined in this class)
        self._load_definition()  # load definition yaml file

    def preprocess(self, inputs: dict, parameters: dict, priors: dict, tmp_wd: str, seed: int = 0) -> None:
        """Preprocess: create input.h5 for the dynverse docker execution.

        ref: pydynverse/wrap/method_create_ti_method_container._method_execution_preproc_container

        Args:
            inputs (dict): input dict; it is not modified (a shallow copy is extended)
            parameters (dict): parameter dict
            priors (dict): prior information dict
            tmp_wd (str): tmp working dir mounted into docker; ``input.h5`` is written here
            seed (int, optional): random seed.
        """
        # Copy so the caller's inputs dict is not polluted with task-level keys.
        task = dict(inputs)
        task["parameters"] = parameters
        task["priors"] = priors
        task["seed"] = seed
        task["verbose"] = True
        write_h5(task, f"{tmp_wd}/input.h5")  # json->h5

    def execute(self, tmp_wd: str, benchmark_resource: bool) -> "DynverseDockerOutput":
        """Execute: run the dynverse docker image and parse the result file "output.h5".

        ref: pydynverse/wrap/method_create_ti_method_container._method_execution_execute_container

        Args:
            tmp_wd (str): tmp working dir mounted at /ti; must contain input.h5
            benchmark_resource (bool): if True, attach docker-stats based resource usage
                (plus wall-clock "time") under the "resource_usage" key

        Returns:
            DynverseDockerOutput: parsed contents of "output.h5"

        Raises:
            RuntimeError: if the container did not produce "output.h5".
        """
        # NOTE: /usr/bin/time is not available inside the dynverse containers, so resource
        # usage is sampled from the docker stats API rather than measured in the container.
        args = ["--dataset", "/ti/input.h5", "--output", "/ti/output.h5"]
        # TODO: show dynverse docker debug information
        # args = ["--dataset", "/ti/input.h5", "--output", "/ti/output.h5", "--verbosity", "3"]
        client = docker.from_env()
        start_time = time.time()
        container = client.containers.run(
            image=self.definition["run"]["image_id"],
            command=args,  # R script command args in Docker
            volumes=[f"{tmp_wd}:/ti"],
            working_dir="/ti/workspace",
            detach=True,
        )

        # real time stat list, filled by a background thread
        stats_list = []

        def collect_stats():
            for stat in container.stats(stream=True, decode=True):
                stats_list.append(stat)

        def collect_logs():
            for line in container.logs(stream=True, follow=True, stdout=True, stderr=True):
                # errors="replace": one undecodable byte must not kill the log thread
                logger.debug(f"[dynverse docker]{line.decode('utf-8', errors='replace').strip()}")

        try:
            stats_thread = threading.Thread(target=collect_stats, daemon=True)
            stats_thread.start()
            log_thread = threading.Thread(target=collect_logs, daemon=True)
            log_thread.start()

            container.wait()  # wait until docker finish
            end_time = time.time()
            # let the log follower drain the remaining lines before the container is removed
            log_thread.join(timeout=10)
        finally:
            # always clean up the container, even if waiting was interrupted
            container.remove(force=True)

        output_h5_filename = f"{tmp_wd}/output.h5"
        if not os.path.exists(output_h5_filename):
            # no h5 file generated by docker, show error log
            logger.error("Docker Error!!!")
            raise RuntimeError(
                f"Dynverse docker image '{self.definition['run']['image_id']}' did not generate "
                f"{output_h5_filename}; see the [dynverse docker] debug log for details."
            )

        logger.debug("Docker Finish")
        dynverse_docker_output = read_h5(output_h5_filename)  # read docker result h5
        if benchmark_resource:
            # snapshot: the stats thread may still be running
            usage_dict = parse_docker_resource_usage_string_list(list(stats_list))
            usage_dict["time"] = end_time - start_time  # manually add wall-clock time
            logger.debug(f"resource usage dict: {usage_dict}")
            dynverse_docker_output["resource_usage"] = usage_dict
        return dynverse_docker_output

    def postprocess(self, fadata: FateAnnData, trajectory: dict) -> None:
        """Postprocess: call fadata.add_trajectory.

        Args:
            fadata (FateAnnData): FateAnnData to which the trajectory is added
            trajectory (dict): output returned by ``execute``; it must expose
                ``milestone_network``, ``divergence_regions`` and
                ``milestone_percentages`` as attributes
        """
        fadata.raw_wrapper_dict = {}  # dynverse docker can't get raw wrapper dict
        fadata.add_trajectory(
            milestone_network=trajectory.milestone_network,
            divergence_regions=trajectory.divergence_regions,
            milestone_percentages=trajectory.milestone_percentages,
        )

    def run(self, fadata: FateAnnData, parameters: dict):
        """Run the dynverse docker pipeline to infer the trajectory.

        Args:
            fadata (FateAnnData): FateAnnData to which the inferred trajectory is added
            parameters (dict): parameter dict
        """
        # check if benchmark resource from parameters.
        benchmark_resource = self._check_benchmark_resource(parameters)

        # only parse definition file for dynverse docker
        input_df = self.definition.get_inputs_df()
        logger.debug(f"definition input_df: {input_df}")
        inputs = self._extract_inputs(fadata, input_df)  # extract main input, expression matrix
        priors = self._extract_prior_information(fadata, input_df)  # extract prior information
        parameters = self.definition.get_parameters(parameters)

        with tempfile.TemporaryDirectory() as tmp_wd:
            # /tmp/*** temp dir for docker
            logger.debug(f"Temp wd: {tmp_wd}")

            # preprocess
            self.preprocess(inputs, parameters, priors, tmp_wd)

            # method run
            trajectory = self.execute(tmp_wd, benchmark_resource)

            # postprocess
            self.postprocess(fadata, trajectory)

            if "resource_usage" in trajectory:
                fadata.add_resource_usage(trajectory["resource_usage"])

    def _extract_inputs(self, fadata: FateAnnData, inputs_df: pd.DataFrame) -> dict:
        """Extract the input dict from the definition.

        ref: PyDynverse/pydynverse/wrap/method_extract_args.py _method_extract_inputs

        Args:
            fadata (FateAnnData): FateAnnData object
            inputs_df (pd.DataFrame): inputs_df from definition.yml

        Returns:
            dict: input dict with the expression matrix/matrices, "expression_id",
                "cell_ids" and "feature_ids"

        Raises:
            ValueError: if the definition declares no input of type "expression".
        """
        # extract model input expression matrix
        input_ids = inputs_df["input_id"][inputs_df["type"] == "expression"].tolist()
        if not input_ids:
            raise ValueError("definition.yml declares no input of type 'expression' for this method.")
        inputs = {}
        if "counts" in input_ids:
            inputs["counts"] = fadata.layers["counts"]
        if "expression" in input_ids:
            inputs["expression"] = fadata.X
        # main expression matrix, for example, Component1 and Slingshot need "expression", while monocle_ddrtree need "counts"
        inputs["expression_id"] = input_ids[0]
        # add cell and gene ids
        inputs["cell_ids"] = fadata.obs.index.tolist()
        inputs["feature_ids"] = fadata.var.index.tolist()
        return inputs

    def _load_definition(self):
        """Extract and parse definition.yml, including description, required parameters and prior knowledge.

        The file is copied out of the image (from /code/definition.yml) by a
        short-lived container, parsed with ``yaml.safe_load`` and stored in
        ``self.definition``.

        ref: pydynverse.wrap.container_get._container_get_definition

        Raises:
            FileNotFoundError: if definition.yml could not be copied out of the image.
        """
        with tempfile.TemporaryDirectory() as tmp_wd:
            # start docker container that only copies the definition into the mounted dir
            client = docker.from_env()
            container = client.containers.run(
                entrypoint="cp /code/definition.yml /copy_mount/definition.yml",  # aim copy dir
                image=self.image_id,
                volumes=[f"{tmp_wd}:/copy_mount"],
                detach=True,
            )
            try:
                container.wait()
            finally:
                # always clean up the helper container
                container.remove(force=True)

            definition_path = f"{tmp_wd}/definition.yml"
            if not os.path.exists(definition_path):
                raise FileNotFoundError(
                    f"definition.yml could not be copied from docker image '{self.image_id}' "
                    "(expected at /code/definition.yml)."
                )
            # read and parse yml file
            with open(definition_path, "r", encoding="utf-8") as file:
                definition_raw = yaml.safe_load(file)

        definition = Definition(definition_raw)
        definition["run"] = {"backend": "container", "image_id": self.image_id}
        self.definition = definition

    def _extract_prior_information(self, fadata: FateAnnData, inputs_df: pd.DataFrame) -> dict:
        """Translate cafe prior information into the dynverse docker prior dict.

        Only used for the dynverse docker backend.

        ref: PyDynverse/pydynverse/wrap/method_extract_args.py _method_extract_priors

        Args:
            fadata (FateAnnData): FateAnnData holding ``prior_information``
            inputs_df (pd.DataFrame): inputs_df from definition.yml

        Returns:
            dict: all required priors plus the optional priors that are available

        Raises:
            ValueError: if a prior required by the method is missing.
        """
        cafe_priors = fadata.prior_information
        # The same prior information uses different keys in cafe and in dynverse docker;
        # translate cafe-style keys to dynverse-style keys.
        cafe2dynverse = {
            "basis": "dimred",
            "start_cell": "start_id",
            # TODO: show available prior information
        }
        priors = {
            dynverse_key: cafe_priors[cafe_key]
            for cafe_key, dynverse_key in cafe2dynverse.items()
            if cafe_key in cafe_priors
        }
        if "cluster" in cafe_priors:
            # the cafe prior names an .obs column; dynverse receives the column values as `groups_id`
            priors["groups_id"] = fadata.obs[cafe_priors["cluster"]].tolist()

        priors_key_set = set(priors)
        is_prior = inputs_df["type"] == "prior_information"

        # check required priors: all of them are necessary, otherwise raise
        required_prior_ids = inputs_df["input_id"][inputs_df["required"] & is_prior].tolist()
        missing_required = sorted(set(required_prior_ids) - priors_key_set)
        if missing_required:
            add_args = ", ".join(f"{i} = <prior>" for i in missing_required)
            msg = f"""
                ! Prior information {','.join(missing_required)} is missing from dataset {fadata.id} but is required by the method. \n
                -> If known, you can add this prior information using fadata.add_prior_information({add_args}). \n
                -> Otherwise, this method cannot be used.
            """
            raise ValueError(msg)
        required_prior = {k: priors[k] for k in required_prior_ids}
        logger.debug(f"extract required priors: {required_prior.keys()}")

        # check optional priors: missing ones only produce a warning
        optional_prior_ids = inputs_df["input_id"][(~inputs_df["required"]) & is_prior].tolist()
        missing_optional = sorted(set(optional_prior_ids) - priors_key_set)
        if missing_optional:
            msg = f"""
                Prior information {','.join(missing_optional)} is optional, but missing from dataset {fadata.id}. \n
                Will not give this prior to method.
            """
            logger.warning(msg)
        # keep only the optional priors that are actually available
        optional_prior = {k: priors[k] for k in optional_prior_ids if k in priors_key_set}
        logger.debug(f"extract optional prior: {optional_prior.keys()}")

        return required_prior | optional_prior

    def __str__(self):
        return f"Dynverse Docker Backend: docker image '{self.image_id}'"

__init__(image_id='dynverse/ti_comp1:v0.9.9.01')

Initialize the DynverseDockerBackend class.

Parameters:

Name Type Description Default
image_id str

image id.

'dynverse/ti_comp1:v0.9.9.01'
Source code in cafe/method/fate_dynverse_docker_backend.py
34
35
36
37
38
39
40
41
42
43
def __init__(self, image_id: str = "dynverse/ti_comp1:v0.9.9.01"):
    """Initialize the DynverseDockerBackend class.

    Stores the image id, then calls ``self.load_backend()`` so the backend is
    ready to run as soon as construction finishes.

    Args:
        image_id (str, optional): docker image id of the dynverse method.
    """
    self.image_id = image_id
    # load_backend (defined on DynverseDockerBackend) loads the image and parses definition.yml
    self.load_backend()

execute(tmp_wd, benchmark_resource)

Execute: Dynverse docker execute and parse result file "output.h5"

ref: pydynverse/wrap/method_create_ti_method_container._method_execution_execute_container

Parameters:

Name Type Description Default
tmp_wd str

tmp working dir for docker mount and saving input.h5

required

Returns:

Name Type Description
DynverseDockerOutput DynverseDockerOutput

parsed contents of the result file "output.h5"

Source code in cafe/method/fate_dynverse_docker_backend.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def execute(self, tmp_wd: str, benchmark_resource: bool) -> "DynverseDockerOutput":
    """Execute: run the dynverse docker image and parse the result file "output.h5".

    ref: pydynverse/wrap/method_create_ti_method_container._method_execution_execute_container

    Args:
        tmp_wd (str): tmp working dir mounted at /ti; must contain input.h5
        benchmark_resource (bool): if True, attach docker-stats based resource usage
            (plus wall-clock "time") under the "resource_usage" key

    Returns:
        DynverseDockerOutput: parsed contents of "output.h5"

    Raises:
        RuntimeError: if the container did not produce "output.h5".
    """
    args = ["--dataset", "/ti/input.h5", "--output", "/ti/output.h5"]
    # TODO: show dynverse docker debug information
    # args = ["--dataset", "/ti/input.h5", "--output", "/ti/output.h5", "--verbosity", "3"]
    client = docker.from_env()
    start_time = time.time()
    container = client.containers.run(
        image=self.definition["run"]["image_id"],
        command=args,  # R script command args in Docker
        volumes=[f"{tmp_wd}:/ti"],
        working_dir="/ti/workspace",
        detach=True,
    )

    # real time stat list, filled by a background thread
    stats_list = []

    def collect_stats():
        for stat in container.stats(stream=True, decode=True):
            stats_list.append(stat)

    def collect_logs():
        for line in container.logs(stream=True, follow=True, stdout=True, stderr=True):
            # errors="replace": one undecodable byte must not kill the log thread
            logger.debug(f"[dynverse docker]{line.decode('utf-8', errors='replace').strip()}")

    try:
        stats_thread = threading.Thread(target=collect_stats, daemon=True)
        stats_thread.start()
        log_thread = threading.Thread(target=collect_logs, daemon=True)
        log_thread.start()

        container.wait()  # wait until docker finish
        end_time = time.time()
        # let the log follower drain the remaining lines before the container is removed
        log_thread.join(timeout=10)
    finally:
        # always clean up the container, even if waiting was interrupted
        container.remove(force=True)

    output_h5_filename = f"{tmp_wd}/output.h5"
    if not os.path.exists(output_h5_filename):
        # no h5 file generated by docker, show error log
        logger.error("Docker Error!!!")
        raise RuntimeError(
            f"Dynverse docker image '{self.definition['run']['image_id']}' did not generate "
            f"{output_h5_filename}; see the [dynverse docker] debug log for details."
        )

    logger.debug("Docker Finish")
    dynverse_docker_output = read_h5(output_h5_filename)  # read docker result h5
    if benchmark_resource:
        # snapshot: the stats thread may still be running
        usage_dict = parse_docker_resource_usage_string_list(list(stats_list))
        usage_dict["time"] = end_time - start_time  # manually add wall-clock time
        logger.debug(f"resource usage dict: {usage_dict}")
        dynverse_docker_output["resource_usage"] = usage_dict
    return dynverse_docker_output

postprocess(fadata, trajectory)

Postprocess: call fadata.add_trajectory

Parameters:

Name Type Description Default
fadata FateAnnData

FateAnnData to which the trajectory is added

required
trajectory dict

trajectory dict

required
Source code in cafe/method/fate_dynverse_docker_backend.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def postprocess(self, fadata: FateAnnData, trajectory: dict) -> None:
    """Postprocess: hand the inferred trajectory to ``fadata.add_trajectory``.

    Args:
        fadata (FateAnnData): FateAnnData that receives the trajectory
        trajectory (dict): docker output object; read through its
            ``milestone_network``, ``divergence_regions`` and
            ``milestone_percentages`` attributes
    """
    # The dynverse docker output carries no raw wrapper dict, so store an empty one.
    fadata.raw_wrapper_dict = {}
    trajectory_fields = ("milestone_network", "divergence_regions", "milestone_percentages")
    trajectory_kwargs = {field: getattr(trajectory, field) for field in trajectory_fields}
    fadata.add_trajectory(**trajectory_kwargs)

preprocess(inputs, parameters, priors, tmp_wd, seed=0)

Preprocess: create input.h5 for the dynverse docker execution

ref: pydynverse/wrap/method_create_ti_method_container._method_execution_preproc_container

Parameters:

Name Type Description Default
inputs dict

input dict

required
parameters dict

parameter dict

required
priors dict

prior information dict

required
tmp_wd str

tmp working dir for docker mount and saving input.h5

required
seed int

random seed.

0
Source code in cafe/method/fate_dynverse_docker_backend.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def preprocess(self, inputs: dict, parameters: dict, priors: dict, tmp_wd: str, seed: int = 0) -> None:
    """Preprocess: create input.h5 for the dynverse docker execution.

    ref: pydynverse/wrap/method_create_ti_method_container._method_execution_preproc_container

    Args:
        inputs (dict): input dict; it is not modified (a shallow copy is extended)
        parameters (dict): parameter dict
        priors (dict): prior information dict
        tmp_wd (str): tmp working dir mounted into docker; ``input.h5`` is written here
        seed (int, optional): random seed.
    """
    # Copy so the caller's inputs dict is not polluted with task-level keys.
    task = dict(inputs)
    task["parameters"] = parameters
    task["priors"] = priors
    task["seed"] = seed
    task["verbose"] = True
    write_h5(task, f"{tmp_wd}/input.h5")  # json->h5

run(fadata, parameters)

Run dynverse docker pipeline to infer the trajectory

Parameters:

Name Type Description Default
fadata FateAnnData

FateAnnData to which the inferred trajectory is added

required
parameters dict

parameter dict

required
Source code in cafe/method/fate_dynverse_docker_backend.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def run(self, fadata: FateAnnData, parameters: dict):
    """Infer a trajectory for ``fadata`` with the dynverse docker method.

    Steps:
        1. Read the method definition.
        2. Collect the expression inputs and prior information from ``fadata``.
        3. Write them into a temporary directory.
        4. Run the container.
        5. Attach the resulting trajectory, plus resource usage when it was
           collected, back onto ``fadata``.

    Args:
        fadata (FateAnnData): FateAnnData that receives the trajectory
        parameters (dict): parameter dict
    """
    should_benchmark = self._check_benchmark_resource(parameters)

    # Only the definition file drives the dynverse docker inputs.
    inputs_df = self.definition.get_inputs_df()
    logger.debug(f"definition input_df: {inputs_df}")
    method_inputs = self._extract_inputs(fadata, inputs_df)
    method_priors = self._extract_prior_information(fadata, inputs_df)
    method_parameters = self.definition.get_parameters(parameters)

    with tempfile.TemporaryDirectory() as workdir:
        logger.debug(f"Temp wd: {workdir}")
        self.preprocess(method_inputs, method_parameters, method_priors, workdir)
        trajectory = self.execute(workdir, should_benchmark)
        self.postprocess(fadata, trajectory)
        if "resource_usage" in trajectory:
            fadata.add_resource_usage(trajectory["resource_usage"])