4ae321a17f862aaf4a399609550539d022565138,sos/sos_executor.py,MP_Executor,run,#MP_Executor#Any#Any#,572
Before Change
p.join()
runnable = dag.node_by_id(u)
if isinstance(res, UnknownTarget):
runnable._status = None
target = res.target
if self.resolve_dangling_targets(dag, [target]) == 0:
raise RuntimeError("Failed to resolve {}{}."
.format(target, dag.steps_depending_on(target, self.workflow)))
// now, there should be no dangling targets, let us connect nodes
// this can be done more efficiently
runnable._depends_targets.append(target)
dag._all_dependent_files[target].append(runnable)
dag.build(self.workflow.auxiliary_sections)
self.save_dag(dag)
//
cycle = dag.circular_dependencies()
if cycle:
raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
elif isinstance(res, RemovedTarget):
runnable._status = None
target = res.target
if not dag.regenerate_target(target):
if self.resolve_dangling_targets(dag, [target]) == 0:
raise RuntimeError("Failed to regenerate or resolve {}{}."
.format(target, dag.steps_depending_on(target, self.workflow)))
runnable._depends_targets.append(target)
dag._all_dependent_files[target].append(runnable)
dag.build(self.workflow.auxiliary_sections)
//
cycle = dag.circular_dependencies()
if cycle:
raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
self.save_dag(dag)
elif isinstance(res, UnavailableLock):
runnable._status = "pending"
runnable._signature = (res.output, res.sig_file)
section = self.workflow.section_by_id(runnable._step_uuid)
env.logger.info("Waiting on another process for step {}".format(section.step_name()))
// move away to let other tasks to run first
procs[idx] = None
// if the job is failed
elif isinstance(res, Exception):
runnable._status = "failed"
exec_error.append(runnable._node_id, res)
prog.progress(1)
procs[idx] = None
else:
//
for k, v in res.items():
env.sos_dict.set(k, v)
//
// set context to the next logic step.
for edge in dag.out_edges(runnable):
node = edge[1]
// if node is the logical next step...
if node._node_index is not None and runnable._node_index is not None:
//and node._node_index == runnable._node_index + 1:
node._context.update(env.sos_dict.clone_selected_vars(
node._context["__signature_vars__"] | node._context["__environ_vars__"] \
| {"_input", "__step_output__", "__default_output__", "__args__"}))
runnable._status = "completed"
prog.progress(1)
procs[idx] = None
//env.logger.error("completed")
//dag.show_nodes()
// step 2: submit new jobs if there are empty slots
for idx, proc in enumerate(procs):
// if there is empty slot, submit
if proc is not None:
continue
After Change
// python statements in different run modes.
env.sos_dict.set("run_mode", env.run_mode)
// process step of the pipelinp
dag = self.initialize_dag(targets=targets)
// process step of the pipelinp
//
procs = [None for x in range(env.max_jobs)]
prog = ProgressBar(self.workflow.name, dag.num_nodes(), disp=dag.num_nodes() > 1 and env.verbosity == 1)
exec_error = ExecuteError(self.workflow.name)
while True:
// step 1: check existing jobs and see if they are completed
for idx, proc in enumerate(procs):
if proc is None:
continue
(p, q, u) = proc
try:
res = q.get_nowait()
except Empty:
// continue waiting
continue
//
// if we does get the result
p.join()
runnable = dag.node_by_id(u)
if isinstance(res, (UnknownTarget, RemovedTarget)):
runnable._status = None
target = res.target
if dag.regenerate_target(target):
//runnable._depends_targets.append(target)
//dag._all_dependent_files[target].append(runnable)
dag.build(self.workflow.auxiliary_sections)
//
cycle = dag.circular_dependencies()
if cycle:
raise RuntimeError("Circular dependency detected {} after regeneration. It is likely a later step produces input of a previous step.".format(cycle))
else:
if self.resolve_dangling_targets(dag, [target]) == 0:
raise RuntimeError("Failed to regenerate or resolve {}{}."
.format(target, dag.steps_depending_on(target, self.workflow)))
runnable._depends_targets.append(target)
dag._all_dependent_files[target].append(runnable)
dag.build(self.workflow.auxiliary_sections)
//
cycle = dag.circular_dependencies()
if cycle:
raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
self.save_dag(dag)
elif isinstance(res, UnavailableLock):
runnable._status = "pending"
runnable._signature = (res.output, res.sig_file)
section = self.workflow.section_by_id(runnable._step_uuid)
env.logger.info("Waiting on another process for step {}".format(section.step_name()))
// move away to let other tasks to run first
procs[idx] = None
// if the job is failed
elif isinstance(res, Exception):
runnable._status = "failed"
exec_error.append(runnable._node_id, res)
prog.progress(1)
procs[idx] = None
else:
//
for k, v in res.items():
env.sos_dict.set(k, v)
//
// set context to the next logic step.
for edge in dag.out_edges(runnable):
node = edge[1]
// if node is the logical next step...
if node._node_index is not None and runnable._node_index is not None:
//and node._node_index == runnable._node_index + 1:
node._context.update(env.sos_dict.clone_selected_vars(
node._context["__signature_vars__"] | node._context["__environ_vars__"] \
| {"_input", "__step_output__", "__default_output__", "__args__"}))
dag.update_step(runnable, env.sos_dict["__step_input__"],
env.sos_dict["__step_output__"],
env.sos_dict["__step_depends__"],
env.sos_dict["__step_local_input__"],
env.sos_dict["__step_local_output__"])
runnable._status = "completed"
prog.progress(1)
procs[idx] = None
//env.logger.error("completed")
//dag.show_nodes()
// step 2: submit new jobs if there are empty slots
for idx, proc in enumerate(procs):
// if there is empty slot, submit
if proc is not None:
continue
In pattern: SUPERPATTERN
Frequency: 4
Non-data size: 43
Instances
Project Name: vatlab/SoS
Commit Name: 4ae321a17f862aaf4a399609550539d022565138
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: MP_Executor
Method Name: run
Project Name: vatlab/SoS
Commit Name: a38c773548db1a20983f794e16898f83f8c86249
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run
Project Name: vatlab/SoS
Commit Name: 4ae321a17f862aaf4a399609550539d022565138
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: MP_Executor
Method Name: run
Project Name: vatlab/SoS
Commit Name: 4ae321a17f862aaf4a399609550539d022565138
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run
Project Name: vatlab/SoS
Commit Name: a38c773548db1a20983f794e16898f83f8c86249
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: MP_Executor
Method Name: run