Source code for taskpacker.taskpacker

import uuid
from tqdm import tqdm
import Numberjack as nj
import itertools as itt
from copy import copy
from collections import OrderedDict


[docs]class Task: """Tasks are the steps of a work unit, performed using specific resources. Parameters ---------- name Name of the task (appears when printed and plotted). resources list of Resource objects specifying the resources occupied by the task. duration Duration of the task (the choice of the unit is left to the user. scheduled_resource Either None or a dict {resource: slot_used} """ def __init__( self, name, resources, duration, follows=(), max_wait=None, scheduled_start=None, scheduled_resources=None, priority=1, due_time=None, color="blue", ): self.resources = resources self.duration = duration self.name = name self.scheduled_start = scheduled_start self.scheduled_resources = scheduled_resources self.follows = follows self.max_wait = max_wait self.color = color self.priority = priority self.due_time = due_time self.id = self.name + str(uuid.uuid1().int) @property def scheduled_end(self): if self.scheduled_start is None: return None else: return self.scheduled_start + self.duration def __repr__(self): return "Task(%(name)s, %(duration)s)" % self.__dict__ def __hash__(self): return hash(self.id) def to_dict(self): def color_to_html(color): if isinstance(color, str): return color else: return "#%02x%02x%02x" % tuple([int(255 * e) for e in color[:3]]) return OrderedDict( [ (k, "" if v is None else v) for k, v in [ ("task", self.name), ("resources", ", ".join([r.name for r in self.resources])), ("duration", self.duration), ("follows", ", ".join([task.name for task in self.follows])), ("max_wait", self.max_wait), ("scheduled_start", self.scheduled_start), ( "scheduled_resources", None if self.scheduled_resources is None else ", ".join( [ "%s:%s" % tuple(kv) for kv in self.scheduled_resources.items() ] ), ), ("color", "" if self.color is None else color_to_html(self.color)), ] ] )
[docs]class Resource: """Resources are machines/humans that can perform tasks. Parameters ---------- name Name of the resource. full_name Full name of the resource. capacity How many jobs a resource can do at the same time. """ def __init__(self, name, full_name=None, capacity=1): self.full_name = name if full_name is None else full_name self.name = name self.capacity = capacity def __repr__(self): return self.name def hash(self): return hash(self.name) def to_dict(self): return OrderedDict( [ (k, "" if v is None else v) for k, v in [ ("resource_name", self.name), ("full_name", self.full_name), ("capacity", self.capacity), ] ] )
[docs]def numberjack_scheduler( tasks, upper_bound=500, lower_bound=None, optimize=True, time_limit=5, solver_method="Mistral", randomization=False, verbose_solver=False, ): """Makes an optimized schedule for the processes. Parameters ----------- tasks A list of tasks to be sceduled. upper_bound Upper bound for the time. The unit depends on the unit chosen for the duration of the work unit's tasks. optimize If false, any solution satisfying the constraints (including deadlines) will be returned. But sometimes it is not possible to respect all deadlines. If True, the function will try to return a schedule which minimizes the days over the deadlines. The function minimized is the sum of (wu.priority*wu.delay) for all work units with a due time. time_limit Time in seconds after which the optimizer stops. If the optimizer stops because of this time limit the solution may not be optimal. solver_method The solver used by NumberJack (see NumberJack docs). """ ZERO = nj.Variable([0]) C_LOWER_BOUND = 0 # Create Numberjack variables to represent the tasks # ( starting times and resource instance that they use). tasks = [ task for task in tasks if ( (task.scheduled_start is None) or ( (task.scheduled_start is not None) and (task.scheduled_start < upper_bound) ) or ( (task.scheduled_end is not None) and (lower_bound is not None) and (task.scheduled_end > lower_bound) ) ) ] nj_tasks = {} for task in tasks: if task.scheduled_start is None: if lower_bound is not None: new_nj_task = nj.Task(lower_bound, upper_bound, task.duration) else: new_nj_task = nj.Task(upper_bound, task.duration) else: new_nj_task = nj.Task( task.scheduled_start, task.scheduled_end, task.duration ) new_nj_task.name = task.name nj_tasks[task] = new_nj_task nj_taskresources = { task: { resource: ( # If a task is already scheduled to some slot, its slot is # one-choice variable. Otherwise it is a 1..nSlots variable nj.Variable(1, resource.capacity) if task.scheduled_resources is None else nj.Variable([task.scheduled_resources[resource]]) ) for resource in task.resources } for task in tasks } all_resources = list( set([resource for task in tasks for resource in task.resources]) ) model = nj.Model() for resource in all_resources: if resource.capacity == "inf": continue elif resource.capacity == 1: # The resource has one slot: Only one job at the same time model.add( nj.UnaryResource( [nj_tasks[task] for task in tasks if (resource in task.resources)] ) ) else: # The resource has several slots tasks_pairs_sharing_resource = [ (task, other_task) for (task, other_task) in itt.combinations(tasks, 2) if ((resource in task.resources) and (resource in other_task.resources)) ] for (task, other_task) in tasks_pairs_sharing_resource: different_times = nj.Or( [ nj_tasks[task] + task.duration <= nj_tasks[other_task], nj_tasks[other_task] + other_task.duration < nj_tasks[task], ] ) different_resources = nj.AllDiff( [ nj_taskresources[task][resource], nj_taskresources[other_task][resource], ] ) different_resources.lb = 0 different_resources.ub = 1 model.add(nj.Or([different_times, different_resources])) model.add( [ (nj_tasks[task] + task.duration <= nj_tasks[next_task]) for next_task in tasks for task in next_task.follows ] ) model.add( [ (nj_tasks[next_task] <= nj_tasks[task] + task.duration + next_task.max_wait) for next_task in tasks for task in next_task.follows if next_task.max_wait is not None ] ) if optimize: C_max = nj.Variable(C_LOWER_BOUND, 150000000, "C_max") model.add( C_max > sum( [ nj.Max([ZERO, nj_tasks[task] + task.duration - task.due_time]) * (1000 * task.priority) for task in tasks if task.due_time is not None ] ) + sum([nj_tasks[task] for task in tasks]) ) # Specify that the goal is to minimize C_max (compress the schedule). model.add(nj.Minimize(C_max)) else: model.add( [ nj_tasks[task] + task.duration < task.due_time for task in tasks if task.due_time is not None ] ) solver = model.load(solver_method) solver.setVerbosity(verbose_solver) solver.setTimeLimit(time_limit) solver.setRandomized(randomization) result = solver.solve() if result is False: raise ValueError("No solution found by the schedule optimizer !") for task in tasks: nj_task = nj_tasks[task] start = nj_task.get_value() resources = { resource: nj_taskresources[task][resource].get_value() for resource in task.resources } task.scheduled_start = start task.scheduled_resources = resources return tasks
def schedule_processes_series( processes, est_process_duration=5000, time_limit=20, verbose_solver=False, time_limit_step=0, scheduled_tasks=(), n_trials=2, logger=None, ): lower_bound = None process_duration = upper_bound = est_process_duration def schedule_tasks(tasks, upper_bound, lower_bound, time_limit, randomization): numberjack_scheduler( tasks, upper_bound=upper_bound, lower_bound=lower_bound, time_limit=time_limit, solver_method="Mistral", randomization=randomization, verbose_solver=verbose_solver, ) considered_tasks = [copy(t) for t in scheduled_tasks] schedule_tasks( list(copy(processes[0])) + considered_tasks, upper_bound, lower_bound, time_limit, randomization=-1, ) new_processes = [] iterator = list(enumerate(processes)) if logger is not None: iterator = logger.iter_bar(process=iterator) for i, process in iterator: new_tasks = copy(process) new_processes.append(new_tasks) considered_tasks += new_tasks for trial in range(n_trials): try: schedule_tasks( considered_tasks, upper_bound, lower_bound, time_limit + time_limit_step * trial, randomization=0, ) lower_bound = min([t.scheduled_start for t in new_tasks]) latest = max([t.scheduled_end for t in new_tasks]) process_duration = min(process_duration, latest - lower_bound) upper_bound = latest + est_process_duration break except ValueError as e: pass assert all([(t.scheduled_resources is not None) for t in considered_tasks]) return new_processes