Module bioiain.utilities.parallel
Functions
def end_pools()
Expand source code
def end_pools():
    """Terminate every ThreadPool still registered in the module-level ``pools`` list.

    ``ThreadPool.terminate()`` finishes by calling ``_await()``, which removes
    the pool from ``pools``. Iterating ``pools`` directly while it shrinks would
    skip every other pool, so a snapshot copy is iterated instead.
    """
    if not pools:
        return
    log("start", "ENDING POOLS")
    # Snapshot: each terminate() mutates `pools` (via _await -> pools.remove).
    for pool in list(pools):
        pool.terminate()
    log("end", "POOLS TERMINATED")
Expand source code
def mem_log():
    """Start a background ThreadPool that runs ``_indefinite_mem_log``.

    ``ThreadPool.__init__`` already appends the new pool to ``pools``, so it is
    not appended again here; a second entry would make ``end_pools`` see the
    same pool twice.

    Returns:
        The started ThreadPool, so callers can terminate it directly.
    """
    pool = ThreadPool()
    pool.add(_indefinite_mem_log)
    pool.start(wait=False)
    return pool
Expand source code
def mem_usage(as_dict=False):
    """Report system memory usage via ``psutil.virtual_memory()``.

    Args:
        as_dict: If true, return the whole ``virtual_memory()`` snapshot as a dict.

    Returns:
        The snapshot dict when *as_dict* is true; otherwise the percentage of
        total memory that is currently available (``available * 100 / total``).
    """
    # One snapshot, so `available` and `total` come from the same reading.
    memory = psutil.virtual_memory()
    if as_dict:
        return memory._asdict()
    return memory.available * 100 / memory.total
