Hello Everyone! I am new so please let me know if...
# flyte-support
b
Hello Everyone! I am new so please let me know if I am breaking any etiquette. I am trying to build on the following example and modify it slightly to have tracking with the mlflow_autolog() decorator to enable automatic experiment tracking. https://github.com/flyteorg/flytekit-python-template/tree/main/wine-classification/%7B%7Bcookiecutter.project_name%7D%7D I have it running locally fine.
pyflyte run wine_classification_example.py training_workflow
And I have it running fine in the local flytectl demo cluster if I comment out the mlflow_autolog and mlflow dependencies.
pyflyte run --remote -p my-project-wine-4 -d development wine_classification_example.py training_workflow
I think my only problem at this point is changing the requirements.txt for the demo cluster. When I run in the demo cluster it cannot find the libraries and errors on the following lines.
from flytekitplugins.mlflow import mlflow_autolog
import mlflow
How can I add to the requirements in the demo cluster. Ideally I would like to do it using a requirements.txt and dockerfile, but I will take anything that works at this point...
f
Checkout imagespec
b
Is there a good example or tutorial for this? I was experimenting with this a bit earlier today and struggling. And now I have been looking at the documentation and trying lots of different things with no luck.
f
Ohh that is sad
Yes there are few examples
But we would love to learn what was not working
Cc @tall-lock-23197 can you share some examples for imagespec here
also @glamorous-carpet-83516 or @tall-lock-23197 help here
g
b
I will send a bit more context on my current issue Here is the workflow I have modified. https://github.com/tchase56/flyte_demo/blob/main/wine-classification/workflows/wine_classification_example.py It works ok locally, although I get some warnings. Then when I run in the demo cluster I get the following error, likely because I am doing the imagespec wrong.
g
could you remove
if sklearn_image_spec.is_container():
and try to run it again
👍 1
for some reason, flytekit doesn’t import mlflow_autolog
btw, could you add sklearn_image_spec to the
get_data
,
training_model_loop
and
process_data
task as well
👍 1
b
It seems to be hanging on this command when this happens should I kill the process and start again in general?
g
odd. yes, kill it and try it again
b
It keeps getting stuck I guess I will try rebuilding the demo cluster, and if that doesn't work restart vscode.
g
let me try you example. one sec
it works for me
b
mine keeps getting stuck that is so weird you think it is an environment issue?
g
hmm, did you see the same issue before
b
I got this issue after adding the sklearn image spec to all of the tasks
g
you already install flytekitplugins-envd, right
b
I ran this in my virtual environment •
pip install flytekitplugins-envd
g
could you send me the example you run
b
yep
Copy code
import pandas as pd
import numpy as np

from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns

from typing import List, Tuple, Dict
from flytekit import ImageSpec

import mlflow
from flytekitplugins.mlflow import mlflow_autolog

sklearn_image_spec = ImageSpec(
    base_image="<http://ghcr.io/flyteorg/flytekit:py3.8-1.6.2|ghcr.io/flyteorg/flytekit:py3.8-1.6.2>",
    packages=["mlflow", "flytekitplugins-mlflow"],
    registry="localhost:30000"    
)

# if sklearn_image_spec.is_container():


@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Simplify the task from a 3-class to a binary classification problem."""
    data_out = data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))
    data_out_train, data_out_test = train_test_split(data_out, test_size=0.2, random_state=42)

    train_x = data_out_train.drop("target", axis=1)
    test_x = data_out_test.drop("target", axis=1)
    train_y = data_out_train[["target"]]
    test_y = data_out_test[["target"]]

    return train_x, test_x, train_y, test_y
    
@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame, 
    test_x: pd.DataFrame, 
    train_y: pd.DataFrame, 
    test_y: pd.DataFrame, 
    params: Dict[str, float]) -> Tuple[float, float, LogisticRegression]:
    """Train a model on the wine dataset."""

    lr = LogisticRegression(max_iter=3000, **params)
    lr.fit(train_x, train_y.iloc[:, 0])

    pred_y = lr.predict(test_x)
    mse = float(mean_squared_error(test_y, pred_y))
    mae = float(mean_absolute_error(test_y, pred_y))

    return mse, mae, lr


@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame, 
    test_x: pd.DataFrame, 
    train_y: pd.DataFrame, 
    test_y: pd.DataFrame, 
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:

    for params_i in params_list:
        print('ahhhh')
        print(params_i)
        rmse_i, mae_i, lr_i = train_model(
            train_x = train_x,
            test_x = test_x,
            train_y = train_y,
            test_y = test_y,
            params=params_i,
        )

@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Put all of the steps together into a single workflow."""
    # raise Exception("This is a test")
    data = get_data()
    train_x, test_x, train_y, test_y = process_data(data=data)

    training_model_loop(
        train_x = train_x,
        test_x = test_x,
        train_y = train_y,
        test_y = test_y,
        params_list=params_list,
    )

if __name__ == "__main__":
    training_workflow(params_list=[{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}])
g
which version of flytekit are you using
b
1.11.0
Copy code
>>> import flytekit
>>> flytekit.__version__
'1.11.0'
g
could you show me the output of
tree
in the current directory?
b
this right?
Copy code
(flyte_env) (base) tchase@HQ9322OSX wine-classification % pwd
/Users/tchase/Documents/repos/flyte_demo/wine-classification
(flyte_env) (base) tchase@HQ9322OSX wine-classification % tree .
It is spitting out a lot of stuff
g
flytekit tries to copy you mlflow metadata to remote as well I guess
could you create a new directory
workflow
,and put
wine_classification_example.py
inside it?
👍 1
b
still getting stuck 😢
g
your docker is also running, right
b
my docker is running I just had to prune a bunch of stuff in order to rebuild my demo cluster I re-ran and it is hanging again I thought for sure the pruning would fix it... this is so frustrating
t
have you tried restarting docker?
b
I closed and re-opened rancher desktop before the latest attempt yeah
maybe I should restart my computer... lol
alrighty then restarting the computer stopped the hanging but gave me a new error
Copy code
Failed to get signed url for script_mode.tar.gz, reason: SYSTEM:Unknown: error=None, cause=<_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:30080: Failed to connect to remote host: Connection refused"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:30080: Failed to connect to remote host: Connection refused", grpc_status:14, created_time:"2024-04-28T23:08:42.912323-07:00"}"
g
is your sandbox running?
Copy code
kubectl get pods
t
you may also want to add
print
statements here after you resolve the above issue, in case the command still freezes: https://github.com/flyteorg/flytekit/blob/3966d1a0a1e33137a4bc41d9860d4ed5e264cbdf/flytekit/image_spec/image_spec.py#L218-L244
b
yep when I restarted the computer it must have stopped the cluster I rebuilt the demo cluster and now it is hanging again...
b
like this? Now it hangs and says hello world
t
can you add more prints so that we can find the root cause of this issue?
👍 1
b
this should be slightly more helpful
t
i think
image_spec.exist()
is the culprit. can you verify please? also if that's the case, can you check if
client.images.get_registry_data(self.image_name())
line in
exist()
is the actual culprit?
b
Yep, that seems to be it
t
then it's related to docker. is it possible to switch context from rancher to docker desktop?
👍 1
i encountered a similar issue before when i used orbstack.
👍 1
b
I switched to docker, which fixed the hanging, but now a new error has popped up
Copy code
failed to run command envd build --path /var/folders/_9/60k4g7zj1wq1kyl_pr7hq3k80000gq/T/flyteu4fgcrnq/control_plane_metadata/local_flytekit/ff10a1df7e677dbac1a0dcb8a118a235  --platform linux/amd64 --output type=image,name=localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ,push=true with error b'time="2024-04-29T00:13:31-07:00" level=fatal msg=exit app=envd error="failed to build the image: failed to build: failed to wait error group: failed to solve LLB: failed to solve: failed to do request: Head \\"<http://localhost:30000/v2/flytekit/blobs/sha256:e4141a94de7eb2f73676a7678ff9b1e968f935c4c3390cb75d6427c251b1677a>\\": dial tcp [::1]:30000: connect: connection refused" version=v0.3.45\n'
g
could you create a new envd context?
Copy code
envd context create --name flyte-sandbox --builder tcp --builder-address localhost:30000 --use
and register it again
b
what do you mean by register?
g
pyflyte run ..
👍 1
b
I seem to be still getting the same error
g
sorry, my bad. not 30000. should be 30003
Copy code
envd context create --name flyte-sandbox --builder tcp --builder-address localhost:30003 --use
b
Copy code
(flyte_env) (base) tchase@HQ9322OSX workflow % envd context create --name flyte-sandbox --builder tcp --builder-address localhost:30003 --use
FATA[2024-04-29T00:27:44-07:00] exit                                          app=envd error="failed to create context: context \"flyte-sandbox\" already exists" version=v0.3.45
g
envd context rm --name flyte-sandbox
remove previous one first
b
Copy code
(flyte_env) (base) tchase@HQ9322OSX workflow % envd context rm --name flyte-sandbox
FATA[2024-04-29T00:28:56-07:00] exit                                          app=envd error="failed to remove context: cannot remove current context \"flyte-sandbox\"" version=v0.3.45
g
checkout to default
Copy code
envd context use --name default
👍 1
b
I'm getting so close!
g
could you remove base_image from your imageSpec
Copy code
sklearn_image_spec = ImageSpec(
    base_image="<http://ghcr.io/flyteorg/flytekit:py3.8-1.6.2|ghcr.io/flyteorg/flytekit:py3.8-1.6.2>",
    packages=["mlflow", "flytekitplugins-mlflow"],
    registry="localhost:30000"
)
👍 1
it will use latest flytekit
b
After removing the line that sets the base_image I am still getting the same error.
g
could you try this
Copy code
sklearn_image_spec = ImageSpec(
    packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow"],
    registry="localhost:30000"
)
b
still the same error
g
could you send me the example you run again, sorry
want to run it on my side
👍 1
b
Copy code
import pandas as pd
import numpy as np

