clean-glass-36808
01/23/2025, 1:20 AM
@task(
    requests=JobResourceConfig(
        cpu=ComputeResource.cpu(12),
        memory=StorageResource.memory_GB(16),
        disk=StorageResource.disk_GB(128),
    ).to_flyte_resources()
)  # type: ignore [misc]
def data_splitter_task(
    feature_files: list[list[FlyteFile]], split_ratios: list[float]
) -> tuple[list[FlyteFile], list[FlyteFile], list[FlyteFile]]:
    """Split the data into training, validation, and testing subsets.

    This is a reduce task; its inputs come from map tasks.

    Args:
        feature_files: files with extracted features (*.pb).
        split_ratios: ratios to split the training dataset into training and validation subsets.

    Returns:
        Three lists of Flyte files with extracted features, one per ML split.
    """
    file_paths: list[Path] = []
    for sub_list in feature_files:
        file_paths.extend([Path(p) for p in sub_list])
    if len(file_paths) == 0:
        msg = "The input dir to `data_splitter_task` doesn't have any pb files!"
        raise RuntimeError(msg)
    files_per_split = create_splits(file_paths, split_ratios)
    # Log split sizes.
    ml_training_size = len(files_per_split["training"])
    ml_validation_size = len(files_per_split["validation"])
    ml_testing_size = len(files_per_split["testing"])
    msg = f"Creating splits with size: ML training: {ml_training_size}, ML validation: {ml_validation_size}, ML testing: {ml_testing_size}"
    logger.info(msg)
    # Create three lists of files, one per ML split.
    train_files = [FlyteFile(path=str(p)) for p in files_per_split["training"]]
    val_files = [FlyteFile(path=str(p)) for p in files_per_split["validation"]]
    test_files = [FlyteFile(path=str(p)) for p in files_per_split["testing"]]
    return train_files, val_files, test_files
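
A minimal sketch, for context, of how a map stage could feed this reduce task; the actual workflow wiring wasn't posted in the thread, and feature_extractor_task is a hypothetical name. It only illustrates why feature_files arrives as list[list[FlyteFile]] when using flytekit's map_task:

from flytekit import map_task, task, workflow
from flytekit.types.file import FlyteFile

@task
def feature_extractor_task(log_dir: str) -> list[FlyteFile]:
    """Hypothetical map stage: extract *.pb feature files from one log dir."""
    return []  # placeholder body for the sketch

@workflow
def split_wf(
    log_dirs: list[str], split_ratios: list[float]
) -> tuple[list[FlyteFile], list[FlyteFile], list[FlyteFile]]:
    # map_task fans out one instance per log dir; the mapped output is
    # list[list[FlyteFile]], matching data_splitter_task's `feature_files` input.
    per_dir_files = map_task(feature_extractor_task)(log_dir=log_dirs)
    return data_splitter_task(feature_files=per_dir_files, split_ratios=split_ratios)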
clean-glass-36808
01/23/2025, 1:22 AM

clean-glass-36808
01/23/2025, 1:24 AM

high-accountant-32689
01/23/2025, 1:24 AM
fsspec versions? Also, how big are those lists?

clean-glass-36808
01/23/2025, 1:24 AM

clean-glass-36808
01/23/2025, 1:27 AM

clean-glass-36808
01/23/2025, 1:32 AM
… create_splits is doing

clean-glass-36808
01/23/2025, 1:36 AM

thankful-minister-83577
from fsspec.config import conf
conf["gather_batch_size"] = 50
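
Some hedged context on that suggestion: gather_batch_size is read from fsspec's global config by fsspec.asyn when it chunks batched async operations (s3fs uses this path for bulk transfers), so it caps how many transfer coroutines are awaited at once. One easy-to-miss detail is that the assignment must run in the task container's process before the first transfer, e.g. at import time of the task module. A sketch, with the module name being hypothetical:

# my_project/tasks.py (hypothetical) -- set at import time so the config is in
# place before flytekit/s3fs start any batched transfers in this process.
from fsspec.config import conf

# Caps how many gathered transfer coroutines fsspec awaits at a time
# (the library default is 128 in recent fsspec releases).
conf["gather_batch_size"] = 50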
clean-glass-36808
01/23/2025, 2:34 AM
gather_batch_size didn't seem to help

clean-glass-36808
01/23/2025, 2:38 AM
def create_splits(file_paths: list[Path], split_ratios: list[float]) -> dict[str, list[str]]:
    """Create splits from file paths; split metadata is queried using the UUIDs in the file names."""
    # Get the event ids from the logs in order to fetch metadata.
    event_id_to_filepath: dict[str, Path] = {}
    event_ids = []
    for file_path in file_paths:
        event_id = file_path.stem
        event_ids.append(event_id)
        event_id_to_filepath[event_id] = file_path
    # Get the log metadata for each event.
    logs_metadata = _get_event_metadata(
        renamed_client,
        event_uuids=event_ids,
    )
    # Group the logs by renamed split. If a log's metadata was not returned, we don't include it in any split.
    renamed_split_logs: dict[str, list[Path]] = {"train": [], "validate": [], "test": []}
    for event_uuid, metadata in logs_metadata.items():
        split = metadata[0]
        renamed_split_logs[split].append(event_id_to_filepath[event_uuid])
    random.seed(0)
    random.shuffle(renamed_split_logs["train"])
    renamed_train_size = len(renamed_split_logs["train"])
    ml_training_size = round(renamed_train_size * split_ratios[0] / sum(split_ratios))
    # Create the splits.
    output_splits: dict[str, list[str]] = {}
    output_splits["training"] = [str(file_path) for file_path in renamed_split_logs["train"][:ml_training_size]]
    output_splits["validation"] = [str(file_path) for file_path in renamed_split_logs["train"][ml_training_size:]]
    output_splits["testing"] = [str(file_path) for file_path in renamed_split_logs["validate"]]
    return output_splits
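
The split sizes logged later in the thread (11708 training / 1301 validation) are consistent with this ratio arithmetic for any split_ratios in a 9:1 proportion; the exact [0.9, 0.1] value below is an assumption. A quick check:

# Quick check of the ratio arithmetic against the sizes logged below.
split_ratios = [0.9, 0.1]  # assumed; any 9:1 pair gives the same result
renamed_train_size = 11708 + 1301  # 13009 shuffled "train" logs in total
ml_training_size = round(renamed_train_size * split_ratios[0] / sum(split_ratios))
print(ml_training_size, renamed_train_size - ml_training_size)  # -> 11708 1301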
clean-glass-36808
01/23/2025, 2:40 AM

clean-glass-36808
01/23/2025, 7:02 AM
Container jparraga-7775b4ddc4f040f4961-n2-0 failed with exit code 137 because OOMKilled:
[INFO][credentials.py:567] Found credentials in environment variables.
[INFO][configprovider.py:974] Found endpoint for s3 via: environment_global.
[INFO][tasks.py:315] enter data splitter
[INFO][split_dataset.py:20] enter create splits
[INFO][split_dataset.py:30] getting event metadata
[INFO][split_dataset.py:38] grouping
[INFO][split_dataset.py:56] creating the splits
[INFO][split_dataset.py:62] exit create splits
[INFO][tasks.py:340] Creating splits with size: ML training: 11708, ML validation: 1301, ML testing: 4153
[INFO][tasks.py:347] exit data splitter
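
Worth noting: every INFO line above, including "exit data splitter", was printed before the kill, so the container apparently ran out of memory after the task body returned, i.e. while flytekit serialized and uploaded the ~17k FlyteFile outputs. A first knob to try is simply a larger memory request, reusing the resource wrappers from the task above; the 32 GB figure is an assumption, not a fix confirmed in this thread:

@task(
    requests=JobResourceConfig(
        cpu=ComputeResource.cpu(12),
        memory=StorageResource.memory_GB(32),  # was memory_GB(16) when the pod was OOMKilled
        disk=StorageResource.disk_GB(128),
    ).to_flyte_resources()
)  # type: ignore [misc]
def data_splitter_task(
    feature_files: list[list[FlyteFile]], split_ratios: list[float]
) -> tuple[list[FlyteFile], list[FlyteFile], list[FlyteFile]]:
    ...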
thankful-minister-83577

clean-glass-36808
01/23/2025, 6:58 PM

clean-glass-36808
01/23/2025, 7:51 PM

thankful-minister-83577