0f5249b8e9e9fd837ab8f3ae529fa2e05df7a292,sos/sos_executor.py,Base_Executor,run,#Base_Executor#Any#Any#Any#,755

Before Change


        for p, _, _ in procs + pool:
            p[1].put(None)
            p[0].join()
        prog.close()
        if exec_error.errors:
            failed_steps, pending_steps = dag.pending()
            if failed_steps:
                sections = [self.workflow.section_by_id(x._step_uuid).step_name() for x in failed_steps]

After Change


        procs = []
        # process pool
        pool = []
        try:
            prog = ProgressBar(desc=self.workflow.name, total=dag.num_nodes(), disable=dag.num_nodes() <= 1 or env.verbosity != 1)
            exec_error = ExecuteError(self.workflow.name)
            while True:
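                # scheduler loop: poll running steps, service pending external tasks, and dispatch new runnable steps until no process remains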
                # step 1: check existing jobs and see if they have completed
                for idx, proc in enumerate(procs):
                    if proc is None:
                        continue

                    [p, q, runnable] = proc
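                    # p is the [worker, worker_queue] pair; q is this end of the pipe to the step worker (see procs.append below)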
                    if not q.poll():
                        continue

                    res = q.recv()
                    # the step is waiting for external tasks
                    if isinstance(res, str):
                        if res.startswith("task"):
                            env.logger.debug("Receive {}".format(res))
                            runnable._host = Host(res.split(" ")[1])
                            runnable._pending_tasks = res.split(" ")[2:]
                            for task in runnable._pending_tasks:
                                runnable._host.submit_task(task)
                            runnable._status = "task_pending"
                            continue
                        else:
                            raise RuntimeError("Unexpected value from step {}".format(res))

                    # if we do get the result,
                    # we return the process to the pool
                    pool.append(procs[idx])
                    procs[idx] = None

                    if isinstance(res, (UnknownTarget, RemovedTarget)):
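                        # a required target is missing or was removed; rebuild the DAG so that an auxiliary step can (re)generate it, then retry this step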
                        runnable._status = None
                        target = res.target
                        if dag.regenerate_target(target):
                            #runnable._depends_targets.append(target)
                            #dag._all_dependent_files[target].append(runnable)
                            dag.build(self.workflow.auxiliary_sections)
                            #
                            cycle = dag.circular_dependencies()
                            if cycle:
                                raise RuntimeError("Circular dependency detected {} after regeneration. It is likely a later step produces input of a previous step.".format(cycle))

                        else:
                            if self.resolve_dangling_targets(dag, [target]) == 0:
                                raise RuntimeError("Failed to regenerate or resolve {}{}."
                                    .format(target, dag.steps_depending_on(target, self.workflow)))
                            runnable._depends_targets.append(target)
                            dag._all_dependent_files[target].append(runnable)
                            dag.build(self.workflow.auxiliary_sections)
                            #
                            cycle = dag.circular_dependencies()
                            if cycle:
                                raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
                        self.save_dag(dag)
                    elif isinstance(res, UnavailableLock):
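                        # another process holds the signature lock on the same output; wait for it to finish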
                        runnable._status = "signature_pending"
                        runnable._signature = (res.output, res.sig_file)
                        section = self.workflow.section_by_id(runnable._step_uuid)
                        env.logger.info("Waiting on another process for step {}".format(section.step_name()))
                    # if the job failed
                    elif isinstance(res, Exception):
                        runnable._status = "failed"
                        exec_error.append(runnable._node_id, res)
                        prog.update(1)
                    else:
                        #
                        svar = {}
                        for k, v in res.items():
                            if k == "__shared__":
                                svar = v
                                env.sos_dict.update(v)
                            else:
                                env.sos_dict.set(k, v)
                        #
                        # set context for the next logical step.
                        for edge in dag.out_edges(runnable):
                            node = edge[1]
                            # if node is the logical next step...
                            if node._node_index is not None and runnable._node_index is not None:
                                #and node._node_index == runnable._node_index + 1:
                                node._context.update(env.sos_dict.clone_selected_vars(
                                    node._context["__signature_vars__"] | node._context["__environ_vars__"] \
                                    | {"_input", "__step_output__", "__default_output__", "__args__"}))
                            node._context.update(svar)
                            node._context["__completed__"].append(res["__step_name__"])
                        dag.update_step(runnable, env.sos_dict["__step_input__"],
                            env.sos_dict["__step_output__"],
                            env.sos_dict["__step_depends__"],
                            env.sos_dict["__step_local_input__"],
                            env.sos_dict["__step_local_output__"])
                        runnable._status = "completed"
                        prog.update(1)

                # remove None
                procs = [x for x in procs if x is not None]

                # step 2: check if some jobs are done
                for proc in procs:
                    # if a job is pending, check if it is done.
                    if proc[2]._status == "task_pending":
                        res = proc[2]._host.check_status(proc[2]._pending_tasks)
                        if any(x in ("pending", "running", "completed-old", "failed-old", "failed-missing-output", "failed-old-missing-output") for x in res):
                            continue
                        elif all(x == "completed" for x in res):
                            env.logger.debug("Put results for {}".format(" ".join(proc[2]._pending_tasks)))
                            res = proc[2]._host.retrieve_results(proc[2]._pending_tasks)
                            proc[1].send(res)
                            proc[2]._status = "running"
                        else:
                            raise RuntimeError("Job returned with status {}".format(res))

                # step 3: check if there is room and need for another job
                while True:
                    num_running = len([x for x in procs if x[2]._status != "task_pending"])
                    if num_running >= env.max_jobs:
                        break
                    # find any step that can be executed and run it, and update the DAG
                    # with its status.
                    runnable = dag.find_executable()
                    if runnable is None:
                        # no runnable
                        #dag.show_nodes()
                        break
                    # find the section from runnable
                    section = self.workflow.section_by_id(runnable._step_uuid)
                    # execute section with specified input
                    runnable._status = "running"
                    q = mp.Pipe()
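                    # duplex pipe: q[0] stays with this process, q[1] is passed to the worker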

                    # if pool is empty, create a new process
                    if not pool:
                        worker_queue = mp.Queue()
                        worker = StepWorker(queue=worker_queue, config=self.config, args=self.args)
                        worker.start()
                    else:
                        # reuse an idle worker; its old q and runnable are no longer needed
                        p, _, _ = pool.pop(0)
                        worker = p[0]
                        worker_queue = p[1]

                    # workflow shared variables
                    shared = {x: env.sos_dict[x] for x in self.shared if x in env.sos_dict and pickleable(env.sos_dict[x], x)}
                    if "shared" in section.options:
                        if isinstance(section.options["shared"], str):
                            svars = [section.options["shared"]]
                        elif isinstance(section.options["shared"], dict):
                            svars = section.options["shared"].keys()
                        elif isinstance(section.options["shared"], Sequence):
                            svars = []
                            for x in section.options["shared"]:
                                if isinstance(x, str):
                                    svars.append(x)
                                elif isinstance(x, dict):
                                    svars.extend(x.keys())
                                else:
                                    raise ValueError("Unacceptable value for parameter shared: {}".format(section.options["shared"]))
                        else:
                            raise ValueError("Unacceptable value for parameter shared: {}".format(section.options["shared"]))
                        shared.update({x: env.sos_dict[x] for x in svars if x in env.sos_dict and pickleable(env.sos_dict[x], x)})
                    if "__workflow_sig__" in env.sos_dict:
                        runnable._context["__workflow_sig__"] = env.sos_dict["__workflow_sig__"]
                    worker_queue.put((section, runnable._context, shared, env.sig_mode, env.verbosity, q[1]))
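                    # each procs entry is [[worker, worker_queue], our end of the pipe, runnable]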
                    procs.append([[worker, worker_queue], q[0], runnable])
                #
                num_running = len([x for x in procs if x[2]._status != "task_pending"])

                env.logger.trace("PROC {}".format(", ".join([x[2]._status for x in procs])))
                if not procs:
                    break
                #elif not env.__wait__ and num_running == 0:
                #    # if all jobs are pending, let us check if all jobs have been submitted.
                #    pending_tasks = []
                #    running_tasks = []
                #    for n in [x[2] for x in procs if x[2]._status == "task_pending"]:
                #        p, r = n._host._task_engine.get_tasks()
                #        pending_tasks.extend(p)
                #        running_tasks.extend(r)
                #    if not pending_tasks:
                #        env.logger.info("SoS exits with {} running tasks".format(len(running_tasks)))
                #        for task in running_tasks:
                #            env.logger.info(task)
                #        break
                else:
                    time.sleep(0.1)
            # close all processes
        except:
            for p, _, _ in procs + pool:
                p[0].terminate()
            raise
        finally:
            for p, _, _ in procs + pool:
                p[1].put(None)
                p[0].terminate()
                p[0].join()
            prog.close()
        if exec_error.errors:
            failed_steps, pending_steps = dag.pending()
            if failed_steps:
                sections = [self.workflow.section_by_id(x._step_uuid).step_name() for x in failed_steps]
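
The shared structure across the three instances below appears to be wrapping a worker/loop body in try/except/finally so that child processes (or other resources) are released on every exit path, not only on normal completion. A minimal, self-contained sketch of that structure, using hypothetical worker code rather than the SoS internals above:

    import multiprocessing as mp

    def worker(queue):
        # drain jobs until the None sentinel arrives
        while True:
            job = queue.get()
            if job is None:
                break
            _ = job * job  # placeholder work

    def run_all(jobs, num_workers=4):
        procs = []
        try:
            # spawn the workers and distribute jobs round-robin
            for _ in range(num_workers):
                q = mp.Queue()
                p = mp.Process(target=worker, args=(q,))
                p.start()
                procs.append((p, q))
            for i, job in enumerate(jobs):
                procs[i % num_workers][1].put(job)
        except Exception:
            # on error, kill the workers outright before re-raising
            for p, _ in procs:
                p.terminate()
            raise
        finally:
            # on every exit path, send the sentinel and reap the workers
            for p, q in procs:
                q.put(None)
                p.join()

    if __name__ == "__main__":
        run_all(range(10))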
In pattern: SUPERPATTERN

Frequency: 3

Non-data size: 4

Instances


Project Name: vatlab/SoS
Commit Name: 0f5249b8e9e9fd837ab8f3ae529fa2e05df7a292
Time: 2017-03-02
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run


Project Name: markovmodel/PyEMMA
Commit Name: 44bd049613c2e0fb70d971f2c35775841cbf2ef6
Time: 2016-07-28
Author: m.scherer@fu-berlin.de
File Name: pyemma/coordinates/data/feature_reader.py
Class Name: FeatureReaderIterator
Method Name: _next_chunk


Project Name: deepchem/deepchem
Commit Name: 3b3a06ad8402079c2d18718349d5f0f212ac7b81
Time: 2020-12-11
Author: nfrey213@gmail.com
File Name: deepchem/feat/base_classes.py
Class Name: ComplexFeaturizer
Method Name: featurize