Hey <@U06RKAMK0UC> How to make a Flyte task wait f...
# ask-ai
m
Hey @ancient-wolf-19325 How to make a Flyte task wait for more than one file sensor?
a
To make a Flyte task wait for more than one file sensor, you can define multiple
FileSensor
instances and use them within a workflow. Each sensor will wait for its respective file, and you can chain them together to ensure that the task executes only after all sensors have completed. Here's an example of how you can achieve this:
Copy code
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define multiple file sensors
sensor1 = FileSensor(name="sensor1")
sensor2 = FileSensor(name="sensor2")

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow():
    # Chain the sensors to wait for both files before executing the task
    sensor1(path="/path/to/file1") >> sensor2(path="/path/to/file2") >> process_data()

if __name__ == "__main__":
    my_workflow()
In this example,
sensor1
waits for the first file, and once it is available,
sensor2
waits for the second file. Only after both files are available does the
process_data
task execute. Would you like more detail on any specific part of this process? Sources: - Flytekit Sensor Example - Flytekit FileSensor ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 Can we use for loop to make the task wait for all the file sensors?
a
Yes, you can use a for loop to make a Flyte task wait for multiple file sensors. This approach can be more scalable if you have a dynamic or large number of files to wait for. Below is an example of how you can achieve this using a for loop:
Copy code
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow(file_paths: list):
    sensors = []
    for file_path in file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    
    final_sensor >> process_data()

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example, the workflow dynamically creates a
FileSensor
for each file path in the
file_paths
list. It then chains these sensors together to ensure that the
process_data
task executes only after all files are available. Would you like more detail on any specific part of this process? Sources: - Flytekit Sensor Example ๐Ÿ‘ 1 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @ancient-wolf-19325 in the thread for followups.
m
Hey @ancient-wolf-19325 can we pass array of sensors as an input to a flyte task and inside flyte task can we get the actual path from the sensor object?
a
Passing an array of sensors as an input to a Flyte task and extracting the actual path from the sensor object inside the task is not directly supported by Flyte's current API. Flyte tasks are designed to handle data inputs and outputs, but sensor objects themselves are not typically passed around as task inputs. However, you can achieve similar functionality by passing an array of file paths to the task and using those paths to create and manage sensors within the workflow. Here's how you can do it: 1. Define the file sensors in the workflow: Create a sensor for each file path. 2. Pass the file paths to the task: Use the file paths to trigger the sensors and then pass the paths to the task. Here's an example:
Copy code
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data(file_paths: list):
    for file_path in file_paths:
        print(f"Processing file: {file_path}")

@workflow
def my_workflow(file_paths: list):
    sensors = []
    for file_path in file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    
    final_sensor >> process_data(file_paths=file_paths)

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example, the
process_data
task receives the list of file paths and processes each file. The workflow ensures that all sensors are triggered and completed before the task executes. Would you like more detail on any specific part of this process? Sources: - https://discuss.flyte.org/t/15614930/wave-hello-all-it-seems-etl-has-been-a-topic-that-has-come-u/ ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 Can I read file_paths from a config object which will be passed to workflow as a parameter?
a
Yes, you can read
file_paths
from a configuration object that is passed to the workflow as a parameter. This allows you to dynamically configure your workflow based on external inputs. Here's an example of how you can achieve this: 1. Define a configuration class: Use a class to encapsulate the configuration, including the file paths. 2. Pass the configuration object to the workflow: Use this configuration object to initialize the file sensors and pass the file paths to the task. Here's how you can implement this:
Copy code
from typing import List
from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    file_paths: List[str]

@task
def process_data(file_paths: List[str]):
    for file_path in file_paths:
        print(f"Processing file: {file_path}")

@workflow
def my_workflow(config: Config):
    sensors = []
    for file_path in config.file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    
    final_sensor >> process_data(file_paths=config.file_paths)

