bumpy-pager-32541
04/29/2024, 12:00 AMpyflyte run wine_classification_example.py training_workflow
And I have it running fine in the local flytectl demo cluster if I comment out the mlflow_autolog and mlflow dependencies.
pyflyte run --remote -p my-project-wine-4 -d development wine_classification_example.py training_workflow
I think my only problem at this point is changing the requirements.txt for the demo cluster. When I run in the demo cluster it cannot find the libraries and errors on the following lines.
from flytekitplugins.mlflow import mlflow_autolog
import mlflow
How can I add to the requirements in the demo cluster? Ideally I would like to do it using a requirements.txt and Dockerfile, but I will take anything that works at this point...freezing-airport-6809
freezing-airport-6809
bumpy-pager-32541
04/29/2024, 1:04 AMfreezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
freezing-airport-6809
glamorous-carpet-83516
04/29/2024, 4:42 AMbumpy-pager-32541
04/29/2024, 4:42 AMglamorous-carpet-83516
04/29/2024, 4:45 AMif sklearn_image_spec.is_container():
and try to run it againglamorous-carpet-83516
04/29/2024, 4:45 AMglamorous-carpet-83516
04/29/2024, 4:46 AMget_data
, training_model_loop
and process_data
task as wellbumpy-pager-32541
04/29/2024, 4:52 AMglamorous-carpet-83516
04/29/2024, 4:58 AMbumpy-pager-32541
04/29/2024, 5:00 AMglamorous-carpet-83516
04/29/2024, 5:01 AMglamorous-carpet-83516
04/29/2024, 5:04 AMbumpy-pager-32541
04/29/2024, 5:09 AMglamorous-carpet-83516
04/29/2024, 5:24 AMbumpy-pager-32541
04/29/2024, 5:24 AMglamorous-carpet-83516
04/29/2024, 5:25 AMbumpy-pager-32541
04/29/2024, 5:26 AMpip install flytekitplugins-envd
glamorous-carpet-83516
04/29/2024, 5:27 AMbumpy-pager-32541
04/29/2024, 5:28 AMimport pandas as pd
import numpy as np
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns
from typing import List, Tuple, Dict
from flytekit import ImageSpec
import mlflow
from flytekitplugins.mlflow import mlflow_autolog
# Container image used by every task in this file (each @task passes it as
# container_image). The base image must be a plain image reference; the
# Slack-style "<http://...|...>" link wrapper is not a valid image name.
# NOTE(review): this base image is tagged for flytekit 1.6.2 -- confirm it
# matches the flytekit version used to register the workflow.
sklearn_image_spec = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:py3.8-1.6.2",
    packages=["mlflow", "flytekitplugins-mlflow"],
    registry="localhost:30000",
)
# if sklearn_image_spec.is_container():
@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Load the scikit-learn wine dataset and return its ``frame`` DataFrame."""
    wine_bunch = load_wine(as_frame=True)
    return wine_bunch.frame
@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Binarize the target and split the data into train/test features and labels.

    Target values equal to 0 are kept and every other value becomes 1. The
    result is split 80/20 with a fixed random seed and returned as
    ``(train_x, test_x, train_y, test_y)``; the label frames hold only the
    ``target`` column.
    """
    binary_target = data["target"].where(data["target"] == 0, 1)
    binarized = data.assign(target=binary_target)
    train_part, test_part = train_test_split(binarized, test_size=0.2, random_state=42)
    return (
        train_part.drop(columns="target"),
        test_part.drop(columns="target"),
        train_part[["target"]],
        test_part[["target"]],
    )
@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params: Dict[str, float],
) -> Tuple[float, float, LogisticRegression]:
    """Fit a logistic regression and score it on the test split.

    ``params`` is forwarded as keyword arguments to ``LogisticRegression``.
    Returns ``(mse, mae, fitted_model)``, with both errors computed from the
    predictions on ``test_x`` against ``test_y``.
    """
    model = LogisticRegression(max_iter=3000, **params)
    # The labels arrive as a one-column frame; fit on that column as a Series.
    model.fit(train_x, train_y.iloc[:, 0])
    predictions = model.predict(test_x)
    return (
        float(mean_squared_error(test_y, predictions)),
        float(mean_absolute_error(test_y, predictions)),
        model,
    )
