���ѧۧݧ�ӧ�� �ާ֧ߧ֧էا֧� - ���֧էѧܧ�ڧ��ӧѧ�� - /home/ukubnwwtacc0unt/chapelbellstudios.com/uploads/cover/multiprocessing.tar
���ѧ٧ѧ�
__init__.py 0000644 00000001633 15204243001 0006647 0 ustar 00 # # Package analogous to 'threading.py' but using processes # # multiprocessing/__init__.py # # This package is intended to duplicate the functionality (and much of # the API) of threading.py but uses processes instead of threads. A # subpackage 'multiprocessing.dummy' has the same API but is a simple # wrapper for 'threading'. # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # import sys from . import context # # Copy stuff from default context # globals().update((name, getattr(context._default_context, name)) for name in context._default_context.__all__) __all__ = context._default_context.__all__ # # XXX These should not really be documented or public. # SUBDEBUG = 5 SUBWARNING = 25 # # Alias for main module -- will be reset by bootstrapping child processes # if '__main__' in sys.modules: sys.modules['__mp_main__'] = sys.modules['__main__'] context.py 0000644 00000025245 15204243001 0006601 0 ustar 00 import os import sys import threading from . import process from . import reduction __all__ = [] # things are copied from here to __init__.py # # Exceptions # class ProcessError(Exception): pass class BufferTooShort(ProcessError): pass class TimeoutError(ProcessError): pass class AuthenticationError(ProcessError): pass # # Base type for contexts # class BaseContext(object): ProcessError = ProcessError BufferTooShort = BufferTooShort TimeoutError = TimeoutError AuthenticationError = AuthenticationError current_process = staticmethod(process.current_process) active_children = staticmethod(process.active_children) def cpu_count(self): '''Returns the number of CPUs in the system''' num = os.cpu_count() if num is None: raise NotImplementedError('cannot determine number of cpus') else: return num def Manager(self): '''Returns a manager associated with a running server process The managers methods such as `Lock()`, `Condition()` and `Queue()` can be used to create shared objects. ''' from .managers import SyncManager m = SyncManager(ctx=self.get_context()) m.start() return m def Pipe(self, duplex=True): '''Returns two connection object connected by a pipe''' from .connection import Pipe return Pipe(duplex) def Lock(self): '''Returns a non-recursive lock object''' from .synchronize import Lock return Lock(ctx=self.get_context()) def RLock(self): '''Returns a recursive lock object''' from .synchronize import RLock return RLock(ctx=self.get_context()) def Condition(self, lock=None): '''Returns a condition object''' from .synchronize import Condition return Condition(lock, ctx=self.get_context()) def Semaphore(self, value=1): '''Returns a semaphore object''' from .synchronize import Semaphore return Semaphore(value, ctx=self.get_context()) def BoundedSemaphore(self, value=1): '''Returns a bounded semaphore object''' from .synchronize import BoundedSemaphore return BoundedSemaphore(value, ctx=self.get_context()) def Event(self): '''Returns an event object''' from .synchronize import Event return Event(ctx=self.get_context()) def Barrier(self, parties, action=None, timeout=None): '''Returns a barrier object''' from .synchronize import Barrier return Barrier(parties, action, timeout, ctx=self.get_context()) def Queue(self, maxsize=0): '''Returns a queue object''' from .queues import Queue return Queue(maxsize, ctx=self.get_context()) def JoinableQueue(self, maxsize=0): '''Returns a queue object''' from .queues import JoinableQueue return JoinableQueue(maxsize, ctx=self.get_context()) def SimpleQueue(self): '''Returns a queue object''' from .queues import SimpleQueue return SimpleQueue(ctx=self.get_context()) def Pool(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): '''Returns a process pool object''' from .pool import Pool return Pool(processes, initializer, initargs, maxtasksperchild, context=self.get_context()) def RawValue(self, typecode_or_type, *args): '''Returns a shared object''' from .sharedctypes import RawValue return RawValue(typecode_or_type, *args) def RawArray(self, typecode_or_type, size_or_initializer): '''Returns a shared array''' from .sharedctypes import RawArray return RawArray(typecode_or_type, size_or_initializer) def Value(self, typecode_or_type, *args, lock=True): '''Returns a synchronized shared object''' from .sharedctypes import Value return Value(typecode_or_type, *args, lock=lock, ctx=self.get_context()) def Array(self, typecode_or_type, size_or_initializer, *, lock=True): '''Returns a synchronized shared array''' from .sharedctypes import Array return Array(typecode_or_type, size_or_initializer, lock=lock, ctx=self.get_context()) def freeze_support(self): '''Check whether this is a fake forked process in a frozen executable. If so then run code specified by commandline and exit. ''' if sys.platform == 'win32' and getattr(sys, 'frozen', False): from .spawn import freeze_support freeze_support() def get_logger(self): '''Return package logger -- if it does not already exist then it is created. ''' from .util import get_logger return get_logger() def log_to_stderr(self, level=None): '''Turn on logging and add a handler which prints to stderr''' from .util import log_to_stderr return log_to_stderr(level) def allow_connection_pickling(self): '''Install support for sending connections and sockets between processes ''' # This is undocumented. In previous versions of multiprocessing # its only effect was to make socket objects inheritable on Windows. from . import connection def set_executable(self, executable): '''Sets the path to a python.exe or pythonw.exe binary used to run child processes instead of sys.executable when using the 'spawn' start method. Useful for people embedding Python. ''' from .spawn import set_executable set_executable(executable) def set_forkserver_preload(self, module_names): '''Set list of module names to try to load in forkserver process. This is really just a hint. ''' from .forkserver import set_forkserver_preload set_forkserver_preload(module_names) def get_context(self, method=None): if method is None: return self try: ctx = _concrete_contexts[method] except KeyError: raise ValueError('cannot find context for %r' % method) ctx._check_available() return ctx def get_start_method(self, allow_none=False): return self._name def set_start_method(self, method, force=False): raise ValueError('cannot set start method of concrete context') @property def reducer(self): '''Controls how objects will be reduced to a form that can be shared with other processes.''' return globals().get('reduction') @reducer.setter def reducer(self, reduction): globals()['reduction'] = reduction def _check_available(self): pass # # Type of default context -- underlying context can be set at most once # class Process(process.BaseProcess): _start_method = None @staticmethod def _Popen(process_obj): return _default_context.get_context().Process._Popen(process_obj) class DefaultContext(BaseContext): Process = Process def __init__(self, context): self._default_context = context self._actual_context = None def get_context(self, method=None): if method is None: if self._actual_context is None: self._actual_context = self._default_context return self._actual_context else: return super().get_context(method) def set_start_method(self, method, force=False): if self._actual_context is not None and not force: raise RuntimeError('context has already been set') if method is None and force: self._actual_context = None return self._actual_context = self.get_context(method) def get_start_method(self, allow_none=False): if self._actual_context is None: if allow_none: return None self._actual_context = self._default_context return self._actual_context._name def get_all_start_methods(self): if sys.platform == 'win32': return ['spawn'] else: if reduction.HAVE_SEND_HANDLE: return ['fork', 'spawn', 'forkserver'] else: return ['fork', 'spawn'] DefaultContext.__all__ = list(x for x in dir(DefaultContext) if x[0] != '_') # # Context types for fixed start method # if sys.platform != 'win32': class ForkProcess(process.BaseProcess): _start_method = 'fork' @staticmethod def _Popen(process_obj): from .popen_fork import Popen return Popen(process_obj) class SpawnProcess(process.BaseProcess): _start_method = 'spawn' @staticmethod def _Popen(process_obj): from .popen_spawn_posix import Popen return Popen(process_obj) class ForkServerProcess(process.BaseProcess): _start_method = 'forkserver' @staticmethod def _Popen(process_obj): from .popen_forkserver import Popen return Popen(process_obj) class ForkContext(BaseContext): _name = 'fork' Process = ForkProcess class SpawnContext(BaseContext): _name = 'spawn' Process = SpawnProcess class ForkServerContext(BaseContext): _name = 'forkserver' Process = ForkServerProcess def _check_available(self): if not reduction.HAVE_SEND_HANDLE: raise ValueError('forkserver start method not available') _concrete_contexts = { 'fork': ForkContext(), 'spawn': SpawnContext(), 'forkserver': ForkServerContext(), } _default_context = DefaultContext(_concrete_contexts['fork']) else: class SpawnProcess(process.BaseProcess): _start_method = 'spawn' @staticmethod def _Popen(process_obj): from .popen_spawn_win32 import Popen return Popen(process_obj) class SpawnContext(BaseContext): _name = 'spawn' Process = SpawnProcess _concrete_contexts = { 'spawn': SpawnContext(), } _default_context = DefaultContext(_concrete_contexts['spawn']) # # Force the start method # def _force_start_method(method): _default_context._actual_context = _concrete_contexts[method] # # Check that the current thread is spawning a child process # _tls = threading.local() def get_spawning_popen(): return getattr(_tls, 'spawning_popen', None) def set_spawning_popen(popen): _tls.spawning_popen = popen def assert_spawning(obj): if get_spawning_popen() is None: raise RuntimeError( '%s objects should only be shared between processes' ' through inheritance' % type(obj).__name__ ) pool.py 0000644 00000062713 15204243001 0006067 0 ustar 00 # # Module providing the `Pool` class for managing a process pool # # multiprocessing/pool.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # __all__ = ['Pool', 'ThreadPool'] # # Imports # import threading import queue import itertools import collections import os import time import traceback # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util from . import get_context, TimeoutError # # Constants representing the state of a pool # RUN = 0 CLOSE = 1 TERMINATE = 2 # # Miscellaneous # job_counter = itertools.count() def mapstar(args): return list(map(*args)) def starmapstar(args): return list(itertools.starmap(args[0], args[1])) # # Hack to embed stringification of remote traceback in local traceback # class RemoteTraceback(Exception): def __init__(self, tb): self.tb = tb def __str__(self): return self.tb class ExceptionWithTraceback: def __init__(self, exc, tb): tb = traceback.format_exception(type(exc), exc, tb) tb = ''.join(tb) self.exc = exc self.tb = '\n"""\n%s"""' % tb def __reduce__(self): return rebuild_exc, (self.exc, self.tb) def rebuild_exc(exc, tb): exc.__cause__ = RemoteTraceback(tb) return exc # # Code run by worker processes # class MaybeEncodingError(Exception): """Wraps possible unpickleable errors, so they can be safely sent through the socket.""" def __init__(self, exc, value): self.exc = repr(exc) self.value = repr(value) super(MaybeEncodingError, self).__init__(self.exc, self.value) def __str__(self): return "Error sending result: '%s'. Reason: '%s'" % (self.value, self.exc) def __repr__(self): return "<%s: %s>" % (self.__class__.__name__, self) def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None, wrap_exception=False): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put get = inqueue.get if hasattr(inqueue, '_writer'): inqueue._writer.close() outqueue._reader.close() if initializer is not None: initializer(*initargs) completed = 0 while maxtasks is None or (maxtasks and completed < maxtasks): try: task = get() except (EOFError, OSError): util.debug('worker got EOFError or OSError -- exiting') break if task is None: util.debug('worker got sentinel -- exiting') break job, i, func, args, kwds = task try: result = (True, func(*args, **kwds)) except Exception as e: if wrap_exception and func is not _helper_reraises_exception: e = ExceptionWithTraceback(e, e.__traceback__) result = (False, e) try: put((job, i, result)) except Exception as e: wrapped = MaybeEncodingError(e, result[1]) util.debug("Possible encoding error while sending result: %s" % ( wrapped)) put((job, i, (False, wrapped))) task = job = result = func = args = kwds = None completed += 1 util.debug('worker exiting after %d tasks' % completed) def _helper_reraises_exception(ex): 'Pickle-able helper function for use by _guarded_task_generation.' raise ex # # Class representing a process pool # class Pool(object): ''' Class which supports an async version of applying functions to arguments. ''' _wrap_exception = True def Process(self, *args, **kwds): return self._ctx.Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None): self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.Queue() self._cache = {} self._state = RUN self._maxtasksperchild = maxtasksperchild self._initializer = initializer self._initargs = initargs if processes is None: processes = os.cpu_count() or 1 if processes < 1: raise ValueError("Number of processes must be at least 1") if initializer is not None and not callable(initializer): raise TypeError('initializer must be a callable') self._processes = processes self._pool = [] self._repopulate_pool() self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) ) self._worker_handler.daemon = True self._worker_handler._state = RUN self._worker_handler.start() self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, self._pool, self._cache) ) self._task_handler.daemon = True self._task_handler._state = RUN self._task_handler.start() self._result_handler = threading.Thread( target=Pool._handle_results, args=(self._outqueue, self._quick_get, self._cache) ) self._result_handler.daemon = True self._result_handler._state = RUN self._result_handler.start() self._terminate = util.Finalize( self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._worker_handler, self._task_handler, self._result_handler, self._cache), exitpriority=15 ) def _join_exited_workers(self): """Cleanup after any worker processes which have exited due to reaching their specified lifetime. Returns True if any workers were cleaned up. """ cleaned = False for i in reversed(range(len(self._pool))): worker = self._pool[i] if worker.exitcode is not None: # worker exited util.debug('cleaning up worker %d' % i) worker.join() cleaned = True del self._pool[i] return cleaned def _repopulate_pool(self): """Bring the number of pool processes up to the specified number, for use after reaping workers which have exited. """ for i in range(self._processes - len(self._pool)): w = self.Process(target=worker, args=(self._inqueue, self._outqueue, self._initializer, self._initargs, self._maxtasksperchild, self._wrap_exception) ) self._pool.append(w) w.name = w.name.replace('Process', 'PoolWorker') w.daemon = True w.start() util.debug('added worker') def _maintain_pool(self): """Clean up any exited workers and start replacements for them. """ if self._join_exited_workers(): self._repopulate_pool() def _setup_queues(self): self._inqueue = self._ctx.SimpleQueue() self._outqueue = self._ctx.SimpleQueue() self._quick_put = self._inqueue._writer.send self._quick_get = self._outqueue._reader.recv def apply(self, func, args=(), kwds={}): ''' Equivalent of `func(*args, **kwds)`. ''' assert self._state == RUN return self.apply_async(func, args, kwds).get() def map(self, func, iterable, chunksize=None): ''' Apply `func` to each element in `iterable`, collecting the results in a list that is returned. ''' return self._map_async(func, iterable, mapstar, chunksize).get() def starmap(self, func, iterable, chunksize=None): ''' Like `map()` method but the elements of the `iterable` are expected to be iterables as well and will be unpacked as arguments. Hence `func` and (a, b) becomes func(a, b). ''' return self._map_async(func, iterable, starmapstar, chunksize).get() def starmap_async(self, func, iterable, chunksize=None, callback=None, error_callback=None): ''' Asynchronous version of `starmap()` method. ''' return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) def _guarded_task_generation(self, result_job, func, iterable): '''Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.''' try: i = -1 for i, x in enumerate(iterable): yield (result_job, i, func, (x,), {}) except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) def imap(self, func, iterable, chunksize=1): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' if self._state != RUN: raise ValueError("Pool not running") if chunksize == 1: result = IMapIterator(self._cache) self._taskqueue.put( ( self._guarded_task_generation(result._job, func, iterable), result._set_length )) return result else: assert chunksize > 1 task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapIterator(self._cache) self._taskqueue.put( ( self._guarded_task_generation(result._job, mapstar, task_batches), result._set_length )) return (item for chunk in result for item in chunk) def imap_unordered(self, func, iterable, chunksize=1): ''' Like `imap()` method but ordering of results is arbitrary. ''' if self._state != RUN: raise ValueError("Pool not running") if chunksize == 1: result = IMapUnorderedIterator(self._cache) self._taskqueue.put( ( self._guarded_task_generation(result._job, func, iterable), result._set_length )) return result else: assert chunksize > 1 task_batches = Pool._get_tasks(func, iterable, chunksize) result = IMapUnorderedIterator(self._cache) self._taskqueue.put( ( self._guarded_task_generation(result._job, mapstar, task_batches), result._set_length )) return (item for chunk in result for item in chunk) def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ''' Asynchronous version of `apply()` method. ''' if self._state != RUN: raise ValueError("Pool not running") result = ApplyResult(self._cache, callback, error_callback) self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) return result def map_async(self, func, iterable, chunksize=None, callback=None, error_callback=None): ''' Asynchronous version of `map()` method. ''' return self._map_async(func, iterable, mapstar, chunksize, callback, error_callback) def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, error_callback=None): ''' Helper function to implement map, starmap and their async counterparts. ''' if self._state != RUN: raise ValueError("Pool not running") if not hasattr(iterable, '__len__'): iterable = list(iterable) if chunksize is None: chunksize, extra = divmod(len(iterable), len(self._pool) * 4) if extra: chunksize += 1 if len(iterable) == 0: chunksize = 0 task_batches = Pool._get_tasks(func, iterable, chunksize) result = MapResult(self._cache, chunksize, len(iterable), callback, error_callback=error_callback) self._taskqueue.put( ( self._guarded_task_generation(result._job, mapper, task_batches), None ) ) return result @staticmethod def _handle_workers(pool): thread = threading.current_thread() # Keep maintaining workers until the cache gets drained, unless the pool # is terminated. while thread._state == RUN or (pool._cache and thread._state != TERMINATE): pool._maintain_pool() time.sleep(0.1) # send sentinel to stop workers pool._taskqueue.put(None) util.debug('worker handler exiting') @staticmethod def _handle_tasks(taskqueue, put, outqueue, pool, cache): thread = threading.current_thread() for taskseq, set_length in iter(taskqueue.get, None): task = None try: # iterating taskseq cannot fail for task in taskseq: if thread._state: util.debug('task handler found thread._state != RUN') break try: put(task) except Exception as e: job, idx = task[:2] try: cache[job]._set(idx, (False, e)) except KeyError: pass else: if set_length: util.debug('doing set_length()') idx = task[1] if task else -1 set_length(idx + 1) continue break finally: task = taskseq = job = None else: util.debug('task handler got sentinel') try: # tell result handler to finish when cache is empty util.debug('task handler sending sentinel to result handler') outqueue.put(None) # tell workers there is no more work util.debug('task handler sending sentinel to workers') for p in pool: put(None) except OSError: util.debug('task handler got OSError when sending sentinels') util.debug('task handler exiting') @staticmethod def _handle_results(outqueue, get, cache): thread = threading.current_thread() while 1: try: task = get() except (OSError, EOFError): util.debug('result handler got EOFError/OSError -- exiting') return if thread._state: assert thread._state == TERMINATE util.debug('result handler found thread._state=TERMINATE') break if task is None: util.debug('result handler got sentinel') break job, i, obj = task try: cache[job]._set(i, obj) except KeyError: pass task = job = obj = None while cache and thread._state != TERMINATE: try: task = get() except (OSError, EOFError): util.debug('result handler got EOFError/OSError -- exiting') return if task is None: util.debug('result handler ignoring extra sentinel') continue job, i, obj = task try: cache[job]._set(i, obj) except KeyError: pass task = job = obj = None if hasattr(outqueue, '_reader'): util.debug('ensuring that outqueue is not full') # If we don't make room available in outqueue then # attempts to add the sentinel (None) to outqueue may # block. There is guaranteed to be no more than 2 sentinels. try: for i in range(10): if not outqueue._reader.poll(): break get() except (OSError, EOFError): pass util.debug('result handler exiting: len(cache)=%s, thread._state=%s', len(cache), thread._state) @staticmethod def _get_tasks(func, it, size): it = iter(it) while 1: x = tuple(itertools.islice(it, size)) if not x: return yield (func, x) def __reduce__(self): raise NotImplementedError( 'pool objects cannot be passed between processes or pickled' ) def close(self): util.debug('closing pool') if self._state == RUN: self._state = CLOSE self._worker_handler._state = CLOSE def terminate(self): util.debug('terminating pool') self._state = TERMINATE self._worker_handler._state = TERMINATE self._terminate() def join(self): util.debug('joining pool') assert self._state in (CLOSE, TERMINATE) self._worker_handler.join() self._task_handler.join() self._result_handler.join() for p in self._pool: p.join() @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # task_handler may be blocked trying to put items on inqueue util.debug('removing tasks from inqueue until task handler finished') inqueue._rlock.acquire() while task_handler.is_alive() and inqueue._reader.poll(): inqueue._reader.recv() time.sleep(0) @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, worker_handler, task_handler, result_handler, cache): # this is guaranteed to only be called once util.debug('finalizing pool') worker_handler._state = TERMINATE task_handler._state = TERMINATE util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) assert result_handler.is_alive() or len(cache) == 0 result_handler._state = TERMINATE outqueue.put(None) # sentinel # We must wait for the worker handler to exit before terminating # workers because we don't want workers to be restarted behind our back. util.debug('joining worker handler') if threading.current_thread() is not worker_handler: worker_handler.join() # Terminate workers which haven't already finished. if pool and hasattr(pool[0], 'terminate'): util.debug('terminating workers') for p in pool: if p.exitcode is None: p.terminate() util.debug('joining task handler') if threading.current_thread() is not task_handler: task_handler.join() util.debug('joining result handler') if threading.current_thread() is not result_handler: result_handler.join() if pool and hasattr(pool[0], 'terminate'): util.debug('joining pool workers') for p in pool: if p.is_alive(): # worker has not yet exited util.debug('cleaning up worker %d' % p.pid) p.join() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.terminate() # # Class whose instances are returned by `Pool.apply_async()` # class ApplyResult(object): def __init__(self, cache, callback, error_callback): self._event = threading.Event() self._job = next(job_counter) self._cache = cache self._callback = callback self._error_callback = error_callback cache[self._job] = self def ready(self): return self._event.is_set() def successful(self): assert self.ready() return self._success def wait(self, timeout=None): self._event.wait(timeout) def get(self, timeout=None): self.wait(timeout) if not self.ready(): raise TimeoutError if self._success: return self._value else: raise self._value def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) if self._error_callback and not self._success: self._error_callback(self._value) self._event.set() del self._cache[self._job] AsyncResult = ApplyResult # create alias -- see #17805 # # Class whose instances are returned by `Pool.map_async()` # class MapResult(ApplyResult): def __init__(self, cache, chunksize, length, callback, error_callback): ApplyResult.__init__(self, cache, callback, error_callback=error_callback) self._success = True self._value = [None] * length self._chunksize = chunksize if chunksize <= 0: self._number_left = 0 self._event.set() del cache[self._job] else: self._number_left = length//chunksize + bool(length % chunksize) def _set(self, i, success_result): self._number_left -= 1 success, result = success_result if success and self._success: self._value[i*self._chunksize:(i+1)*self._chunksize] = result if self._number_left == 0: if self._callback: self._callback(self._value) del self._cache[self._job] self._event.set() else: if not success and self._success: # only store first exception self._success = False self._value = result if self._number_left == 0: # only consider the result ready once all jobs are done if self._error_callback: self._error_callback(self._value) del self._cache[self._job] self._event.set() # # Class whose instances are returned by `Pool.imap()` # class IMapIterator(object): def __init__(self, cache): self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) self._cache = cache self._items = collections.deque() self._index = 0 self._length = None self._unsorted = {} cache[self._job] = self def __iter__(self): return self def next(self, timeout=None): with self._cond: try: item = self._items.popleft() except IndexError: if self._index == self._length: raise StopIteration self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: raise StopIteration raise TimeoutError success, value = item if success: return value raise value __next__ = next # XXX def _set(self, i, obj): with self._cond: if self._index == i: self._items.append(obj) self._index += 1 while self._index in self._unsorted: obj = self._unsorted.pop(self._index) self._items.append(obj) self._index += 1 self._cond.notify() else: self._unsorted[i] = obj if self._index == self._length: del self._cache[self._job] def _set_length(self, length): with self._cond: self._length = length if self._index == self._length: self._cond.notify() del self._cache[self._job] # # Class whose instances are returned by `Pool.imap_unordered()` # class IMapUnorderedIterator(IMapIterator): def _set(self, i, obj): with self._cond: self._items.append(obj) self._index += 1 self._cond.notify() if self._index == self._length: del self._cache[self._job] # # # class ThreadPool(Pool): _wrap_exception = False @staticmethod def Process(*args, **kwds): from .dummy import Process return Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=()): Pool.__init__(self, processes, initializer, initargs) def _setup_queues(self): self._inqueue = queue.Queue() self._outqueue = queue.Queue() self._quick_put = self._inqueue.put self._quick_get = self._outqueue.get @staticmethod def _help_stuff_finish(inqueue, task_handler, size): # put sentinels at head of inqueue to make workers finish with inqueue.not_empty: inqueue.queue.clear() inqueue.queue.extend([None] * size) inqueue.not_empty.notify_all() popen_fork.py 0000644 00000004403 15204243001 0007250 0 ustar 00 import os import sys import signal from . import util __all__ = ['Popen'] # # Start child process using fork # class Popen(object): method = 'fork' def __init__(self, process_obj): util._flush_std_streams() self.returncode = None self._launch(process_obj) def duplicate_for_child(self, fd): return fd def poll(self, flag=os.WNOHANG): if self.returncode is None: while True: try: pid, sts = os.waitpid(self.pid, flag) except OSError as e: # Child process not yet created. See #1731717 # e.errno == errno.ECHILD == 10 return None else: break if pid == self.pid: if os.WIFSIGNALED(sts): self.returncode = -os.WTERMSIG(sts) else: assert os.WIFEXITED(sts) self.returncode = os.WEXITSTATUS(sts) return self.returncode def wait(self, timeout=None): if self.returncode is None: if timeout is not None: from multiprocessing.connection import wait if not wait([self.sentinel], timeout): return None # This shouldn't block if wait() returned successfully. return self.poll(os.WNOHANG if timeout == 0.0 else 0) return self.returncode def terminate(self): if self.returncode is None: try: os.kill(self.pid, signal.SIGTERM) except ProcessLookupError: pass except OSError: if self.wait(timeout=0.1) is None: raise def _launch(self, process_obj): code = 1 parent_r, child_w = os.pipe() self.pid = os.fork() if self.pid == 0: try: os.close(parent_r) if 'random' in sys.modules: import random random.seed() code = process_obj._bootstrap() finally: os._exit(code) else: os.close(child_w) util.Finalize(self, os.close, (parent_r,)) self.sentinel = parent_r connection.py 0000644 00000074526 15204243001 0007262 0 ustar 00 # # A higher level module for using sockets (or Windows named pipes) # # multiprocessing/connection.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # __all__ = [ 'Client', 'Listener', 'Pipe', 'wait' ] import io import os import sys import socket import struct import time import tempfile import itertools import _multiprocessing from . import util from . import AuthenticationError, BufferTooShort from .context import reduction _ForkingPickler = reduction.ForkingPickler try: import _winapi from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE except ImportError: if sys.platform == 'win32': raise _winapi = None # # # BUFSIZE = 8192 # A very generous timeout when it comes to local connections... CONNECTION_TIMEOUT = 20. # The hmac module implicitly defaults to using MD5. # Support using a stronger algorithm for the challenge/response code: HMAC_DIGEST_NAME='sha256' _mmap_counter = itertools.count() default_family = 'AF_INET' families = ['AF_INET'] if hasattr(socket, 'AF_UNIX'): default_family = 'AF_UNIX' families += ['AF_UNIX'] if sys.platform == 'win32': default_family = 'AF_PIPE' families += ['AF_PIPE'] def _init_timeout(timeout=CONNECTION_TIMEOUT): return time.monotonic() + timeout def _check_timeout(t): return time.monotonic() > t # # # def arbitrary_address(family): ''' Return an arbitrary free address for the given family ''' if family == 'AF_INET': return ('localhost', 0) elif family == 'AF_UNIX': return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) elif family == 'AF_PIPE': return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % (os.getpid(), next(_mmap_counter)), dir="") else: raise ValueError('unrecognized family') def _validate_family(family): ''' Checks if the family is valid for the current environment. ''' if sys.platform != 'win32' and family == 'AF_PIPE': raise ValueError('Family %s is not recognized.' % family) if sys.platform == 'win32' and family == 'AF_UNIX': # double check if not hasattr(socket, family): raise ValueError('Family %s is not recognized.' % family) def address_type(address): ''' Return the types of the address This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE' ''' if type(address) == tuple: return 'AF_INET' elif type(address) is str and address.startswith('\\\\'): return 'AF_PIPE' elif type(address) is str: return 'AF_UNIX' else: raise ValueError('address type of %r unrecognized' % address) # # Connection classes # class _ConnectionBase: _handle = None def __init__(self, handle, readable=True, writable=True): handle = handle.__index__() if handle < 0: raise ValueError("invalid handle") if not readable and not writable: raise ValueError( "at least one of `readable` and `writable` must be True") self._handle = handle self._readable = readable self._writable = writable # XXX should we use util.Finalize instead of a __del__? def __del__(self): if self._handle is not None: self._close() def _check_closed(self): if self._handle is None: raise OSError("handle is closed") def _check_readable(self): if not self._readable: raise OSError("connection is write-only") def _check_writable(self): if not self._writable: raise OSError("connection is read-only") def _bad_message_length(self): if self._writable: self._readable = False else: self.close() raise OSError("bad message length") @property def closed(self): """True if the connection is closed""" return self._handle is None @property def readable(self): """True if the connection is readable""" return self._readable @property def writable(self): """True if the connection is writable""" return self._writable def fileno(self): """File descriptor or handle of the connection""" self._check_closed() return self._handle def close(self): """Close the connection""" if self._handle is not None: try: self._close() finally: self._handle = None def send_bytes(self, buf, offset=0, size=None): """Send the bytes data from a bytes-like object""" self._check_closed() self._check_writable() m = memoryview(buf) # HACK for byte-indexing of non-bytewise buffers (e.g. array.array) if m.itemsize > 1: m = memoryview(bytes(m)) n = len(m) if offset < 0: raise ValueError("offset is negative") if n < offset: raise ValueError("buffer length < offset") if size is None: size = n - offset elif size < 0: raise ValueError("size is negative") elif offset + size > n: raise ValueError("buffer length < offset + size") self._send_bytes(m[offset:offset + size]) def send(self, obj): """Send a (picklable) object""" self._check_closed() self._check_writable() self._send_bytes(_ForkingPickler.dumps(obj)) def recv_bytes(self, maxlength=None): """ Receive bytes data as a bytes object. """ self._check_closed() self._check_readable() if maxlength is not None and maxlength < 0: raise ValueError("negative maxlength") buf = self._recv_bytes(maxlength) if buf is None: self._bad_message_length() return buf.getvalue() def recv_bytes_into(self, buf, offset=0): """ Receive bytes data into a writeable bytes-like object. Return the number of bytes read. """ self._check_closed() self._check_readable() with memoryview(buf) as m: # Get bytesize of arbitrary buffer itemsize = m.itemsize bytesize = itemsize * len(m) if offset < 0: raise ValueError("negative offset") elif offset > bytesize: raise ValueError("offset too large") result = self._recv_bytes() size = result.tell() if bytesize < offset + size: raise BufferTooShort(result.getvalue()) # Message can fit in dest result.seek(0) result.readinto(m[offset // itemsize : (offset + size) // itemsize]) return size def recv(self): """Receive a (picklable) object""" self._check_closed() self._check_readable() buf = self._recv_bytes() return _ForkingPickler.loads(buf.getbuffer()) def poll(self, timeout=0.0): """Whether there is any input available to be read""" self._check_closed() self._check_readable() return self._poll(timeout) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.close() if _winapi: class PipeConnection(_ConnectionBase): """ Connection class based on a Windows named pipe. Overlapped I/O is used, so the handles must have been created with FILE_FLAG_OVERLAPPED. """ _got_empty_message = False def _close(self, _CloseHandle=_winapi.CloseHandle): _CloseHandle(self._handle) def _send_bytes(self, buf): ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True) try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) assert waitres == WAIT_OBJECT_0 except: ov.cancel() raise finally: nwritten, err = ov.GetOverlappedResult(True) assert err == 0 assert nwritten == len(buf) def _recv_bytes(self, maxsize=None): if self._got_empty_message: self._got_empty_message = False return io.BytesIO() else: bsize = 128 if maxsize is None else min(maxsize, 128) try: ov, err = _winapi.ReadFile(self._handle, bsize, overlapped=True) try: if err == _winapi.ERROR_IO_PENDING: waitres = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) assert waitres == WAIT_OBJECT_0 except: ov.cancel() raise finally: nread, err = ov.GetOverlappedResult(True) if err == 0: f = io.BytesIO() f.write(ov.getbuffer()) return f elif err == _winapi.ERROR_MORE_DATA: return self._get_more_data(ov, maxsize) except OSError as e: if e.winerror == _winapi.ERROR_BROKEN_PIPE: raise EOFError else: raise raise RuntimeError("shouldn't get here; expected KeyboardInterrupt") def _poll(self, timeout): if (self._got_empty_message or _winapi.PeekNamedPipe(self._handle)[0] != 0): return True return bool(wait([self], timeout)) def _get_more_data(self, ov, maxsize): buf = ov.getbuffer() f = io.BytesIO() f.write(buf) left = _winapi.PeekNamedPipe(self._handle)[1] assert left > 0 if maxsize is not None and len(buf) + left > maxsize: self._bad_message_length() ov, err = _winapi.ReadFile(self._handle, left, overlapped=True) rbytes, err = ov.GetOverlappedResult(True) assert err == 0 assert rbytes == left f.write(ov.getbuffer()) return f class Connection(_ConnectionBase): """ Connection class based on an arbitrary file descriptor (Unix only), or a socket handle (Windows). """ if _winapi: def _close(self, _close=_multiprocessing.closesocket): _close(self._handle) _write = _multiprocessing.send _read = _multiprocessing.recv else: def _close(self, _close=os.close): _close(self._handle) _write = os.write _read = os.read def _send(self, buf, write=_write): remaining = len(buf) while True: n = write(self._handle, buf) remaining -= n if remaining == 0: break buf = buf[n:] def _recv(self, size, read=_read): buf = io.BytesIO() handle = self._handle remaining = size while remaining > 0: chunk = read(handle, remaining) n = len(chunk) if n == 0: if remaining == size: raise EOFError else: raise OSError("got end of file during message") buf.write(chunk) remaining -= n return buf def _send_bytes(self, buf): n = len(buf) # For wire compatibility with 3.2 and lower header = struct.pack("!i", n) if n > 16384: # The payload is large so Nagle's algorithm won't be triggered # and we'd better avoid the cost of concatenation. self._send(header) self._send(buf) else: # Issue #20540: concatenate before sending, to avoid delays due # to Nagle's algorithm on a TCP socket. # Also note we want to avoid sending a 0-length buffer separately, # to avoid "broken pipe" errors if the other end closed the pipe. self._send(header + buf) def _recv_bytes(self, maxsize=None): buf = self._recv(4) size, = struct.unpack("!i", buf.getvalue()) if maxsize is not None and size > maxsize: return None return self._recv(size) def _poll(self, timeout): r = wait([self], timeout) return bool(r) # # Public functions # class Listener(object): ''' Returns a listener object. This is a wrapper for a bound socket which is 'listening' for connections, or for a Windows named pipe. ''' def __init__(self, address=None, family=None, backlog=1, authkey=None): family = family or (address and address_type(address)) \ or default_family address = address or arbitrary_address(family) _validate_family(family) if family == 'AF_PIPE': self._listener = PipeListener(address, backlog) else: self._listener = SocketListener(address, family, backlog) if authkey is not None and not isinstance(authkey, bytes): raise TypeError('authkey should be a byte string') self._authkey = authkey def accept(self): ''' Accept a connection on the bound socket or named pipe of `self`. Returns a `Connection` object. ''' if self._listener is None: raise OSError('listener is closed') c = self._listener.accept() if self._authkey: deliver_challenge(c, self._authkey) answer_challenge(c, self._authkey) return c def close(self): ''' Close the bound socket or named pipe of `self`. ''' listener = self._listener if listener is not None: self._listener = None listener.close() address = property(lambda self: self._listener._address) last_accepted = property(lambda self: self._listener._last_accepted) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.close() def Client(address, family=None, authkey=None): ''' Returns a connection to the address of a `Listener` ''' family = family or address_type(address) _validate_family(family) if family == 'AF_PIPE': c = PipeClient(address) else: c = SocketClient(address) if authkey is not None and not isinstance(authkey, bytes): raise TypeError('authkey should be a byte string') if authkey is not None: answer_challenge(c, authkey) deliver_challenge(c, authkey) return c if sys.platform != 'win32': def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: fd1, fd2 = os.pipe() c1 = Connection(fd1, writable=False) c2 = Connection(fd2, readable=False) return c1, c2 else: def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' address = arbitrary_address('AF_PIPE') if duplex: openmode = _winapi.PIPE_ACCESS_DUPLEX access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: openmode = _winapi.PIPE_ACCESS_INBOUND access = _winapi.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE h1 = _winapi.CreateNamedPipe( address, openmode | _winapi.FILE_FLAG_OVERLAPPED | _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE, _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_WAIT, 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, # default security descriptor: the handle cannot be inherited _winapi.NULL ) h2 = _winapi.CreateFile( address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) _winapi.SetNamedPipeHandleState( h2, _winapi.PIPE_READMODE_MESSAGE, None, None ) overlapped = _winapi.ConnectNamedPipe(h1, overlapped=True) _, err = overlapped.GetOverlappedResult(True) assert err == 0 c1 = PipeConnection(h1, writable=duplex) c2 = PipeConnection(h2, readable=duplex) return c1, c2 # # Definitions for connections based on sockets # class SocketListener(object): ''' Representation of a socket which is bound to an address and listening ''' def __init__(self, address, family, backlog=1): self._socket = socket.socket(getattr(socket, family)) try: # SO_REUSEADDR has different semantics on Windows (issue #2550). if os.name == 'posix': self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setblocking(True) self._socket.bind(address) self._socket.listen(backlog) self._address = self._socket.getsockname() except OSError: self._socket.close() raise self._family = family self._last_accepted = None if family == 'AF_UNIX': self._unlink = util.Finalize( self, os.unlink, args=(address,), exitpriority=0 ) else: self._unlink = None def accept(self): s, self._last_accepted = self._socket.accept() s.setblocking(True) return Connection(s.detach()) def close(self): try: self._socket.close() finally: unlink = self._unlink if unlink is not None: self._unlink = None unlink() def SocketClient(address): ''' Return a connection object connected to the socket given by `address` ''' family = address_type(address) with socket.socket( getattr(socket, family) ) as s: s.setblocking(True) s.connect(address) return Connection(s.detach()) # # Definitions for connections based on named pipes # if sys.platform == 'win32': class PipeListener(object): ''' Representation of a named pipe ''' def __init__(self, address, backlog=None): self._address = address self._handle_queue = [self._new_handle(first=True)] self._last_accepted = None util.sub_debug('listener created with address=%r', self._address) self.close = util.Finalize( self, PipeListener._finalize_pipe_listener, args=(self._handle_queue, self._address), exitpriority=0 ) def _new_handle(self, first=False): flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED if first: flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE return _winapi.CreateNamedPipe( self._address, flags, _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE | _winapi.PIPE_WAIT, _winapi.PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL ) def accept(self): self._handle_queue.append(self._new_handle()) handle = self._handle_queue.pop(0) try: ov = _winapi.ConnectNamedPipe(handle, overlapped=True) except OSError as e: if e.winerror != _winapi.ERROR_NO_DATA: raise # ERROR_NO_DATA can occur if a client has already connected, # written data and then disconnected -- see Issue 14725. else: try: res = _winapi.WaitForMultipleObjects( [ov.event], False, INFINITE) except: ov.cancel() _winapi.CloseHandle(handle) raise finally: _, err = ov.GetOverlappedResult(True) assert err == 0 return PipeConnection(handle) @staticmethod def _finalize_pipe_listener(queue, address): util.sub_debug('closing listener with address=%r', address) for handle in queue: _winapi.CloseHandle(handle) def PipeClient(address): ''' Return a connection object connected to the pipe given by `address` ''' t = _init_timeout() while 1: try: _winapi.WaitNamedPipe(address, 1000) h = _winapi.CreateFile( address, _winapi.GENERIC_READ | _winapi.GENERIC_WRITE, 0, _winapi.NULL, _winapi.OPEN_EXISTING, _winapi.FILE_FLAG_OVERLAPPED, _winapi.NULL ) except OSError as e: if e.winerror not in (_winapi.ERROR_SEM_TIMEOUT, _winapi.ERROR_PIPE_BUSY) or _check_timeout(t): raise else: break else: raise _winapi.SetNamedPipeHandleState( h, _winapi.PIPE_READMODE_MESSAGE, None, None ) return PipeConnection(h) # # Authentication stuff # MESSAGE_LENGTH = 20 CHALLENGE = b'#CHALLENGE#' WELCOME = b'#WELCOME#' FAILURE = b'#FAILURE#' def deliver_challenge(connection, authkey): import hmac assert isinstance(authkey, bytes) message = os.urandom(MESSAGE_LENGTH) connection.send_bytes(CHALLENGE + message) digest = hmac.new(authkey, message, HMAC_DIGEST_NAME).digest() response = connection.recv_bytes(256) # reject large message if response == digest: connection.send_bytes(WELCOME) else: connection.send_bytes(FAILURE) raise AuthenticationError('digest received was wrong') def answer_challenge(connection, authkey): import hmac assert isinstance(authkey, bytes) message = connection.recv_bytes(256) # reject large message assert message[:len(CHALLENGE)] == CHALLENGE, 'message = %r' % message message = message[len(CHALLENGE):] digest = hmac.new(authkey, message, HMAC_DIGEST_NAME).digest() connection.send_bytes(digest) response = connection.recv_bytes(256) # reject large message if response != WELCOME: raise AuthenticationError('digest sent was rejected') # # Support for using xmlrpclib for serialization # class ConnectionWrapper(object): def __init__(self, conn, dumps, loads): self._conn = conn self._dumps = dumps self._loads = loads for attr in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'): obj = getattr(conn, attr) setattr(self, attr, obj) def send(self, obj): s = self._dumps(obj) self._conn.send_bytes(s) def recv(self): s = self._conn.recv_bytes() return self._loads(s) def _xml_dumps(obj): return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf-8') def _xml_loads(s): (obj,), method = xmlrpclib.loads(s.decode('utf-8')) return obj class XmlListener(Listener): def accept(self): global xmlrpclib import xmlrpc.client as xmlrpclib obj = Listener.accept(self) return ConnectionWrapper(obj, _xml_dumps, _xml_loads) def XmlClient(*args, **kwds): global xmlrpclib import xmlrpc.client as xmlrpclib return ConnectionWrapper(Client(*args, **kwds), _xml_dumps, _xml_loads) # # Wait # if sys.platform == 'win32': def _exhaustive_wait(handles, timeout): # Return ALL handles which are currently signalled. (Only # returning the first signalled might create starvation issues.) L = list(handles) ready = [] while L: res = _winapi.WaitForMultipleObjects(L, False, timeout) if res == WAIT_TIMEOUT: break elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L): res -= WAIT_OBJECT_0 elif WAIT_ABANDONED_0 <= res < WAIT_ABANDONED_0 + len(L): res -= WAIT_ABANDONED_0 else: raise RuntimeError('Should not get here') ready.append(L[res]) L = L[res+1:] timeout = 0 return ready _ready_errors = {_winapi.ERROR_BROKEN_PIPE, _winapi.ERROR_NETNAME_DELETED} def wait(object_list, timeout=None): ''' Wait till an object in object_list is ready/readable. Returns list of those objects in object_list which are ready/readable. ''' if timeout is None: timeout = INFINITE elif timeout < 0: timeout = 0 else: timeout = int(timeout * 1000 + 0.5) object_list = list(object_list) waithandle_to_obj = {} ov_list = [] ready_objects = set() ready_handles = set() try: for o in object_list: try: fileno = getattr(o, 'fileno') except AttributeError: waithandle_to_obj[o.__index__()] = o else: # start an overlapped read of length zero try: ov, err = _winapi.ReadFile(fileno(), 0, True) except OSError as e: ov, err = None, e.winerror if err not in _ready_errors: raise if err == _winapi.ERROR_IO_PENDING: ov_list.append(ov) waithandle_to_obj[ov.event] = o else: # If o.fileno() is an overlapped pipe handle and # err == 0 then there is a zero length message # in the pipe, but it HAS NOT been consumed... if ov and sys.getwindowsversion()[:2] >= (6, 2): # ... except on Windows 8 and later, where # the message HAS been consumed. try: _, err = ov.GetOverlappedResult(False) except OSError as e: err = e.winerror if not err and hasattr(o, '_got_empty_message'): o._got_empty_message = True ready_objects.add(o) timeout = 0 ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout) finally: # request that overlapped reads stop for ov in ov_list: ov.cancel() # wait for all overlapped reads to stop for ov in ov_list: try: _, err = ov.GetOverlappedResult(True) except OSError as e: err = e.winerror if err not in _ready_errors: raise if err != _winapi.ERROR_OPERATION_ABORTED: o = waithandle_to_obj[ov.event] ready_objects.add(o) if err == 0: # If o.fileno() is an overlapped pipe handle then # a zero length message HAS been consumed. if hasattr(o, '_got_empty_message'): o._got_empty_message = True ready_objects.update(waithandle_to_obj[h] for h in ready_handles) return [o for o in object_list if o in ready_objects] else: import selectors # poll/select have the advantage of not requiring any extra file # descriptor, contrarily to epoll/kqueue (also, they require a single # syscall). if hasattr(selectors, 'PollSelector'): _WaitSelector = selectors.PollSelector else: _WaitSelector = selectors.SelectSelector def wait(object_list, timeout=None): ''' Wait till an object in object_list is ready/readable. Returns list of those objects in object_list which are ready/readable. ''' with _WaitSelector() as selector: for obj in object_list: selector.register(obj, selectors.EVENT_READ) if timeout is not None: deadline = time.monotonic() + timeout while True: ready = selector.select(timeout) if ready: return [key.fileobj for (key, events) in ready] else: if timeout is not None: timeout = deadline - time.monotonic() if timeout < 0: return ready # # Make connection and socket objects sharable if possible # if sys.platform == 'win32': def reduce_connection(conn): handle = conn.fileno() with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s: from . import resource_sharer ds = resource_sharer.DupSocket(s) return rebuild_connection, (ds, conn.readable, conn.writable) def rebuild_connection(ds, readable, writable): sock = ds.detach() return Connection(sock.detach(), readable, writable) reduction.register(Connection, reduce_connection) def reduce_pipe_connection(conn): access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) | (_winapi.FILE_GENERIC_WRITE if conn.writable else 0)) dh = reduction.DupHandle(conn.fileno(), access) return rebuild_pipe_connection, (dh, conn.readable, conn.writable) def rebuild_pipe_connection(dh, readable, writable): handle = dh.detach() return PipeConnection(handle, readable, writable) reduction.register(PipeConnection, reduce_pipe_connection) else: def reduce_connection(conn): df = reduction.DupFd(conn.fileno()) return rebuild_connection, (df, conn.readable, conn.writable) def rebuild_connection(df, readable, writable): fd = df.detach() return Connection(fd, readable, writable) reduction.register(Connection, reduce_connection) util.py 0000644 00000027156 15204243002 0006076 0 ustar 00 # # Module providing various facilities to other parts of the package # # multiprocessing/util.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # import os import itertools import sys import weakref import atexit import threading # we want threading to install it's # cleanup function before multiprocessing does from subprocess import _args_from_interpreter_flags from . import process __all__ = [ 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', 'log_to_stderr', 'get_temp_dir', 'register_after_fork', 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal', 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING', ] # # Logging # NOTSET = 0 SUBDEBUG = 5 DEBUG = 10 INFO = 20 SUBWARNING = 25 LOGGER_NAME = 'multiprocessing' DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s' _logger = None _log_to_stderr = False def sub_debug(msg, *args): if _logger: _logger.log(SUBDEBUG, msg, *args) def debug(msg, *args): if _logger: _logger.log(DEBUG, msg, *args) def info(msg, *args): if _logger: _logger.log(INFO, msg, *args) def sub_warning(msg, *args): if _logger: _logger.log(SUBWARNING, msg, *args) def get_logger(): ''' Returns logger used by multiprocessing ''' global _logger import logging logging._acquireLock() try: if not _logger: _logger = logging.getLogger(LOGGER_NAME) _logger.propagate = 0 # XXX multiprocessing should cleanup before logging if hasattr(atexit, 'unregister'): atexit.unregister(_exit_function) atexit.register(_exit_function) else: atexit._exithandlers.remove((_exit_function, (), {})) atexit._exithandlers.append((_exit_function, (), {})) finally: logging._releaseLock() return _logger def log_to_stderr(level=None): ''' Turn on logging and add a handler which prints to stderr ''' global _log_to_stderr import logging logger = get_logger() formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) if level: logger.setLevel(level) _log_to_stderr = True return _logger # # Function returning a temp directory which will be removed on exit # def get_temp_dir(): # get name of a temp directory which will be automatically cleaned up tempdir = process.current_process()._config.get('tempdir') if tempdir is None: import shutil, tempfile tempdir = tempfile.mkdtemp(prefix='pymp-') info('created temp directory %s', tempdir) Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100) process.current_process()._config['tempdir'] = tempdir return tempdir # # Support for reinitialization of objects when bootstrapping a child process # _afterfork_registry = weakref.WeakValueDictionary() _afterfork_counter = itertools.count() def _run_after_forkers(): items = list(_afterfork_registry.items()) items.sort() for (index, ident, func), obj in items: try: func(obj) except Exception as e: info('after forker raised exception %s', e) def register_after_fork(obj, func): _afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj # # Finalization using weakrefs # _finalizer_registry = {} _finalizer_counter = itertools.count() class Finalize(object): ''' Class which supports object finalization using weakrefs ''' def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): assert exitpriority is None or type(exitpriority) is int if obj is not None: self._weakref = weakref.ref(obj, self) else: assert exitpriority is not None self._callback = callback self._args = args self._kwargs = kwargs or {} self._key = (exitpriority, next(_finalizer_counter)) self._pid = os.getpid() _finalizer_registry[self._key] = self def __call__(self, wr=None, # Need to bind these locally because the globals can have # been cleared at shutdown _finalizer_registry=_finalizer_registry, sub_debug=sub_debug, getpid=os.getpid): ''' Run the callback unless it has already been called or cancelled ''' try: del _finalizer_registry[self._key] except KeyError: sub_debug('finalizer no longer registered') else: if self._pid != getpid(): sub_debug('finalizer ignored because different process') res = None else: sub_debug('finalizer calling %s with args %s and kwargs %s', self._callback, self._args, self._kwargs) res = self._callback(*self._args, **self._kwargs) self._weakref = self._callback = self._args = \ self._kwargs = self._key = None return res def cancel(self): ''' Cancel finalization of the object ''' try: del _finalizer_registry[self._key] except KeyError: pass else: self._weakref = self._callback = self._args = \ self._kwargs = self._key = None def still_active(self): ''' Return whether this finalizer is still waiting to invoke callback ''' return self._key in _finalizer_registry def __repr__(self): try: obj = self._weakref() except (AttributeError, TypeError): obj = None if obj is None: return '<%s object, dead>' % self.__class__.__name__ x = '<%s object, callback=%s' % ( self.__class__.__name__, getattr(self._callback, '__name__', self._callback)) if self._args: x += ', args=' + str(self._args) if self._kwargs: x += ', kwargs=' + str(self._kwargs) if self._key[0] is not None: x += ', exitprority=' + str(self._key[0]) return x + '>' def _run_finalizers(minpriority=None): ''' Run all finalizers whose exit priority is not None and at least minpriority Finalizers with highest priority are called first; finalizers with the same priority will be called in reverse order of creation. ''' if _finalizer_registry is None: # This function may be called after this module's globals are # destroyed. See the _exit_function function in this module for more # notes. return if minpriority is None: f = lambda p : p[0] is not None else: f = lambda p : p[0] is not None and p[0] >= minpriority # Careful: _finalizer_registry may be mutated while this function # is running (either by a GC run or by another thread). # list(_finalizer_registry) should be atomic, while # list(_finalizer_registry.items()) is not. keys = [key for key in list(_finalizer_registry) if f(key)] keys.sort(reverse=True) for key in keys: finalizer = _finalizer_registry.get(key) # key may have been removed from the registry if finalizer is not None: sub_debug('calling %s', finalizer) try: finalizer() except Exception: import traceback traceback.print_exc() if minpriority is None: _finalizer_registry.clear() # # Clean up on exit # def is_exiting(): ''' Returns true if the process is shutting down ''' return _exiting or _exiting is None _exiting = False def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, active_children=process.active_children, current_process=process.current_process): # We hold on to references to functions in the arglist due to the # situation described below, where this function is called after this # module's globals are destroyed. global _exiting if not _exiting: _exiting = True info('process shutting down') debug('running all "atexit" finalizers with priority >= 0') _run_finalizers(0) if current_process() is not None: # We check if the current process is None here because if # it's None, any call to ``active_children()`` will raise # an AttributeError (active_children winds up trying to # get attributes from util._current_process). One # situation where this can happen is if someone has # manipulated sys.modules, causing this module to be # garbage collected. The destructor for the module type # then replaces all values in the module dict with None. # For instance, after setuptools runs a test it replaces # sys.modules with a copy created earlier. See issues # #9775 and #15881. Also related: #4106, #9205, and # #9207. for p in active_children(): if p.daemon: info('calling terminate() for daemon %s', p.name) p._popen.terminate() for p in active_children(): info('calling join() for process %s', p.name) p.join() debug('running the remaining "atexit" finalizers') _run_finalizers() atexit.register(_exit_function) # # Some fork aware types # class ForkAwareThreadLock(object): def __init__(self): self._reset() register_after_fork(self, ForkAwareThreadLock._reset) def _reset(self): self._lock = threading.Lock() self.acquire = self._lock.acquire self.release = self._lock.release def __enter__(self): return self._lock.__enter__() def __exit__(self, *args): return self._lock.__exit__(*args) class ForkAwareLocal(threading.local): def __init__(self): register_after_fork(self, lambda obj : obj.__dict__.clear()) def __reduce__(self): return type(self), () # # Close fds except those specified # try: MAXFD = os.sysconf("SC_OPEN_MAX") except Exception: MAXFD = 256 def close_all_fds_except(fds): fds = list(fds) + [-1, MAXFD] fds.sort() assert fds[-1] == MAXFD, 'fd too large' for i in range(len(fds) - 1): os.closerange(fds[i]+1, fds[i+1]) # # Close sys.stdin and replace stdin with os.devnull # def _close_stdin(): if sys.stdin is None: return try: sys.stdin.close() except (OSError, ValueError): pass try: fd = os.open(os.devnull, os.O_RDONLY) try: sys.stdin = open(fd, closefd=False) except: os.close(fd) raise except (OSError, ValueError): pass # # Flush standard streams, if any # def _flush_std_streams(): try: sys.stdout.flush() except (AttributeError, ValueError): pass try: sys.stderr.flush() except (AttributeError, ValueError): pass # # Start a program with only specified fds kept open # def spawnv_passfds(path, args, passfds): import _posixsubprocess passfds = tuple(sorted(map(int, passfds))) errpipe_read, errpipe_write = os.pipe() try: return _posixsubprocess.fork_exec( args, [os.fsencode(path)], True, passfds, None, None, -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write, False, False, None) finally: os.close(errpipe_read) os.close(errpipe_write) heap.py 0000644 00000020177 15204243002 0006032 0 ustar 00 # # Module which supports allocation of memory from an mmap # # multiprocessing/heap.py # # Copyright (c) 2006-2008, R Oudkerk # Licensed to PSF under a Contributor Agreement. # import bisect import mmap import os import sys import tempfile import threading from .context import reduction, assert_spawning from . import util __all__ = ['BufferWrapper'] # # Inheritable class which wraps an mmap, and from which blocks can be allocated # if sys.platform == 'win32': import _winapi class Arena(object): _rand = tempfile._RandomNameSequence() def __init__(self, size): self.size = size for i in range(100): name = 'pym-%d-%s' % (os.getpid(), next(self._rand)) buf = mmap.mmap(-1, size, tagname=name) if _winapi.GetLastError() == 0: break # We have reopened a preexisting mmap. buf.close() else: raise FileExistsError('Cannot find name for new mmap') self.name = name self.buffer = buf self._state = (self.size, self.name) def __getstate__(self): assert_spawning(self) return self._state def __setstate__(self, state): self.size, self.name = self._state = state self.buffer = mmap.mmap(-1, self.size, tagname=self.name) # XXX Temporarily preventing buildbot failures while determining # XXX the correct long-term fix. See issue 23060 #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS else: class Arena(object): def __init__(self, size, fd=-1): self.size = size self.fd = fd if fd == -1: self.fd, name = tempfile.mkstemp( prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir()) os.unlink(name) util.Finalize(self, os.close, (self.fd,)) with open(self.fd, 'wb', closefd=False) as f: bs = 1024 * 1024 if size >= bs: zeros = b'\0' * bs for _ in range(size // bs): f.write(zeros) del zeros f.write(b'\0' * (size % bs)) assert f.tell() == size self.buffer = mmap.mmap(self.fd, self.size) def reduce_arena(a): if a.fd == -1: raise ValueError('Arena is unpicklable because ' 'forking was enabled when it was created') return rebuild_arena, (a.size, reduction.DupFd(a.fd)) def rebuild_arena(size, dupfd): return Arena(size, dupfd.detach()) reduction.register(Arena, reduce_arena) # # Class allowing allocation of chunks of memory from arenas # class Heap(object): _alignment = 8 def __init__(self, size=mmap.PAGESIZE): self._lastpid = os.getpid() self._lock = threading.Lock() self._size = size self._lengths = [] self._len_to_seq = {} self._start_to_block = {} self._stop_to_block = {} self._allocated_blocks = set() self._arenas = [] # list of pending blocks to free - see free() comment below self._pending_free_blocks = [] @staticmethod def _roundup(n, alignment): # alignment must be a power of 2 mask = alignment - 1 return (n + mask) & ~mask def _malloc(self, size): # returns a large enough block -- it might be much larger i = bisect.bisect_left(self._lengths, size) if i == len(self._lengths): length = self._roundup(max(self._size, size), mmap.PAGESIZE) self._size *= 2 util.info('allocating a new mmap of length %d', length) arena = Arena(length) self._arenas.append(arena) return (arena, 0, length) else: length = self._lengths[i] seq = self._len_to_seq[length] block = seq.pop() if not seq: del self._len_to_seq[length], self._lengths[i] (arena, start, stop) = block del self._start_to_block[(arena, start)] del self._stop_to_block[(arena, stop)] return block def _free(self, block): # free location and try to merge with neighbours (arena, start, stop) = block try: prev_block = self._stop_to_block[(arena, start)] except KeyError: pass else: start, _ = self._absorb(prev_block) try: next_block = self._start_to_block[(arena, stop)] except KeyError: pass else: _, stop = self._absorb(next_block) block = (arena, start, stop) length = stop - start try: self._len_to_seq[length].append(block) except KeyError: self._len_to_seq[length] = [block] bisect.insort(self._lengths, length) self._start_to_block[(arena, start)] = block self._stop_to_block[(arena, stop)] = block def _absorb(self, block): # deregister this block so it can be merged with a neighbour (arena, start, stop) = block del self._start_to_block[(arena, start)] del self._stop_to_block[(arena, stop)] length = stop - start seq = self._len_to_seq[length] seq.remove(block) if not seq: del self._len_to_seq[length] self._lengths.remove(length) return start, stop def _free_pending_blocks(self): # Free all the blocks in the pending list - called with the lock held. while True: try: block = self._pending_free_blocks.pop() except IndexError: break self._allocated_blocks.remove(block) self._free(block) def free(self, block): # free a block returned by malloc() # Since free() can be called asynchronously by the GC, it could happen # that it's called while self._lock is held: in that case, # self._lock.acquire() would deadlock (issue #12352). To avoid that, a # trylock is used instead, and if the lock can't be acquired # immediately, the block is added to a list of blocks to be freed # synchronously sometimes later from malloc() or free(), by calling # _free_pending_blocks() (appending and retrieving from a list is not # strictly thread-safe but under cPython it's atomic thanks to the GIL). assert os.getpid() == self._lastpid if not self._lock.acquire(False): # can't acquire the lock right now, add the block to the list of # pending blocks to free self._pending_free_blocks.append(block) else: # we hold the lock try: self._free_pending_blocks() self._allocated_blocks.remove(block) self._free(block) finally: self._lock.release() def malloc(self, size): # return a block of right size (possibly rounded up) assert 0 <= size < sys.maxsize if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork with self._lock: self._free_pending_blocks() size = self._roundup(max(size,1), self._alignment) (arena, start, stop) = self._malloc(size) new_stop = start + size if new_stop < stop: self._free((arena, new_stop, stop)) block = (arena, start, new_stop) self._allocated_blocks.add(block) return block # # Class representing a chunk of an mmap -- can be inherited by child process # class BufferWrapper(object): _heap = Heap() def __init__(self, size): assert 0 <= size < sys.maxsize block = BufferWrapper._heap.malloc(size) self._state = (block, size) util.Finalize(self, BufferWrapper._heap.free, args=(block,)) def create_memoryview(self): (arena, start, stop), size = self._state return memoryview(arena.buffer)[start:start+size] __pycache__/pool.cpython-36.opt-1.pyc 0000644 00000050126 15204243002 0013306 0 ustar 00 3 \�e � @ s8 d dgZ ddlZddlZddlZddlZddlZddlZddlZddlm Z ddlm Z mZ dZdZ dZej� Zdd � Zd d� ZG dd � d e�ZG dd� d�Zdd� ZG dd� de�Zdf ddfdd�Zdd� ZG dd � d e�ZG dd� de�ZeZG dd� de�ZG dd� de�ZG d d!� d!e�Z G d"d� de�Z!dS )#�Pool� ThreadPool� N� )�util)�get_context�TimeoutError� c C s t t| � �S )N)�list�map)�args� r �,/usr/lib64/python3.6/multiprocessing/pool.py�mapstar+ s r c C s t tj| d | d ��S )Nr r )r � itertools�starmap)r r r r �starmapstar. s r c @ s e Zd Zdd� Zdd� ZdS )�RemoteTracebackc C s || _ d S )N)�tb)�selfr r r r �__init__6 s zRemoteTraceback.__init__c C s | j S )N)r )r r r r �__str__8 s zRemoteTraceback.__str__N)�__name__� __module__�__qualname__r r r r r r r 5 s r c @ s e Zd Zdd� Zdd� ZdS )�ExceptionWithTracebackc C s0 t jt|�||�}dj|�}|| _d| | _d S )N� z """ %s""")� traceback�format_exception�type�join�excr )r r r r r r r <