Execution in Worker
A Mars worker consists of multiple processes to reduce the impact of the notorious global interpreter lock (GIL) in Python. Operands execute in separate processes. To avoid unnecessary memory copies and inter-process communication, computation results are stored in shared memory.
When an operand is submitted to a worker, it is first queued to wait for memory allocation. Once memory is allocated, the data of its dependencies is loaded from other workers or from files already spilled to disk. Once all required data is in memory, calculation can start. When calculation is done, the worker puts the result into the shared memory cache. These four states can be seen in the graph below.
A Mars worker starts an ExecutionActor to control all operands running on the worker. It does not perform calculation or data transfer itself, but submits these actions to other actors.
OperandActors in schedulers submit an operand to workers through enqueue_graph calls. The worker accepts and queues the operand. When the operand is ready for execution, ExecutionActor notifies the scheduler, which decides whether to execute it. If the scheduler decides to run the operand on this worker, it calls start_execution. Then a callback is registered via add_finish_callback. This design allows the finish message to be sent to different places, which is necessary for failover.
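The call sequence above can be sketched with a small single-process stand-in. This is only an illustration under stated assumptions: the class `ToyExecutionController`, its `finish` method and all argument lists are hypothetical, and only the names `enqueue_graph`, `start_execution` and `add_finish_callback` come from the text. The point it shows is that several listeners can register for the same finish message.

```python
from collections import defaultdict


class ToyExecutionController:
    """Hypothetical stand-in for the worker-side controller; not the Mars API."""

    def __init__(self):
        self.queued = set()                  # operands accepted but not started
        self.running = set()                 # operands the scheduler has started
        self.callbacks = defaultdict(list)   # operand key -> finish callbacks

    def enqueue_graph(self, graph_key):
        # The worker accepts the operand and queues it.
        self.queued.add(graph_key)

    def start_execution(self, graph_key):
        # Called once the scheduler has decided to run the operand here.
        self.queued.discard(graph_key)
        self.running.add(graph_key)

    def add_finish_callback(self, graph_key, callback):
        # Any number of listeners may wait for the same operand.
        self.callbacks[graph_key].append(callback)

    def finish(self, graph_key, result):
        # Hypothetical hook simulating "result stored": notify every listener.
        self.running.discard(graph_key)
        for callback in self.callbacks.pop(graph_key, []):
            callback(graph_key, result)


ctrl = ToyExecutionController()
received = []
ctrl.enqueue_graph("op-1")
ctrl.start_execution("op-1")
ctrl.add_finish_callback("op-1", lambda k, r: received.append(("scheduler-a", k, r)))
ctrl.add_finish_callback("op-1", lambda k, r: received.append(("scheduler-b", k, r)))
ctrl.finish("op-1", 42)
```

Because callbacks are kept in a list per operand, a second scheduler that takes over after a failure could register its own callback without disturbing the first one.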
ExecutionActor uses the mars.promise module to handle multiple operands simultaneously. Execution steps are chained via the then method of the Promise class. When the final result is successfully stored, all registered callbacks are invoked. When an exception is raised in any chained promise, the final exception handler registered with catch tries to handle it.
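To make the chaining pattern concrete, here is a minimal synchronous sketch. `ToyPromise` is a hypothetical class written for this example and is not the implementation in mars.promise; it only mirrors the `then` and `catch` method names mentioned above. The step functions are placeholders as well.

```python
class ToyPromise:
    """Tiny synchronous promise for illustration; not mars.promise.Promise."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def then(self, func):
        # Skip this step if an earlier step already failed.
        if self._error is not None:
            return self
        try:
            return ToyPromise(value=func(self._value))
        except Exception as exc:
            return ToyPromise(error=exc)

    def catch(self, handler):
        # Runs only when some earlier step raised.
        if self._error is None:
            return self
        return ToyPromise(value=handler(self._error))


log = []


def allocate(_):
    log.append("allocate")
    return "quota"


def load_inputs(_):
    log.append("load")
    raise IOError("dependency not available")  # simulated failure


def calc(_):
    log.append("calc")  # never reached in this run
    return "result"


def on_error(exc):
    log.append("handled: %s" % exc)
    return None


ToyPromise().then(allocate).then(load_inputs).then(calc).catch(on_error)
```

In this run the failure in `load_inputs` skips `calc` and is delivered to the single handler at the end of the chain, which is the behaviour the paragraph above describes.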
All operands in the READY state are submitted to workers selected by the scheduler. Therefore, the number of operands submitted to a worker exceeds the worker's capacity most of the time during execution, and the worker needs to sort these operands before picking some of them for execution. This is done in the worker's TaskQueueActor, which maintains a priority queue storing information about the operands. An allocator runs periodically, trying to allocate resources for the operand at the head of the queue until no free space is left. The allocator is also triggered when a new operand arrives or when an operand finishes execution.
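The queueing behaviour can be sketched with Python's `heapq`. Everything here is an assumption made for illustration: `ToyTaskQueue`, the numeric priorities and the single `size` figure standing in for "resources" are hypothetical, and the periodic timer is omitted so that only the arrival and finish triggers are shown.

```python
import heapq
import itertools


class ToyTaskQueue:
    """Hypothetical priority queue with a head-of-queue allocator."""

    def __init__(self, capacity):
        self.free = capacity
        self.running = {}                  # operand key -> resources held
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps arrival order

    def enqueue(self, op_key, priority, size):
        # heapq is a min-heap, so negate the priority to serve the highest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), op_key, size))
        self.allocate()                    # triggered by a new operand

    def finish(self, op_key):
        self.free += self.running.pop(op_key)
        self.allocate()                    # triggered by a finished operand

    def allocate(self):
        # Serve the head of the queue until it no longer fits.
        while self._heap and self._heap[0][3] <= self.free:
            _, _, op_key, size = heapq.heappop(self._heap)
            self.free -= size
            self.running[op_key] = size


queue = ToyTaskQueue(capacity=4)
queue.enqueue("a", priority=1, size=3)   # fits, starts at once
queue.enqueue("b", priority=5, size=2)   # does not fit, waits
queue.enqueue("c", priority=9, size=1)   # outranks "b" and fits
running_before = set(queue.running)
queue.finish("a")                        # frees space, so "b" can start
running_after = set(queue.running)
```

Note that the loop stops as soon as the head does not fit, even if a smaller operand further back would; whether Mars skips ahead in that case is not stated in the text.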
A Mars worker manages two different parts of memory. The first is the private memory of each worker process, managed by that process itself. The second is shared memory between all worker processes, handled by plasma_store in Apache Arrow.
To avoid out-of-memory errors in process memory, we introduce a worker-level QuotaActor to allocate process memory. Before an operand starts execution, it sends a batch memory request to the QuotaActor, asking for memory blocks for its input and output chunks. If the remaining memory quota can satisfy the request, the QuotaActor accepts it. Otherwise the request is queued. After a memory block is released, its allocation is freed and the QuotaActor can accept other requests.
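A minimal sketch of this accept-or-queue behaviour follows. `ToyQuota` and its method names are hypothetical, and serving queued requests in arrival order is an assumption; the text only says that requests that cannot be satisfied are queued.

```python
from collections import deque


class ToyQuota:
    """Hypothetical quota keeper; grants a whole batch or queues it."""

    def __init__(self, total):
        self.total = total
        self.allocated = {}       # chunk key -> bytes held
        self.pending = deque()    # queued (batch, callback) requests

    @property
    def free(self):
        return self.total - sum(self.allocated.values())

    def request_batch(self, batch, on_granted):
        # batch maps chunk keys to sizes in bytes (inputs and outputs together).
        if not self.pending and sum(batch.values()) <= self.free:
            self.allocated.update(batch)
            on_granted(batch)
        else:
            self.pending.append((batch, on_granted))

    def release(self, keys):
        for key in keys:
            self.allocated.pop(key, None)
        # Freed quota may satisfy requests waiting at the head of the queue.
        while self.pending and sum(self.pending[0][0].values()) <= self.free:
            batch, on_granted = self.pending.popleft()
            self.allocated.update(batch)
            on_granted(batch)


quota = ToyQuota(total=100)
granted = []
quota.request_batch({"in1": 40, "out1": 40}, lambda b: granted.append("op1"))
quota.request_batch({"in2": 30, "out2": 30}, lambda b: granted.append("op2"))
granted_before_release = list(granted)   # only op1 fits at first
quota.release(["in1", "out1"])           # op2 is granted once quota is freed
```

Granting a batch as a unit means an operand never holds quota for its inputs while waiting for quota for its outputs.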
Shared memory is handled by plasma_store, which often takes up to 50% of total memory. As there is no risk of out-of-memory errors, this part of memory is allocated directly without quota requests. When shared memory is exhausted, the Mars worker tries to spill unused chunks to disk.
Chunks spilled to disk may be used by later operands, and loading data from disk into shared memory can be costly in IO, especially when shared memory is exhausted and other chunks must be spilled to make room for the loaded chunk. Therefore, when data sharing is not needed, for instance when an input chunk is used by only a single operand, the chunk can be loaded into private memory instead of shared memory. This can significantly reduce execution time.
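The decision rule in the last paragraph can be written as a short function. This is a sketch of the idea only: `plan_load_targets` and its input format are hypothetical, and how Mars actually tracks which operands read a chunk is not described here.

```python
from collections import Counter


def plan_load_targets(operand_inputs):
    """Map each input chunk to 'private' or 'shared'.

    operand_inputs: dict of operand key -> list of input chunk keys.
    A chunk read by exactly one operand needs no sharing, so it can go
    to that process's private memory; all others go to shared memory.
    """
    readers = Counter(
        chunk for inputs in operand_inputs.values() for chunk in set(inputs)
    )
    return {
        chunk: "private" if count == 1 else "shared"
        for chunk, count in readers.items()
    }


targets = plan_load_targets({"op1": ["a", "b"], "op2": ["b"]})
```

Here chunk `a` is read only by `op1` and is planned for private memory, while chunk `b` is read by both operands and stays in shared memory.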