Source code for easy_dna.extractor
import pandas as pd
import flametree
from .io import load_record, records_from_data_files, write_record
[docs]def extract_from_input(
filename=None,
directory=None,
construct_list=None,
direct_sense=True,
output_path=None,
min_sequence_length=20,
):
"""Extract features from input and return in a dictionary.
Optionally save the features in separate files.
Parameters
==========
file
Input sequence file (Genbank).
directory
Directory name containing input sequence files.
construct_list
A list of SeqRecords.
direct_sense
If True: make antisense features into direct-sense in the exported files.
output_path
Path for the exported feature and report files.
min_sequence_length
Discard sequences with length less than this integer.
"""
genbank_id_limit = 20 # GenBank format hard limit for name
if construct_list:
pass
elif filename:
input_record = load_record(
filename, record_id="auto", upperize=False, id_cutoff=genbank_id_limit
)
construct_list = [input_record]
elif directory:
construct_list = records_from_data_files(filepaths=None, folder=directory)
else:
raise TypeError("Specify one of 'construct_list', 'filename' or 'directory'.")
records_dict = dict()
recordname_list = []
for input_record in construct_list:
records = extract_features(input_record, direct_sense)
record_name = input_record.name[0:genbank_id_limit]
# This part makes the key (used as dir name) unique by appending a copynumber:
number_of_name_occurrences = recordname_list.count(record_name)
if number_of_name_occurrences:
key = "%s_%s" % (record_name, number_of_name_occurrences + 1)
else:
key = record_name
recordname_list.append(record_name)
records_dict[key] = records
parts_report = make_part_dict(records_dict, min_sequence_length=min_sequence_length)
processed_report = process_report(parts_report[1])
all_parts_dict = parts_report[0]
records_dict["all_parts"] = list(all_parts_dict.values())
if output_path is not None:
root = flametree.file_tree(output_path)
for key, records in records_dict.items():
record_dir = root._dir(key)
record_name_alnum_list = []
for record in records:
record_name_alnum = "".join(
x if x.isalnum() else "_" for x in record.name
)
# This part makes the filename unique by appending a copynumber:
number_of_occurrences = record_name_alnum_list.count(record_name_alnum)
if number_of_occurrences:
record_filename = "%s_%s.gb" % (
record_name_alnum,
number_of_occurrences + 1,
)
else:
record_filename = record_name_alnum + ".gb"
record_name_alnum_list.append(record_name_alnum)
record_file_path = record_dir._file(record_filename)
try:
write_record(record, record_file_path, fmt="genbank")
except Exception as err:
print("Error writing", record_filename, str(err))
processed_report.to_csv(root._file("report.csv").open("w"))
records_dict["processed_report"] = processed_report
return records_dict
def compute_id(seq_feature):
"""Computes an ID for extract_features().
The ID can be used as a SeqRecord/Genbank name or id attribute.
"""
label_fields = [
"name",
"gene",
"label",
"product",
"source",
"note",
]
label = "no_label"
for key in label_fields:
if key in seq_feature.qualifiers and len(seq_feature.qualifiers[key]):
label = seq_feature.qualifiers[key]
if isinstance(label, list):
label = "_".join(label)
break
genbank_limit = 20 # GenBank format hard limit for name
label = label[0:genbank_limit]
label = label.replace(" ", "_").replace("'", "p")
return label
def extract_features(seq_record, direct_sense=True):
"""Extract all features from a SeqRecord.
Return the features as a list of SeqRecords.
"""
all_features_in_direct_sense = direct_sense
records = []
# Check whether the feature is fully contained within any other feature:
for i, seq_feature in enumerate(seq_record.features):
start = seq_feature.location.start
end = seq_feature.location.end
contained_feature = False
for other_seq in (
seq_record.features[:i] + seq_record.features[(i + 1) :]
): # exclude current feature
if start in other_seq.location and end in other_seq.location:
contained_feature = True
break
if contained_feature:
continue
if all_features_in_direct_sense:
new_record = seq_feature.extract(seq_record)
else:
new_record = seq_record[
seq_feature.location.start : seq_feature.location.end
]
new_record.annotations = {"topology": "linear"}
new_record.description = str(
'Extracted from "' + new_record.id + '" by easy_dna.extract_features()'
)
new_record.id = compute_id(seq_feature)
new_record.name = new_record.id
records = records + [new_record]
return records
def make_part_dict(records_dict, min_sequence_length=20):
"""Make a full part list and a report.
Uses records_dict made by run_extraction().
Parameters
==========
records_dict
Dictionary of sequence name: list of features as SeqRecords.
min_sequence_length
Discard sequences with length less than this integer.
"""
report_index = [
"input_construct",
"input_sequence",
"shared_with",
"equal_to",
"has_copy",
"too_short",
"notes",
"sequence_string",
]
report = pd.DataFrame(
data=None, index=report_index, columns=None, dtype=str, copy=False
)
all_parts_dict = dict()
# This part is complex because it does two things, and could be simplified.
# It makes a dictionary of all parts and a dataframe of part properties.
# It also checks for a number of constraints and cases: sequence length,
# shared sequences, shared names.
for i, (key, record_list) in enumerate(records_dict.items()):
for j, record in enumerate(record_list):
s = pd.Series(None, index=report_index, name=i)
s["input_construct"] = key
s["input_sequence"] = record.name
s["has_copy"] = False # default
if len(record) < min_sequence_length:
s["too_short"] = True
series_name = str(i) + "_" + str(j)
report[series_name] = s
continue
else:
s["too_short"] = False
sequence_as_key = str(record.seq.lower())
s["sequence_string"] = sequence_as_key
if sequence_as_key in all_parts_dict.keys():
s["has_copy"] = True
else:
remove = 1
while record.name in report.loc["input_sequence"].array:
index = len(record.name) - remove
record.name = record.name[:index] + "X" + record.name[index + 1 :]
remove += 1
s["note"] = "renamed"
record.id = record.name
s["input_sequence"] = record.name # update name in record
all_parts_dict[sequence_as_key] = record
series_name = str(i) + "_" + str(j)
report[series_name] = s
report = report.T
return (all_parts_dict, report)
def process_report(report):
"""Format the report prepared by make_part_dict().
The function finds common parts within constructs and identical sequences
between constructs.
"""
all_shared_with = pd.Series()
all_equal_to = pd.Series()
for sequence in report.loc[report["has_copy"]]["sequence_string"]:
constructs = report.loc[report["sequence_string"] == sequence][
"input_construct"
]
shared_with_text = "_|_".join(constructs.unique())
shared_with = pd.Series(
data=shared_with_text, index=constructs.index, dtype=str
)
all_shared_with = pd.concat([all_shared_with, shared_with])
parts = report.loc[report["sequence_string"] == sequence]["input_sequence"]
equal_to_text = "_|_".join(parts.unique())
equal_to = pd.Series(data=equal_to_text, index=parts.index, dtype=str)
all_equal_to = pd.concat([all_equal_to, equal_to])
report.loc[all_equal_to.index, "equal_to"] = all_equal_to.values
report.loc[all_shared_with.index, "shared_with"] = all_shared_with.values
report.drop("has_copy", axis=1, inplace=True)
return report