
Module ediacara.Comparator

import itertools

import re

import statistics

import matplotlib.pyplot as plt

import numpy

import pandas

from weighted_levenshtein import lev

from Bio import SeqIO

import cyvcf2

import dna_features_viewer

import geneblocks

vcf_selected_info_keytable = {

    "DP": "Total read depth at the locus",

    "RO": "Reference allele observation count",

    "AO": "Alternate allele observations",

    "TYPE": "The type of allele (either snp, mnp, ins, del or complex)",

}

result_keywords = {

    "good": "PASS",

    "error": "FAIL",

    "warning": "WARNING",

    "uncertain": "LOW_COVERAGE",

}

class CustomTranslator(dna_features_viewer.BiopythonTranslator):

    """Custom translator."""

    def compute_filtered_features(self, features):

        """Display only "From " features and overhangs."""

        filtered_features = []

        for feature in features:

            try:  # the feature may lack a 'label'; length-4 labels mark overhang annotations

                if (len(feature.qualifiers.get("label", "")[0]) == 4) or (

                    "From " in str(feature.qualifiers.get("label", ""))

                ):

                    filtered_features += [feature]

            except Exception:  # no label to inspect: skip the feature

                pass

        return filtered_features

class SequencingGroup:

    """Analyse alignments of a sequencing run.

    **Parameters**

    **comparatorgroups**

    > List of `ComparatorGroup` instances.

    **name**

    > Name of the sequencing project (`str`).

    """

    def __init__(self, comparatorgroups, name="Unnamed"):

        self.comparatorgroups = comparatorgroups

        self.name = name

        self.vcf_cutoff = 20  # max number of VCF entries to display in report

        self.result_keywords = result_keywords

    def perform_all_comparisons_in_sequencinggroup(self):

        self.number_of_reads = 0

        for comparatorgroup in self.comparatorgroups:

            comparatorgroup.perform_all_comparisons()

            self.number_of_reads += comparatorgroup.n_fastq_reads

        self.number_of_barcodes = len(self.comparatorgroups)

        self.comparisons_performed = True

class ComparatorGroup:

    """Analyse alignments to a set of references.

    **Parameters**

    **references**

    > Dictionary of name: Biopython record.

    **alignments**

    > Dictionary of pandas dataframes: {"paf": df_paf, "tsv": df_tsv}. The PAF dataframe

    was created from the output of `paftools.js sam2paf` and the TSV dataframe was

    created from the output of `samtools depth -aa`.

    **barcode**

    > The barcode number or ID (`str`).

    **assembly_paths**

    > Dictionary of sample: consensus sequence path (`dict`).

    **vcf_paths**

    > Dictionary of sample: filtered VCF filepath (`dict`).

    """

    def __init__(

        self,

        references,

        alignments,

        barcode="barcode",

        assembly_paths=None,

        vcf_paths=None,

    ):

        self.references = references

        self.paf = alignments["paf"]

        self.tsv = alignments["tsv"]

        self.comparators = []

        self.barcode = barcode

        self.assembly_paths = assembly_paths

        self.vcf_paths = vcf_paths

    def create_comparator(self, reference_name):

        """Create a Comparator instance from one of the loaded references.

        **Parameters**

        **reference_name**

        > The name of the reference (`str`).

        """

        subset_paf = self.subset_paf(reference_name)

        subset_tsv = self.tsv[self.tsv["name"] == reference_name]

        alignment = {"paf": subset_paf, "tsv": subset_tsv}

        comparator = Comparator(

            {reference_name: self.references[reference_name]}, alignment

        )

        return comparator

    def add_comparator(self, reference_name):

        """Create comparator and add to the group. See create_comparator()."""

        self.comparators += [self.create_comparator(reference_name)]

    def subset_paf(self, reference_name):

        """Subset PAF dataframe for the reference.

        Select reads that align to the reference and also align *best* to the reference.

        **Parameters**

        **reference_name**

        > Name of the reference sequence to filter for (`str`).

        """

        # Work only with the values used for filtering:

        paf3 = self.paf[["query_name", "target_name", "mapping_matches"]]

        grouped = paf3.groupby("query_name")

        selected_reads = []

        for name, group in grouped:

            # not all reads align to the reference:

            if reference_name in group["target_name"].values:

                max_matches = group["mapping_matches"].max()

                # filter that the read aligns best to this reference:

                if (

                    max(

                        group[group["target_name"] == reference_name][

                            "mapping_matches"

                        ].values

                    )

                    == max_matches

                ):

                    selected_reads += [name]

        # Filter for selected reads ..

        subset_paf_filtered = self.paf[self.paf["query_name"].isin(selected_reads)]

        # ..then get only the reads aligning to reference:

        subset_paf_final = subset_paf_filtered[

            subset_paf_filtered["target_name"] == reference_name

        ]

        return subset_paf_final

    def perform_all_comparisons(self):

        """Call perform_comparison() on each Comparator.

        **Parameters**

        **assembly_paths**

        > Dictionary of construct name: consensus sequence file.

        For example: `{"Construct_1": "/path/to/assembly.fa"}`.

        """

        if self.assembly_paths is None:

            assembly_paths = {}

        else:

            assembly_paths = self.assembly_paths

        if self.vcf_paths is None:

            vcf_paths = {}

        else:

            vcf_paths = self.vcf_paths

        for comparator in self.comparators:

            try:

                assembly_path = assembly_paths[comparator.name]

            except Exception:

                assembly_path = None

            try:

                vcf_path = vcf_paths[comparator.name]

            except Exception:

                vcf_path = None

            comparator.perform_comparison(

                assembly_path=assembly_path, vcf_path=vcf_path

            )

        self.comparisons_performed = True

        self.summarise_analysis()

    def summarise_analysis(self):

        """Create variables for the PDF report summary."""

        self.number_of_constructs = len(self.comparators)

        self.result_good = 0

        self.result_warning = 0

        self.result_error = 0

        self.result_uncertain = 0

        names = []

        reference_lengths = []

        results = []

        number_of_reads_aligning = []

        median_coverages = []

        for comparator in self.comparators:

            names += [comparator.name]

            reference_lengths += [str(comparator.reference_length)]

            number_of_reads_aligning += [

                str(len(comparator.paf["query_name"].unique()))

            ]

            median_coverages += [str(int(comparator.median_yy))]

            if comparator.is_uncertain:

                self.result_uncertain += 1

                comparator.result = result_keywords["uncertain"]

            else:

                if comparator.has_errors:

                    self.result_error += 1

                    comparator.result = result_keywords["error"]

                else:

                    if comparator.has_warnings:

                        self.result_warning += 1

                        comparator.result = result_keywords["warning"]

                    else:

                        self.result_good += 1

                        comparator.result = result_keywords["good"]

            results += [comparator.result]

        self.n_fastq_reads = len(set(self.paf.query_name))

        self.fastq_plot = self.plot_fastq_histogram()

        # Summary table

        d = {

            "Name": pandas.Series(names),

            "Result": pandas.Series(results),

            "Length [bp]": pandas.Series(reference_lengths),

            "FASTQ reads": pandas.Series(number_of_reads_aligning),

            "Coverage [x]": pandas.Series(median_coverages),

        }

        self.summary_table = pandas.DataFrame(d)

        # plt.close("all")

    def plot_fastq_histogram(self, n_bins=50):

        """Plot a histogram of the FASTQ reads.

        **Parameters**

        **n_bins**

        > Approximate bin width in bp; the number of bins is the reference (target)

        length divided by this value (`int`).

        """

        fig, ax = plt.subplots()

        plt.hist(

            self.paf["query_length"],

            bins=int(self.paf.iloc[0]["target_length"] / n_bins),

            alpha=0.8,

        )

        ax.set_ylabel("Number of reads")

        ax.set_xlabel("Read length [bp]")

        # Mark expected lengths for interpreting the plot:

        for comparator in self.comparators:

            plt.axvline(x=len(comparator.record), color="#fd5a31")

        return fig

    @staticmethod

    def load_paf(paf_path):

        """Create a dataframe from a PAF file of alignments.

        **Parameters**

        **paf_path**

        > Path (`str`) to PAF file: `paftools.js sam2paf aln.sam > aln.paf`.

        """

        try:

            paf = pandas.read_csv(paf_path, sep="\t", header=None)

        except Exception:  # unequal number of columns; the first 12 are assumed to be OK

            # The last column is CIGAR, so we need to read lines one by one:

            rows = []

            with open(paf_path, "r") as f_input:

                paf_lines = f_input.read().splitlines()

                for paf_line in paf_lines:

                    paf_line = paf_line.split("\t")  # PAF is tab separated

                    row = paf_line[0:12] + [";".join(paf_line[12:-1])] + [paf_line[-1]]

                    rows += [row]

            paf = pandas.DataFrame(rows)

            # Set numeric columns:

            numeric_columns = [1, 2, 3, 6, 7, 8, 9, 10, 11]  # see list below

            paf[numeric_columns] = paf[numeric_columns].apply(pandas.to_numeric)

        # First 12 columns are defined by the format:

        columns_12 = [

            "query_name",

            "query_length",

            "query_start",

            "query_end",

            "strand",

            "target_name",

            "target_length",

            "target_start",

            "target_end",

            "mapping_matches",

            "mapping_size",

            "mapping_quality",

        ]

        # For additional columns optionally created during alignment:

        columns_13plus = [str(index) for index in range(12, len(paf.columns))]

        paf.columns = columns_12 + columns_13plus

        return paf

    @staticmethod

    def load_tsv(tsv_path):

        """Create a dataframe from a TSV file of depth counts.

        **Parameters**

        **tsv_path**

        > Path to output of `samtools depth -aa` (`str`).

        """

        tsv = pandas.read_csv(tsv_path, sep="\t", header=None)

        tsv.columns = ["name", "position", "depth"]

        return tsv

class Comparator:

    """Analyse alignments to a reference.

    **Parameters**

    **reference**

    > Dictionary of one name: one Biopython record.

    **alignment**

    > Dictionary of pandas dataframes: {"paf": df_paf, "tsv": df_tsv}. The dataframes

    contain entries only for the reference.

    """

    def __init__(self, reference, alignment):

        self.name = next(iter(reference))

        self.record = next(iter(reference.values()))

        self.paf = alignment["paf"]

        self.tsv = alignment["tsv"]

        self.coverage_cutoff = 0.5  # fraction of median coverage

        self.coverage_cutoff_pct = int(self.coverage_cutoff * 100)  # for PDF report

        self.reference_length = len(self.record)

        self.has_warnings = False

        self.has_errors = False

        self.geneblocks_outcome = "none"  # stores outcome, used in PDF report making

        # Set True if references are DNA Cauldron-simulated files:

        self.dnacauldron = True  # for plotting

        self.has_consensus = False  # for consensus sequence comparison

        self.is_uncertain = True  # for sequencing data quality, used in report

    def perform_comparison(self, assembly_path=None, vcf_path=None):

        """Plot coverage and compare reference with a consensus sequence.

        **Parameters**

        **assembly_path**

        > Optional. Path to a consensus FASTA file (`str`).

        """

        self.fig = self.plot_coverage()

        if assembly_path is not None:

            self.comparison_figure = self.compare_with_assembly(

                assembly_path=assembly_path

            )

            self.has_consensus = True

        if vcf_path is not None:

            self.vcf_table = self.create_vcf_table(vcf_path=vcf_path)

        # plt.close("all")

    def create_vcf_table(self, vcf_path):

        """Create a dataframe of selected entries from a filtered VCF file.

        **Parameters**

        **vcf_path**

        > Path to the VCF file (`str`).

        """

        vcf_dict = {key: [] for key in vcf_selected_info_keytable.keys()}

        vcf_dict["REF"] = []

        vcf_dict["ALT"] = []

        vcf_dict["LOC"] = []

        for variant in cyvcf2.VCF(vcf_path):

            vcf_dict["REF"] += [variant.REF]

            vcf_dict["ALT"] += [variant.ALT]

            vcf_dict["LOC"] += [variant.start]

            for key in vcf_selected_info_keytable.keys():

                vcf_dict[key] += [variant.INFO.get(key)]

        vcf_table = pandas.DataFrame(

            vcf_dict, columns=["LOC", "REF", "ALT", "TYPE", "DP", "RO", "AO"]

        )  # also set the order of the columns

        return vcf_table

    def find_big_inserts(self, threshold=100):

        """Calculate % of reads with big inserts.

        **Parameters**

        **threshold**

        > Size of insert in bp to be considered big (`int`)."""

        read_insert_dict = {

            read_id: False for read_id in self.paf["query_name"].unique()

        }  # Initialize. Value True when read has insert.

        for index, row in self.paf.iterrows():

            read_id = row["query_name"]

            cigar = row.tail(1).item()  # CIGAR string column is the last

            for size, mutation in re.findall(r"(\d+)([A-Z]{1})", cigar):

                if int(size) >= threshold:

                    if mutation == "I":  # CIGAR code for insert

                        read_insert_dict[read_id] = True

                        break

        fraction_of_big_inserts = sum(read_insert_dict.values()) / len(

            read_insert_dict.keys()

        )

        # imprecise rounding, but ok for our purposes:

        self.pct_big_insert = round(fraction_of_big_inserts * 100, 1)  # pct multiplier

    def calculate_stats(self):

        """Calculate statistics for the coverage plot, used in plot_coverage()."""

        self.xx = numpy.arange(len(self.record.seq))  # for the plot x axis

        self.yy = self.tsv["depth"].to_list()  # for plotting coverage

        self.median_yy = statistics.median(self.yy)

        if self.median_yy < 30:  # a good cutoff for low-depth sequencing

            self.has_low_coverage = True

            self.has_warnings = True

        else:

            self.is_uncertain = False

            self.has_low_coverage = False

        self.find_big_inserts()

        if self.pct_big_insert >= 50:  # at least half of the reads have a big insert

            self.has_errors = True

            self.has_big_insert = True

        else:

            self.has_big_insert = False

        if self.pct_big_insert >= 5:  # at least 5% of the reads have a big insert

            self.has_warnings = True

            self.has_reads_with_insert = True

        else:

            self.has_reads_with_insert = False

        # Create a string of low-coverage position ranges for the report:

        indices = [

            i

            for i, value in enumerate(self.yy)

            if value < self.median_yy * self.coverage_cutoff

        ]

        # Group consecutive indices into runs (index minus a running counter is

        # constant within a run of consecutive integers):

        G = (

            list(x)

            for _, x in itertools.groupby(

                indices, lambda x, c=itertools.count(): next(c) - x

            )

        )

        self.low_coverage_positions_string = ", ".join(

            "-".join(map(str, (g[0], g[-1])[: len(g)])) for g in G

        )

        if self.low_coverage_positions_string == "":

            self.low_coverage_positions_string = "-"  # looks better in the pdf report

        else:

            self.has_warnings = True

    def plot_coverage(self):

        """Plot the reference with the coverage and weak reads."""

        if not hasattr(self, "xx"):

            self.calculate_stats()  # also calculates yy and median_yy

        # Plot

        ######

        fig, (ax1, ax2) = plt.subplots(

            2, 1, figsize=(7, 3), sharex=True, gridspec_kw={"height_ratios": [4, 1]}

        )

        if self.dnacauldron:

            graphic_record = CustomTranslator().translate_record(self.record)

        else:

            graphic_record = dna_features_viewer.BiopythonTranslator().translate_record(

                self.record

            )

        graphic_record.plot(ax=ax1, with_ruler=False, strand_in_label_threshold=4)

        # Plot coverage

        ax2.fill_between(self.xx, self.yy, alpha=0.8)

        stdev_yy = statistics.stdev(self.yy)

        ax2.set_ylim(bottom=0)

        ax2.set_ylabel("Depth")

        ax2.set_xlabel("Base [bp]")

        ax2.axhline(

            y=self.median_yy, xmin=0, xmax=1, color="grey", linestyle="-", linewidth=0.8

        )

        ax2.axhline(

            y=self.median_yy - stdev_yy,

            xmin=0,

            xmax=1,

            color="grey",

            linestyle="--",

            linewidth=0.8,

        )

        ax2.axhline(

            y=self.median_yy + stdev_yy,

            xmin=0,

            xmax=1,

            color="grey",

            linestyle="--",

            linewidth=0.8,

        )

        return fig

    def get_weak_read_histogram_data(self, cutoff):

        """Create data for the plotting function.

        **Parameters**

        **cutoff**

        > Cutoff for proportion of bases that map (`float`). Reads below the cutoff are

        viewed as weak reads. Recommended value: 0.8 (80%).

        """

        # For each read, calculate the proportion of bases that map, then create an array

        # for each read that is below the cutoff. The sum of these arrays will give the

        # depth of aligning bad reads for each base of the reference.

        self.proportion_maps = (

            self.paf["mapping_matches"] / self.paf["query_length"].to_list()

        )

        filtered_paf = self.paf[self.proportion_maps < cutoff]

        array_length = filtered_paf.iloc[0]["target_length"]

        histogram_array = []

        for index, row in filtered_paf.iterrows():

            read_array = [0] * array_length

            start = row["target_start"]

            end = row["target_end"]

            value = 1 / row["multialign"]  # weight by the read's number of alignments

            values = [value] * (end - start)

            read_array[start:end] = values

            histogram_array.append(read_array)

        histogram_numpy_array = numpy.array(histogram_array)

        self.zz = numpy.sum(histogram_numpy_array, axis=0)

    def compare_with_assembly(self, assembly_path):

        """Compare the reference with a consensus sequence.

        Check length, orientation (sense or reverse complement), then use GeneBlocks to

        highlight differences between reference and the plasmid assembly.

        **Parameters**

        **assembly_path**

        > Path to a consensus FASTA file (`str`).

        """

        # Note that in this context 'assembly' means a consensus of variant calls

        self.assembly = SeqIO.read(handle=assembly_path, format="fasta")

        length_ratio = len(self.assembly) / len(self.record)

        if 0.95 < length_ratio < 1.05:  # length within reason

            self.is_length_ok = True  # can compare with expected sequence

            self.incorrect_length_msg = None

        else:

            self.geneblocks_outcome = "incorrect_length"

            self.is_length_ok = False  # do not compare sequences

            self.has_errors = True

            difference_bases = len(self.assembly) - len(self.record)

            difference_percent = (length_ratio - 1) * 100  # get as percent

            if difference_percent > 0:

                difference_text = "longer"

            else:

                difference_text = "shorter"

            self.incorrect_length_msg = (

                "The consensus is %d%% (%d bp) %s than the reference."

                % (int(abs(difference_percent)), abs(difference_bases), difference_text)

            )

            self.geneblocks_done = False

            self.is_diffblocks_reverse = None

            self.is_comparison_successful = False  # for PDF reporting

            return

        # Compare with the Levenshtein distance to decide whether a diff plot is feasible:

        lev_distance = lev(str(self.assembly.seq), str(self.record.seq))

        lev_cutoff = 50  # more than this is too much to plot. Also for most plasmids,

        # this represents ~1% difference, which is too much error.

        assembly_for_diffblocks = self.assembly

        if lev_distance > lev_cutoff:

            self.perform_geneblocks = False

        else:

            self.perform_geneblocks = True

        if self.perform_geneblocks:

            try:

                diff_blocks = geneblocks.DiffBlocks.from_sequences(

                    assembly_for_diffblocks, self.record

                )

            except KeyError:

                self.geneblocks_done = False

                self.geneblocks_outcome = "geneblocks_error"

            else:  # diffblocks had no error

                self.geneblocks_done = True

                ax1, ax2 = diff_blocks.plot(figure_width=5)

                self.is_diffblocks_reverse = False

                self.is_comparison_successful = True

                self.geneblocks_outcome = "all_good"

                return ax1

            # Try again: due to a bug in geneblocks, the reversed argument order may work:

            if self.geneblocks_outcome == "geneblocks_error":

                try:

                    diff_blocks = geneblocks.DiffBlocks.from_sequences(

                        self.record, assembly_for_diffblocks

                    )

                except KeyError:

                    return

                else:

                    self.geneblocks_done = True  # overwrite

                    ax1, ax2 = diff_blocks.plot(figure_width=7)

                    self.is_diffblocks_reverse = True

                    self.is_comparison_successful = True

                    self.geneblocks_outcome = "swapped_diffblocks"

                    return ax1

        else:  # too many differences

            self.geneblocks_done = False

            self.geneblocks_outcome = "too_many_differences"

            self.is_comparison_successful = False

    def filter_fastq(self, fastq_path, target=None):

        """Filter original FASTQ file for reads that best map to the reference.

        This function produces a FASTQ file that can be used for Canu assembly.

        **Parameters**

        **fastq_path**

        > Path to FASTQ file (`str`).

        **target**

        > Path to save the filtered FASTQ file (`str`).

        """

        if target is None:

            target = fastq_path + "_filtered.fastq"

        read_names = self.paf["query_name"].to_list()

        input_seq_iterator = SeqIO.parse(fastq_path, "fastq")

        seq_iterator = (

            record for record in input_seq_iterator if record.name in read_names

        )

        SeqIO.write(seq_iterator, target, "fastq")

Variables

result_keywords
vcf_selected_info_keytable

Classes

Comparator

class Comparator(
    reference,
    alignment
)
Analyse alignments to a reference.

Parameters

reference

Dictionary of one name: one Biopython record.

alignment

Dictionary of pandas dataframes: {"paf": df_paf, "tsv": df_tsv}. The dataframes contain entries only for the reference.
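A minimal usage sketch (file names and the reference name are hypothetical; it assumes the PAF and TSV files contain entries only for this reference, as the docstring requires):

from Bio import SeqIO
from ediacara.Comparator import Comparator, ComparatorGroup

# Hypothetical inputs: the expected construct and its alignment files.
record = SeqIO.read("reference.gb", "genbank")
alignment = {
    "paf": ComparatorGroup.load_paf("aln.paf"),  # paftools.js sam2paf output
    "tsv": ComparatorGroup.load_tsv("depth.tsv"),  # samtools depth -aa output
}
comparator = Comparator({"reference_1": record}, alignment)
comparator.perform_comparison()  # plots coverage; consensus and VCF are optional
print(comparator.median_yy, comparator.has_warnings)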

Methods

calculate_stats

def calculate_stats(
    self
)

Calculate statistics for the coverage plot, used in plot_coverage().
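The low-coverage positions string is built by grouping consecutive indices into runs with itertools.groupby. A standalone sketch of that trick, with made-up indices:

import itertools

indices = [3, 4, 5, 9, 10, 15]  # positions below the coverage cutoff
# Index minus a running counter is constant within a run of consecutive integers:
runs = (
    list(g)
    for _, g in itertools.groupby(indices, lambda x, c=itertools.count(): next(c) - x)
)
print(", ".join("-".join(map(str, (g[0], g[-1])[: len(g)])) for g in runs))
# -> 3-5, 9-10, 15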


compare_with_assembly

def compare_with_assembly(
    self,
    assembly_path
)

Compare the reference with a consensus sequence.

Check length, orientation (sense or reverse complement), then use GeneBlocks to highlight differences between reference and the plasmid assembly.

Parameters

assembly_path

Path to a consensus FASTA file (str).
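A sketch of interpreting the outcome, assuming `comparator` was set up as above; the path is hypothetical and the outcome strings are the ones assigned in the source:

ax = comparator.compare_with_assembly("consensus.fa")
if comparator.geneblocks_outcome in ("all_good", "swapped_diffblocks"):
    ax.figure.savefig("diffblocks.png")  # ax is the GeneBlocks plot axis
elif comparator.geneblocks_outcome == "incorrect_length":
    print(comparator.incorrect_length_msg)
elif comparator.geneblocks_outcome == "too_many_differences":
    print("Levenshtein distance above cutoff; diff plot skipped.")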


create_vcf_table

def create_vcf_table(
    self,
    vcf_path
)
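Builds a pandas dataframe of selected fields (LOC, REF, ALT, TYPE, DP, RO, AO) from a filtered VCF file, read with cyvcf2. A minimal sketch with a hypothetical path:

vcf_table = comparator.create_vcf_table("variants_filtered.vcf")
print(vcf_table.head())  # one row per variant call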

filter_fastq

def filter_fastq(
    self,
    fastq_path,
    target=None
)

Filter original FASTQ file for reads that best map to the reference.

This function produces a FASTQ file that can be used for Canu assembly.

Parameters

fastq_path

Path to FASTQ file (str).

target

Path to save the filtered FASTQ file (str).
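A minimal sketch (file names are hypothetical):

# Keep only the reads assigned to this reference, e.g. as Canu assembly input:
comparator.filter_fastq("barcode01.fastq", target="barcode01_filtered.fastq")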


find_big_inserts

def find_big_inserts(
    self,
    threshold=100
)

Calculate % of reads with big inserts.

Parameters

threshold

Size of insert in bp to be considered big (int).
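The insert detection parses the CIGAR string (the last PAF column) with a regular expression; a standalone sketch with a made-up CIGAR:

import re

cigar = "250M120I30D500M"  # made-up CIGAR: one 120 bp insertion ("I")
for size, operation in re.findall(r"(\d+)([A-Z]{1})", cigar):
    if operation == "I" and int(size) >= 100:
        print("Big insert of %s bp" % size)  # -> Big insert of 120 bp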


get_weak_read_histogram_data

def get_weak_read_histogram_data(
    self,
    cutoff
)

Create data for the plotting function.

Parameters

cutoff

Cutoff for proportion of bases that map (float). Reads below the cutoff are viewed as weak reads. Recommended value: 0.8 (80%).
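A sketch of the underlying idea with made-up reads: each weak read contributes a weighted interval, and the per-base sum gives the weak-read depth along the reference:

import numpy

reference_length = 10
reads = [(2, 7, 1.0), (5, 9, 0.5)]  # (target_start, target_end, 1/multialign)
histogram = []
for start, end, weight in reads:
    read_array = [0.0] * reference_length
    read_array[start:end] = [weight] * (end - start)
    histogram.append(read_array)
zz = numpy.sum(numpy.array(histogram), axis=0)  # weak-read depth per base
print(zz)  # [0. 0. 1. 1. 1. 1.5 1.5 0.5 0.5 0.]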


perform_comparison

def perform_comparison(
    self,
    assembly_path=None,
    vcf_path=None
)

Plot coverage and compare reference with a consensus sequence.

Parameters

assembly_path

Optional. Path to a consensus FASTA file (str).

vcf_path

Optional. Path to a filtered VCF file (str).


plot_coverage

def plot_coverage(
    self
)

Plot the reference together with the read coverage.


ComparatorGroup

class ComparatorGroup(
    references,
    alignments,
    barcode='barcode',
    assembly_paths=None,
    vcf_paths=None
)
View Source
class ComparatorGroup:

    """Analyse alignments to a set of references.

    **Parameters**

    **references**

    > Dictionary of name: Biopython record.

    **alignments**

    > Dictionary of pandas dataframes: {"paf": df_paf, "tsv": df_tsv}. The PAF dataframe

    was created from the output of `paftools.js sam2paf` and the TSV dataframe was

    created from the output of `samtools depth -aa`.

    **barcode**

    > The barcode number or ID (`str`).

    **assembly_paths**

    > Dictionary of sample: consensus sequence path (`dict`).

    **vcf_paths**

    > Dictionary of sample: filtered VCF filepath (`dict`).

    """

    def __init__(

        self,

        references,

        alignments,

        barcode="barcode",

        assembly_paths=None,

        vcf_paths=None,

    ):

        self.references = references

        self.paf = alignments["paf"]

        self.tsv = alignments["tsv"]

        self.comparators = []

        self.barcode = barcode

        self.assembly_paths = assembly_paths

        self.vcf_paths = vcf_paths

    def create_comparator(self, reference_name):

        """Create a Comparator instance from one of the loaded references.

        **Parameters**

        **reference_name**

        > The name of the reference (`str`).

        """

        subset_paf = self.subset_paf(reference_name)

        subset_tsv = self.tsv[self.tsv["name"] == reference_name]

        alignment = {"paf": subset_paf, "tsv": subset_tsv}

        comparator = Comparator(

            {reference_name: self.references[reference_name]}, alignment

        )

        return comparator

    def add_comparator(self, reference_name):

        """Create comparator and add to the group. See create_comparator()."""

        self.comparators += [self.create_comparator(reference_name)]

    def subset_paf(self, reference_name):

        """Subset PAF dataframe for the reference.

        Select reads that align to the reference and also align *best* to the reference.

        **Parameters**

        **reference_name**

        > Name of the reference sequence to filter for (`str`).

        """

        # Work only with the values used for filtering:

        paf3 = self.paf[["query_name", "target_name", "mapping_matches"]]

        grouped = paf3.groupby("query_name")

        selected_reads = []

        for name, group in grouped:

            # not all reads align to the reference:

            if reference_name in group["target_name"].values:

                max_matches = group["mapping_matches"].max()

                # filter that the read aligns best to this reference:

                if (

                    max(

                        group[group["target_name"] == reference_name][

                            "mapping_matches"

                        ].values

                    )

                    == max_matches

                ):

                    selected_reads += [name]

        # Filter for selected reads ..

        subset_paf_filtered = self.paf[self.paf["query_name"].isin(selected_reads)]

        # ..then get only the reads aligning to reference:

        subset_paf_final = subset_paf_filtered[

            subset_paf_filtered["target_name"] == reference_name

        ]

        return subset_paf_final

    def perform_all_comparisons(self):

        """Call perform_comparison() on each Comparator.

        **Parameters**

        **assembly_paths**

        > Dictionary of construct name: consensus sequence file.

        For example: `{"Construct_1": "/path/to/assembly.fa"}`.

        """

        if self.assembly_paths is None:

            assembly_paths = {}

        else:

            assembly_paths = self.assembly_paths

        if self.vcf_paths is None:

            vcf_paths = {}

        else:

            vcf_paths = self.vcf_paths

        for comparator in self.comparators:

            try:

                assembly_path = assembly_paths[comparator.name]

            except Exception:

                assembly_path = None

            try:

                vcf_path = vcf_paths[comparator.name]

            except Exception:

                vcf_path = None

            comparator.perform_comparison(

                assembly_path=assembly_path, vcf_path=vcf_path

            )

        self.comparisons_performed = True

        self.summarise_analysis()

    def summarise_analysis(self):

        """Create variables for the PDF report summary."""

        self.number_of_constructs = len(self.comparators)

        self.result_good = 0

        self.result_warning = 0

        self.result_error = 0

        self.result_uncertain = 0

        names = []

        reference_lengths = []

        results = []

        number_of_reads_aligning = []

        median_coverages = []

        for comparator in self.comparators:

            names += [comparator.name]

            reference_lengths += [str(comparator.reference_length)]

            number_of_reads_aligning += [

                str(len(comparator.paf["query_name"].unique()))

            ]

            median_coverages += [str(int(comparator.median_yy))]

            if comparator.is_uncertain:

                self.result_uncertain += 1

                comparator.result = result_keywords["uncertain"]

            else:

                if comparator.has_errors:

                    self.result_error += 1

                    comparator.result = result_keywords["error"]

                else:

                    if comparator.has_warnings:

                        self.result_warning += 1

                        comparator.result = result_keywords["warning"]

                    else:

                        self.result_good += 1

                        comparator.result = result_keywords["good"]

            results += [comparator.result]

        self.n_fastq_reads = len(set(self.paf.query_name))

        self.fastq_plot = self.plot_fastq_histogram()

        # Summary table

        d = {

            "Name": pandas.Series(names),

            "Result": pandas.Series(results),

            "Length [bp]": pandas.Series(reference_lengths),

            "FASTQ reads": pandas.Series(number_of_reads_aligning),

            "Coverage [x]": pandas.Series(median_coverages),

        }

        self.summary_table = pandas.DataFrame(d)

        # plt.close("all")

    def plot_fastq_histogram(self, n_bins=50):

        """Plot a histogram of the FASTQ reads.

        **Parameters**

        **n_bins**

        > Number of bins in the histogram (`int`).

        """

        fig, ax = plt.subplots()

        plt.hist(

            self.paf["query_length"],

            bins=int(self.paf.iloc[0]["target_length"] / n_bins),  # bin count = reference length / n_bins

            alpha=0.8,

        )

        ax.set_ylabel("Number of reads")

        ax.set_xlabel("Read length [bp]")

        # Mark expected lengths for interpreting the plot:

        for comparator in self.comparators:

            plt.axvline(x=len(comparator.record), color="#fd5a31")

        return fig

    @staticmethod

    def load_paf(paf_path):

        """Create a dataframe from a PAF file of alignments.

        **Parameters**

        **paf_path**

        > Path (`str`) to PAF file: `paftools.js sam2paf aln.sam > aln.paf`.

        """

        try:

            paf = pandas.read_csv(paf_path, sep="\t", header=None)

        except Exception:  # unequal number of columns; the first 12 are assumed to be OK

            # The last column is CIGAR, so we need to read lines one by one:

            rows = []

            with open(paf_path, "r") as f_input:

                paf_lines = f_input.read().splitlines()

                for paf_line in paf_lines:

                    paf_line = paf_line.split("\t")  # PAF is tab separated

                    row = paf_line[0:12] + [";".join(paf_line[12:-1])] + [paf_line[-1]]

                    rows += [row]

            paf = pandas.DataFrame(rows)

            # Set numeric columns:

            numeric_columns = [1, 2, 3, 6, 7, 8, 9, 10, 11]  # see list below

            paf[numeric_columns] = paf[numeric_columns].apply(pandas.to_numeric)

        # First 12 columns are defined by the format:

        columns_12 = [

            "query_name",

            "query_length",

            "query_start",

            "query_end",

            "strand",

            "target_name",

            "target_length",

            "target_start",

            "target_end",

            "mapping_matches",

            "mapping_size",

            "mapping_quality",

        ]

        # For additional columns optionally created during alignment:

        columns_13plus = [str(index) for index in range(12, len(paf.columns))]

        paf.columns = columns_12 + columns_13plus

        return paf

    @staticmethod

    def load_tsv(tsv_path):

        """Create a dataframe from a TSV file of depth counts.

        **Parameters**

        **tsv_path**

        > Path to output of `samtools depth -aa` (`str`).

        """

        tsv = pandas.read_csv(tsv_path, sep="\t", header=None)

        tsv.columns = ["name", "position", "depth"]

        return tsv

Static methods

load_paf

def load_paf(
    paf_path
)

Create a dataframe from a PAF file of alignments.

Parameters

paf_path

Path (str) to PAF file: paftools.js sam2paf aln.sam > aln.paf.

View Source
    @staticmethod

    def load_paf(paf_path):

        """Create a dataframe from a PAF file of alignments.

        **Parameters**

        **paf_path**

        > Path (`str`) to PAF file: `paftools.js sam2paf aln.sam > aln.paf`.

        """

        try:

            paf = pandas.read_csv(paf_path, sep="\t", header=None)

        except Exception:  # unequal number of columns; the first 12 are assumed to be OK

            # The last column is CIGAR, so we need to read lines one by one:

            rows = []

            with open(paf_path, "r") as f_input:

                paf_lines = f_input.read().splitlines()

                for paf_line in paf_lines:

                    paf_line = paf_line.split("\t")  # PAF is tab separated

                    row = paf_line[0:12] + [";".join(paf_line[12:-1])] + [paf_line[-1]]

                    rows += [row]

            paf = pandas.DataFrame(rows)

            # Set numeric columns:

            numeric_columns = [1, 2, 3, 6, 7, 8, 9, 10, 11]  # see list below

            paf[numeric_columns] = paf[numeric_columns].apply(pandas.to_numeric)

        # First 12 columns are defined by the format:

        columns_12 = [

            "query_name",

            "query_length",

            "query_start",

            "query_end",

            "strand",

            "target_name",

            "target_length",

            "target_start",

            "target_end",

            "mapping_matches",

            "mapping_size",

            "mapping_quality",

        ]

        # For additional columns optionally created during alignment:

        columns_13plus = [str(index) for index in range(12, len(paf.columns))]

        paf.columns = columns_12 + columns_13plus

        return paf
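
A minimal usage sketch (not from the library docs; the file path "aln.paf" is hypothetical):

from ediacara.Comparator import ComparatorGroup

paf = ComparatorGroup.load_paf("aln.paf")
print(paf[["query_name", "target_name", "mapping_quality"]].head())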

load_tsv

def load_tsv(
    tsv_path
)

Create a dataframe from a TSV file of depth counts.

Parameters

tsv_path

Path to output of samtools depth -aa (str).

View Source
    @staticmethod

    def load_tsv(tsv_path):

        """Create a dataframe from a TSV file of depth counts.

        **Parameters**

        **tsv_path**

        > Path to output of `samtools depth -aa` (`str`).

        """

        tsv = pandas.read_csv(tsv_path, sep="\t", header=None)

        tsv.columns = ["name", "position", "depth"]

        return tsv
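
A minimal usage sketch (the file path "depth.tsv" is hypothetical):

from ediacara.Comparator import ComparatorGroup

tsv = ComparatorGroup.load_tsv("depth.tsv")
print(tsv["depth"].median())  # median coverage over all reference positions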

Methods

add_comparator

def add_comparator(
    self,
    reference_name
)

Create comparator and add to the group. See create_comparator().

View Source
    def add_comparator(self, reference_name):

        """Create comparator and add to the group. See create_comparator()."""

        self.comparators += [self.create_comparator(reference_name)]
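
A sketch of building a group and registering a reference; the reference name, GenBank file and alignment paths below are hypothetical:

from Bio import SeqIO
from ediacara.Comparator import ComparatorGroup

references = {"Construct_1": SeqIO.read("Construct_1.gb", "genbank")}
alignments = {
    "paf": ComparatorGroup.load_paf("aln.paf"),
    "tsv": ComparatorGroup.load_tsv("depth.tsv"),
}
group = ComparatorGroup(references, alignments, barcode="barcode01")
group.add_comparator("Construct_1")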

create_comparator

def create_comparator(
    self,
    reference_name
)

Create a Comparator instance from one of the loaded references.

Parameters

reference_name

The name of the reference (str).

View Source
    def create_comparator(self, reference_name):

        """Create a Comparator instance from one of the loaded references.

        **Parameters**

        **reference_name**

        > The name of the reference (`str`).

        """

        subset_paf = self.subset_paf(reference_name)

        subset_tsv = self.tsv[self.tsv["name"] == reference_name]

        alignment = {"paf": subset_paf, "tsv": subset_tsv}

        comparator = Comparator(

            {reference_name: self.references[reference_name]}, alignment

        )

        return comparator

perform_all_comparisons

def perform_all_comparisons(
    self
)

Call perform_comparison() on each Comparator.

Uses the assembly_paths and vcf_paths dictionaries supplied to the class (construct name: filepath), for example: {"Construct_1": "/path/to/assembly.fa"}. Missing entries default to None.

View Source
    def perform_all_comparisons(self):

        """Call perform_comparison() on each Comparator.

        **Parameters**

        **assembly_paths**

        > Dictionary of construct name: consensus sequence file.

        For example: `{"Construct_1": "/path/to/assembly.fa"}`.

        """

        if self.assembly_paths is None:

            assembly_paths = {}

        else:

            assembly_paths = self.assembly_paths

        if self.vcf_paths is None:

            vcf_paths = {}

        else:

            vcf_paths = self.vcf_paths

        for comparator in self.comparators:

            try:

                assembly_path = assembly_paths[comparator.name]

            except Exception:

                assembly_path = None

            try:

                vcf_path = vcf_paths[comparator.name]

            except Exception:

                vcf_path = None

            comparator.perform_comparison(

                assembly_path=assembly_path, vcf_path=vcf_path

            )

        self.comparisons_performed = True

        self.summarise_analysis()
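
Continuing the sketch above: once comparators are added, one call runs every comparison and populates the summary attributes.

group.perform_all_comparisons()
print(group.summary_table)
print(group.result_good, group.result_warning, group.result_error)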

plot_fastq_histogram

def plot_fastq_histogram(
    self,
    n_bins=50
)

Plot a histogram of the FASTQ read lengths.

Parameters

n_bins

Approximate bin width in basepairs; the number of bins is the reference length divided by this value (int).

View Source
    def plot_fastq_histogram(self, n_bins=50):

        """Plot a histogram of the FASTQ reads.

        **Parameters**

        **n_bins**

        > Number of bins in the histogram (`int`).

        """

        fig, ax = plt.subplots()

        plt.hist(

            self.paf["query_length"],

            bins=int(self.paf.iloc[0]["target_length"] / n_bins),  # bin count = reference length / n_bins

            alpha=0.8,

        )

        ax.set_ylabel("Number of reads")

        ax.set_xlabel("Read length [bp]")

        # Mark expected lengths for interpreting the plot:

        for comparator in self.comparators:

            plt.axvline(x=len(comparator.record), color="#fd5a31")

        return fig
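
A usage sketch: the method returns a matplotlib figure, so it can be saved directly (the output path is hypothetical). The vertical lines mark the expected construct lengths.

fig = group.plot_fastq_histogram(n_bins=50)
fig.savefig("read_length_histogram.png", dpi=150)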

subset_paf

def subset_paf(
    self,
    reference_name
)

Subset PAF dataframe for the reference.

Select reads that align to the reference and also align best to the reference.

Parameters

reference_name

Name of the reference sequence to filter for (str).

View Source
    def subset_paf(self, reference_name):

        """Subset PAF dataframe for the reference.

        Select reads that align to the reference and also align *best* to the reference.

        **Parameters**

        **reference_name**

        > Name of the reference sequence to filter for (`str`).

        """

        # Work only with the values used for filtering:

        paf3 = self.paf[["query_name", "target_name", "mapping_matches"]]

        grouped = paf3.groupby("query_name")

        selected_reads = []

        for name, group in grouped:

            # not all reads align to the reference:

            if reference_name in group["target_name"].values:

                max_matches = group["mapping_matches"].max()

                # filter that the read aligns best to this reference:

                if (

                    max(

                        group[group["target_name"] == reference_name][

                            "mapping_matches"

                        ].values

                    )

                    == max_matches

                ):

                    selected_reads += [name]

        # Filter for the selected reads...

        subset_paf_filtered = self.paf[self.paf["query_name"].isin(selected_reads)]

        # ...then keep only the alignments to the reference:

        subset_paf_final = subset_paf_filtered[

            subset_paf_filtered["target_name"] == reference_name

        ]

        return subset_paf_final
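
The best-hit filter can be illustrated on a toy dataframe: read_A aligns best to ref_1 and is kept, while read_B aligns best to ref_2 and is dropped. The one-liner below reproduces the selection (ignoring ties, which the method keeps):

import pandas

paf = pandas.DataFrame({
    "query_name": ["read_A", "read_A", "read_B", "read_B"],
    "target_name": ["ref_1", "ref_2", "ref_1", "ref_2"],
    "mapping_matches": [900, 400, 300, 800],
})
best_hits = paf.loc[paf.groupby("query_name")["mapping_matches"].idxmax()]
selected = best_hits[best_hits["target_name"] == "ref_1"]["query_name"]  # read_A only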

summarise_analysis

def summarise_analysis(
    self
)

Create variables for the PDF report summary.

View Source
    def summarise_analysis(self):

        """Create variables for the PDF report summary."""

        self.number_of_constructs = len(self.comparators)

        self.result_good = 0

        self.result_warning = 0

        self.result_error = 0

        self.result_uncertain = 0

        names = []

        reference_lengths = []

        results = []

        number_of_reads_aligning = []

        median_coverages = []

        for comparator in self.comparators:

            names += [comparator.name]

            reference_lengths += [str(comparator.reference_length)]

            number_of_reads_aligning += [

                str(len(comparator.paf["query_name"].unique()))

            ]

            median_coverages += [str(int(comparator.median_yy))]

            if comparator.is_uncertain:

                self.result_uncertain += 1

                comparator.result = result_keywords["uncertain"]

            else:

                if comparator.has_errors:

                    self.result_error += 1

                    comparator.result = result_keywords["error"]

                else:

                    if comparator.has_warnings:

                        self.result_warning += 1

                        comparator.result = result_keywords["warning"]

                    else:

                        self.result_good += 1

                        comparator.result = result_keywords["good"]

            results += [comparator.result]

        self.n_fastq_reads = len(set(self.paf.query_name))

        self.fastq_plot = self.plot_fastq_histogram()

        # Summary table

        d = {

            "Name": pandas.Series(names),

            "Result": pandas.Series(results),

            "Length [bp]": pandas.Series(reference_lengths),

            "FASTQ reads": pandas.Series(number_of_reads_aligning),

            "Coverage [x]": pandas.Series(median_coverages),

        }

        self.summary_table = pandas.DataFrame(d)
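
The summary table is a plain pandas dataframe, so it can also be exported for use outside the PDF report (the output path is hypothetical):

group.summary_table.to_csv("barcode01_summary.csv", index=False)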

CustomTranslator

class CustomTranslator(
    features_filters=(),
    features_properties=None
)
View Source
class CustomTranslator(dna_features_viewer.BiopythonTranslator):

    """Custom translator."""

    def compute_filtered_features(self, features):

        """Display only "From " features and overhangs."""

        filtered_features = []

        for feature in features:

            try:  # may not have a 'label', count 4 to include overhang annotations

                if (len(feature.qualifiers.get("label", "")[0]) == 4) or (

                    "From " in str(feature.qualifiers.get("label", ""))

                ):

                    filtered_features += [feature]

            except:

                pass

        return filtered_features

Ancestors (in MRO)

  • dna_features_viewer.BiopythonTranslator.BiopythonTranslator.BiopythonTranslator
  • dna_features_viewer.BiopythonTranslator.BiopythonTranslatorBase.BiopythonTranslatorBase

Class variables

default_feature_color
graphic_record_parameters
ignored_features_types
label_fields

Static methods

quick_class_plot

def quick_class_plot(
    record,
    figure_width=12,
    **kwargs
)

Allows super quick and dirty plotting of Biopython records.

This is really meant for use in a Jupyter/Ipython notebook with the "%matplotlib inline" setting.

>>> from dna_features_viewer import BiopythonTranslator
>>> BiopythonTranslator.quick_plot(my_record)

View Source
    @classmethod

    def quick_class_plot(cls, record, figure_width=12, **kwargs):

        """Allows super quick and dirty plotting of Biopython records.

        This is really meant for use in a Jupyter/Ipython notebook with

        the "%matplotlib inline" setting.

        >>> from dna_features_viewer import BiopythonTranslator

        >>> BiopythonTranslator.quick_plot(my_record)

        """

        graphic_record = cls().translate_record(record)

        ax, _ = graphic_record.plot(figure_width=figure_width, **kwargs)

        return ax

Methods

compute_feature_box_color

def compute_feature_box_color(
    self,
    feature
)

Compute a box_color for this feature.

View Source
    def compute_feature_box_color(self, feature):

        """Compute a box_color for this feature."""

        return "auto"

compute_feature_box_linewidth

def compute_feature_box_linewidth(
    self,
    feature
)

Compute a box_linewidth for this feature.

View Source
    def compute_feature_box_linewidth(self, feature):

        """Compute a box_linewidth for this feature."""

        return 0.3

compute_feature_color

def compute_feature_color(
    self,
    feature
)

Compute a color for this feature.

If the feature has a color qualifier it will be used. Otherwise, the class's default_feature_color is used.

To change the behaviour, create a subclass of BiopythonTranslator and overwrite this method.

View Source
    def compute_feature_color(self, feature):

        """Compute a color for this feature.

        If the feature has a ``color`` qualifier it will be used. Otherwise,

        the class's ``default_feature_color`` is used.

        To change the behaviour, create a subclass of ``BiopythonTranslator``

        and overwrite this method.

        """

        if "color" in feature.qualifiers:

            color = feature.qualifiers["color"]

            if isinstance(color[0], str):

                return "".join(feature.qualifiers["color"])

            else:

                return color

        else:

            return self.default_feature_color
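
Following the docstring's advice, a hypothetical subclass that colors CDS features red and falls back to the default otherwise:

from dna_features_viewer import BiopythonTranslator

class RedCDSTranslator(BiopythonTranslator):
    """Hypothetical example: highlight CDS features in red."""

    def compute_feature_color(self, feature):
        if feature.type == "CDS":
            return "#ff0000"
        return self.default_feature_color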

compute_feature_fontdict

def compute_feature_fontdict(
    self,
    feature
)

Compute a font dict for this feature.

View Source
    def compute_feature_fontdict(self, feature):

        """Compute a font dict for this feature."""

        return None

compute_feature_html

def compute_feature_html(
    self,
    feature
)

Gets the 'label' of the feature.

View Source
    def compute_feature_html(self, feature):

        """Gets the 'label' of the feature."""

        return self.compute_feature_label(feature)

compute_feature_label

def compute_feature_label(
    self,
    feature
)

Compute the label of the feature.

View Source
    def compute_feature_label(self, feature):

        """Compute the label of the feature."""

        label = feature.type

        for key in self.label_fields:

            if key in feature.qualifiers and len(feature.qualifiers[key]):

                label = feature.qualifiers[key]

                break

        if isinstance(label, list):

            label = "|".join(label)

        return label

compute_feature_label_link_color

def compute_feature_label_link_color(
    self,
    feature
)

Compute the color of the line linking the label to its feature.

View Source
    def compute_feature_label_link_color(self, feature):

        """Compute the color of the line linking the label to its feature."""

        return "black"

compute_feature_legend_text

def compute_feature_legend_text(
    self,
    feature
)
View Source
    def compute_feature_legend_text(self, feature):

        return None

compute_feature_linewidth

def compute_feature_linewidth(
    self,
    feature
)

Compute the edge width of the feature's arrow/rectangle.

View Source
    def compute_feature_linewidth(self, feature):

        """Compute the edge width of the feature's arrow/rectangle."""

        return 1.0

compute_filtered_features

def compute_filtered_features(
    self,
    features
)

Display only "From " features and overhangs.

View Source
    def compute_filtered_features(self, features):

        """Display only "From " features and overhangs."""

        filtered_features = []

        for feature in features:

            try:  # may not have a 'label', count 4 to include overhang annotations

                if (len(feature.qualifiers.get("label", "")[0]) == 4) or (

                    "From " in str(feature.qualifiers.get("label", ""))

                ):

                    filtered_features += [feature]

            except:

                pass

        return filtered_features

quick_plot

def quick_plot(
    self,
    record,
    figure_width=12,
    **kwargs
)

Allows super quick and dirty plotting of Biopython records.

This is really meant for use in a Jupyter/Ipython notebook with the "%matplotlib inline" setting.

>>> from dna_features_viewer import BiopythonTranslator
>>> BiopythonTranslator.quick_plot(my_record)

View Source
    def quick_plot(self, record, figure_width=12, **kwargs):

        """Allows super quick and dirty plotting of Biopython records.

        This is really meant for use in a Jupyter/Ipython notebook with

        the "%matplotlib inline" setting.

        >>> from dna_features_viewer import BiopythonTranslator

        >>> BiopythonTranslator.quick_plot(my_record)

        """

        graphic_record = self.translate_record(record)

        ax, _ = graphic_record.plot(figure_width=figure_width, **kwargs)

        return ax

translate_feature

def translate_feature(
    self,
    feature
)

Translate a Biopython feature into a Dna Features Viewer feature.

View Source
    def translate_feature(self, feature):

        """Translate a Biopython feature into a Dna Features Viewer feature."""

        properties = dict(

            label=self.compute_feature_label(feature),

            color=self.compute_feature_color(feature),

            html=self.compute_feature_html(feature),

            fontdict=self.compute_feature_fontdict(feature),

            box_linewidth=self.compute_feature_box_linewidth(feature),

            box_color=self.compute_feature_box_color(feature),

            linewidth=self.compute_feature_linewidth(feature),

            label_link_color=self.compute_feature_label_link_color(feature),

            legend_text=self.compute_feature_legend_text(feature),

        )

        if self.features_properties is not None:

            other_properties = self.features_properties

            if hasattr(other_properties, "__call__"):

                other_properties = other_properties(feature)

            properties.update(other_properties)

        return GraphicFeature(

            start=feature.location.start,

            end=feature.location.end,

            strand=feature.location.strand,

            **properties

        )

translate_record

def translate_record(
    self,
    record,
    record_class=None,
    filetype=None
)

Create a new GraphicRecord from a BioPython Record object.

Parameters

record

A BioPython Record object or the path to a Genbank or a GFF file.

record_class

The graphic record class to use, e.g. GraphicRecord (default) or CircularGraphicRecord. Strings 'circular' and 'linear' can also be provided.

filetype

Used only when a Genbank or a GFF file is provided; one of "genbank" or "gff" to be used. Default None infers from file extension.

View Source
    def translate_record(self, record, record_class=None, filetype=None):

        """Create a new GraphicRecord from a BioPython Record object.

        Parameters

        ----------

        record

          A BioPython Record object or the path to a Genbank or a GFF file.

        record_class

          The graphic record class to use, e.g. GraphicRecord (default) or

          CircularGraphicRecord. Strings 'circular' and 'linear' can also be

          provided.

        filetype

          Used only when a Genbank or a GFF file is provided; one of "genbank"

          or "gff" to be used. Default None infers from file extension.

        """

        classes = {

            "linear": GraphicRecord,

            "circular": CircularGraphicRecord,

            None: GraphicRecord,

        }

        if record_class in classes:

            record_class = classes[record_class]

        if isinstance(record, str) or hasattr(record, "read"):

            record = load_record(record, filetype=filetype)

        filtered_features = self.compute_filtered_features(record.features)

        return record_class(

            sequence_length=len(record),

            sequence=str(record.seq),

            features=[

                self.translate_feature(feature)

                for feature in filtered_features

                if feature.location is not None

            ],

            **self.graphic_record_parameters

        )

SequencingGroup

class SequencingGroup(
    comparatorgroups,
    name='Unnamed'
)
View Source
class SequencingGroup:

    """Analyse alignments of a sequencing run.

    **Parameters**

    **comparatorgroups**

    > List of `ComparatorGroup` instances.

    **name**

    > Name of the sequencing project (`str`).

    """

    def __init__(self, comparatorgroups, name="Unnamed"):

        self.comparatorgroups = comparatorgroups

        self.name = name

        self.vcf_cutoff = 20  # max number of VCF entries to display in report

        self.result_keywords = result_keywords

    def perform_all_comparisons_in_sequencinggroup(self):

        self.number_of_reads = 0

        for comparatorgroup in self.comparatorgroups:

            comparatorgroup.perform_all_comparisons()

            self.number_of_reads += comparatorgroup.n_fastq_reads

        self.number_of_barcodes = len(self.comparatorgroups)

        self.comparisons_performed = True

Methods

perform_all_comparisons_in_sequencinggroup

def perform_all_comparisons_in_sequencinggroup(
    self
)

Call perform_all_comparisons() on each ComparatorGroup, then record the total number of reads and the number of barcodes.

View Source
    def perform_all_comparisons_in_sequencinggroup(self):

        self.number_of_reads = 0

        for comparatorgroup in self.comparatorgroups:

            comparatorgroup.perform_all_comparisons()

            self.number_of_reads += comparatorgroup.n_fastq_reads

        self.number_of_barcodes = len(self.comparatorgroups)

        self.comparisons_performed = True
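
A minimal end-to-end sketch, assuming two ComparatorGroup instances (group_1 and group_2, one per barcode) built as shown earlier; the run name is hypothetical:

from ediacara.Comparator import SequencingGroup

sequencinggroup = SequencingGroup([group_1, group_2], name="sequencing_run_1")
sequencinggroup.perform_all_comparisons_in_sequencinggroup()
print(sequencinggroup.number_of_barcodes, sequencinggroup.number_of_reads)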