@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    """Call ``train_model`` once for every parameter set in ``params_list``.

    Each parameter set is printed before training; the returned metrics and
    model are not used further.
    """
    for candidate in params_list:
        print("ahhhh")
        print(candidate)
        # train_model returns (mse, mae, model); none of them is used here.
        _mse, _mae, _model = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=candidate,
        )
@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Load the data, preprocess it, then run the training loop over ``params_list``."""
    x_train, x_test, y_train, y_test = process_data(data=get_data())
    training_model_loop(
        train_x=x_train,
        test_x=x_test,
        train_y=y_train,
        test_y=y_test,
        params_list=params_list,
    )
if __name__ == "__main__":
    # Run the workflow locally over the same four C values as its default.
    local_grid = [{"C": c_value} for c_value in (0.1, 0.2, 0.3, 0.4)]
    training_workflow(params_list=local_grid)
glamorous-carpet-83516
04/29/2024, 5:29 AMbumpy-pager-32541
04/29/2024, 5:30 AM>>> import flytekit
>>> flytekit.__version__
'1.11.0'
glamorous-carpet-83516
04/29/2024, 5:32 AMtree
in the current directory?bumpy-pager-32541
04/29/2024, 5:39 AM(flyte_env) (base) tchase@HQ9322OSX wine-classification % pwd
/Users/tchase/Documents/repos/flyte_demo/wine-classification
(flyte_env) (base) tchase@HQ9322OSX wine-classification % tree .
It is spitting out a lot of stuffglamorous-carpet-83516
04/29/2024, 5:42 AMglamorous-carpet-83516
04/29/2024, 5:43 AMworkflow
,and put wine_classification_example.py
inside it?bumpy-pager-32541
04/29/2024, 5:49 AMglamorous-carpet-83516
04/29/2024, 5:52 AMbumpy-pager-32541
04/29/2024, 6:00 AMtall-lock-23197
bumpy-pager-32541
04/29/2024, 6:03 AMbumpy-pager-32541
04/29/2024, 6:04 AMbumpy-pager-32541
04/29/2024, 6:10 AMFailed to get signed url for script_mode.tar.gz, reason: SYSTEM:Unknown: error=None, cause=<_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:30080: Failed to connect to remote host: Connection refused"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:30080: Failed to connect to remote host: Connection refused", grpc_status:14, created_time:"2024-04-28T23:08:42.912323-07:00"}"
glamorous-carpet-83516
04/29/2024, 6:10 AMkubectl get pods
tall-lock-23197
print
statements here after you resolve the above issue, in case the command still freezes: https://github.com/flyteorg/flytekit/blob/3966d1a0a1e33137a4bc41d9860d4ed5e264cbdf/flytekit/image_spec/image_spec.py#L218-L244bumpy-pager-32541
04/29/2024, 6:18 AMtall-lock-23197
bumpy-pager-32541
04/29/2024, 6:32 AMtall-lock-23197
bumpy-pager-32541
04/29/2024, 6:40 AMtall-lock-23197
image_spec.exist()
is the culprit. can you verify please? also if that's the case, can you check if client.images.get_registry_data(self.image_name())
line in exist()
is the actual culprit?bumpy-pager-32541
04/29/2024, 6:54 AMtall-lock-23197
tall-lock-23197
bumpy-pager-32541
04/29/2024, 7:14 AMfailed to run command envd build --path /var/folders/_9/60k4g7zj1wq1kyl_pr7hq3k80000gq/T/flyteu4fgcrnq/control_plane_metadata/local_flytekit/ff10a1df7e677dbac1a0dcb8a118a235 --platform linux/amd64 --output type=image,name=localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ,push=true with error b'time="2024-04-29T00:13:31-07:00" level=fatal msg=exit app=envd error="failed to build the image: failed to build: failed to wait error group: failed to solve LLB: failed to solve: failed to do request: Head \\"http://localhost:30000/v2/flytekit/blobs/sha256:e4141a94de7eb2f73676a7678ff9b1e968f935c4c3390cb75d6427c251b1677a\\": dial tcp [::1]:30000: connect: connection refused" version=v0.3.45\n'
glamorous-carpet-83516
04/29/2024, 7:16 AMenvd context create --name flyte-sandbox --builder tcp --builder-address localhost:30000 --use
glamorous-carpet-83516
04/29/2024, 7:16 AMbumpy-pager-32541
04/29/2024, 7:19 AMglamorous-carpet-83516
04/29/2024, 7:19 AMbumpy-pager-32541
04/29/2024, 7:20 AMglamorous-carpet-83516
04/29/2024, 7:27 AMenvd context create --name flyte-sandbox --builder tcp --builder-address localhost:30003 --use
bumpy-pager-32541
04/29/2024, 7:28 AM(flyte_env) (base) tchase@HQ9322OSX workflow % envd context create --name flyte-sandbox --builder tcp --builder-address localhost:30003 --use
FATA[2024-04-29T00:27:44-07:00] exit app=envd error="failed to create context: context \"flyte-sandbox\" already exists" version=v0.3.45
glamorous-carpet-83516
04/29/2024, 7:28 AMglamorous-carpet-83516
04/29/2024, 7:28 AMbumpy-pager-32541
04/29/2024, 7:29 AM(flyte_env) (base) tchase@HQ9322OSX workflow % envd context rm --name flyte-sandbox
FATA[2024-04-29T00:28:56-07:00] exit app=envd error="failed to remove context: cannot remove current context \"flyte-sandbox\"" version=v0.3.45
glamorous-carpet-83516
04/29/2024, 7:30 AMenvd context use --name default
bumpy-pager-32541
04/29/2024, 7:36 AMglamorous-carpet-83516
04/29/2024, 7:38 AMsklearn_image_spec = ImageSpec(
base_image="ghcr.io/flyteorg/flytekit:py3.8-1.6.2",
packages=["mlflow", "flytekitplugins-mlflow"],
registry="localhost:30000"
)
glamorous-carpet-83516
04/29/2024, 7:38 AMbumpy-pager-32541
04/29/2024, 7:42 AMglamorous-carpet-83516
04/29/2024, 7:43 AMsklearn_image_spec = ImageSpec(
packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow"],
registry="localhost:30000"
)
bumpy-pager-32541
04/29/2024, 7:47 AMglamorous-carpet-83516
04/29/2024, 7:58 AMglamorous-carpet-83516
04/29/2024, 7:58 AMbumpy-pager-32541
04/29/2024, 8:02 AMimport pandas as pd
import numpy as np
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns
from typing import List, Tuple, Dict
from flytekit import ImageSpec
import mlflow
from flytekitplugins.mlflow import mlflow_autolog
# Container image used by every task in this file (each @task passes it as
# container_image); it installs the pip packages listed below.
sklearn_image_spec = ImageSpec(
    packages=[
        "flytekit==1.11.0",
        "mlflow",
        "flytekitplugins-mlflow",
    ],
    registry="localhost:30000",
)
# if sklearn_image_spec.is_container():
@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Load the scikit-learn wine dataset and return its ``frame`` DataFrame."""
    wine_bunch = load_wine(as_frame=True)
    return wine_bunch.frame
