from copy import deepcopy
from io import BytesIO, StringIO
import itertools
import os
import re
import pandas
try:
# Biopython <1.78
from Bio.Alphabet import DNAAlphabet
has_dna_alphabet = True
except ImportError:
# Biopython >=1.78
has_dna_alphabet = False
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
import crazydoc
import flametree
from snapgene_reader import snapgene_file_to_seqrecord
from .record_operations import sequence_to_biopython_record
strands = [" ", "ss-", "ds-", "ms-"]
moleculetypes = [
"NA ",
"DNA ",
"RNA ",
"tRNA ",
"rRNA ",
"mRNA ",
"uRNA ",
"cRNA ",
]
strand_molecules = [
"".join(entry).strip(" ") for entry in itertools.product(strands, moleculetypes)
]
genbank_reference = {
# From the Genbank file format definition (release 260).
# 0-based slicing indices below.
# name: (start, end, [options])
"locus": (0, 5, ["LOCUS"]),
"name": (12, 28, None),
"length": (29, 40, None),
"bp": (41, 43, ["bp"]),
"strandedness": (44, 47, strands),
"moleculetype": (
47,
53,
moleculetypes,
),
# No topology info (spaces) also allowed, for flexibility:
"moleculetopology": (55, 63, ["linear ", "circular", " "]),
"divisioncode": (64, 67, None),
"updatedate": (68, 79, None),
# Spaces:
"space1": (5, 12, " "),
"space2": (28, 29, " "),
"space3": (40, 41, " "),
"space4": (43, 44, " "),
"space5": (53, 55, " "),
"space6": (63, 64, " "),
"space7": (67, 68, " "),
}
genbank_months = [
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC",
]
locus_guide = """+---+ +--------------+ +---------+ +- +--+----+ +------+ +-- +---------+
1 5 13 28 30 40 42 45 48 53 56 63 65 69 79"""
# Guidelines:
# Use only alphanumeric and underscore characters in the sequence name.
# Sequence names must be maximum 20 character long.
# Set SeqRecord.id to the sequence name, and .name="Exported" (this is used in exports).
# Genbank format must be: LOCUS = Exported, filename = sequence name.
# 'Exported' as name is within required length and also used by other software.
[docs]
def load_record(
filename, record_id="filename", adapt_id=True, upperize=False, id_cutoff=20
):
"""Load a FASTA/Genbank/Snapgene file as a Biopython record.
Parameters
----------
filename : str
Path to the sequence file.
record_id : str
ID of the record ("filename": use the file name; "original": keep the record's
original ID (defaults to file name if the record has no ID).
adapt_id : bool
If True, convert ID to alphanumeric and underscore.
upperize : bool
If True, the record's sequence will converted to uppercase.
id_cutoff : int, optional
If the ID is longer than this value, it will get truncated at this cutoff to
conform to guidelines and Genbank name limit. Use `None` for no cutoff.
"""
# FILEFORMAT
if filename.lower().endswith((".gb", ".gbk")):
record = SeqIO.read(filename, "genbank")
elif filename.lower().endswith((".fa", ".fasta")):
record = SeqIO.read(filename, "fasta")
elif filename.lower().endswith(".dna"):
record = snapgene_file_to_seqrecord(filename)
else:
raise ValueError("Unknown format for file: %s" % filename)
# LETTERCASE
if upperize:
record = record.upper()
# NAME
record.name = "Exported" # see guidelines above
if record_id == "original":
if record.id in [None, "", "<unknown id>", ".", " "]:
record.id = os.path.splitext(os.path.basename(filename))[0]
elif record_id == "filename":
record.id = os.path.splitext(os.path.basename(filename))[0]
else: # to catch if a deprecated specification is used
ValueError("`record_id` must be one of `filename` or `original`!")
# LENGTH
if id_cutoff is not None:
record.id = record.id[:id_cutoff]
# CHARACTERS
if adapt_id:
record.id = "".join([char if char.isalnum() else "_" for char in record.id])
return record
[docs]
def write_record(record, target, id_cutoff=20, adapt_id=True):
"""Write a DNA record as Genbank or FASTA via Biopython, with fixes.
Parameters
----------
record : SeqRecord
Biopython SeqRecord.
target : str or StringIO
Filepath, string, or StringIO instance. Desired sequence format is inferred from
the ending. If it's a directory, it uses the ID as filename and exports Genbank.
id_cutoff : int, optional
If the ID is longer than this value, it will get truncated at this cutoff to
conform to guidelines and Genbank name limit. Use `None` for no cutoff.
adapt_id : bool
If True, convert ID to alphanumeric and underscore.
"""
record = deepcopy(record)
record.name = "Exported" # This is used as LOCUS
if id_cutoff is not None:
record.id = record.id[:id_cutoff]
if adapt_id:
record.id = "".join([char if char.isalnum() else "_" for char in record.id])
if has_dna_alphabet: # for Biopython <1.78
if str(record.seq.alphabet.__class__.__name__) != "DNAAlphabet":
record.seq.alphabet = DNAAlphabet()
record.annotations["molecule_type"] = "DNA"
if hasattr(target, "open"):
target = target.open("w")
else:
if type(target) is StringIO:
fmt = "genbank"
elif target.lower().endswith((".gb", ".gbk")):
fmt = "genbank"
elif target.lower().endswith((".fa", ".fasta")):
fmt = "fasta"
else: # directory
target = os.path.join(target, record.id + ".gb")
fmt = "genbank"
SeqIO.write(record, target, fmt)
def string_to_record(string):
"""Convert a string of a FASTA, Genbank... into a simple ATGC string.
Can also be used to detect a format.
"""
matches = re.match("([ATGC][ATGC]*)", string)
# print("============", len(matches.groups()[0]), len(string))
# print (matches.groups()[0] == string)
if (matches is not None) and (matches.groups()[0] == string):
if has_dna_alphabet: # Biopython <1.78
sequence = Seq(string, alphabet=DNAAlphabet())
else:
sequence = Seq(string)
seqrecord = SeqRecord(sequence)
seqrecord.annotations["molecule_type"] = "DNA"
return seqrecord, "ATGC"
for fmt in ("fasta", "genbank"):
try:
stringio = StringIO(string)
records = list(SeqIO.parse(stringio, fmt))
if len(records) > 0:
return (records, fmt)
except Exception:
pass
try:
record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
return record
except Exception:
pass
raise ValueError("Invalid sequence format")
def spreadsheet_file_to_dataframe(filepath, header="infer"):
"""Load a CSV or EXCEL spreadsheet as a Pandas dataframe."""
name = filepath._name if hasattr(filepath, "_name") else filepath
if name.endswith(".csv"):
return pandas.read_csv(filepath, header=header)
else:
return pandas.read_excel(filepath, header=header)
def records_from_zip_file(zip_file):
"""Return all fasta/genbank/snapgene in a zip as Biopython records."""
zip_file = flametree.file_tree(zip_file)
records = []
for f in zip_file._all_files:
ext = f._extension.lower()
if ext in ["gb", "gbk", "fa", "dna"]:
try:
new_records, fmt = string_to_record(f.read())
except Exception:
content_stream = BytesIO(f.read("rb"))
try:
record = snapgene_file_to_seqrecord(fileobject=content_stream)
new_records, _ = [record], "snapgene"
except Exception:
try:
parser = crazydoc.CrazydocParser(
["highlight_color", "bold", "underline"]
)
new_records = parser.parse_doc_file(content_stream)
# fmt = "doc"
except Exception:
raise ValueError("Format not recognized for file " + f._path)
single_record = len(new_records) == 1
for i, record in enumerate(new_records):
name = record.id
if name in [
None,
"",
"<unknown id>",
".",
" ",
"<unknown name>",
]:
number = "" if single_record else ("%04d" % i)
name = f._name_no_extension.replace(" ", "_") + number
record.id = name
record.name = name
record.file_name = f._name_no_extension
records += new_records
return records
def records_from_file(filepath):
"""Autodetect file format and load Biopython records from it."""
with open(filepath, "rb") as f:
content = f.read()
try:
records, fmt = string_to_record(content.decode("utf-8"))
except Exception:
try:
record = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
records, fmt = [record], "snapgene"
except Exception:
try:
parser = crazydoc.CrazydocParser(
["highlight_color", "bold", "underline"]
)
records = parser.parse_doc_file(BytesIO(content))
fmt = "doc"
except Exception:
try:
df = spreadsheet_file_to_dataframe(filepath, header=None)
records = [
sequence_to_biopython_record(sequence=seq, id=name, name=name)
for name, seq in df.values
]
fmt = "spreadsheet"
except Exception:
raise ValueError("Format not recognized for file " + filepath)
if not isinstance(records, list):
records = [records]
return records, fmt
def record_to_formated_string(record, remove_descr=False):
"""Return a string with the content of a Genbank file."""
if remove_descr:
record = deepcopy(record)
if isinstance(record, (list, tuple)):
for r in record:
r.description = ""
else:
record.description = ""
fileobject = StringIO()
write_record(record=record, target=fileobject)
return fileobject.getvalue().encode("utf-8")
[docs]
def records_from_data_files(filepaths=None, folder=None):
"""Automatically convert files or a folder's content to Biopython records."""
if folder is not None:
filepaths = [f._path for f in flametree.file_tree(folder)._all_files]
records = []
for filepath in filepaths:
filename = os.path.basename(filepath)
if filename.lower().endswith("zip"):
records += records_from_zip_file(filepath)
continue
recs, fmt = records_from_file(filepath)
single_record = len(recs) == 1
for i, record in enumerate(recs):
name_no_extension = "".join(filename.split(".")[:-1])
name = name_no_extension + ("" if single_record else ("%04d" % i))
name = name.replace(" ", "_")
UNKNOWN_IDS = [
"None",
"",
"<unknown id>",
".",
"EXPORTED",
"<unknown name>",
"Exported",
]
if has_dna_alphabet: # Biopython <1.78
record.seq.alphabet = DNAAlphabet()
record.annotations["molecule_type"] = "DNA"
# Sorry for this parts, it took a lot of "whatever works".
# keep your part names under 20c and pointless, and everything
# will be good
if str(record.id).strip() in UNKNOWN_IDS:
record.id = name
if str(record.name).strip() in UNKNOWN_IDS:
record.name = name
record.file_name = name_no_extension
records += recs
return records
[docs]
def is_genbank_standard(filepath):
"""Check the LOCUS line of a Genbank file."""
std_locus_line_len = 79 # as per Genbank definition
is_correct = True
expected_entries = 8 # expected, as per Genbank definition
# Strandedness and Molecule Type is handled as one entry.
with open(filepath) as genbank:
first_line = genbank.readline().strip("\n")
entries = re.split(" +", first_line)
if len(entries) == expected_entries:
topology = entries[-3]
# Topology can be empty (spaces), in that case we expect one fewer column:
elif len(entries) == expected_entries - 1:
topology = " "
else: # bad first line, quit
print("Error: malformatted locus line: missing or too many entries")
print(first_line, locus_guide, sep="\n")
is_correct = False
return is_correct
entry_dict = {
"locus": entries[0],
"name": entries[1],
"length": entries[2],
"bp": entries[3],
"strand_molecules": entries[4],
"moleculetopology": topology,
"divisioncode": entries[-2],
"updatedate": entries[-1],
}
if len(first_line) == std_locus_line_len:
# Look up each entry:
for key, value in genbank_reference.items():
if value[2] is not None:
if first_line[value[0] : value[1]] not in value[2]:
print("Error:", key, "=", first_line[value[0] : value[1]])
print(" It should be one of ", value[2])
is_correct = False
else: # incorrect length, but we try and parse using whitespace
print(
"Error: LOCUS line is %d characters long! (standard length = %d)"
% (
len(first_line),
std_locus_line_len,
)
)
is_correct = False
# This checks the same as the block above, except on whitespace-split entries:
for entry, value in entry_dict.items():
if entry in genbank_reference.keys():
if genbank_reference[entry][2] is not None:
if value not in genbank_reference[entry][2]:
print("Error:", entry, "=", value)
print(
" It should be one of ", genbank_reference[entry][2]
)
# We split by whitespace, therefore this entry won't be in the Genbank ref
if entry_dict["strand_molecules"] not in strand_molecules:
print(
"Error: incorrect strandness+molecule type = ",
entry_dict["strand_molecules"],
)
# Inspecting entries in detail:
if not entry_dict["length"].isnumeric():
is_correct = False
print("Error: non-numeric characters in length:", entries[2])
# Date should be dd-MMM-yyyy (15-NOV-2024)
date_values = entry_dict["updatedate"].split("-")
if not date_values[0].isnumeric(): # ignores non-valid day number (e.g. 00)
is_correct = False
print("Error: non-numeric characters in day:", date_values[0])
if date_values[1] not in genbank_months:
is_correct = False
print("Error: invalid month:", date_values[1])
if not date_values[2].isnumeric(): # ignores non-valid year number
is_correct = False
print("Error: non-numeric characters in year:", date_values[2])
if not is_correct:
print(first_line, locus_guide, sep="\n")
return is_correct