4ae321a17f862aaf4a399609550539d022565138,sos/sos_executor.py,MP_Executor,run,#MP_Executor#Any#Any#,572

Before Change


                p.join()

                runnable = dag.node_by_id(u)
                if isinstance(res, UnknownTarget):
                    runnable._status = None
                    target = res.target
                    if self.resolve_dangling_targets(dag, [target]) == 0:
                        raise RuntimeError("Failed to resolve {}{}."
                            .format(target, dag.steps_depending_on(target, self.workflow)))
                    // now, there should be no dangling targets, let us connect nodes
                    // this can be done more efficiently
                    runnable._depends_targets.append(target)
                    dag._all_dependent_files[target].append(runnable)
                    dag.build(self.workflow.auxiliary_sections)
                    self.save_dag(dag)
                    //
                    cycle = dag.circular_dependencies()
                    if cycle:
                        raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
                elif isinstance(res, RemovedTarget):
                    runnable._status = None
                    target = res.target
                    if not dag.regenerate_target(target):
                        if self.resolve_dangling_targets(dag, [target]) == 0:
                            raise RuntimeError("Failed to regenerate or resolve {}{}."
                                .format(target, dag.steps_depending_on(target, self.workflow)))
                        runnable._depends_targets.append(target)
                        dag._all_dependent_files[target].append(runnable)
                        dag.build(self.workflow.auxiliary_sections)
                        //
                        cycle = dag.circular_dependencies()
                        if cycle:
                            raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
                    self.save_dag(dag)
                elif isinstance(res, UnavailableLock):
                    runnable._status = "pending"
                    runnable._signature = (res.output, res.sig_file)
                    section = self.workflow.section_by_id(runnable._step_uuid)
                    env.logger.info("Waiting on another process for step {}".format(section.step_name()))
                    // step aside so that other tasks can run first
                    procs[idx] = None
                // if the job has failed
                elif isinstance(res, Exception):
                    runnable._status = "failed"
                    exec_error.append(runnable._node_id, res)
                    prog.progress(1)
                    procs[idx] = None
                else:
                    //
                    for k, v in res.items():
                        env.sos_dict.set(k, v)
                    //
                    // set context to the next logic step.
                    for edge in dag.out_edges(runnable):
                        node = edge[1]
                        // if node is the logical next step...
                        if node._node_index is not None and runnable._node_index is not None:
                            //and node._node_index == runnable._node_index + 1:
                            node._context.update(env.sos_dict.clone_selected_vars(
                                node._context["__signature_vars__"] | node._context["__environ_vars__"] \
                                | {"_input", "__step_output__", "__default_output__", "__args__"}))
                    runnable._status = "completed"
                    prog.progress(1)
                    procs[idx] = None
                //env.logger.error("completed")
                //dag.show_nodes()
            // step 2: submit new jobs if there are empty slots
            for idx, proc in enumerate(procs):
                // if there is empty slot, submit
                if proc is not None:
                    continue

After Change


        // python statements in different run modes.
        env.sos_dict.set("run_mode", env.run_mode)
        // process step of the pipeline
        dag = self.initialize_dag(targets=targets)

        // process step of the pipeline
        //
        procs = [None for x in range(env.max_jobs)]
        prog = ProgressBar(self.workflow.name, dag.num_nodes(), disp=dag.num_nodes() > 1 and env.verbosity == 1)
        exec_error = ExecuteError(self.workflow.name)
        while True:
            // step 1: check existing jobs and see if they are completed
            for idx, proc in enumerate(procs):
                if proc is None:
                    continue
                (p, q, u) = proc
                try:
                    res = q.get_nowait()
                except Empty:
                    // continue waiting
                    continue
                //
                // if we do get the result
                p.join()

                runnable = dag.node_by_id(u)
                if isinstance(res, (UnknownTarget, RemovedTarget)):
                    runnable._status = None
                    target = res.target
                    if dag.regenerate_target(target):
                        //runnable._depends_targets.append(target)
                        //dag._all_dependent_files[target].append(runnable)
                        dag.build(self.workflow.auxiliary_sections)
                        //
                        cycle = dag.circular_dependencies()
                        if cycle:
                            raise RuntimeError("Circular dependency detected {} after regeneration. It is likely a later step produces input of a previous step.".format(cycle))

                    else:
                        if self.resolve_dangling_targets(dag, [target]) == 0:
                            raise RuntimeError("Failed to regenerate or resolve {}{}."
                                .format(target, dag.steps_depending_on(target, self.workflow)))
                        runnable._depends_targets.append(target)
                        dag._all_dependent_files[target].append(runnable)
                        dag.build(self.workflow.auxiliary_sections)
                        //
                        cycle = dag.circular_dependencies()
                        if cycle:
                            raise RuntimeError("Circular dependency detected {}. It is likely a later step produces input of a previous step.".format(cycle))
                    self.save_dag(dag)
                elif isinstance(res, UnavailableLock):
                    runnable._status = "pending"
                    runnable._signature = (res.output, res.sig_file)
                    section = self.workflow.section_by_id(runnable._step_uuid)
                    env.logger.info("Waiting on another process for step {}".format(section.step_name()))
                    // step aside so that other tasks can run first
                    procs[idx] = None
                // if the job has failed
                elif isinstance(res, Exception):
                    runnable._status = "failed"
                    exec_error.append(runnable._node_id, res)
                    prog.progress(1)
                    procs[idx] = None
                else:
                    //
                    for k, v in res.items():
                        env.sos_dict.set(k, v)
                    //
                    // set context to the next logic step.
                    for edge in dag.out_edges(runnable):
                        node = edge[1]
                        // if node is the logical next step...
                        if node._node_index is not None and runnable._node_index is not None:
                            //and node._node_index == runnable._node_index + 1:
                            node._context.update(env.sos_dict.clone_selected_vars(
                                node._context["__signature_vars__"] | node._context["__environ_vars__"] \
                                | {"_input", "__step_output__", "__default_output__", "__args__"}))
                    dag.update_step(runnable, env.sos_dict["__step_input__"],
                        env.sos_dict["__step_output__"],
                        env.sos_dict["__step_depends__"],
                        env.sos_dict["__step_local_input__"],
                        env.sos_dict["__step_local_output__"])
                    runnable._status = "completed"
                    prog.progress(1)
                    procs[idx] = None
                //env.logger.error("completed")
                //dag.show_nodes()
            // step 2: submit new jobs if there are empty slots
            for idx, proc in enumerate(procs):
                // if there is empty slot, submit
                if proc is not None:
                    continue
Italian Trulli
In pattern: SUPERPATTERN

Frequency: 4

Non-data size: 43

Instances


Project Name: vatlab/SoS
Commit Name: 4ae321a17f862aaf4a399609550539d022565138
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: MP_Executor
Method Name: run


Project Name: vatlab/SoS
Commit Name: a38c773548db1a20983f794e16898f83f8c86249
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run


Project Name: vatlab/SoS
Commit Name: 4ae321a17f862aaf4a399609550539d022565138
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: MP_Executor
Method Name: run


Project Name: vatlab/SoS
Commit Name: 4ae321a17f862aaf4a399609550539d022565138
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: Base_Executor
Method Name: run


Project Name: vatlab/SoS
Commit Name: a38c773548db1a20983f794e16898f83f8c86249
Time: 2016-12-17
Author: ben.bog@gmail.com
File Name: sos/sos_executor.py
Class Name: MP_Executor
Method Name: run