self.handle_unknown_target(res.target, dag, runnable)
// if the job has failed
elif isinstance(res, Exception):
env.logger.debug(f"Nested received an exception")
runnable._status = "failed"
dag.save(env.config["output_dag"])
exec_error.append(runnable._node_id, res)
raise exec_error
elif "__step_name__" in res:
env.logger.debug(f"Nested receive step result ")
self.step_completed(res, dag, runnable)
else:
raise RuntimeError(
f"Nested wokflow received an unrecognized response: {res}")
manager.cleanup()
// step 3: find steps to run
while True:
if not dag.dirty():
break
// find a step that can be executed; the DAG is updated with its status below.
runnable = dag.find_executable()
if runnable is None:
dag.mark_dirty(False)
break
// find the section from runnable
section = self.workflow.section_by_id(runnable._step_uuid)
// execute section with specified input
runnable._status = "running"
dag.save(env.config["output_dag"])
// workflow shared variables
shared = {x: env.sos_dict[x] for x in self.shared.keys(
) if x in env.sos_dict and pickleable(env.sos_dict[x], x)}
if "shared" in section.options:
shared.update(self.get_shared_vars(
section.options["shared"]))
if "workflow_id" in env.sos_dict:
runnable._context["workflow_id"] = env.sos_dict["workflow_id"]
// send the step to the parent
step_id = uuid.uuid4()
env.logger.debug(
f"Nested send step {section.step_name()} to master with args {self.args} and context {runnable._context}")
socket = create_socket(env.zmq_context, zmq.PAIR, "worker pair socket")
port = socket.bind_to_random_port("tcp://127.0.0.1")
parent_socket.send_pyobj(["step", step_id, section, runnable._context, shared, self.args,
After Change
self.handle_unknown_target(res.target, dag, runnable)
// if the job has failed
elif isinstance(res, Exception):
env.log_to_file("EXECUTOR", f"Nested received an exception")
runnable._status = "failed"
dag.save(env.config["output_dag"])
exec_error.append(runnable._node_id, res)
raise exec_error
elif "__step_name__" in res:
env.log_to_file("EXECUTOR", f"Nested receive step result ")
self.step_completed(res, dag, runnable)
else:
raise RuntimeError(
f"Nested wokflow received an unrecognized response: {res}")
manager.cleanup()
// step 3: find steps to run
while True:
if not dag.dirty():
break
// find a step that can be executed; the DAG is updated with its status below.
runnable = dag.find_executable()
if runnable is None:
dag.mark_dirty(False)
break
// find the section from runnable
section = self.workflow.section_by_id(runnable._step_uuid)
// execute section with specified input
runnable._status = "running"
dag.save(env.config["output_dag"])
// workflow shared variables
shared = {x: env.sos_dict[x] for x in self.shared.keys(
) if x in env.sos_dict and pickleable(env.sos_dict[x], x)}
if "shared" in section.options:
shared.update(self.get_shared_vars(
section.options["shared"]))
if "workflow_id" in env.sos_dict:
runnable._context["workflow_id"] = env.sos_dict["workflow_id"]
// send the step to the parent
step_id = uuid.uuid4()
env.log_to_file("EXECUTOR",
f"Nested send step {section.step_name()} to master with args {self.args} and context {runnable._context}")
socket = create_socket(env.zmq_context, zmq.PAIR, "worker pair socket")
port = socket.bind_to_random_port("tcp://127.0.0.1")
parent_socket.send_pyobj(["step", step_id, section, runnable._context, shared, self.args,