Skip to content

Module seqreport.SeqCollection

View Source
import csv

import os

from Bio import SeqIO

class SeqCollection:

    """Class to store basic properties of a collection of sequences.

    **Parameters**

    **records**

    > A list of Biopython SeqRecords.

    **cost_per_base**

    > Cost per nucleotide base.

    **cost_per_seq**

    > Fix overhead cost for each sequence part (cloning, delivery costs etc).

    **currency_symbol**

    > The currency symbol (or string) to display in the report (`str`).

    **projectname**

    > The name of the project (`str`).

    **comments**

    > Any comments to be included in the report (`str`).

    **min_length**

    > Check that all sequences are at least this long (`int`).

    **max_length**

    > Check that all sequences are at most this long (`int`).

    **name_length**

    > Check which sequence IDs are longer than this cutoff. Genbank has a character

    limit (`int`).

    **assembly_plan**

    > Optional assembly plan CSV path for calculating savings by re-using DNA parts.

    **fasta_name**

    > Optional name of the fasta file for easy identification (`str`).

    """

    def __init__(

        self,

        records,

        cost_per_base=0.25,

        cost_per_seq=0,

        currency_symbol="£",

        projectname="",

        comments="",

        min_length=100,  # a good cutoff for min DNA synthesis length

        max_length=3000,  # a good cutoff for max DNA synthesis length

        name_length=15,  # max seq record name length. Genbank character limit.

        assembly_plan="",

        fasta_name="",

    ):

        self.sequences = records

        self.cost_per_base = cost_per_base

        self.cost_per_seq = cost_per_seq

        self.currency_symbol = currency_symbol

        self.projectname = projectname

        self.comments = comments

        self.min_length = min_length

        self.max_length = max_length

        self.name_length = name_length

        self.assembly_plan = assembly_plan

        self.fasta_name = fasta_name

        self.calculate_values()

    def calculate_values(self):

        self.cost_per_base = float(self.cost_per_base)  # could be str after update

        self.cost_per_seq = float(self.cost_per_seq)

        self.n_seq = len(self.sequences)

        # Numbers section

        n_bp = 0

        for part in self.sequences:

            n_bp += len(part.seq)

        self.n_bp = n_bp

        self.cost = self.n_seq * self.cost_per_seq + self.n_bp * self.cost_per_base

        self.cost = round(self.cost)  # (in)accuracy is fine for our purposes

        # Lengths section

        self.too_short = []

        self.too_long = []

        self.long_names = []

        for record in self.sequences:

            if len(record) < self.min_length:

                self.too_short += [record.id]

            if len(record) > self.max_length:

                self.too_long += [record.id]

            if len(record.id) > self.name_length:

                self.long_names += [record.id]

        self.too_short = list(set(self.too_short))

        self.too_long = list(set(self.too_long))

        self.long_names = list(set(self.long_names))

        # Repeats will be checked further below.

        # For the PDF report:

        self.too_short_text = " ; ".join(self.too_short)

        self.too_long_text = " ; ".join(self.too_long)

        self.long_names_text = " ; ".join(self.long_names)

        # Repeats section

        self.repeat_names = []

        self.repeat_seq = []

        self.reverse_complement_seq = []

        for index, record in enumerate(self.sequences[:-1]):

            for other_record in self.sequences[index + 1 :]:

                if record.id == other_record.id:

                    self.repeat_names += [record.id]

                if record.seq == other_record.seq:

                    self.repeat_seq += [(record.id, other_record.id)]

                if record.seq == other_record.seq.reverse_complement():

                    self.reverse_complement_seq += [(record.id, other_record.id)]

        self.repeat_names = list(set(self.repeat_names))

        # For the PDF report:

        self.repeat_names_text = " ; ".join(self.repeat_names)

        repeat_seq_text_list = []

        for identifier in self.repeat_seq:

            combined = " = ".join(identifier)

            repeat_seq_text_list += [combined]

        self.repeat_seq_text = " ; ".join(repeat_seq_text_list)

        reverse_complement_seq_text_list = []

        for identifier in self.reverse_complement_seq:

            combined = " = ".join(identifier)

            reverse_complement_seq_text_list += [combined]

        self.reverse_complement_seq_text = " ; ".join(reverse_complement_seq_text_list)

        # Savings section

        if self.assembly_plan:

            self.not_in_plan = []

            all_rows = []

            with open(self.assembly_plan, "r") as f:

                reader = csv.reader(f, skipinitialspace=True)

                next(reader)  # ignore header

                for row in reader:

                    all_rows += row[1:]  # first column is construct name

            self.savings_dict = {}

            self.total_savings = 0

            for record in self.sequences:

                count_in_plan = all_rows.count(record.id)

                if count_in_plan > 1:  # we count re-use savings only

                    self.total_savings += len(record.seq) * (

                        count_in_plan - 1

                    )  # ignore first synthesis

                    self.savings_dict[record.id] = count_in_plan

                # else we don't have any savings from the sequence.

                if count_in_plan < 1:  # also find the ones not in the assembly plan

                    self.not_in_plan += [record.id]

            self.total_cost_savings = (

                self.total_savings * self.cost_per_base

            )  # ignore cost / seq

            self.total_cost_savings = round(

                self.total_cost_savings

            )  # (in)accuracy is fine for our purposes

            # For the PDF report:

            self.not_in_plan_text = " ; ".join(self.not_in_plan)

            self.savings_dict_text = " ; ".join(

                [f"{key}  (×{value})" for key, value in self.savings_dict.items()]

            )

        else:

            self.savings_dict_text = ""

            self.not_in_plan_text = ""

def read_fasta(fasta):

    """Read a FASTA sequence file into a list of records.

    **Parameters**

    **fasta**

    > The FASTA filepath (`str`).

    """

    return list(SeqIO.parse(fasta, "fasta"))

def seqcollection_from_csv(csv_file, records=None, param_dict={}):

    """Create a SeqCollection, using parameters in a CSV file.

    The CSV file parameters override the default class parameters, and this function's

    parameters override the CSV file parameters. Either a FASTA file (in the CSV or in

    `param_dict`) or a list of `SeqRecords` must be specified.

    For the parameter descriptions, see parameters in the docstring of `SeqCollection`,

    except records and fasta_name.

    """

    with open(csv_file, "r") as f:

        reader = csv.reader(f, skipinitialspace=True)

        next(reader)  # ignore header

        parameters = {}

        for row in reader:

            if row[1] == "":  # in case the full param list is in the csv

                continue

            else:

                parameters[row[0]] = row[1]

    for key, value in param_dict.items():  # override with function values

        parameters[key] = value

    if records:

        pass

    else:

        records = read_fasta(parameters["fasta"])

    seq_coll = SeqCollection(records=records)

    for key, value in parameters.items():

        setattr(seq_coll, key, value)

    # To ensure that the correct filename is used:

    setattr(seq_coll, "fasta_name", os.path.basename(parameters["fasta"]))

    seq_coll.calculate_values()

    return seq_coll

Functions

read_fasta

def read_fasta(
    fasta
)

Read a FASTA sequence file into a list of records.

Parameters

fasta

The FASTA filepath (str).

View Source
def read_fasta(fasta):

    """Read a FASTA sequence file into a list of records.

    **Parameters**

    **fasta**

    > The FASTA filepath (`str`).

    """

    return list(SeqIO.parse(fasta, "fasta"))

seqcollection_from_csv

def seqcollection_from_csv(
    csv_file,
    records=None,
    param_dict={}
)

Create a SeqCollection, using parameters in a CSV file.

The CSV file parameters override the default class parameters, and this function's parameters override the CSV file parameters. Either a FASTA file (in the CSV or in param_dict) or a list of SeqRecords must be specified. For the parameter descriptions, see parameters in the docstring of SeqCollection, except records and fasta_name.

View Source
def seqcollection_from_csv(csv_file, records=None, param_dict={}):

    """Create a SeqCollection, using parameters in a CSV file.

    The CSV file parameters override the default class parameters, and this function's

    parameters override the CSV file parameters. Either a FASTA file (in the CSV or in

    `param_dict`) or a list of `SeqRecords` must be specified.

    For the parameter descriptions, see parameters in the docstring of `SeqCollection`,

    except records and fasta_name.

    """

    with open(csv_file, "r") as f:

        reader = csv.reader(f, skipinitialspace=True)

        next(reader)  # ignore header

        parameters = {}

        for row in reader:

            if row[1] == "":  # in case the full param list is in the csv

                continue

            else:

                parameters[row[0]] = row[1]

    for key, value in param_dict.items():  # override with function values

        parameters[key] = value

    if records:

        pass

    else:

        records = read_fasta(parameters["fasta"])

    seq_coll = SeqCollection(records=records)

    for key, value in parameters.items():

        setattr(seq_coll, key, value)

    # To ensure that the correct filename is used:

    setattr(seq_coll, "fasta_name", os.path.basename(parameters["fasta"]))

    seq_coll.calculate_values()

    return seq_coll

Classes

SeqCollection

class SeqCollection(
    records,
    cost_per_base=0.25,
    cost_per_seq=0,
    currency_symbol='£',
    projectname='',
    comments='',
    min_length=100,
    max_length=3000,
    name_length=15,
    assembly_plan='',
    fasta_name=''
)

Class to store basic properties of a collection of sequences.

Parameters

records

A list of Biopython SeqRecords.

cost_per_base

Cost per nucleotide base.

cost_per_seq

Fix overhead cost for each sequence part (cloning, delivery costs etc).

currency_symbol

The currency symbol (or string) to display in the report (str).

projectname

The name of the project (str).

comments

Any comments to be included in the report (str).

min_length

Check that all sequences are at least this long (int).

max_length

Check that all sequences are at most this long (int).

name_length

Check which sequence IDs are longer than this cutoff. Genbank has a character limit (int).

assembly_plan

Optional assembly plan CSV path for calculating savings by re-using DNA parts.

fasta_name

Optional name of the fasta file for easy identification (str).

View Source
class SeqCollection:

    """Class to store basic properties of a collection of sequences.

    **Parameters**

    **records**

    > A list of Biopython SeqRecords.

    **cost_per_base**

    > Cost per nucleotide base.

    **cost_per_seq**

    > Fix overhead cost for each sequence part (cloning, delivery costs etc).

    **currency_symbol**

    > The currency symbol (or string) to display in the report (`str`).

    **projectname**

    > The name of the project (`str`).

    **comments**

    > Any comments to be included in the report (`str`).

    **min_length**

    > Check that all sequences are at least this long (`int`).

    **max_length**

    > Check that all sequences are at most this long (`int`).

    **name_length**

    > Check which sequence IDs are longer than this cutoff. Genbank has a character

    limit (`int`).

    **assembly_plan**

    > Optional assembly plan CSV path for calculating savings by re-using DNA parts.

    **fasta_name**

    > Optional name of the fasta file for easy identification (`str`).

    """

    def __init__(

        self,

        records,

        cost_per_base=0.25,

        cost_per_seq=0,

        currency_symbol="£",

        projectname="",

        comments="",

        min_length=100,  # a good cutoff for min DNA synthesis length

        max_length=3000,  # a good cutoff for max DNA synthesis length

        name_length=15,  # max seq record name length. Genbank character limit.

        assembly_plan="",

        fasta_name="",

    ):

        self.sequences = records

        self.cost_per_base = cost_per_base

        self.cost_per_seq = cost_per_seq

        self.currency_symbol = currency_symbol

        self.projectname = projectname

        self.comments = comments

        self.min_length = min_length

        self.max_length = max_length

        self.name_length = name_length

        self.assembly_plan = assembly_plan

        self.fasta_name = fasta_name

        self.calculate_values()

    def calculate_values(self):

        self.cost_per_base = float(self.cost_per_base)  # could be str after update

        self.cost_per_seq = float(self.cost_per_seq)

        self.n_seq = len(self.sequences)

        # Numbers section

        n_bp = 0

        for part in self.sequences:

            n_bp += len(part.seq)

        self.n_bp = n_bp

        self.cost = self.n_seq * self.cost_per_seq + self.n_bp * self.cost_per_base

        self.cost = round(self.cost)  # (in)accuracy is fine for our purposes

        # Lengths section

        self.too_short = []

        self.too_long = []

        self.long_names = []

        for record in self.sequences:

            if len(record) < self.min_length:

                self.too_short += [record.id]

            if len(record) > self.max_length:

                self.too_long += [record.id]

            if len(record.id) > self.name_length:

                self.long_names += [record.id]

        self.too_short = list(set(self.too_short))

        self.too_long = list(set(self.too_long))

        self.long_names = list(set(self.long_names))

        # Repeats will be checked further below.

        # For the PDF report:

        self.too_short_text = " ; ".join(self.too_short)

        self.too_long_text = " ; ".join(self.too_long)

        self.long_names_text = " ; ".join(self.long_names)

        # Repeats section

        self.repeat_names = []

        self.repeat_seq = []

        self.reverse_complement_seq = []

        for index, record in enumerate(self.sequences[:-1]):

            for other_record in self.sequences[index + 1 :]:

                if record.id == other_record.id:

                    self.repeat_names += [record.id]

                if record.seq == other_record.seq:

                    self.repeat_seq += [(record.id, other_record.id)]

                if record.seq == other_record.seq.reverse_complement():

                    self.reverse_complement_seq += [(record.id, other_record.id)]

        self.repeat_names = list(set(self.repeat_names))

        # For the PDF report:

        self.repeat_names_text = " ; ".join(self.repeat_names)

        repeat_seq_text_list = []

        for identifier in self.repeat_seq:

            combined = " = ".join(identifier)

            repeat_seq_text_list += [combined]

        self.repeat_seq_text = " ; ".join(repeat_seq_text_list)

        reverse_complement_seq_text_list = []

        for identifier in self.reverse_complement_seq:

            combined = " = ".join(identifier)

            reverse_complement_seq_text_list += [combined]

        self.reverse_complement_seq_text = " ; ".join(reverse_complement_seq_text_list)

        # Savings section

        if self.assembly_plan:

            self.not_in_plan = []

            all_rows = []

            with open(self.assembly_plan, "r") as f:

                reader = csv.reader(f, skipinitialspace=True)

                next(reader)  # ignore header

                for row in reader:

                    all_rows += row[1:]  # first column is construct name

            self.savings_dict = {}

            self.total_savings = 0

            for record in self.sequences:

                count_in_plan = all_rows.count(record.id)

                if count_in_plan > 1:  # we count re-use savings only

                    self.total_savings += len(record.seq) * (

                        count_in_plan - 1

                    )  # ignore first synthesis

                    self.savings_dict[record.id] = count_in_plan

                # else we don't have any savings from the sequence.

                if count_in_plan < 1:  # also find the ones not in the assembly plan

                    self.not_in_plan += [record.id]

            self.total_cost_savings = (

                self.total_savings * self.cost_per_base

            )  # ignore cost / seq

            self.total_cost_savings = round(

                self.total_cost_savings

            )  # (in)accuracy is fine for our purposes

            # For the PDF report:

            self.not_in_plan_text = " ; ".join(self.not_in_plan)

            self.savings_dict_text = " ; ".join(

                [f"{key}  (×{value})" for key, value in self.savings_dict.items()]

            )

        else:

            self.savings_dict_text = ""

            self.not_in_plan_text = ""

