Klemens Kasseroller
06/08/2022, 12:33 PM
from dataclasses import dataclass
from flytekit import task, workflow
from typing import List
from dataclasses_json import dataclass_json
from flytekit.types.file import FlyteFile
@dataclass_json
@dataclass
class InputsContainer:
    """Dataclass wrapper that carries a list of FlyteFile objects between tasks."""

    # The files handed from one task to the next.
    files: List[FlyteFile]
@task
def task1(inputs: List[FlyteFile]) -> InputsContainer:
    """Print the first file's remote source and wrap the list in a container.

    Args:
        inputs: Files to forward; the first element is inspected, so the
            list must not be empty.

    Returns:
        An ``InputsContainer`` whose ``files`` field is ``inputs``.
    """
    first_file = inputs[0]
    print("TASK1 remote source: ", first_file.remote_source)
    container = InputsContainer(files=inputs)
    return container
@task
def task2(inputs: InputsContainer) -> None:
    """Print the remote source of the first file held by the container.

    Args:
        inputs: Container whose ``files`` list is read; its first element
            is inspected, so the list must not be empty.
    """
    first_file = inputs.files[0]
    print("TASK2 remote source: ", first_file.remote_source)
@workflow
def main_workflow(inputs: List[FlyteFile]) -> None:
    """Chain ``task1`` into ``task2``.

    Args:
        inputs: Files passed to ``task1``; its container output is then
            passed on to ``task2``.
    """
    container = task1(inputs=inputs)
    task2(inputs=container)
if __name__ == "__main__":
    # Run the workflow directly with a single-element list of files.
    # NOTE(review): the angle brackets inside the URI look like chat-export
    # link markup rather than part of the intended path — confirm before reuse.
    main_workflow(inputs=[FlyteFile("<s3://test-bucket/test.json>")])
The output generated is:
TASK1 remote source: <s3://test-bucket/test.json>
TASK2 remote source: None
Could anyone help me out here? Thanks!
katrina
Eduardo Apolinario (eapolinario)
06/09/2022, 12:11 AM
Ketan (kumare3)
Samhita Alla
@dataclass_json
@dataclass
class InputsContainer:
    """Dataclass wrapper that carries a single FlyteFile between tasks."""

    # The one file handed from one task to the next.
    files: FlyteFile
@task
def task1(inputs: FlyteFile) -> InputsContainer:
    """Print the file's remote source and wrap the file in a container.

    Args:
        inputs: The file to forward.

    Returns:
        An ``InputsContainer`` whose ``files`` field is ``inputs``.
    """
    source = inputs.remote_source
    print("TASK1 remote source: ", source)
    container = InputsContainer(files=inputs)
    return container
@task
def task2(inputs: InputsContainer) -> None:
    """Print the remote source of the file held by the container.

    Args:
        inputs: Container whose ``files`` attribute (a single file) is read.
    """
    wrapped_file = inputs.files
    print("TASK2 remote source: ", wrapped_file.remote_source)
@workflow
def main_workflow(inputs: FlyteFile) -> None:
    """Chain ``task1`` into ``task2`` for a single file.

    Args:
        inputs: File passed to ``task1``; its container output is then
            passed on to ``task2``.
    """
    container = task1(inputs=inputs)
    task2(inputs=container)
if __name__ == "__main__":
    # Run the workflow directly with one file.
    # NOTE(review): the angle brackets inside the URI look like chat-export
    # link markup rather than part of the intended path — confirm before reuse.
    main_workflow(inputs=FlyteFile("<s3://test-bucket/test.json>"))
Output:
TASK1 remote source: <s3://test-bucket/test.json>
TASK2 remote source: <s3://test-bucket/test.json>
Ketan (kumare3)
Klemens Kasseroller
06/09/2022, 9:04 AM
Kevin Su
06/09/2022, 9:07 AM