if __name__ == "__main__":
    # Example configuration
    config = Config(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
    my_workflow(config=config)
In this example: - A
Config
class is defined using Pydantic to encapsulate the file paths. - The
my_workflow
function takes a
Config
object as a parameter. - The workflow creates file sensors based on the paths in the
Config
object. - The
process_data
task processes the files after all sensors have confirmed the files are available. Would you like more detail on any specific part of this process? Sources: - Flytekit Sensor Example - Flytekit Pydantic Integration ๐Ÿ‘ 1 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @ancient-wolf-19325 in the thread for followups.
m
Hey @ancient-wolf-19325 where can I get a full example of file sensor in GitHub repo?
a
You can find a full example of a file sensor in the Flyte GitHub repository. The example demonstrates how to use the
FileSensor
to detect files appearing in your local or remote filesystem. Here is the link to the example: file_sensor_example.py This example includes: 1. Importing necessary libraries. 2. Creating a
FileSensor
task. 3. Using the
FileSensor
within a workflow to wait for a file before executing a subsequent task. Here's a snippet from the example:
Copy code
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Create a FileSensor task
sensor = FileSensor(name="test_file_sensor")

@task
def t1():
    print("SUCCEEDED")

@workflow
def wf():
    sensor(path="s3://my-s3-bucket/file.txt") >> t1()

if __name__ == "__main__":
    wf()
For more details, you can refer to the full example in the Flyte GitHub repository. Would you like more detail on any specific part of this example? Sources: - file_sensor_example.py - Sensor Example Documentation ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
Can we create instance of FileSensor inside a workflow?
@ancient-wolf-19325 Can we create instance of FileSensor inside a workflow?
a
Yes, you can create an instance of
FileSensor
inside a workflow in Flyte. This allows you to dynamically create sensors based on the workflow's logic or inputs. Below is an example demonstrating how to create instances of
FileSensor
within a workflow:
Copy code
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow(file_paths: list):
    sensors = []
    for file_path in file_paths:
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    
    final_sensor >> process_data()

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example: 1. The
my_workflow
function takes a list of file paths as input. 2. Inside the workflow, a
FileSensor
instance is created for each file path. 3. The sensors are chained together to ensure that the
process_data
task only runs after all files are available. This approach allows you to dynamically create and manage file sensors based on the workflow's input parameters. Would you like more detail on any specific part of this process? Sources: - Flytekit Sensor Example - Flytekit FileSensor ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
Hey @ancient-wolf-19325 I have main workflow which reads yaml files and creates an object of type Config. This Config object has a hashmap in which key is path name and value is path in s3. I have a sub workflow which I call from main workflow and provide this config object as a parameter to sub workflow. How do I iterate on the values of hashmap which is inside config object and create a file sensor for each of the path?
a
To iterate over the values of a hashmap inside a
Config
object and create a
FileSensor
for each path, you can follow these steps: 1. Define the
Config
class
: Use a class to encapsulate the configuration, including the hashmap. 2. Pass the
Config
object to the sub-workflow
: Use this configuration object to initialize the file sensors and iterate over the hashmap values. Here's an example of how you can achieve this:
Copy code
from typing import Dict
from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    paths: Dict[str, str]

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def sub_workflow(config: Config):
    sensors = []
    for path_name, s3_path in config.paths.items():
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{path_name}")
        sensors.append(sensor(path=s3_path))
    
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    
    final_sensor >> process_data()

@workflow
def main_workflow(config: Config):
    sub_workflow(config=config)

if __name__ == "__main__":
    # Example configuration
    config = Config(paths={
        "file1": "s3://my-s3-bucket/file1.txt",
        "file2": "s3://my-s3-bucket/file2.txt",
        "file3": "s3://my-s3-bucket/file3.txt"
    })
    main_workflow(config=config)
In this example: 1. The
Config
class is defined using Pydantic to encapsulate the hashmap of paths. 2. The
sub_workflow
function takes a
Config
object as a parameter and iterates over the hashmap to create
FileSensor
instances for each path. 3. The
main_workflow
function calls the
sub_workflow
and passes the
Config
object to it. This approach allows you to dynamically create and manage file sensors based on the configuration provided in the
Config
object. Would you like more detail on any specific part of this process? Sources: - Flytekit Sensor Example - Flytekit FileSensor ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 When we try to register the workflow each key-value in "config.paths.items()" is of type "Promise". can we still create sensor using "promise" type object?
@ancient-wolf-19325 When we try to register the workflow each key-value in "config.paths.items()" is of type "Promise". can we still create sensor using "promise" type object?
a
In Flyte, when you pass inputs to a workflow, they are represented as
Promise
objects. This means that you cannot directly use these
Promise
objects to create
FileSensor
instances. Instead, you need to ensure that the paths are materialized before creating the sensors. To handle this, you can use a combination of tasks and workflows. First, materialize the paths in a task and then create the sensors in the workflow. Hereโ€™s how you can achieve this: 1. Define the
Config
class
: Use a class to encapsulate the configuration, including the hashmap. 2. Materialize the paths in a task: Create a task to extract the paths from the
Config
object. 3. Create sensors in the workflow: Use the materialized paths to create the sensors. Hereโ€™s an example:
Copy code
from typing import Dict, List
from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    paths: Dict[str, str]

@task
def extract_paths(config: Config) -> List[str]:
    return list(config.paths.values())

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def sub_workflow(config: Config):
    # Extract paths from the config
    paths = extract_paths(config=config)
    
    sensors = []
    for path in paths:
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{path}")
        sensors.append(sensor(path=path))
    
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    
    final_sensor >> process_data()

@workflow
def main_workflow(config: Config):
    sub_workflow(config=config)

if __name__ == "__main__":
    # Example configuration
    config = Config(paths={
        "file1": "s3://my-s3-bucket/file1.txt",
        "file2": "s3://my-s3-bucket/file2.txt",
        "file3": "s3://my-s3-bucket/file3.txt"
    })
    main_workflow(config=config)
In this example: 1. The
Config
class is defined using Pydantic to encapsulate the hashmap of paths. 2. The
extract_paths
task materializes the paths from the
Config
object. 3. The
sub_workflow
function takes the materialized paths and creates
FileSensor
instances for each path. This approach ensures that the paths are materialized before creating the sensors, allowing you to handle
Promise
objects correctly. Would you like more detail on any specific part of this process? Sources: - Flytekit Sensor Example - Flytekit FileSensor ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 Whats the best way to "unit test" a Flyte task?
a
The best way to unit test a Flyte task is by using the
pytest
framework along with Flyte's testing utilities. Flyte provides tools like
task_mock
and
patch
to facilitate the testing of tasks, especially those that depend on external resources or services. Hereโ€™s a step-by-step guide to unit test a Flyte task: 1. Define the Task: Create a Flyte task that you want to test. 2. Use
pytest
for Testing
: Write test cases using the
pytest
framework. 3. Mock External Dependencies: Use Flyte's
task_mock
or
patch
utilities to mock external dependencies. Example Task Let's assume we have a simple Flyte task that processes some data:
Copy code
from flytekit import task

@task
def process_data(a: int, b: int) -> int:
    return a + b
Unit Test Hereโ€™s how you can write a unit test for the
process_data
task using `pytest`:
Copy code
import pytest
from flytekit.testing import task_mock
from my_module import process_data  # Import the task from your module

def test_process_data():
    # Directly call the task function with test inputs
    result = process_data(a=3, b=4)
    assert result == 7

# To run the test, use the pytest command in your terminal:
# pytest -v
Mocking External Dependencies If your task depends on external resources, you can use
task_mock
to mock these dependencies. Hereโ€™s an example:
Copy code
from flytekit import task, workflow
from flytekit.testing import task_mock

@task
def fetch_data() -> int:
    # Simulate fetching data from an external source
    return 42

@task
def process_data(a: int, b: int) -> int:
    return a + b

@workflow
def my_workflow() -> int:
    data = fetch_data()
    result = process_data(a=data, b=10)
    return result

def test_my_workflow():
    with task_mock(fetch_data) as mock:
        mock.return_value = 5
        result = my_workflow()
        assert result == 15

# To run the test, use the pytest command in your terminal:
# pytest -v
In this example: 1.
fetch_data
Task
: This task simulates fetching data from an external source. 2.
process_data
Task
: This task processes the fetched data. 3.
my_workflow
Workflow
: This workflow chains the tasks together. 4.
test_my_workflow
Test
: This test uses
task_mock
to mock the
fetch_data
task, ensuring that it returns a controlled value (5) during the test. Additional Resources - Flytekit Testing Documentation - Flytekit GitHub Repository Would you like more detail on any specific part of this process? Sources: - Flytekit Testing Documentation - Flytekit GitHub Repository ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 in the above example, how to "unit test" "process_data()" function and make sure that it processes correctly?
a
To unit test the
process_data
function and ensure that it processes data correctly, you can directly call the function within your test case and assert the expected output. Since
process_data
is a simple function that adds two integers, you don't need to mock any external dependencies for this specific task. Here's how you can write a unit test for the
process_data
task using `pytest`: Example Task
Copy code
from flytekit import task

@task
def process_data(a: int, b: int) -> int:
    return a + b
Unit Test Create a test file, for example,
test_process_data.py
, and write the following test case:
Copy code
import pytest
from my_module import process_data  # Import the task from your module

def test_process_data():
    # Test case 1: Normal inputs
    result = process_data(a=3, b=4)
    assert result == 7

    # Test case 2: Zero inputs
    result = process_data(a=0, b=0)
    assert result == 0

    # Test case 3: Negative inputs
    result = process_data(a=-1, b=-1)
    assert result == -2

    # Test case 4: Mixed inputs
    result = process_data(a=-1, b=1)
    assert result == 0

# To run the test, use the pytest command in your terminal:
# pytest -v test_process_data.py
In this example: 1. Import the Task: Import the
process_data
task from your module. 2. Define Test Cases: Write multiple test cases to cover different scenarios, such as normal inputs, zero inputs, negative inputs, and mixed inputs. 3. Run the Test: Use the
pytest
command to run the test and verify that the
process_data
function returns the expected results. Running the Test To run the test, navigate to the directory containing your test file and execute the following command in your terminal:
Copy code
pytest -v test_process_data.py
This will run the test cases and provide detailed output on the results. Would you like more detail on any specific part of this process? Sources: - Flytekit Testing Documentation - Flytekit GitHub Repository ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
Hey @ancient-wolf-19325 How do I implement "TypeTransformer" for below "Config" class such that I can use it in the task?
Copy code
class Config:
    def __init__(self):
        self.dummy_data = {"a": 1, "b": 2, "c": 3}
        self.const_val = 9999

    def __str__(self):
        return f"Config: {self.dummy_data}"
a
To implement a
TypeTransformer
for the
Config
class, you need to define a custom transformer that handles the serialization and deserialization of the
Config
object. This involves converting the
Config
object to a Flyte
Literal
when passing it as an input or output of a task, and converting it back to a
Config
object when receiving it. Hereโ€™s how you can implement the
TypeTransformer
for the
Config
class: 1. Define the
Config
class
: Ensure the class is defined as per your requirements. 2. Implement the
TypeTransformer
: Create a custom transformer that handles the serialization and deserialization. 3. Register the
TypeTransformer
: Register the transformer with Flyte's
TypeEngine
. Step 1: Define the
Config
Class
Copy code
class Config:
    def __init__(self):
        self.dummy_data = {"a": 1, "b": 2, "c": 3}
        self.const_val = 9999

    def __str__(self):
        return f"Config: {self.dummy_data}"
Step 2: Implement the
TypeTransformer
Copy code
from flytekit import task, workflow
from flytekit.extend import TypeTransformer, TypeEngine
from flytekit.models.literals import Literal, Scalar, Primitive, LiteralMap
from flytekit.models.types import LiteralType, SimpleType
from typing import Type

class ConfigTransformer(TypeTransformer[Config]):
    def __init__(self):
        super().__init__(name="ConfigTransformer", t=Config)

    def get_literal_type(self, t: Type[Config]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.INTEGER))

    def to_literal(self, ctx, python_val: Config, python_type: Type[Config], expected: LiteralType) -> Literal:
        dummy_data_literals = {k: Literal(scalar=Scalar(primitive=Primitive(integer_value=v))) for k, v in python_val.dummy_data.items()}
        const_val_literal = Literal(scalar=Scalar(primitive=Primitive(integer_value=python_val.const_val)))
        return Literal(map=LiteralMap(literals={**dummy_data_literals, "const_val": const_val_literal}))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[Config]) -> Config:
        dummy_data = {k: v.scalar.primitive.integer_value for k, v in lv.map.literals.items() if k != "const_val"}
        const_val = lv.map.literals["const_val"].scalar.primitive.integer_value
        config = Config()
        config.dummy_data = dummy_data
        config.const_val = const_val
        return config