from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns

from typing import List, Tuple, Dict
from flytekit import ImageSpec

import mlflow
from flytekitplugins.mlflow import mlflow_autolog

sklearn_image_spec = ImageSpec(
    packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow"],
    registry="localhost:30000"    
)

# if sklearn_image_spec.is_container():


@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Simplify the task from a 3-class to a binary classification problem."""
    data_out = data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))
    data_out_train, data_out_test = train_test_split(data_out, test_size=0.2, random_state=42)

    train_x = data_out_train.drop("target", axis=1)
    test_x = data_out_test.drop("target", axis=1)
    train_y = data_out_train[["target"]]
    test_y = data_out_test[["target"]]

    return train_x, test_x, train_y, test_y
    
@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame, 
    test_x: pd.DataFrame, 
    train_y: pd.DataFrame, 
    test_y: pd.DataFrame, 
    params: Dict[str, float]) -> Tuple[float, float, LogisticRegression]:
    """Train a model on the wine dataset."""

    lr = LogisticRegression(max_iter=3000, **params)
    lr.fit(train_x, train_y.iloc[:, 0])

    pred_y = lr.predict(test_x)
    mse = float(mean_squared_error(test_y, pred_y))
    mae = float(mean_absolute_error(test_y, pred_y))

    return mse, mae, lr


@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame, 
    test_x: pd.DataFrame, 
    train_y: pd.DataFrame, 
    test_y: pd.DataFrame, 
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:

    for params_i in params_list:
        print('ahhhh')
        print(params_i)
        rmse_i, mae_i, lr_i = train_model(
            train_x = train_x,
            test_x = test_x,
            train_y = train_y,
            test_y = test_y,
            params=params_i,
        )

@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Put all of the steps together into a single workflow."""
    # raise Exception("This is a test")
    data = get_data()
    train_x, test_x, train_y, test_y = process_data(data=data)

    training_model_loop(
        train_x = train_x,
        test_x = test_x,
        train_y = train_y,
        test_y = test_y,
        params_list=params_list,
    )

if __name__ == "__main__":
    training_workflow(params_list=[{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}])
