# Process one batch of (step, promise) pairs: reuse a cached result when one
# is available, otherwise run the step and offer its result to the cache.
# NOTE(review): indentation was lost in this excerpt, so the nesting of the
# statements below is inferred from their order — confirm against the file.
for step, promise in step_batch:
# Cache lookup returns the key for this step together with any stored result.
keyed_request, result = self._maybe_cache_get(step)
# A None result is treated as "nothing cached for this step".
if result is None:
result = step(node_builder)
self._maybe_cache_put(keyed_request, result)
# Presumably resolves the promise for every step, cached or not — TODO confirm.
promise.success(result)
def _try_pickle(obj):
try:
After Change
def reduce(self, execution_request):
    """Run every batch the scheduler produces for ``execution_request``.

    Each batch is an iterable of ``(entry, runnable)`` pairs. For every pair
    the cache is consulted first; on a miss the runnable's ``func`` is called
    with its ``args`` and the outcome is wrapped in ``Return`` (or ``Throw``
    if an exception was raised). The collected ``(entry, result)`` pairs are
    then sent back into the scheduler's generator.

    Returns None; results are delivered only through ``generator.send``.
    """
    batches = self._scheduler.schedule(execution_request)
    for batch in batches:
        outcomes = []
        for entry, runnable in batch:
            cache_key, outcome = self._maybe_cache_get(entry, runnable)
            if outcome is not None:
                # Cache hit: reuse the stored result without running anything.
                outcomes.append((entry, outcome))
                continue
            try:
                outcome = Return(runnable.func(*runnable.args))
                # The cache write is inside the try, so an exception raised
                # while storing the result is also reported as a Throw.
                self._maybe_cache_put(cache_key, outcome)
            except Exception as exc:
                outcome = Throw(exc)
            outcomes.append((entry, outcome))
        # NOTE(review): send() itself resumes the generator and the value it
        # returns is discarded here; presumably the scheduler yields a
        # placeholder after receiving results so the enclosing for-loop does
        # not skip a batch — confirm against the scheduler implementation.
        batches.send(outcomes)
def _try_pickle(obj):