Skip to content

[CM][Refactor] Separation of Serial and Asynchronous launchers #20

@KJlaccHoeUM9l

Description

@KJlaccHoeUM9l

Issue #13 describes a bug in the TVM launcher when it is used with multiple workers.
Analysis of the current implementation revealed several places that can be improved.

The current TVM launcher looks like this:

# Per-process backend instance used inside pool workers: assigned by
# BackendTVM._worker_initializer and read by BackendTVM._worker_handler.
g_graph = None


class BackendTVM:
    """
    Current TVM launcher: one class that handles both the serial mode
    (``arena_num == 1``) and the multi-worker mode (``arena_num > 1``).
    """
    def __init__(self):
        # Worker count taken from utils.NUM_PROCESSES; a value > 1 enables the
        # pool branch in load() and predict().
        self.arena_num = utils.NUM_PROCESSES
        # Assigned in load() only when arena_num > 1.
        self.pool = None

    def load_impl(self, model_path):
        # Placeholder for model loading: only sleeps for utils.LOAD_DURATION.
        time.sleep(utils.LOAD_DURATION)

    def predict_impl(self, feed):
        # Placeholder for inference: sleeps for utils.INFERENCE_DURATION and
        # returns the input unchanged.
        time.sleep(utils.INFERENCE_DURATION)
        return feed

    @staticmethod
    def _worker_initializer(model_path):
        """
        Pool initializer, run once in every worker process (see load()).

        Creates the per-process backend in the module-global ``g_graph`` and
        loads the model into it.

        ATTENTION!
        This sets the value for arena_num field (g_graph.arena_num = 1).
        Therefore, the predict method always uses a branch with sequential execution.
        """
        global g_graph
        g_graph = BackendTVM()
        g_graph.arena_num = 1
        g_graph.load_impl(model_path)

    @staticmethod
    def _worker_handler(feed):
        """Task run inside a pool worker: predict with that worker's ``g_graph``."""
        global g_graph
        return g_graph.predict(feed)

    def load(self, model_path):
        # The model is always loaded in the current process as well.
        self.load_impl(model_path)

        if self.arena_num > 1:
            # Each pool worker runs _worker_initializer(model_path) once on start-up.
            self.pool = multiprocessing.Pool(self.arena_num,
                                             initializer=self._worker_initializer,
                                             initargs=(model_path, )
                                             )
        return self

    def predict(self, feed):
        if self.arena_num > 1:
            # NOTE(review): resp.get() blocks until this sample is finished, so
            # consecutive predict() calls are processed one at a time even
            # though the pool has several workers.
            resp = self.pool.apply_async(self._worker_handler, args=(feed,))
            return resp.get()
        else:
            return self.predict_impl(feed)

As the code shows, in multi-worker mode each worker's initializer sets `g_graph.arena_num = 1`, so the `predict` call inside a worker always takes the sequential branch. Even if `g_graph.arena_num` were set to a value other than 1 so that `predict` took the pool branch, this would not help. The parent process dispatches each sample with `pool.apply_async(...).get()`, and `.get()` blocks until that sample's result is ready. The call is therefore effectively synchronous, and samples are still processed one at a time regardless of the number of workers.

Therefore, we propose to split the serial mode and the asynchronous mode into separate launchers.

First, let's define a class that wraps the model API:

class WorkerDescriptor:
    """
    Class that provides an API for loading and inferencing a model.

    One instance holds one loaded executable. ``init_executable`` must be
    called before ``inference``.
    """
    # The loaded executable; None until init_executable() has been called.
    executable: typing.Any = None
    # Some needed fields
    # ...

    def init_executable(self, executable: typing.Any) -> None:
        """
        Load the model as a VirtualMachine / GraphExecutor and store it.

        The actual VirtualMachine / GraphExecutor construction is elided here:
        ``build_virtual_machine`` is a placeholder that returns its input.

        Args:
            executable: the compiled model (or a handle to it) to load.
        """
        def build_virtual_machine(_executable: typing.Any) -> typing.Any:
            # Some actions
            # ...
            return _executable

        print(f"WorkerDescriptor::init_executable[{os.getpid()}]: {executable}")
        self.executable = build_virtual_machine(executable)

    def inference(self, data: typing.Any) -> typing.Any:
        """
        Run inference with the VirtualMachine / GraphExecutor API.

        Placeholder implementation: sleeps for ``utils.INFERENCE_DURATION``
        and returns ``data`` unchanged.
        """
        print(f"WorkerDescriptor::inference[{os.getpid()}]: {self.executable}")
        # Some actions for inference
        # ...
        time.sleep(utils.INFERENCE_DURATION)
        return data

