Source code for dnacauldron.biotools.sequence_io

import re
import os
from io import BytesIO, StringIO
from copy import deepcopy
import flametree
from snapgene_reader import snapgene_file_to_seqrecord
from Bio import SeqIO

try:
    # Biopython <1.78
    from Bio.Alphabet import DNAAlphabet

    has_dna_alphabet = True
except ImportError:
    # Biopython >=1.78
    has_dna_alphabet = False
from .record_operations import (
    set_record_topology,
    sequence_to_biopython_record,
)


[docs]def string_to_records(string): """Convert a string of a fasta, genbank... into a simple ATGC string. Can also be used to detect a format. """ matches = re.match("([ATGC][ATGC]*)", string) if (matches is not None) and (matches.groups()[0] == string): return [sequence_to_biopython_record(string)], "ATGC" for fmt in ("fasta", "genbank"): try: stringio = StringIO(string) records = list(SeqIO.parse(stringio, fmt)) if len(records) > 0: return (records, fmt) except Exception: pass try: record = snapgene_file_to_seqrecord(filecontent=StringIO(string)) return [record] except Exception: pass raise ValueError("Invalid sequence format")
[docs]def load_record( filepath, topology="default_to_linear", id="auto", upperize=True, max_name_length=20, ): """Return a Biopython record read from a Fasta/Genbank/Snapgene file. Parameters ---------- filepath Path to a Genbank, Fasta, or Snapgene (.dna) file. topology Can be "circular", "linear", "default_to_circular" (will default to circular if ``annotations['topology']`` is not already set) or "default_to_linear". id Sets the record.id. If "auto", the original record.id is used, and if none is set the name of the file (without extension) is used instead. upperize If true, the sequence will get upperized (recommended in this library, as the mix of upper and lower case can cause problems in Biopython's enzyme site search). max_name_length The name of the record will be truncated if too long to avoid Biopython exceptions being raised. """ if filepath.lower().endswith(("gb", "gbk")): record = SeqIO.read(filepath, "genbank") elif filepath.lower().endswith(("fa", "fasta")): record = SeqIO.read(filepath, "fasta") elif filepath.lower().endswith(".dna"): record = snapgene_file_to_seqrecord(filepath) else: raise ValueError("Unknown format for file: %s" % filepath) if upperize: record = record.upper() set_record_topology(record, topology) if id == "auto": id = record.id if id in [None, "", "<unknown id>", ".", " "]: id = os.path.splitext(os.path.basename(filepath))[0] id = id.replace(" ", "_")[:max_name_length] record.id = id elif id is not None: record.id = id.replace(" ", "_")[:max_name_length] return record
def _load_records_from_zip_file(zip_file, use_file_names_as_ids=False): """Return all fasta/genbank/snapgene in a zip as biopython records. Each record gets a ``source_file`` attribute from the zip's file name without the .zip extension. Used via "load_records_from_files". """ zip_file = flametree.file_tree(zip_file) records = [] for f in zip_file._all_files: ext = f._extension.lower() if ext in ["gb", "gbk", "fa", "dna"]: try: new_records, fmt = string_to_records(f.read()) if not isinstance(new_records, list): new_records = [new_records] except Exception: content_stream = BytesIO(f.read("rb")) try: record = snapgene_file_to_seqrecord(fileobject=content_stream) new_records, _ = [record], "snapgene" except Exception: raise ValueError("Format not recognized for file " + f._path) single_record = len(new_records) == 1 for i, record in enumerate(new_records): name = record.id if name in [ None, "", "<unknown id>", ".", " ", "<unknown name>", ]: number = "" if single_record else ("%04d" % i) name = f._name_no_extension.replace(" ", "_") + number record.id = name record.id = name record.file_name = f._name_no_extension if use_file_names_as_ids and single_record: basename = os.path.basename(record.source_file) basename_no_extension = os.path.splitext(basename)[0] record.id = basename_no_extension for record in new_records: record.source_file = f._path records += new_records return records
[docs]def load_records_from_file(filepath): """Autodetect file format and load biopython records from it.""" with open(filepath, "rb") as f: content = f.read() try: records, fmt = string_to_records(content.decode("utf-8")) except Exception: try: record = snapgene_file_to_seqrecord(fileobject=BytesIO(content)) records, fmt = [record], "snapgene" except Exception: raise ValueError("Format not recognized for file " + filepath) if not isinstance(records, list): records = [records] for record in records: record.source_file = filepath return records, fmt
[docs]def load_records_from_files(files=None, folder=None, use_file_names_as_ids=False): """Automatically convert files or a folder's content to biopython records. Parameters ---------- files A list of path to files. A ``folder`` can be provided instead. folder A path to a folder containing sequence files. use_file_names_as_ids If True, for every file containing a single record, the file name (without extension) will be set as the record's ID. """ if files is not None: for file in files: if isinstance(file, str) and not os.path.exists(file): raise IOError("File %s not found" % file) if folder is not None: files = [f._path for f in flametree.file_tree(folder)._all_files] records = [] for filepath in files: filename = os.path.basename(filepath) if filename.lower().endswith("zip"): records += _load_records_from_zip_file( filepath, use_file_names_as_ids=use_file_names_as_ids ) continue recs, fmt = load_records_from_file(filepath) single_record = len(recs) == 1 for i, record in enumerate(recs): name_no_extension = "".join(filename.split(".")[:-1]) name = name_no_extension + ("" if single_record else ("%04d" % i)) name = name.replace(" ", "_") UNKNOWN_IDS = [ "None", "", "<unknown id>", ".", "EXPORTED", "<unknown name>", "Exported", ] if has_dna_alphabet: # Biopython <1.78 record.seq.alphabet = DNAAlphabet() record.annotations["molecule_type"] = "DNA" # Sorry for this parts, it took a lot of "whatever works". # keep your part names under 20c and pointless, and everything # will be good if str(record.id).strip() in UNKNOWN_IDS: record.id = name record.file_name = name_no_extension if use_file_names_as_ids and single_record: basename = os.path.basename(record.source_file) basename_no_extension = os.path.splitext(basename)[0] record.id = basename_no_extension records += recs return records
[docs]def write_record(record, target, fmt="genbank"): """Write a record as genbank, fasta, etc. via Biopython, with fixes.""" record = deepcopy(record) record.id = record.id[:20] if has_dna_alphabet: # Biopython <1.78 if str(record.seq.alphabet.__class__.__name__) != "DNAAlphabet": record.seq.alphabet = DNAAlphabet() record.annotations["molecule_type"] = "DNA" if hasattr(target, "open"): target = target.open("w") SeqIO.write(record, target, fmt)