// Fill queue with docs; progress bar will not be updated until this is done
docs_added = 0
xs_generator = xs.generate()
for _ in range(min(parallelism, total_count)):
in_queue.put(next(xs_generator))
docs_added += 1
while any([udf.is_alive() for udf in self.udfs]) and count_parsed < total_count:
if docs_added < total_count:
in_queue.put(next(xs_generator))
docs_added += 1
if docs_added == total_count:
in_queue.put(UDF.QUEUE_CLOSED)
docs_added += 1
After Change
// Fill input queue with documents
pool = Pool(parallelism)
in_tuples = ((in_queue, x) for x in xs)
pool.map_async(func=async_fill_input_queue, iterable=in_tuples)
count_parsed = 0