#13 describes a bug related to using the TVM launcher with multiple workers.
Analysis of the current implementation revealed several places that can be improved.
At the moment, the launcher for TVM looks like this:
# Model handle of the current pool worker process; assigned by
# BackendTVM._worker_initializer when the worker starts.
g_graph = None


class BackendTVM:
    """Current TVM launcher (quoted as-is to illustrate the multi-worker issue).

    With ``arena_num > 1`` the parent process owns a ``multiprocessing.Pool``
    and forwards each request to it; otherwise inference runs in-process.
    """

    def __init__(self):
        self.pool = None
        self.arena_num = utils.NUM_PROCESSES

    def load_impl(self, model_path):
        # Stand-in for real model loading: only simulates its duration.
        time.sleep(utils.LOAD_DURATION)

    def predict_impl(self, feed):
        # Stand-in for real inference: simulates its duration, echoes the input.
        time.sleep(utils.INFERENCE_DURATION)
        return feed

    @staticmethod
    def _worker_initializer(model_path):
        """Pool initializer, executed once in every worker process.

        ATTENTION: the worker's model is created with ``arena_num = 1``
        (``g_graph.arena_num = 1``), so ``predict`` inside a worker always
        takes the sequential branch.
        """
        global g_graph
        g_graph = BackendTVM()
        g_graph.arena_num = 1
        g_graph.load_impl(model_path)

    @staticmethod
    def _worker_handler(feed):
        # Executed inside a pool worker; g_graph was set by _worker_initializer.
        return g_graph.predict(feed)

    def load(self, model_path):
        self.load_impl(model_path)
        if self.arena_num <= 1:
            return self
        self.pool = multiprocessing.Pool(
            self.arena_num,
            initializer=self._worker_initializer,
            initargs=(model_path,),
        )
        return self

    def predict(self, feed):
        if self.arena_num <= 1:
            return self.predict_impl(feed)
        # pool.apply_async(...).get() blocks until the worker answers, so
        # requests are still served one at a time.
        pending = self.pool.apply_async(self._worker_handler, args=(feed,))
        return pending.get()
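It can be seen from the code that when using the multi-worker mode, we always perform inference in sequential mode: `_worker_initializer` sets `g_graph.arena_num = 1`, so `predict` inside every worker always takes the sequential branch. If we replace the value of `g_graph.arena_num` with something other than 1, the worker's `predict` will take the pool branch instead — but that does not help either, because the worker's `g_graph` is initialized with `load_impl` only, so its `pool` is still `None`. More importantly, the call `pool.apply_async().get()` in the parent blocks until the worker returns its result, i.e. it is inherently synchronous inference, so even in this case we will get sequential data processing.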
In view of this, it is proposed to separate the serial mode and the asynchronous mode into two launchers.
First of all, let's define a structure that will serve as a wrapper for the model API:
class WorkerDescriptor:
    """
    Wrapper that provides an API for loading and running inference on a model.

    A descriptor is created empty; it becomes usable only after
    :meth:`init_executable` has been called.
    """

    # Loaded model object (VirtualMachine / GraphExecutor); None until
    # init_executable() is called.
    executable: typing.Any = None
    # Some needed fields
    # ...

    def init_executable(self, executable: typing.Any) -> None:
        """
        Load the model in the format of the VirtualMachine / GraphExecutor.

        Args:
            executable: compiled model artifact to wrap.
        """

        def build_virtual_machine(_executable: typing.Any) -> typing.Any:
            # Some actions
            # ...
            return _executable

        print(f"WorkerDescriptor::init_executable[{os.getpid()}]: {executable}")
        self.executable = build_virtual_machine(executable)

    def inference(self, data: typing.Any) -> typing.Any:
        """
        Run inference with the VirtualMachine / GraphExecutor API.

        Args:
            data: input sample(s) for the model.

        Returns:
            The inference result (in this sketch, ``data`` itself).

        Raises:
            RuntimeError: if :meth:`init_executable` has not been called yet.
        """
        if self.executable is None:
            raise RuntimeError(
                "WorkerDescriptor::inference called before init_executable()"
            )
        print(f"WorkerDescriptor::inference[{os.getpid()}]: {self.executable}")
        # Some actions for inference
        # ...
        time.sleep(utils.INFERENCE_DURATION)
        return data
In such a case, the serial launcher can be defined as follows:
class SerialBackendTVM:
    """
    Serial launcher: loads the model and runs inference in the calling process.

    Exposes the same ``load`` / ``predict`` / ``finish`` interface as the
    asynchronous launchers so callers can treat all launchers uniformly.
    """

    def __init__(self):
        self.worker_descriptor: "WorkerDescriptor" = WorkerDescriptor()

    @property
    def worker_description(self) -> "WorkerDescriptor":
        """Backward-compatible alias for ``worker_descriptor``."""
        return self.worker_descriptor

    @worker_description.setter
    def worker_description(self, value: "WorkerDescriptor") -> None:
        self.worker_descriptor = value

    def load(self, model_path: str) -> "SerialBackendTVM":
        """Load the model for ``model_path``; returns ``self`` for chaining."""
        self.worker_descriptor.init_executable(f"exe_{model_path}")
        return self

    def predict(self, feed: typing.Any) -> typing.Any:
        """Run inference on ``feed`` synchronously and return the result."""
        return self.worker_descriptor.inference(feed)

    def finish(self) -> None:
        """No-op: the serial launcher owns no worker processes to release."""
An asynchronous launcher can be defined in two ways.
Process pool based:
class PoolBackendTVM:
    """
    Asynchronous launcher based on a pool of workers.

    Every pool worker runs ``worker_initializer`` once and keeps its own
    initialized copy of the class-level ``worker_descriptor``. Requests are
    dispatched to the pool either asynchronously (``predict_async`` returns an
    ``ApplyResult``) or synchronously (``predict_sync`` returns the result).
    """

    # Per-process model wrapper. Each pool worker initializes its own copy in
    # worker_initializer(); the parent's copy is never initialized.
    worker_descriptor: WorkerDescriptor = WorkerDescriptor()

    def __init__(self, num_processes: int = utils.NUM_PROCESSES):
        self.num_processes: int = num_processes
        # Created by load(). Stored per instance (not on the class) so that
        # several launchers do not overwrite each other's pool.
        self.pool: "typing.Optional[multiprocessing.pool.Pool]" = None

    @staticmethod
    def worker_initializer(model_path: str) -> None:
        """Pool initializer: load the executable inside the worker process."""
        print(f"PoolBackendTVM::worker_initializer[{os.getpid()}]: {model_path}")
        # Some actions for load executable
        # ...
        executable = f"exe_{model_path}_{os.getpid()}"
        PoolBackendTVM.worker_descriptor.init_executable(executable)

    @staticmethod
    def _handle_in_worker(feed: typing.Any) -> typing.Any:
        """Task function executed inside a pool worker.

        Deliberately a staticmethod: dispatching a bound method would pickle
        ``self``, which holds the Pool, and Pool objects cannot be pickled.
        """
        descriptor = PoolBackendTVM.worker_descriptor
        print(f"PoolBackendTVM::worker_handler[{os.getpid()}][{descriptor.executable}]: {feed}")
        return descriptor.inference(feed)

    def worker_handler(self, feed: typing.Any) -> typing.Any:
        """Handle one request in the current process (kept for compatibility)."""
        return PoolBackendTVM._handle_in_worker(feed)

    def _require_pool(self) -> "multiprocessing.pool.Pool":
        """Return the pool, or raise a clear error if load() was not called."""
        if self.pool is None:
            raise RuntimeError("PoolBackendTVM: load() must be called before predict()")
        return self.pool

    def load(self, model_path: str) -> "PoolBackendTVM":
        """Start the worker pool for ``model_path``; returns ``self``."""
        print(f"PoolBackendTVM::load[{os.getpid()}]: {model_path}")
        if self.pool is not None:
            # Reloading: release the previous workers instead of leaking them.
            self.finish()
        self.pool = multiprocessing.Pool(
            self.num_processes,
            initializer=PoolBackendTVM.worker_initializer,
            initargs=(model_path,),
        )
        return self

    def predict(
        self, feed: typing.Any, async_mode: bool = True
    ) -> "typing.Union[typing.Any, multiprocessing.pool.ApplyResult]":
        """Dispatch ``feed``; returns an ApplyResult when ``async_mode`` is set."""
        print(f"PoolBackendTVM::predict[{os.getpid()}]: {feed}")
        if async_mode:
            return self.predict_async(feed)
        return self.predict_sync(feed)

    def predict_sync(self, feed: typing.Any) -> typing.Any:
        """Run ``feed`` on a pool worker and block until the result is ready."""
        print(f"PoolBackendTVM::predict_sync[{os.getpid()}]: {feed}")
        # Pool.apply() is equivalent to apply_async(...).get().
        return self._require_pool().apply(PoolBackendTVM._handle_in_worker, (feed,))

    def predict_async(self, feed: typing.Any) -> "multiprocessing.pool.ApplyResult":
        """Submit ``feed`` to the pool and return its pending ApplyResult."""
        print(f"PoolBackendTVM::predict_async[{os.getpid()}]: {feed}")
        return self._require_pool().apply_async(
            PoolBackendTVM._handle_in_worker, (feed,)
        )

    @staticmethod
    def async_response(
        async_responses: "typing.List[multiprocessing.pool.ApplyResult]",
    ) -> typing.List[typing.Any]:
        """Block on every pending result and return the values in order."""
        return [resp.get() for resp in async_responses]

    def finish(self) -> None:
        """Shut the pool down after all submitted tasks have completed.

        Uses close()+join() rather than terminate(): terminate() discards
        outstanding tasks, whose ApplyResult.get() would then never return.
        Safe to call before load() or more than once.
        """
        if self.pool is None:
            return
        self.pool.close()
        self.pool.join()
        self.pool = None
Based on processes and a concurrent task queue:
class AsyncBackendTVM:
    """
    Asynchronous launcher based on processes and a concurrent task queue.

    Usage protocol:
      * ``predict(data)`` enqueues a request and returns immediately;
      * ``predict(-1)`` enqueues the end marker; send one per worker process;
      * ``finish()`` waits for all workers to exit;
      * ``async_response()`` returns the results in submission order.
    """

    # Value that, passed to predict(), makes one worker process stop.
    END_MARKER = -1

    def __init__(self, num_processes: int = utils.NUM_PROCESSES):
        print(f"AsyncBackendTVM::__init__[{os.getpid()}]")
        self.num_processes: int = num_processes
        self.workers: typing.List[multiprocessing.Process] = []
        # IPC objects are per instance. As class attributes they were created
        # at import time (starting a Manager process on import, which also
        # breaks the "spawn" start method) and were shared by all instances.
        self.concurrent_queue = multiprocessing.Queue(utils.QUEUE_SIZE)
        self.manager = multiprocessing.Manager()
        self.response_map = self.manager.dict()
        # Sequential request id, so results can be ordered independently of
        # the data values themselves.
        self._next_id: int = 0

    @staticmethod
    def _is_end_marker(samples: typing.Any) -> bool:
        # Check the type first: comparing e.g. an array with ``== -1`` yields
        # an array whose truth value is ambiguous.
        return isinstance(samples, int) and samples == AsyncBackendTVM.END_MARKER

    @staticmethod
    def _async_inference(samples: typing.Any, descriptor: WorkerDescriptor):
        print(f"AsyncBackendTVM::_async_inference[{os.getpid()}]: {samples}")
        return descriptor.inference(samples)

    @staticmethod
    def _async_response(descriptor: WorkerDescriptor, task_queue, response_map) -> bool:
        """Serve one queued request; return False once the end marker arrives."""
        print(f"AsyncBackendTVM::_async_response[{os.getpid()}]: {descriptor.executable}")
        sample_id, samples = task_queue.get(block=True)
        if AsyncBackendTVM._is_end_marker(samples):  # End marker
            return False
        response_map[sample_id] = AsyncBackendTVM._async_inference(samples, descriptor)
        return True

    @staticmethod
    def _worker_action(descriptor: WorkerDescriptor, task_queue, response_map):
        """Worker process main loop: serve requests until the end marker."""
        print(f"AsyncBackendTVM::_worker_action[{os.getpid()}]: {descriptor.executable}")
        while AsyncBackendTVM._async_response(descriptor, task_queue, response_map):
            pass

    @staticmethod
    def _create_worker(
        descriptor: WorkerDescriptor, task_queue, response_map
    ) -> multiprocessing.Process:
        print(f"AsyncBackendTVM::_create_worker[{os.getpid()}]: {descriptor.executable}")
        # The queue and the manager dict proxy are handed to the child
        # explicitly, so the worker uses this instance's channels.
        return multiprocessing.Process(
            target=AsyncBackendTVM._worker_action,
            args=(descriptor, task_queue, response_map),
        )

    def load(self, model_path: str) -> "AsyncBackendTVM":
        """Create and start ``num_processes`` workers; returns ``self``."""
        print(f"AsyncBackendTVM::load[{os.getpid()}]: ", model_path)
        for i in range(self.num_processes):
            worker_descriptor = WorkerDescriptor()
            # Some actions for load executable
            # ...
            executable = f"exe_{model_path}_{i}"
            worker_descriptor.init_executable(executable)
            self.workers.append(
                AsyncBackendTVM._create_worker(
                    worker_descriptor, self.concurrent_queue, self.response_map
                )
            )
        for worker in self.workers:
            worker.start()
        return self

    def predict(self, data: typing.Any) -> None:
        """Enqueue ``data`` for inference; results are collected later."""
        print(f"AsyncBackendTVM::predict[{os.getpid()}]: {data}")
        # Async predict, no response now
        data_id = self._next_id
        self._next_id += 1
        self.concurrent_queue.put((data_id, data), block=True)

    def finish(self) -> None:
        """Wait for all workers to exit, then release the Manager process.

        Workers exit only after receiving an end marker, so one
        ``predict(-1)`` per worker must be sent first or this blocks forever.
        """
        print(f"AsyncBackendTVM::finish[{os.getpid()}]")
        for worker in self.workers:
            worker.join()
        if self.manager is not None:
            # Copy results out before shutting the manager server down, so
            # async_response() keeps working after finish().
            self.response_map = dict(self.response_map)
            self.manager.shutdown()
            self.manager = None

    def async_response(self) -> typing.List[typing.Any]:
        """Return all collected results in submission order."""
        print(f"AsyncBackendTVM::async_response[{os.getpid()}]")
        # Request ids are unique ints, so sorting the (id, value) pairs never
        # compares values.
        return [value for _, value in sorted(self.response_map.items())]
The main function in this case will require additional actions for asynchronous launchers:
model = backend.load("model_path")
# Serial launcher: each entry is the prediction itself.
# PoolBackendTVM (async_mode=True by default): each entry is an ApplyResult.
# AsyncBackendTVM: predict() returns None; results are collected afterwards.
result = [backend.predict(i) for i in range(num_samples)]
# AsyncBackendTVM post-actions
if isinstance(backend, AsyncBackendTVM):
    # One end marker per worker this backend actually started; using the
    # global default here would hang finish() (or leave stray markers)
    # whenever the backend was built with a different num_processes.
    for _ in range(backend.num_processes):
        backend.predict(-1)
    backend.finish()
    result = backend.async_response()
# PoolBackendTVM post-actions
if isinstance(backend, PoolBackendTVM):
    result = backend.async_response(result)
    backend.finish()
#13 describes a bug related to using the TVM launcher with multiple workers.
After analyzing the current implementation, several places were found in it that can be improved.
At the moment, the launcher for TVM looks like this:
It can be seen from the code that when using the multi-worker mode, we always perform inference in sequential mode. If we replace the value of `g_graph.arena_num` with something other than 1, we will go to the desired branch in the predict method. However, the call `pool.apply_async().get()` is inherently synchronous inference, so even in this case, we will get sequential data processing.
In view of this, it is proposed to separate the serial mode and the asynchronous mode into two launchers.
First of all, let's define a structure that will serve as a wrapper for the model API:
In such a case, the serial launcher can be defined as follows:
An asynchronous launcher can be defined in two ways.
Process pool based:
Based on processes and a concurrent task queue:
The main function in this case will require additional actions for asynchronous launchers: