0f5249b8e9e9fd837ab8f3ae529fa2e05df7a292,sos/sos_executor.py,Base_Executor,run,#Base_Executor#Any#Any#Any#,755
Before Change
for p, _, _ in procs + pool:
p[1 ].put(None)
p[0 ].join()
prog.close()
if exec_error.errors:
failed_steps, pending_steps = dag.pending()
if failed_steps:
sections = [self.workflow.section_by_id(x._step_uuid).step_name() for x in failed_steps]
After Change
procs = []
pool = []
try :
prog = ProgressBar(desc=self.workflow.name, total=dag.num_nodes(), disable=dag.num_nodes() <= 1 or env.verbosity != 1 )
exec_error = ExecuteError(self.workflow.name)
while True:
for idx, proc in enumerate (procs) :
if proc is None:
continue
[p, q, runnable] = proc
if not q.poll():
continue
res = q.recv()
if isinstance (res, str) :
if res.startswith ("task" ) :
env.logger.debug ("Receive {}" .format(res) )
runnable._host = Host(res.split(" " )[1 ])
runnable._pending_tasks = res.split(" " )[2 :]
for task in runnable._pending_tasks:
runnable._host.submit_task(task)
runnable._status = "task_pending"
continue
else :
raise RuntimeError ("Unexpected value from step {}" .format(res) )
pool.append (procs[idx])
procs[idx] = None
if isinstance (res, (UnknownTarget, RemovedTarget) ):
runnable._status = None
target = res.target
if dag.regenerate_target(target):
dag.build(self.workflow.auxiliary_sections)
cycle = dag.circular_dependencies()
if cycle:
raise RuntimeError ("Circular dependency detected {} after regeneration. It is likely a later step produces input of a previous step." .format(cycle) )
else :
if self.resolve_dangling_targets (dag, [target]) == 0 :
raise RuntimeError ("Failed to regenerate or resolve {}{}."
.format(target, dag.steps_depending_on(target, self.workflow) ))
runnable._depends_targets.append (target)
dag._all_dependent_files[target].append (runnable)
dag.build (self.workflow.auxiliary_sections)
cycle = dag.circular_dependencies()
if cycle:
raise RuntimeError ("Circular dependency detected {}. It is likely a later step produces input of a previous step." .format(cycle) )
self.save_dag (dag)
elif isinstance (res, UnavailableLock) :
runnable._status = "signature_pending"
runnable._signature = (res.output, res.sig_file)
section = self.workflow.section_by_id(runnable._step_uuid)
env.logger.info("Waiting on another process for step {}" .format(section.step_name()))
elif isinstance (res, Exception) :
runnable._status = "failed"
exec_error.append(runnable._node_id, res)
prog.update(1 )
else :
svar = {}
for k, v in res.items():
if k == "__shared__" :
svar = v
env.sos_dict.update(v)
else :
env.sos_dict.set(k, v)
for edge in dag.out_edges(runnable):
node = edge[1 ]
if node._node_index is not None and runnable._node_index is not None:
node._context.update(env.sos_dict.clone_selected_vars(
node._context["__signature_vars__" ] | node._context["__environ_vars__" ] \
| {"_input" , "__step_output__" , "__default_output__" , "__args__" }))
node._context.update(svar)
node._context["__completed__" ].append(res["__step_name__" ])
dag.update_step(runnable, env.sos_dict["__step_input__" ],
env.sos_dict["__step_output__" ],
env.sos_dict["__step_depends__" ],
env.sos_dict["__step_local_input__" ],
env.sos_dict["__step_local_output__" ])
runnable._status = "completed"
prog.update(1 )
procs = [x for x in procs if x is not None]
for proc in procs:
if proc[2 ]._status == "task_pending" :
res = proc[2 ]._host.check_status(proc[2 ]._pending_tasks)
if any (x in ("pending" , "running" , "completed-old" , "failed-old" , "failed-missing-output" , "failed-old-missing-output" ) for x in res):
continue
elif all (x == "completed" for x in res) :
env.logger.debug ("Put results for {}" .format(" " .join(proc[2 ]._pending_tasks) ))
res = runnable._host.retrieve_results(proc[2 ]._pending_tasks)
proc[1 ].send(res)
proc[2 ]._status == "running"
else :
raise RuntimeError ("Job returned with status {}" .format(res) )
while True:
num_running = len([x for x in procs if x[2 ]._status != "task_pending" ])
if num_running >= env.max_jobs:
break
runnable = dag.find_executable()
if runnable is None:
break
section = self.workflow.section_by_id(runnable._step_uuid)
runnable._status = "running"
q = mp.Pipe()
if not pool:
worker_queue = mp.Queue()
worker = StepWorker(queue=worker_queue, config=self.config, args=self.args)
worker.start()
else :
p, _, _ = pool.pop(0 )
worker = p[0 ]
worker_queue = p[1 ]
shared = {x: env.sos_dict[x] for x in self.shared if x in env.sos_dict and pickleable (env.sos_dict[x], x) }
if "shared" in section.options:
if isinstance (section.options["shared" ], str) :
svars = [section.options["shared" ]]
elif isinstance (section.options["shared" ], dict) :
svars = section.options["shared" ].keys()
elif isinstance (section.options["shared" ], Sequence) :
svars = []
for x in section.options["shared" ]:
if isinstance (x, str) :
svars.append (x)
elif isinstance (x, dict) :
svars.extend (x.keys() )
else :
raise ValueError ("Unacceptable value for parameter shared: {}" .format(section.options["shared" ]) )
else :
raise ValueError ("Unacceptable value for parameter shared: {}" .format(section.options["shared" ]) )
shared.update ({x: env.sos_dict[x] for x in svars if x in env.sos_dict and pickleable(env.sos_dict[x], x) })
if "__workflow_sig__" in env.sos_dict:
runnable._context["__workflow_sig__"] = env.sos_dict["__workflow_sig__" ]
worker_queue.put((section, runnable._context, shared, env.sig_mode, env.verbosity, q[1 ]))
procs.append( [[worker, worker_queue], q[0 ], runnable])
num_running = len([x for x in procs if x[2 ]._status != "task_pending" ])
env.logger.trace("PROC {}" .format(", " .join([x[2 ]._status for x in procs])))
if not procs:
break
else :
time.sleep(0.1 )
except:
for p, _, _ in procs + pool:
p[0 ].terminate()
raise
finally :
for p, _, _ in procs + pool:
p[1 ].put(None)
p[0 ].terminate()
p[0 ].join()
prog.close()
if exec_error.errors:
failed_steps, pending_steps = dag.pending()
if failed_steps:
sections = [self.workflow.section_by_id(x._step_uuid).step_name() for x in failed_steps]
In pattern: SUPERPATTERN
Frequency: 3
Non-data size: 4
Instances Project Name: vatlab/SoS
Commit Name: 0f5249b8e9e9fd837ab8f3ae529fa2e05df7a292
Time: 2017-03-02
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run
Project Name: markovmodel/PyEMMA
Commit Name: 44bd049613c2e0fb70d971f2c35775841cbf2ef6
Time: 2016-07-28
Author: m.scherer@fu-berlin.de
File Name: pyemma/coordinates/data/feature_reader.py
Class Name: FeatureReaderIterator
Method Name: _next_chunk
Project Name: deepchem/deepchem
Commit Name: 3b3a06ad8402079c2d18718349d5f0f212ac7b81
Time: 2020-12-11
Author: nfrey213@gmail.com
File Name: deepchem/feat/base_classes.py
Class Name: ComplexFeaturizer
Method Name: featurize