@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Binarize the target and split the data into train/test features and labels.

    Target values equal to 0 are kept and every other value becomes 1. The
    result is split 80/20 with a fixed random seed and returned as
    ``(train_x, test_x, train_y, test_y)``; the label frames hold only the
    ``target`` column.
    """
    binary_target = data["target"].where(data["target"] == 0, 1)
    binarized = data.assign(target=binary_target)
    train_part, test_part = train_test_split(binarized, test_size=0.2, random_state=42)
    return (
        train_part.drop(columns="target"),
        test_part.drop(columns="target"),
        train_part[["target"]],
        test_part[["target"]],
    )
@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params: Dict[str, float],
) -> Tuple[float, float, LogisticRegression]:
    """Fit a logistic regression and score it on the test split.

    ``params`` is forwarded as keyword arguments to ``LogisticRegression``.
    Returns ``(mse, mae, fitted_model)``, with both errors computed from the
    predictions on ``test_x`` against ``test_y``.
    """
    model = LogisticRegression(max_iter=3000, **params)
    # The labels arrive as a one-column frame; fit on that column as a Series.
    model.fit(train_x, train_y.iloc[:, 0])
    predictions = model.predict(test_x)
    return (
        float(mean_squared_error(test_y, predictions)),
        float(mean_absolute_error(test_y, predictions)),
        model,
    )
@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    """Call ``train_model`` once for every parameter set in ``params_list``.

    Each parameter set is printed before training; the returned metrics and
    model are not used further.
    """
    for candidate in params_list:
        print("ahhhh")
        print(candidate)
        # train_model returns (mse, mae, model); none of them is used here.
        _mse, _mae, _model = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=candidate,
        )
@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Load the data, preprocess it, then run the training loop over ``params_list``."""
    x_train, x_test, y_train, y_test = process_data(data=get_data())
    training_model_loop(
        train_x=x_train,
        test_x=x_test,
        train_y=y_train,
        test_y=y_test,
        params_list=params_list,
    )
