Module bioiain.utilities.parallel

Functions

def end_pools()
Expand source code
def end_pools():
    """Terminate every ThreadPool still registered in the module-level ``pools`` list.

    ``ThreadPool.terminate`` ends by calling ``_await``, which removes the pool
    from ``pools``. We therefore iterate over a snapshot of the list: removing
    items from a list while iterating it directly skips every other element.
    """
    if not pools:
        return
    log("start", "ENDING POOLS")
    for pool in list(pools):
        pool.terminate()
    log("end", "POOLS TERMINATED")
def mem_log()
Expand source code
def mem_log():
    """Start ``_indefinite_mem_log`` in a background ThreadPool without waiting for it.

    Returns:
        The started ThreadPool, so callers can ``terminate()`` it explicitly.
        It is also registered in ``pools``, so ``end_pools`` will stop it.
    """
    # ThreadPool.__init__ already appends the new pool to `pools`; appending it
    # again here would register it twice.
    pool = ThreadPool()
    pool.add(_indefinite_mem_log)
    pool.start(wait=False)
    return pool
def mem_usage(as_dict=False)
Expand source code
def mem_usage(as_dict=False):
    """Report system memory status using ``psutil.virtual_memory()``.

    Args:
        as_dict: if True, return the full virtual-memory record as a dict.

    Returns:
        The record as a dict when *as_dict* is True. Otherwise, the percentage
        of total memory that is currently *available* (free, not used, despite
        the function's name).
    """
    # Take one snapshot so `available` and `total` describe the same moment.
    vm = psutil.virtual_memory()
    if as_dict:
        return vm._asdict()
    return vm.available * 100 / vm.total
def split_iterable(iterable, n_parts: int | str = 'auto') -> list
Expand source code
def split_iterable(iterable, n_parts: int | str = "auto") -> list:
    """Split *iterable* into consecutive, roughly equal chunks (e.g. for parallel work).

    Args:
        iterable: a list, tuple, str or dict (subclasses included).
            List, tuple and str chunks are returned as lists of elements.
            Dict chunks are returned as dicts, following insertion order.
        n_parts: the number of chunks, or one of these presets:
            "auto" (``avail_cpus``), "max" (``cpu_count``),
            "double" (``2 * avail_cpus``) or "half" (``avail_cpus // 2``).

    Returns:
        ``[iterable]`` unchanged if the resolved ``n_parts <= 1``.

        Otherwise, a list of at most ``n_parts`` non-empty chunks. Their
        concatenation reproduces every element in order, and chunk sizes
        differ by at most one. An empty input yields ``[]``.

        For an unsupported type, an error is logged and *iterable* is returned
        as-is.

    Raises:
        TypeError: if *n_parts* is neither an int nor a recognised preset.
    """
    if n_parts == "auto":
        n_parts = avail_cpus
    elif n_parts == "max":
        n_parts = cpu_count
    elif n_parts == "double":
        n_parts = avail_cpus * 2
    elif n_parts == "half":
        n_parts = avail_cpus // 2

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if type(n_parts) is not int:
        raise TypeError(
            f"n_parts must be an int or one of 'auto', 'max', 'double', 'half'; got {n_parts!r}"
        )
    if n_parts <= 1:
        return [iterable]

    length = len(iterable)
    log("header", "Splitting iterable of length:", length)
    log(1, "Number of parts:", n_parts)

    if isinstance(iterable, dict):
        items = list(iterable.items())  # dict views are not sliceable
        make_part = dict
    elif isinstance(iterable, (list, tuple, str)):
        items = iterable
        make_part = list
    else:
        log("error", "Unrecognised iterable type:", type(iterable))
        return iterable

    # At most one chunk per element, so no chunk is ever empty.
    n_parts = min(n_parts, length)
    part_size, remainder = divmod(length, n_parts) if n_parts else (0, 0)
    log(1, "Size of parts:", part_size, f"({remainder})")

    parts = []
    start = 0
    for index in range(n_parts):
        # The first `remainder` chunks take one extra element, so every element
        # is assigned to exactly one chunk.
        end = start + part_size + (1 if index < remainder else 0)
        parts.append(make_part(items[start:end]))
        start = end
    return parts

Classes

class ThreadPool
Expand source code
class ThreadPool(object):
    """Run callables in background threads and collect their return values.

    A new pool appends itself to the module-level ``pools`` list. ``_await``
    removes it again after joining its running threads.

    Attributes:
        threads: maps an integer index to a record dict with the keys
            "thread", "fun", "status" ("pending", "running", "done" or "error"),
            "ret" and "name".
        returns: maps thread name to the value its target returned; filled in
            by ``_await``.
        running, context: initialised here but not used by this class's methods.
    """

    def __init__(self):
        self.threads = {}
        self.running = False
        self.context = None
        self.returns = {}
        pools.append(self)

    class Thread(threading.Thread):
        """``threading.Thread`` subclass that records its target's result.

        Positional ``*args`` and extra ``**kwargs`` are passed to *target*
        when the thread runs. Once it finishes, ``ret`` holds the target's
        return value, and ``error`` is True if the target raised.
        """

        def __init__(self, *args, target=None, ret=None, thread_name=None, **kwargs):
            super().__init__(target=target, args=args, kwargs=kwargs, name=thread_name)
            self.thread_name = thread_name
            # threading.Thread.run() discards the target's return value, so
            # run() below calls the target itself using these references.
            self._fn = target
            self._fn_args = args
            self._fn_kwargs = kwargs
            self.ret = ret
            self.error = False

        class ThreadKilled(BaseException):
            """Exception injected into a running thread by ``kill()``."""

            def __init__(self, *args):
                # BaseException accepts positional args only; do not pass `self` again.
                super().__init__(*args)
                log("warning", "Thread Killed:")

        def run(self):
            """Call the target and store its return value in ``ret``.

            On any exception, including ``ThreadKilled``, this sets ``error``
            and re-raises so that ``threading.excepthook`` reports it.
            """
            try:
                result = None
                if self._fn is not None:
                    result = self._fn(*self._fn_args, **self._fn_kwargs)
                self.ret = result
                return result
            except BaseException:
                self.error = True
                log("error", f"Error in thread: {self.name}")
                raise

        def get_id(self):
            """Return this thread's identifier while it is alive, else None.

            Returning None for a finished thread prevents ``kill()`` from
            targeting a recycled identifier that now belongs to another thread.
            """
            return self.ident if self.is_alive() else None

        def kill(self):
            """Ask the interpreter to raise ``ThreadKilled`` inside this thread.

            Uses the C-API ``PyThreadState_SetAsyncExc``. Per its documentation,
            the exception is raised asynchronously, the next time the thread
            runs Python code.

            Returns:
                The number of thread states modified (normally 1), or 0 if the
                thread is not alive.
            """
            thread_id = self.get_id()
            if thread_id is None:
                log("warning", "Thread is not running, nothing to kill:", self.name)
                return 0
            set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
            set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
            set_async_exc.restype = ctypes.c_int
            log("warning", "Trying to kill thread:", thread_id)
            return set_async_exc(thread_id, ctypes.py_object(self.ThreadKilled))

    def add(self, fun, *args, **kwargs):
        """Queue *fun* as a new pending thread. Extra args are passed to *fun*.

        Returns:
            The created (not yet started) ``ThreadPool.Thread``.
        """
        n = len(self.threads)
        name = f"Thread {n}"
        t = self.Thread(*args, target=fun, thread_name=name, **kwargs)
        self.threads[n] = {"thread": t, "fun": fun, "status": "pending", "ret": t.ret, "name": name}
        return t

    def start(self, wait=False, **kwargs):
        """Start all pending threads. If *wait* is True, also join them via ``_await``."""
        pending_threads = {k: v for k, v in self.threads.items() if v["status"] == "pending"}
        for k, t in pending_threads.items():
            t["thread"].start()
            t["status"] = "running"
        log("header", f"ThreadPool: Running {len(pending_threads)} tasks (wait={wait})")
        if wait:
            self._await(**kwargs)

    def _await(self, **kwargs):
        """Join all running threads, record their results and unregister the pool.

        Returns:
            ``self.returns.values()``: the target return values, in the order
            the threads were recorded.
        """
        running_threads = {k: v for k, v in self.threads.items() if v["status"] == "running"}
        ok = 0
        errors = 0
        for k, t in running_threads.items():
            thread = t["thread"]
            thread.join()
            # Read the result from the thread itself; the "ret" recorded in
            # add() is only the value captured before the thread started.
            t["ret"] = thread.ret
            self.returns[t["name"]] = thread.ret
            if thread.error:
                t["status"] = "error"
                errors += 1
            else:
                t["status"] = "done"
                ok += 1
        log("header", f"Threadpool: Finished {ok + errors} tasks ({errors} errors), returning:")
        for k, t in running_threads.items():
            log(1, f"{k} ({t['status']}): {t['ret']}")

        # The pool may already be unregistered (e.g. terminate() after start(wait=True)).
        if self in pools:
            pools.remove(self)
        return self.returns.values()

    def terminate(self):
        """Kill every running thread, then join them and unregister the pool via ``_await``."""
        running_threads = {k: v for k, v in self.threads.items() if v["status"] == "running"}
        for k, t in running_threads.items():
            t["thread"].kill()
        self._await()

Class variables

var Thread

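A `threading.Thread` subclass nested in `ThreadPool` that represents one thread of control. It adds the `thread_name`, `ret` and `error` attributes, plus a `kill()` method that asynchronously raises `ThreadKilled` inside the running thread.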

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

Methods

def add(self, fun, *args, **kwargs)
Expand source code
def add(self, fun, *args, **kwargs):
    """Register *fun* as a new pending thread of the pool.

    Positional and keyword arguments are passed to ``self.Thread`` together
    with ``target=fun`` and an auto-generated ``thread_name``.

    Returns:
        The newly created thread object. It is not started yet.
    """
    index = len(self.threads)
    label = f"Thread {index}"
    worker = self.Thread(*args, target=fun, thread_name=label, **kwargs)
    self.threads[index] = dict(
        thread=worker,
        fun=fun,
        status="pending",
        ret=worker.ret,
        name=label,
    )
    return worker
def start(self, wait=False, **kwargs)
Expand source code
def start(self, wait=False, **kwargs):
    """Launch every pending thread of the pool and mark it as running.

    Args:
        wait: if True, block afterwards by calling ``self._await(**kwargs)``.
        **kwargs: passed to ``self._await`` when *wait* is True.
    """
    to_launch = [entry for entry in self.threads.values() if entry["status"] == "pending"]
    for entry in to_launch:
        entry["thread"].start()
        entry["status"] = "running"
    log("header", f"ThreadPool: Running {len(to_launch)} tasks (wait={wait})")
    if not wait:
        return
    self._await(**kwargs)
def terminate(self)
Expand source code
def terminate(self):
    """Call ``kill()`` on every running thread, then hand over to ``self._await()``."""
    active = [entry["thread"] for entry in self.threads.values() if entry["status"] == "running"]
    for thread in active:
        thread.kill()
    self._await()