Methods

calculate_values

def calculate_values(
    self
)
View Source
    def calculate_values(self):

        self.cost_per_base = float(self.cost_per_base)  # could be str after update

        self.cost_per_seq = float(self.cost_per_seq)

        self.n_seq = len(self.sequences)

        # Numbers section

        n_bp = 0

        for part in self.sequences:

            n_bp += len(part.seq)

        self.n_bp = n_bp

        self.cost = self.n_seq * self.cost_per_seq + self.n_bp * self.cost_per_base

        self.cost = round(self.cost)  # (in)accuracy is fine for our purposes

        # Lengths section

        self.too_short = []

        self.too_long = []

        self.long_names = []

        for record in self.sequences:

            if len(record) < self.min_length:

                self.too_short += [record.id]

            if len(record) > self.max_length:

                self.too_long += [record.id]

            if len(record.id) > self.name_length:

                self.long_names += [record.id]

        self.too_short = list(set(self.too_short))

        self.too_long = list(set(self.too_long))

        self.long_names = list(set(self.long_names))

        # Repeats will be checked further below.

        # For the PDF report:

        self.too_short_text = " ; ".join(self.too_short)

        self.too_long_text = " ; ".join(self.too_long)

        self.long_names_text = " ; ".join(self.long_names)

        # Repeats section

        self.repeat_names = []

        self.repeat_seq = []

        self.reverse_complement_seq = []

        for index, record in enumerate(self.sequences[:-1]):

            for other_record in self.sequences[index + 1 :]:

                if record.id == other_record.id:

                    self.repeat_names += [record.id]

                if record.seq == other_record.seq:

                    self.repeat_seq += [(record.id, other_record.id)]

                if record.seq == other_record.seq.reverse_complement():

                    self.reverse_complement_seq += [(record.id, other_record.id)]

        self.repeat_names = list(set(self.repeat_names))

        # For the PDF report:

        self.repeat_names_text = " ; ".join(self.repeat_names)

        repeat_seq_text_list = []

        for identifier in self.repeat_seq:

            combined = " = ".join(identifier)

            repeat_seq_text_list += [combined]

        self.repeat_seq_text = " ; ".join(repeat_seq_text_list)

        reverse_complement_seq_text_list = []

        for identifier in self.reverse_complement_seq:

            combined = " = ".join(identifier)

            reverse_complement_seq_text_list += [combined]

        self.reverse_complement_seq_text = " ; ".join(reverse_complement_seq_text_list)

        # Savings section

        if self.assembly_plan:

            self.not_in_plan = []

            all_rows = []

            with open(self.assembly_plan, "r") as f:

                reader = csv.reader(f, skipinitialspace=True)

                next(reader)  # ignore header

                for row in reader:

                    all_rows += row[1:]  # first column is construct name

            self.savings_dict = {}

            self.total_savings = 0

            for record in self.sequences:

                count_in_plan = all_rows.count(record.id)

                if count_in_plan > 1:  # we count re-use savings only

                    self.total_savings += len(record.seq) * (

                        count_in_plan - 1

                    )  # ignore first synthesis

                    self.savings_dict[record.id] = count_in_plan

                # else we don't have any savings from the sequence.

                if count_in_plan < 1:  # also find the ones not in the assembly plan

                    self.not_in_plan += [record.id]

            self.total_cost_savings = (

                self.total_savings * self.cost_per_base

            )  # ignore cost / seq

            self.total_cost_savings = round(

                self.total_cost_savings

            )  # (in)accuracy is fine for our purposes

            # For the PDF report:

            self.not_in_plan_text = " ; ".join(self.not_in_plan)

            self.savings_dict_text = " ; ".join(

                [f"{key}  (×{value})" for key, value in self.savings_dict.items()]

            )

        else:

            self.savings_dict_text = ""

            self.not_in_plan_text = ""