microscopic-furniture-57275
02/24/2025, 9:23 PM.with_overrides()
for a dynamic task)
I have a (fairly) stripped down repro case, in which the crucial point is needing to serialize an arbitrary python object as part of a dataclass, which we rely on pickle for. Here is a stripped down example of a class+dataclass which serializes correctly with 1.12.0 -- as noted in the comments, this fails in flytekit 1.15.0 with the errors indicated:
class SimpleObjectOriginal():
# Of course we COULD make this a dataclass; this is overly simple for repro purposes,
# and in reality this class is a lot more complex!
def __init__(self, a: Optional[str] = None, b: Optional[int] = None):
self.a = a
self.b = b
def encode_object(object: SimpleObjectOriginal) -> str:
return base64.b64encode(pickle.dumps(object)).decode("utf-8")
def decode_object(object_value: str) -> SimpleObjectOriginal:
return pickle.loads(base64.b64decode(object_value.encode("utf-8")))
@dataclass
class SimpleObjectOriginalMixin(DataClassJsonMixin):
# This is a mixin that adds a SimpleObjectOriginal field to any dataclass.
#
# This works perfectly fine in with flytekit 1.12.0
# In 1.15.0, we get this:
# type_engine.py:622 - Failed to extract schema for
# object <class
# 'plaster.run.priors.SimpleObjectOriginalMixin'>, error:
# Type plaster.run.priors.SimpleObjectOriginal of field
# "simple_object" in
# plaster.run.priors.SimpleObjectOriginalMixin isn't
# supported
# and then this:
# mashumaro.exceptions.UnserializableField: Field "simple_object" of type SimpleObjectOriginal in SimpleObjectOriginalMixin is not serializable
simple_object: SimpleObjectOriginal = field(
default_factory=SimpleObjectOriginal,
metadata=config(
mm_field=marshmallow.fields.String(),
encoder=encode_object,
decoder=decode_object,
),
)
To allow Mashumaro to serialize our field, I finally arrived at making the embedded object a dataclass itself, and inheriting from mashumaro.SerializableType.
I also had to make the field a Union with None, otherwise the msgpack decoder will try to call _serialize on the None value in the case simple_object is None.
@dataclass
class SimpleObject(SerializableType):
def __init__(self, a: Optional[str] = None, b: Optional[int] = None):
self.a = a
self.b = b
def _serialize(self) -> str:
return base64.b64encode(pickle.dumps(self)).decode("utf-8")
@classmethod
def _deserialize(cls, value: str):
if isinstance(value, cls):
# Not sure this is necessary.
return value
return pickle.loads(base64.b64decode(value.encode("utf-8")))
from typing import Optional, Union
@dataclass
class SimpleObjectMixin(DataClassJsonMixin):
# This works fine unless the dataclass is being passed from a dynamic task
# using .with_overrides()
simple_object: Union[SimpleObject, None] = field(
default_factory=SimpleObject,
metadata=config(
mm_field=marshmallow.fields.String(),
encoder=encode_object,
decoder=decode_object,
),
)
Here is the test that illustrates the failure. Note that with a normal workflow, or even with a dynamic that _does not use `with_overrides()`_ this works as expected. But when .with_overrides()
is used, we get an error due to the Union type being used for the field:
def test_simple_object_serialize(self):
@task
def generate_result(with_none: bool) -> SimpleObjectMixin:
return SimpleObjectMixin(simple_object=None if with_none else SimpleObject(a='thomas', b=55))
@task
def check_result(obj: SimpleObjectMixin):
if obj.simple_object is not None:
assert obj.simple_object.a == 'thomas'
assert obj.simple_object.b == 55
@workflow
def test_workflow():
# works as expected
result = generate_result(with_none=False)
check_result(obj=result)
# If the dataclass field is None, then the _serialize() method is called on None
# and we get an error, unless we make the field a Union[SimpleObject, None].
# In the latter case, this too will work:
result = generate_result(with_none=True)
check_result(obj=result)
@dynamic
def test_dynamic():
# This, however, though identical to the workflow above, fails with a different error, due
# to Union[SimpleObject, None] in conjunction with the .with_overrides() call.
#
# check_result(obj=result).with_overrides(limits=Resources(cpu="3", mem="500Mi"))
# E AttributeError: Error encountered while executing 'tests.plaster.run.sigproc_v2.test_result.test_dynamic':
# E 'NoneType' object has no attribute 'with_overrides'
# This will fail with the above whether or not with_none is True or False. It fails as the
# result of the dataclass having a Union[SimpleObject, None] field, which was necessary to
# prevent Marshumaro/Msgpack from trying to call _serialize() on None.
result = generate_result(with_none=False)
check_result(obj=result).with_overrides(limits=Resources(cpu="3", mem="500Mi"))
test_workflow() # this works
test_dynamic() # this works too, unless you use .with_overrides()
I realize that maybe the "best" approach is to abandon the use of pickle in this serialization, but this is a large change for our codebase and existing data, and the error I'm running into would seem to indicate some strange error in how .with_overrides() is being called on the task object, which is quite unexpected since it is just the argument to the task that is changing.
Thoughts?thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
microscopic-furniture-57275
02/27/2025, 5:59 PMwith_overrides()
that is still the problem. This is with flytekit 1.15.0.
Here is the test using your suggested classes - I have used the Mixin class directly to simplify slightly, and I also just init the simple_object with None because you get the same error either way.
def test_simple_object_yee(self):
@task
def generate_result() -> SimpleObjectOriginalMixin:
return SimpleObjectOriginalMixin(simple_object=None)
@task
def check_result(obj: SimpleObjectOriginalMixin):
assert obj.simple_object is None
@dynamic
def test_dynamic(with_overrides: bool):
result = generate_result()
if with_overrides:
check_result(obj=result).with_overrides(
limits=Resources(cpu="3", mem="500Mi")
)
else:
check_result(obj=result)
def test_type_engine(with_none: bool):
p = SimpleObjectOriginalMixin(simple_object=SimpleObjectOriginal(a="a", b=1))
if with_none:
p.simple_object = None
ctx = FlyteContextManager.current_context()
lt = TypeEngine.to_literal_type(SimpleObjectOriginalMixin)
lit = TypeEngine.to_literal(ctx, p, SimpleObjectOriginalMixin, lt)
print(lit)
new_p = TypeEngine.to_python_value(ctx, lit, SimpleObjectOriginalMixin)
print(new_p)
if with_none:
assert new_p.simple_object is None
else:
assert new_p.simple_object.a == p.simple_object.a
assert new_p.simple_object.b == p.simple_object.b
test_type_engine(with_none=False) # this works
test_type_engine(with_none=True) # this works
test_dynamic(with_overrides=False) # this works
test_dynamic(with_overrides=True) # this fails
Here is the error produced for `test_dynamic(with_overrides=True)`:
@dynamic
def test_dynamic():
result = generate_result(with_none=False)
> check_result(obj=result).with_overrides(
limits=Resources(cpu="3", mem="500Mi")
)
E AttributeError: Error encountered while executing 'tests.plaster.run.sigproc_v2.test_result.test_dynamic':
E 'NoneType' object has no attribute 'with_overrides'
microscopic-furniture-57275
03/12/2025, 4:04 PMthankful-minister-83577
thankful-minister-83577
glamorous-carpet-83516
03/18/2025, 9:15 PMmicroscopic-furniture-57275
03/18/2025, 9:16 PMmicroscopic-furniture-57275
03/20/2025, 3:59 PM