Hello flyte community, I’m facing the following bu...
# ask-the-community
k
Hello flyte community, I’m facing the following bug: I’ve defined a task function:
Copy code
@task
def to_training_config(cfg: DictConfig) -> TrainingConfig:
    return TrainingConfig(**cfg)
TrainingConfig
is simply a custom
dataclass_json
object. This function simply aims to convert a
DictConfig
object to the dataclass so I can reuse it later within my workflow. I am getting the following error:
Copy code
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/opt/venv/lib/python3.7/site-packages/flytekit/core/base_task.py in dispatch_execute(self, ctx, input_literal_map)
    521                 try:
--> 522                     literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
    523                 except Exception as e:

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(cls, ctx, python_val, python_type, expected)
    751 
--> 752         lv = transformer.to_literal(ctx, python_val, python_type, expected)
    753 

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(self, ctx, python_val, python_type, expected)
    349             )
--> 350         self._serialize_flyte_type(python_val, python_type)
    351         return Literal(

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
    397                 field_type = v.type
--> 398                 python_val.__setattr__(v.name, self._serialize_flyte_type(val, field_type))
    399             return python_val

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
    395             for v in dataclasses.fields(python_type):
--> 396                 val = python_val.__getattribute__(v.name)
    397                 field_type = v.type

AttributeError: 'DictConfig' object has no attribute 'name'
Note
: this works perfectly fine when I remove the
@task
decorator. Any idea what could be the cause?
k
Interesting
What’s a DictConfig?
k
It’s from the omegaconf library. It’s basically a more rigid dictionary, which is quite useful for configurations https://omegaconf.readthedocs.io/en/2.0_branch/usage.html
k
Interesting we should add a type transformer for it
Then we won’t need to change it at all. It seems it is being pickled.
k
so the pickling is what’s causing the error right?
k
Not sure, but a guess
Could also be the dataclass_json decorator screwing things up
k
so fun fact: i added the
dataclass_json
decorator because it raised the following error before I did:
Copy code
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
/tmp/ipykernel_31113/2006354963.py in <module>
----> 1 from global_search_training.tasks.config import get_generic_config, print_training_config
      2 from global_search_training.schema.super_schema import TrainingConfig, to_training_config
      3 from omegaconf import DictConfig, OmegaConf

/workspaces/global_search_training/global_search_training/tasks/config.py in <module>
      4 from flytekit import task
      5 from hydra.core.global_hydra import GlobalHydra
----> 6 from global_search_training.schema.super_schema import TrainingConfig
      7 from omegaconf import DictConfig, OmegaConf
      8 from hydra.core.config_store import ConfigStore

/workspaces/global_search_training/global_search_training/schema/super_schema.py in <module>
     18 
     19 @task
---> 20 def to_training_config(cfg: DictConfig) -> TrainingConfig:
     21     return TrainingConfig(**cfg)
     22 

/opt/venv/lib/python3.7/site-packages/flytekit/core/task.py in task(_task_function, task_config, cache, cache_serialize, cache_version, retries, interruptible, deprecated, timeout, container_image, environment, requests, limits, secret_requests, execution_mode, task_resolver, disable_deck)
    210 
    211     if _task_function:
--> 212         return wrapper(_task_function)
    213     else:
    214         return wrapper

/opt/venv/lib/python3.7/site-packages/flytekit/core/task.py in wrapper(fn)
    204             execution_mode=execution_mode,
    205             task_resolver=task_resolver,
--> 206             disable_deck=disable_deck,
    207         )
    208         update_wrapper(task_instance, fn)

/opt/venv/lib/python3.7/site-packages/flytekit/core/tracker.py in __call__(cls, *args, **kwargs)
     33 
     34     def __call__(cls, *args, **kwargs):
---> 35         o = super(InstanceTrackingMeta, cls).__call__(*args, **kwargs)
     36         o._instantiated_in = InstanceTrackingMeta._find_instance_module()
     37         return o

/opt/venv/lib/python3.7/site-packages/flytekit/core/python_function_task.py in __init__(self, task_config, task_function, task_type, ignore_input_vars, execution_mode, task_resolver, **kwargs)
    125             task_config=task_config,
    126             task_resolver=task_resolver,
--> 127             **kwargs,
    128         )
    129 

/opt/venv/lib/python3.7/site-packages/flytekit/core/python_auto_container.py in __init__(self, name, task_config, task_type, container_image, requests, limits, environment, task_resolver, secret_requests, **kwargs)
     76             task_config=task_config,
     77             security_ctx=sec_ctx,
---> 78             **kwargs,
     79         )
     80         self._container_image = container_image

/opt/venv/lib/python3.7/site-packages/flytekit/core/base_task.py in __init__(self, task_type, name, task_config, interface, environment, disable_deck, **kwargs)
    384             task_type=task_type,
    385             name=name,
--> 386             interface=transform_interface_to_typed_interface(interface),
    387             **kwargs,
    388         )

/opt/venv/lib/python3.7/site-packages/flytekit/core/interface.py in transform_interface_to_typed_interface(interface)
    218 
    219     inputs_map = transform_variable_map(interface.inputs, input_descriptions)
--> 220     outputs_map = transform_variable_map(interface.outputs, output_descriptions)
    221     return _interface_models.TypedInterface(inputs_map, outputs_map)
    222 

/opt/venv/lib/python3.7/site-packages/flytekit/core/interface.py in transform_variable_map(variable_map, descriptions)
    326     if variable_map:
    327         for k, v in variable_map.items():
--> 328             res[k] = transform_type(v, descriptions.get(k, k))
    329             sub_type: Type[T] = v
    330             if hasattr(v, "__origin__") and hasattr(v, "__args__"):

/opt/venv/lib/python3.7/site-packages/flytekit/core/interface.py in transform_type(x, description)
    344 
    345 def transform_type(x: type, description: str = None) -> _interface_models.Variable:
--> 346     return _interface_models.Variable(type=TypeEngine.to_literal_type(x), description=description)
    347 
    348 

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal_type(cls, python_type)
    708         """
    709         transformer = cls.get_transformer(python_type)
--> 710         res = transformer.get_literal_type(python_type)
    711         data = None
    712         if get_origin(python_type) is Annotated:

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in get_literal_type(self, t)
    317         if not issubclass(t, DataClassJsonMixin):
    318             raise AssertionError(
--> 319                 f"Dataclass {t} should be decorated with @dataclass_json to be " f"serialized correctly"
    320             )
    321         schema = None

AssertionError: Dataclass <class 'global_search_training.schema.super_schema.TrainingConfig'> should be decorated with @dataclass_json to be serialized correctly
interesting bugs 😅
s
Not sure why the field is of type
DictConfig
. What’s the output of
TrainingConfig(**cfg)
generally?
k
@Samhita Alla I tried to change the field to
dict
but the error became
Copy code
AttributeError: 'dict' object has no attribute 'name'
Let’s say I define a class like this
Copy code
@dataclass_json
@dataclass
class TrainingConfig:
    environment: str
Then I would declare the object by doing
Copy code
cfg = {'environment': 'staging'}
training_config = TrainingConfig(**cfg)
output should be like
Copy code
TrainingConfig(environment='staging')
s
Yeah. Let me repro it.
Copy code
from dataclasses import dataclass

from dataclasses_json import dataclass_json
from flytekit import task, workflow
from omegaconf import DictConfig, OmegaConf


@dataclass_json
@dataclass
class TrainingConfig:
    environment: str


@task
def t1(cfg: DictConfig) -> TrainingConfig:
    return TrainingConfig(**cfg)


@workflow
def wf(cfg: DictConfig = OmegaConf.create({"environment": "staging"})):
    t1(cfg=cfg)


wf()
This works for me. Isn’t this what you’re trying to do? Is it working locally for you?
k
OK, sorry @Samhita Alla, my bad for not framing the problem accurately. I think the problem comes from the fact that I have nested classes. Here’s something closer to my case, where I can reproduce the bug:
Copy code
from dataclasses import dataclass

from dataclasses_json import dataclass_json
from flytekit import task, workflow
from omegaconf import DictConfig, OmegaConf
from global_search_training.schema.primary_schema import GCPConfig

@dataclass_json
@dataclass
class TrainingConfig:
    environment: str
    gcp: str

@dataclass_json
@dataclass    
class NestedClass:
    name: str
    training: TrainingConfig

@task
def t1(cfg: DictConfig) -> NestedClass:
    return NestedClass(**cfg)


@workflow
def wf(cfg: DictConfig = OmegaConf.create({'name': 'staging', 'training':{'environment': 'staging', 'gcp': 'test'}})):
    t1(cfg=cfg)


wf()
Copy code
AttributeError                            Traceback (most recent call last)
/opt/venv/lib/python3.7/site-packages/flytekit/core/base_task.py in dispatch_execute(self, ctx, input_literal_map)
    521                 try:
--> 522                     literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
    523                 except Exception as e:

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(cls, ctx, python_val, python_type, expected)
    751 
--> 752         lv = transformer.to_literal(ctx, python_val, python_type, expected)
    753 

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(self, ctx, python_val, python_type, expected)
    349             )
--> 350         self._serialize_flyte_type(python_val, python_type)
    351         return Literal(

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
    397                 field_type = v.type
--> 398                 python_val.__setattr__(v.name, self._serialize_flyte_type(val, field_type))
    399             return python_val

/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
    395             for v in dataclasses.fields(python_type):
--> 396                 val = python_val.__getattribute__(v.name)
    397                 field_type = v.type

AttributeError: 'DictConfig' object has no attribute 'environment'
So the bug is slightly different: this seems to indicate that it’s having issues fetching the key
environment
. Above it was
Copy code
AttributeError: 'DictConfig' object has no attribute 'name'
because
TrainingConfig
is a nested config and one of the dataclasses inside had an attribute called
name
s
Copy code
from dataclasses import dataclass

from dataclasses_json import dataclass_json
from flytekit import task, workflow
from omegaconf import DictConfig, OmegaConf


@dataclass_json
@dataclass
class TrainingConfig:
    environment: str
    gcp: str


@dataclass_json
@dataclass
class NestedClass:
    name: str
    training: TrainingConfig


@task
def t1(cfg: DictConfig) -> NestedClass:
    return NestedClass(
        cfg.name,
        TrainingConfig(**cfg.training),
    )


@workflow
def wf(
    cfg: DictConfig = OmegaConf.create(
        {"name": "staging", "training": {"environment": "staging", "gcp": "test"}}
    )
):
    t1(cfg=cfg)


wf()
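This works. I think the error’s cropping up because the nested dataclass needs to be initialized explicitly (here, TrainingConfig(**cfg.training)) before it is passed to the outer dataclass; otherwise the nested field remains a DictConfig.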
k
I see!! Is it because flyte works with promise objects? Is it comparable to lazy evaluation in a way?
Ok thanks so much @Samhita Alla!! literally been stuck on this for almost 2 days
s
I see!! Is it because flyte works with promise objects? Is it comparable to lazy evaluation in a way?
It isn’t because of Promises. If you see the following code:
Copy code
from dataclasses import dataclass

from dataclasses_json import dataclass_json
from flytekit import task, workflow
from omegaconf import DictConfig, OmegaConf


@dataclass_json
@dataclass
class TrainingConfig:
    environment: str
    gcp: str


@dataclass_json
@dataclass
class NestedClass:
    name: str
    training: int


cfg: DictConfig = OmegaConf.create(
    {"name": "staging", "training": {"environment": "staging", "gcp": "test"}}
)
print(NestedClass(**cfg))
This works despite
training
type set to
int
because type validation isn’t done in this case. In the Flyte case, the types are validated at every step, and hence you were seeing the error.
157 Views