aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndré Erdmann <dywi@mailerd.de>2013-08-22 10:50:28 +0200
committerAndré Erdmann <dywi@mailerd.de>2013-08-22 10:50:28 +0200
commit2f5e40896c821d2a52c3ef53163a953217b0a366 (patch)
treeb7245e41027ac90df8793b5b52ac9458fdb3d746 /roverlay/util
parentroverlay: add distmap_rebuild command (diff)
downloadR_overlay-2f5e40896c821d2a52c3ef53163a953217b0a366.tar.gz
R_overlay-2f5e40896c821d2a52c3ef53163a953217b0a366.tar.bz2
R_overlay-2f5e40896c821d2a52c3ef53163a953217b0a366.zip
hashpool: fixup
* run properly in no-thread/multiproc mode * check if thread/proc executor has been aborted * HashFunction can now be pickled * run_as_completed() function for getting results as soon as they're available
Diffstat (limited to 'roverlay/util')
-rw-r--r--roverlay/util/hashpool.py103
1 file changed, 84 insertions, 19 deletions
diff --git a/roverlay/util/hashpool.py b/roverlay/util/hashpool.py
index 70d3d55..61d8336 100644
--- a/roverlay/util/hashpool.py
+++ b/roverlay/util/hashpool.py
@@ -5,6 +5,12 @@
# either version 2 of the License, or (at your option) any later version.
try:
+ import copyreg
+except ImportError:
+ # python 2
+ import copy_reg as copyreg
+
+try:
import concurrent.futures
except ImportError:
import sys
@@ -20,11 +26,31 @@ else:
import roverlay.digest
-def _calculate_hashes ( hash_job, hashes ):
- hash_job.hashdict.update (
- roverlay.digest.multihash_file ( hash_job.filepath, hashes )
- )
-# --- end of _calculate_hashes (...) ---
+class HashFunction ( object ):
+
+ def __init__ ( self, hashes ):
+ super ( HashFunction, self ).__init__()
+ self.hashes = frozenset ( hashes )
+ # --- end of __init__ (...) ---
+
+ def multihash_file ( self, filepath ):
+ return roverlay.digest.multihash_file ( filepath, self.hashes )
+ # --- end of multihash_file (...) ---
+
+ def calculate ( self, hash_job ):
+ hash_job.hashdict.update ( self.multihash_file ( hash_job.filepath ) )
+ return hash_job
+ # --- end of calculate (...) ---
+
+ __call__ = calculate
+
+ def pack ( self ):
+ return ( self.__class__, ( self.hashes, ) )
+ # --- end of pickle (...) ---
+
+# --- end of HashFunction ---
+
+copyreg.pickle ( HashFunction, HashFunction.pack )
class HashJob ( object ):
@@ -37,30 +63,69 @@ class HashJob ( object ):
# --- end of HashJob ---
+
class HashPool ( object ):
- def __init__ ( self, hashes, max_workers ):
+ def __init__ ( self, hashes, max_workers, use_threads=None ):
super ( HashPool, self ).__init__()
- self.hashes = frozenset ( hashes )
+ self.hashfunc = HashFunction ( hashes )
self._jobs = dict()
- self.max_workers = int ( max_workers )
+ self.max_workers = (
+ int ( max_workers ) if max_workers is not None else max_workers
+ )
+
+ if use_threads or use_threads is None:
+ self.executor_cls = concurrent.futures.ThreadPoolExecutor
+ else:
+ self.executor_cls = concurrent.futures.ProcessPoolExecutor
# --- end of __init__ (...) ---
def add ( self, backref, filepath, hashdict=None ):
self._jobs [backref] = HashJob ( filepath, hashdict )
# --- end of add (...) ---
+ def get_executor ( self ):
+ return self.executor_cls ( self.max_workers )
+ # --- end of get_executor (...) ---
+
+ def is_concurrent ( self ):
+ return HAVE_CONCURRENT_FUTURES and (
+ self.max_workers is None or self.max_workers > 0
+ )
+ # --- end of is_concurrent (...) ---
+
+ def run_as_completed ( self ):
+ if self.is_concurrent():
+ with self.get_executor() as exe:
+ for backref, hash_job in zip (
+ self._jobs.keys(),
+ exe.map ( self.hashfunc, self._jobs.values() )
+ ):
+ yield ( backref, hash_job.hashdict )
+ else:
+ for backref, hash_job in self._jobs.items():
+ self.hashfunc.calculate ( hash_job )
+ yield ( backref, hash_job.hashdict )
+ # --- end of run_as_completed (...) ---
+
def run ( self ):
- #with concurrent.futures.ProcessPoolExecutor ( self.max_workers ) as exe:
- with concurrent.futures.ThreadPoolExecutor ( self.max_workers ) as exe:
- running_jobs = frozenset (
- exe.submit ( _calculate_hashes, job, self.hashes )
- for job in self._jobs.values()
- )
-
- # wait
- for finished_job in concurrent.futures.as_completed ( running_jobs ):
- if finished_job.exception() is not None:
- raise finished_job.exception()
+ if self.is_concurrent():
+ with self.get_executor() as exe:
+ running_jobs = frozenset (
+ exe.submit ( self.hashfunc, job )
+ for job in self._jobs.values()
+ )
+
+ # wait
+ for finished_job in (
+ concurrent.futures.as_completed ( running_jobs )
+ ):
+ if finished_job.exception() is not None:
+ raise finished_job.exception()
+ elif finished_job.cancelled():
+ break
+ else:
+ for hash_job in self._jobs.values():
+ self.hashfunc.calculate ( hash_job )
# --- end of run (...) ---
def reset ( self ):