# Register the transformer
TypeEngine.register(ConfigTransformer())
Step 3: Use the
Config
Class in a Task
Now you can use the
Config
class in a Flyte task:
Copy code
@task
def process_config(config: Config) -> str:
    return str(config)

@workflow
def my_workflow() -> str:
    config = Config()
    return process_config(config=config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
1.
ConfigTransformer
Class
: This class extends
TypeTransformer
and implements the required methods:
get_literal_type
,
to_literal
, and
to_python_value
. 2.
get_literal_type
Method
: Defines the Flyte
LiteralType
for the
Config
object. In this case, it uses a map of integer values. 3.
to_literal
Method
: Converts the
Config
object to a Flyte
Literal
. It serializes the
dummy_data
dictionary and
const_val
into a
LiteralMap
. 4.
to_python_value
Method
: Converts a Flyte
Literal
back into a
Config
object. It deserializes the
LiteralMap
into the
dummy_data
dictionary and
const_val
. 5. Register the Transformer: Registers the custom transformer with Flyte's
TypeEngine
. This setup allows you to use the
Config
class as an input and output type in Flyte tasks and workflows. Would you like more detail on any specific part of this process? Sources: - Flytekit TypeTransformer Documentation - Flytekit Custom Types Example ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 create a "TypeTransformer" for below classes:
Copy code
class MyData:
    def __init__(self, data_path: str, sensor_path: str):
        self.data_path = data_path
        self.sensor_path = sensor_path


class MyDataList:
    def __init__(self, data_paths: List[str], sensor_path: str):
        self.data_paths = data_paths
        self.sensor_path = sensor_path


class Config:
    def __init__(self, data_sets: Dict[str, Union[MyData, MyDataList]]):
        self.data_sets = data_sets
a
To create
TypeTransformer
classes for
MyData
,
MyDataList
, and
Config
, we need to handle the serialization and deserialization of these custom types into Flyte
Literal
types. This involves converting the custom objects to Flyte
Literal
representations when passing them as inputs or outputs of tasks, and converting them back to the custom objects when receiving them. Here's how you can implement the
TypeTransformer
for these classes: Step 1: Define the Custom Classes
Copy code
from typing import List, Dict, Union

class MyData:
    def __init__(self, data_path: str, sensor_path: str):
        self.data_path = data_path
        self.sensor_path = sensor_path

class MyDataList:
    def __init__(self, data_paths: List[str], sensor_path: str):
        self.data_paths = data_paths
        self.sensor_path = sensor_path

class Config:
    def __init__(self, data_sets: Dict[str, Union[MyData, MyDataList]]):
        self.data_sets = data_sets
Step 2: Implement the
TypeTransformer
for Each Class
MyDataTransformer
Copy code
from flytekit import task, workflow
from flytekit.extend import TypeTransformer, TypeEngine
from flytekit.models.literals import Literal, Scalar, Primitive, LiteralMap
from flytekit.models.types import LiteralType, SimpleType
from typing import Type

class MyDataTransformer(TypeTransformer[MyData]):
    def __init__(self):
        super().__init__(name="MyDataTransformer", t=MyData)

    def get_literal_type(self, t: Type[MyData]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: MyData, python_type: Type[MyData], expected: LiteralType) -> Literal:
        return Literal(map=LiteralMap(literals={
            "data_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.data_path))),
            "sensor_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.sensor_path)))
        }))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[MyData]) -> MyData:
        data_path = lv.map.literals["data_path"].scalar.primitive.string_value
        sensor_path = lv.map.literals["sensor_path"].scalar.primitive.string_value
        return MyData(data_path=data_path, sensor_path=sensor_path)