g
I saw this error instead
b
I got that a day or two ago when debugging I think installing scikit-learn=1.2.2 in my conda environment fixed that error for me when I was running locally
g
it works
my code
Copy code
import pandas as pd
import numpy as np

from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns

from typing import List, Tuple, Dict
from flytekit import ImageSpec

import mlflow
from flytekitplugins.mlflow import mlflow_autolog

sklearn_image_spec = ImageSpec(
    packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow", "scikit-learn==1.2.2"],
    registry="pingsutw"
)


# if sklearn_image_spec.is_container():


@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Simplify the task from a 3-class to a binary classification problem."""
    data_out = data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))
    data_out_train, data_out_test = train_test_split(data_out, test_size=0.2, random_state=42)

    train_x = data_out_train.drop("target", axis=1)
    test_x = data_out_test.drop("target", axis=1)
    train_y = data_out_train[["target"]]
    test_y = data_out_test[["target"]]

    return train_x, test_x, train_y, test_y


@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
        train_x: pd.DataFrame,
        test_x: pd.DataFrame,
        train_y: pd.DataFrame,
        test_y: pd.DataFrame,
        params: Dict[str, float]) -> Tuple[float, float, LogisticRegression]:
    """Train a model on the wine dataset."""

    lr = LogisticRegression(max_iter=3000, **params)
    lr.fit(train_x, train_y.iloc[:, 0])

    pred_y = lr.predict(test_x)
    mse = float(mean_squared_error(test_y, pred_y))
    mae = float(mean_absolute_error(test_y, pred_y))

    return mse, mae, lr


@task(container_image=sklearn_image_spec)
def training_model_loop(
        train_x: pd.DataFrame,
        test_x: pd.DataFrame,
        train_y: pd.DataFrame,
        test_y: pd.DataFrame,
        params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    for params_i in params_list:
        print('ahhhh')
        print(params_i)
        rmse_i, mae_i, lr_i = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=params_i,
        )


@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Put all of the steps together into a single workflow."""
    # raise Exception("This is a test")
    data = get_data()
    train_x, test_x, train_y, test_y = process_data(data=data)

    training_model_loop(
        train_x=train_x,
        test_x=test_x,
        train_y=train_y,
        test_y=test_y,
        params_list=params_list,
    )


if __name__ == "__main__":
    training_workflow(params_list=[{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}])
b
I am so close. Did you encounter this error at all while debugging?
t
not sure why python 3.8 is still being used. have you removed
base_image
in your imagespec?
b
yeah This is the code I am running. I copied the code from above that worked for Kevin and just changed the registry argument in imagespec.
Copy code
import pandas as pd
import numpy as np

from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns

from typing import List, Tuple, Dict
from flytekit import ImageSpec

import mlflow
from flytekitplugins.mlflow import mlflow_autolog

sklearn_image_spec = ImageSpec(
    packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow", "scikit-learn==1.2.2"],
    registry="localhost:30000"    
)


# if sklearn_image_spec.is_container():


@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame


@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Simplify the task from a 3-class to a binary classification problem."""
    data_out = data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))
    data_out_train, data_out_test = train_test_split(data_out, test_size=0.2, random_state=42)

    train_x = data_out_train.drop("target", axis=1)
    test_x = data_out_test.drop("target", axis=1)
    train_y = data_out_train[["target"]]
    test_y = data_out_test[["target"]]

    return train_x, test_x, train_y, test_y


@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
        train_x: pd.DataFrame,
        test_x: pd.DataFrame,
        train_y: pd.DataFrame,
        test_y: pd.DataFrame,
        params: Dict[str, float]) -> Tuple[float, float, LogisticRegression]:
    """Train a model on the wine dataset."""

    lr = LogisticRegression(max_iter=3000, **params)
    lr.fit(train_x, train_y.iloc[:, 0])

    pred_y = lr.predict(test_x)
    mse = float(mean_squared_error(test_y, pred_y))
    mae = float(mean_absolute_error(test_y, pred_y))

    return mse, mae, lr


@task(container_image=sklearn_image_spec)
def training_model_loop(
        train_x: pd.DataFrame,
        test_x: pd.DataFrame,
        train_y: pd.DataFrame,
        test_y: pd.DataFrame,
        params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    for params_i in params_list:
        print('ahhhh')
        print(params_i)
        rmse_i, mae_i, lr_i = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=params_i,
        )


@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Put all of the steps together into a single workflow."""
    # raise Exception("This is a test")
    data = get_data()
    train_x, test_x, train_y, test_y = process_data(data=data)

    training_model_loop(
        train_x=train_x,
        test_x=test_x,
        train_y=train_y,
        test_y=test_y,
        params_list=params_list,
    )