With this wrapper, the serial launcher can be defined as follows:

class SerialBackendTVM:
    """
    Serial launcher.

    Loads a single executable in the current process and answers every
    request synchronously on it.
    """

    def __init__(self):
        descriptor: WorkerDescriptor = WorkerDescriptor()
        self.worker_description = descriptor

    def load(self, model_path: str):
        """Load the executable for ``model_path``; returns self for chaining."""
        executable_name = f"exe_{model_path}"
        self.worker_description.init_executable(executable_name)
        return self

    def predict(self, feed: typing.Any) -> typing.Any:
        """Run inference on ``feed`` and return the result immediately."""
        descriptor = self.worker_description
        return descriptor.inference(feed)

The asynchronous launcher can be implemented in two ways.
The first one is based on a process pool:

class PoolBackendTVM:
    """
    Asynchronous launcher based on a pool of workers.

    Every pool worker loads its own executable once (``worker_initializer``)
    and then serves ``worker_handler`` tasks. ``predict`` either returns the
    pending ``ApplyResult`` (async mode; resolve later with ``async_response``)
    or blocks until the result is ready (sync mode).
    """
    # Per-process descriptor: in each pool worker it is initialised by
    # worker_initializer; in the parent process it is never initialised.
    worker_descriptor: WorkerDescriptor = WorkerDescriptor()

    def __init__(self, num_processes: int = utils.NUM_PROCESSES):
        self.num_processes: int = num_processes
        # The pool belongs to this instance. It used to be a class attribute
        # shared by every PoolBackendTVM, so a second load() silently replaced
        # (and leaked) the previous instance's pool.
        self.pool = None

    def __getstate__(self):
        # Submitting the bound ``self.worker_handler`` pickles ``self``; Pool
        # objects cannot be pickled, so leave the pool out of the copy sent to
        # the workers (they never use it).
        state = self.__dict__.copy()
        state["pool"] = None
        return state

    @staticmethod
    def worker_initializer(model_path: str) -> None:
        """Pool initializer: runs once in every worker process and loads its executable."""
        print(f"PoolBackendTVM::worker_initializer[{os.getpid()}]: {model_path}")
        # Some actions for load executable
        # ...
        executable = f"exe_{model_path}_{os.getpid()}"
        PoolBackendTVM.worker_descriptor.init_executable(executable)

    def worker_handler(self, feed: typing.Any) -> typing.Any:
        """Task executed inside a pool worker: run inference on that worker's executable."""
        print(f"PoolBackendTVM::worker_handler[{os.getpid()}][{self.worker_descriptor.executable}]: {feed}")
        return self.worker_descriptor.inference(feed)

    def load(self, model_path: str):
        """Start ``num_processes`` workers, each loading ``model_path``; returns self."""
        print(f"PoolBackendTVM::load[{os.getpid()}]: {model_path}")
        self.pool = multiprocessing.Pool(
            self.num_processes,
            initializer=PoolBackendTVM.worker_initializer,
            initargs=(model_path,)
        )
        return self

    # Annotations that mention ``multiprocessing.pool`` are quoted: they are
    # evaluated at definition time otherwise, and ``import multiprocessing``
    # alone does not load the ``multiprocessing.pool`` submodule.
    def predict(
            self, feed: typing.Any, async_mode: bool = True
    ) -> "typing.Union[typing.Any, multiprocessing.pool.ApplyResult]":
        """Submit ``feed``; return an ApplyResult (async_mode) or the result itself."""
        print(f"PoolBackendTVM::predict[{os.getpid()}]: {feed}")
        if async_mode:
            return self.predict_async(feed)
        else:
            return self.predict_sync(feed)

    def predict_sync(self, feed: typing.Any) -> typing.Any:
        """Run ``feed`` on a pool worker and block until the result is available."""
        print(f"PoolBackendTVM::predict_sync[{os.getpid()}]: {feed}")
        return self.pool.apply(self.worker_handler, args=(feed,))

    def predict_async(self, feed: typing.Any) -> "multiprocessing.pool.ApplyResult":
        """Submit ``feed`` to the pool and return immediately with its ApplyResult."""
        print(f"PoolBackendTVM::predict_async[{os.getpid()}]: {feed}")
        return self.pool.apply_async(self.worker_handler, args=(feed,))

    @staticmethod
    def async_response(
            async_responses: "typing.List[multiprocessing.pool.ApplyResult]"
    ) -> typing.List[typing.Any]:
        """Block on every pending ApplyResult and return the results in the same order."""
        return [resp.get() for resp in async_responses]

    def finish(self) -> None:
        """Stop accepting tasks, let submitted tasks complete and reap the workers."""
        # close() + join() instead of terminate(): terminate() kills workers
        # that may still be running submitted tasks, and ApplyResult.get() on
        # such a task may then never return.
        self.pool.close()
        self.pool.join()

