for piece in sorted_pieces:
    # If we are not using absolute paths, we need to convert the path to a relative path for
    # looking up the number of row groups.
    row_groups_key = piece.path if use_absolute_paths else os.path.relpath(piece.path, dataset.paths)
    for row_group in range(row_groups_per_file[row_groups_key]):
        rowgroups.append(pq.ParquetDatasetPiece(piece.path, row_group, piece.partition_keys))
return rowgroups
After Change
num_row_groups = metadata.num_row_groups
if num_row_groups > 0:
    # Use the new metadata file
    return _split_row_groups(dataset)
# If we don't have row groups in the common metadata, we fall back to the old way of loading them.
logger.warning("You are using a deprecated metadata version. Please run petastorm.etl.metadata_index_run"
               " on spark to update.")
dataset_metadata_dict = dataset.common_metadata.metadata
if ROW_GROUPS_PER_FILE_KEY not in dataset_metadata_dict:
    raise ValueError("Could not find row group metadata in _metadata file."
                     " Use materialize_dataset(..) in petastorm.etl.dataset_metadata.py to generate"
                     " this file in your ETL code."
                     " You can generate it on an existing dataset using petastorm.etl.metadata_index_run")
metadata_dict_key = ROW_GROUPS_PER_FILE_KEY
row_groups_per_file = json.loads(dataset_metadata_dict[metadata_dict_key].decode())

rowgroups = []
# Force the order of pieces. The order is not deterministic since it depends on the multithreaded
# directory listing implementation inside pyarrow. We stabilize the order here so that we get a
# reproducible order when piece shuffling is off. This also enables implementing piece shuffling
# given a seed.
sorted_pieces = sorted(dataset.pieces, key=attrgetter("path"))
for piece in sorted_pieces:
    # If we are not using absolute paths, we need to convert the path to a relative path for
    # looking up the number of row groups.
    row_groups_key = os.path.relpath(piece.path, dataset.paths)
    for row_group in range(row_groups_per_file[row_groups_key]):
        rowgroups.append(pq.ParquetDatasetPiece(piece.path, row_group, piece.partition_keys))
return rowgroups