# ask-the-community

Luís Rosário

01/02/2024, 2:55 PM
Hello! I'm building a pipeline with a main workflow that contains several sub-workflows. The main workflow receives as input the path to a YAML file (this may become a FlyteFile in the future). The file is loaded into a Dynaconf object, which then serves as the input for those sub-workflows. Something along these lines:
e.g. settings.yaml:

```yaml
default:
  processing:
    some_list: [1, 2, 3, 4]
  entities:
    swt3fsd1234: [1, 3]
    fsgr545646: [5, 6]
  raw_data:
    start_date: '2023-09-02'
```

```python
from typing import Dict, List

from dynaconf import Dynaconf
from flytekit import task, workflow


@task(container_image="{{.image.my_img.fqn}}:{{.image.my_img.version}}")
def read_config(config_file_path: str) -> Dynaconf:
    # environments=True: values come from the top-level 'default:' section.
    settings = Dynaconf(
        settings_files=[config_file_path], environments=True
    )

    return settings


@task(container_image="{{.image.my_img.fqn}}:{{.image.my_img.version}}")
def task1(some_list: List[int], some_dict: Dict[str, List[int]], start_date: str) -> int:
    ...


@workflow
def sub_wf1(dynaconf_obj: Dynaconf) -> int:
    # Pass individual values from the settings object to task1.
    res = task1(
        some_list=dynaconf_obj.processing.some_list,
        some_dict=dynaconf_obj.entities,
        start_date=dynaconf_obj.raw_data.start_date,
    )
    return res


@workflow
def main_wf(config_file_path: str):
    dynaconf_obj = read_config(
        config_file_path=config_file_path,
    )

    a = sub_wf1(dynaconf_obj=dynaconf_obj)
```
However, inside the sub-workflows, whenever I try to declare clearly defined interfaces for my tasks (i.e., instead of passing the entire Dynaconf object, passing explicit parameters such as 'start_date' and 'some_list' that are defined in the Dynaconf object), I run into issues. Using Dynaconf alone, Flyte cannot determine the types of its attributes because they are lazily loaded, so the whole settings object is treated as a pickled blob:

```
Failed with Exception Code: SYSTEM:Unknown
RPC Failed, with Status: StatusCode.INTERNAL
details: failed to compile workflow for [resource_type:WORKFLOW project:"flytesnacks" domain:"development" name:"pipeline.sub_wf1" version:"fohfPx4j0h6drOlX4u5gHA==" ] with err failed to compile workflow with err Collected Errors: 6
Error 0: Code: MismatchingTypes, Node Id: n0, Description: Variable [settings] (type [blob:<format:"PythonPickle" > metadata:<fields:<key:"python_class_name" value:<string_value:"<class 'dynaconf.base.LazySettings'>" > > > ]) doesn't match expected type [collection_type:<simple:INTEGER > ].
```
Given this, I tried parsing these Dynaconf attributes into a dataclass before passing it to the sub-workflow, so that Flyte would know a priori the expected types of the required parameters:

```python
from dataclasses import dataclass
from typing import Dict, List

from dynaconf import Dynaconf
from flytekit import task, workflow
from mashumaro.mixins.json import DataClassJSONMixin


@dataclass
class ConfigObj(DataClassJSONMixin):
    start_date: str
    some_list: List[int]
    some_dict: Dict[str, List[int]]


@task(container_image="{{.image.my_img.fqn}}:{{.image.my_img.version}}")
def read_config(config_file_path: str) -> ConfigObj:
    settings = Dynaconf(
        settings_files=[config_file_path], environments=True
    )

    # Copy the values the tasks need into a typed dataclass.
    dataclass_obj = ConfigObj(
        start_date=settings.raw_data.start_date,
        some_list=settings.processing.some_list,
        some_dict=settings.entities,
    )

    return dataclass_obj


# task1 is defined as in the previous snippet.


@workflow
def sub_wf1(dataclass_obj: ConfigObj) -> int:
    # Access each dataclass field and pass it as a separate task input.
    res = task1(
        some_list=dataclass_obj.some_list,
        some_dict=dataclass_obj.some_dict,
        start_date=dataclass_obj.start_date,
    )
    return res


@workflow
def main_wf(config_file_path: str):
    dataclass_obj = read_config(
        config_file_path=config_file_path,
    )

    a = sub_wf1(dataclass_obj=dataclass_obj)
```
With this approach, however, Flyte fails to convert the "complex" (non-primitive) attributes such as lists and dicts, and I got this error:

```
Failed with Unknown Exception <class 'TypeError'> Reason: Encountered error while executing workflow 'pipeline.main_wf':
  Error encountered while executing 'main_wf':
  Failed to convert inputs of task 'src.data_acquisition.data_acquisition.sub_wf1':
  Failed argument 'some_list': Failed to convert to generic protobuf struct
Encountered error while executing workflow 'pipeline.main_wf':
  Error encountered while executing 'main_wf':
  Failed to convert inputs of task 'src.data_acquisition.data_acquisition.sub_wf1':
  Failed argument 'some_list': Failed to convert to generic protobuf struct
```
1- Is there a way to make this pattern work, rather than just sending the whole object (Dynaconf or dataclass) as the sole parameter to the @tasks? I wanted to follow this approach so that, in the event of a manual re-run from the Flyte console, the re-run task form is easier to fill out, and so that the parameters used throughout the pipeline stay visible.
2- Besides this, does anyone have tips on using Dynaconf with Flyte?

Thanks in advance, and sorry for the long post 🙂
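On the Dynaconf tips question, one general pattern (an assumption, not an answer given in the thread) is to copy the lazily loaded Dynaconf values into plain built-in types inside the task, before they cross a Flyte task boundary, since the first error shows the raw LazySettings object being pickled. The stdlib-only sketch below does that for the settings.yaml layout above and validates element types early. The helper names to_builtin and build_config are hypothetical, and this ConfigObj drops DataClassJSONMixin only to avoid external dependencies.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping


@dataclass
class ConfigObj:
    start_date: str
    some_list: List[int]
    some_dict: Dict[str, List[int]]


def to_builtin(value: Any) -> Any:
    """Recursively copy mappings and lists into plain dict/list objects.

    dict/list subclasses (for example the Box-based containers Dynaconf
    uses for nested values) are replaced by the built-in types.
    """
    if isinstance(value, Mapping):
        return {str(key): to_builtin(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_builtin(item) for item in value]
    return value


def _check_ints(name: str, values: List[Any]) -> None:
    # Fail fast with a readable message (bool is excluded even though it subclasses int).
    if not all(isinstance(v, int) and not isinstance(v, bool) for v in values):
        raise TypeError(f"{name} must be a list of ints, got {values!r}")


def build_config(section: Mapping[str, Any]) -> ConfigObj:
    """Build a ConfigObj from one environment section of settings.yaml."""
    data = to_builtin(section)
    some_list = data["processing"]["some_list"]
    some_dict = data["entities"]
    _check_ints("processing.some_list", some_list)
    for key, values in some_dict.items():
        _check_ints(f"entities.{key}", values)
    # str() also covers an unquoted YAML date that was parsed as datetime.date.
    start_date = str(data["raw_data"]["start_date"])
    return ConfigObj(start_date=start_date, some_list=some_list, some_dict=some_dict)
```

Inside read_config this could be called as build_config({"processing": settings.processing, "entities": settings.entities, "raw_data": settings.raw_data}), reusing the same attribute access the snippets above already rely on.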

Samhita Alla

01/04/2024, 10:28 AM
This is a bug. An issue has already been created: https://github.com/flyteorg/flyte/issues/4581