acoustic-carpenter-78188
08/04/2023, 10:56 PM
`@eager` workflows are similar to `@dynamic` workflows in that they are Flyte tasks that dynamically execute a graph. However, dynamic workflows can only use their materialized inputs to inform the shape of the graph: at runtime, the Flyte backend still receives a statically compiled workflow, just one that is compiled at runtime. `@eager` workflows take this to the other extreme of the static <-> dynamic spectrum: the workflow body can `await` individual task calls and use their materialized outputs to decide what to run next.
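To make the contrast concrete, here is a plain-`asyncio` sketch with no Flyte involved. `train_and_score`, its toy scoring rule, and the threshold are hypothetical stand-ins. It shows the kind of control flow the eager style permits: the number of "task" calls is decided inside the loop from each awaited result, which, per the description above, a single dynamic workflow cannot do because its graph shape is fixed by its inputs alone.

```python
import asyncio
from typing import Tuple


async def train_and_score(c: float) -> float:
    """Hypothetical stand-in for one remote task execution; returns a toy score."""
    await asyncio.sleep(0)  # yield to the event loop, as awaiting a real execution would
    return min(1.0, 0.5 + c)  # toy rule: the score grows with c, capped at 1.0


async def eager_search(
    start_c: float = 0.05, threshold: float = 0.85, max_calls: int = 10
) -> Tuple[float, float, int]:
    """Launch 'tasks' until one clears `threshold`.

    How many calls happen is not known when the function starts: each
    iteration branches on the awaited output of the previous call.
    """
    c = start_c
    for calls in range(1, max_calls + 1):
        score = await train_and_score(c)  # the output is materialized here
        if score >= threshold:
            return c, score, calls
        c *= 2  # decide the next call from the intermediate result
    raise RuntimeError(f"no score reached {threshold} within {max_calls} calls")


print(asyncio.run(eager_search()))
```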
Under the hood, an `@eager` "workflow" is actually a task that has access to a `FlyteRemote` object. This object must be configured so that, at runtime, it can authenticate to the configured Flyte cluster. The eager workflow then uses this remote object to launch an individual task execution for each task invoked inside the user-defined eager workflow function body.
Testing
Use the following Dockerfile and script to test this feature:
```dockerfile
# Dockerfile.eager
FROM python:3.8-slim-buster

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root

RUN apt-get update && apt-get install -y build-essential git

# Install the AWS CLI separately to prevent issues with boto being written over
RUN pip3 install awscli

# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

# Install Python dependencies: flytekit from the repo root (editable),
# the deck plugin, and the libraries the test script imports
COPY . /root/
RUN pip install -e .
RUN pip install -e ./plugins/flytekit-deck-standard
RUN pip install scikit-learn pandas

# 20 == logging.INFO
ENV FLYTE_SDK_LOGGING_LEVEL 20
```
Python script:
```python
# flytekit_eager.py
"""Async workflows prototype."""

import asyncio
import logging
from typing import NamedTuple

import pandas as pd
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

from flytekit import task, workflow
from flytekit.configuration import Config
from flytekit.experimental import eager
from flytekit.remote import FlyteRemote


class CustomException(Exception): ...


BestModel = NamedTuple("BestModel", model=LogisticRegression, metric=float)


@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem."""
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@task
def evaluate_model(data: pd.DataFrame, model: LogisticRegression) -> float:
    """Evaluate a model's accuracy on held-out data."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return float(accuracy_score(target, model.predict(features)))


# The remote object the eager workflow uses to launch task executions;
# Config.for_sandbox() targets a local Flyte sandbox cluster.
remote = FlyteRemote(
    config=Config.for_sandbox(),
    default_project="flytesnacks",
    default_domain="development",
)


@eager(remote=remote)
async def main() -> BestModel:
    logging.info("STARTING MAIN")
    data = await get_data()
    processed_data = await process_data(data=data)

    # split the data, wrapping any local error in a custom exception type
    try:
        train, test = train_test_split(processed_data, test_size=0.2)
    except Exception as exc:
        raise CustomException(str(exc)) from exc

    # fan out: train one model per hyperparameter value concurrently
    models = await asyncio.gather(*[
        train_model(data=train, hyperparameters={"C": x})
        # for x in [0.1, 0.01, 0.001, 0.0001, 0.00001]
        for x in [0.1]
    ])
    results = await asyncio.gather(*[
        evaluate_model(data=test, model=model) for model in models
    ])

    # fan in: keep the model with the highest accuracy
    best_model, best_result = None, float("-inf")
    for model, result in zip(models, results):
        if result > best_result:
            best_model, best_result = model, result

    assert best_model is not None, "model cannot be None!"
    return best_model, best_result


@workflow
def wf() -> BestModel:
    return main()


if __name__ == "__main__":
    print("training model")
    model = asyncio.run(main())
    print(f"trained model: {model}")
```
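The selection loop at the end of `main` keeps the first model with the strictly highest accuracy. For ordinary (non-NaN) scores, a more compact equivalent uses the built-in `max`, which also returns the first of several equal maxima. This is an optional refactor sketch, not part of the PR: `select_best` is a hypothetical helper, and strings stand in for fitted `LogisticRegression` models.

```python
from typing import Sequence, Tuple, TypeVar

M = TypeVar("M")


def select_best(models: Sequence[M], results: Sequence[float]) -> Tuple[M, float]:
    """Return the (model, score) pair with the highest score.

    On ties, the earliest pair wins, matching the strict `>` test in `main`.
    """
    if not models:
        raise ValueError("models cannot be empty!")
    if len(models) != len(results):
        raise ValueError("models and results must have the same length")
    return max(zip(models, results), key=lambda pair: pair[1])


# Strings stand in for fitted LogisticRegression models.
print(select_best(["C=0.1", "C=0.01", "C=0.001"], [0.91, 0.97, 0.97]))  # ('C=0.01', 0.97)
```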
Tracking Issue
flyteorg/flyte#3396
Follow-up issue
NA
flyteorg/flytekit
acoustic-carpenter-78188
08/25/2023, 6:36 PM