# This file is part of MDTools.
# Copyright (C) 2021, The MDTools Development Team and all contributors
# listed in the file AUTHORS.rst
#
# MDTools is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# MDTools is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
# You should have received a copy of the GNU General Public License
# along with MDTools. If not, see <http://www.gnu.org/licenses/>.
"""Functions and clases for parallel computing."""
# Standard libraries
import os
import warnings
import traceback
import atexit
import uuid
import pickle
import multiprocessing as mp
# Local application/library specific imports
import mdtools as mdt
class ProcessPool:
    """
    Create a pool of worker processes which will carry out tasks
    submitted to the pool.

    This is a very basic custom version of Python's built-in
    :class:`multiprocessing.pool.Pool` class.  The problem with the
    built-in :class:`multiprocessing.pool.Pool` is that the message pipe
    between the parent process and its child processes has a limited
    size.  The pipe may get stuffed when you send too much data between
    the processes resulting in an error or, in the worst case, in a dead
    lock (see e.g. https://bugs.python.org/issue8426).  This custom
    version of a process pool circumvents this issue by pickling the
    data to a file and only sending the filename through the pipe.  At
    the other end of the pipe, the data are unpickled from the file and
    can be processed.

    The file names follow a specific pattern.  Files scheduling tasks to
    worker (i.e. child) processes follow the pattern
    :file:`.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_args_uuid_<UUID>.pkl`
    if they contain non-keyword arguments (:file:`args`) or
    :file:`.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_kwargs_uuid_<UUID>.pkl`
    if they contain keyword arguments (:file:`kwargs`).  Files which are
    sent back from the worker processes to the parent process follow the
    pattern
    :file:`.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_result_uuid_<UUID>.pkl`.
    Files are deleted automatically after use.  You might only have to
    clean up your working directory if the program crashes during
    execution.  Note that the leading dot :file:`.` marks the files
    hidden on Unix systems.  To show them, you have to invoke
    :command:`ls -a`.

    This process pool was only implemented to circumvent the above
    mentioned issue and has therefore only a very basic functionality,
    not comparable with the sophisticated functionality of the built-in
    :class:`multiprocessing.pool.Pool`.
    """

    #: Counts how many instances of :class:`ProcessPool` have been
    #: created.
    _counter = 0

    def __init__(self, nprocs=None, parse_file=True):
        """
        Initialize a pool of worker processes.

        Parameters
        ----------
        nprocs : int
            The number of worker processes to use, i.e. the number of
            child processes to spawn.  If ``None`` (default), the number
            of worker processes is inferred from
            :func:`mdtools.run_time_info.get_num_CPUs`.
        parse_file : bool, optional
            If ``True`` (default), send data between processes in files
            as described above instead of piping them.  This option sets
            the global behaviour of the pool.  It can also be set
            individually for each task submitted to the pool (see
            :meth:`submit_task`).

        Raises
        ------
        ValueError
            If the number of processes is smaller than one.
        """
        # Create the attributes that :meth:`terminate` and :meth:`close`
        # read before anything can fail.  :meth:`__del__` is registered
        # with :mod:`atexit` below and would otherwise hit a missing
        # attribute if this constructor raises.
        self._procs = []
        self._pool_closed = False

        self._poolnum = ProcessPool._counter
        ProcessPool._counter += 1
        atexit.register(self.__del__)

        ncpus = mdt.rti.get_num_CPUs()
        self._nprocs = ncpus if nprocs is None else nprocs
        if self._nprocs < 1:
            raise ValueError("The number of processes ({}) must be"
                             " positive".format(self._nprocs))
        elif self._nprocs == 1:
            warnings.warn("The number of processes is only one. A serial"
                          " code is likely to be faster due to the"
                          " overhead of multiprocessing", RuntimeWarning)
        elif self._nprocs > ncpus:
            warnings.warn("The number of processes ({}) is larger than"
                          " the number of available CPUs ({}). This will"
                          " probably lead to performance degradation"
                          .format(self._nprocs, ncpus),
                          RuntimeWarning)

        self._parse_file = bool(parse_file)
        self._tasknum = 0
        self._ntasks_done = 0
        self._taskq = mp.Queue()
        self._resultq = mp.Queue()
        self._sentinel = None
        for _ in range(self._nprocs):
            proc = mp.Process(target=self._worker,
                              args=(self._taskq,
                                    self._resultq,
                                    self._sentinel))
            proc.start()
            atexit.register(proc.terminate)
            self._procs.append(proc)

    def __del__(self):
        """Terminate the process pool."""
        self.terminate()

    def n_tasks_submitted(self):
        """
        Get the total number of tasks submitted to the pool.

        Returns
        -------
        _tasknum : int
            The *total* number of tasks submitted to the pool (including
            already finished tasks).
        """
        return self._tasknum

    def n_tasks_done(self):
        """
        Get the total number of done tasks.

        Returns
        -------
        _ntasks_done : int
            The total number of tasks whose outcome (result or error)
            has already been fetched with :meth:`get_results`.
        """
        return self._ntasks_done

    def _get_fname(self, tasknum, kind):
        """
        Build the name of a file used to exchange data between processes.

        Parameters
        ----------
        tasknum : int
            The number of the task the file belongs to.
        kind : str
            The kind of data stored in the file (``"args"``,
            ``"kwargs"`` or ``"result"``).

        Returns
        -------
        fname : str
            A file name following the pattern
            :file:`.pid<ProcessID>_pool<PoolNumber>_task<TaskNumber>_<kind>_uuid_<UUID>.pkl`,
            where the process ID is the one of the calling process.
        """
        return ".pid{}_pool{}_task{}_{}_uuid_{}.pkl".format(
            os.getpid(), self._poolnum, tasknum, kind, uuid.uuid4())

    @staticmethod
    def _dump_to_file(obj, fname):
        """Pickle `obj` to the file `fname` and return `fname`."""
        with open(fname, 'wb') as f:
            pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
        return fname

    @staticmethod
    def _load_from_file(fname):
        """
        Unpickle and return the content of `fname`, then delete the file.

        The file is only removed after it was read successfully.
        """
        # NOTE: Unpickling executes arbitrary code.  The files read here
        # are the ones written by this pool into the working directory.
        with open(fname, 'rb') as f:
            obj = pickle.load(f)
        os.remove(fname)
        return obj

    def _worker(self, inq, outq, sentinel):
        """
        A wrapper function executed by the worker processes.

        Each spawned worker (child) process runs one :meth:`_worker`
        method.  The :meth:`_worker` method reads a task from `inq` (the
        task queue of the parent process), executes the task and sends
        its result back to the parent process via `outq` (the result
        queue of the parent process).  If a worker reads `sentinel` from
        `inq`, it stops and the process running the :meth:`_worker`
        method terminates.

        If an exception is raised while processing a task, the traceback
        is printed, the tuple ``(tasknum, exception)`` is put into
        `outq` and the worker stops without processing further tasks.

        Parameters
        ----------
        inq : multiprocessing.Queue
            Input queue from which to read tasks.
        outq : multiprocessing.Queue
            Output queue in which to put the results.
        sentinel : object
            The sentinel object indicating the end of the input queue.

        Returns
        -------
        tasknums : list or None
            List of task numbers of all tasks processed by the worker
            process running this specific :meth:`_worker` method in the
            order of processing.  ``None`` if the worker stopped because
            of an exception.
        """
        # Must be bound before the loop:  The exception handler below
        # reads it even if the failure happens before the first task
        # could be unpacked.
        tasknum = None
        tasknums = []
        try:
            for tasknum, parse_file, func, args, kwargs in iter(inq.get,
                                                                sentinel):
                if parse_file:
                    # `args` and `kwargs` are file names in this case.
                    args = self._load_from_file(args)
                    kwargs = self._load_from_file(kwargs)
                result = func(*args, **kwargs)
                if parse_file:
                    result = self._dump_to_file(
                        result, self._get_fname(tasknum, "result"))
                outq.put((tasknum, parse_file, result))
                tasknums.append(tasknum)
            return tasknums
        except Exception as e:
            print("An exception was raised in {} (PID: {}) while"
                  " processing task {} of ProcessPool {}:"
                  .format(mp.current_process().name,
                          os.getpid(),
                          tasknum,
                          self._poolnum))
            traceback.print_exc()
            # Errors are reported as 2-tuples, results as 3-tuples.
            outq.put((tasknum, e))

    def submit_task(self, func, args=(), kwargs=None, parse_file=None):
        """
        Submit a task to the process pool.  The task will start as soon
        as a free worker (child) process is available.

        Parameters
        ----------
        func : function
            The function to execute.
        args : tuple
            Non-keyword arguments to parse to `func`.
        kwargs : dict
            Keyword arguments to parse to `func`.  ``None`` (default) is
            equivalent to an empty dictionary.
        parse_file : bool
            If ``True``, send data between processes in files as
            described above.  The default (``None``) is to infer the
            behaviour from the `parse_file` argument that was used for
            the construction of this :class:`ProcessPool`.

        Returns
        -------
        _tasknum : int
            The number assigned to the submitted task.  Tasks are
            numbered sequentially according to their submission time
            (the first submitted task gets number 0).

        Raises
        ------
        ValueError
            If the pool has already been closed with :meth:`close`.
        """
        if self._pool_closed:
            # Fail before any file is written.  Putting to the closed
            # queue would fail anyway, but only after the argument files
            # were created and nobody would delete them any more.
            raise ValueError("Cannot submit tasks to a closed"
                             " ProcessPool")
        if kwargs is None:
            kwargs = {}
        if parse_file is None:
            parse_file = self._parse_file

        tasknum = self._tasknum
        if parse_file:
            # Replace the arguments by the names of the files holding
            # them.
            args = self._dump_to_file(
                args, self._get_fname(tasknum, "args"))
            kwargs = self._dump_to_file(
                kwargs, self._get_fname(tasknum, "kwargs"))
        self._taskq.put((tasknum, parse_file, func, args, kwargs))
        self._tasknum += 1
        return tasknum

    def get_results(self):
        """
        Get the results of all tasks that have not been collected, yet.

        This method blocks until all undone tasks are done and their
        results are fetched.  Results are returned in the order of task
        submission (FIFO: first in, first out).

        Returns
        -------
        results : tuple
            A tuple of all collected results.  An empty tuple if there
            are no uncollected tasks.

        Raises
        ------
        BaseException
            The exception raised by a task, as soon as the error report
            of that task is fetched.  Results already fetched during
            this call are discarded in this case.
        """
        n_pending = self._tasknum - self._ntasks_done
        fetched = []
        for _ in range(n_pending):
            result = self._resultq.get()
            # Failed tasks are finished, too.  Not counting them would
            # make the next call wait for a result that never arrives.
            self._ntasks_done += 1
            # Workers report errors as (tasknum, exception) and results
            # as (tasknum, parse_file, result).  Deciding by length keeps
            # tasks that merely *return* an exception instance working.
            if len(result) == 2:
                print("Task {} exited with error:"
                      .format(result[0]))
                raise result[-1]
            fetched.append(result)

        # Task numbers are unique, so this restores submission order.
        fetched.sort(key=lambda item: item[0])
        return tuple(self._load_from_file(result) if parse_file else result
                     for _, parse_file, result in fetched)

    def close(self):
        """
        Close the process pool.

        Prevents any more tasks from being submitted to the pool.  Once
        all the tasks have been completed, the worker processes will
        exit.  You should collect your result with :meth:`get_results`
        first, because also the result :class:`~multiprocessing.Queue`
        will be closed.

        Calling :meth:`close` on an already closed pool does nothing.
        """
        if self._pool_closed:
            return
        # One sentinel per worker, because each worker consumes exactly
        # one sentinel before it stops.
        for _ in range(self._nprocs):
            self._taskq.put(self._sentinel)
        self._taskq.close()
        self._resultq.close()
        self._pool_closed = True

    def terminate(self):
        """
        Terminate the process pool.

        Stops the worker processes immediately without completing
        outstanding work.  When the pool object is garbage collected
        :meth:`terminate` will be called.
        """
        # `getattr` guards against instances that were never fully
        # initialized, because this method is also run via `__del__`.
        for proc in getattr(self, "_procs", ()):
            proc.terminate()

    def join(self):
        """
        Wait for the worker processes to exit.

        One should call :meth:`close` or :meth:`terminate` before using
        :meth:`join`.  If :meth:`join` is called without calling
        :meth:`close` before, :meth:`close` will be called internally
        before :meth:`join`.
        """
        if not self._pool_closed:
            self.close()
        self._taskq.join_thread()
        self._resultq.join_thread()
        for proc in self._procs:
            proc.join()