if __name__ == "__main__":
    training_workflow(params_list=[{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}])
t
would you mind sharing the base image that's being pulled while building the image?
b
What's the best way to do that?
t
you can see the image in the terminal when you run the
pyflyte run ...
command, or specify the base image as
<http://ghcr.io/flyteorg/flytekit:py3.11-1.11.0|ghcr.io/flyteorg/flytekit:py3.11-1.11.0>
b
(flyte_env) (base) tchase@HQ9322OSX workflow % pyflyte run --remote wine_classification_example.py training_workflow Running Execution on Remote. one two a b c True Image localhost30000/flytekit1JrBkuUO0aml5MJRduDMXQ found. Skip building. three [✔️] Go to http://localhost:30080/console/projects/flytesnacks/domains/development/executions/f3b0cab4ad1554cf1911 to see execution in the console.
t
oh okay. could you check the version of flytekit that got installed in your image?
Copy code
docker run -it --rm localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ /bin/bash
pip show flytekit
b
flytekit@f9c7d7f06429:/root$ pip show flytekit WARNING: The directory '/home/flytekit/.cache/pip' or its parent directory is not owned or is not writable by the current user. The cache has been disabled. Check the permissions and owner of that directory. If executing pip with sudo, you should use sudo's -H flag. Name: flytekit Version: 1.6.2 Summary: Flyte SDK for Python Home-page: https://github.com/flyteorg/flytekit Author: Author-email: License: apache2 Location: /usr/local/lib/python3.8/site-packages Requires: adlfs, click, cloudpickle, cookiecutter, croniter, dataclasses-json, deprecated, diskcache, docker, docker-image-py, docstring-parser, flyteidl, fsspec, gcsfs, gitpython, googleapis-common-protos, grpcio, grpcio-status, importlib-metadata, joblib, keyring, kubernetes, marshmallow-jsonschema, natsort, numpy, pandas, pyarrow, pyopenssl, python-dateutil, python-json-logger, pytimeparse, pytz, pyyaml, requests, responses, rich, rich-click, s3fs, sortedcontainers, statsd, typing-extensions, urllib3, wheel, wrapt Required-by: flytekitplugins-deck-standard, flytekitplugins-envd, flytekitplugins-mlflow, flytekitplugins-pod
t
this is a really old version. could you specify the base image and try again? the image should get re-built.
👍 1
b
this base image seem fine? base_image="ghcr.io/flyteorg/flytekit:py3.8-1.6.2"
t
no this install flytekit-1.6.2. we need the latest version 1.11.0. can you try specifying
<http://ghcr.io/flyteorg/flytekit:py3.11-1.11.0|ghcr.io/flyteorg/flytekit:py3.11-1.11.0>
?
👍 1
b
I specified the base image but It doesn't seem to have rebuilt sklearn_image_spec = ImageSpec( base_image="ghcr.io/flyteorg/flytekit:py3.11-1.11.0", packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow", "scikit-learn==1.2.2"], registry="localhost:30000" )
Copy code
Traceback (most recent call last):

      File "/usr/local/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/usr/local/lib/python3.8/site-packages/flytekitplugins/mlflow/tracking.py", line 113, in wrapper
        if not ctx.execution_state.is_local_execution():

Message:

    'ExecutionState' object has no attribute 'is_local_execution'