if __name__ == "__main__":
    # Run the workflow locally over the same four C values as its default.
    local_grid = [{"C": c_value} for c_value in (0.1, 0.2, 0.3, 0.4)]
    training_workflow(params_list=local_grid)
glamorous-carpet-83516
04/29/2024, 8:13 AMbumpy-pager-32541
04/29/2024, 8:14 AMglamorous-carpet-83516
04/29/2024, 8:36 AMglamorous-carpet-83516
04/29/2024, 8:36 AMimport pandas as pd
import numpy as np
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns
from typing import List, Tuple, Dict
from flytekit import ImageSpec
import mlflow
from flytekitplugins.mlflow import mlflow_autolog
# Container image used by every task in this file (each @task passes it as
# container_image); it installs the pip packages listed below.
sklearn_image_spec = ImageSpec(
    packages=[
        "flytekit==1.11.0",
        "mlflow",
        "flytekitplugins-mlflow",
        "scikit-learn==1.2.2",
    ],
    registry="pingsutw",
)
# if sklearn_image_spec.is_container():
@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Load the scikit-learn wine dataset and return its ``frame`` DataFrame."""
    wine_bunch = load_wine(as_frame=True)
    return wine_bunch.frame
@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Binarize the target and split the data into train/test features and labels.

    Target values equal to 0 are kept and every other value becomes 1. The
    result is split 80/20 with a fixed random seed and returned as
    ``(train_x, test_x, train_y, test_y)``; the label frames hold only the
    ``target`` column.
    """
    binary_target = data["target"].where(data["target"] == 0, 1)
    binarized = data.assign(target=binary_target)
    train_part, test_part = train_test_split(binarized, test_size=0.2, random_state=42)
    return (
        train_part.drop(columns="target"),
        test_part.drop(columns="target"),
        train_part[["target"]],
        test_part[["target"]],
    )
@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params: Dict[str, float],
) -> Tuple[float, float, LogisticRegression]:
    """Fit a logistic regression and score it on the test split.

    ``params`` is forwarded as keyword arguments to ``LogisticRegression``.
    Returns ``(mse, mae, fitted_model)``, with both errors computed from the
    predictions on ``test_x`` against ``test_y``.
    """
    model = LogisticRegression(max_iter=3000, **params)
    # The labels arrive as a one-column frame; fit on that column as a Series.
    model.fit(train_x, train_y.iloc[:, 0])
    predictions = model.predict(test_x)
    return (
        float(mean_squared_error(test_y, predictions)),
        float(mean_absolute_error(test_y, predictions)),
        model,
    )
@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    """Call ``train_model`` once for every parameter set in ``params_list``.

    Each parameter set is printed before training; the returned metrics and
    model are not used further.
    """
    for candidate in params_list:
        print("ahhhh")
        print(candidate)
        # train_model returns (mse, mae, model); none of them is used here.
        _mse, _mae, _model = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=candidate,
        )
@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Load the data, preprocess it, then run the training loop over ``params_list``."""
    x_train, x_test, y_train, y_test = process_data(data=get_data())
    training_model_loop(
        train_x=x_train,
        test_x=x_test,
        train_y=y_train,
        test_y=y_test,
        params_list=params_list,
    )
if __name__ == "__main__":
    # Run the workflow locally over the same four C values as its default.
    local_grid = [{"C": c_value} for c_value in (0.1, 0.2, 0.3, 0.4)]
    training_workflow(params_list=local_grid)
bumpy-pager-32541
04/30/2024, 4:28 PMtall-lock-23197
base_image
in your imagespec?bumpy-pager-32541
04/30/2024, 4:35 PMimport pandas as pd
import numpy as np
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns
from typing import List, Tuple, Dict
from flytekit import ImageSpec
import mlflow
from flytekitplugins.mlflow import mlflow_autolog
# Container image used by every task in this file (each @task passes it as
# container_image); it installs the pip packages listed below.
sklearn_image_spec = ImageSpec(
    packages=[
        "flytekit==1.11.0",
        "mlflow",
        "flytekitplugins-mlflow",
        "scikit-learn==1.2.2",
    ],
    registry="localhost:30000",
)
# if sklearn_image_spec.is_container():
@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Load the scikit-learn wine dataset and return its ``frame`` DataFrame."""
    wine_bunch = load_wine(as_frame=True)
    return wine_bunch.frame