Expand source code
def split_iterable(iterable, n_parts:int|str="auto") -> list: if n_parts == "auto": n_parts = avail_cpus elif n_parts == "max": n_parts = cpu_count elif n_parts == "double": n_parts = avail_cpus*2 elif n_parts == "half": n_parts = avail_cpus//2 assert type(n_parts) == int if n_parts <= 1: return [iterable] l = len(iterable) log("header", "Splitting iterable of length:", l) log(1, "Number of parts:", n_parts,) if l <= n_parts: part_size = 1 last_part_size = 0 else: part_size = l//n_parts last_part_size = l%n_parts log(1, "Size of parts:", part_size, f"({last_part_size})" ) print(part_size, last_part_size) out = [] t = type(iterable) for n in range(n_parts): start = part_size*n if n == n_parts -1: if last_part_size == 0: continue end = start + last_part_size else: end = start + part_size if t in (list, tuple, str): out.append([e for e in iterable[start:end]]) elif t == dict: out.append({k:v for k,v in iterable.items()[start:end]}) else: log("error", "Unrecognised iterable type:", t) return iterable return out
Classes
class ThreadPool
Expand source code
class ThreadPool(object):
    """Group of named threads that are started, awaited or killed together.

    Each instance appends itself to the module-level ``pools`` list when it is
    created and removes itself again when ``_await`` finishes.
    """

    def __init__(self):
        self.threads = {}  # index -> {"thread", "fun", "status", "ret", "name"}
        self.running = False
        self.context = None
        self.returns = {}  # thread name -> return value, filled in by _await
        pools.append(self)

    class Thread(threading.Thread):
        """``threading.Thread`` that keeps its target's return value and an error flag.

        ``threading.Thread.run()`` discards whatever the target returns, so this
        subclass calls the target itself and stores the result in ``self.ret``.
        """

        def __init__(self, *args, target=None, ret=None, thread_name=None, **kwargs):
            self.thread_name = thread_name
            super().__init__(target=target, args=args, kwargs=kwargs, name=thread_name)
            # Our own references to the call: the base class keeps only private
            # copies and never exposes the target's return value.
            self._fun = target
            self._fun_args = args
            self._fun_kwargs = kwargs
            self.ret = ret
            self.error = False

        class ThreadKilled(BaseException):
            """Exception injected into a running thread by ``kill()``."""

            def __init__(self, *args):
                # BaseException.__init__ accepts positional arguments only.
                super().__init__(*args)
                log("warning", "Thread Killed:")

        def run(self):
            """Call the target, store its result in ``self.ret``; flag and re-raise failures."""
            try:
                if self._fun is None:
                    self.ret = None
                else:
                    self.ret = self._fun(*self._fun_args, **self._fun_kwargs)
                return self.ret
            except BaseException:
                # BaseException so a ThreadKilled injected by kill() is flagged too.
                print("ERRRRORRRR in Thread")
                self.error = True
                raise

        def get_id(self):
            """Return this thread's identifier while it is alive, else ``None``."""
            # Thread identifiers may be recycled once a thread exits, so a
            # finished thread's ident is not reported.
            return self.ident if self.is_alive() else None

        def kill(self):
            """Ask the interpreter to raise ``ThreadKilled`` inside this thread.

            Uses ``PyThreadState_SetAsyncExc``; the exception is only delivered
            when the thread next executes Python bytecode, so a thread blocked
            in a long C call is not interrupted until that call returns.

            Returns:
                The number of thread states modified (0 if the thread is not alive).
            """
            thread_id = self.get_id()
            if thread_id is None:
                return 0
            set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
            set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
            set_async_exc.restype = ctypes.c_int
            log("warning", "Trying to kill thread:", thread_id)
            return set_async_exc(thread_id, ctypes.py_object(self.ThreadKilled))

    def add(self, fun, *args, **kwargs):
        """Create a pending ``Thread`` that will call ``fun(*args, **kwargs)``.

        The thread is registered in ``self.threads`` under the next integer
        index and named ``"Thread <index>"``; it runs only once ``start()`` is called.

        Returns:
            The created thread object.
        """
        index = len(self.threads)
        thread_name = f"Thread {index}"
        thread = self.Thread(*args, target=fun, thread_name=thread_name, **kwargs)
        self.threads[index] = {
            "thread": thread,
            "fun": fun,
            "status": "pending",
            "ret": thread.ret,
            "name": thread_name,
        }
        return thread

    def start(self, wait=False, **kwargs):
        """Start every pending thread; optionally block until they finish.

        Args:
            wait: If true, call ``self._await(**kwargs)`` after starting.
            **kwargs: Forwarded to ``_await`` when *wait* is true.
        """
        to_start = {idx: entry for idx, entry in self.threads.items() if entry["status"] == "pending"}
        for entry in to_start.values():
            entry["thread"].start()
            entry["status"] = "running"
        log("header", f"ThreadPool: Running {len(to_start)} tasks (wait={wait})")
        if wait:
            self._await(**kwargs)

    def _await(self, **kwargs):
        """Join every running thread, record its result, and unregister this pool.

        Returns:
            ``self.returns.values()`` — the return values keyed by thread name.
        """
        running_threads = {k: v for k, v in self.threads.items() if v["status"] == "running"}
        ok = 0
        errors = 0
        for entry in running_threads.values():
            thread = entry["thread"]
            thread.join()
            # Read the result only now that the thread has finished; the entry's
            # "ret" was captured in add() before the thread ran.
            entry["ret"] = thread.ret
            self.returns[entry["name"]] = thread.ret
            if thread.error:
                entry["status"] = "error"
                errors += 1
            else:
                entry["status"] = "done"
                ok += 1
        log("header", f"Threadpool: Finished {ok + errors} tasks ({errors} errors), returning:")
        for k, t in running_threads.items():
            log(1, f"{k} ({t['status']}): {t['ret']}")
        # _await may run more than once (e.g. start(wait=True) then terminate()).
        if self in pools:
            pools.remove(self)
        return self.returns.values()

    def terminate(self):
        """Kill every running thread, then collect them with ``_await``."""
        for entry in [e for e in self.threads.values() if e["status"] == "running"]:
            entry["thread"].kill()
        self._await()
var Thread
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.
Methods
def add(self, fun, *args, **kwargs)
Expand source code
def add(self, fun, *args, **kwargs):
    """Create a pending ``Thread`` that will call ``fun(*args, **kwargs)``.

    The thread is registered in ``self.threads`` under the next integer
    index and named ``"Thread <index>"``; it runs only once ``start()`` is called.

    Returns:
        The created thread object.
    """
    index = len(self.threads)
    thread_name = f"Thread {index}"
    thread = self.Thread(*args, target=fun, thread_name=thread_name, **kwargs)
    self.threads[index] = {
        "thread": thread,
        "fun": fun,
        "status": "pending",
        "ret": thread.ret,
        "name": thread_name,
    }
    return thread
Expand source code
def start(self, wait=False, **kwargs):
    """Start every pending thread; optionally block until they finish.

    Args:
        wait: If true, call ``self._await(**kwargs)`` after starting.
        **kwargs: Forwarded to ``_await`` when *wait* is true.
    """
    to_start = {idx: entry for idx, entry in self.threads.items() if entry["status"] == "pending"}
    for entry in to_start.values():
        entry["thread"].start()
        entry["status"] = "running"
    log("header", f"ThreadPool: Running {len(to_start)} tasks (wait={wait})")
    if wait:
        self._await(**kwargs)
Expand source code
def terminate(self):
    """Kill every running thread, then collect them with ``_await``."""
    for entry in [e for e in self.threads.values() if e["status"] == "running"]:
        entry["thread"].kill()
    self._await()