Luís Rosário
01/02/2024, 2:55 PM
e.g. settings.yaml:
default:
  processing:
    some_list: [1, 2, 3, 4]
  entities:
    swt3fsd1234: [1, 3]
    fsgr545646: [5, 6]
  raw_data:
    start_date: '2023-09-02'
from typing import Dict, List

from dynaconf import Dynaconf
from flytekit import task, workflow


@task(container_image="{{.image.my_img.fqn}}:{{.image.my_img.version}}")
def read_config(config_file_path: str) -> Dynaconf:
    settings = Dynaconf(
        settings_files=[config_file_path], environments=True
    )
    return settings


@task(container_image="{{.image.my_img.fqn}}:{{.image.my_img.version}}")
def task1(some_list: List[int], some_dict: Dict[str, List[int]], start_date: str) -> int:
    ...


@workflow
def sub_wf1(dynaconf_obj: Dynaconf) -> int:
    res = task1(
        some_list=dynaconf_obj.processing.some_list,
        some_dict=dynaconf_obj.entities.some_dict,
        start_date=dynaconf_obj.raw_data.start_date,
    )
    return res


@workflow
def main_wf(config_file_path: str):
    dynaconf_obj = read_config(
        config_file_path=config_file_path,
    )
    a = sub_wf1(dynaconf_obj=dynaconf_obj)
However, inside the sub-workflows, whenever I try to declare clearly defined interfaces for my tasks (i.e. passing named parameters like 'start_date' and 'some_list' taken from the Dynaconf object, rather than passing the entire object), I run into issues. Using Dynaconf alone, I realized its attribute types cannot be assessed because they are lazily loaded:
Failed with Exception Code: SYSTEM:Unknown
RPC Failed, with Status: StatusCode.INTERNAL
details: failed to compile workflow for [resource_type:WORKFLOW project:"flytesnacks" domain:"development" name:"pipeline.sub_wf1" version:"fohfPx4j0h6drOlX4u5gHA==" ] with err failed to compile workflow with err Collected Errors: 6
Error 0: Code: MismatchingTypes, Node Id: n0, Description: Variable [settings] (type [blob:<format:"PythonPickle" > metadata:<fields:<key:"python_class_name" value:<string_value:"<class 'dynaconf.base.LazySettings'>" > > > ]) doesn't match expected type [collection_type:<simple:INTEGER > ].
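For context on that mismatch: the values Dynaconf hands back are lazy wrapper objects rather than plain lists and dicts, which is why the compiler sees a pickled LazySettings blob instead of collection_type:INTEGER. A stdlib-only sketch of the coercion I have in mind, where the to_builtin helper and the LazyList stand-in are my own illustrations, not part of Dynaconf or Flyte:

```python
from collections.abc import Mapping, Sequence


def to_builtin(value):
    # Recursively convert Mapping/Sequence subclasses (e.g. lazy wrapper
    # types a config library might return) into plain dicts, lists, and
    # scalars, so a type engine sees concrete built-in types.
    if isinstance(value, Mapping):
        return {str(k): to_builtin(v) for k, v in value.items()}
    if isinstance(value, Sequence) and not isinstance(value, (str, bytes)):
        return [to_builtin(v) for v in value]
    return value


class LazyList(list):
    """Stand-in mimicking a lazily wrapped list from a settings object."""


plain = to_builtin({"some_list": LazyList([1, 2, 3, 4])})
print(plain)  # values are now plain built-in types
```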
Given this, I tried to parse these Dynaconf attributes into a dataclass before sending them to the sub-workflow, so that Flyte knows a priori the expected types of the required parameters:
from dataclasses import dataclass
from typing import Dict, List

from dynaconf import Dynaconf
from flytekit import task, workflow
from mashumaro.mixins.json import DataClassJSONMixin


@dataclass
class ConfigObj(DataClassJSONMixin):
    start_date: str
    some_list: List[int]
    some_dict: Dict[str, List[int]]


@task(container_image="{{.image.my_img.fqn}}:{{.image.my_img.version}}")
def read_config(config_file_path: str) -> ConfigObj:
    settings = Dynaconf(
        settings_files=[config_file_path], environments=True
    )
    dataclass_obj = ConfigObj(
        start_date=settings.raw_data.start_date,
        some_list=settings.processing.some_list,
        some_dict=settings.entities.some_dict,
    )
    return dataclass_obj


@workflow
def sub_wf1(dataclass_obj: ConfigObj) -> int:
    res = task1(
        some_list=dataclass_obj.some_list,
        some_dict=dataclass_obj.some_dict,
        start_date=dataclass_obj.start_date,
    )
    return res


@workflow
def main_wf(config_file_path: str):
    dataclass_obj = read_config(
        config_file_path=config_file_path,
    )
    a = sub_wf1(dataclass_obj=dataclass_obj)
With this approach, Flyte is unable to serialize "complex" non-literal types like dicts and lists (I got this error):
Failed with Unknown Exception <class 'TypeError'> Reason: Encountered error while executing workflow 'pipeline.main_wf':
Error encountered while executing 'main_wf':
Failed to convert inputs of task 'src.data_acquisition.data_acquisition.sub_wf1':
Failed argument 'some_list': Failed to convert to generic protobuf struct
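My current suspicion is that the failure comes from handing Dynaconf's wrapper objects, rather than plain built-ins, to the dataclass fields. The workaround I'm experimenting with is casting explicitly before constructing the dataclass; a minimal standalone sketch, where BoxListStandIn just mimics a lazy list wrapper and the values are hard-coded from my settings.yaml:

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class ConfigObj:
    start_date: str
    some_list: List[int]
    some_dict: Dict[str, List[int]]


class BoxListStandIn(list):
    """Stand-in mimicking the lazy list wrapper a settings object returns."""


raw = BoxListStandIn([1, 2, 3, 4])
cfg = ConfigObj(
    start_date="2023-09-02",
    some_list=list(raw),  # explicit cast: plain list, not the wrapper type
    some_dict={"swt3fsd1234": [1, 3], "fsgr545646": [5, 6]},
)

# With only built-in types inside, JSON round-tripping is predictable.
print(json.dumps(asdict(cfg)))
```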
1- Is there a way to make this pattern work, rather than sending the whole object (Dynaconf or dataclass) as the sole parameter to the @tasks? I wanted this approach so that, in the event of a manual re-run from the Flyte console, the re-run task form is easier to fill out and the parameters used throughout the pipeline stay visible.
2- Besides this, does anyone have tips on using Dynaconf with Flyte?
Thanks in advance, and sorry for the long post 🙂