0f5249b8e9e9fd837ab8f3ae529fa2e05df7a292,sos/sos_executor.py,Base_Executor,run,#Base_Executor#Any#Any#Any#,755
Before Change
for p, _, _ in procs + pool:
    p[1].put(None)
    p[0].join()
prog.close()
if exec_error.errors:
    failed_steps, pending_steps = dag.pending()
    if failed_steps:
        sections = [self.workflow.section_by_id(x._step_uuid).step_name() for x in failed_steps]
After Change
procs = []
# process pool
pool = []
try:
    prog = ProgressBar(desc=self.workflow.name, total=dag.num_nodes(), disable=dag.num_nodes() <= 1 or env.verbosity != 1)
    exec_error = ExecuteError(self.workflow.name)
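    # exec_error accumulates step failures so the loop can keep running other
    # branches and report all errors together after the loop finishes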
    while True:
        # step 1: check existing jobs and see if they are completed
        for idx, proc in enumerate(procs):
            if proc is None:
                continue
            [p, q, runnable] = proc
            if not q.poll():
                continue
            res = q.recv()
            # the step is waiting for external tasks
            if isinstance(res, str):
                if res.startswith("task"):
                    env.logger.debug("Receive {}".format(res))
                    runnable._host = Host(res.split(" ")[1])
                    runnable._pending_tasks = res.split(" ")[2:]
                    for task in runnable._pending_tasks:
                        runnable._host.submit_task(task)
                    runnable._status = "task_pending"
                    continue
                else:
                    raise RuntimeError("Unexpected value from step {}".format(res))
            # once we do get a result, return the process to the pool
            pool.append(procs[idx])
            procs[idx] = None
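            # a worker reply is one of: a missing/removed target, a lock held
            # by another process, a step failure, or the step's result dictionary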
            if isinstance(res, (UnknownTarget, RemovedTarget)):
                runnable._status = None
                target = res.target
                if dag.regenerate_target(target):
                    #runnable._depends_targets.append(target)
                    #dag._all_dependent_files[target].append(runnable)
                    dag.build(self.workflow.auxiliary_sections)
                    #
                    cycle = dag.circular_dependencies()
                    if cycle:
                        raise RuntimeError("Circular dependency detected {} after regeneration. It is likely a later step produces input of a previous step.".format(cycle))
                else:
                    if self.resolve_dangling_targets(dag, [target]) == 0:
                        raise RuntimeError("Failed to regenerate or resolve {}{}."
                                           .format(target, dag.steps_depending_on(target, self.workflow)))
                    runnable._depends_targets.append(target)
                    dag._all_dependent_files[target].append(runnable)
                    dag.build(self.workflow.auxiliary_sections)
                    #
                    cycle = dag.circular_dependencies()
                    if cycle:
                        raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
                self.save_dag(dag)
            elif isinstance(res, UnavailableLock):
                runnable._status = "signature_pending"
                runnable._signature = (res.output, res.sig_file)
                section = self.workflow.section_by_id(runnable._step_uuid)
                env.logger.info("Waiting on another process for step {}".format(section.step_name()))
            # if the job failed
            elif isinstance(res, Exception):
                runnable._status = "failed"
                exec_error.append(runnable._node_id, res)
                prog.update(1)
            else:
                # update the master sos_dict with variables returned by the step
                svar = {}
                for k, v in res.items():
                    if k == "__shared__":
                        svar = v
                        env.sos_dict.update(v)
                    else:
                        env.sos_dict.set(k, v)
                #
                # set context to the logical next step
                for edge in dag.out_edges(runnable):
                    node = edge[1]
                    # if node is the logical next step...
                    if node._node_index is not None and runnable._node_index is not None:
                        #and node._node_index == runnable._node_index + 1:
                        node._context.update(env.sos_dict.clone_selected_vars(
                            node._context["__signature_vars__"] | node._context["__environ_vars__"]
                            | {"_input", "__step_output__", "__default_output__", "__args__"}))
                    node._context.update(svar)
                    node._context["__completed__"].append(res["__step_name__"])
                dag.update_step(runnable, env.sos_dict["__step_input__"],
                                env.sos_dict["__step_output__"],
                                env.sos_dict["__step_depends__"],
                                env.sos_dict["__step_local_input__"],
                                env.sos_dict["__step_local_output__"])
                runnable._status = "completed"
                prog.update(1)
        # remove None
        procs = [x for x in procs if x is not None]
        # step 2: check if some jobs are done
        for proc in procs:
            # if a job is pending, check if it is done
            if proc[2]._status == "task_pending":
                res = proc[2]._host.check_status(proc[2]._pending_tasks)
                if any(x in ("pending", "running", "completed-old", "failed-old", "failed-missing-output", "failed-old-missing-output") for x in res):
                    continue
                elif all(x == "completed" for x in res):
                    env.logger.debug("Put results for {}".format(" ".join(proc[2]._pending_tasks)))
                    res = proc[2]._host.retrieve_results(proc[2]._pending_tasks)
                    proc[1].send(res)
                    proc[2]._status = "running"
                else:
                    raise RuntimeError("Job returned with status {}".format(res))
        # step 3: check if there is room and need for another job
        while True:
            num_running = len([x for x in procs if x[2]._status != "task_pending"])
            if num_running >= env.max_jobs:
                break
            # find any step that can be executed, run it, and update the DAG
            # with its status
            runnable = dag.find_executable()
            if runnable is None:
                # no runnable step
                #dag.show_nodes()
                break
            # find the section from runnable
            section = self.workflow.section_by_id(runnable._step_uuid)
            # execute section with specified input
            runnable._status = "running"
            q = mp.Pipe()
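            # q[0] stays with the master for poll()/recv(); q[1] is handed to the worker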
            # if the pool is empty, create a new worker process
            if not pool:
                worker_queue = mp.Queue()
                worker = StepWorker(queue=worker_queue, config=self.config, args=self.args)
                worker.start()
            else:
                # otherwise reuse a pooled worker; its old q and runnable are no longer needed
                p, _, _ = pool.pop(0)
                worker = p[0]
                worker_queue = p[1]
            # workflow shared variables
            shared = {x: env.sos_dict[x] for x in self.shared if x in env.sos_dict and pickleable(env.sos_dict[x], x)}
            if "shared" in section.options:
                if isinstance(section.options["shared"], str):
                    svars = [section.options["shared"]]
                elif isinstance(section.options["shared"], dict):
                    svars = section.options["shared"].keys()
                elif isinstance(section.options["shared"], Sequence):
                    svars = []
                    for x in section.options["shared"]:
                        if isinstance(x, str):
                            svars.append(x)
                        elif isinstance(x, dict):
                            svars.extend(x.keys())
                        else:
                            raise ValueError("Unacceptable value for parameter shared: {}".format(section.options["shared"]))
                else:
                    raise ValueError("Unacceptable value for parameter shared: {}".format(section.options["shared"]))
                shared.update({x: env.sos_dict[x] for x in svars if x in env.sos_dict and pickleable(env.sos_dict[x], x)})
            if "__workflow_sig__" in env.sos_dict:
                runnable._context["__workflow_sig__"] = env.sos_dict["__workflow_sig__"]
            worker_queue.put((section, runnable._context, shared, env.sig_mode, env.verbosity, q[1]))
            procs.append([[worker, worker_queue], q[0], runnable])
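            # each procs entry is [[worker_process, worker_queue], master_pipe_end, runnable]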
        #
        num_running = len([x for x in procs if x[2]._status != "task_pending"])
        env.logger.trace("PROC {}".format(", ".join([x[2]._status for x in procs])))
        if not procs:
            break
        #elif not env.__wait__ and num_running == 0:
        #    # if all jobs are pending, let us check if all jobs have been submitted.
        #    pending_tasks = []
        #    running_tasks = []
        #    for n in [x[2] for x in procs if x[2]._status == "task_pending"]:
        #        p, r = n._host._task_engine.get_tasks()
        #        pending_tasks.extend(p)
        #        running_tasks.extend(r)
        #    if not pending_tasks:
        #        env.logger.info("SoS exits with {} running tasks".format(len(running_tasks)))
        #        for task in running_tasks:
        #            env.logger.info(task)
        #        break
        else:
            time.sleep(0.1)
    # close all processes
except:
    for p, _, _ in procs + pool:
        p[0].terminate()
    raise
finally:
    for p, _, _ in procs + pool:
        p[1].put(None)
        p[0].terminate()
        p[0].join()
prog.close()
if exec_error.errors:
    failed_steps, pending_steps = dag.pending()
    if failed_steps:
        sections = [self.workflow.section_by_id(x._step_uuid).step_name() for x in failed_steps]
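For readers unfamiliar with the pattern in the After Change code: each step runs in a reusable worker process that takes jobs from a multiprocessing.Queue and reports back over one end of a multiprocessing.Pipe, which the master polls without blocking. The following stripped-down sketch illustrates only that handshake; EchoWorker and its payload are hypothetical stand-ins for SoS's StepWorker and its (section, context, ...) tuple.

import multiprocessing as mp
import time

class EchoWorker(mp.Process):
    # hypothetical stand-in for StepWorker: read jobs from a queue,
    # reply over the pipe end that accompanies each job
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            job = self.queue.get()
            if job is None:
                # None is the shutdown signal, as in the finally block above
                break
            payload, conn = job
            conn.send("done: {}".format(payload))

if __name__ == "__main__":
    worker_queue = mp.Queue()
    worker = EchoWorker(worker_queue)
    worker.start()
    master_end, worker_end = mp.Pipe()
    # mirror worker_queue.put((section, ..., q[1])): the job carries the
    # worker's end of the pipe
    worker_queue.put(("step_10", worker_end))
    # the master polls instead of blocking, like the q.poll() loop above
    while not master_end.poll():
        time.sleep(0.1)
    print(master_end.recv())   # -> done: step_10
    worker_queue.put(None)     # ask the worker to exit
    worker.join()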
In pattern: SUPERPATTERN
Frequency: 3
Non-data size: 4
Instances:
Project Name: vatlab/SoS
Commit Name: 0f5249b8e9e9fd837ab8f3ae529fa2e05df7a292
Time: 2017-03-02
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run
Project Name: markovmodel/PyEMMA
Commit Name: 44bd049613c2e0fb70d971f2c35775841cbf2ef6
Time: 2016-07-28
Author: m.scherer@fu-berlin.de
File Name: pyemma/coordinates/data/feature_reader.py
Class Name: FeatureReaderIterator
Method Name: _next_chunk
Project Name: deepchem/deepchem
Commit Name: 3b3a06ad8402079c2d18718349d5f0f212ac7b81
Time: 2020-12-11
Author: nfrey213@gmail.com
File Name: deepchem/feat/base_classes.py
Class Name: ComplexFeaturizer
Method Name: featurize