else:
keyed_request, result = self._maybe_cache_get(step)
if result is None:
result = step(self._node_builder)
self._maybe_cache_put(keyed_request, result)
promise.success(result)
return submitted
def _await_one(self, in_flight):
After Change
Submit pending while there's capacity, and more than `n` items in pending_submission.
to_submit = min(len(pending_submission) - n, self._pool_size - len(in_flight))
submitted = 0
completed = []
for _ in range(to_submit):
step, runnable = pending_submission.popitem(last=False)
if self._is_async_node(step.node):
# Run in a future.
if step in in_flight:
raise InFlightException("{} is already in_flight!".format(step))
future = self._pool.submit(functools.partial(self._execute_step, step, runnable))
in_flight[step] = future
self._pending.add(future)
future.add_done_callback(self._processed_node_callback)
submitted += 1
else:
# Run inline.
completed.append(self._execute_step(step, runnable))
return submitted, completed
def _await_one(self, in_flight):
Await one completed step, and remove it from in_flight.