31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
class DynverseDockerBackend(DockerBackend):
    """Backend that runs a Dynverse trajectory-inference method inside its Docker image.

    Concrete implementation of the abstract ``DockerBackend``. The method's
    ``definition.yml`` is copied out of the image, inputs and prior information
    are written to ``input.h5`` in a temporary directory mounted at ``/ti``,
    the container is run, and its ``output.h5`` is read back.
    """

    def __init__(self, image_id: str = "dynverse/ti_comp1:v0.9.9.01"):
        """Initialize the backend and load its image and definition.

        Args:
            image_id (str, optional): Docker image id of the Dynverse method.
        """
        self.image_id = image_id
        self.load_backend()

    def load_backend(self):
        """Load the Docker image and parse the method's ``definition.yml``."""
        # `_load_image` is not defined in this class (presumably inherited from DockerBackend).
        self._load_image()
        self._load_definition()  # sets self.definition

    def preprocess(self, inputs: dict, parameters: dict, priors: dict, tmp_wd: str, seed: int = 0) -> None:
        """Write ``input.h5`` for the Dynverse container.

        ref: pydynverse/wrap/method_create_ti_method_container._method_execution_preproc_container

        Args:
            inputs (dict): input dict (expression matrices, cell/feature ids).
            parameters (dict): method parameter dict.
            priors (dict): prior information dict.
            tmp_wd (str): temp working dir that is mounted into the container at ``/ti``.
            seed (int, optional): random seed.
        """
        # Copy so the caller's ``inputs`` dict is not polluted with task keys.
        task = dict(inputs)
        task["parameters"] = parameters
        task["priors"] = priors
        task["seed"] = seed
        task["verbose"] = True
        write_h5(task, os.path.join(tmp_wd, "input.h5"))

    def execute(self, tmp_wd: str, benchmark_resource: bool) -> "DynverseDockerOutput":
        """Run the Dynverse container and parse its ``output.h5``.

        ref: pydynverse/wrap/method_create_ti_method_container._method_execution_execute_container

        NOTE: resource usage is sampled via ``container.stats`` rather than
        ``/usr/bin/time``: timing ``docker run`` on the host only measures the
        docker client, and ``/usr/bin/time`` is not available inside the
        Dynverse images.

        Args:
            tmp_wd (str): temp working dir mounted at ``/ti``; must contain ``input.h5``.
            benchmark_resource (bool): if True, attach a ``"resource_usage"`` entry
                built from the collected docker stats plus wall-clock ``"time"``.

        Returns:
            DynverseDockerOutput: the parsed ``output.h5``.

        Raises:
            RuntimeError: if the container did not produce ``output.h5``.
        """
        args = ["--dataset", "/ti/input.h5", "--output", "/ti/output.h5"]
        # TODO: show dynverse docker debug information, e.g. append ["--verbosity", "3"] to args.
        client = docker.from_env()
        start_time = time.time()
        container = client.containers.run(
            image=self.definition["run"]["image_id"],
            command=args,  # arguments for the method script inside the container
            volumes=[f"{tmp_wd}:/ti"],
            working_dir="/ti/workspace",
            detach=True,
        )

        stats_list = []

        def collect_stats():
            # Stream real-time container stats for resource benchmarking.
            for stat in container.stats(stream=True, decode=True):
                stats_list.append(stat)

        def collect_logs():
            # Forward container stdout/stderr to the logger as they arrive.
            for line in container.logs(stream=True, follow=True, stdout=True, stderr=True):
                logger.debug(f"[dynverse docker]{line.decode('utf-8').strip()}")

        try:
            stats_thread = threading.Thread(target=collect_stats, daemon=True)
            stats_thread.start()
            log_thread = threading.Thread(target=collect_logs, daemon=True)
            log_thread.start()
            exit_status = container.wait()  # block until the container finishes
            end_time = time.time()
            # The followed log stream ends with the container; let it drain
            # before the container is removed so trailing log lines are not lost.
            log_thread.join(timeout=5)
        finally:
            # Always clean up, even if waiting was interrupted.
            container.stop()
            container.remove()

        output_h5_filename = os.path.join(tmp_wd, "output.h5")
        if not os.path.exists(output_h5_filename):
            logger.error("Docker Error!!!")
            raise RuntimeError(
                f"Dynverse docker produced no output file '{output_h5_filename}' "
                f"(container exit status: {exit_status}); container logs are emitted at DEBUG level."
            )
        logger.debug("Docker Finish")
        dynverse_docker_output = read_h5(output_h5_filename)
        if benchmark_resource:
            # Snapshot: the stats thread may still be appending.
            usage_dict = parse_docker_resource_usage_string_list(list(stats_list))
            usage_dict["time"] = end_time - start_time  # wall-clock seconds, measured manually
            logger.debug(f"resource usage dict: {usage_dict}")
            dynverse_docker_output["resource_usage"] = usage_dict
        return dynverse_docker_output

    def postprocess(self, fadata: FateAnnData, trajectory: dict) -> None:
        """Add the inferred trajectory to ``fadata`` via ``fadata.add_trajectory``.

        Args:
            fadata (FateAnnData): FateAnnData to receive the trajectory.
            trajectory (dict): output of ``execute``; read through attribute access
                (``milestone_network``, ``divergence_regions``, ``milestone_percentages``).
        """
        fadata.raw_wrapper_dict = {}  # dynverse docker can't provide a raw wrapper dict
        fadata.add_trajectory(
            milestone_network=trajectory.milestone_network,
            divergence_regions=trajectory.divergence_regions,
            milestone_percentages=trajectory.milestone_percentages,
        )

    def run(self, fadata: FateAnnData, parameters: dict):
        """Run the full Dynverse docker pipeline and add the trajectory to ``fadata``.

        Args:
            fadata (FateAnnData): FateAnnData to receive the trajectory.
            parameters (dict): user parameter dict, merged with the definition's defaults.
        """
        # Check for the benchmark flag before parameters are normalised by the definition.
        benchmark_resource = self._check_benchmark_resource(parameters)
        input_df = self.definition.get_inputs_df()
        logger.debug(f"definition input_df: {input_df}")
        inputs = self._extract_inputs(fadata, input_df)  # main input: expression matrix
        priors = self._extract_prior_information(fadata, input_df)
        parameters = self.definition.get_parameters(parameters)
        with tempfile.TemporaryDirectory() as tmp_wd:
            # temp dir mounted into the container
            logger.debug(f"Temp wd: {tmp_wd}")
            self.preprocess(inputs, parameters, priors, tmp_wd)
            trajectory = self.execute(tmp_wd, benchmark_resource)
            self.postprocess(fadata, trajectory)
            if "resource_usage" in trajectory:
                fadata.add_resource_usage(trajectory["resource_usage"])

    def _extract_inputs(self, fadata: FateAnnData, inputs_df: pd.DataFrame) -> dict:
        """Extract the expression inputs declared in the definition.

        ref: PyDynverse/pydynverse/wrap/method_extract_args.py _method_extract_inputs

        Args:
            fadata (FateAnnData): FateAnnData object.
            inputs_df (pd.DataFrame): inputs table from definition.yml.

        Returns:
            dict: ``counts`` and/or ``expression`` matrices, ``expression_id``,
            ``cell_ids`` and ``feature_ids``.

        Raises:
            ValueError: if the definition declares no input of type ``expression``.
        """
        input_ids = inputs_df["input_id"][inputs_df["type"] == "expression"].tolist()
        if not input_ids:
            raise ValueError("definition.yml declares no input of type 'expression'; cannot choose an expression matrix.")
        inputs = {}
        if "counts" in input_ids:
            inputs["counts"] = fadata.layers["counts"]
        if "expression" in input_ids:
            inputs["expression"] = fadata.X
        # Main expression matrix: e.g. Component1 and Slingshot need "expression",
        # while monocle_ddrtree needs "counts".
        inputs["expression_id"] = input_ids[0]
        inputs["cell_ids"] = fadata.obs.index.tolist()
        inputs["feature_ids"] = fadata.var.index.tolist()
        return inputs

    def _load_definition(self):
        """Copy ``definition.yml`` out of the image and parse it into ``self.definition``.

        The definition includes the description, parameters and required prior information.
        ref: pydynverse.wrap.container_get._container_get_definition
        """
        with tempfile.TemporaryDirectory() as tmp_wd:
            client = docker.from_env()
            container = client.containers.run(
                entrypoint="cp /code/definition.yml /copy_mount/definition.yml",
                image=self.image_id,
                volumes=[f"{tmp_wd}:/copy_mount"],
                detach=True,
            )
            try:
                container.wait()
            finally:
                # Always clean up, even if waiting was interrupted.
                container.stop()
                container.remove()
            with open(os.path.join(tmp_wd, "definition.yml"), "r", encoding="utf-8") as file:
                definition_raw = yaml.safe_load(file)
        definition = Definition(definition_raw)
        definition["run"] = {"backend": "container", "image_id": self.image_id}
        self.definition = definition

    # Note: Only used for dynverse docker backend
    def _extract_prior_information(self, fadata, inputs_df):
        """Collect the prior information required/accepted by the method.

        ref: PyDynverse/pydynverse/wrap/method_extract_args.py _method_extract_priors

        Args:
            fadata (FateAnnData): source of ``prior_information`` and ``obs``.
            inputs_df (pd.DataFrame): inputs table from definition.yml.

        Returns:
            dict: required priors followed by the available optional priors,
            keyed by Dynverse prior names.

        Raises:
            ValueError: if a required prior is missing.
        """
        cafe_priors = fadata.prior_information
        # The same prior uses different keys in cafe and Dynverse; translate them.
        cafe2dynverse = {
            "basis": "dimred",
            "start_cell": "start_id",
            # TODO: show available prior information
        }
        priors = {dyn_key: cafe_priors[cafe_key] for cafe_key, dyn_key in cafe2dynverse.items() if cafe_key in cafe_priors}
        if "cluster" in cafe_priors:
            # cafe stores the obs column name; Dynverse wants per-cell group labels.
            priors["groups_id"] = fadata.obs[cafe_priors["cluster"]].tolist()

        is_prior = inputs_df["type"] == "prior_information"
        is_required = inputs_df["required"].astype(bool)  # `~` needs a real bool dtype

        # Required priors: all must be present.
        required_prior_ids = inputs_df["input_id"][is_required & is_prior].tolist()
        missing_required = sorted(set(required_prior_ids) - priors.keys())
        if missing_required:
            add_hint = ", ".join(f"{prior_id} = <prior>" for prior_id in missing_required)
            msg = (
                f"! Prior information {','.join(missing_required)} is missing from dataset {fadata.id} "
                "but is required by the method.\n"
                f"-> If known, you can add this prior information using fadata.add_prior_information({add_hint}).\n"
                "-> Otherwise, this method cannot be used."
            )
            raise ValueError(msg)
        required_prior = {k: priors[k] for k in required_prior_ids}
        logger.debug(f"extract required priors: {required_prior.keys()}")

        # Optional priors: pass along whichever are present, warn about the rest.
        optional_prior_ids = inputs_df["input_id"][(~is_required) & is_prior].tolist()
        missing_optional = sorted(set(optional_prior_ids) - priors.keys())
        if missing_optional:
            logger.warning(
                f"Prior information {','.join(missing_optional)} is optional, but missing from dataset {fadata.id}. "
                "Will not give this prior to method."
            )
        # Keep definition order and drop keys the method does not declare.
        optional_prior = {k: priors[k] for k in optional_prior_ids if k in priors}
        logger.debug(f"extract optional prior: {optional_prior.keys()}")
        return required_prior | optional_prior

    def __str__(self):
        return f"Dynverse Docker Backend: docker image '{self.image_id}'"
|