# flyte-github
#1579 Eager workflows to support async workflows
Pull request opened by cosmicBboy

Fixes flyteorg/flyte#3672

TL;DR
Implement an experimental API for "eager workflows": workflows that enable the use of Python async code.

Type
☐ Bug Fix
☑︎ Feature
☐ Plugin

Are all requirements met?
☑︎ Code completed
☑︎ Smoke tested
☐ Unit tests added
☐ Code documentation added
☐ Any pending items have an associated Issue

Flytesnacks Example Link
...

Complete description
This PR adds support for `@eager` workflows. These are similar to `@dynamic` workflows in that they are Flyte tasks that dynamically execute a graph. However, dynamic workflows can only use the materialized inputs to inform the shape of the graph. At runtime, the Flyte backend still receives a statically compiled workflow; the only difference is that it is compiled at runtime.
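The "dynamic" end of the spectrum can be illustrated with a minimal, framework-free sketch. The graph shape is derived only from the materialized inputs and is fixed before anything runs. After that, the graph is simply executed. The names `compile_graph`, `run_graph`, and the `Node` representation are hypothetical illustrations, not flytekit APIs.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical node representation: a callable paired with its keyword arguments.
Node = Tuple[Callable[..., float], Dict[str, float]]


def train(c: float) -> float:
    """Stand-in for a task; returns a fake score that depends on ``c``."""
    return 1.0 / (1.0 + c)


def compile_graph(cs: List[float]) -> List[Node]:
    """Dynamic-style compilation: the number of nodes depends only on the inputs."""
    return [(train, {"c": c}) for c in cs]


def run_graph(graph: List[Node]) -> List[float]:
    """Execute the already-compiled graph; no nodes can be added at this point."""
    return [fn(**kwargs) for fn, kwargs in graph]


graph = compile_graph([0.1, 0.01])
print(len(graph))  # the graph size is known before any node runs
print(run_graph(graph))
```

In this sketch, no intermediate result can change which nodes exist. That is the restriction eager workflows are meant to lift.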
`@eager` workflows take this to the other extreme of the static <-> dynamic spectrum. Under the hood, an `@eager` "workflow" is actually a task that has access to a `FlyteRemote` object. This object must be configured correctly so that, at run time, it can authenticate to the configured Flyte cluster. The eager workflow then uses this remote object to launch each task called inside the user-defined eager workflow function body as an individual task execution.

Testing
Use the following Dockerfile and script to test this feature:
```dockerfile
# Dockerfile.eager
FROM python:3.8-slim-buster

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root

RUN apt-get update && apt-get install -y build-essential git

# Install the AWS cli separately to prevent issues with boto being written over
RUN pip3 install awscli

# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

# Copy the flytekit source and install it, plus the deck plugin and script dependencies
COPY . /root/

RUN pip install -e .
RUN pip install -e ./plugins/flytekit-deck-standard
RUN pip install scikit-learn pandas

ENV FLYTE_SDK_LOGGING_LEVEL 20
```
Python script:
```python
# flytekit_eager.py
"""Async workflows prototype."""

import asyncio
import logging
from typing import NamedTuple

import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

from flytekit import task, workflow
from flytekit.configuration import Config
from flytekit.experimental import eager
from flytekit.remote import FlyteRemote


class CustomException(Exception): ...

BestModel = NamedTuple("BestModel", model=LogisticRegression, metric=float)


@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem."""
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@task
def evaluate_model(data: pd.DataFrame, model: LogisticRegression) -> float:
    """Compute the model's accuracy on the given dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return float(accuracy_score(target, model.predict(features)))


# The remote object the eager workflow uses to launch task executions.
remote = FlyteRemote(
    config=Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development",
)

@eager(remote=remote)
async def main() -> BestModel:
    logging.info("STARTING MAIN")
    data = await get_data()
    processed_data = await process_data(data=data)

    # split the data; plain Python code runs between task executions
    try:
        train, test = train_test_split(processed_data, test_size=0.2)
    except Exception as exc:
        raise CustomException(str(exc)) from exc

    # train one model per hyperparameter value concurrently
    models = await asyncio.gather(*[
        train_model(data=train, hyperparameters={"C": x})
        # for x in [0.1, 0.01, 0.001, 0.0001, 0.00001]
        for x in [0.1]
    ])
    results = await asyncio.gather(*[
        evaluate_model(data=test, model=model) for model in models
    ])

    # pick the model with the highest accuracy
    best_model, best_result = None, float("-inf")
    for model, result in zip(models, results):
        if result > best_result:
            best_model, best_result = model, result

    assert best_model is not None, "model cannot be None!"
    return best_model, best_result


@workflow
def wf() -> BestModel:
    return main()


if __name__ == "__main__":
    print("training model")
    model = asyncio.run(main())
    print(f"trained model: {model}")
```

Tracking Issue
flyteorg/flyte#3396

Follow-up issue
NA

flyteorg/flytekit: All checks have passed (30/30 successful checks)
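The core idea of an eager workflow is a task that uses a remote client to launch each task execution and await its result. That idea can be sketched without flytekit. Each task call inside the `async` function returns a coroutine that submits an execution and polls until it finishes. Because of this, ordinary Python control flow (including `asyncio.gather` fan-out and choices based on intermediate results) decides what runs next. `FakeRemote`, its `execute`/`is_done`/`result` methods, and `eager_task` are hypothetical stand-ins. They are not the `FlyteRemote` API or this PR's implementation.

```python
import asyncio
import itertools
from typing import Any, Callable, Dict


class FakeRemote:
    """Hypothetical stand-in for a remote backend; runs tasks in-process."""

    def __init__(self) -> None:
        self._ids = itertools.count()
        self._results: Dict[int, Any] = {}
        self._polls: Dict[int, int] = {}

    def execute(self, fn: Callable[..., Any], **kwargs: Any) -> int:
        """Submit an execution and return its id (the result is computed immediately here)."""
        exec_id = next(self._ids)
        self._results[exec_id] = fn(**kwargs)
        self._polls[exec_id] = 0
        return exec_id

    def is_done(self, exec_id: int) -> bool:
        """Pretend each execution needs two status checks before it completes."""
        self._polls[exec_id] += 1
        return self._polls[exec_id] >= 2

    def result(self, exec_id: int) -> Any:
        return self._results[exec_id]


def eager_task(remote: FakeRemote, fn: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a function so that calling it submits to ``remote`` and returns a coroutine."""

    async def submit_and_wait(**kwargs: Any) -> Any:
        exec_id = remote.execute(fn, **kwargs)
        while not remote.is_done(exec_id):
            await asyncio.sleep(0.01)  # yield so other pending executions can be polled
        return remote.result(exec_id)

    return submit_and_wait


remote = FakeRemote()
train = eager_task(remote, lambda c: 1.0 / (1.0 + c))
evaluate = eager_task(remote, lambda score: round(score, 3))


async def main() -> float:
    # Fan out: the number of executions is decided by Python code at run time.
    scores = await asyncio.gather(*[train(c=c) for c in [0.1, 0.01]])
    # Control flow that depends on intermediate results.
    best = max(scores)
    return await evaluate(score=best)


print(asyncio.run(main()))
```

Polling is used here only to keep the sketch self-contained. A real backend client would also need authentication, as the description notes, along with error propagation and timeouts.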