User error.
ok I tore down the demo cluster and rebuilt it, now the image is rebuilding when I run.
and yet I have the exact same error...
t
is it still the same flytekit version?
b
it is yeah... No idea why
Copy code
(flyte_env) (base) tchase@HQ9322OSX workflow % pyflyte run --remote wine_classification_example.py training_workflow
Running Execution on Remote.
one
two
a
b
e
False
Image localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ not found. Building...
five
six
Run command: envd build --path /var/folders/_9/60k4g7zj1wq1kyl_pr7hq3k80000gq/T/flyte4gq06u5q/control_plane_metadata/local_flytekit/e4078d1595594356d59956dba925c844  --platform linux/amd64 --output type=image,name=localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ,push=true 
#1 [internal] setting pip cache mount permissions
#1 DONE 0.0s
#2 <docker-image://ghcr.io/flyteorg/flytekit:py3.8-1.6.2>
#2 resolve <http://ghcr.io/flyteorg/flytekit:py3.8-1.6.2|ghcr.io/flyteorg/flytekit:py3.8-1.6.2>
One sec, I may be running the wrong file. I created a copy when I was debugging another problem earlier.
Ok it ran!!! The mlflow stuff should be available through the flyte UI as well correct?
I was hoping to see the mlflow information in the flyte deck similar to this example, but I cannot seem to find it. https://docs.flyte.org/en/latest/flytesnacks/examples/mlflow_plugin/mlflow_example.html
f
do you have
Copy code
enable_deck=True,
b
I just added that but I still cannot seem to find the plots. Where should they be in the UI?
Does the task with the tracking need to be directly called by the workflow? Right now train_model_loop is being called by the workflow, and train_model is called by train_model_loop.
Copy code
import pandas as pd
import numpy as np

from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns

from typing import List, Tuple, Dict
from flytekit import ImageSpec

import mlflow
from flytekitplugins.mlflow import mlflow_autolog

sklearn_image_spec = ImageSpec(
    base_image="<http://ghcr.io/flyteorg/flytekit:py3.11-1.11.0|ghcr.io/flyteorg/flytekit:py3.11-1.11.0>",
    packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow", "scikit-learn==1.2.2"],
    registry="localhost:30000"    
)

# if sklearn_image_spec.is_container():


@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Get the wine dataset."""
    return load_wine(as_frame=True).frame

@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Simplify the task from a 3-class to a binary classification problem."""
    data_out = data.assign(target=lambda x: x["target"].where(x["target"] == 0, 1))
    data_out_train, data_out_test = train_test_split(data_out, test_size=0.2, random_state=42)

    train_x = data_out_train.drop("target", axis=1)
    test_x = data_out_test.drop("target", axis=1)
    train_y = data_out_train[["target"]]
    test_y = data_out_test[["target"]]

    return train_x, test_x, train_y, test_y
    
@task(enable_deck=True, container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame, 
    test_x: pd.DataFrame, 
    train_y: pd.DataFrame, 
    test_y: pd.DataFrame, 
    params: Dict[str, float]) -> Tuple[float, float, LogisticRegression]:
    """Train a model on the wine dataset."""

    lr = LogisticRegression(max_iter=3000, **params)
    lr.fit(train_x, train_y.iloc[:, 0])

    pred_y = lr.predict(test_x)
    mse = float(mean_squared_error(test_y, pred_y))
    mae = float(mean_absolute_error(test_y, pred_y))

    return mse, mae, lr


@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame, 
    test_x: pd.DataFrame, 
    train_y: pd.DataFrame, 
    test_y: pd.DataFrame, 
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:

    for params_i in params_list:
        print('ahhhh')
        print(params_i)
        rmse_i, mae_i, lr_i = train_model(
            train_x = train_x,
            test_x = test_x,
            train_y = train_y,
            test_y = test_y,
            params=params_i,
        )

@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Put all of the steps together into a single workflow."""
    # raise Exception("This is a test")
    data = get_data()
    train_x, test_x, train_y, test_y = process_data(data=data)

    training_model_loop(
        train_x = train_x,
        test_x = test_x,
        train_y = train_y,
        test_y = test_y,
        params_list=params_list,
    )

if __name__ == "__main__":
    training_workflow(params_list=[{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}])
I have some mlflow configuration specific questions that I will start in a new thread
👍 1