"""Class to automatically select available and new primers for sequencing runs.
"""
from copy import deepcopy
import re
from collections import defaultdict
import pandas
from matplotlib.backends.backend_pdf import PdfPages
import matplotlib.pyplot as plt
import numpy as np
import flametree
from proglog import TqdmProgressBarLogger, ProgressBarLogger
from dnachisel import (AvoidPattern, repeated_kmers, homopolymer_pattern,
DnaOptimizationProblem)
from .sequencing_simulation import simulate_sequencing
from .biotools import (reverse_complement, find_non_unique_segments,
find_best_primer_locations)
from .tools import minimal_cover, segments_to_array, group_overlapping_segments
from .Primer import Primer
class PrimerSelectorLogger(TqdmProgressBarLogger):
"""Custom logger class adapted to the logger selector."""
def __init__(self, bars=('record', 'primer'), notebook='default'):
ignored_bars = set(('record', 'primer')).difference(bars)
TqdmProgressBarLogger.__init__(self, bars=bars, notebook=notebook,
ignored_bars=ignored_bars,
min_time_interval=0.2)
[docs]class PrimerSelector:
"""A selector to compute the best primers to sequence a set of constructs.
Examples
--------
>>> selector = PrimerSelector()
>>> selected_primers = selector.select_primers(records, available_primers)
>>> selector.plot_coverage(records, selected_primers, 'my_report.pdf'
Parameters
-----------
read_range
The experimentally measured range (start, end) so that, when a primer
anneals in the sequence at index i, the range ``[i + start, i + end]`` will
be correctly sequenced.
size_range
Size range (min, max) for the size of the primers, in nucleotides.
tm_range
Acceptable melting temperature range for the primers (in Celsius), as
computed using the heuristic A/T=2C, G/C=4C
primer_conditions
A list of functions of the form ``primer_sequence => True/False``.
Primers for which at least one condition returns False will not be
considered.
primer_reuse_bonus
Weight that the availability of the primer should have in the decision
to select this primer. A higher value of this parameter leads to
solutions where a higher less new primershave to be ordered, but more
sequencing reactions have to be done. Set to e.g. 200 to test if there
exists solutions involving solely already-available primers.
logger
Leave to 'bars' for default progress-bar logger, to None for no logger,
or any Proglog ProgressBarLogger object.
coverage_resolution
When the user provides a record with "cover" features to indicate where
to cover, the coverage points used by the algorithm are the 1-in-N
nucleotides along the feature region, where N is this parameter.
"""
def __init__(self, read_range=(150, 800), size_range=(16, 25),
tm_range=(55, 70), primer_conditions=(),
primer_reuse_bonus=2, logger='bars',
coverage_resolution=5):
self.read_range = read_range
self.size_range = size_range
self.tm_range = tm_range
self.primers_conditions = primer_conditions
self.coverage_resolution = coverage_resolution
self.primer_reuse_bonus = 2
if logger == 'bars':
logger = PrimerSelectorLogger()
if logger is None:
logger = ProgressBarLogger()
self.logger = logger
[docs] def select_primers(self, records, available_primers=(),
new_primers_prefix='P', new_primers_digits=6):
"""Select primers to sequence the given records.
Parameters
----------
records
A list of biopython records to sequence. The zones to cover in the
record should be indicated by a feature of type ``misc_feature`` and
label ``cover``. The zones where no primers are desired should be
indicated by a feature of type ``misc_feature`` and label
``no_primer``.
available_primers
List of Primer objects representing the available primers.
new_primers_prefix
Prefix to use for names of the new primers
new_primers_digits
The new primers will have names of the form P000435, with a number
of digits provided by this parameter.
Returns
-------
selected_primers
A list of lists of primers, one list of primers for each consrtruct.
"""
available_primers_dict = {p.sequence: p for p in available_primers}
available_primers_seqs = set([p.sequence for p in available_primers])
# COMPUTE PRIMERS AND COVERAGES
indices_to_cover = {}
primers_coverages = defaultdict(lambda *a: set())
self.logger(message='Analyzing the records...')
for record in self.logger.iter_bar(record=records):
indices_to_cover[record.id] = {
ind: '%s_%03d' % (record.id, ind)
for ind in self.compute_indices_to_cover(record)
}
coverages = self.compute_all_primers_coverage_on_record(
record, available_primers=available_primers_seqs,
indices_to_cover=indices_to_cover[record.id])
for primer, coverage in coverages.items():
primers_coverages[primer].update(coverage)
# FIND GLOBAL MINIMAL COVER
self.logger(message='Selecting primers...')
elements_set = set(
index
for rec_id, named_indices in indices_to_cover.items()
for index in named_indices.values()
)
def heuristic(named_subset, selected):
name, subset = named_subset
primer_is_reused = name in available_primers_seqs
reuse_bonus = self.primer_reuse_bonus * primer_is_reused
return len(subset) + reuse_bonus
subsets = primers_coverages.items()
primers_cover = minimal_cover(elements_set, subsets=subsets,
heuristic=heuristic)
# REORGANIZE AND NAME THE SELECTED PRIMERS
available_primers_names = [p.name for p in available_primers]
selected_primers = []
selected_primer_from_seq = {}
for primer_seq in primers_cover:
if primer_seq in available_primers_dict:
name = available_primers_dict[primer_seq].name
primer = available_primers_dict[primer_seq]
infos = primer.metadata.get('infos', '')
primer = Primer(primer.name, primer.sequence,
metadata={'available': True, 'infos': infos})
else:
name = self.generate_primer_name(
available_primers_names=available_primers_names,
prefix=new_primers_prefix,
n_digits=new_primers_digits
)
try:
part, index = self.find_subsequence_in_records(
sequence=primer_seq, records=records)
infos = "From part %s (at position %d)" % (part, index)
except ValueError:
infos = "No containing part could be identified"
available_primers_names.append(name)
primer = Primer(name, primer_seq,
metadata={'available': False, 'infos': infos})
selected_primers.append(primer)
selected_primer_from_seq[primer_seq] = primer
# CHOOSE A MINIMAL PRIMER COVER FOR EACH CONSTRUCT
per_record_primers = []
for record in self.logger.iter_bar(record=records):
elements = set(indices_to_cover[record.id].values())
subcovers = {
prim_seq: primers_coverages[prim_seq].intersection(elements)
for prim_seq in primers_cover
}
# subsets = deepcopy(list(subcovers.items()))
subsets = list(subcovers.items())
sub_primers_cover = minimal_cover(elements, subsets=subsets,
heuristic=heuristic)
# All primers selected for this construct, sorted by availability.
sub_selected_primers = sorted([
selected_primer_from_seq[primer_seq]
for primer_seq in sub_primers_cover
], key=lambda p: p.metadata['available'])
per_record_primers.append(sub_selected_primers)
return per_record_primers
[docs] def compute_indices_to_cover(self, record):
"""List all indices in the record which should be covered.
These indices are equidistant points inside the user-defined zones to
cover in the record.
The use determines the zones to cover via features of
type ``misc_feature`` and label 'cover'.
"""
segments_to_cover = [
sorted([int(f.location.start), int(f.location.end)])
for f in record.features
if f.type == 'misc_feature'
and "".join(f.qualifiers.get('label', '')) == 'cover'
]
res = self.coverage_resolution
return set([
int(indice) % len(record)
for (start, end) in segments_to_cover
for indice in np.linspace(start, end, 1.0 * (end - start) / res)
])
[docs] @staticmethod
def locate_primer_sequence(primer, sequence):
"""Find the location (start, end, strand) of a primer in the sequence.
Return None if the primer sequence and its reverse complement are not
found in the sequence.
"""
ind = sequence.find(primer)
strand = 1
if ind == -1:
ind = sequence.find(reverse_complement(primer))
if ind == -1:
return None
else:
strand = -1
start, end = ind, ind + len(primer)
return start, end, strand
[docs] def compute_forbidden_patterns_locations(self, record):
"""Return an array where ``arr[i] == 1`` means that i is surrounded by
a user-forbidden pattern."""
pattern_constraints = [AvoidPattern(homopolymer_pattern(c, 5))
for c in 'ATGC']
kmer_constraints = [AvoidPattern(repeated_kmers(k, n))
for k, n in [(4, 2), (3, 3), (2, 4)]]
problem = DnaOptimizationProblem(
sequence=record,
constraints=pattern_constraints + kmer_constraints
)
constraints_breaches = group_overlapping_segments([
(f.location.start, f.location.end)
for f in problem.constraints_evaluations().locations_as_features()
])
return segments_to_array(constraints_breaches, len(record))
[docs] def compute_user_forbidden_locations(self, record):
"""Return an array where ``arr[i] == 1`` means that i is surrounded by
a user-forbidden location."""
forbidden_segments = [
sorted([int(f.location.start), int(f.location.end)])
for f in record.features
if f.type == 'misc_feature'
and "".join(f.qualifiers.get('label', '')) == 'no_primer'
]
return segments_to_array(forbidden_segments, len(record))
[docs] def compute_nonunique_segments_locations(self, record):
"""Return an array where ``arr[i] == 1`` means that i is surrounded by
a non-unique location."""
sequence = str(record.seq)
non_unique_segments = find_non_unique_segments(sequence)
return segments_to_array(non_unique_segments, len(record))
[docs] def compute_all_forbidden_locations(self, record):
"""Return an array indicating which positions should be avoided.
We take into account forbidden patterns, user-forbidden locations,
and non-unique locations.
``arr[i] == 1`` indicates that position i should be avoided in the
record.
"""
return np.maximum(*(f(record) for f in (
self.compute_nonunique_segments_locations,
self.compute_forbidden_patterns_locations,
self.compute_user_forbidden_locations
)))
[docs] def compute_sequence_primers(self, record):
"""Return, primers for the sequence, one around each index.
The primers are chosen to fit the length and melting temperature
specified by the class parameters.
Parameters
----------
record
The record in which to list the primers
Returns
-------
primers
List of primers sequences.
"""
sequence = str(record.seq)
L = len(sequence)
rev_sequence = reverse_complement(sequence)
locations = find_best_primer_locations(
sequence,
tm_range=self.tm_range,
size_range=self.size_range
)
return list(set(sum([
[
(sequence[l[0]: l[1]], (l[0], l[1], 1)),
(rev_sequence[L - l[1]: L - l[0]], (l[0], l[1], -1))
]
for l in locations
if l is not None
], [])))
[docs] def compute_all_primers_coverage_on_record(self, record, indices_to_cover,
available_primers):
"""Return, for each primer, the list of indices covered in this record.
Parameters
----------
record
The record in which to search
indices_to_cover
List of indices to cover (defined by the user) in this record.
available_primers
List of candidate primers.
Returns
-------
primers_coverage
``{primer_name: list_of_indices_covered_in_this_record}``.
"""
sequence = str(record.seq)
linear = record.__dict__.get('linear', True)
L = len(sequence)
forbidden_locations = self.compute_all_forbidden_locations(record)
sequence_primers = self.compute_sequence_primers(record)
reusable_primers_sequences = [
(primer, location) for (primer, location) in [
(primer, self.locate_primer_sequence(primer, sequence))
for primer in available_primers
]
if location is not None
]
primers = reusable_primers_sequences + sequence_primers
iter_bar = self.logger.iter_bar
primers_coverages = {}
for primer, (start, end, strand) in iter_bar(primer=primers):
if forbidden_locations[start:end].max():
continue
if not all(cond(primer) for cond in self.primers_conditions):
continue
if strand == 1:
coverage_start = end + self.read_range[0]
coverage_end = end + self.read_range[1]
else:
coverage_start = start - self.read_range[1]
coverage_end = start - self.read_range[0]
if (not linear) and (coverage_start < 0):
a, b, c, d = -1000, coverage_end, L + coverage_start, L + 1000
elif (not linear) and (coverage_end > L):
a, b, c, d = -1000, coverage_end - L, coverage_start, L + 1000
else:
a, b, c, d = (coverage_start, coverage_end,
coverage_start, coverage_end)
primers_coverages[primer] = set([
indices_to_cover[ind]
for ind in indices_to_cover
if (a <= ind <= b) or (c <= ind <= d)
])
return primers_coverages
[docs] def generate_primer_name(self, prefix='P', available_primers_names=(),
n_digits=6):
"""Return a suitable primer name, considering existing primer names.
The result will be of the form P000425 where 'P' is the prefix and
'000425' means that 'P000424' was the highest-numbered primer name
sarting with P in the list of available primer names
Parameters
----------
prefix
The prefix for the primers name
available_primers_names
List of already-allocated primer names
"""
max_index = 0
for name in available_primers_names:
if not name.startswith(prefix):
continue
index = int(re.search(r'\d+', name[len(prefix):]).group())
if (index is not None) and (index > max_index):
max_index = index
return prefix + str(max_index + 1).zfill(n_digits)
[docs] def find_part_name_in_record(self, record, index):
"""Find a part where the sequence appears in the provided record.
This is used to provide primers infos ("where does this come from ?").
Parameters
----------
sequence
An ATGC string representing a sequence (of a primer)
record
A single Biopython records where to find the sequence. The parts
should be marked by features with a qualifier ``{'part': part_name}``
Returns
-------
part_name, index
The name of the part, and position in the part, where it was found.
If the sequence is found in different parts, only the first find
is returned.
"""
for f in record.features:
if 'part' not in str(f.qualifiers):
continue
part_name = ''.join(f.qualifiers['part'])
strand = f.location.strand
start, end = sorted([int(e) for e in [f.location.start,
f.location.end]])
if start <= index <= end:
distance = (index - start) if (strand >= 0) else (end - index)
return (part_name, distance)
else:
raise ValueError(
'The given index %d does not correspond to any part in "%s"'
% (index, record.id)
)
[docs] def find_subsequence_in_records(self, sequence, records):
"""Find a part where the sequence appears in the provided records.
This is used to provide primers infos ("where does this come from ?").
This will look for either the sequence or its reverse-complement.
Parameters
----------
sequence
An ATGC string representing a sequence (of a primer)
records
A list of Biopython records where to find the sequence. The parts
should be marked by features with a qualifier ``{'part': part_name}``
Returns
-------
part_name, index
The name of the part, and position in the part, where it was found.
If the sequence is found in different parts, only the first find
is returned.
"""
for r in records:
ind = r.seq.find(sequence)
if ind > 0:
part_name, index = self.find_part_name_in_record(r, ind)
break
else:
ind = r.seq.reverse_complement().find(sequence)
if ind > 0:
part_name, index = self.find_part_name_in_record(r, ind)
break
else:
raise ValueError('sequence not found in records')
return (part_name, index)
[docs] def name_subsequence_in_records(self, sequence, records, prefix='P'):
"""Write a table of primers with columns 'name', 'sequence', etc.
The columns after 'sequence' are one column per primer metadata, such
as 'available', 'infos', etc.
Parameters
----------
primers
A list of primers, or list of list, as returned by ``select_primers``
csv_path
The path to a csv file to write to. If None, no file is written, only
the pandas dataframe of the table is returned.
Returns
-------
dataframe
The pandas dataframe of the table.
"""
for r in records:
ind = r.seq.find(sequence)
if ind > 0:
part_name, index = self.find_part_name_in_record(r, ind)
break
else:
ind = r.seq.reverse_complement().find(sequence)
if ind > 0:
part_name, index = self.find_part_name_in_record(r, ind)
break
else:
raise ValueError('sequence not found in records')
return "%s%s_%04d" % (prefix, part_name, index)
[docs] def plot_coverage(self, records, selected_primers, pdf_path):
"""Plot the predicted sequencing coverage for each construct."""
# PLOT THE PREDICTED SEQUENCING COVERAGE FOR EACH CONSTRUCT
with PdfPages(pdf_path) as pdf:
iterator = zip(records, selected_primers)
self.logger(message='Plotting coverages...')
for record, primers in self.logger.iter_bar(record=iterator):
matches_set = simulate_sequencing(
record, primers=primers, read_range=self.read_range,
linear=record.__dict__.get('linear', True)
)
ax = matches_set.plot(plot_reference=True, plot_coverage=False)
ax.set_title(record.id, fontsize=14, weight='bold')
for x in self.compute_indices_to_cover(record):
ax.axvline(x, c='#fceddb', lw=2, ls='--', zorder=-2000)
pdf.savefig(ax.figure, bbox_inches='tight')
plt.close(ax.figure)
[docs] def write_records_primers_table(self, selected_primers, records, sep='|',
csv_path=None):
"""Write a table with columns 'construct','primers' for this construct.
Parameters
----------
selected_primers
The list of list of primers, as returned by the ``select_primers``
method.
records
The list of records, as provided to the ``select_primers`` method.
sep
The separator between the different primer names in the ``primers``
column. Avoid ';' or ',' as this might be used by the CSV formatter.
csv_path
The path to a csv file to write to. If None, no file is written, only
the pandas dataframe of the table is returned.
Returns
-------
dataframe
The pandas dataframe of the table.
"""
df = pandas.DataFrame.from_records([
{
'record': record.id,
'primers': sep.join([p.name for p in primers_list])
}
for record, primers_list in zip(records, selected_primers)
], columns=['record', 'primers'])
if csv_path is not None:
df.to_csv(csv_path, index=False)
return df
[docs] def write_primers_table(self, selected_primers, csv_path=None):
"""Write a table of primers with columns 'name', 'sequence', etc.
The columns after 'sequence' are one column per primer metadata, such
as 'available', 'infos', etc.
Parameters
----------
primers
A list of primers, or list of list, as returned by ``select_primers``
csv_path
The path to a csv file to write to. If None, no file is written, only
the pandas dataframe of the table is returned.
Returns
-------
dataframe
The pandas dataframe of the table.
"""
if isinstance(selected_primers[0], (list, tuple)):
selected_primers = set([p for pp in selected_primers for p in pp])
df = Primer.list_to_spreadsheet(selected_primers)
df = df.sort_values(by=['available', 'name'])
if csv_path is not None:
df.to_csv(csv_path, index=False)
return df
def write_multifile_report(self, records, selected_primers,
target='@memory'):
root = flametree.file_tree(target)
self.plot_coverage(
records=records,
selected_primers=selected_primers,
pdf_path=root._file('coverages_plots.pdf').open('wb'))
self.write_primers_table(
selected_primers=selected_primers,
csv_path=root._file('primers_list.csv')
)
self.write_records_primers_table(
records=records,
selected_primers=selected_primers,
csv_path=root._file('primers_per_record.csv')
)
return root._close()