Module bioiain.utilities.sequences
Functions
def d3(resname)-
Expand source code
def d3(resname): try: ri = d3toint[resname] rn = d3to1[resname] except: log("warning", f"Unknown resname: {resname} (using UNK/X)") ri = 20 rn = "X" return rn, ri def intto1(i)-
Expand source code
def intto1(i): for d3, n in d3toint.items(): if i == n: return d3to1[d3] return "X"
Classes
class CLUSTAL (*args, verbose=False, run_msa=True, build_tree=False, matrix_path=None, **kwargs)-
Expand source code
class CLUSTAL(MSA): def __init__(self, *args, verbose=False, run_msa=True, build_tree=False, matrix_path=None, **kwargs): super().__init__(*args, **kwargs) kwargs.pop("name", None) if run_msa: self.msa_path = self._run_clustal_msa(name=self.name, verbose=verbose, matrix_path=matrix_path, **kwargs) self.msa_fasta = FASTA(self.msa_path) self.msa_fasta.rewrite() if build_tree: self.tree_path = self._build_tree(self.msa_path) def _run_clustal_msa(self, fasta_path=None, name="temp", out_folder=None, clustal_cmd="clustalw", matrix="BLOSUM", out_format="fasta", force=False, verbose=False, matrix_path=None, **kwargs): if fasta_path is None: fasta_path = self.fasta_path log(2, f"Calculating MSA ({matrix}) of: {fasta_path}") fname = f"{name}_{matrix}.ms.alignment.fasta" if matrix == "path": assert matrix_path is not None matrix = matrix_path if out_folder is None: out_folder = os.path.join(TEMP_FOLDER, "alignments") os.makedirs(out_folder, exist_ok=True) out_path = os.path.join(out_folder, fname) if os.path.exists(out_path) and not force: log(3, "Alignment already generated (CLUSTAL)") return out_path cmd = [ clustal_cmd, "-align", "-type=protein", f"-infile={fasta_path}", f"-matrix={matrix}", f"-pwmatrix={matrix}", f"-outfile={out_path}", f"-output={out_format}" f"-slow" ] if verbose: log(3, "$", " ".join(cmd)) subprocess.run(cmd) else: out_log = open("/dev/null", "w") subprocess.run(cmd, stdout=out_log) return out_path def _build_tree(self, align_path, force=False): log(2, f"Building tree for: {align_path}") out_path = align_path.replace(".fasta", ".nj") comp_file = out_path + ".list" if os.path.exists(out_path) and os.path.exists(comp_file) and not force: log(3, "Tree already generated") return out_path cmd = [ "clustalw", "-tree", "-type=protein", f"-infile={align_path}", "-outputtree=nj", ] #print("$", " ".join(cmd)) f = open(comp_file, "w") subprocess.run(cmd, stdout=f) return out_path def get_similar(self, target, name="temp", similarity=95): threshold = (100-similarity) / 100 log(2, f"Finding similar at {similarity}% for {target}") seq_num = self._get_seq_num(target) neighbour_nums = self._get_neighbours(seq_num, threshold=threshold) neighbour_names = [self._get_seq_name(n) for n in neighbour_nums] #print(neighbour_names) #exit() return neighbour_names def _get_seq_num(self, seq_name) -> int|None: #log(3, f"Finding seq_num for {seq_name}") comp_path = self.tree_path+".list" seq_num = None with open(comp_path, "r") as f: for line in f.readlines(): comps = line.split(" ") if len(comps) < 2: continue if seq_name in comps: seq_num = int(comps[1].replace(":", "")) break return seq_num def _get_seq_name(self, seq_num): #log(3, f"Finding seq_name for {seq_num}") comp_path = self.tree_path+".list" seq_name = None with open(comp_path, "r") as f: for line in f.readlines(): comps = line.split(" ") if len(comps) < 2: continue if f"{seq_num}:" == comps[1]: seq_name = comps[2] return seq_name def _get_neighbours(self, seq_num, threshold=0.05): log(3, f"Finding neighbours (seq. {seq_num}), threshold={threshold}") import re neighbours = [] with open(self.tree_path, "r") as f: for line in f.readlines(): if "DIST" in line and "length" in line: try: comps = [l for l in re.split(' |vs\.|;|=', line.strip()) if l != ""] num1 = int(comps[0]) num2 = int(comps[1]) dist = float(comps[3]) length = int(comps[5].replace("\n", "")) if dist > threshold: continue if seq_num == num1: neighbours.append(num2) elif seq_num == num2: neighbours.append(num1) except Exception as e: log("warning", f"Error reading tree file: {self.tree_path}") print(line) print(comps) raise e log(3, f"Found {len(neighbours)} neighbours") return neighboursAncestors
Methods
def get_similar(self, target, name='temp', similarity=95)-
Expand source code
def get_similar(self, target, name="temp", similarity=95): threshold = (100-similarity) / 100 log(2, f"Finding similar at {similarity}% for {target}") seq_num = self._get_seq_num(target) neighbour_nums = self._get_neighbours(seq_num, threshold=threshold) neighbour_names = [self._get_seq_name(n) for n in neighbour_nums] #print(neighbour_names) #exit() return neighbour_names
class FASTA (fasta_path)-
Expand source code
class FASTA(object): def __init__(self, fasta_path): self.fasta_path = fasta_path self.single_line = None def __repr__(self): return f"<bi.{self.__class__.__name__}: {self.fasta_path}>" def _parse_fasta(self, names=True, sequences=True, key=None): assert names or sequences if key is not None: if type(key) is str: key = [key] elif type is not list: key = list(key) fasta_dict = {} with open(self.fasta_path) as f: next_seq = False last_key = None wait_key = False for line in f.readlines(): line = line.replace("\n", "").strip() if line.startswith("#"): next_seq = True continue if line.startswith(">"): wait_key = False name = line[1:].strip() if key is not None: if len(key) == 0: break #print(name, key, name in key) if name in key: key.remove(name) else: wait_key = True continue if name not in fasta_dict: fasta_dict[name] = [] next_seq = True last_key = name continue elif wait_key: continue if not sequences: continue if line.strip() == "": next_seq = True continue else: if last_key is None: continue if next_seq: fasta_dict[last_key].append(line) next_seq = False else: fasta_dict[last_key][-1] += line if key is not None: #print(key) assert len(key) == 0 #print(names, sequences) if names and sequences: return fasta_dict elif names: return list(fasta_dict.keys()) elif sequences: seqs = [] [seqs.extend(seq) for seq in fasta_dict.values()] return seqs def rewrite(self, duplicates=False, empties=False, space_between=False, key_start="> "): log(3, "Rewriting FASTA:", self.fasta_path) data = self._parse_fasta() with open(self.fasta_path, "w") as f: for key, sequences in data.items(): if not empties: sequences = [s for s in sequences if len(s) > 0] n_seqs = len(sequences) if not duplicates: if n_seqs > 1: log("warning", f"{n_seqs} sequences for id: {key} (keeping only first)") sequences = sequences[:1] for seq in sequences: f.write(f"{key_start}{key}\n") f.write(f"{seq}\n") if space_between: f.write("\n") self.single_line = True return self.fasta_path def get_names(self, key=None): return self._parse_fasta(names=True, sequences=False, key=key) def get_sequences(self, key=None): return self._parse_fasta(names=False, sequences=True, key=key) def parse(self, key=None): return self._parse_fasta(key=key)Methods
def get_names(self, key=None)-
Expand source code
def get_names(self, key=None): return self._parse_fasta(names=True, sequences=False, key=key) def get_sequences(self, key=None)-
Expand source code
def get_sequences(self, key=None): return self._parse_fasta(names=False, sequences=True, key=key) def parse(self, key=None)-
Expand source code
def parse(self, key=None): return self._parse_fasta(key=key) def rewrite(self,
duplicates=False,
empties=False,
space_between=False,
key_start='> ')-
Expand source code
def rewrite(self, duplicates=False, empties=False, space_between=False, key_start="> "): log(3, "Rewriting FASTA:", self.fasta_path) data = self._parse_fasta() with open(self.fasta_path, "w") as f: for key, sequences in data.items(): if not empties: sequences = [s for s in sequences if len(s) > 0] n_seqs = len(sequences) if not duplicates: if n_seqs > 1: log("warning", f"{n_seqs} sequences for id: {key} (keeping only first)") sequences = sequences[:1] for seq in sequences: f.write(f"{key_start}{key}\n") f.write(f"{seq}\n") if space_between: f.write("\n") self.single_line = True return self.fasta_path
class MMSEQS2 (*args, mmseqs_cmd='mmseqs', db_name=None, verbosity=2, **kwargs)-
Expand source code
class MMSEQS2(MSA): def __init__(self, *args, mmseqs_cmd="mmseqs", db_name=None, verbosity=2, **kwargs): super().__init__(*args, **kwargs) self.tmp_folder = os.path.join(TEMP_FOLDER, "mmseqs2") os.makedirs(self.tmp_folder, exist_ok=True) self.mmseqs_cmd = mmseqs_cmd self.db_name = None self.db_folder = None self.verbosity = verbosity self.name = self.name.replace(".fasta", ".db") if db_name is None: db_name = self.name.split(".")[0] if os.path.isdir(self.fasta_path): log(2, "Input is already DB, setup only") self.db_folder = self.fasta_path self.db_name = db_name self.db_path = os.path.join(self.db_folder, self.db_name) else: log(2, "Input is a file, creating DB...") self.create_db(db_name=db_name, **kwargs) def _cmd(self, command, *args, **kwargs): cmd = [self.mmseqs_cmd, command] for kwarg, value in kwargs.items(): if not kwarg.startswith("--"): if len(kwarg) == 1: kwarg = f"-{kwarg}" else: kwarg = f"--{kwarg.replace('_', '-')}" cmd.extend([kwarg, str(value)]) cmd.extend([str(a) for a in args]) log(3, "$", " ".join(cmd)) subprocess.run(cmd) def create_db(self, db_name=None, fasta_path=None, **kwargs): if fasta_path is None: self.fasta.rewrite(key_start=">") fasta_path = self.fasta_path if db_name is None: db_name = self.name.split(".")[0] db_folder = os.path.join(SUBDIR_NAME, "mmseqs", self.name) db_path = os.path.join(db_folder, db_name) os.makedirs(db_folder, exist_ok=True) self._cmd("createdb", fasta_path, db_path, createdb_mode=0) self.db_name = db_name self.db_folder = db_folder self.db_path = db_path return self def cluster(self, db_name=None, reassign=False, force=False, linear=False, easy=False, **kwargs): if db_name is None: db_name = self.db_name cluster_db_folder = os.path.join(self.db_folder.replace(".db", ".cluster")) cluster_db_path = os.path.join(cluster_db_folder, db_name) out_path = os.path.join(cluster_db_folder, f"{db_name}_clustered.tsv") data_path = out_path.replace(".tsv", ".json") if linear: cmd = ["linclust"] else: cmd = ["cluster"] if easy: cmd = ["easy-"+cmd[0]] cmd.extend([self.db_path, cluster_db_path, self.tmp_folder]) if reassign and not linear: cmd.append("--cluster-reassign") params = { "cmd": " ".join([str(c) for c in cmd]), "reassign":reassign, "linear":linear, "easy":easy, } os.makedirs(cluster_db_folder, exist_ok=True) if not os.path.exists(cluster_db_path) or not os.path.exists(data_path): force=True if os.path.exists(data_path): if json.load(open(data_path))["params"] != params: log(3, "Different params detected") force = True if force: try: self._cmd(*cmd, v=self.verbosity) except: raise ClusteringError() else: log(3, "Cluster DB already clustered (mmseqs2)") if force or not os.path.exists(out_path): try: self._cmd("createtsv", self.db_path, self.db_path, cluster_db_path, out_path, v=self.verbosity) except: raise ClusteringError() try: clusters = {} with open(out_path) as f: for line in f: c, i = line.strip().split("\t") if c not in clusters: clusters[c] = {"name":c, "list": []} clusters[c]["list"].append(i) data = {"params": params, "clusters":{},} for c in clusters: data["clusters"][len(data["clusters"])] = {**clusters[c], "n": len(clusters[c]["list"])} json.dump(data, open(data_path, "w"), indent=4) except: raise ClusteringError() return data_pathAncestors
Methods
def cluster(self, db_name=None, reassign=False, force=False, linear=False, easy=False, **kwargs)-
Expand source code
def cluster(self, db_name=None, reassign=False, force=False, linear=False, easy=False, **kwargs): if db_name is None: db_name = self.db_name cluster_db_folder = os.path.join(self.db_folder.replace(".db", ".cluster")) cluster_db_path = os.path.join(cluster_db_folder, db_name) out_path = os.path.join(cluster_db_folder, f"{db_name}_clustered.tsv") data_path = out_path.replace(".tsv", ".json") if linear: cmd = ["linclust"] else: cmd = ["cluster"] if easy: cmd = ["easy-"+cmd[0]] cmd.extend([self.db_path, cluster_db_path, self.tmp_folder]) if reassign and not linear: cmd.append("--cluster-reassign") params = { "cmd": " ".join([str(c) for c in cmd]), "reassign":reassign, "linear":linear, "easy":easy, } os.makedirs(cluster_db_folder, exist_ok=True) if not os.path.exists(cluster_db_path) or not os.path.exists(data_path): force=True if os.path.exists(data_path): if json.load(open(data_path))["params"] != params: log(3, "Different params detected") force = True if force: try: self._cmd(*cmd, v=self.verbosity) except: raise ClusteringError() else: log(3, "Cluster DB already clustered (mmseqs2)") if force or not os.path.exists(out_path): try: self._cmd("createtsv", self.db_path, self.db_path, cluster_db_path, out_path, v=self.verbosity) except: raise ClusteringError() try: clusters = {} with open(out_path) as f: for line in f: c, i = line.strip().split("\t") if c not in clusters: clusters[c] = {"name":c, "list": []} clusters[c]["list"].append(i) data = {"params": params, "clusters":{},} for c in clusters: data["clusters"][len(data["clusters"])] = {**clusters[c], "n": len(clusters[c]["list"])} json.dump(data, open(data_path, "w"), indent=4) except: raise ClusteringError() return data_path def create_db(self, db_name=None, fasta_path=None, **kwargs)-
Expand source code
def create_db(self, db_name=None, fasta_path=None, **kwargs): if fasta_path is None: self.fasta.rewrite(key_start=">") fasta_path = self.fasta_path if db_name is None: db_name = self.name.split(".")[0] db_folder = os.path.join(SUBDIR_NAME, "mmseqs", self.name) db_path = os.path.join(db_folder, db_name) os.makedirs(db_folder, exist_ok=True) self._cmd("createdb", fasta_path, db_path, createdb_mode=0) self.db_name = db_name self.db_folder = db_folder self.db_path = db_path return self
class MSA (fasta_path, name=None, **kwargs)-
Expand source code
class MSA(object): def __init__(self, fasta_path, name=None, **kwargs): self.fasta_path = fasta_path self.fasta = FASTA(fasta_path) if name is None: name = os.path.basename(fasta_path).replace(".fasta", "") self.name = name log(1, f"Initialising {self.__class__.__name__}...") log(2, "Fasta path:", self.fasta_path) def __repr__(self): return f"<bi.{self.__class__.__name__}:{self.name} ({len(self)} sequences)>" def __len__(self): return len(self.fasta.get_names())Subclasses