Hi all, I'm debugging an issue related to seriali...
# flyte-support
m
Hi all, I'm debugging an issue related to serialization when trying to update flytekit from 1.12.0 to anything > 1.13.1 -- when some breaking changes involving Mashumaro and Msgpack took place. I'm not sure if I'm trying to do something that can't be done with my approach, or if I've run into a legitimate bug in how flytekit is handling dataclass fields that can be None (specifically when using
.with_overrides()
for a dynamic task) I have a (fairly) stripped down repro case, in which the crucial point is needing to serialize an arbitrary python object as part of a dataclass, which we rely on pickle for. Here is a stripped down example of a class+dataclass which serializes correctly with 1.12.0 -- as noted in the comments, this fails in flytekit 1.15.0 with the errors indicated:
Copy code
class SimpleObjectOriginal:
    """Minimal stand-in for an arbitrary Python object that must be serialized.

    This could be a dataclass; it is kept overly simple for repro purposes,
    and the real class is a lot more complex.
    """

    def __init__(self, a: Optional[str] = None, b: Optional[int] = None):
        self.a, self.b = a, b

def encode_object(object: SimpleObjectOriginal) -> str:
    """Pickle ``object`` and return the pickle bytes as base64 text (UTF-8)."""
    pickled = pickle.dumps(object)
    b64_bytes = base64.b64encode(pickled)
    return b64_bytes.decode("utf-8")

def decode_object(object_value: str) -> SimpleObjectOriginal:
    """Base64-decode ``object_value`` and unpickle the resulting bytes.

    NOTE: ``pickle.loads`` can execute arbitrary code when given a crafted
    payload, so this must only be fed values produced by trusted code.
    """
    pickled = base64.b64decode(object_value.encode("utf-8"))
    return pickle.loads(pickled)


@dataclass
class SimpleObjectOriginalMixin(DataClassJsonMixin):
    # This is a mixin that adds a SimpleObjectOriginal field to any dataclass.
    #
    # This works perfectly fine with flytekit 1.12.0.

    # In 1.15.0, we first get this log message:
    #   type_engine.py:622 - Failed to extract schema for object
    #   <class 'plaster.run.priors.SimpleObjectOriginalMixin'>, error:
    #   Type plaster.run.priors.SimpleObjectOriginal of field "simple_object"
    #   in plaster.run.priors.SimpleObjectOriginalMixin isn't supported

    # and then this exception:
    #   mashumaro.exceptions.UnserializableField: Field "simple_object" of type
    #   SimpleObjectOriginal in SimpleObjectOriginalMixin is not serializable

    # The field is declared to marshmallow as a String and is converted via
    # encode_object / decode_object (pickle + base64); a new
    # SimpleObjectOriginal() is created per instance when none is supplied.
    simple_object: SimpleObjectOriginal = field(
        default_factory=SimpleObjectOriginal,
        metadata=config(
            mm_field=marshmallow.fields.String(),
            encoder=encode_object,
            decoder=decode_object,
        ),
    )
To allow Mashumaro to serialize our field, I finally arrived at making the embedded object a dataclass itself, and inheriting from
mashumaro.SerializableType.
I also had to make the field a Union with None, otherwise the msgpack decoder will try to call _serialize on the None value in the case simple_object is None.
Copy code
@dataclass
class SimpleObject(SerializableType):
    """Picklable value object that mashumaro serializes via pickle + base64.

    The attributes are declared as real dataclass fields rather than being
    assigned in a hand-written ``__init__``. With ``@dataclass`` and no
    declared fields, the generated ``__eq__`` and ``__repr__`` see zero
    fields, so every instance compares equal and ``repr`` hides ``a``/``b``.
    The generated ``__init__`` has the same signature as the manual one
    (``a`` and ``b``, positional or keyword, both defaulting to ``None``).
    """

    a: Optional[str] = None
    b: Optional[int] = None

    def _serialize(self) -> str:
        """Return this instance pickled and base64-encoded as UTF-8 text."""
        return base64.b64encode(pickle.dumps(self)).decode("utf-8")

    @classmethod
    def _deserialize(cls, value: str):
        """Rebuild an instance from the text produced by ``_serialize``.

        NOTE: ``pickle.loads`` can execute arbitrary code from a crafted
        payload; only use this on data written by trusted code.
        """
        if isinstance(value, cls):
            # Already an instance; pass it through unchanged.
            # (Original author's note: not sure this branch is necessary.)
            return value
        return pickle.loads(base64.b64decode(value.encode("utf-8")))



from typing import Optional, Union

@dataclass
class SimpleObjectMixin(DataClassJsonMixin):
    # Mixin that adds a SimpleObject field to a dataclass.
    #
    # This works fine unless the dataclass is being passed from a dynamic task
    # using .with_overrides()

    # The field is a Union with None rather than plain SimpleObject: per the
    # accompanying discussion, with a plain SimpleObject annotation
    # _serialize() ends up being called on None when simple_object is None.
    simple_object: Union[SimpleObject, None] = field(
        default_factory=SimpleObject,
        metadata=config(
            mm_field=marshmallow.fields.String(),
            encoder=encode_object,
            decoder=decode_object,
        ),
    )
