Hi team! How can I make a pydantic `BaseModel` cacheable if one of its fields is `FlyteFile` type?
# ask-the-community
x
Hi team! How can I make a pydantic `BaseModel` cacheable if one of its fields is of `FlyteFile` type? I guess I need some sort of `HashMethod` since `BaseModel` is a non-Flyte-offloaded object. Any recommended approach? Thanks!
s
for a flytefile, i think if you specify where the file should be uploaded to using `remote_path`, that should hit the cache. also, if you're passing the same file, the cache should hit automatically.
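something like this, roughly (a hand-wavy sketch; the bucket and file paths are made up):

from flytekit import task
from flytekit.types.file import FlyteFile


@task
def make_file() -> FlyteFile:
    # write a local file and pin where it should be uploaded; keeping remote_path
    # stable across runs means the downstream cached task sees the same input literal
    local = "/tmp/data.csv"
    with open(local, "w") as f:
        f.write("a,b\n1,2\n")
    return FlyteFile(local, remote_path="s3://my-bucket/data.csv")


@task(cache=True, cache_version="1.0")
def consume(f: FlyteFile) -> str:
    # reads the file; whether this hits the cache depends on the input literal
    with open(f, "r") as fh:
        return fh.read()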
x
ah, ok, so for `remote_path`, I can specify different S3 paths (even if the file content is the same) and it should hit the cache?
this works with the flytekit pydantic plugin right?
s
> so for `remote_path`, I can specify different S3 paths ...
i don't think so. the remote_path has to be the same.
> this works with the flytekit pydantic plugin right?
should work! i think flytefile caching works out of the box.
x
oh, I thought flytefile can be cached regardless of the `remote_path` or the local `path`, no?
s
i don't think so. i'd love for you to try! i think the remote_path will remain the same if you're sending the same file again.
x
I see, so the hash key is based on the file path, not the content
s
yeah! you can hash the content using HashMethod.
x
Got it, that makes sense. Just curious, would a `FlyteFile` annotated with `HashMethod` nested inside a `BaseModel` work?
class MyModel(BaseModel):
    ffile: Annotated[FlyteFile, HashMethod(hash_my_file)]
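(for context, here's the fuller sketch I have in mind; `hash_my_file` is just an illustrative content hash I'd write myself, not something from flytekit or the plugin:)

import hashlib
import pathlib
from typing import Annotated

from flytekit import HashMethod
from flytekit.types.file import FlyteFile
from pydantic import BaseModel


def hash_my_file(f: FlyteFile) -> str:
    # illustrative: key the cache on the file's content rather than its path
    return hashlib.sha256(pathlib.Path(f.path).read_bytes()).hexdigest()


# assumes flytekitplugins-pydantic is installed so FlyteFile fields inside a BaseModel are handled
class MyModel(BaseModel):
    ffile: Annotated[FlyteFile, HashMethod(hash_my_file)]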
s
yes, i believe it should work. i'm not able to get a simple pydantic plugin example to work on a flyte cluster; does it work for you?
x
I haven’t tried yet; it turns out to be quite complicated to upgrade my codebase to a flytekit version that supports the pydantic plugin
s
what flytekit version do you currently have?
the plugin needs to be updated so that it supports pydantic v2: https://github.com/flyteorg/flyte/issues/4603
x
1.9.1
s
are you seeing any errors when upgrading?
x
yeah, there are some errors. Unfortunately, I didn’t save them and reverted the changes. I can repro later
Inputs:
{
  "input": input,
  "params": params,
}
But I got the error below in the logs.
I tried running the following snippet locally and remotely:
import pathlib

from flytekit import dynamic, task, workflow
from flytekit.types.file import FlyteFile
from pydantic import BaseModel


class ConfigWithFlyteFiles(BaseModel):
    flytefiles: list[FlyteFile]

    def __eq__(self, __value: object) -> bool:
        return isinstance(__value, ConfigWithFlyteFiles) and all(
            pathlib.Path(self_file).read_text() == pathlib.Path(other_file).read_text()
            for self_file, other_file in zip(self.flytefiles, __value.flytefiles)
        )


@task
def example_task(input: str) -> str:
    return input


@dynamic(cache=True, cache_version="1.0")
def example_dynamic(input: str, params: ConfigWithFlyteFiles) -> str:
    return example_task(input=input)


@workflow
def example_workflow(input: str, params: ConfigWithFlyteFiles) -> str:
    return example_dynamic(input=input, params=params)
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.INVALID_ARGUMENT
        details = "invalid params input wrong type. Expected simple:STRUCT , but got map_value_type:<union_type:<variants:<map_value_type:<blob:<> > > variants:<simple:STRUCT > > > "
        debug_error_string = "UNKNOWN:Error received from peer ipv4:172.21.196.17:443 {created_time:"2024-02-03T01:43:35.346807118+00:00", grpc_status:3, grpc_message:"invalid params input wrong type. Expected simple:STRUCT , but got map_value_type:<union_type:<variants:<map_value_type:<blob:<> > > variants:<simple:STRUCT > > > "}"
My versions: `flytekit==1.10.3`, `flytekitplugins-pydantic==1.10.3`, `pydantic==1.10.13`
Appreciate any pointers when you have a chance to take a look, thanks, @Samhita Alla!
s
i'm unable to run the pydantic code with flytekit as well. i think the plugin needs to be updated and this is a bug. would you mind creating an issue? [flyte-bug]
s
if you could contribute a fix as well, that'd be great! cc @Eduardo Apolinario (eapolinario)
t
I've been having issues with the `pydantic` plugin too. Specifically, I think the problem is when the `pydantic.BaseModel` is an input to an execution. When running a workflow that uses `pydantic` just for the inputs and outputs of internal tasks, it works fine. I'm testing the simplest possible case:
class Config(BaseModel):
    value: int
I think the problem stems from the slightly unusual way that `pydantic.BaseModel` is serialised to a Flyte `Literal`. The format is a map with 2 keys: `BaseModel JSON` and `Serialized Flyte Objects`. `BaseModel JSON` appears to be a struct formed from the result of `pydantic.BaseModel.json()`. When there are complex types, e.g. a pandas DataFrame, these get put in `Serialized Flyte Objects` and a placeholder is put in `BaseModel JSON`. This structure makes sense to me as it enables working with complex types that Flyte can serialise but `pydantic` can't. The problem is that the transformer just declares the literal type to be `types.LiteralType(simple=types.SimpleType.STRUCT)`. Everything works most of the time despite the types being very different from what they are declared to be. I think the problem is that there is an explicit validation in `flyteadmin` that fails:
details: invalid config input wrong type. Expected simple:STRUCT , but got map_value_type:<union_type:<variants:<map_value_type:<simple:NONE > > variants:<simple:STRUCT > > >
This validation seems to only happen when the `pydantic.BaseModel` is used as an input to an execution. I think the solution is probably to update `BaseModelTransformer.get_literal_type` to reflect the literal that is actually created. However, I think this could be a bit tricky because `Serialized Flyte Objects` is a map type which could contain basically anything, so it's difficult to define the literal type for it.
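Roughly, the serialised value ends up shaped something like this (a hand-written sketch of the structure described above, not the actual protobuf; `value` and `df` are just example field names):

# rough sketch of the two-key map the pydantic plugin produces for a BaseModel
# (not the actual Literal protobuf; "value" and "df" are illustrative fields)
literal_shape = {
    "BaseModel JSON": {
        # struct built from BaseModel.json(); complex fields are swapped for
        # placeholders that point into "Serialized Flyte Objects"
        "value": 1,
        "df": "<placeholder>",
    },
    "Serialized Flyte Objects": {
        # Flyte-serialised literals for the complex fields (e.g. a DataFrame)
        "df": "<offloaded Flyte literal>",
    },
}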
x
Good find! True that a BaseModel can contain complex and unserializable types. But it's good to know that it works for internal tasks, which is good enough for my use cases since I only need simple types and FlyteFile. But yeah, making pydantic work end to end in all workflow components is tricky.