Source code for taskpacker.io

from taskpacker import Task, Resource
import itertools as itt
import pandas
import numpy as np

try:
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches
    from matplotlib.path import Path

    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False

try:
    import networkx as nx

    NX_AVAILABLE = True
except ImportError:
    NX_AVAILABLE = False


def tasks_from_spreadsheet(
    spreadsheet_path,
    resources_dict=None,
    sheetname="tasks",
    resources_sheetname="resources",
    tasks_color="blue",
    task_name_prefix="",
    sep=";",
):
    if resources_dict is None:
        resources_dict = resources_from_spreadsheet(
            spreadsheet_path, sheetname=resources_sheetname
        )

    if spreadsheet_path.endswith("csv"):
        process_df = pandas.read_csv(spreadsheet_path, sep=sep)
    else:
        process_df = pandas.read_excel(spreadsheet_path, sheet_name=sheetname)
    process_tasks = {}
    tasks_list = []
    for i, row in process_df.iterrows():
        task_resources = [resources_dict[r.strip()] for r in row.resources.split(",")]
        follows = str(row.follows)
        if follows == "nan":
            follows = ()
        else:
            follows = [process_tasks[t.strip()] for t in follows.split(",")]

        if str(row.scheduled_resources) == "nan":
            scheduled_resources = None
        else:
            scheduled_resources = {
                resources_dict[r.split(":")[0].strip()]: int(r.split(":")[1])
                for r in row.scheduled_resources.split(",")
            }
        new_task = Task(
            name=task_name_prefix + row.task,
            resources=task_resources,
            duration=row.duration,
            follows=follows,
            color=(tasks_color if (str(row.color) == "nan") else row.color),
            max_wait=(None if (str(row.max_wait) == "nan") else int(row.max_wait)),
            scheduled_start=(
                None
                if (str(row.scheduled_start) == "nan")
                else int(row.scheduled_start)
            ),
            scheduled_resources=scheduled_resources,
        )
        process_tasks[row.task] = new_task
        tasks_list.append(new_task)

    return tasks_list


def tasks_to_spreadsheet(tasks, filepath):
    import pandas

    df_tasks = pandas.DataFrame.from_records([task.to_dict() for task in tasks])
    resources = set(resource for task in tasks for resource in task.resources)
    df_resources = pandas.DataFrame.from_records(
        [resource.to_dict() for resource in resources]
    )

    with pandas.ExcelWriter(filepath, engine="xlsxwriter") as writer:
        df_tasks.to_excel(writer, sheet_name="tasks", index=False)
        df_resources.to_excel(writer, sheet_name="resources", index=False)


def resources_from_spreadsheet(spreadsheet_path, sheetname="resources"):
    if spreadsheet_path.endswith("csv"):
        resources_df = pandas.read_csv(spreadsheet_path)
    else:
        resources_df = pandas.read_excel(spreadsheet_path, sheetname)
    return {
        row.resource_name: Resource(
            name=row.resource_name,
            full_name=row.full_name,
            capacity="inf" if str(row.capacity) == "inf" else int(row.capacity),
        )
        for i, row in resources_df.iterrows()
    }