Here is the test that illustrates the failure. Note that with a normal workflow, or even with a dynamic workflow that does _not_ use `with_overrides()`, this works as expected. But when
.with_overrides()
is used, we get an error due to the Union type being used for the field:
Copy code
def test_simple_object_serialize(self):
        """Repro: pass a SimpleObjectMixin between tasks in a workflow and in a dynamic.

        The plain workflow passes; the dynamic fails only because of the
        .with_overrides() call on the downstream task invocation.
        """

        @task
        def generate_result(with_none: bool) -> SimpleObjectMixin:
            return SimpleObjectMixin(simple_object=None if with_none else SimpleObject(a='thomas', b=55))

        @task
        def check_result(obj: SimpleObjectMixin):
            # Only verify the payload when one was sent; None is accepted as-is.
            if obj.simple_object is not None:
                assert obj.simple_object.a == 'thomas'
                assert obj.simple_object.b == 55

        @workflow
        def test_workflow():
            # works as expected
            result = generate_result(with_none=False)
            check_result(obj=result)

            # If the dataclass field is None, then the _serialize() method is called on None
            # and we get an error, unless we make the field a Union[SimpleObject, None].
            # In the latter case, this too will work:
            result = generate_result(with_none=True)
            check_result(obj=result)

        @dynamic
        def test_dynamic():
            # This, however, though identical to the workflow above, fails with a different error, due
            # to Union[SimpleObject, None] in conjunction with the .with_overrides() call.
            #
            #         check_result(obj=result).with_overrides(limits=Resources(cpu="3", mem="500Mi"))
            # E       AttributeError: Error encountered while executing 'tests.plaster.run.sigproc_v2.test_result.test_dynamic':
            # E         'NoneType' object has no attribute 'with_overrides'

            # This will fail with the above whether with_none is True or False.  It fails as the
            # result of the dataclass having a Union[SimpleObject, None] field, which was necessary to
            # prevent Mashumaro/Msgpack from trying to call _serialize() on None.
            result = generate_result(with_none=False)
            check_result(obj=result).with_overrides(limits=Resources(cpu="3", mem="500Mi"))




        test_workflow()  # this works
        test_dynamic()   # this works too, unless you use .with_overrides()
I realize that maybe the "best" approach is to abandon the use of pickle in this serialization, but this is a large change for our codebase and existing data, and the error I'm running into would seem to indicate some strange error in how .with_overrides() is being called on the task object, which is quite unexpected since it is just the argument to the task that is changing. Thoughts?
t
will dive into this tonight
Untitled
would something like this work?
haven’t tried with the later stuff but not able to repro the original error.
m
Hey @thankful-minister-83577, thanks for this suggestion; I have tried something similar as well. I copied your class definitions exactly and created some simple tests around them. Your TypeEngine test works fine, but it is the same error with the @dynamic use of
with_overrides()
that is still the problem. This is with flytekit 1.15.0. Here is the test using your suggested classes - I have used the Mixin class directly to simplify slightly, and I also just init the simple_object with None because you get the same error either way.
Copy code
def test_simple_object_yee(self):
        """Repro using the suggested class definitions, with simple_object set to None.

        Runs a direct TypeEngine round trip (with and without None) and a
        dynamic that optionally applies .with_overrides() to the downstream
        task call; only the .with_overrides() case fails.
        """
        @task
        def generate_result() -> SimpleObjectOriginalMixin:
            return SimpleObjectOriginalMixin(simple_object=None)

        @task
        def check_result(obj: SimpleObjectOriginalMixin):
            assert obj.simple_object is None

        @dynamic
        def test_dynamic(with_overrides: bool):
            # Same task call in both branches; the only difference is whether
            # .with_overrides() is chained onto the call's return value.
            result = generate_result()
            if with_overrides:
                check_result(obj=result).with_overrides(
                    limits=Resources(cpu="3", mem="500Mi")
                )
            else:
                check_result(obj=result)

        def test_type_engine(with_none: bool):
            # Convert a mixin instance to a Flyte literal and back, then
            # compare the restored simple_object against the original.
            p = SimpleObjectOriginalMixin(simple_object=SimpleObjectOriginal(a="a", b=1))
            if with_none:
                p.simple_object = None

            ctx = FlyteContextManager.current_context()
            lt = TypeEngine.to_literal_type(SimpleObjectOriginalMixin)
            lit = TypeEngine.to_literal(ctx, p, SimpleObjectOriginalMixin, lt)
            print(lit)

            new_p = TypeEngine.to_python_value(ctx, lit, SimpleObjectOriginalMixin)
            print(new_p)
            if with_none:
                assert new_p.simple_object is None
            else:
                assert new_p.simple_object.a == p.simple_object.a
                assert new_p.simple_object.b == p.simple_object.b

        test_type_engine(with_none=False)   # this works
        test_type_engine(with_none=True)    # this works
        test_dynamic(with_overrides=False)  # this works
        test_dynamic(with_overrides=True)   # this fails
Here is the error produced for `test_dynamic(with_overrides=True)`:
Copy code
@dynamic
    def test_dynamic():
        result = generate_result(with_none=False)
>       check_result(obj=result).with_overrides(
            limits=Resources(cpu="3", mem="500Mi")
        )
E       AttributeError: Error encountered while executing 'tests.plaster.run.sigproc_v2.test_result.test_dynamic':
E         'NoneType' object has no attribute 'with_overrides'
@thankful-minister-83577 - sorry to bother - any thoughts on the above?
t
let me get back to you next week…
someone will put in a fix I think
g
@microscopic-furniture-57275 https://github.com/flyteorg/flytekit/pull/3202 could you give it a try
m
@glamorous-carpet-83516 I will try to check this out tomorrow, thanks!
@glamorous-carpet-83516 - my tests are passing with this PR, thanks!