The second one is based on explicitly managed processes and a shared task queue:

class AsyncBackendTVM:
    """
    Asynchronous launcher based on processes and a concurrent task queue.

    ``load`` starts ``num_processes`` worker processes, each with its own
    WorkerDescriptor. ``predict`` only enqueues ``(sample_id, data)`` and
    returns the sample id. A worker takes the task, runs inference and stores
    the result in a manager-backed dict under ``sample_id``.

    Submitting the end marker ``-1`` once per worker (``predict(-1)``) makes
    the workers exit. ``finish`` then joins them, and ``async_response``
    returns the results ordered by submission.
    """

    def __init__(self, num_processes: int = utils.NUM_PROCESSES):
        print(f"AsyncBackendTVM::__init__[{os.getpid()}]")
        self.num_processes: int = num_processes
        self.workers: typing.List[multiprocessing.Process] = []
        # IPC objects are created per instance rather than in the class body.
        # In the class body they were created at import time (Manager() starts
        # a server process). With the "spawn" start method, every worker also
        # re-imports this module and would get its own, disconnected queue and
        # map. They are passed to the workers explicitly instead.
        self.concurrent_queue: multiprocessing.Queue = multiprocessing.Queue(utils.QUEUE_SIZE)
        self.manager = multiprocessing.Manager()
        self.response_map: typing.Dict = self.manager.dict()
        # Id given to the next submitted sample; results are keyed by it.
        self._next_sample_id: int = 0

    @staticmethod
    def _is_end_marker(samples: typing.Any) -> bool:
        # Only a plain int -1 is the end marker. A bare ``samples == -1`` breaks
        # for array-like samples, whose element-wise comparison result has no
        # single truth value.
        return isinstance(samples, int) and samples == -1

    @staticmethod
    def _async_inference(samples: typing.Any, descriptor: WorkerDescriptor):
        print(f"AsyncBackendTVM::_async_inference[{os.getpid()}]: {samples}")
        return descriptor.inference(samples)

    @staticmethod
    def _async_response(descriptor: WorkerDescriptor, task_queue, response_map) -> bool:
        """Handle one task from ``task_queue``; return False when the end marker arrives."""
        print(f"AsyncBackendTVM::_async_response[{os.getpid()}]: {descriptor.executable}")
        sample_id, samples = task_queue.get(block=True)
        if AsyncBackendTVM._is_end_marker(samples):
            return False
        inference_response = AsyncBackendTVM._async_inference(samples, descriptor)
        response_map[sample_id] = inference_response
        return True

    @staticmethod
    def _worker_action(descriptor: WorkerDescriptor, task_queue, response_map):
        """Worker process main loop: serve tasks until an end marker is received."""
        print(f"AsyncBackendTVM::_worker_action[{os.getpid()}]: {descriptor.executable}")
        while AsyncBackendTVM._async_response(descriptor, task_queue, response_map):
            # Empty body
            pass

    @staticmethod
    def _create_worker(descriptor: WorkerDescriptor, task_queue, response_map) -> multiprocessing.Process:
        print(f"AsyncBackendTVM::_create_worker[{os.getpid()}]: {descriptor.executable}")
        return multiprocessing.Process(
            target=AsyncBackendTVM._worker_action,
            args=(descriptor, task_queue, response_map)
        )

    def load(self, model_path: str):
        """Create one descriptor per worker, then start all worker processes; returns self."""
        print(f"AsyncBackendTVM::load[{os.getpid()}]: ", model_path)
        for i in range(self.num_processes):
            worker_descriptor = WorkerDescriptor()
            # Some actions for load executable
            # ...
            executable = f"exe_{model_path}_{i}"
            worker_descriptor.init_executable(executable)
            self.workers.append(
                AsyncBackendTVM._create_worker(
                    worker_descriptor, self.concurrent_queue, self.response_map
                )
            )

        for worker in self.workers:
            worker.start()

        return self

    def predict(self, data: typing.Any) -> int:
        """
        Enqueue ``data`` for asynchronous inference and return its sample id.

        Blocks only while the task queue is full. Results are collected later
        with ``async_response``.
        """
        print(f"AsyncBackendTVM::predict[{os.getpid()}]: {data}")
        # Async predict, no response now. The id is a submission counter; it
        # used to be the data itself, which only worked for ints 0..n-1.
        data_id = self._next_sample_id
        self._next_sample_id += 1
        self.concurrent_queue.put((data_id, data), block=True)
        return data_id

    def finish(self) -> None:
        """
        Wait for all worker processes to exit.

        Each worker exits only after receiving an end marker, so
        ``predict(-1)`` must have been called ``num_processes`` times first;
        otherwise this blocks forever.
        """
        print(f"AsyncBackendTVM::finish[{os.getpid()}]")
        for worker in self.workers:
            worker.join()

    def async_response(self) -> typing.List[typing.Any]:
        """
        Return the collected results indexed by sample id (submission order).

        Ids without a result (end markers, or samples not processed) are
        skipped if they are at the end and yield ``None`` otherwise.
        """
        print(f"AsyncBackendTVM::async_response[{os.getpid()}]")
        # A single round-trip to the manager; the proxy returns the items as a list.
        entries = self.response_map.items()
        size = max((sample_id for sample_id, _ in entries), default=-1) + 1
        result: typing.List[typing.Any] = [None] * size
        for sample_id, value in entries:
            result[sample_id] = value
        return result

With these launchers, the main function needs additional post-processing steps for the asynchronous launchers:

    model = backend.load("model_path")

    result = [None] * num_samples

    # Serial backends return results directly; PoolBackendTVM returns
    # ApplyResult handles; AsyncBackendTVM returns nothing useful yet.
    for i in range(num_samples):
        result[i] = backend.predict(i)

    # AsyncBackendTVM post-actions
    if isinstance(backend, AsyncBackendTVM):
        # One stop sample per worker of *this* backend. Counting with the global
        # utils.NUM_PROCESSES fails when the backend was built with more
        # processes: the extra workers never see a marker and finish() blocks forever.
        for _ in range(backend.num_processes):
            backend.predict(-1)
        backend.finish()

        result = backend.async_response()

    # PoolBackendTVM post-actions
    elif isinstance(backend, PoolBackendTVM):
        # Resolve the pending ApplyResult handles before shutting the pool down.
        result = backend.async_response(result)
        backend.finish()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions