if reply is None:
if len(wr) != 0:
env.logger.error(f"WORKER terminates with pending tasks. sos might not be termianting properly.")
env.logger.trace(f"WORKER {self.name} ({os.getpid()}) quits after receiving None.")
break
if not reply: // if an empty job is returned
time.sleep(0.1)
continue
//
// if a real job is returned, run it. _process_job will either return True
// or a runner in case it is interrupted.
env.logger.trace(
f"WORKER {self.name} ({os.getpid()}, {self.num_pending()} pending) receives {self._type_of_work(reply)} request {self._name_of_work(reply)} with master port {self._master_ports[new_idx]}")
if "task" in reply:
self.run_substep(reply)
env.logger.trace(
f"WORKER {self.name} ({os.getpid()}) completes substep {self._name_of_work(reply)}")
self._runners[new_idx] = True
continue
master_port = reply["config"]["sockets"]["master_port"]
After Change
if reply is None:
if len(wr) != 0:
env.logger.error(f"WORKER terminates with pending tasks. sos might not be termianting properly.")
env.log_to_file("WORKER", f"WORKER {self.name} ({os.getpid()}) quits after receiving None.")
break
if not reply: // if an empty job is returned
time.sleep(0.1)
continue
//
// if a real job is returned, run it. _process_job will either return True
// or a runner in case it is interrupted.
env.log_to_file("WORKER",
f"WORKER {self.name} ({os.getpid()}, {self.num_pending()} pending) receives {self._type_of_work(reply)} request {self._name_of_work(reply)} with master port {self._master_ports[new_idx]}")
if "task" in reply:
self.run_substep(reply)
env.log_to_file("WORKER",
f"WORKER {self.name} ({os.getpid()}) completes substep {self._name_of_work(reply)}")
self._runners[new_idx] = True
continue
master_port = reply["config"]["sockets"]["master_port"]