# Register the transformer
TypeEngine.register(MyDataTransformer())
MyDataListTransformer
Copy code
class MyDataListTransformer(TypeTransformer[MyDataList]):
    def __init__(self):
        super().__init__(name="MyDataListTransformer", t=MyDataList)

    def get_literal_type(self, t: Type[MyDataList]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: MyDataList, python_type: Type[MyDataList], expected: LiteralType) -> Literal:
        data_paths_literals = [Literal(scalar=Scalar(primitive=Primitive(string_value=path))) for path in python_val.data_paths]
Copy code
return Literal(map=LiteralMap(literals={
            "data_paths": Literal(collection=LiteralCollection(literals=data_paths_literals)),
            "sensor_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.sensor_path)))
        }))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[MyDataList]) -> MyDataList:
        data_paths = [lit.scalar.primitive.string_value for lit in lv.map.literals["data_paths"].collection.literals]
        sensor_path = lv.map.literals["sensor_path"].scalar.primitive.string_value
        return MyDataList(data_paths=data_paths, sensor_path=sensor_path)

# Register the transformer
TypeEngine.register(MyDataListTransformer())
ConfigTransformer
Copy code
class ConfigTransformer(TypeTransformer[Config]):
    def __init__(self):
        super().__init__(name="ConfigTransformer", t=Config)

    def get_literal_type(self, t: Type[Config]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: Config, python_type: Type[Config], expected: LiteralType) -> Literal:
        data_sets_literals = {}
        for key, value in python_val.data_sets.items():
            if isinstance(value, MyData):
                data_sets_literals[key] = MyDataTransformer().to_literal(ctx, value, MyData, expected)
            elif isinstance(value, MyDataList):
                data_sets_literals[key] = MyDataListTransformer().to_literal(ctx, value, MyDataList, expected)
        return Literal(map=LiteralMap(literals=data_sets_literals))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[Config]) -> Config:
        data_sets = {}
        for key, value in lv.map.literals.items():
            if "data_paths" in value.map.literals:
                data_sets[key] = MyDataListTransformer().to_python_value(ctx, value, MyDataList)
            else:
                data_sets[key] = MyDataTransformer().to_python_value(ctx, value, MyData)
        return Config(data_sets=data_sets)

