# flyte-deployment
a
Hi Folks, we need to be able to provide different images for different tasks in a workflow, so I am testing the Multiple Container Images in a Single Workflow feature. I am using the whylogs example. It won't work as-is in a remote cluster, so I added the flytecookbook whylogs container image to each task to be able to run the workflow successfully in a remote cluster with
pyflyte run --remote whylogs_example wf
@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def get_reference_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def get_target_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def constraints_report(profile_view: DatasetProfileView) -> bool:
    ...
However, get_reference_data and get_target_data should not need whylogs. They just work with pandas and scikit-learn. We should be able to run those tasks with the @task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest") image. I did try that, but it fails. The k8s logs say:
File "/opt/venv/lib/python3.8/site-packages/flytekit/core/python_auto_container.py", line 279, in load_task
    task_module = importlib.import_module(name=task_module)  # type: ignore
  ...
  File "/root/whylogs_example.py", line 17, in <module>
    import whylogs as why
ModuleNotFoundError: No module named 'whylogs'
Traceback (most recent call last):
Every task container is trying to import the entire whylogs_example.py file, and since flytecookbook:core-latest does not have whylogs, it fails. What is the best design pattern or strategy to follow in such cases? How can I make it work remotely? I read containerization/multi_images.html; that example has two tasks, svm_trainer and svm_predictor, but both end up using the same image. All the examples I see in https://github.com/flyteorg/flytesnacks/tree/master/cookbook/case_studies/ml_training also have only one custom Dockerfile. Is there a production-grade example workflow whose tasks use images that differ significantly from each other? I am looking for a reference complex workflow that covers these nuances: how to organize the pieces together with a different custom image for each task.
k
@Samhita Alla can you take this one when you get a chance
Cc @Eduardo Apolinario (eapolinario) fyi
@Anindya Saha sorry, I just read this. So the problem is that you have a module-level import for whylogs. In Python you cannot simply load a module partially (at least I am not aware of a technique).
One idea is to avoid module-level imports and hide them in the task function itself. Example:
@task
def foo():
    import my_module1
    my_module1.xyz()
    ...

@task
def foo2():
    import my_module2
    my_module2.abc()
    ...
This may be cumbersome and not the best approach, but it is the reason why so many platforms out there in fact show this as the default way of working. This is lazy loading of the module.
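To make the lazy-loading idea concrete, here is a minimal sketch in plain Python. The @task decorators are left out so it runs without flytekit, and heavy_dep_not_installed is a hypothetical stand-in for a package such as whylogs that is missing from the image. Defining the function succeeds; the import is only attempted when the function body runs.

```python
import json  # stands in for a dependency that every image has


def load_data() -> dict:
    # Only needs the always-available dependency.
    return json.loads('{"rows": 3}')


def build_profile(data: dict) -> int:
    # Imported lazily: resolved only when this function is called.
    import heavy_dep_not_installed  # hypothetical package name

    return heavy_dep_not_installed.profile(data)


# The module loads and load_data() runs even though the heavy package is absent.
data = load_data()

try:
    build_profile(data)
    lazy_import_failed = False
except ModuleNotFoundError:
    # Raised only here, at call time, not when the module was imported.
    lazy_import_failed = True
```

In a container that only ever executes load_data, the missing package is never touched, which is the behaviour the suggestion above relies on.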
s
About to tell the same! Let me know if you're still seeing issues, Anindya.
k
Also for different images we use the following convention
@task(container_image="{{.image.my_image1}}")
def foo():
  ...

@task(container_image="{{.image.my_image2}}")
def foo2():
  ...

@task
def foo3():
  ...
Then to invoke
pyflyte run --image my_image1="image-uri" --image my_image2="image_uri"
Note that foo3 will use the default base image
More docs on this https://docs.flyte.org/projects/cookbook/en/latest/auto/core/containerization/multi_images.html#sphx-glr-auto-core-containerization-multi-images-py
a
Yes, I thought about it already. But I think this particular example will still not work, because we cannot put all imports inside the method. The reason is that these two methods have typed inputs and outputs (which is an amazing feature of Flyte):
def constraints_report(profile_view: DatasetProfileView) -> bool:
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
so the imports need to be at the top level.
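A small sketch of why a type used in a signature has to be importable at module level. It uses only the standard library: typing.get_type_hints stands in for whatever a framework does to resolve a task's interface (that flytekit resolves hints exactly this way is an assumption), and ProfileView is a hypothetical placeholder for DatasetProfileView that is deliberately never imported.

```python
import typing


def constraints_report(profile_view: "ProfileView") -> bool:
    # "ProfileView" is a hypothetical type that is never imported in this module.
    return True


try:
    # Resolving the signature needs the real type object, not just its name.
    typing.get_type_hints(constraints_report)
    hint_error = None
except NameError as exc:
    # The name cannot be resolved because nothing imported it at module level.
    hint_error = str(exc)
```

Importing the type inside the function body would not help here, because the signature is resolved outside the body.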
Scratch my comment above. I see what you are suggesting. Break the tasks into separate modules of py files and import them in the workflow. Let me try that.
k
yup
aah nm, at "launch time" you will need all dependencies
but at runtime, you won't
but this is a good thing to capture - cc @Samhita Alla how about adding to the docs, an example that uses different modules?
a
Yeah, @Samhita Alla could you please try that on your end and see if that works. I am happy to work with you. It would be good to have a working example for everyone.
What is the difference between "launch time" and "runtime" ?
Also, if I break this into multiple task .py files, when the workflow is run on remote, will it be able to pack all related submodule .py files along with workflow_example.py and ship them to the remote cluster? Or how can Flyte understand, when executing each task, that it does not need the other task files, so that whylogs is not needed in all cases?
s
Sure thing. I'll create a docs issue and work on it.
What is the difference between "launch time" and "runtime" ?
I think what Ketan's referring to is, "launch time" meaning when you are serializing and registering your workflows. "runtime" meaning when the workflow is running.
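A rough, standard-library-only sketch of that distinction, assuming tasks are split into separate modules. The traceback earlier in the thread shows load_task calling importlib.import_module on the task's module, so the sketch imports each module on its own. All module names and heavy_dep_not_installed are hypothetical, and the files are written to a temporary directory just for the demo.

```python
import importlib
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical modules: one light task module, one that needs a missing
# dependency (stand-in for whylogs), and a workflow module importing both.
MODULES = {
    "demo_data_tasks": """
        def get_target_data():
            return [1, 2, 3]
    """,
    "demo_profile_tasks": """
        import heavy_dep_not_installed  # stand-in for whylogs

        def create_profile_view(rows):
            return heavy_dep_not_installed.profile(rows)
    """,
    "demo_workflow": """
        from demo_data_tasks import get_target_data
        from demo_profile_tasks import create_profile_view
    """,
}


def try_import(name: str) -> bool:
    """Return True if the module imports cleanly, False if a dependency is missing."""
    sys.modules.pop(name, None)
    try:
        importlib.import_module(name)
        return True
    except ModuleNotFoundError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    for name, source in MODULES.items():
        Path(tmp, f"{name}.py").write_text(textwrap.dedent(source))
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
        results = {name: try_import(name) for name in MODULES}
    finally:
        sys.path.remove(tmp)
```

Here demo_data_tasks imports without the heavy dependency (the "runtime" case for a task that lives in its own module), while demo_workflow pulls in every task module and so needs all dependencies (the "launch time" case).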
Anindya, can you register your workflow with a default image (the whylogs one) by sending an image argument to pyflyte run (pyflyte run --image <your-whylogs-image>) and use the core image for the tasks that aren't dependent on whylogs?
a
Yes, I already tried on the whylogs_example.py but it would not work in the remote as we discussed. I ran this:
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_reference_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_target_data() -> pd.DataFrame:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
    ...

@task(container_image="ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest")
def constraints_report(profile_view: DatasetProfileView) -> bool:
    ...
Which should be same as what you mentioned above. Is my understanding correct ?
s
I don't think it should be. Can you give my suggestion a try?
a
Okay. Let me try that
k
@Anindya Saha currently pyflyte run does not work with more than 1 file. This is in progress (coming soon). But you can use the traditional route of pyflyte register OR pyflyte package >> flytectl register. This can use fast registration, which does not need image builds.
a
Actual workflow code with only flytecookbook:core-latest overridden
@Samhita Alla You mean like this?
pyflyte run --image ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest --remote whylogs_example wf
I changed the code as in the above snippet to override with flytecookbook:core-latest where needed.
s
Is whylogs_example a file or a directory?
If a file, please make it whylogs_example.py
a
That was my typo in slack. Please see image
s
What's your flytekit version?
The command works for me.
a
It shows 1.4.2. Is that okay ?
s
Yes. 1.4.2 is the latest.
image.png
Can you reinitialize your virtual env and install the requirements again?
a
Is your whylogs_example a directory or file ?
s
Oh it's a file. With and without .py are working for me.
a
Is the workflow passing successfully? And did you add the flytecookbook core container image in the code? Can you please share the file?
s
I haven't modified the file at all.
Let me try that too.
Did you intentionally remove the task decorators for create_profile_view and constraints_report?
You shouldn't see that error if you add the decorators. Also, it won't work if they aren't tasks because promises are sent to the tasks.
a
Thanks. I fixed the task decorator. That was the cause of the not implemented error 🙏 Please run
pyflyte run --image ghcr.io/flyteorg/flytecookbook:whylogs_examples-latest --remote whylogs_example.py wf
Can you please check if your code matches this code:
"""
whylogs Example
---------------

This example shows users how to profile pandas DataFrames with whylogs,
pass them within tasks and also use our renderers to create a SummaryDriftReport
and a ConstraintsReport with failed and passed constraints.
"""

# %%
# First, let's make all the necessary imports for our example to run properly
import os

import flytekit
import numpy as np
import pandas as pd
import whylogs as why
from flytekit import conditional, task, workflow
from flytekitplugins.whylogs.renderer import WhylogsConstraintsRenderer, WhylogsSummaryDriftRenderer
from flytekitplugins.whylogs.schema import WhylogsDatasetProfileTransformer
from sklearn.datasets import load_diabetes
from whylogs.core import DatasetProfileView
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import (
    greater_than_number,
    mean_between_range,
    null_percentage_below_number,
    smaller_than_number,
)


# %%
# Next thing is defining a task to read our reference dataset.
# For this, we will take scikit-learn's entire example Diabetes dataset
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_reference_data() -> pd.DataFrame:
    diabetes = load_diabetes()
    df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
    df["target"] = pd.DataFrame(diabetes.target)
    return df


# %%
# To some extent, we wanted to show kinds of drift in our example,
# so in order to reproduce some of what real-life data behaves
# we will take an arbitrary subset of the reference dataset
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def get_target_data() -> pd.DataFrame:
    diabetes = load_diabetes()
    df = pd.DataFrame(diabetes.data, columns=diabetes.feature_names)
    df["target"] = pd.DataFrame(diabetes.target)
    return df.mask(df["age"] < 0.0).dropna(axis=0)


# %%
# Now we will define a task that can take in any pandas DataFrame
# and return a ``DatasetProfileView``, which is our data profile.
# With it, users can either visualize and check overall statistics
# or even run a constraint suite on top of it.
@task
def create_profile_view(df: pd.DataFrame) -> DatasetProfileView:
    result = why.log(df)
    return result.view()


# %%
# And we will also define a constraints report task
# that will run some checks in our existing profile.
@task
def constraints_report(profile_view: DatasetProfileView) -> bool:
    builder = ConstraintsBuilder(dataset_profile_view=profile_view)
    builder.add_constraint(greater_than_number(column_name="age", number=45.0))
    builder.add_constraint(smaller_than_number(column_name="bp", number=20.0))
    builder.add_constraint(mean_between_range(column_name="s3", lower=-1.5, upper=1.5))
    builder.add_constraint(null_percentage_below_number(column_name="sex", number=0.0))

    constraints = builder.build()

    renderer = WhylogsConstraintsRenderer()
    flytekit.Deck("constraints", renderer.to_html(constraints=constraints))

    return constraints.validate()


# %%
# This is a representation of a prediction task. Since we are looking
# to take some of the complexity away from our demonstrations,
# our model prediction here will be represented by generating a bunch of
# random numbers with numpy. This task will take place if we pass our
# constraints suite.
@task(container_image="ghcr.io/flyteorg/flytecookbook:core-latest")
def make_predictions(input_data: pd.DataFrame, output_path: str) -> str:
    input_data["predictions"] = np.random.random(size=len(input_data))
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    input_data.to_csv(os.path.join(output_path, "predictions.csv"))
    return f"wrote predictions successfully to {output_path}"


# %%
# Lastly, if the constraint checks fail, we will create a FlyteDeck
# with the Summary Drift Report, which can provide further intuition into
# whether there was a data drift to the failed constraint checks.
@task
def summary_drift_report(new_data: pd.DataFrame, reference_data: pd.DataFrame) -> str:
    renderer = WhylogsSummaryDriftRenderer()
    report = renderer.to_html(target_data=new_data, reference_data=reference_data)
    flytekit.Deck("summary drift", report)
    return f"reported summary drift for target dataset with n={len(new_data)}"


# %%
# Finally, we can then create a Flyte workflow that will
# chain together our example data pipeline
@workflow
def wf() -> str:
    # 1. Read data
    target_df = get_target_data()

    # 2. Profile data and validate it
    profile_view = create_profile_view(df=target_df)
    validated = constraints_report(profile_view=profile_view)

    # 3. Conditional actions if data is valid or not
    return (
        conditional("stop_if_fails")
        .if_(validated.is_false())
        .then(
            summary_drift_report(
                new_data=target_df,
                reference_data=get_reference_data(),
            )
        )
        .else_()
        .then(make_predictions(input_data=target_df, output_path="./data"))
    )


# %%
if __name__ == "__main__":
    wf()
s
Sure, I will. Are you still seeing any errors?
a
The Flyte Workflow is stuck at the first get_target_data step for over 13 mins in my Flyte console
And I forgot to bookmark the Flyte K8s dashboard console, so I am searching for the k8s dashboard link to see the pod status.
s
Please check. I see the same error with this command. So it only works if the libraries are imported within a task.
a
Yes, it should error out as we discussed.
s
So if you don't use a whylogs type at the task boundary, it should work, because then you can import whylogs within the relevant tasks. If not, I'm not sure this is possible.
a
Yeah. Would you be creating the docs issue you mentioned before and helping to create a working example? It will be beneficial for all.
s
Of course! I already created an issue: https://github.com/flyteorg/flyte/issues/3496