968bb199ef1d0d44f5105cdc5202cecec79d60b9,src/garage/sampler/multiprocessing_sampler.py,MultiprocessingSampler,obtain_samples,#MultiprocessingSampler#Any#Any#Any#Any#,121
Before Change
del itr
pbar = ProgBarCounter(num_samples)
batches = []
completed_samples = 0
self._agent_version += 1
updated_workers = set()
agent_ups = self._factory.prepare_worker_messages(
agent_update, cloudpickle.dumps)
env_ups = self._factory.prepare_worker_messages(env_update)
while completed_samples < num_samples:
self._push_updates(updated_workers, agent_ups, env_ups)
for _ in range(self._factory.n_workers):
try:
tag, contents = self._to_sampler.get_nowait()
if tag == "trajectory":
batch, version, worker_n = contents
del worker_n
if version == self._agent_version:
batches.append(batch)
num_returned_samples = batch.lengths.sum()
completed_samples += num_returned_samples
pbar.inc(num_returned_samples)
else:
// Receiving paths from previous iterations is
// normal. Potentially, we could gather them here,
// if an off-policy method wants them.
pass
else:
raise AssertionError(
"Unknown tag {} with contents {}".format(
tag, contents))
except queue.Empty:
pass
for q in self._to_worker:
try:
q.put_nowait(("stop", ()))
except queue.Full:
pass
pbar.stop()
return TrajectoryBatch.concatenate(*batches)
def obtain_exact_trajectories(self,
n_traj_per_worker,
After Change
agent_update, cloudpickle.dumps)
env_ups = self._factory.prepare_worker_messages(env_update)
with click.progressbar(length=num_samples, label="Sampling") as pbar:
while completed_samples < num_samples:
self._push_updates(updated_workers, agent_ups, env_ups)
for _ in range(self._factory.n_workers):
try:
tag, contents = self._to_sampler.get_nowait()
if tag == "trajectory":
batch, version, worker_n = contents
del worker_n
if version == self._agent_version:
batches.append(batch)
num_returned_samples = batch.lengths.sum()
completed_samples += num_returned_samples
pbar.update(num_returned_samples)
else:
// Receiving paths from previous iterations is
// normal. Potentially, we could gather them
// here, if an off-policy method wants them.
pass
else:
raise AssertionError(
"Unknown tag {} with contents {}".format(
tag, contents))
except queue.Empty:
pass
for q in self._to_worker:
try:
q.put_nowait(("stop", ()))
except queue.Full:
pass
return TrajectoryBatch.concatenate(*batches)
def obtain_exact_trajectories(self,
n_traj_per_worker,
In pattern: SUPERPATTERN
Frequency: 6
Non-data size: 7
Instances
Project Name: rlworkgroup/garage
Commit Name: 968bb199ef1d0d44f5105cdc5202cecec79d60b9
Time: 2020-06-02
Author: ryanjulian@users.noreply.github.com
File Name: src/garage/sampler/multiprocessing_sampler.py
Class Name: MultiprocessingSampler
Method Name: obtain_samples
Project Name: rlworkgroup/garage
Commit Name: 968bb199ef1d0d44f5105cdc5202cecec79d60b9
Time: 2020-06-02
Author: ryanjulian@users.noreply.github.com
File Name: src/garage/sampler/ray_sampler.py
Class Name: RaySampler
Method Name: obtain_samples
Project Name: rlworkgroup/garage
Commit Name: 968bb199ef1d0d44f5105cdc5202cecec79d60b9
Time: 2020-06-02
Author: ryanjulian@users.noreply.github.com
File Name: src/garage/sampler/multiprocessing_sampler.py
Class Name: MultiprocessingSampler
Method Name: obtain_exact_trajectories
Project Name: rlworkgroup/garage
Commit Name: 968bb199ef1d0d44f5105cdc5202cecec79d60b9
Time: 2020-06-02
Author: ryanjulian@users.noreply.github.com
File Name: src/garage/sampler/ray_sampler.py
Class Name: RaySampler
Method Name: obtain_exact_trajectories
Project Name: rlworkgroup/garage
Commit Name: 968bb199ef1d0d44f5105cdc5202cecec79d60b9
Time: 2020-06-02
Author: ryanjulian@users.noreply.github.com
File Name: src/garage/sampler/stateful_pool.py
Class Name: StatefulPool
Method Name: run_collect
Project Name: rlworkgroup/garage
Commit Name: 968bb199ef1d0d44f5105cdc5202cecec79d60b9
Time: 2020-06-02
Author: ryanjulian@users.noreply.github.com
File Name: src/garage/sampler/on_policy_vectorized_sampler.py
Class Name: OnPolicyVectorizedSampler
Method Name: obtain_samples