# Feed documents into the input queue while at least one UDF worker process
# is still alive and parsing is incomplete. count_parsed is presumably
# advanced elsewhere by draining the output queue -- TODO confirm; as
# written, nothing in this loop body updates it.
# NOTE: any(...) takes a generator directly -- no need to build a list.
while any(udf.is_alive() for udf in self.udfs) and count_parsed < total_count:
    if docs_added < total_count:
        # Push the next document to the workers.
        in_queue.put(next(xs_generator))
        docs_added += 1
    if docs_added == total_count:
        # All documents queued: send the close sentinel so workers can exit.
        in_queue.put(UDF.QUEUE_CLOSED)
        # Bump past total_count so the sentinel is enqueued exactly once.
        docs_added += 1
After the change:
// Create a Queue to feed documents to parsers
manager = Manager()
in_queue = manager.Queue()
// Use an output queue to track multiprocess progress
out_queue = JoinableQueue()
total_count = len(xs)
// Start UDF Processes
for i in range(parallelism):
udf = self.udf_class(
in_queue=in_queue,
out_queue=out_queue,
worker_id=i,
**self.udf_init_kwargs
)
udf.apply_kwargs = kwargs
self.udfs.append(udf)
// Start the UDF processes, and then join on their completion
for udf in self.udfs:
udf.start()
// Fill input queue with documents
pool = Pool(parallelism)
in_tuples = ((in_queue, x) for x in xs)
pool.map_async(func=async_fill_input_queue, iterable=in_tuples)
count_parsed = 0
while count_parsed < total_count: