ProcessPool
- class mdtools.parallel.ProcessPool(nprocs=None, parse_file=True)
Create a pool of worker processes which will carry out tasks submitted to the pool.
This is a very basic custom version of Python’s built-in multiprocessing.pool.Pool class. The problem with the built-in multiprocessing.pool.Pool is that the message pipe between the parent process and its child processes has a limited size. The pipe may fill up when too much data is sent between the processes, resulting in an error or, in the worst case, in a deadlock (see e.g. https://bugs.python.org/issue8426). This custom version of a process pool circumvents this issue by pickling the data to a file and only sending the filename through the pipe. At the other end of the pipe, the data are unpickled from the file and can be processed.

The file names follow a specific pattern. Files scheduling tasks to worker (i.e. child) processes follow the pattern
.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_args_uuid_<UUID>.pkl
if they contain non-keyword arguments (args) or
.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_kwargs_uuid_<UUID>.pkl
if they contain keyword arguments (kwargs). Files which are sent back from the worker processes to the parent process follow the pattern
.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_result_uuid_<UUID>.pkl
Files are deleted automatically after use. You might only have to clean up your working directory if the program crashes during execution. Note that the leading dot (.) marks the files as hidden on Unix systems. To show them, you have to invoke ls -a.

This process pool was only implemented to circumvent the above-mentioned issue and therefore has only very basic functionality, not comparable with the sophisticated functionality of the built-in multiprocessing.pool.Pool.

Initialize a pool of worker processes.
- Parameters:
nprocs (int) – The number of worker processes to use, i.e. the number of child processes to spawn. If None (default), the number of worker processes is inferred from mdtools.run_time_info.get_num_CPUs().
parse_file (bool, optional) – If True (default), send data between processes in files as described above instead of piping them. This option sets the global behaviour of the pool. It can also be set individually for each task submitted to the pool (see submit_task()).
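The file-based transfer described in the class description can be sketched as follows. This is only an illustration of the idea, not the code used by ProcessPool: the helpers make_filename, send_via_file, recv_via_file and roundtrip are hypothetical names, and both pipe ends live in the same process to keep the example short.

```python
import os
import pickle
import tempfile
import uuid
from multiprocessing import Pipe


def make_filename(pool_num, task_num, kind, directory="."):
    """Build a hidden file name following the documented pattern.

    Hypothetical helper: the real class may build its names differently.
    `kind` is one of "args", "kwargs" or "result".
    """
    name = ".pid{}_pool{}_task{}_{}_uuid_{}.pkl".format(
        os.getpid(), pool_num, task_num, kind, uuid.uuid4()
    )
    return os.path.join(directory, name)


def send_via_file(conn, obj, filename):
    """Pickle `obj` to `filename` and send only the file name."""
    with open(filename, "wb") as fh:
        pickle.dump(obj, fh)
    conn.send(filename)


def recv_via_file(conn):
    """Receive a file name, unpickle its content and delete the file."""
    filename = conn.recv()
    with open(filename, "rb") as fh:
        obj = pickle.load(fh)
    os.remove(filename)
    return obj


def roundtrip(obj):
    """Pass `obj` through a pipe by file name; return the copy and leftovers."""
    recv_end, send_end = Pipe(duplex=False)
    with tempfile.TemporaryDirectory() as tmpdir:
        fname = make_filename(0, 0, "args", directory=tmpdir)
        send_via_file(send_end, obj, fname)
        received = recv_via_file(recv_end)
        leftover = os.listdir(tmpdir)  # the file was deleted after use
    send_end.close()
    recv_end.close()
    return received, leftover
```

Only the short file name travels through the pipe, so the size of the pickled object no longer matters for the pipe buffer.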
Methods
close() – Close the process pool.
get_results() – Get the results of all tasks that have not been collected yet.
join() – Wait for the worker processes to exit.
n_tasks_done() – Get the total number of done tasks.
n_tasks_submitted() – Get the total number of tasks submitted to the pool.
submit_task(func[, args, kwargs, parse_file]) – Submit a task to the process pool.
terminate() – Terminate the process pool.
- close()
Close the process pool.
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed, the worker processes will exit. You should collect your results with get_results() first, because the result Queue will also be closed.
- get_results()
Get the results of all tasks that have not been collected yet.
This method blocks until all outstanding tasks are done and their results are fetched. Results are returned in the order of task submission (FIFO: first in, first out).
- Returns:
results (tuple) – A tuple of all collected results.
- join()
Wait for the worker processes to exit.
One should call close() or terminate() before using join(). If join() is called without calling close() or terminate() first, close() will be called internally before join().
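A typical life cycle (submit tasks, collect the results, close the pool, wait for the workers) might look like the sketch below. It uses only the methods documented on this page. The names square and run_squares are hypothetical, and the pool class is passed in as an argument so that the sketch relies on nothing beyond the documented interface.

```python
def square(x):
    """Example task, defined at module level so that it can be pickled."""
    return x * x


def run_squares(pool_cls, values, nprocs=2):
    """Submit one task per value and return the collected results.

    `pool_cls` is assumed to provide the interface documented on this
    page, e.g. mdtools.parallel.ProcessPool.
    """
    pool = pool_cls(nprocs=nprocs)
    for value in values:
        pool.submit_task(square, args=(value,))
    results = pool.get_results()  # collect before closing the pool
    pool.close()  # no further tasks can be submitted
    pool.join()  # wait for the worker processes to exit
    return results


# Possible usage, assuming mdtools is installed:
#
# if __name__ == "__main__":
#     from mdtools.parallel import ProcessPool
#     print(run_squares(ProcessPool, range(5)))
```

Collecting the results before close() follows the recommendation given for close(), and calling close() before join() follows the recommendation given for join().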
- n_tasks_done()
Get the total number of done tasks.
- Returns:
_ntasks_done (int) – The total number of already finished tasks.
- n_tasks_submitted()
Get the total number of tasks submitted to the pool.
- Returns:
_tasknum (int) – The total number of tasks submitted to the pool (including already finished tasks).
- submit_task(func, args=(), kwargs={}, parse_file=None)
Submit a task to the process pool. The task will start as soon as a free worker (child) process is available.
- Parameters:
func (function) – The function to execute.
args (tuple) – Non-keyword arguments to pass to func.
kwargs (dict) – Keyword arguments to pass to func.
parse_file (bool) – If True, send data between processes in files as described above. The default (None) is to infer the behaviour from the parse_file argument that was used for the construction of this ProcessPool.
- Returns:
_tasknum (int) – The number assigned to the submitted task. Tasks are numbered sequentially according to their submission time (the first submitted task gets number 0).
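The interplay between the per-task parse_file argument and the pool-wide setting can be pictured with the small sketch below. The helper resolve_parse_file is hypothetical and only mirrors the documented rule (None falls back to the value given at construction). It is not taken from the mdtools source.

```python
def resolve_parse_file(task_value, pool_default):
    """Return the effective parse_file setting for a single task.

    `task_value` is the parse_file argument given to submit_task(),
    `pool_default` the one given when the pool was constructed.
    """
    if task_value is None:
        # Not set for this task: use the pool-wide behaviour.
        return pool_default
    return bool(task_value)
```

Under this rule a pool created with parse_file=True can still pipe the data of a single small task directly by submitting it with parse_file=False.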
- terminate()
Terminate the process pool.
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected, terminate() will be called.