We're trying to get rid of FlytePickle but we cann...
# ask-the-community
g
We're trying to get rid of FlytePickle but we cannot because our ints keep getting cast to float. Any ideas on this issue? https://github.com/flyteorg/flyte/issues/4505
k
cc @Eduardo Apolinario (eapolinario) / @Yee
g
I use `@dataclass` and `DataClassJSONMixin` a lot, and I find it extremely helpful to do this routinely. This pattern lets you pass very complex objects around. It also makes your code considerably more readable and enforces Flyte's excellent type checking. When I want even stricter requirements on my objects, I use `@pydantic.dataclasses.dataclass` and `DataClassJSONMixin`.
If you really, really want to be able to store the type of the value, you could use a more complex data class that also stores whether the value was an int:
from dataclasses import dataclass
from mashumaro.mixins.json import DataClassJSONMixin

@dataclass
class MyOutput(DataClassJSONMixin):
    
    _value: float
    _is_int: bool
    
    @classmethod
    def from_value(cls, value: float | int) -> "MyOutput":
        # Record whether the caller passed an int so it can be restored later.
        assert isinstance(value, (float, int)), \
            f"value must be of type int or float, not {type(value)}"

        if isinstance(value, int):
            return cls(_is_int=True, _value=float(value))
        else:
            return cls(_is_int=False, _value=value)
    
    @property
    def value(self) -> float|int:
        
        if self._is_int:
            return int(self._value)
        else:
            return float(self._value)
        
    def __repr__(self):
        
        _type = "int" if self._is_int else "float"
        return f"{self.__class__.__name__}({self.value}: {_type})"


my_output_instance = MyOutput.from_value(3.1232)
serialized = my_output_instance.to_json()
deserialized = MyOutput.from_json(serialized)

print(deserialized) # MyOutput(3.1232: float)

my_output_instance = MyOutput.from_value(33)
serialized = my_output_instance.to_json()
deserialized = MyOutput.from_json(serialized)

print(deserialized) # MyOutput(33: int)
I frequently use such `@classmethod`s with my data classes to automate complex initialization logic, so that I can use the data classes cleanly within my tasks.
g
Grantham, thanks for your response. We are using DataClassJSONMixin (see my last comment in GitHub) - we still see this bug
g
Roger. Can you try the above method (a custom `@property` and an initializing `@classmethod`)?
I tested with this:
from flytekit import workflow, task

@task
def a() -> MyOutput:
    return MyOutput.from_value(32)

@task
def b(arg: MyOutput):
    print(arg) # MyOutput(32: int)


@workflow
def test():
    result = a()
    b(arg=result)
    
test()
Seems to work as expected.
g
Yep, this does appear to work. Thanks Grantham. The main issue I see is that users of my pipeline now have to pass in an `_is_int` parameter every time they initialize a value. So where previously they could have done `Foo(_value=5)`, they now have to do `Foo(_value=5, _is_int=True)`.
@Ketan (kumare3) I still consider this a bug and would appreciate some resolution.
g
I think you may not understand the intended usage. You can instantiate `MyOutput` using the class method (`MyOutput.from_value`), which handles all of that automatically. For example:
@task
def make_an_integer() -> MyOutput:
    return MyOutput.from_value(32)

@task
def make_a_float() -> MyOutput:
    return MyOutput.from_value(332.23123)
g
We need to instantiate from JSON, using the .from_json method. But regardless of how we instantiate, the fact that Flyte is requiring us to annotate numerical types with additional type annotation to serialize correctly is a bug, correct?
g
Why not create another class method to instantiate from whatever format you are working with? I don’t work for Flyte. I am a community member. That being said, I don’t consider it a bug. It is, in my mind, a necessity in order to enforce expected behavior and to compile the manifests. From my perspective, it seems that you are incorrectly using Flyte. As I posted on GH, if you have arbitrarily nested values of unknown types… just use a data file. Use Avro to pass around your data.
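A minimal sketch of the "another class method" suggestion, using only the standard library. `TaggedNumber` is a hypothetical stand-in for `MyOutput` without the mashumaro mixin, and `from_json_text` and its `key` parameter are illustrative names, not part of any library. It relies on the fact that Python's `json.loads` still distinguishes `5` from `5.0` when parsing raw text, so the int/float flag can be captured before the value ever reaches Flyte.

```python
import json
from dataclasses import dataclass


@dataclass
class TaggedNumber:
    """Hypothetical stand-in for MyOutput (no mashumaro mixin)."""

    _value: float
    _is_int: bool

    @classmethod
    def from_value(cls, value):
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"value must be int or float, not {type(value)}")
        return cls(_value=float(value), _is_int=isinstance(value, int))

    @classmethod
    def from_json_text(cls, text, key="x"):
        # Hypothetical helper: json.loads parses 5 as int and 5.0 as float,
        # so callers never have to pass _is_int themselves.
        return cls.from_value(json.loads(text)[key])

    @property
    def value(self):
        return int(self._value) if self._is_int else self._value


print(repr(TaggedNumber.from_json_text('{"x": 5}').value))    # 5
print(repr(TaggedNumber.from_json_text('{"x": 5.0}').value))  # 5.0
```

Callers only ever go through the class methods, so the extra flag stays an implementation detail of the data class.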
g
How is it not a bug that if I pass in an integer, it gets converted to a float?
k
We used to convert correctly. Sadly, the wire format (protobuf JSON) uses the JSON number type, which has loose semantics, and this makes it impossible to preserve the actual type. So in flytekit we maintain the schema through the wire. Not sure why that schema is not working. Let me try to reproduce, if you don’t mind, @Gopal Vashishtha. @Grantham Taylor, thank you for the support, appreciate it. Let’s see if we can help; if not, we should use your ideas for sure.
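To illustrate the point about loose number semantics, here is a small sketch using only the standard library. It assumes, for illustration, a wire format that stores every number as a double (as protobuf's `Struct` does with `number_value`). The helper `to_wire` is hypothetical and is not flytekit code.

```python
import json


def to_wire(obj):
    """Hypothetical stand-in for a double-only wire format."""
    if isinstance(obj, bool):
        return obj  # booleans have their own representation
    if isinstance(obj, (int, float)):
        return float(obj)  # every number becomes a double
    if isinstance(obj, dict):
        return {k: to_wire(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [to_wire(v) for v in obj]
    return obj


original = {"x": 5, "y": 2.5, "flag": True}
received = json.loads(json.dumps(to_wire(original)))

print(received)
print(type(original["x"]).__name__, "->", type(received["x"]).__name__)
```

Once `5` has become `5.0` on the wire, the receiver has nothing left to tell it that the sender started with an int.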
@Gopal Vashishtha what’s the simplest example to reproduce the problem
g
Ketan this should do it:
from flytekit import task, workflow, dynamic
from mashumaro.mixins.json import DataClassJSONMixin
from dataclasses import dataclass

@dataclass
class Foo(DataClassJSONMixin):
    x: float | int


@task
def print_foo(*, foo: Foo) -> Foo:
    print(foo)
    print(f"x={type(foo.x)}")
    return foo


@dynamic
def _print_foo_dynamic(*, foo: Foo) -> Foo:
    return print_foo(foo=foo)


@workflow
def print_foo_wf(*, foo: Foo) -> Foo:
    return _print_foo_dynamic(foo=foo)
k
Ohh this is hard. You are using union?
Json schema has no support for union
Flyte has
g
Yes we're using union
k
Hmm this is a json problem.
g
mashumaro seems to support it just fine though:
>>> from mashumaro.mixins.json import DataClassJSONMixin
>>> from dataclasses import dataclass
>>> from typing import Any
>>> 
>>> @dataclass
... class Foo(DataClassJSONMixin):
...     x: float | int
... 
>>> test = Foo(5)
>>> test1 = Foo.from_json(test.to_json())
>>> test1
Foo(x=5)
>>> type(test1.x)
<class 'int'>
k
Ya, I mean think across the wire. Let me see how we resolve that type.
Let’s say the receiver sees a number 1.0: is this a float or an int?
So we will need to carry runtime type information.
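One way to picture "carrying runtime type information" is to send a tag next to each number. The sketch below uses only the standard library; the `{"type": ..., "value": ...}` envelope and the `encode_number` / `decode_number` helpers are assumptions made for illustration, not a format that Flyte or protobuf defines.

```python
import json


def encode_number(value):
    """Wrap a number with a tag naming its Python type (hypothetical format)."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        raise TypeError(f"expected int or float, got {type(value)}")
    # The value itself travels as a double, as it would on a double-only wire.
    return {"type": type(value).__name__, "value": float(value)}


def decode_number(payload):
    """Use the tag to restore the original Python type."""
    value = payload["value"]
    return int(value) if payload["type"] == "int" else float(value)


for sample in (1, 1.0):
    wire = json.dumps(encode_number(sample))
    restored = decode_number(json.loads(wire))
    print(wire, "->", repr(restored))
```

Both samples travel as `1.0`, but the tag lets the receiver restore `1` as an int and `1.0` as a float. Note that integers beyond 2**53 would still lose precision when passed through a double.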
g
that's where Grantham was going in his solution
btw, regarding "jsonschema has no support for union": what makes you say this? When I try:
from mashumaro.jsonschema import build_json_schema
I get:
build_json_schema(Foo).to_json()
'{"type": "object", "title": "Foo", "properties": {"x": {"anyOf": [{"type": "number"}, {"type": "integer"}]}}, "additionalProperties": false, "required": ["x"]}'
k
interesting
lets try this
wait let me share
@Gopal Vashishtha - https://github.com/flyteorg/flytekit/blob/f69afd984871ba184cf04fad53b106d140b7aba2/flytekit/core/type_engine.py#L475 This is the hairy code. It might just be a bug. But the original issue is talking about `dict`, and I do not think the two are related.
@Gopal Vashishtha I wrote a way to extract the info from the type engine:
def print_type():
    from flytekit.core.type_engine import TypeEngine
    t = TypeEngine.to_literal_type(Foo)
    print(t)
Output
<FlyteLiteral simple: STRUCT metadata { fields { key: "type" value { string_value: "object" } } fields { key: "title" value { string_value: "Foo" } } fields { key: "required" value { list_value { values { string_value: "x" } } } } fields { key: "properties" value { struct_value { fields { key: "x" value { struct_value { fields { key: "anyOf" value { list_value { values { struct_value { fields { key: "type" value { string_value: "number" } } } } values { struct_value { fields { key: "type" value { string_value: "integer" } } } } } } } } } } } } } fields { key: "additionalProperties" value { bool_value: false } } } structure { dataclass_type { key: "x" value { union_type { variants { simple: FLOAT structure { tag: "float" } } variants { simple: INTEGER structure { tag: "int" } } } } } }>
It seems it knows `anyOf` for `int | float`.
Let me try in the UI.
Hmm, the UI is not showing the `jsonschema` auto-form.
@Gopal Vashishtha It is in fact the `Union` that is the problem:
>>> print(Foo.from_json('{"x": 1.0}'))
Foo(x=1.0)
Thus, if the wire carries a float and the receiver's type is ambiguous, it will choose the matching type.
This:
@dataclass
class Foo(DataClassJSONMixin):
    x: int
works fine.
The problem, as I said earlier, is that protobuf JSON has no way to pass ints; sadly, it only passes floats.
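The difference between the `x: int` case and the union case can be sketched with the standard library alone. `rehydrate` below is a hypothetical receiver that coerces wire floats using the declared field type. It is only meant to show why an unambiguous annotation is recoverable while `float | int` is not, and it does not reflect how flytekit's type engine is actually implemented.

```python
from dataclasses import dataclass, fields
from typing import Union, get_type_hints


@dataclass
class IntOnly:
    x: int


@dataclass
class IntOrFloat:
    x: Union[float, int]


def rehydrate(cls, wire):
    """Hypothetical receiver: rebuild cls from a dict whose numbers are floats."""
    hints = get_type_hints(cls)
    kwargs = {}
    for f in fields(cls):
        raw = wire[f.name]
        # An unambiguous int annotation tells the receiver to convert back.
        # For a union, a float already matches, so it is kept as-is.
        kwargs[f.name] = int(raw) if hints[f.name] is int else raw
    return cls(**kwargs)


print(rehydrate(IntOnly, {"x": 5.0}))     # IntOnly(x=5)
print(rehydrate(IntOrFloat, {"x": 5.0}))  # IntOrFloat(x=5.0)
```

With the union there is no declared type that forces the conversion, which is why extra runtime type information would be needed.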
g
OK, so summarizing my understanding: the issue here is that protobuf itself casts integers to floats when converting JSON, so Flyte can't know whether a float it is seeing is actually an integer. Sounds like we will have to override the `from_dict` and/or `from_json` methods to persist additional metadata about whether the number is an integer, along the lines of what Grantham was saying. Do you know which methods Flyte will actually call when rehydrating a `DataClassJSONMixin` object?
k
Check the link I shared. Also please propose anything else we can pass. Maybe we add a hidden meta field automatically about type?
That is doable globally in the Transformer engine
g
I see where the `to_json` method gets called here, but I don't see any invocations of `from_json`. How do user-provided JSON dataclasses get deserialized?
k
Right
g
Does this serialization also apply when starting runs with `FlyteRemote`?
k
Yes
Type engine is central
g
When I start a workflow from flyte console and pass in a "struct," what code path does that hit to initialize the dataclass?
k
That is in JavaScript- react json
g
hm... when I try doing:
from flytekit import task, workflow
from dataclasses import dataclass, field
from mashumaro.mixins.json import DataClassJSONMixin
import json

@dataclass
class Foo(DataClassJSONMixin):
    x: float
    _is_int: bool = field(init=False)
    def __post_init__(self):
        if not hasattr(self, "_is_int"):
            self._is_int = isinstance(self.x, int)
    @classmethod
    def from_json(cls, input: str) -> "Foo":
        input = json.loads(input)
        print("from_json was called", flush=True)
        new_instance = cls(input["x"])
        # Persist metadata about integers if already present
        if "_is_int" in input:
            new_instance._is_int = input["_is_int"]
        return new_instance
    @property
    def typed_x(self) -> float | int:
        if self._is_int:
            return int(self.x)
        else:
            return float(self.x)
And then start the task with `FlyteRemote`:
flyte.execute_remote_wf(execution_name="foo", entity=flyte_task, inputs={"foo": foo})
I get:
/remote.py:1055 in _execute
❱ 1055     lit = TypeEngine.to_literal(ctx, v, hint, variable.type)
/type_engine.py:1059 in to_literal
❱ 1059     transformer.assert_type(python_type, python_val)
/type_engine.py:354 in assert_type
❱  354     expected_type = expected_fields_dict[f.name]
KeyError: '_is_int'
any idea why?
k
I think maybe JSON ignores the _?
g
nope:
from flytekit import task, workflow
from dataclasses import dataclass, field
from mashumaro.mixins.json import DataClassJSONMixin
import json

@dataclass
class Foo(DataClassJSONMixin):
    x: float
    is_int: bool = field(init=False)
    def __post_init__(self):
        if not hasattr(self, "is_int"):
            self.is_int = isinstance(self.x, int)
    @classmethod
    def from_json(cls, input: str) -> "Foo":
        input = json.loads(input)
        print("from_json was called", flush=True)
        new_instance = cls(input["x"])
        # Persist metadata about integers if already present
        if "is_int" in input:
            new_instance.is_int = input["is_int"]
        return new_instance
    @property
    def typed_x(self) -> float | int:
        if self.is_int:
            return int(self.x)
        else:
            return float(self.x)
still fails