# flyte-support
c
Hey folks. We are seeing some strange behavior after upgrading to Flytekit v1.14. We have a task that interacts with Flytefiles. Previously this task might take 500MB - 1GB of ram but after upgrading Flytekit its OOMing at 16 GB. Was curious if you have any insight before digging deeper.
@task(
    requests=JobResourceConfig(
        cpu=ComputeResource.cpu(12), memory=StorageResource.memory_GB(16), disk=StorageResource.disk_GB(128)
    ).to_flyte_resources()
)  # type: ignore [misc]
def data_splitter_task(
    feature_files: list[list[FlyteFile]], split_ratios: list[float]
) -> tuple[list[FlyteFile], list[FlyteFile], list[FlyteFile]]:
    """Split the data into training, validation and testing subsets.

    This is a reduce task and the inputs are from map tasks.

    Args:
        feature_files: files with extracted features (*.pb).
        split_ratios: ratios to split the training dataset into training and validation subsets.

    Returns:
        Three lists of FlyteFiles with extracted features for the ML splits.
    """
    file_paths: list[Path] = []
    for sub_list in feature_files:
        file_paths.extend([Path(p) for p in sub_list])
    if len(file_paths) == 0:
        msg = "The input dir to `data_splitter_task` doesn't have any pb files!"
        raise RuntimeError(msg)

    files_per_split = create_splits(file_paths, split_ratios)
    # Log split sizes.
    ml_training_size = len(files_per_split["training"])
    ml_validation_size = len(files_per_split["validation"])
    ml_testing_size = len(files_per_split["testing"])
    msg = f"Creating splits with size: ML training: {ml_training_size}, ML validation:  {ml_validation_size}, ML testing: {ml_testing_size}"
    logger.info(msg)

    # Create three lists of files for the ML splits.
    train_files = [FlyteFile(path=str(p)) for p in files_per_split["training"]]
    val_files = [FlyteFile(path=str(p)) for p in files_per_split["validation"]]
    test_files = [FlyteFile(path=str(p)) for p in files_per_split["testing"]]

    return train_files, val_files, test_files
I am still trying to grok whether it could have been some other changes on our end, but this task is pretty vanilla
Flyte v1.13.3
h
Interesting. Can you also confirm the `fsspec` versions? Also, how big are those lists?
c
Flyte v1.14.4
Flyte v1.13.3: fsspec[http]==2024.9.0
Flyte v1.14.4: fsspec[http]==2024.9.0 (same)
Let me dig into what `create_splits` is doing
I'm also running an execution on v1.14.4 with old data just to validate it's not some issue with data growth.
t
in 1.14 the uploads are parallelized… could you try something for me please?
from fsspec.config import conf
conf["gather_batch_size"] = 50
anywhere in the code before you return from the task
flytekit currently just uses the same utility/conf to gate the number of concurrent coroutines it tries to run.
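The batching that `gather_batch_size` gates can be sketched like this. This is a minimal illustration of running coroutines through `asyncio.gather` in fixed-size batches, not flytekit's actual upload code; `upload` is a stand-in coroutine:

```python
import asyncio

async def upload(path: str) -> str:
    # Stand-in for a real file-upload coroutine.
    await asyncio.sleep(0)
    return path

async def gather_in_batches(paths: list[str], batch_size: int) -> list[str]:
    # Run at most `batch_size` coroutines at a time, so only that many
    # uploads hold buffers in memory concurrently.
    results: list[str] = []
    for i in range(0, len(paths), batch_size):
        batch = paths[i : i + batch_size]
        results.extend(await asyncio.gather(*(upload(p) for p in batch)))
    return results

paths = [f"file_{i}.pb" for i in range(120)]
uploaded = asyncio.run(gather_in_batches(paths, batch_size=50))
```

With `batch_size=50`, the 120 stand-in uploads run in three waves of at most 50 concurrent coroutines.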
c
Setting `gather_batch_size` didn't seem to help
import random
from pathlib import Path

def create_splits(file_paths: list[Path], split_ratios: list[float]) -> dict[str, list[str]]:
    """Create splits from file paths and split metadata queried by UUID (the file stems)."""
    # Get the event ids from logs in order to fetch meta data.
    event_id_to_filepath: dict[str, Path] = {}
    event_ids = []
    for file_path in file_paths:
        filename = file_path.stem
        event_id = filename
        event_ids.append(event_id)
        event_id_to_filepath[event_id] = file_path

    # Get the log meta data for each event.
    logs_metadata = _get_event_metadata(
        renamed_client,
        event_uuids=event_ids,
    )

    # Group the logs by renamed split; if a log's metadata was not returned, it isn't included in any split.
    renamed_split_logs: dict[str, list[Path]] = {"train": [], "validate": [], "test": []}
    for event_uuid, metadata in logs_metadata.items():
        split = metadata[0]
        renamed_split_logs[split].append(event_id_to_filepath[event_uuid])

    random.seed(0)
    random.shuffle(renamed_split_logs["train"])
    renamed_train_size = len(renamed_split_logs["train"])
    ml_training_size = round(renamed_train_size * split_ratios[0] / sum(split_ratios))

    # Create the splits
    output_splits: dict[str, list[str]] = {}
    output_splits["training"] = [str(file_path) for file_path in renamed_split_logs["train"][:ml_training_size]]
    output_splits["validation"] = [str(file_path) for file_path in renamed_split_logs["train"][ml_training_size:]]
    output_splits["testing"] = [str(file_path) for file_path in renamed_split_logs["validate"]]
    return output_splits
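The split-ratio arithmetic can be sanity-checked in isolation by stubbing out the metadata lookup (`renamed_client` / `_get_event_metadata` aren't shown in the thread). A minimal replica of the core slicing:

```python
import random

def split_train_val(train_paths: list[str], split_ratios: list[float]) -> tuple[list[str], list[str]]:
    # Same arithmetic as create_splits: shuffle deterministically, then take
    # the first ratio's share as training and the remainder as validation.
    random.seed(0)
    random.shuffle(train_paths)
    n_train = round(len(train_paths) * split_ratios[0] / sum(split_ratios))
    return train_paths[:n_train], train_paths[n_train:]

train, val = split_train_val([f"evt_{i}" for i in range(100)], [0.9, 0.1])
```

With 100 paths and ratios `[0.9, 0.1]`, this yields a 90/10 training/validation split, and every input path lands in exactly one split.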
Confirmed the dataset is the same and the task code is the same between passing/failing runs
Also just confirmed that the OOM seems to happen after/while the task code is returning
Container jparraga-7775b4ddc4f040f4961-n2-0 failed with exit code 137 because OOMKilled: 

[INFO][credentials.py:567] Found credentials in environment variables.
[INFO][configprovider.py:974] Found endpoint for s3 via: environment_global.
[INFO][tasks.py:315] enter data splitter
[INFO][split_dataset.py:20] enter create splits
[INFO][split_dataset.py:30] getting event metadata
[INFO][split_dataset.py:38] grouping
[INFO][split_dataset.py:56] creating the splits
[INFO][split_dataset.py:62] exit create splits
[INFO][tasks.py:340] Creating splits with size: ML training: 11708, ML validation:  1301, ML testing: 4153
[INFO][tasks.py:347] exit data splitter
t
could you give us a rough idea of the number and size distribution of the files please?
c
will do, task is deep in a pipeline so iteration is a little slow
Looks like the input is ~21k files, around 2-3 MB each. If you need more info on the outputs I can grab that too, but I'm hoping that gives you a ballpark idea
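Rough arithmetic on those numbers (assuming, as a back-of-the-envelope, that the parallelized upload path buffers whole files in memory): 21k files at ~2.5 MB each is ~51 GB of data overall, so a task that concurrently buffers more than a few thousand of them would exceed the 16 GB request:

```python
# Back-of-the-envelope memory estimate (assumed average file size of 2.5 MB).
n_files = 21_000
avg_mb = 2.5
total_gb = n_files * avg_mb / 1024   # ~51 GB of file data in total
concurrent = 16 * 1024 / avg_mb      # ~6.5k buffered files would fill 16 GB
```

This is only an upper-bound sketch; actual memory use depends on how the uploads stream or buffer data.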
t
changed the env vars btw jason - https://github.com/flyteorg/flytekit/pull/3080
and we’ll backport this to 1.14 as well.