"""Defines central class BlockFinder."""
import itertools
from collections import OrderedDict
from copy import deepcopy
from .CommonBlocksRecordTranslator import CommonBlocksRecordTranslator
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from ..biotools import annotate_record
from .commonblocks_tools import (
format_sequences_as_dicts,
select_common_blocks,
find_homologies_between_sequences,
)
# TODO: Simplify the code by using the new Location class in Location.py
[docs]class CommonBlocks:
"""Class to represent a set of common blocks from different sequences.
Create with ``CommonBlocks.from_sequences``:
>>> common_blocks = CommonBlocks.from_sequences({'s1': 'ATGC...'})
Parameters
----------
common_blocks
A dictionary of the sequences to compare, of the form
{sequence_name: ATGC_sequence_string} or a list of records, all with
different IDs.
records
A dictionary of the Biopython records of the sequences
{record_id: record}.
"""
def __init__(self, common_blocks, records):
"""Initialize, compute best blocks."""
self.common_blocks = common_blocks
self.records = records
@staticmethod
def from_sequences(
sequences,
block_selection_method="most_coverage_first",
include_self_homologies=True,
min_block_size=80,
max_block_size=None,
):
sequences_dict, records_dict = format_sequences_as_dicts(sequences)
homologies_dict = find_homologies_between_sequences(
sequences_dict,
min_size=min_block_size,
max_size=max_block_size,
include_self_homologies=include_self_homologies,
)
common_blocks = select_common_blocks(
homologies_dict,
sequences_dict,
min_size=min_block_size,
method=block_selection_method,
)
return CommonBlocks(common_blocks=common_blocks, records=records_dict)
[docs] def compute_unique_blocks(self):
"""Return a dictionary listing unique blocks by sequence.
The unique blocks are the blocks between the selected common blocks.
The result is of the form {seq: [(start, end), (start2, end2), ...]}
"""
unique_blocks = OrderedDict()
for seqname, rec in self.sequences_with_annotated_blocks().items():
blocks_locations = (
[(0, 0)]
+ sorted(
[
(f.location.start, f.location.end)
for f in rec.features
if f.qualifiers.get("is_block", False)
]
)
+ [(len(rec), len(rec))]
)
unique_blocks[seqname] = [
(end1, start2)
for (_, end1), (start2, _) in zip(
blocks_locations, blocks_locations[1:]
)
if (start2 - end1) > 1
]
return unique_blocks
[docs] def common_blocks_to_csv(self, target_file=None):
"""Write the common blocks into a CSV file.
If a target CSV file is provided the result is written to that file.
Otherwise the result is returned as a string.
The columns of the CSV file are "block", "size", "locations", and
sequence.
"""
csv_content = "\n".join(
["block;size;locations;sequence"]
+ [
";".join(
[
block_name,
str(len(data["sequence"])),
" ".join(
[
"%s(%d, %d, %d)" % (cst, start, end, strand)
for (cst, (start, end, strand)) in data["locations"]
]
),
data["sequence"],
]
)
for block_name, data in self.common_blocks.items()
]
)
if target_file:
with open(target_file, "w+") as f:
f.write(csv_content)
else:
return csv_content
[docs] def common_blocks_records(self):
"""Return all common blocks as a list of Biopython records.
"""
if self.records is None:
raise ValueError("")
records = []
for block_name, data in self.common_blocks.items():
cst, (start, end, strand) = data["locations"][0]
record = self.records[cst][start:end]
if strand == -1:
record = record.reverse_complement()
record.id = record.name = block_name
records.append(record)
return records
[docs] def unique_blocks_records(self, target_file=None):
"""Return all unique blocks as a list of Biopython records."""
if self.records is None:
raise ValueError("")
records = []
for seqname, locations in self.compute_unique_blocks().items():
for i, (start, end) in enumerate(locations):
record = self.records[seqname][start:end]
record.id = "%s_%03d" % (seqname, i)
records.append(record)
return records
[docs] def sequences_with_annotated_blocks(self, colors="auto"):
"""Return a list of Biopython records representing the sequences
with annotations indicating the common blocks.
Parameter ``colors`` is either a list of colors or "auto" for the
default.
"""
records = deepcopy(self.records)
if colors == "auto":
colors = itertools.cycle([cm.Paired(0.21 * i % 1.0) for i in range(30)])
blocks_and_colors = zip(self.common_blocks.items(), colors)
for (name, data), color in blocks_and_colors:
for (seqname, location) in data["locations"]:
annotate_record(
records[seqname],
location,
feature_type="misc_feature",
is_block=True,
label=name,
color=color,
)
return records
[docs] def plot_common_blocks(
self, colors="auto", axes=None, figure_width=10, ax_height=2
):
"""Plot the common blocks found on vertically stacked axes.
The axes on which the plots are drawn are returned at the end.
Parameters
----------
colors
Either a list of colors to use for blocks or "auto" for the default.
axes
A list of matplotlib axes on which to plot, or None for new axes.
figure_width
Width of the final figure in inches.
ax_eight
Height of each plot.
"""
translator = CommonBlocksRecordTranslator()
records = self.sequences_with_annotated_blocks(colors=colors)
if axes is None:
fig, axes = plt.subplots(
len(self.records),
1,
facecolor="white",
sharex=True,
figsize=(figure_width, ax_height * len(self.records)),
)
else:
fig = axes[0].figure
for (ax, (seqname, record)) in zip(axes, records.items()):
gr_record = translator.translate_record(record)
gr_record.plot(
ax,
x_lim=(0, max([len(rec) for rec in self.records.values()])),
with_ruler=(ax == axes[-1]),
)
ax.set_ylim(top=ax.get_ylim()[1])
ax.set_title(seqname, loc="left", fontdict=dict(weight="bold"))
# fig.tight_layout()
return axes
def copy_features_between_common_blocks(self, inplace=False):
def extract_subrecord(record, location):
start, end, strand = location
record = record[start:end]
if strand == -1:
record = record.reverse_complement()
return record
def extract_features(record, offset, reverse=False):
if reverse:
record = record.reverse_complement()
new_features = [deepcopy(f) for f in record.features]
for f in new_features:
f.qualifiers["original_record"] = record.id
for f in new_features:
f.location += offset
return new_features
if inplace:
records = self.records
else:
records = deepcopy(self.records)
for data in self.common_blocks.values():
locations = data["locations"]
subrecords = {
rec_id: extract_subrecord(records[rec_id], location)
for rec_id, location in data["locations"]
}
for l1, l2 in itertools.combinations(locations, 2):
for ((id1, loc1), (id2, __loc2)) in ((l1, l2), (l2, l1)):
start1, __end1, strand1 = loc1
# start2, end2, strand2 = loc2
records[id1].features += extract_features(
subrecords[id2], offset=start1, reverse=(strand1 == -1)
)
return records