@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Binarize the target and split the data into train/test features and labels.

    Target values equal to 0 are kept and every other value becomes 1. The
    result is split 80/20 with a fixed random seed and returned as
    ``(train_x, test_x, train_y, test_y)``; the label frames hold only the
    ``target`` column.
    """
    binary_target = data["target"].where(data["target"] == 0, 1)
    binarized = data.assign(target=binary_target)
    train_part, test_part = train_test_split(binarized, test_size=0.2, random_state=42)
    return (
        train_part.drop(columns="target"),
        test_part.drop(columns="target"),
        train_part[["target"]],
        test_part[["target"]],
    )
@task(container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params: Dict[str, float],
) -> Tuple[float, float, LogisticRegression]:
    """Fit a logistic regression and score it on the test split.

    ``params`` is forwarded as keyword arguments to ``LogisticRegression``.
    Returns ``(mse, mae, fitted_model)``, with both errors computed from the
    predictions on ``test_x`` against ``test_y``.
    """
    model = LogisticRegression(max_iter=3000, **params)
    # The labels arrive as a one-column frame; fit on that column as a Series.
    model.fit(train_x, train_y.iloc[:, 0])
    predictions = model.predict(test_x)
    return (
        float(mean_squared_error(test_y, predictions)),
        float(mean_absolute_error(test_y, predictions)),
        model,
    )
@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    """Call ``train_model`` once for every parameter set in ``params_list``.

    Each parameter set is printed before training; the returned metrics and
    model are not used further.
    """
    for candidate in params_list:
        print("ahhhh")
        print(candidate)
        # train_model returns (mse, mae, model); none of them is used here.
        _mse, _mae, _model = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=candidate,
        )
@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Load the data, preprocess it, then run the training loop over ``params_list``."""
    x_train, x_test, y_train, y_test = process_data(data=get_data())
    training_model_loop(
        train_x=x_train,
        test_x=x_test,
        train_y=y_train,
        test_y=y_test,
        params_list=params_list,
    )
if __name__ == "__main__":
    # Run the workflow locally over the same four C values as its default.
    local_grid = [{"C": c_value} for c_value in (0.1, 0.2, 0.3, 0.4)]
    training_workflow(params_list=local_grid)
tall-lock-23197
bumpy-pager-32541
04/30/2024, 4:38 PMtall-lock-23197
pyflyte run ...
command, or specify the base image as ghcr.io/flyteorg/flytekit:py3.11-1.11.0
bumpy-pager-32541
04/30/2024, 4:43 PMtall-lock-23197
docker run -it --rm localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ /bin/bash
pip show flytekit
bumpy-pager-32541
04/30/2024, 4:48 PMtall-lock-23197
bumpy-pager-32541
04/30/2024, 4:55 PMtall-lock-23197
ghcr.io/flyteorg/flytekit:py3.11-1.11.0
?bumpy-pager-32541
04/30/2024, 5:01 PMTraceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/flytekitplugins/mlflow/tracking.py", line 113, in wrapper
if not ctx.execution_state.is_local_execution():
Message:
'ExecutionState' object has no attribute 'is_local_execution'
User error.
bumpy-pager-32541
04/30/2024, 5:10 PMbumpy-pager-32541
04/30/2024, 5:12 PMtall-lock-23197
bumpy-pager-32541
04/30/2024, 5:13 PM(flyte_env) (base) tchase@HQ9322OSX workflow % pyflyte run --remote wine_classification_example.py training_workflow
Running Execution on Remote.
one
two
a
b
e
False
Image localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ not found. Building...
five
six
Run command: envd build --path /var/folders/_9/60k4g7zj1wq1kyl_pr7hq3k80000gq/T/flyte4gq06u5q/control_plane_metadata/local_flytekit/e4078d1595594356d59956dba925c844 --platform linux/amd64 --output type=image,name=localhost:30000/flytekit:1JrBkuUO0aml5MJRduDMXQ,push=true
#1 [internal] setting pip cache mount permissions
#1 DONE 0.0s
#2 docker-image://ghcr.io/flyteorg/flytekit:py3.8-1.6.2
#2 resolve ghcr.io/flyteorg/flytekit:py3.8-1.6.2
bumpy-pager-32541
04/30/2024, 5:15 PMbumpy-pager-32541
04/30/2024, 5:21 PMbumpy-pager-32541
04/30/2024, 5:31 PMfreezing-airport-6809
enable_deck=True,
bumpy-pager-32541
04/30/2024, 5:41 PMbumpy-pager-32541
04/30/2024, 5:52 PMimport pandas as pd
import numpy as np
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error
from flytekit import task, workflow
# import seaborn as sns
from typing import List, Tuple, Dict
from flytekit import ImageSpec
import mlflow
from flytekitplugins.mlflow import mlflow_autolog
# Container image used by every task in this file (each @task passes it as
# container_image). The base image must be a plain image reference; the
# Slack-style "<http://...|...>" link wrapper is not a valid image name.
sklearn_image_spec = ImageSpec(
    base_image="ghcr.io/flyteorg/flytekit:py3.11-1.11.0",
    packages=["flytekit==1.11.0", "mlflow", "flytekitplugins-mlflow", "scikit-learn==1.2.2"],
    registry="localhost:30000",
)
# if sklearn_image_spec.is_container():
@task(container_image=sklearn_image_spec)
def get_data() -> pd.DataFrame:
    """Load the scikit-learn wine dataset and return its ``frame`` DataFrame."""
    wine_bunch = load_wine(as_frame=True)
    return wine_bunch.frame
