diff options
author | André Erdmann <dywi@mailerd.de> | 2013-08-22 10:50:28 +0200 |
---|---|---|
committer | André Erdmann <dywi@mailerd.de> | 2013-08-22 10:50:28 +0200 |
commit | 2f5e40896c821d2a52c3ef53163a953217b0a366 (patch) | |
tree | b7245e41027ac90df8793b5b52ac9458fdb3d746 /roverlay/util | |
parent | roverlay: add distmap_rebuild command (diff) | |
download | R_overlay-2f5e40896c821d2a52c3ef53163a953217b0a366.tar.gz R_overlay-2f5e40896c821d2a52c3ef53163a953217b0a366.tar.bz2 R_overlay-2f5e40896c821d2a52c3ef53163a953217b0a366.zip |
hashpool: fixup
* run properly in no-thread/mutliproc mode
* check if thread/proc executor has been aborted
* HashFunction can now be pickled
* run_as_completed() function for getting results as soon as they're available
Diffstat (limited to 'roverlay/util')
-rw-r--r-- | roverlay/util/hashpool.py | 103 |
1 files changed, 84 insertions, 19 deletions
diff --git a/roverlay/util/hashpool.py b/roverlay/util/hashpool.py index 70d3d55..61d8336 100644 --- a/roverlay/util/hashpool.py +++ b/roverlay/util/hashpool.py @@ -5,6 +5,12 @@ # either version 2 of the License, or (at your option) any later version. try: + import copyreg +except ImportError: + # python 2 + import copy_reg as copyreg + +try: import concurrent.futures except ImportError: import sys @@ -20,11 +26,31 @@ else: import roverlay.digest -def _calculate_hashes ( hash_job, hashes ): - hash_job.hashdict.update ( - roverlay.digest.multihash_file ( hash_job.filepath, hashes ) - ) -# --- end of _calculate_hashes (...) --- +class HashFunction ( object ): + + def __init__ ( self, hashes ): + super ( HashFunction, self ).__init__() + self.hashes = frozenset ( hashes ) + # --- end of __init__ (...) --- + + def multihash_file ( self, filepath ): + return roverlay.digest.multihash_file ( filepath, self.hashes ) + # --- end of multihash_file (...) --- + + def calculate ( self, hash_job ): + hash_job.hashdict.update ( self.multihash_file ( hash_job.filepath ) ) + return hash_job + # --- end of calculate (...) --- + + __call__ = calculate + + def pack ( self ): + return ( self.__class__, ( self.hashes, ) ) + # --- end of pickle (...) --- + +# --- end of HashFunction --- + +copyreg.pickle ( HashFunction, HashFunction.pack ) class HashJob ( object ): @@ -37,30 +63,69 @@ class HashJob ( object ): # --- end of HashJob --- + class HashPool ( object ): - def __init__ ( self, hashes, max_workers ): + def __init__ ( self, hashes, max_workers, use_threads=None ): super ( HashPool, self ).__init__() - self.hashes = frozenset ( hashes ) + self.hashfunc = HashFunction ( hashes ) self._jobs = dict() - self.max_workers = int ( max_workers ) + self.max_workers = ( + int ( max_workers ) if max_workers is not None else max_workers + ) + + if use_threads or use_threads is None: + self.executor_cls = concurrent.futures.ThreadPoolExecutor + else: + self.executor_cls = concurrent.futures.ProcessPoolExecutor # --- end of __init__ (...) --- def add ( self, backref, filepath, hashdict=None ): self._jobs [backref] = HashJob ( filepath, hashdict ) # --- end of add (...) --- + def get_executor ( self ): + return self.executor_cls ( self.max_workers ) + # --- end of get_executor (...) --- + + def is_concurrent ( self ): + return HAVE_CONCURRENT_FUTURES and ( + self.max_workers is None or self.max_workers > 0 + ) + # --- end of is_concurrent (...) --- + + def run_as_completed ( self ): + if self.is_concurrent(): + with self.get_executor() as exe: + for backref, hash_job in zip ( + self._jobs.keys(), + exe.map ( self.hashfunc, self._jobs.values() ) + ): + yield ( backref, hash_job.hashdict ) + else: + for backref, hash_job in self._jobs.items(): + self.hashfunc.calculate ( hash_job ) + yield ( backref, hash_job.hashdict ) + # --- end of run_as_completed (...) --- + def run ( self ): - #with concurrent.futures.ProcessPoolExecutor ( self.max_workers ) as exe: - with concurrent.futures.ThreadPoolExecutor ( self.max_workers ) as exe: - running_jobs = frozenset ( - exe.submit ( _calculate_hashes, job, self.hashes ) - for job in self._jobs.values() - ) - - # wait - for finished_job in concurrent.futures.as_completed ( running_jobs ): - if finished_job.exception() is not None: - raise finished_job.exception() + if self.is_concurrent(): + with self.get_executor() as exe: + running_jobs = frozenset ( + exe.submit ( self.hashfunc, job ) + for job in self._jobs.values() + ) + + # wait + for finished_job in ( + concurrent.futures.as_completed ( running_jobs ) + ): + if finished_job.exception() is not None: + raise finished_job.exception() + elif finished_job.cancelled(): + break + else: + for hash_job in self._jobs.values(): + self.hashfunc.calculate ( hash_job ) # --- end of run (...) --- def reset ( self ): |