millions-night-34157
06/17/2024, 10:51 AM
ancient-wolf-19325
06/17/2024, 10:51 AM
You can create multiple FileSensor instances and use them within a workflow. Each sensor will wait for its respective file, and you can chain them together to ensure that the task executes only after all sensors have completed.
Here's an example of how you can achieve this:
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define multiple file sensors
sensor1 = FileSensor(name="sensor1")
sensor2 = FileSensor(name="sensor2")

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow():
    # Chain the sensors to wait for both files before executing the task
    sensor1(path="/path/to/file1") >> sensor2(path="/path/to/file2") >> process_data()

if __name__ == "__main__":
    my_workflow()
In this example, sensor1 waits for the first file, and once it is available, sensor2 waits for the second file. Only after both files are available does the process_data task execute.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/17/2024, 10:53 AM
ancient-wolf-19325
06/17/2024, 10:53 AM
from typing import List

from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow(file_paths: List[str]):
    sensors = []
    for file_path in file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))

    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example, the workflow dynamically creates a FileSensor for each file path in the file_paths list. It then chains these sensors together to ensure that the process_data task executes only after all files are available.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
millions-night-34157
06/17/2024, 11:53 AM
ancient-wolf-19325
06/17/2024, 11:53 AM
from typing import List

from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data(file_paths: List[str]):
    for file_path in file_paths:
        print(f"Processing file: {file_path}")

@workflow
def my_workflow(file_paths: List[str]):
    sensors = []
    for file_path in file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))

    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data(file_paths=file_paths)

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example, the process_data task receives the list of file paths and processes each file. The workflow ensures that all sensors are triggered and completed before the task executes.
Would you like more detail on any specific part of this process?
Sources:
- https://discuss.flyte.org/t/15614930/wave-hello-all-it-seems-etl-has-been-a-topic-that-has-come-u/
millions-night-34157
06/17/2024, 11:55 AM
ancient-wolf-19325
06/17/2024, 11:55 AM
Yes, you can read the file_paths from a configuration object that is passed to the workflow as a parameter. This allows you to dynamically configure your workflow based on external inputs.
Here's an example of how you can achieve this:
1. Define a configuration class: Use a class to encapsulate the configuration, including the file paths.
2. Pass the configuration object to the workflow: Use this configuration object to initialize the file sensors and pass the file paths to the task.
Here's how you can implement this:
from typing import List

from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    file_paths: List[str]

@task
def process_data(file_paths: List[str]):
    for file_path in file_paths:
        print(f"Processing file: {file_path}")

@workflow
def my_workflow(config: Config):
    sensors = []
    for file_path in config.file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))

    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data(file_paths=config.file_paths)

if __name__ == "__main__":
    # Example configuration
    config = Config(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
    my_workflow(config=config)
In this example:
- A Config class is defined using Pydantic to encapsulate the file paths.
- The my_workflow function takes a Config object as a parameter.
- The workflow creates file sensors based on the paths in the Config object.
- The process_data task processes the files after all sensors have confirmed the files are available.
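If the paths come from an external source, the same Config model can be built from, for example, a JSON file before the workflow is invoked. A minimal sketch, assuming the Config class and my_workflow from the example above, and a hypothetical config.json file:
import json

# config.json is assumed to look like:
# {"file_paths": ["/path/to/file1", "/path/to/file2"]}
with open("config.json") as f:
    config = Config(**json.load(f))

my_workflow(config=config)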
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit Pydantic Integration
millions-night-34157
06/18/2024, 3:07 PM
ancient-wolf-19325
06/18/2024, 3:07 PM
The Flyte documentation provides an example of using the FileSensor to detect files appearing in your local or remote filesystem.
Here is the link to the example: file_sensor_example.py
This example includes:
1. Importing necessary libraries.
2. Creating a FileSensor task.
3. Using the FileSensor within a workflow to wait for a file before executing a subsequent task.
Here's a snippet from the example:
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Create a FileSensor task
sensor = FileSensor(name="test_file_sensor")

@task
def t1():
    print("SUCCEEDED")

@workflow
def wf():
    sensor(path="s3://my-s3-bucket/file.txt") >> t1()

if __name__ == "__main__":
    wf()
For more details, you can refer to the full example in the Flyte GitHub repository.
Would you like more detail on any specific part of this example?
Sources:
- file_sensor_example.py
- Sensor Example Documentation
millions-night-34157
06/18/2024, 3:08 PM
millions-night-34157
06/18/2024, 3:08 PM
ancient-wolf-19325
06/18/2024, 3:08 PM
Yes, you can create instances of FileSensor inside a workflow in Flyte. This allows you to dynamically create sensors based on the workflow's logic or inputs. Below is an example demonstrating how to create instances of FileSensor within a workflow:
from typing import List

from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow(file_paths: List[str]):
    sensors = []
    for file_path in file_paths:
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))

    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example:
1. The my_workflow function takes a list of file paths as input.
2. Inside the workflow, a FileSensor instance is created for each file path.
3. The sensors are chained together to ensure that the process_data task only runs after all files are available.
This approach allows you to dynamically create and manage file sensors based on the workflow's input parameters.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/18/2024, 3:20 PM
ancient-wolf-19325
06/18/2024, 3:20 PM
To iterate over the hashmap values in the Config object and create a FileSensor for each path, you can follow these steps:
1. Define the Config class: Use a class to encapsulate the configuration, including the hashmap.
2. Pass the Config object to the sub-workflow: Use this configuration object to initialize the file sensors and iterate over the hashmap values.
Here's an example of how you can achieve this:
from typing import Dict

from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    paths: Dict[str, str]

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def sub_workflow(config: Config):
    sensors = []
    for path_name, s3_path in config.paths.items():
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{path_name}")
        sensors.append(sensor(path=s3_path))

    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

@workflow
def main_workflow(config: Config):
    sub_workflow(config=config)

if __name__ == "__main__":
    # Example configuration
    config = Config(paths={
        "file1": "s3://my-s3-bucket/file1.txt",
        "file2": "s3://my-s3-bucket/file2.txt",
        "file3": "s3://my-s3-bucket/file3.txt",
    })
    main_workflow(config=config)
In this example:
1. The Config class is defined using Pydantic to encapsulate the hashmap of paths.
2. The sub_workflow function takes a Config object as a parameter and iterates over the hashmap to create FileSensor instances for each path.
3. The main_workflow function calls the sub_workflow and passes the Config object to it.
This approach allows you to dynamically create and manage file sensors based on the configuration provided in the Config object.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/18/2024, 3:26 PM
millions-night-34157
06/18/2024, 3:26 PM
ancient-wolf-19325
06/18/2024, 3:26 PM
When the Config object is passed into the workflow, its values are only available inside the workflow body as Promise objects. This means that you cannot directly use these Promise objects to create FileSensor instances. Instead, you need to ensure that the paths are materialized before creating the sensors.
To handle this, you can use a combination of tasks and workflows. First, materialize the paths in a task and then create the sensors in the workflow. Here's how you can achieve this:
1. Define the Config class: Use a class to encapsulate the configuration, including the hashmap.
2. Materialize the paths in a task: Create a task to extract the paths from the Config object.
3. Create sensors in the workflow: Use the materialized paths to create the sensors.
Here's an example:
from typing import Dict, List

from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    paths: Dict[str, str]

@task
def extract_paths(config: Config) -> List[str]:
    return list(config.paths.values())

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def sub_workflow(config: Config):
    # Extract paths from the config
    paths = extract_paths(config=config)

    sensors = []
    for path in paths:
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{path}")
        sensors.append(sensor(path=path))

    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

@workflow
def main_workflow(config: Config):
    sub_workflow(config=config)

if __name__ == "__main__":
    # Example configuration
    config = Config(paths={
        "file1": "s3://my-s3-bucket/file1.txt",
        "file2": "s3://my-s3-bucket/file2.txt",
        "file3": "s3://my-s3-bucket/file3.txt",
    })
    main_workflow(config=config)
In this example:
1. The Config class is defined using Pydantic to encapsulate the hashmap of paths.
2. The extract_paths task materializes the paths from the Config object.
3. The sub_workflow function takes the materialized paths and creates FileSensor instances for each path.
This approach ensures that the paths are materialized before creating the sensors, allowing you to handle Promise objects correctly.
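Note that a static @workflow builds its graph at compile time, so looping over the output of extract_paths may still not work as written. A pattern often used for this kind of runtime looping, added here as an assumption rather than something stated above, is flytekit's @dynamic decorator; whether FileSensor calls behave this way inside a dynamic workflow should be verified against your flytekit version. A rough sketch:
from typing import List

from flytekit import dynamic, task, workflow
from flytekit.sensor.file_sensor import FileSensor

# A single module-level sensor reused for every path
file_sensor = FileSensor(name="file_sensor")

@task
def process_data():
    print("Processing data after all files are available")

@dynamic
def wait_for_all(paths: List[str]):
    # Inside a dynamic workflow the list is materialized, so a plain Python loop works
    nodes = [file_sensor(path=p) for p in paths]
    for left, right in zip(nodes, nodes[1:]):
        left >> right
    nodes[-1] >> process_data()

@workflow
def my_workflow(paths: List[str]):
    wait_for_all(paths=paths)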
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/18/2024, 5:14 PM
ancient-wolf-19325
06/18/2024, 5:14 PM
To unit test a Flyte task, you can use the pytest framework along with Flyte's testing utilities. Flyte provides tools like task_mock and patch to facilitate the testing of tasks, especially those that depend on external resources or services.
Here's a step-by-step guide to unit testing a Flyte task:
1. Define the Task: Create a Flyte task that you want to test.
2. Use pytest for Testing: Write test cases using the pytest framework.
3. Mock External Dependencies: Use Flyte's task_mock or patch utilities to mock external dependencies.
Example Task
Let's assume we have a simple Flyte task that processes some data:
from flytekit import task

@task
def process_data(a: int, b: int) -> int:
    return a + b
Unit Test
Here's how you can write a unit test for the process_data task using `pytest`:
import pytest
from flytekit.testing import task_mock
from my_module import process_data  # Import the task from your module

def test_process_data():
    # Directly call the task function with test inputs
    result = process_data(a=3, b=4)
    assert result == 7

# To run the test, use the pytest command in your terminal:
# pytest -v
Mocking External Dependencies
If your task depends on external resources, you can use task_mock to mock these dependencies. Here's an example:
from flytekit import task, workflow
from flytekit.testing import task_mock

@task
def fetch_data() -> int:
    # Simulate fetching data from an external source
    return 42

@task
def process_data(a: int, b: int) -> int:
    return a + b

@workflow
def my_workflow() -> int:
    data = fetch_data()
    result = process_data(a=data, b=10)
    return result

def test_my_workflow():
    with task_mock(fetch_data) as mock:
        mock.return_value = 5
        result = my_workflow()
        assert result == 15

# To run the test, use the pytest command in your terminal:
# pytest -v
In this example:
1. fetch_data Task: This task simulates fetching data from an external source.
2. process_data Task: This task processes the fetched data.
3. my_workflow Workflow: This workflow chains the tasks together.
4. test_my_workflow Test: This test uses task_mock to mock the fetch_data task, ensuring that it returns a controlled value (5) during the test.
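For reference, flytekit.testing also provides a patch decorator that injects the mock into the test function. A short sketch of the same test written that way, assuming fetch_data and my_workflow are importable from a hypothetical my_module:
from flytekit.testing import patch
from my_module import fetch_data, my_workflow

@patch(fetch_data)
def test_my_workflow_with_patch(mock_fetch_data):
    # The decorator replaces fetch_data with a mock while the test runs
    mock_fetch_data.return_value = 5
    assert my_workflow() == 15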
Additional Resources
- Flytekit Testing Documentation
- Flytekit GitHub Repository
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Testing Documentation
- Flytekit GitHub Repository
millions-night-34157
06/18/2024, 5:16 PM
ancient-wolf-19325
06/18/2024, 5:16 PM
To unit test the process_data function and ensure that it processes data correctly, you can directly call the function within your test case and assert the expected output. Since process_data is a simple function that adds two integers, you don't need to mock any external dependencies for this specific task.
Here's how you can write a unit test for the process_data task using `pytest`:
Example Task
from flytekit import task

@task
def process_data(a: int, b: int) -> int:
    return a + b
Unit Test
Create a test file, for example, test_process_data.py, and write the following test case:
import pytest
from my_module import process_data  # Import the task from your module

def test_process_data():
    # Test case 1: Normal inputs
    result = process_data(a=3, b=4)
    assert result == 7

    # Test case 2: Zero inputs
    result = process_data(a=0, b=0)
    assert result == 0

    # Test case 3: Negative inputs
    result = process_data(a=-1, b=-1)
    assert result == -2

    # Test case 4: Mixed inputs
    result = process_data(a=-1, b=1)
    assert result == 0

# To run the test, use the pytest command in your terminal:
# pytest -v test_process_data.py
In this example:
1. Import the Task: Import the process_data task from your module.
2. Define Test Cases: Write multiple test cases to cover different scenarios, such as normal inputs, zero inputs, negative inputs, and mixed inputs.
3. Run the Test: Use the pytest command to run the test and verify that the process_data function returns the expected results.
Running the Test
To run the test, navigate to the directory containing your test file and execute the following command in your terminal:
pytest -v test_process_data.py
This will run the test cases and provide detailed output on the results.
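If you prefer one reported result per input combination, the same cases can also be written with pytest.mark.parametrize; a small sketch, again assuming the hypothetical my_module:
import pytest
from my_module import process_data

@pytest.mark.parametrize(
    "a, b, expected",
    [(3, 4, 7), (0, 0, 0), (-1, -1, -2), (-1, 1, 0)],
)
def test_process_data_parametrized(a, b, expected):
    # Each tuple runs as its own test case in the pytest report
    assert process_data(a=a, b=b) == expected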
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Testing Documentation
- Flytekit GitHub Repository
millions-night-34157
06/18/2024, 6:53 PM
class Config:
    def __init__(self):
        self.dummy_data = {"a": 1, "b": 2, "c": 3}
        self.const_val = 9999

    def __str__(self):
        return f"Config: {self.dummy_data}"
ancient-wolf-19325
06/18/2024, 6:53 PM
To implement a TypeTransformer for the Config class, you need to define a custom transformer that handles the serialization and deserialization of the Config object. This involves converting the Config object to a Flyte Literal when passing it as an input or output of a task, and converting it back to a Config object when receiving it.
Here's how you can implement the TypeTransformer for the Config class:
1. Define the Config class: Ensure the class is defined as per your requirements.
2. Implement the TypeTransformer: Create a custom transformer that handles the serialization and deserialization.
3. Register the TypeTransformer: Register the transformer with Flyte's TypeEngine.
Step 1: Define the Config Class
class Config:
    def __init__(self):
        self.dummy_data = {"a": 1, "b": 2, "c": 3}
        self.const_val = 9999

    def __str__(self):
        return f"Config: {self.dummy_data}"
Step 2: Implement the TypeTransformer
from flytekit import task, workflow
from flytekit.extend import TypeTransformer, TypeEngine
from flytekit.models.literals import Literal, Scalar, Primitive, LiteralMap
from flytekit.models.types import LiteralType, SimpleType
from typing import Type

class ConfigTransformer(TypeTransformer[Config]):
    def __init__(self):
        super().__init__(name="ConfigTransformer", t=Config)

    def get_literal_type(self, t: Type[Config]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.INTEGER))

    def to_literal(self, ctx, python_val: Config, python_type: Type[Config], expected: LiteralType) -> Literal:
        # Primitive stores ints in its `integer` field
        dummy_data_literals = {
            k: Literal(scalar=Scalar(primitive=Primitive(integer=v)))
            for k, v in python_val.dummy_data.items()
        }
        const_val_literal = Literal(scalar=Scalar(primitive=Primitive(integer=python_val.const_val)))
        return Literal(map=LiteralMap(literals={**dummy_data_literals, "const_val": const_val_literal}))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[Config]) -> Config:
        dummy_data = {k: v.scalar.primitive.integer for k, v in lv.map.literals.items() if k != "const_val"}
        const_val = lv.map.literals["const_val"].scalar.primitive.integer
        config = Config()
        config.dummy_data = dummy_data
        config.const_val = const_val
        return config

# Register the transformer
TypeEngine.register(ConfigTransformer())
Step 3: Use the Config Class in a Task
Now you can use the Config class in a Flyte task:
@task
def process_config(config: Config) -> str:
    return str(config)

@workflow
def my_workflow() -> str:
    config = Config()
    return process_config(config=config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
ancient-wolf-19325
06/18/2024, 6:54 PM
1. ConfigTransformer Class: This class extends TypeTransformer and implements the required methods: get_literal_type, to_literal, and to_python_value.
2. get_literal_type Method: Defines the Flyte LiteralType for the Config object. In this case, it uses a map of integer values.
3. to_literal Method: Converts the Config object to a Flyte Literal. It serializes the dummy_data dictionary and const_val into a LiteralMap.
4. to_python_value Method: Converts a Flyte Literal back into a Config object. It deserializes the LiteralMap into the dummy_data dictionary and const_val.
5. Register the Transformer: Registers the custom transformer with Flyte's TypeEngine.
This setup allows you to use the Config class as an input and output type in Flyte tasks and workflows.
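If you want to sanity-check the transformer locally, one option is to round-trip a Config through the TypeEngine; a minimal sketch, assuming the Config class and ConfigTransformer above are defined and registered:
from flytekit.core.context_manager import FlyteContextManager
from flytekit.extend import TypeEngine

ctx = FlyteContextManager.current_context()
original = Config()

# Convert the Python object to a Flyte Literal and back again
lt = TypeEngine.to_literal_type(Config)
literal = TypeEngine.to_literal(ctx, original, Config, lt)
restored = TypeEngine.to_python_value(ctx, literal, Config)

assert restored.dummy_data == original.dummy_data
assert restored.const_val == original.const_val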
Would you like more detail on any specific part of this process?
Sources:
- Flytekit TypeTransformer Documentation
- Flytekit Custom Types Example
millions-night-34157
06/18/2024, 7:55 PM
class MyData:
    def __init__(self, data_path: str, sensor_path: str):
        self.data_path = data_path
        self.sensor_path = sensor_path

class MyDataList:
    def __init__(self, data_paths: List[str], sensor_path: str):
        self.data_paths = data_paths
        self.sensor_path = sensor_path

class Config:
    def __init__(self, data_sets: Dict[str, Union[MyData, MyDataList]]):
        self.data_sets = data_sets
ancient-wolf-19325
06/18/2024, 7:55 PM
To implement TypeTransformer classes for MyData, MyDataList, and Config, we need to handle the serialization and deserialization of these custom types into Flyte Literal types. This involves converting the custom objects to Flyte Literal representations when passing them as inputs or outputs of tasks, and converting them back to the custom objects when receiving them.
Here's how you can implement the TypeTransformer for these classes:
Step 1: Define the Custom Classes
from typing import List, Dict, Union

class MyData:
    def __init__(self, data_path: str, sensor_path: str):
        self.data_path = data_path
        self.sensor_path = sensor_path

class MyDataList:
    def __init__(self, data_paths: List[str], sensor_path: str):
        self.data_paths = data_paths
        self.sensor_path = sensor_path

class Config:
    def __init__(self, data_sets: Dict[str, Union[MyData, MyDataList]]):
        self.data_sets = data_sets
Step 2: Implement the TypeTransformer for Each Class
MyDataTransformer
from flytekit import task, workflow
from flytekit.extend import TypeTransformer, TypeEngine
from flytekit.models.literals import Literal, Scalar, Primitive, LiteralMap
from flytekit.models.types import LiteralType, SimpleType
from typing import Type

class MyDataTransformer(TypeTransformer[MyData]):
    def __init__(self):
        super().__init__(name="MyDataTransformer", t=MyData)

    def get_literal_type(self, t: Type[MyData]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: MyData, python_type: Type[MyData], expected: LiteralType) -> Literal:
        return Literal(map=LiteralMap(literals={
            "data_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.data_path))),
            "sensor_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.sensor_path)))
        }))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[MyData]) -> MyData:
        data_path = lv.map.literals["data_path"].scalar.primitive.string_value
        sensor_path = lv.map.literals["sensor_path"].scalar.primitive.string_value
        return MyData(data_path=data_path, sensor_path=sensor_path)

# Register the transformer
TypeEngine.register(MyDataTransformer())
MyDataListTransformer
from flytekit.models.literals import LiteralCollection

class MyDataListTransformer(TypeTransformer[MyDataList]):
    def __init__(self):
        super().__init__(name="MyDataListTransformer", t=MyDataList)

    def get_literal_type(self, t: Type[MyDataList]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: MyDataList, python_type: Type[MyDataList], expected: LiteralType) -> Literal:
        data_paths_literals = [
            Literal(scalar=Scalar(primitive=Primitive(string_value=path)))
            for path in python_val.data_paths
        ]
        return Literal(map=LiteralMap(literals={
            "data_paths": Literal(collection=LiteralCollection(literals=data_paths_literals)),
            "sensor_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.sensor_path)))
        }))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[MyDataList]) -> MyDataList:
        data_paths = [lit.scalar.primitive.string_value for lit in lv.map.literals["data_paths"].collection.literals]
        sensor_path = lv.map.literals["sensor_path"].scalar.primitive.string_value
        return MyDataList(data_paths=data_paths, sensor_path=sensor_path)

# Register the transformer
TypeEngine.register(MyDataListTransformer())
ConfigTransformer
class ConfigTransformer(TypeTransformer[Config]):
    def __init__(self):
        super().__init__(name="ConfigTransformer", t=Config)

    def get_literal_type(self, t: Type[Config]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: Config, python_type: Type[Config], expected: LiteralType) -> Literal:
        data_sets_literals = {}
        for key, value in python_val.data_sets.items():
            if isinstance(value, MyData):
                data_sets_literals[key] = MyDataTransformer().to_literal(ctx, value, MyData, expected)
            elif isinstance(value, MyDataList):
                data_sets_literals[key] = MyDataListTransformer().to_literal(ctx, value, MyDataList, expected)
        return Literal(map=LiteralMap(literals=data_sets_literals))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[Config]) -> Config:
        data_sets = {}
        for key, value in lv.map.literals.items():
            if "data_paths" in value.map.literals:
                data_sets[key] = MyDataListTransformer().to_python_value(ctx, value, MyDataList)
            else:
                data_sets[key] = MyDataTransformer().to_python_value(ctx, value, MyData)
        return Config(data_sets=data_sets)

# Register the transformer
TypeEngine.register(ConfigTransformer())
Step 3: Use the Custom Types in a Task
Now you can use the Config class and its nested custom types in a Flyte task:
@task
def process_config(config: Config) -> str:
    return str(config)

@workflow
def my_workflow() -> str:
    my_data = MyData(data_path="s3://my-bucket/data.csv", sensor_path="s3://my-bucket/sensor.csv")
    my_data_list = MyDataList(data_paths=["s3://my-bucket/data1.csv", "s3://my-bucket/data2.csv"], sensor_path="s3://my-bucket/sensor.csv")
    config = Config(data_sets={"dataset1": my_data, "dataset2": my_data_list})
    return process_config(config=config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
1. MyDataTransformer Class: Handles the serialization and deserialization of the MyData class.
ancient-wolf-19325
06/18/2024, 7:56 PM
2. MyDataListTransformer Class: Handles the serialization and deserialization of the MyDataList class.
3. ConfigTransformer Class: Handles the serialization and deserialization of the Config class, which contains a dictionary of MyData and MyDataList objects.
4. Register the Transformers: Registers the custom transformers with Flyte's TypeEngine.
This setup allows you to use the Config, MyData, and MyDataList classes as input and output types in Flyte tasks and workflows.
Would you like more detail on any specific part of this process?
millions-night-34157
06/18/2024, 9:14 PM
@dataclass_json
@dataclass
class ExecConfig:
    partition_date: str
    datasets: dict[str, Union[MyData, MyDataList]]
ancient-wolf-19325
06/18/2024, 9:14 PM
Flyte natively supports dataclasses annotated with @dataclass_json from the dataclasses_json library. This means that you do not need to write a custom TypeTransformer for the ExecConfig class if it is annotated with @dataclass_json and @dataclass. Flyte will automatically handle the serialization and deserialization of this dataclass.
Here's how you can define the ExecConfig class and use it in a Flyte task:
Define the ExecConfig Class
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from typing import Dict, List, Union

@dataclass_json
@dataclass
class MyData:
    data_path: str
    sensor_path: str

@dataclass_json
@dataclass
class MyDataList:
    data_paths: List[str]
    sensor_path: str

@dataclass_json
@dataclass
class ExecConfig:
    partition_date: str
    datasets: Dict[str, Union[MyData, MyDataList]]
Use the ExecConfig Class in a Flyte Task
from flytekit import task, workflow

@task
def process_exec_config(config: ExecConfig) -> str:
    return f"Partition Date: {config.partition_date}, Datasets: {config.datasets}"

@workflow
def my_workflow() -> str:
    my_data = MyData(data_path="s3://my-bucket/data.csv", sensor_path="s3://my-bucket/sensor.csv")
    my_data_list = MyDataList(data_paths=["s3://my-bucket/data1.csv", "s3://my-bucket/data2.csv"], sensor_path="s3://my-bucket/sensor.csv")
    exec_config = ExecConfig(partition_date="2023-10-01", datasets={"dataset1": my_data, "dataset2": my_data_list})
    return process_exec_config(config=exec_config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
1. @dataclass_json and @dataclass Annotations: These annotations enable Flyte to automatically handle the serialization and deserialization of the ExecConfig class.
2. process_exec_config Task: This task takes an ExecConfig object as input and processes it.
3. my_workflow Workflow: This workflow creates an ExecConfig object and passes it to the process_exec_config task.
Running the Workflow
To run the workflow, simply execute the script:
python my_script.py
Flyte will automatically handle the conversion of the ExecConfig object to and from its serialized form.
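Because the serialization is driven by @dataclass_json, you can also inspect what gets serialized by round-tripping the object yourself. This is only a local illustration of the generated to_json/from_json helpers, not exactly what Flyte does internally, and it assumes the classes defined above:
# Serialize the config to JSON, then rebuild it from that JSON
exec_config = ExecConfig(
    partition_date="2023-10-01",
    datasets={"dataset1": MyData(data_path="s3://my-bucket/data.csv", sensor_path="s3://my-bucket/sensor.csv")},
)

as_json = exec_config.to_json()        # generated by @dataclass_json
print(as_json)

restored = ExecConfig.from_json(as_json)
print(restored.partition_date)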
Additional Resources
- Flytekit Dataclass Support
- dataclasses_json Documentation
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Dataclass Support
- dataclasses_json Documentation