Skip to content

cafe.method.CFEDockerBackend

cafe.method.CFEDockerBackend

Bases: DockerBackend

CFEDockerBackend: specific implementation of abstract Backend class using CFE Docker.

Source code in cafe/method/fate_cafe_docker_backend.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class CFEDockerBackend(DockerBackend):
    """CFEDockerBackend: specific implementation of abstract Backend class using CFE Docker.

    Inputs (``adata.h5ad`` and ``parameters.json``) are written to a temporary
    directory that is mounted into the container at ``/data``. After the
    container exits, ``output.pkl`` is read back from that directory and
    returned as the trajectory dict.
    """

    def __init__(self, function_name="comp1", image_id: str = "huangzhaoyang/comp1:0.0.1"):
        """Store the configuration and load the backend.

        Args:
            function_name (str, optional): value passed to ``run.py --function_name``
                inside the container.
            image_id (str, optional): Docker image to run.
        """
        logger.debug("CFEDockerBackend __init__")

        self.function_name = function_name
        self.image_id = image_id
        self.use_gpu = True  # execute() requests GPU devices whenever this is truthy
        self.load_backend()

    def load_backend(self):
        """Load the image and the function via ``_load_image`` / ``_load_function``.

        NOTE(review): both helpers are defined outside this class (presumably on
        DockerBackend); their exact behavior is not visible here.
        """
        self._load_image()
        self._load_function(self.function_name)  # TODO: test if use gpu

    def __call__(self, adata: AnnData, rewrite: bool = True, **parameters):
        """Simplified version of self.run that works directly on an AnnData.

        Args:
            adata (AnnData): data written to the temporary working dir for the container.
            rewrite (bool): currently unused; kept for interface compatibility.
            **parameters: method parameters, dumped to ``parameters.json``.

        Returns:
            dict: trajectory dict returned by ``execute``.
        """
        with tempfile.TemporaryDirectory() as tmp_wd:
            logger.debug(f"Temp wd: {tmp_wd}")
            # preprocess takes (adata, parameters, tmp_wd); the extra positional
            # ``{}`` that used to be passed here raised a TypeError on every call.
            self.preprocess(adata, parameters, tmp_wd)
            return self.execute(tmp_wd)

    def preprocess(self, adata: AnnData, parameters: dict, tmp_wd: str) -> None:
        """Save the adata h5ad file and the parameters json file in tmp_wd dir.

        Args:
            adata (AnnData): data to save as ``adata.h5ad``; its ``uns["filename"]``
                is set to the in-container path before writing.
            parameters (dict): parameter dict, saved as ``parameters.json``
            tmp_wd (str): tmp working dir for docker mount and saving h5ad, json file
        """
        adata_filename = f"{tmp_wd}/adata.h5ad"
        adata.uns["filename"] = "/data/adata.h5ad"  # save filename in uns for function use in docker
        adata.write(filename=adata_filename)
        if settings.save_external_data:
            self.adata = adata  # need to save for comparison later

        with open(f"{tmp_wd}/parameters.json", "w") as f:
            json.dump(parameters, f)

    def execute(self, tmp_wd: str, benchmark_resource: bool = False) -> dict:
        """Run the CFE Docker container and load ``output.pkl`` from tmp_wd.

        Args:
            tmp_wd (str): tmp working dir, mounted into the container at ``/data``.
            benchmark_resource (bool): if True and the run produced an output,
                add a ``"resource_usage"`` entry (parsed container stats plus
                wall-clock ``"time"``) to the returned dict.

        Returns:
            dict: trajectory dict; an empty dict when the container did not
            produce ``output.pkl``.
        """
        trajectory_dict = {}

        device_requests = None
        if self.use_gpu:
            # This is equivalent to `docker run --gpus all`
            device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])]

        client = docker.from_env()
        start_time = time.time()
        cmd = f"python run.py --function_name {self.function_name}"
        # TODO: need update docker version, run.py file, so that
        # settings.save_external_data can append f" --save_h5ad {tmp_wd}/output.h5ad"
        container = client.containers.run(
            image=self.image_id,
            command=cmd,
            volumes=[f"{tmp_wd}:/data"],
            working_dir="/code",
            device_requests=device_requests,  # add gpu access
            detach=True,
        )  # all are saved in "/data" dir and sync to master

        # real time stat and log list
        stats_list = []

        def collect_stats():
            for stat in container.stats(stream=True, decode=True):
                stats_list.append(stat)

        def collect_logs():
            for line in container.logs(stream=True, follow=True, stdout=True, stderr=True):
                logger.debug(f"[cafe docker]{line.decode('utf-8').strip()}")

        try:
            threading.Thread(target=collect_stats, daemon=True).start()
            threading.Thread(target=collect_logs, daemon=True).start()

            wait_result = container.wait()  # wait until docker finish
            end_time = time.time()
        finally:
            # Always clean up, even if waiting on the container failed.
            container.stop()
            container.remove()

        output_pkl_filename = f"{tmp_wd}/output.pkl"
        if not os.path.exists(output_pkl_filename):
            # no output file generated by docker: report it and still return a dict
            logger.error("Docker Error!!!")
            logger.error(f"{output_pkl_filename} not found; container wait result: {wait_result}")
            return trajectory_dict

        logger.debug("Docker Finish")
        # NOTE(review): pickle.load executes arbitrary code from the file; this
        # is only safe as long as the docker image producing it is trusted.
        with open(output_pkl_filename, "rb") as f:
            trajectory_dict = pickle.load(f)
        if settings.save_external_data:
            output_adata_filename = f"{tmp_wd}/output.h5ad"
            if os.path.exists(output_adata_filename):
                adata_new = sc.read_h5ad(output_adata_filename)
                trajectory_dict["external_data"] = extract_external_data_dict_directly(self.adata, adata_new)
                logger.debug("save external data from adata after conda execution")
            else:
                # TODO: for some old version docker image without external data saving, need update
                logger.warning("don't save external data from adata after conda execution")

        if benchmark_resource:
            # Copy first: the stats thread is a daemon and may still be appending.
            usage_dict = parse_docker_resource_usage_string_list(list(stats_list))
            usage_dict["time"] = end_time - start_time  # manually add time
            logger.debug(f"resource usage dict: {usage_dict}")
            trajectory_dict["resource_usage"] = usage_dict
        return trajectory_dict

    # def postprocess(self, fadata: FateAnnData, trajectory_dict: dict) -> None:
    #     """Save trajectory_dict

    #     Args:
    #         fadata (FateAnnData): FateAnnData to be added the trajectory dict
    #         trajectory_dict (dict): trajectory dict
    #     """
    #     fadata.add_trajectory_by_type(trajectory_dict)

    def run(self, fadata: FateAnnData, parameters: dict) -> None:
        """Run cafe docker pipeline to infer the trajectory

        Args:
            fadata (FateAnnData): FateAnnData to which the trajectory is added
            parameters (dict): parameter dict
        """
        # check if benchmark resource from parameters.
        benchmark_resource = self._check_benchmark_resource(parameters)

        # prepare data and parameters
        adata = fadata.to_anndata(delete_trajectory=True)
        # return value is ignored; presumably updates ``parameters`` in place — TODO confirm
        self._get_parameters(fadata, parameters)

        # execute method, save input and output file in tmp dir
        with tempfile.TemporaryDirectory() as tmp_wd:
            logger.debug(f"Temp wd: {tmp_wd}")
            self.preprocess(adata, parameters, tmp_wd)

            trajectory_dict = self.execute(tmp_wd, benchmark_resource)

            fadata.add_trajectory_by_type(trajectory_dict)

            # add resource usage if benchmark_resource is True
            if "resource_usage" in trajectory_dict:
                fadata.add_resource_usage(trajectory_dict["resource_usage"])

    def __str__(self):
        """Return a one-line description naming the function and the docker image."""
        return f"CFE Docker Backend: {self.function_name} in docker image '{self.image_id}'"

__call__(adata, rewrite=True, **parameters)

simplified version for self.run

Source code in cafe/method/fate_cafe_docker_backend.py
43
44
45
46
47
48
49
def __call__(self, adata: AnnData, rewrite: bool = True, **parameters):
    """Simplified version of self.run that works directly on an AnnData.

    Args:
        adata (AnnData): data written to the temporary working dir for the container.
        rewrite (bool): currently unused; kept for interface compatibility.
        **parameters: method parameters, dumped to ``parameters.json``.

    Returns:
        dict: trajectory dict returned by ``execute``.
    """
    with tempfile.TemporaryDirectory() as tmp_wd:
        logger.debug(f"Temp wd: {tmp_wd}")
        # preprocess takes (adata, parameters, tmp_wd); the extra positional
        # ``{}`` that used to be passed here raised a TypeError on every call.
        self.preprocess(adata, parameters, tmp_wd)
        return self.execute(tmp_wd)

__init__(function_name='comp1', image_id='huangzhaoyang/comp1:0.0.1')

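Initialize the CFEDockerBackend class and load the backend. Args: function_name (str, optional): function name passed to run.py inside the container. image_id (str, optional): Docker image id.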

Source code in cafe/method/fate_cafe_docker_backend.py
26
27
28
29
30
31
32
33
34
35
36
37
def __init__(self, function_name="comp1", image_id: str = "huangzhaoyang/comp1:0.0.1"):
    """Store the configuration and load the backend.

    Args:
        function_name (str, optional): value passed to ``run.py --function_name``
            inside the container.
        image_id (str, optional): Docker image to run.
    """
    logger.debug("CFEDockerBackend __init__")

    # function_name was previously assigned twice; once is enough.
    self.function_name = function_name
    self.image_id = image_id
    self.use_gpu = True  # GPU access is always requested for the container
    self.load_backend()

execute(tmp_wd, benchmark_resource=False)

Run the CFE Docker container, which is expected to save output.pkl in the tmp_wd dir, then load that file and return it as trajectory_dict

Parameters:

Name Type Description Default
tmp_wd str

tmp working dir for docker mount and saving h5ad.h5, json file

required

Returns:

Name Type Description
dict dict

trajectory dict

Source code in cafe/method/fate_cafe_docker_backend.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def execute(self, tmp_wd: str, benchmark_resource: bool = False) -> dict:
    """Run the CFE Docker container and load ``output.pkl`` from tmp_wd.

    Args:
        tmp_wd (str): tmp working dir, mounted into the container at ``/data``.
        benchmark_resource (bool): if True and the run produced an output,
            add a ``"resource_usage"`` entry (parsed container stats plus
            wall-clock ``"time"``) to the returned dict.

    Returns:
        dict: trajectory dict; an empty dict when the container did not
        produce ``output.pkl``.
    """
    trajectory_dict = {}

    device_requests = None
    if self.use_gpu:
        # This is equivalent to `docker run --gpus all`
        device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])]

    client = docker.from_env()
    start_time = time.time()
    cmd = f"python run.py --function_name {self.function_name}"
    # TODO: need update docker version, run.py file, so that
    # settings.save_external_data can append f" --save_h5ad {tmp_wd}/output.h5ad"
    container = client.containers.run(
        image=self.image_id,
        command=cmd,
        volumes=[f"{tmp_wd}:/data"],
        working_dir="/code",
        device_requests=device_requests,  # add gpu access
        detach=True,
    )  # all are saved in "/data" dir and sync to master

    # real time stat and log list
    stats_list = []

    def collect_stats():
        for stat in container.stats(stream=True, decode=True):
            stats_list.append(stat)

    def collect_logs():
        for line in container.logs(stream=True, follow=True, stdout=True, stderr=True):
            logger.debug(f"[cafe docker]{line.decode('utf-8').strip()}")

    try:
        threading.Thread(target=collect_stats, daemon=True).start()
        threading.Thread(target=collect_logs, daemon=True).start()

        wait_result = container.wait()  # wait until docker finish
        end_time = time.time()
    finally:
        # Always clean up, even if waiting on the container failed.
        container.stop()
        container.remove()

    output_pkl_filename = f"{tmp_wd}/output.pkl"
    if not os.path.exists(output_pkl_filename):
        # no output file generated by docker: report it and still return a dict
        logger.error("Docker Error!!!")
        logger.error(f"{output_pkl_filename} not found; container wait result: {wait_result}")
        return trajectory_dict

    logger.debug("Docker Finish")
    # NOTE(review): pickle.load executes arbitrary code from the file; this
    # is only safe as long as the docker image producing it is trusted.
    with open(output_pkl_filename, "rb") as f:
        trajectory_dict = pickle.load(f)
    if settings.save_external_data:
        output_adata_filename = f"{tmp_wd}/output.h5ad"
        if os.path.exists(output_adata_filename):
            adata_new = sc.read_h5ad(output_adata_filename)
            trajectory_dict["external_data"] = extract_external_data_dict_directly(self.adata, adata_new)
            logger.debug("save external data from adata after conda execution")
        else:
            # TODO: for some old version docker image without external data saving, need update
            logger.warning("don't save external data from adata after conda execution")

    if benchmark_resource:
        # Copy first: the stats thread is a daemon and may still be appending.
        usage_dict = parse_docker_resource_usage_string_list(list(stats_list))
        usage_dict["time"] = end_time - start_time  # manually add time
        logger.debug(f"resource usage dict: {usage_dict}")
        trajectory_dict["resource_usage"] = usage_dict
    return trajectory_dict

preprocess(adata, parameters, tmp_wd)

Save the adata h5ad file and the parameters json file in the tmp_wd dir

Parameters:

Name Type Description Default
adata AnnData

AnnData object to save as adata.h5ad in tmp_wd

required
parameters dict

parameter dict, saved as parameters.json

required
tmp_wd str

tmp working dir for docker mount and saving h5ad.h5, json file

required
Source code in cafe/method/fate_cafe_docker_backend.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def preprocess(self, adata: AnnData, parameters: dict, tmp_wd: str) -> None:
    """Write the container inputs into ``tmp_wd``.

    Two files are produced: ``adata.h5ad`` (the given AnnData) and
    ``parameters.json`` (the given parameter dict).

    Args:
        adata (AnnData): data to write; its ``uns["filename"]`` is set to the
            in-container path before writing.
        parameters (dict): parameter dict serialized to ``parameters.json``
        tmp_wd (str): tmp working dir that is later mounted into the container
    """
    # Record the in-container location first so it is part of the written file.
    adata.uns["filename"] = "/data/adata.h5ad"
    host_h5ad_path = f"{tmp_wd}/adata.h5ad"
    adata.write(filename=host_h5ad_path)

    # Keep a reference to the input for the external-data comparison in execute.
    if settings.save_external_data:
        self.adata = adata

    params_path = f"{tmp_wd}/parameters.json"
    with open(params_path, "w") as params_file:
        json.dump(parameters, params_file)

run(fadata, parameters)

Run cafe docker pipeline to infer the trajectory

Parameters:

Name Type Description Default
fadata FateAnnData

FateAnnData to which the trajectory is added

required
parameters dict

parameter dict

required
Source code in cafe/method/fate_cafe_docker_backend.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def run(self, fadata: FateAnnData, parameters: dict) -> None:
    """Infer a trajectory with the cafe docker pipeline and attach it to ``fadata``.

    Args:
        fadata (FateAnnData): FateAnnData that receives the trajectory
            (and the resource usage, when it was collected)
        parameters (dict): parameter dict
    """
    # Whether resource usage should be collected is decided from the parameters.
    with_benchmark = self._check_benchmark_resource(parameters)

    # Export a plain AnnData without any existing trajectory.
    input_adata = fadata.to_anndata(delete_trajectory=True)
    # Return value is ignored; presumably updates ``parameters`` in place — TODO confirm
    self._get_parameters(fadata, parameters)

    # Inputs and outputs are exchanged with the container through a temp dir.
    with tempfile.TemporaryDirectory() as workdir:
        logger.debug(f"Temp wd: {workdir}")
        self.preprocess(input_adata, parameters, workdir)

        result = self.execute(workdir, with_benchmark)

        fadata.add_trajectory_by_type(result)

        # "resource_usage" is only present when benchmarking was requested.
        if "resource_usage" in result:
            fadata.add_resource_usage(result["resource_usage"])