Source code for dnacauldron.AssemblyPlan.AssemblyPlan

import os
import pandas
import networkx as nx
from proglog import default_bar_logger
from .AssemblyPlanSimulation import AssemblyPlanSimulation
from ..Assembly import ASSEMBLY_CLASS_DICT


class AssemblyPlan:
    """Class to represent, analyze and simulate assembly plans."""

    # Lower-cased first-cell values marking spreadsheet rows that do not
    # describe an assembly (blank rows, header rows...).
    _IGNORED_FIRST_CELLS = (
        "nan",
        "construct name",
        "construct",
        "none",
        "assembly",
        "",
    )

    def __init__(self, assemblies, name="plan", logger="bar"):
        """Initialize the plan and compute the assemblies' dependency levels.

        Parameters
        ----------

        assemblies
          List of Assembly instances.

        name
          Assembly plan name as it will appear in reports.

        logger
          Either "bar" for a progress bar, or None, or any Proglog logger.

        Raises
        ------

        ValueError
          If two assemblies share a name, or if the assemblies depend on
          each other in a circular way.
        """
        self.assemblies = assemblies
        self._raise_an_error_if_duplicate_assembly_names()
        self.assemblies_dict = {a.name: a for a in self.assemblies}
        self._compute_assemblies_levels()
        self.logger = default_bar_logger(logger)
        self.name = name

    def _raise_an_error_if_duplicate_assembly_names(self):
        """Raise a ValueError listing every assembly name used more than once.

        The positions reported in the message are the (0-based) indices of
        the offending assemblies in ``self.assemblies``.
        """
        positions_by_name = {}
        for index, assembly in enumerate(self.assemblies):
            positions_by_name.setdefault(assembly.name, []).append(index)
        duplicated = sorted(
            (name, positions)
            for name, positions in positions_by_name.items()
            if len(positions) > 1
        )
        if not duplicated:
            return
        duplicates = ", ".join(
            "%s (lines %s)" % (name, "-".join(str(i) for i in positions))
            for name, positions in duplicated
        )
        raise ValueError("Multiple assemblies named " + duplicates)

    def _compute_assemblies_levels(self):
        """Build the dependency graph and assign a level to each assembly.

        Sets ``self.graph`` (edges go from a part name to the name of the
        assembly using it), ``self.all_parts`` (graph nodes with no
        predecessor), ``self.levels`` (``{level: [assemblies]}``, keys in
        increasing order) and, on every assembly, a ``dependencies`` dict
        with keys ``level``, ``depends_on`` and ``used_in``.

        Raises
        ------

        ValueError
          If the graph contains a cycle.
        """
        graph_edges = [
            (part, assembly.name)
            for assembly in self.assemblies
            for part in assembly.parts
        ]
        self.graph = nx.DiGraph(graph_edges)
        if not nx.is_directed_acyclic_graph(self.graph):
            cycle = nx.find_cycle(self.graph)
            raise ValueError("Circular dependency found involving %s" % cycle)

        # A node's level is the length of the longest path leading to it.
        # Walking the nodes in topological order guarantees that all the
        # predecessors of a node have their level set before the node is
        # reached, so one pass is enough (no recursion over every path).
        nodes_levels = {}
        for node in nx.topological_sort(self.graph):
            nodes_levels[node] = max(
                (nodes_levels[p] + 1 for p in self.graph.predecessors(node)),
                default=0,
            )

        # Level 0 <=> no predecessor.
        self.all_parts = [n for n in self.graph if nodes_levels[n] == 0]
        base_parts = set(self.all_parts)  # for fast membership tests

        assemblies_by_level = {}
        for assembly in self.assemblies:
            level = nodes_levels[assembly.name]
            assembly.dependencies = dict(
                level=level,
                depends_on=[
                    part
                    for part in self.graph.predecessors(assembly.name)
                    if part not in base_parts
                ],
                used_in=list(self.graph.successors(assembly.name)),
            )
            assemblies_by_level.setdefault(level, []).append(assembly)
        self.levels = {
            level: assemblies_by_level[level]
            for level in sorted(assemblies_by_level)
        }

    @staticmethod
    def _extract_assembly_class_from_row(row):
        """Split a spreadsheet row into (assembly class name, other cells).

        The class name is read from the first cell starting with "class:".
        Non-string cells (e.g. the NaN padding of short rows) are skipped.

        Raises
        ------

        ValueError
          If no cell of the row starts with "class:".
        """
        cells = list(row)
        for index, cell in enumerate(cells):
            if isinstance(cell, str) and cell.startswith("class:"):
                del cells[index]
                return cell[len("class:"):].strip(), cells
        msg = "Could not find assembly class in row: %s" % (
            ", ".join(str(cell) for cell in cells)
        )
        raise ValueError(msg)

    @staticmethod
    def from_spreadsheet(
        path=None,
        dataframe=None,
        assembly_class="from_spreadsheet",
        sheet_name="all",
        header=None,
        name="auto_from_filename",
        logger="bar",
        assembly_class_dict="default",
        is_csv="auto_from_filename",
        **assembly_params
    ):
        """Import an assembly plan from a spreadsheet.

        You can either read these docs or browse the examples in the repo.

        Note that this function autoselects the enzyme, based on the sites in
        each part. To explicitly set enzymes, set ``assembly.enzyme`` for each
        assembly in ``AssemblyPlan.assemblies``.

        Parameters
        ----------

        path
          Path to a spreadsheet file (a dataframe can be used instead).

        dataframe
          A pandas dataframe, possibly obtained from a spreadsheet. When
          provided, nothing is read from ``path``.

        assembly_class
          Either "from_spreadsheet", in which case each row must contain a
          "class: SomeName" cell and ``SomeName`` is looked up in
          ``assembly_class_dict``, or a class providing a
          ``from_dataframe_row`` method, used for every row.

        sheet_name
          Name of the spreadsheet's sheet on which the assembly plan is
          defined. Use "all" to load assemblies from all the sheets.

        header
          Forwarded as the ``header`` argument of the pandas reader (None
          means that the spreadsheet has no header row).

        name
          Name of the assembly plan (leave to "auto_from_filename" to use the
          file name as assembly plan name).

        logger
          Logger of the created assembly plan. Either "bar" for a progress
          bar or None for none, or any Proglog logger.

        assembly_class_dict
          Mapping from class names to assembly classes, used when
          ``assembly_class`` is "from_spreadsheet". "default" selects
          ``ASSEMBLY_CLASS_DICT``.

        is_csv
          Whether the file at ``path`` is a CSV file. Leave to
          "auto_from_filename" to decide from the file extension.

        assembly_params
          Extra keyword parameters which will be fed to each assembly.

        Raises
        ------

        ValueError
          If neither ``path`` nor ``dataframe`` is provided, or if a row has
          no "class:" cell while ``assembly_class`` is "from_spreadsheet".
        """
        # NOTE(review): assembly_params is only forwarded to the per-sheet
        # recursive calls below and never reaches ``from_dataframe_row``.
        # The signature of that method is not visible here, so this is left
        # unchanged -- confirm whether the parameters should be passed on.
        if path is None and dataframe is None:
            raise ValueError("Provide either a path or a dataframe.")

        if name == "auto_from_filename":
            if path is None:
                name = "unnamed"
            else:
                name, _ = os.path.splitext(os.path.basename(path))

        if is_csv == "auto_from_filename":
            # No path means the data comes from ``dataframe``.
            is_csv = path is not None and str(path).lower().endswith(".csv")

        if dataframe is None and sheet_name == "all" and not is_csv:
            # The context manager closes the file handle once the sheet
            # names are known; each sheet is then loaded by a recursive call.
            with pandas.ExcelFile(path) as excel_file:
                sheet_names = list(excel_file.sheet_names)
            assemblies = [
                assembly
                for _sheet_name in sheet_names
                for assembly in AssemblyPlan.from_spreadsheet(
                    path=path,
                    sheet_name=_sheet_name,
                    header=header,
                    assembly_class=assembly_class,
                    assembly_class_dict=assembly_class_dict,
                    is_csv=False,
                    name=name,
                    logger=logger,
                    **assembly_params
                ).assemblies
            ]
            return AssemblyPlan(name=name, logger=logger, assemblies=assemblies)

        if dataframe is None:
            if is_csv:
                dataframe = pandas.read_csv(path, header=header)
            else:
                dataframe = pandas.read_excel(
                    path, sheet_name=sheet_name, header=header
                )

        # ``iloc`` selects the first cell by position whatever the column
        # labels are.
        rows = [
            row
            for _, row in dataframe.iterrows()
            if str(row.iloc[0]).lower()
            not in AssemblyPlan._IGNORED_FIRST_CELLS
        ]
        if assembly_class == "from_spreadsheet":
            if assembly_class_dict == "default":
                assembly_class_dict = ASSEMBLY_CLASS_DICT
            assembly_classes_and_rows = [
                AssemblyPlan._extract_assembly_class_from_row(row)
                for row in rows
            ]
            assemblies = [
                assembly_class_dict[_class].from_dataframe_row(row)
                for _class, row in assembly_classes_and_rows
            ]
        else:
            assemblies = [
                assembly_class.from_dataframe_row(row) for row in rows
            ]
        return AssemblyPlan(assemblies, name=name, logger=logger)

    def to_spreadsheet(self, path):
        """Write the plan to ``path`` as comma-separated text.

        The first line is "construct,parts", followed by one line per
        assembly containing its name and then its parts.
        """
        lines = [
            ",".join([asm.name] + list(asm.parts)) for asm in self.assemblies
        ]
        with open(path, "w") as f:
            f.write("\n".join(["construct,parts"] + lines))

    def to_dataframe(self):
        """Return a dataframe describing the assembly plan.

        The dataframe is indexed by assembly name and has a single "parts"
        column (comma-separated part names). Rows are sorted by dependency
        level, then by assembly name.
        """
        sorted_assemblies = sorted(
            self.assemblies, key=lambda a: (a.dependencies["level"], a.name)
        )
        records = [
            {"assembly": assembly.name, "parts": ", ".join(assembly.parts)}
            for assembly in sorted_assemblies
        ]
        return pandas.DataFrame.from_records(
            records, columns=["assembly", "parts"], index="assembly"
        )

    def simulate(self, sequence_repository):
        """Simulate the whole assembly plan, return an AssemblyPlanSimulation.

        Assemblies are simulated level by level, and the construct records
        of each simulation are added to the "constructs" collection of
        ``sequence_repository`` (which is therefore modified).

        When a simulation reports errors, every assembly depending on it,
        directly or through other cancelled assemblies, is skipped and
        listed in the ``cancelled`` argument given to the returned
        AssemblyPlanSimulation.
        """
        ordered_assemblies = [
            assembly
            for level in sorted(self.levels)
            for assembly in self.levels[level]
        ]
        self.logger(message="Simulating assembly plan %s..." % self.name)
        simulation_results = []
        # {cancelled assembly name: name of the failed assembly causing it}
        cancelled_assemblies = {}
        for assembly in self.logger.iter_bar(assembly=ordered_assemblies):
            if assembly.name in cancelled_assemblies:
                # This construct will not exist, so whatever uses it cannot
                # be built either. Dependents always sit on a higher level
                # and are therefore still ahead in the iteration. A failure
                # already recorded for a dependent is kept.
                failed_dependency = cancelled_assemblies[assembly.name]
                for next_assembly in assembly.dependencies["used_in"]:
                    cancelled_assemblies.setdefault(
                        next_assembly, failed_dependency
                    )
                continue
            simulation_result = assembly.simulate(sequence_repository)
            simulation_results.append(simulation_result)
            for record in simulation_result.construct_records:
                sequence_repository.add_record(record, collection="constructs")
            if len(simulation_result.errors):
                for next_assembly in assembly.dependencies["used_in"]:
                    cancelled_assemblies[next_assembly] = assembly.name
        return AssemblyPlanSimulation(
            assembly_plan=self,
            assembly_simulations=simulation_results,
            sequence_repository=sequence_repository,
            cancelled=[
                AssemblySimulationCancellation(assembly_name, dependency)
                for (assembly_name, dependency) in cancelled_assemblies.items()
            ],
        )
class AssemblySimulationCancellation:
    """Record of an assembly whose simulation was cancelled.

    Parameters
    ----------

    assembly_name
      Name of the assembly that was not simulated.

    failed_dependency
      Name of the assembly whose failure caused the cancellation.
    """

    def __init__(self, assembly_name, failed_dependency):
        self.assembly_name, self.failed_dependency = (
            assembly_name,
            failed_dependency,
        )