[docs]def plot_schedule(tasks, legend=False, ax=None, edgewidth=1.0): """ Plot the work units schedule in a Gantt chart-like way. This is quite basic and arbitrary and really meant for R&D purposes. """ if not MATPLOTLIB_AVAILABLE: raise ImportError("Plotting requires Matplotlib.") all_resources = sorted( list(set([resource for task in tasks for resource in task.resources])), key=lambda e: e.full_name, )[::-1] if ax is None: fig, ax = plt.subplots(1, figsize=(15, 6)) max_end = 0 for task in tasks: start = task.scheduled_start end = task.scheduled_end resources = task.scheduled_resources max_end = max(end, max_end) margin = 0.2 def height(resource): if resource.capacity == "inf": slots = 1.0 else: slots = resource.capacity return (1.0 - 2 * margin) / slots for r in task.resources: y = all_resources.index(r) + margin + height(r) * max(0, (resources[r] - 1)) ax.add_patch( patches.Rectangle( (start, y), # (x,y) end - start, # width height(r), # height facecolor=task.color, edgecolor="k", linewidth=edgewidth, ) ) strips_colors = itt.cycle([(1, 1, 1), (1, 0.92, 0.92)]) for i, color in zip(range(-1, len(all_resources)), strips_colors): ax.fill_between([0, 10 * max_end], [i, i], y2=[i + 1, i + 1], color=color) N = len(all_resources) ax.set_yticks(np.arange(N) + 0.5) ax.set_ylim(-max(1, int(0.2 * N)), max(2, int(1.2 * N))) ax.set_yticklabels([rsrc.full_name for rsrc in all_resources]) if legend: ax.legend(ncol=3, fontsize=8) ax.set_xlabel("Time") ax.set_xlim(0, 1.1 * max_end) return ax
[docs]def plot_tree_graph( levels, edges, draw_node, elements_positions=None, ax=None, width_factor=2.5, height_factor=2, scale=1.0, edge_left_space=0.015, edge_right_space=0.015, interlevel_shift=0, **txt_kw ): """General function for plotting tree graphs. Parameters ---------- levels A list of lists of nodes grouped by "level", i.e distance to the in the graph to the level 0. levels will be displayed on a same column. edges List of nodes pairs (source node, target node). draw_node A function f(x , y , node, ax, **kw) which draws something related to the node at the position x,y on the given Matplotlib ax. ax The matplotlib ax to use. If none is provided, a new ax is generated. Examples: --------- >>> def draw_node(x,y, node, ax): ax.text(x,y, node) >>> plot_tree_graph(levels=[["A","B","C"], ["D,E"], ["F"]], edges=[("A","D"),("B","D"),("C","E") ("D","F"),("E","F")], draw_node = draw_node,) """ levels_dict = { element: level for level, elements in enumerate(levels) for element in elements } if elements_positions is None: elements_positions = {} for lvl, elements in enumerate(levels): yy = np.linspace(0, 1, len(elements) + 2)[1:-1] yy += interlevel_shift * (1 - 2 * (lvl % 2)) x = 1.0 * (1 + lvl) / (len(levels) + 1) for y, element in zip(yy, elements): elements_positions[element] = (x, y) if ax is None: width = width_factor * len(levels) * scale height = height_factor * max([len(lvl) for lvl in levels]) * scale fig, ax = plt.subplots(1, figsize=(width, height)) for element, (x, y) in elements_positions.items(): draw_node(x, y, element, ax, **txt_kw) y_spans = [ elements_positions[elements[1]][1] - elements_positions[elements[0]][1] for elements in levels if len(elements) > 1 ] delta_y = 0.5 * min(y_spans) if y_spans != [] else 0 for el1, el2 in edges: x1, y1 = elements_positions[el1] x2, y2 = elements_positions[el2] x1 += edge_left_space * np.sqrt(scale) x2 += -edge_right_space * np.sqrt(scale) if ((levels_dict[el2] - levels_dict[el1]) > 1) and (y1 == y2): patch = patches.PathPatch( Path( [ (x1, y1), (0.5 * x2 + 0.5 * x1, y1 - delta_y), (0.5 * x2 + 0.5 * x1, y2 - delta_y), (x2, y2), ], [Path.MOVETO, Path.CURVE4, Path.CURVE4, Path.CURVE4], ), facecolor="none", lw=1 * scale, zorder=-1000, ) else: patch = patches.PathPatch( Path( [ (x1, y1), (0.9 * x2 + 0.1 * x1, y1), (0.1 * x2 + 0.9 * x1, y2), (x2, y2), ], [Path.MOVETO, Path.CURVE4, Path.CURVE4, Path.CURVE4], ), facecolor="none", lw=1 * scale, zorder=-1000, ) ax.add_patch(patch) ax.axis("off") return ax
[docs]def plot_tasks_dependency_graph(tasks, ax=None): """Plot the graph of all inter-dependencies in the provided tasks list.""" if not NX_AVAILABLE: raise ImportError("Install Networkx to plot task dependency graphs.") if not MATPLOTLIB_AVAILABLE: raise ImportError("Install Matplotlib to plot task dependency graphs.") g = nx.DiGraph() tasks_dict = {task.id: task for task in tasks} for task_id, task in tasks_dict.items(): for parent_task in task.follows: g.add_edge(parent_task.id, task_id) nodes_depths = {node: 0 for node in g.nodes()} for source, lengths in nx.shortest_path_length(g): for target, length in lengths.items(): nodes_depths[target] = max(nodes_depths[target], length) levels = [ sorted([node for node, depth in nodes_depths.items() if depth == this_depth])[ ::-1 ] for this_depth in range(max(nodes_depths.values()) + 1) ] def draw_node(x, y, node, ax): task = tasks_dict[node] text = task.name.replace("_", "\n") + "\nduration: %d" % task.duration ax.text( x, y, text, verticalalignment="center", horizontalalignment="center", bbox={"facecolor": "white", "lw": 0}, ) return plot_tree_graph(levels, g.edges(), draw_node, width_factor=2, ax=ax)