# flyte-github
#3672 Eager mode: enable awaitable execution graphs in Flyte
Issue created by cosmicBboy
Discussed in #3396
Originally posted by cosmicBboy March 3, 2023

## Background
Currently, Flyte offers two constructs for composing `task`s into more complex execution graphs: static workflows (via the `@workflow` decorator) and dynamic workflows (via the `@dynamic` decorator). As the names suggest, static workflows are created at compile time and registered to some target Flyte cluster. Dynamic workflows, on the other hand, are compiled at runtime so that they can materialize the inputs of the workflow and use them to influence the shape of the execution graph.

## Problem Statement
Both static and dynamic workflows pose a problem. While they provide type safety (more so for static workflows, although dynamic workflows will also raise type errors when they are created at runtime), both are inflexible in expressing the kinds of execution graphs that many Python `flytekit` users may be accustomed to. This is because `@workflow` and `@dynamic` function code is not actually Python code: it is a DSL for constructing execution graphs that sits in an "uncanny valley", looking like Python code without being Python code. For example:
• `if... elif... else` statements are not supported, and the equivalent syntax is cumbersome to write with `conditionals`.
• `try... except` statements are not supported.
• Writing `async` code is not supported.

For Python users who come in with expectations of writing Python to compose their workflows, Flyte is surprising both in terms of (a) the lack of useful error messages when trying illegal combinations of Flyte and Python syntax and (b) the inability to compose tasks using the `asyncio` syntax. The scope of this RFC is to focus on the latter.

## Proposal
This RFC proposes adding support for "eager workflows", indicated by the `@eager` decorator in a new subpackage, `flytekit.experimental`, which will contain experimental features. This construct lets users write workflows much as they would write asynchronous Python code. For example:
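import asyncio
from typing import NamedTuple

import pandas as pd
from flytekit import task
from flytekit.experimental import eager
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split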


class CustomException(Exception): ...

BestModel = NamedTuple("BestModel", [("model", LogisticRegression), ("metric", float)])


@task
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task
def process_data(data: pd.DataFrame) -> pd.DataFrame:
    """Simplify the task from a 3-class to a binary classification problem."""
    return data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))


@task
def train_model(data: pd.DataFrame, hyperparameters: dict) -> LogisticRegression:
    """Train a model on the wine dataset."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return LogisticRegression(max_iter=3000, **hyperparameters).fit(features, target)


@task
def evaluate_model(data: pd.DataFrame, model: LogisticRegression) -> float:
    """Evaluate the model's accuracy on the given data."""
    features = data.drop("target", axis="columns")
    target = data["target"]
    return float(accuracy_score(target, model.predict(features)))


@eager
async def main() -> BestModel:
    data = await get_data()
    processed_data = await process_data(data=data)

    # split the data
    try:
        train, test = train_test_split(processed_data, test_size=0.2)
    except Exception as exc:
        raise CustomException(str(exc)) from exc

    models = await asyncio.gather(*[
        train_model(data=train, hyperparameters={"C": x})
        for x in [0.1, 0.01, 0.001, 0.0001, 0.00001]
    ])
    results = await asyncio.gather(*[
        evaluate_model(data=test, model=model) for model in models
    ])

    best_model, best_result = None, float("-inf")
    for model, result in zip(models, results):
        if result > best_result:
            best_model, best_result = model, result

    assert best_model is not None, "model cannot be None!"
    return best_model, best_result
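
The control flow in `main` above can be tried out without a Flyte cluster. The following is a minimal sketch using only the standard library: the `train_model` and `evaluate_model` coroutines are hypothetical stand-ins for the Flyte tasks, and the scoring rule is made up for illustration. It only demonstrates the pattern the RFC is after (ordinary `try... except`, `if`, and `asyncio.gather` inside an `async` function), not how `@eager` is implemented.

```python
import asyncio


class CustomException(Exception):
    """Raised when a step before the fan-out fails."""


async def train_model(c: float) -> dict:
    # Hypothetical stand-in for a Flyte task: the "model" is just its hyperparameter.
    await asyncio.sleep(0)
    return {"C": c}


async def evaluate_model(model: dict) -> float:
    # Hypothetical stand-in: a made-up score that peaks when C == 0.01.
    await asyncio.sleep(0)
    return 1.0 - abs(model["C"] - 0.01)


async def main(cs: list) -> tuple:
    # Plain try/except works because this is ordinary Python, not a DSL.
    try:
        if not cs:
            raise ValueError("no hyperparameters supplied")
    except ValueError as exc:
        raise CustomException(str(exc)) from exc

    # Fan out: gather returns results in the same order as its inputs.
    models = await asyncio.gather(*[train_model(c) for c in cs])
    scores = await asyncio.gather(*[evaluate_model(m) for m in models])

    # Plain if statements operate on materialized values.
    best_model, best_score = None, float("-inf")
    for model, score in zip(models, scores):
        if score > best_score:
            best_model, best_score = model, score
    return best_model, best_score


best_model, best_score = asyncio.run(main([0.1, 0.01, 0.001]))
```

Because every `await` yields a concrete Python value, the selection loop needs no special conditional syntax, which is the main ergonomic difference from `@workflow` and `@dynamic` code.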

## Trade-offs
At a high level, we can think of these three ways of writing workflows in terms of Flyte promises and what data are accessible to the user in the workflow code:

## Open Questions
• How to handle `FlyteRemote` configuration?
• Authentication: can flytepropeller pass in everything needed into eager workflows to execute tasks?
• Use flytepropeller's token to mint a new token with limited permissions, e.g. the eager workflow can only kick off new executions from the eager workflow's execution.

## MVP
WIP PR: flyteorg/flytekit#1579
• Rely on secret requests to use client secrets to be able to authenticate.
• We'll provide OSS users with instructions to use this feature, pushing the responsibility of creating the client secret to the user's platform team.
• No backend changes: collect feedback first before investing in more changes.
• Eager workflows (tasks masquerading as workflows) also produce a Flyte Deck that shows the list of subtasks that are executed:

[Image: Flyte Deck listing the subtasks executed by an eager workflow]

• The current PR relies on client id `flytepropeller` and a hard-coded secret group/key:
SECRET_GROUP = "eager-mode"
SECRET_KEY = "client_secret"