Anindya Saha
03/18/2023, 7:37 PM
pyflyte run --remote whylogs_example wf
@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def get_reference_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def get_target_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def constraints_report(profile_view: DatasetProfileView) -> bool:
    ...
However, get_reference_data and get_target_data should not need whylogs; they only use pandas and scikit-learn. We should be able to run those tasks with the @task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest") image. I tried that, but it fails; the k8s logs say:
Traceback (most recent call last):
  File "/opt/venv/lib/python3.8/site-packages/flytekit/core/python_auto_container.py", line 279, in load_task
    task_module = importlib.import_module(name=task_module) # type: ignore
  ...
  File "/root/whylogs_example.py", line 17, in <module>
    import whylogs as why
ModuleNotFoundError: No module named 'whylogs'
Every task container is trying to parse the entire whylogs_example.py file, and since flytecookbook:core-latest does not have whylogs, it fails. What is the best design pattern or strategy to follow in such cases? How can I make it work remotely?
I read containerization/multi_images.html; that example has two tasks, svm_trainer and svm_predictor, but both end up using the same image. All the examples I see in https://github.com/flyteorg/flytesnacks/tree/master/cookbook/case_studies/ml_training also have only one custom Dockerfile.
Is there a production-grade example workflow whose tasks use images that differ significantly from each other? I'm looking for a complex reference workflow that covers these nuances: how to organize the pieces together with a different custom image for each task.
Ketan (kumare3)
@task
def foo():
    import my_module1  # imported only when foo runs, inside foo's own container
    my_module1.xyz()
    ...

@task
def foo2():
    import my_module2  # imported only when foo2 runs
    my_module2.abc()
    ...
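A stdlib-only sketch of why this helps: each task container imports the whole workflow file (the load_task / importlib.import_module frame in the traceback above), so a top-level import of a missing package fails before any task code runs, while an import inside a function body only runs when that function is called. The package name not_installed_pkg_xyz is a hypothetical stand-in for whylogs, and exec() here only approximates a real module import.

```python
import types


def load_module_from_source(name: str, source: str) -> types.ModuleType:
    """Run `source` as the body of a fresh module, roughly like importing a file."""
    module = types.ModuleType(name)
    exec(compile(source, f"<{name}>", "exec"), module.__dict__)
    return module


# Top-level import: executes as soon as the file is imported, in every container.
EAGER_SOURCE = """
import not_installed_pkg_xyz

def get_data():
    return [1, 2, 3]

def profile(data):
    return not_installed_pkg_xyz.log(data)
"""

# Import moved into the function body: executes only when profile() is called.
LAZY_SOURCE = """
def get_data():
    return [1, 2, 3]

def profile(data):
    import not_installed_pkg_xyz
    return not_installed_pkg_xyz.log(data)
"""


def try_load(name: str, source: str) -> str:
    """Report whether the module source can be imported at all."""
    try:
        load_module_from_source(name, source)
    except ModuleNotFoundError as exc:
        return f"failed: {exc.name}"
    return "loaded"


print(try_load("eager", EAGER_SOURCE))  # failed: not_installed_pkg_xyz
print(try_load("lazy", LAZY_SOURCE))  # loaded
lazy = load_module_from_source("lazy", LAZY_SOURCE)
print(lazy.get_data())  # [1, 2, 3]
```

Loosely, the "lazy" module plays the role of the workflow file in an image without whylogs: get_data-style tasks can still run there, as long as profile-style tasks are only executed in an image that has the package.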
Samhita Alla
Ketan (kumare3)
@task(container_image="{{.image.my_image1}}")
def foo():
    ...

@task(container_image="{{.image.my_image2}}")
def foo2():
    ...

@task
def foo3():
    ...
Then, to invoke:
pyflyte run --image my_image1="image-uri" --image my_image2="image-uri"
Note that foo3 will use the default base image.
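The substitution described here can be pictured with a simplified, hypothetical resolver. This is not flytekit's implementation: it assumes an unnamed --image value sets the default image, and it only handles the bare {{.image.<name>}} form (the multi-image docs also show .fqn and .version suffixes, which this sketch ignores).

```python
import re
from typing import Dict, List, Optional

# Simplified pattern for placeholders like "{{.image.my_image1}}" (hypothetical;
# flytekit's own resolver handles more forms than this).
_IMAGE_TEMPLATE = re.compile(r"\{\{\s*\.image\.(\w+)\s*\}\}")


def parse_image_flags(flags: List[str]) -> Dict[str, str]:
    """Map repeated --image values to {name: uri}; an unnamed value becomes "default"."""
    images: Dict[str, str] = {}
    for flag in flags:
        name, sep, uri = flag.partition("=")
        if sep:
            images[name] = uri
        else:
            images["default"] = flag
    return images


def resolve_image(container_image: Optional[str], images: Dict[str, str]) -> str:
    """Return the concrete image a task would be registered with at launch time."""
    if not container_image:
        # Tasks like foo3 that set no container_image fall back to the default.
        return images["default"]

    def substitute(match: "re.Match[str]") -> str:
        name = match.group(1)
        if name not in images:
            raise KeyError(f"no --image value given for {name!r}")
        return images[name]

    # Literal URIs contain no placeholder and pass through unchanged.
    return _IMAGE_TEMPLATE.sub(substitute, container_image)


images = parse_image_flags([
    "ghcr.io/flyteorg/flytecookbook:core-latest",
    "my_image1=ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest",
])
print(resolve_image("{{.image.my_image1}}", images))  # ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest
print(resolve_image(None, images))  # ghcr.io/flyteorg/flytecookbook:core-latest
```

The point of the sketch is the timing: the image strings are resolved once, when the workflow is registered, so each task's spec already names a concrete image before any container starts.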
More docs on this:
https://docs.flyte.org/projects/cookbook/en/latest/auto/core/containerization/multi_images.html#sphx-glr-auto-core-containerization-multi-images-py
Anindya Saha
03/20/2023, 4:11 AM
def constraints_report(profile_view: DatasetProfileView) -> bool:
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
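Moving imports into task bodies does not cover signatures like these two: DatasetProfileView appears in the annotations, and, assuming the @task decorator resolves type hints when it builds the task interface, that name must be resolvable as soon as the module is imported. A stdlib-only sketch; not_installed_pkg_xyz and the stand-in class are hypothetical.

```python
import typing

# Hypothetical module source: the signature names a type that was never imported.
UNIMPORTED = """
def create_profile_view(df) -> DatasetProfileView:
    import not_installed_pkg_xyz  # a lazy import in the body cannot help the signature
    return not_installed_pkg_xyz.log(df).view()
"""

# Same function, with a stand-in class defined at module level, playing the
# role of a top-level `from whylogs.core import DatasetProfileView`.
IMPORTED = """
class DatasetProfileView:
    pass

def create_profile_view(df) -> DatasetProfileView:
    import not_installed_pkg_xyz
    return not_installed_pkg_xyz.log(df).view()
"""


def hints_resolvable(source: str) -> bool:
    """Execute source as module code, then resolve the function's type hints,
    roughly what a decorator must do to build a typed task interface."""
    namespace: dict = {}
    try:
        exec(source, namespace)  # before Python 3.14, UNIMPORTED already raises NameError here
        typing.get_type_hints(namespace["create_profile_view"])
    except NameError:
        return False
    return True


print(hints_resolvable(UNIMPORTED))  # False
print(hints_resolvable(IMPORTED))  # True
```

On Python versions before 3.14 the unimported name fails at the def statement itself; with deferred annotation evaluation it fails later, in get_type_hints. Either way the annotation type has to be importable wherever the module is loaded.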
Ketan (kumare3)
Anindya Saha
03/20/2023, 4:21 AM
Samhita Alla
What is the difference between "launch time" and "runtime"?
I think what Ketan is referring to is this: "launch time" means when you are serializing and registering your workflows; "runtime" means when the workflow is running.
pyflyte run (pyflyte run --image <your-whylogs-image>) and use the core image for the tasks that aren't dependent on whylogs?
Anindya Saha
03/20/2023, 4:40 AM
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_reference_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_target_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def constraints_report(profile_view: DatasetProfileView) -> bool:
    ...
Which should be the same as what you mentioned above. Is my understanding correct?
Samhita Alla
Anindya Saha
03/20/2023, 4:42 AM
Ketan (kumare3)
pyflyte run does not work with more than one file; this is in progress (coming soon).
But you can use the traditional route of pyflyte register,
OR
pyflyte package followed by flytectl register.
Anindya Saha
03/20/2023, 4:57 AM
pyflyte run --image ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest --remote whylogs_example wf
? I changed the code, as in the snippet above, to override the image with flytecookbook:core-latest where needed.
Samhita Alla
Is whylogs_example a file or a directory?
whylogs_example.py
Anindya Saha
03/20/2023, 5:06 AM
Samhita Alla
Anindya Saha
03/20/2023, 5:24 AM
Samhita Alla
Anindya Saha
03/20/2023, 5:30 AM
Samhita Alla
Anindya Saha
03/20/2023, 5:31 AM
Samhita Alla
create_profile_view and constraints_report?
Anindya Saha
03/20/2023, 5:57 AM
pyflyte run --image ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest --remote whylogs_example.py wf
Can you please check if your code matches this code:
"""
whylogs Example
---------------
This example shows users how to profile pandas DataFrames with whylogs,
pass them between tasks, and use our renderers to create a SummaryDriftReport
and a ConstraintsReport with failed and passed constraints.
"""
# %%
# First, let's make all the imports our example needs to run properly
import os

import flytekit
import numpy as np
import pandas as pd
import whylogs as why
from flytekit import conditional, task, workflow
from flytekitplugins.whylogs.renderer import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer
from flytekitplugins.whylogs.schema import WhylogsDatasetProfileTransformer
from sklearn.datasets import load_diabetes
from whylogs.core import DatasetProfileView
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import (
    greater_than_number,
    mean_between_range,
    null_percentage_below_number,
    smaller_than_number,
)


# %%
# Next, we define a task to read our reference dataset.
# For this, we take scikit-learn's entire example Diabetes dataset.
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_reference_data() -> pd.DataFrame:
    diabetes = load_diabetes()
    df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
    df["target"] = pd.DataFrame(diabetes.target)
    return df


# %%
# To show some kinds of drift in our example, and to reproduce
# how real-life data behaves, we take an arbitrary subset
# of the reference dataset.
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_target_data() -> pd.DataFrame:
    diabetes = load_diabetes()
    df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
    df["target"] = pd.DataFrame(diabetes.target)
    return df.mask(df["age"] < 0.0).dropna(axis=0)


# %%
# Now we define a task that can take in any pandas DataFrame
# and return a ``DatasetProfileView``, which is our data profile.
# With it, users can either visualize and check overall statistics
# or run a constraint suite on top of it.
@task
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
    result = why.log(df)
    return result.view()


# %%
# We also define a constraints report task
# that runs some checks on our existing profile.
@task
def constraints_report(profile_view: DatasetProfileView) -> bool:
    builder = ConstraintsBuilder(dataset_profile_view=profile_view)
    builder.add_constraint(greater_than_number(column_name="age", number=45.0))
    builder.add_constraint(smaller_than_number(column_name="bp", number=20.0))
    builder.add_constraint(mean_between_range(column_name="s3", lower=-1.5, upper=1.5))
    builder.add_constraint(null_percentage_below_number(column_name="sex", number=0.0))
    constraints = builder.build()
    renderer = WhylogsConstraintsRenderer()
    flytekit.Deck("constraints", renderer.to_html(constraints=constraints))
    return constraints.validate()


# %%
# This is a representation of a prediction task. To keep our
# demonstration simple, the model prediction is represented by
# generating random numbers with numpy. This task runs only
# if the data passes our constraints suite.
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def make_predictions(input_data: pd.DataFrame, output_path: str) -> str:
    input_data["predictions"] = np.random.random(size=len(input_data))
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    input_data.to_csv(os.path.join(output_path, "predictions.csv"))
    return f"wrote predictions successfully to {output_path}"


# %%
# Lastly, if the constraint checks fail, we create a FlyteDeck
# with the Summary Drift Report, which can give further intuition into
# whether data drift caused the failed constraint checks.
@task
def summary_drift_report(new_data: pd.DataFrame, reference_data: pd.DataFrame) -> str:
    renderer = WhylogsSummaryDriftRenderer()
    report = renderer.to_html(target_data=new_data, reference_data=reference_data)
    flytekit.Deck("summary drift", report)
    return f"reported summary drift for target dataset with n={len(new_data)}"


# %%
# Finally, we create a Flyte workflow that
# chains together our example data pipeline.
@workflow
def wf() -> str:
    # 1. Read data
    target_df = get_target_data()

    # 2. Profile data and validate it
    profile_view = create_profile_view(df=target_df)
    validated = constraints_report(profile_view=profile_view)

    # 3. Conditional actions depending on whether the data is valid
    return (
        conditional("stop_if_fails")
        .if_(validated.is_false())
        .then(
            summary_drift_report(
                new_data=target_df,
                reference_data=get_reference_data(),
            )
        )
        .else_()
        .then(make_predictions(input_data=target_df, output_path="./data"))
    )


# %%
if __name__ == "__main__":
    wf()
Samhita Alla
Anindya Saha
03/20/2023, 5:59 AM
Samhita Alla
Anindya Saha
03/20/2023, 6:07 AM
Samhita Alla
Anindya Saha
03/20/2023, 6:12 AM
Samhita Alla