# Register the transformer
TypeEngine.register(ConfigTransformer())
Step 3: Use the Custom Types in a Task Now you can use the
Config
class and its nested custom types in a Flyte task:
Copy code
@task
def process_config(config: Config) -> str:
    return str(config)

@workflow
def my_workflow() -> str:
    my_data = MyData(data_path="<s3://my-bucket/data.csv>", sensor_path="<s3://my-bucket/sensor.csv>")
    my_data_list = MyDataList(data_paths=["<s3://my-bucket/data1.csv>", "<s3://my-bucket/data2.csv>"], sensor_path="<s3://my-bucket/sensor.csv>")
    config = Config(data_sets={"dataset1": my_data, "dataset2": my_data_list})
    return process_config(config=config)

if __name__ == "__main__":
    print(my_workflow())
Explanation 1.
MyDataTransformer
Class
: Handles the serialization and deserialization of the
MyData
class.
2.
MyDataListTransformer
Class
: Handles the serialization and deserialization of the
MyDataList
class. 3.
ConfigTransformer
Class
: Handles the serialization and deserialization of the
Config
class, which contains a dictionary of
MyData
and
MyDataList
objects. 4. Register the Transformers: Registers the custom transformers with Flyte's
TypeEngine
. This setup allows you to use the
Config
,
MyData
, and
MyDataList
classes as input and output types in Flyte tasks and workflows. Would you like more detail on any specific part of this process? ๐Ÿ‘ 0 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @runllm in the thread for followups.
m
@ancient-wolf-19325 Do I need to write TypeTransformer for below class or Flyte will automatically convert it?
Copy code
@dataclass_json
@dataclass
class ExecConfig:
    partition_date: str
    datasets: dict[str, Union[MyData, MyDataList]]
