millions-night-34157
06/17/2024, 10:51 AM
ancient-wolf-19325
06/17/2024, 10:51 AM
You can create multiple FileSensor instances and use them within a workflow. Each sensor will wait for its respective file, and you can chain them together to ensure that the task executes only after all sensors have completed.
Here's an example of how you can achieve this:
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define multiple file sensors
sensor1 = FileSensor(name="sensor1")
sensor2 = FileSensor(name="sensor2")

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow():
    # Chain the sensors to wait for both files before executing the task
    sensor1(path="/path/to/file1") >> sensor2(path="/path/to/file2") >> process_data()

if __name__ == "__main__":
    my_workflow()
In this example, sensor1 waits for the first file, and once it is available, sensor2 waits for the second file. Only after both files are available does the process_data task execute.
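If you want to run this on a Flyte cluster rather than locally, the pyflyte CLI is the usual entry point. A minimal sketch, assuming the snippet above is saved as example.py (the file name is illustrative):
pyflyte run --remote example.py my_workflow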
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/17/2024, 10:53 AM
ancient-wolf-19325
06/17/2024, 10:53 AM
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow(file_paths: list):
    sensors = []
    for file_path in file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example, the workflow dynamically creates a FileSensor for each file path in the file_paths list. It then chains these sensors together to ensure that the process_data task executes only after all files are available.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
millions-night-34157
06/17/2024, 11:53 AM
ancient-wolf-19325
06/17/2024, 11:53 AM
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data(file_paths: list):
    for file_path in file_paths:
        print(f"Processing file: {file_path}")

@workflow
def my_workflow(file_paths: list):
    sensors = []
    for file_path in file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data(file_paths=file_paths)

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example, the process_data task receives the list of file paths and processes each file. The workflow ensures that all sensors are triggered and completed before the task executes.
Would you like more detail on any specific part of this process?
Sources:
- https://discuss.flyte.org/t/15614930/wave-hello-all-it-seems-etl-has-been-a-topic-that-has-come-u/
millions-night-34157
06/17/2024, 11:55 AM
ancient-wolf-19325
06/17/2024, 11:55 AM
You can read file_paths from a configuration object that is passed to the workflow as a parameter. This allows you to dynamically configure your workflow based on external inputs.
Here's an example of how you can achieve this:
1. Define a configuration class: Use a class to encapsulate the configuration, including the file paths.
2. Pass the configuration object to the workflow: Use this configuration object to initialize the file sensors and pass the file paths to the task.
Here's how you can implement this:
from typing import List
from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    file_paths: List[str]

@task
def process_data(file_paths: List[str]):
    for file_path in file_paths:
        print(f"Processing file: {file_path}")

@workflow
def my_workflow(config: Config):
    sensors = []
    for file_path in config.file_paths:
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data(file_paths=config.file_paths)

if __name__ == "__main__":
    # Example configuration
    config = Config(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
    my_workflow(config=config)
In this example:
- A Config class is defined using Pydantic to encapsulate the file paths.
- The my_workflow function takes a Config object as a parameter.
- The workflow creates file sensors based on the paths in the Config object.
- The process_data task processes the files after all sensors have confirmed the files are available.
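Note (an assumption worth verifying for your flytekit version): accepting a Pydantic BaseModel as a workflow or task input typically relies on the Flyte Pydantic plugin, which registers the type transformer for BaseModel:
pip install flytekitplugins-pydantic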
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit Pydantic Integration
millions-night-34157
06/18/2024, 3:07 PM
ancient-wolf-19325
06/18/2024, 3:07 PM
There is an example of using the FileSensor to detect files appearing in your local or remote filesystem.
Here is the link to the example: file_sensor_example.py
This example includes:
1. Importing necessary libraries.
2. Creating a FileSensor task.
3. Using the FileSensor within a workflow to wait for a file before executing a subsequent task.
Here's a snippet from the example:
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Create a FileSensor task
sensor = FileSensor(name="test_file_sensor")

@task
def t1():
    print("SUCCEEDED")

@workflow
def wf():
    sensor(path="s3://my-s3-bucket/file.txt") >> t1()

if __name__ == "__main__":
    wf()
For more details, you can refer to the full example in the Flyte GitHub repository.
Would you like more detail on any specific part of this example?
Sources:
- file_sensor_example.py
- Sensor Example Documentation
millions-night-34157
06/18/2024, 3:08 PM
millions-night-34157
06/18/2024, 3:08 PM
ancient-wolf-19325
06/18/2024, 3:08 PM
You can create instances of FileSensor inside a workflow in Flyte. This allows you to dynamically create sensors based on the workflow's logic or inputs. Below is an example demonstrating how to create instances of FileSensor within a workflow:
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def my_workflow(file_paths: list):
    sensors = []
    for file_path in file_paths:
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{file_path}")
        sensors.append(sensor(path=file_path))
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

if __name__ == "__main__":
    my_workflow(file_paths=["/path/to/file1", "/path/to/file2", "/path/to/file3"])
In this example:
1. The my_workflow function takes a list of file paths as input.
2. Inside the workflow, a FileSensor instance is created for each file path.
3. The sensors are chained together to ensure that the process_data task only runs after all files are available.
This approach allows you to dynamically create and manage file sensors based on the workflow's input parameters.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/18/2024, 3:20 PM
ancient-wolf-19325
06/18/2024, 3:20 PM
To iterate over the hashmap of paths in a Config object and create a FileSensor for each path, you can follow these steps:
1. Define the Config class: Use a class to encapsulate the configuration, including the hashmap.
2. Pass the Config object to the sub-workflow: Use this configuration object to initialize the file sensors and iterate over the hashmap values.
Here's an example of how you can achieve this:
from typing import Dict
from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    paths: Dict[str, str]

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def sub_workflow(config: Config):
    sensors = []
    for path_name, s3_path in config.paths.items():
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{path_name}")
        sensors.append(sensor(path=s3_path))
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

@workflow
def main_workflow(config: Config):
    sub_workflow(config=config)

if __name__ == "__main__":
    # Example configuration
    config = Config(paths={
        "file1": "s3://my-s3-bucket/file1.txt",
        "file2": "s3://my-s3-bucket/file2.txt",
        "file3": "s3://my-s3-bucket/file3.txt"
    })
    main_workflow(config=config)
In this example:
1. The Config class is defined using Pydantic to encapsulate the hashmap of paths.
2. The sub_workflow function takes a Config object as a parameter and iterates over the hashmap to create FileSensor instances for each path.
3. The main_workflow function calls the sub_workflow and passes the Config object to it.
This approach allows you to dynamically create and manage file sensors based on the configuration provided in the Config object.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/18/2024, 3:26 PM
millions-night-34157
06/18/2024, 3:26 PM
ancient-wolf-19325
06/18/2024, 3:26 PM
Inside a workflow, the values coming from the Config input are Promise objects. This means that you cannot directly use these Promise objects to create FileSensor instances. Instead, you need to ensure that the paths are materialized before creating the sensors.
To handle this, you can use a combination of tasks and workflows. First, materialize the paths in a task and then create the sensors in the workflow. Here's how you can achieve this:
1. Define the Config class: Use a class to encapsulate the configuration, including the hashmap.
2. Materialize the paths in a task: Create a task to extract the paths from the Config object.
3. Create sensors in the workflow: Use the materialized paths to create the sensors.
Here's an example:
from typing import Dict, List
from pydantic import BaseModel
from flytekit import task, workflow
from flytekit.sensor.file_sensor import FileSensor

# Define a configuration class using Pydantic
class Config(BaseModel):
    paths: Dict[str, str]

@task
def extract_paths(config: Config) -> List[str]:
    return list(config.paths.values())

@task
def process_data():
    print("Processing data after all files are available")

@workflow
def sub_workflow(config: Config):
    # Extract paths from the config
    paths = extract_paths(config=config)
    sensors = []
    for path in paths:
        # Create a FileSensor instance dynamically inside the workflow
        sensor = FileSensor(name=f"sensor_{path}")
        sensors.append(sensor(path=path))
    # Chain all sensors to ensure all files are available before processing data
    final_sensor = sensors[0]
    for sensor in sensors[1:]:
        final_sensor = final_sensor >> sensor
    final_sensor >> process_data()

@workflow
def main_workflow(config: Config):
    sub_workflow(config=config)

if __name__ == "__main__":
    # Example configuration
    config = Config(paths={
        "file1": "s3://my-s3-bucket/file1.txt",
        "file2": "s3://my-s3-bucket/file2.txt",
        "file3": "s3://my-s3-bucket/file3.txt"
    })
    main_workflow(config=config)
In this example:
1. The Config class is defined using Pydantic to encapsulate the hashmap of paths.
2. The extract_paths task materializes the paths from the Config object.
3. The sub_workflow function takes the materialized paths and creates FileSensor instances for each path.
This approach ensures that the paths are materialized before creating the sensors, allowing you to handle Promise objects correctly.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Sensor Example
- Flytekit FileSensor
millions-night-34157
06/18/2024, 5:14 PM
ancient-wolf-19325
06/18/2024, 5:14 PM
To unit test a Flyte task, you can use the pytest framework along with Flyte's testing utilities. Flyte provides tools like task_mock and patch to facilitate the testing of tasks, especially those that depend on external resources or services.
Here's a step-by-step guide to unit test a Flyte task:
1. Define the Task: Create a Flyte task that you want to test.
2. Use pytest for Testing: Write test cases using the pytest framework.
3. Mock External Dependencies: Use Flyte's task_mock or patch utilities to mock external dependencies.
Example Task
Let's assume we have a simple Flyte task that processes some data:
from flytekit import task

@task
def process_data(a: int, b: int) -> int:
    return a + b
Unit Test
Here's how you can write a unit test for the process_data task using `pytest`:
import pytest
from my_module import process_data  # Import the task from your module

def test_process_data():
    # Directly call the task function with test inputs
    result = process_data(a=3, b=4)
    assert result == 7

# To run the test, use the pytest command in your terminal:
# pytest -v
Mocking External Dependencies
If your task depends on external resources, you can use task_mock to mock these dependencies. Here's an example:
from flytekit import task, workflow
from flytekit.testing import task_mock

@task
def fetch_data() -> int:
    # Simulate fetching data from an external source
    return 42

@task
def process_data(a: int, b: int) -> int:
    return a + b

@workflow
def my_workflow() -> int:
    data = fetch_data()
    result = process_data(a=data, b=10)
    return result

def test_my_workflow():
    with task_mock(fetch_data) as mock:
        mock.return_value = 5
        result = my_workflow()
        assert result == 15

# To run the test, use the pytest command in your terminal:
# pytest -v
In this example:
1. fetch_data Task: This task simulates fetching data from an external source.
2. process_data Task: This task processes the fetched data.
3. my_workflow Workflow: This workflow chains the tasks together.
4. test_my_workflow Test: This test uses task_mock to mock the fetch_data task, ensuring that it returns a controlled value (5) during the test.
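Since patch was mentioned above but not shown, here is a minimal sketch of its decorator form, assuming the fetch_data task and my_workflow workflow defined above (the mock argument name is arbitrary):
from flytekit.testing import patch

@patch(fetch_data)
def test_my_workflow_with_patch(mock_fetch):
    # The decorator injects a mock for fetch_data into the test function
    mock_fetch.return_value = 5
    assert my_workflow() == 15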
Additional Resources
- Flytekit Testing Documentation
- Flytekit GitHub Repository
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Testing Documentation
- Flytekit GitHub Repository
millions-night-34157
06/18/2024, 5:16 PM
ancient-wolf-19325
06/18/2024, 5:16 PM
To unit test the process_data function and ensure that it processes data correctly, you can directly call the function within your test case and assert the expected output. Since process_data is a simple function that adds two integers, you don't need to mock any external dependencies for this specific task.
Here's how you can write a unit test for the process_data task using `pytest`:
Example Task
from flytekit import task

@task
def process_data(a: int, b: int) -> int:
    return a + b
Unit Test
Create a test file, for example, test_process_data.py, and write the following test case:
import pytest
from my_module import process_data  # Import the task from your module

def test_process_data():
    # Test case 1: Normal inputs
    result = process_data(a=3, b=4)
    assert result == 7

    # Test case 2: Zero inputs
    result = process_data(a=0, b=0)
    assert result == 0

    # Test case 3: Negative inputs
    result = process_data(a=-1, b=-1)
    assert result == -2

    # Test case 4: Mixed inputs
    result = process_data(a=-1, b=1)
    assert result == 0

# To run the test, use the pytest command in your terminal:
# pytest -v test_process_data.py
In this example:
1. Import the Task: Import the process_data task from your module.
2. Define Test Cases: Write multiple test cases to cover different scenarios, such as normal inputs, zero inputs, negative inputs, and mixed inputs.
3. Run the Test: Use the pytest command to run the test and verify that the process_data function returns the expected results.
Running the Test
To run the test, navigate to the directory containing your test file and execute the following command in your terminal:
pytest -v test_process_data.py
This will run the test cases and provide detailed output on the results.
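If you prefer one reported test per input combination, the same checks can be written with pytest's parametrize decorator; a minimal sketch, reusing the hypothetical my_module import from above:
import pytest
from my_module import process_data  # hypothetical module path, as above

@pytest.mark.parametrize(
    "a, b, expected",
    [
        (3, 4, 7),     # normal inputs
        (0, 0, 0),     # zero inputs
        (-1, -1, -2),  # negative inputs
        (-1, 1, 0),    # mixed inputs
    ],
)
def test_process_data_parametrized(a, b, expected):
    # Flyte tasks can be called like plain Python functions in unit tests
    assert process_data(a=a, b=b) == expected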
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Testing Documentation
- Flytekit GitHub Repository
millions-night-34157
06/18/2024, 6:53 PM
class Config:
    def __init__(self):
        self.dummy_data = {"a": 1, "b": 2, "c": 3}
        self.const_val = 9999

    def __str__(self):
        return f"Config: {self.dummy_data}"
ancient-wolf-19325
06/18/2024, 6:53 PM
To implement a TypeTransformer for the Config class, you need to define a custom transformer that handles the serialization and deserialization of the Config object. This involves converting the Config object to a Flyte Literal when passing it as an input or output of a task, and converting it back to a Config object when receiving it.
Here's how you can implement the TypeTransformer for the Config class:
1. Define the Config class: Ensure the class is defined as per your requirements.
2. Implement the TypeTransformer: Create a custom transformer that handles the serialization and deserialization.
3. Register the TypeTransformer: Register the transformer with Flyte's TypeEngine.
Step 1: Define the Config Class
class Config:
    def __init__(self):
        self.dummy_data = {"a": 1, "b": 2, "c": 3}
        self.const_val = 9999

    def __str__(self):
        return f"Config: {self.dummy_data}"
Step 2: Implement the TypeTransformer
from flytekit import task, workflow
from flytekit.extend import TypeTransformer, TypeEngine
from flytekit.models.literals import Literal, Scalar, Primitive, LiteralMap
from flytekit.models.types import LiteralType, SimpleType
from typing import Type

class ConfigTransformer(TypeTransformer[Config]):
    def __init__(self):
        super().__init__(name="ConfigTransformer", t=Config)

    def get_literal_type(self, t: Type[Config]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.INTEGER))

    def to_literal(self, ctx, python_val: Config, python_type: Type[Config], expected: LiteralType) -> Literal:
        # Serialize dummy_data and const_val into a single LiteralMap of integers
        dummy_data_literals = {k: Literal(scalar=Scalar(primitive=Primitive(integer=v))) for k, v in python_val.dummy_data.items()}
        const_val_literal = Literal(scalar=Scalar(primitive=Primitive(integer=python_val.const_val)))
        return Literal(map=LiteralMap(literals={**dummy_data_literals, "const_val": const_val_literal}))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[Config]) -> Config:
        # Rebuild the Config object from the LiteralMap
        dummy_data = {k: v.scalar.primitive.integer for k, v in lv.map.literals.items() if k != "const_val"}
        const_val = lv.map.literals["const_val"].scalar.primitive.integer
        config = Config()
        config.dummy_data = dummy_data
        config.const_val = const_val
        return config

# Register the transformer
TypeEngine.register(ConfigTransformer())
Step 3: Use the Config Class in a Task
Now you can use the Config class in a Flyte task:
@task
def process_config(config: Config) -> str:
    return str(config)

@workflow
def my_workflow() -> str:
    config = Config()
    return process_config(config=config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
1. ConfigTransformer Class: This class extends TypeTransformer and implements the required methods: get_literal_type, to_literal, and to_python_value.
2. get_literal_type Method: Defines the Flyte LiteralType for the Config object. In this case, it uses a map of integer values.
3. to_literal Method: Converts the Config object to a Flyte Literal. It serializes the dummy_data dictionary and const_val into a LiteralMap.
4. to_python_value Method: Converts a Flyte Literal back into a Config object. It deserializes the LiteralMap into the dummy_data dictionary and const_val.
5. Register the Transformer: Registers the custom transformer with Flyte's TypeEngine.
This setup allows you to use the Config class as an input and output type in Flyte tasks and workflows.
Would you like more detail on any specific part of this process?
Sources:
- Flytekit TypeTransformer Documentation
- Flytekit Custom Types Example
millions-night-34157
06/18/2024, 7:55 PM
class MyData:
    def __init__(self, data_path: str, sensor_path: str):
        self.data_path = data_path
        self.sensor_path = sensor_path

class MyDataList:
    def __init__(self, data_paths: List[str], sensor_path: str):
        self.data_paths = data_paths
        self.sensor_path = sensor_path

class Config:
    def __init__(self, data_sets: Dict[str, Union[MyData, MyDataList]]):
        self.data_sets = data_sets
ancient-wolf-19325
06/18/2024, 7:55 PM
To implement TypeTransformer classes for MyData, MyDataList, and Config, we need to handle the serialization and deserialization of these custom types into Flyte Literal types. This involves converting the custom objects to Flyte Literal representations when passing them as inputs or outputs of tasks, and converting them back to the custom objects when receiving them.
Here's how you can implement the TypeTransformer for these classes:
Step 1: Define the Custom Classes
from typing import List, Dict, Union

class MyData:
    def __init__(self, data_path: str, sensor_path: str):
        self.data_path = data_path
        self.sensor_path = sensor_path

class MyDataList:
    def __init__(self, data_paths: List[str], sensor_path: str):
        self.data_paths = data_paths
        self.sensor_path = sensor_path

class Config:
    def __init__(self, data_sets: Dict[str, Union[MyData, MyDataList]]):
        self.data_sets = data_sets
Step 2: Implement the TypeTransformer for Each Class
MyDataTransformer
from flytekit import task, workflow
from flytekit.extend import TypeTransformer, TypeEngine
from flytekit.models.literals import Literal, Scalar, Primitive, LiteralMap, LiteralCollection
from flytekit.models.types import LiteralType, SimpleType
from typing import Type

class MyDataTransformer(TypeTransformer[MyData]):
    def __init__(self):
        super().__init__(name="MyDataTransformer", t=MyData)

    def get_literal_type(self, t: Type[MyData]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: MyData, python_type: Type[MyData], expected: LiteralType) -> Literal:
        return Literal(map=LiteralMap(literals={
            "data_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.data_path))),
            "sensor_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.sensor_path)))
        }))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[MyData]) -> MyData:
        data_path = lv.map.literals["data_path"].scalar.primitive.string_value
        sensor_path = lv.map.literals["sensor_path"].scalar.primitive.string_value
        return MyData(data_path=data_path, sensor_path=sensor_path)

# Register the transformer
TypeEngine.register(MyDataTransformer())
MyDataListTransformer
class MyDataListTransformer(TypeTransformer[MyDataList]):
    def __init__(self):
        super().__init__(name="MyDataListTransformer", t=MyDataList)

    def get_literal_type(self, t: Type[MyDataList]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: MyDataList, python_type: Type[MyDataList], expected: LiteralType) -> Literal:
        data_paths_literals = [Literal(scalar=Scalar(primitive=Primitive(string_value=path))) for path in python_val.data_paths]
        return Literal(map=LiteralMap(literals={
            "data_paths": Literal(collection=LiteralCollection(literals=data_paths_literals)),
            "sensor_path": Literal(scalar=Scalar(primitive=Primitive(string_value=python_val.sensor_path)))
        }))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[MyDataList]) -> MyDataList:
        data_paths = [lit.scalar.primitive.string_value for lit in lv.map.literals["data_paths"].collection.literals]
        sensor_path = lv.map.literals["sensor_path"].scalar.primitive.string_value
        return MyDataList(data_paths=data_paths, sensor_path=sensor_path)

# Register the transformer
TypeEngine.register(MyDataListTransformer())
ConfigTransformer
class ConfigTransformer(TypeTransformer[Config]):
    def __init__(self):
        super().__init__(name="ConfigTransformer", t=Config)

    def get_literal_type(self, t: Type[Config]) -> LiteralType:
        return LiteralType(map_value_type=LiteralType(simple=SimpleType.STRING))

    def to_literal(self, ctx, python_val: Config, python_type: Type[Config], expected: LiteralType) -> Literal:
        data_sets_literals = {}
        for key, value in python_val.data_sets.items():
            if isinstance(value, MyData):
                data_sets_literals[key] = MyDataTransformer().to_literal(ctx, value, MyData, expected)
            elif isinstance(value, MyDataList):
                data_sets_literals[key] = MyDataListTransformer().to_literal(ctx, value, MyDataList, expected)
        return Literal(map=LiteralMap(literals=data_sets_literals))

    def to_python_value(self, ctx, lv: Literal, expected_python_type: Type[Config]) -> Config:
        data_sets = {}
        for key, value in lv.map.literals.items():
            if "data_paths" in value.map.literals:
                data_sets[key] = MyDataListTransformer().to_python_value(ctx, value, MyDataList)
            else:
                data_sets[key] = MyDataTransformer().to_python_value(ctx, value, MyData)
        return Config(data_sets=data_sets)

# Register the transformer
TypeEngine.register(ConfigTransformer())
Step 3: Use the Custom Types in a Task
Now you can use the Config class and its nested custom types in a Flyte task:
@task
def process_config(config: Config) -> str:
    return str(config)

@workflow
def my_workflow() -> str:
    my_data = MyData(data_path="s3://my-bucket/data.csv", sensor_path="s3://my-bucket/sensor.csv")
    my_data_list = MyDataList(data_paths=["s3://my-bucket/data1.csv", "s3://my-bucket/data2.csv"], sensor_path="s3://my-bucket/sensor.csv")
    config = Config(data_sets={"dataset1": my_data, "dataset2": my_data_list})
    return process_config(config=config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
1. MyDataTransformer Class: Handles the serialization and deserialization of the MyData class.
2. MyDataListTransformer Class: Handles the serialization and deserialization of the MyDataList class.
3. ConfigTransformer Class: Handles the serialization and deserialization of the Config class, which contains a dictionary of MyData and MyDataList objects.
4. Register the Transformers: Registers the custom transformers with Flyte's TypeEngine.
This setup allows you to use the Config, MyData, and MyDataList classes as input and output types in Flyte tasks and workflows.
Would you like more detail on any specific part of this process?
millions-night-34157
06/18/2024, 9:14 PM
@dataclass_json
@dataclass
class ExecConfig:
    partition_date: str
    datasets: dict[str, Union[MyData, MyDataList]]
ancient-wolf-19325
06/18/2024, 9:14 PM
Flyte natively supports dataclasses annotated with @dataclass_json from the dataclasses_json library. This means that you do not need to write a custom TypeTransformer for the ExecConfig class if it is annotated with @dataclass_json and @dataclass. Flyte will automatically handle the serialization and deserialization of this dataclass.
Here's how you can define the ExecConfig class and use it in a Flyte task:
Define the ExecConfig Class
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from typing import Dict, List, Union

@dataclass_json
@dataclass
class MyData:
    data_path: str
    sensor_path: str

@dataclass_json
@dataclass
class MyDataList:
    data_paths: List[str]
    sensor_path: str

@dataclass_json
@dataclass
class ExecConfig:
    partition_date: str
    datasets: Dict[str, Union[MyData, MyDataList]]
Use the ExecConfig Class in a Flyte Task
from flytekit import task, workflow

@task
def process_exec_config(config: ExecConfig) -> str:
    return f"Partition Date: {config.partition_date}, Datasets: {config.datasets}"

@workflow
def my_workflow() -> str:
    my_data = MyData(data_path="s3://my-bucket/data.csv", sensor_path="s3://my-bucket/sensor.csv")
    my_data_list = MyDataList(data_paths=["s3://my-bucket/data1.csv", "s3://my-bucket/data2.csv"], sensor_path="s3://my-bucket/sensor.csv")
    exec_config = ExecConfig(partition_date="2023-10-01", datasets={"dataset1": my_data, "dataset2": my_data_list})
    return process_exec_config(config=exec_config)

if __name__ == "__main__":
    print(my_workflow())
Explanation
1. @dataclass_json and @dataclass Annotations: These annotations enable Flyte to automatically handle the serialization and deserialization of the ExecConfig class.
2. process_exec_config Task: This task takes an ExecConfig object as input and processes it.
3. my_workflow Workflow: This workflow creates an ExecConfig object and passes it to the process_exec_config task.
Running the Workflow
To run the workflow, simply execute the script:
python my_script.py
Flyte will automatically handle the conversion of the ExecConfig object to and from its serialized form.
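As a quick sanity check of the JSON round trip that dataclasses_json itself provides (independent of Flyte; the values reuse the illustrative paths from the workflow above):
my_data = MyData(data_path="s3://my-bucket/data.csv", sensor_path="s3://my-bucket/sensor.csv")
as_json = my_data.to_json()           # provided by @dataclass_json
restored = MyData.from_json(as_json)  # back to a MyData instance
assert restored == my_data            # dataclass equality compares fields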
Additional Resources
- Flytekit Dataclass Support
- dataclasses_json Documentation
Would you like more detail on any specific part of this process?
Sources:
- Flytekit Dataclass Support
- dataclasses_json Documentation