@task(container_image=sklearn_image_spec)
def process_data(data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Binarize the target and split the data into train/test features and labels.

    Target values equal to 0 are kept and every other value becomes 1. The
    result is split 80/20 with a fixed random seed and returned as
    ``(train_x, test_x, train_y, test_y)``; the label frames hold only the
    ``target`` column.
    """
    binary_target = data["target"].where(data["target"] == 0, 1)
    binarized = data.assign(target=binary_target)
    train_part, test_part = train_test_split(binarized, test_size=0.2, random_state=42)
    return (
        train_part.drop(columns="target"),
        test_part.drop(columns="target"),
        train_part[["target"]],
        test_part[["target"]],
    )
@task(enable_deck=True, container_image=sklearn_image_spec)
@mlflow_autolog(framework=mlflow.sklearn)
def train_model(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params: Dict[str, float],
) -> Tuple[float, float, LogisticRegression]:
    """Fit a logistic regression and score it on the test split.

    ``params`` is forwarded as keyword arguments to ``LogisticRegression``.
    Returns ``(mse, mae, fitted_model)``, with both errors computed from the
    predictions on ``test_x`` against ``test_y``.
    """
    model = LogisticRegression(max_iter=3000, **params)
    # The labels arrive as a one-column frame; fit on that column as a Series.
    model.fit(train_x, train_y.iloc[:, 0])
    predictions = model.predict(test_x)
    return (
        float(mean_squared_error(test_y, predictions)),
        float(mean_absolute_error(test_y, predictions)),
        model,
    )
@task(container_image=sklearn_image_spec)
def training_model_loop(
    train_x: pd.DataFrame,
    test_x: pd.DataFrame,
    train_y: pd.DataFrame,
    test_y: pd.DataFrame,
    params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]
) -> None:
    """Call ``train_model`` once for every parameter set in ``params_list``.

    Each parameter set is printed before training; the returned metrics and
    model are not used further.
    """
    for candidate in params_list:
        print("ahhhh")
        print(candidate)
        # train_model returns (mse, mae, model); none of them is used here.
        _mse, _mae, _model = train_model(
            train_x=train_x,
            test_x=test_x,
            train_y=train_y,
            test_y=test_y,
            params=candidate,
        )
@workflow
def training_workflow(params_list: List[Dict[str, float]] = [{"C": 0.1}, {"C": 0.2}, {"C": 0.3}, {"C": 0.4}]) -> None:
    """Load the data, preprocess it, then run the training loop over ``params_list``."""
    x_train, x_test, y_train, y_test = process_data(data=get_data())
    training_model_loop(
        train_x=x_train,
        test_x=x_test,
        train_y=y_train,
        test_y=y_test,
        params_list=params_list,
    )
if __name__ == "__main__":
    # Run the workflow locally over the same four C values as its default.
    local_grid = [{"C": c_value} for c_value in (0.1, 0.2, 0.3, 0.4)]
    training_workflow(params_list=local_grid)
bumpy-pager-32541
04/30/2024, 6:45 PM