a
Flyte provides built-in support for dataclasses annotated with
@dataclass_json
from the
dataclasses_json
library. This means that you do not need to write a custom
TypeTransformer
for the
ExecConfig
class if it is annotated with
@dataclass_json
and
@dataclass
. Flyte will automatically handle the serialization and deserialization of this dataclass. Hereโ€™s how you can define the
ExecConfig
class and use it in a Flyte task: Define the
ExecConfig
Class
Copy code
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from typing import Dict, Union

@dataclass_json
@dataclass
class MyData:
    data_path: str
    sensor_path: str

@dataclass_json
@dataclass
class MyDataList:
    data_paths: list[str]
    sensor_path: str

@dataclass_json
@dataclass
class ExecConfig:
    partition_date: str
    datasets: Dict[str, Union[MyData, MyDataList]]
Use the
ExecConfig
Class in a Flyte Task
Copy code
from flytekit import task, workflow

@task
def process_exec_config(config: ExecConfig) -> str:
    return f"Partition Date: {config.partition_date}, Datasets: {config.datasets}"

@workflow
def my_workflow() -> str:
    my_data = MyData(data_path="<s3://my-bucket/data.csv>", sensor_path="<s3://my-bucket/sensor.csv>")
    my_data_list = MyDataList(data_paths=["<s3://my-bucket/data1.csv>", "<s3://my-bucket/data2.csv>"], sensor_path="<s3://my-bucket/sensor.csv>")
    exec_config = ExecConfig(partition_date="2023-10-01", datasets={"dataset1": my_data, "dataset2": my_data_list})
    return process_exec_config(config=exec_config)

if __name__ == "__main__":
    print(my_workflow())
Explanation 1.
@dataclass_json
and
@dataclass
Annotations
: These annotations enable Flyte to automatically handle the serialization and deserialization of the
ExecConfig
class. 2.
process_exec_config
Task
: This task takes an
ExecConfig
object as input and processes it. 3.
my_workflow
Workflow
: This workflow creates an
ExecConfig
object and passes it to the
process_exec_config
task. Running the Workflow To run the workflow, simply execute the script:
Copy code
python my_script.py
Flyte will automatically handle the conversion of the
ExecConfig
object to and from its serialized form. Additional Resources - Flytekit Dataclass Support - dataclasses_json Documentation Would you like more detail on any specific part of this process? Sources: - Flytekit Dataclass Support - dataclasses_json Documentation ๐Ÿ‘ 1 button ๐Ÿ‘Ž 0 button ๐Ÿ’ก Hint: Mention @ancient-wolf-19325 in the thread for followups.