white-teacher-47376
06/08/2022, 12:33 PM
from dataclasses import dataclass
from flytekit import task, workflow
from typing import List
from dataclasses_json import dataclass_json
from flytekit.types.file import FlyteFile
@dataclass_json
@dataclass
class InputsContainer:
    """Bundle a list of files so they can be passed around as a single value."""

    # Files carried by the container.
    files: List[FlyteFile]
@task
def task1(inputs: List[FlyteFile]) -> InputsContainer:
    """Print the first file's remote source and wrap the inputs in a container.

    Args:
        inputs: Files to wrap; only the first element is reported.

    Returns:
        An ``InputsContainer`` whose ``files`` field is the list passed in.
    """
    print("TASK1 remote source: ", inputs[0].remote_source)
    return InputsContainer(files=inputs)
@task
def task2(inputs: InputsContainer) -> None:
    """Print the remote source of the first file held by the container.

    Args:
        inputs: Container whose ``files`` list is inspected.
    """
    print("TASK2 remote source: ", inputs.files[0].remote_source)
@workflow
def main_workflow(inputs: List[FlyteFile]) -> None:
    """Chain ``task1`` into ``task2``.

    Args:
        inputs: Files forwarded to ``task1``; its container output feeds ``task2``.
    """
    task1_outputs = task1(inputs=inputs)
    task2(inputs=task1_outputs)
# Script entry point: run the workflow with a single file.
if __name__ == '__main__':
    # NOTE(review): the angle brackets look like chat link markup wrapped
    # around the URI -- confirm whether the intended literal is
    # "s3://test-bucket/test.json".
    file_path = FlyteFile("<s3://test-bucket/test.json>")
    main_workflow(inputs=[file_path])
The output generated is:
TASK1 remote source: <s3://test-bucket/test.json>
TASK2 remote source: None
Could anyone help me out here? Thanks!
acceptable-policeman-57188
high-accountant-32689
06/09/2022, 12:11 AM
freezing-airport-6809
tall-lock-23197
@dataclass_json
@dataclass
class InputsContainer:
    """Wrap a single file so it can be passed around inside a dataclass."""

    # The one file carried by the container.
    files: FlyteFile
@task
def task1(inputs: FlyteFile) -> InputsContainer:
    """Print the file's remote source and wrap the file in a container.

    Args:
        inputs: The file to report on and wrap.

    Returns:
        An ``InputsContainer`` whose ``files`` field is the file passed in.
    """
    print("TASK1 remote source: ", inputs.remote_source)
    return InputsContainer(files=inputs)
@task
def task2(inputs: InputsContainer) -> None:
    """Print the remote source of the file held by the container.

    Args:
        inputs: Container whose ``files`` attribute is inspected.
    """
    print("TASK2 remote source: ", inputs.files.remote_source)
@workflow
def main_workflow(inputs: FlyteFile) -> None:
    """Chain ``task1`` into ``task2``.

    Args:
        inputs: File forwarded to ``task1``; its container output feeds ``task2``.
    """
    task1_outputs = task1(inputs=inputs)
    task2(inputs=task1_outputs)
# Script entry point: run the workflow with a single file.
if __name__ == '__main__':
    # NOTE(review): the angle brackets look like chat link markup wrapped
    # around the URI -- confirm whether the intended literal is
    # "s3://test-bucket/test.json".
    file_path = FlyteFile("<s3://test-bucket/test.json>")
    main_workflow(inputs=file_path)
Output:
TASK1 remote source: <s3://test-bucket/test.json>
TASK2 remote source: <s3://test-bucket/test.json>
tall-lock-23197
tall-lock-23197
freezing-airport-6809
white-teacher-47376
06/09/2022, 9:04 AM
glamorous-carpet-83516
06/09/2022, 9:07 AM
glamorous-carpet-83516
06/09/2022, 12:08 PM