# ask-the-community
a
Hi everyone. I've been working off the PR that adds a transformer for pydantic BaseModels (https://github.com/flyteorg/flytekit/pull/1620) and am running into an unfortunate issue with FlyteFiles/FlyteDirectories. These inherit from os.PathLike, which falls outside the types pydantic can serialize to and deserialize from JSON. My understanding is also that FlyteFiles/FlyteDirectories need to be initialized in a particular way to work (below I've implemented a FlyteDirectory initializer inspired by the dataclass transformer). This works but seems a bit hacky, so I wanted to hear if anyone else has run into similar challenges. TL;DR: how do you serialize/deserialize FlyteDirectories/FlyteFiles in pydantic BaseModels (or other objects)?
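```python
# Imports added for completeness; module paths assume flytekit 1.x and pydantic v1.
import os
from typing import Union

import flytekit
import pydantic
from flytekit.core import context_manager
from flytekit.models import literals
from flytekit.models.core import types as core_types
from flytekit.types import directory
from flytekit.types.directory import types as directory_types

from data_engine import (  # noqa: F401
    pydantic_transformer,
)  # code from https://github.com/flyteorg/flytekit/pull/1620


class Rax(pydantic.BaseModel):
    flytedir: str

    @pydantic.validator("flytedir")
    def validate_hax(cls, v: str) -> str:
        # Materialize the directory, then keep its path as a plain string,
        # which pydantic can serialize.
        flytedir = make_flytedir(v)
        return str(flytedir.path)

    @classmethod
    def from_json(cls, json: str) -> "Rax":
        return cls.parse_raw(json)


# Define the task before the workflow that calls it, so the name `test`
# exists when the workflow body is compiled.
@flytekit.task
def test(rax: Rax) -> str:
    return str(os.listdir(rax.flytedir))


@flytekit.workflow
def test_wf(rax: Rax) -> str:
    return test(rax=rax)  # type: ignore


def make_flytedir(path: Union[str, os.PathLike]) -> directory.FlyteDirectory:
    """Build a FlyteDirectory from a URI via a multipart blob literal,
    following the approach of flytekit's dataclass transformer."""
    context = context_manager.FlyteContext.current_context()
    dimensionality = core_types.BlobType.BlobDimensionality.MULTIPART
    literal = make_literal(uri=path, dimensionality=dimensionality)
    transformer = directory_types.FlyteDirToMultipartBlobTransformer()
    out_dir = transformer.to_python_value(context, literal, directory.FlyteDirectory)
    # FlyteDirectory downloads lazily: os.listdir() calls __fspath__, which
    # forces the sync. Without this the directory isn't synced.
    os.listdir(out_dir)
    return out_dir


def make_literal(
    uri: Union[str, os.PathLike],
    dimensionality,
) -> literals.Literal:
    scalar = make_scalar(uri, dimensionality)
    return literals.Literal(scalar=scalar)  # type: ignore


def make_scalar(
    uri: Union[str, os.PathLike],
    dimensionality,
) -> literals.Scalar:
    blobtype = core_types.BlobType(format="", dimensionality=dimensionality)
    # Blob URIs are plain strings, so normalize any os.PathLike first.
    blob = literals.Blob(metadata=literals.BlobMetadata(type=blobtype), uri=os.fspath(uri))
    return literals.Scalar(blob=blob)  # type: ignore
```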
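The core serialization problem from the question can be reproduced without Flyte or pydantic. Below is a minimal stdlib-only sketch, assuming a hypothetical `FakeDir` class as a stand-in for FlyteDirectory (it only wraps a path string and does no downloading). It is one possible workaround, not what the PR does: `json.dumps` rejects an `os.PathLike` field, while a `default=` hook that calls `os.fspath()` turns it into its path string, and the loader rebuilds the object from that string.

```python
import json
import os
from dataclasses import asdict, dataclass
from typing import Any


class FakeDir(os.PathLike):
    """Hypothetical stand-in for FlyteDirectory: just wraps a path string."""

    def __init__(self, path: str) -> None:
        self.path = path

    def __fspath__(self) -> str:
        return self.path


@dataclass
class Params:
    flytedir: FakeDir


def encode(obj: Any) -> Any:
    # json.dumps only calls this for objects it cannot serialize natively.
    if isinstance(obj, os.PathLike):
        return os.fspath(obj)
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def to_json(params: Params) -> str:
    return json.dumps(asdict(params), default=encode)


def from_json(payload: str) -> Params:
    # Rebuild the PathLike wrapper from the stored path string.
    data = json.loads(payload)
    return Params(flytedir=FakeDir(data["flytedir"]))
```

In pydantic the analogous hook is `Config.json_encoders` in v1 and `@field_serializer` in v2, so the exact wiring depends on which version the transformer targets.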
Update: this doesn't work for me when running remotely...
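One thing that may be worth checking when a setup like this works locally but not remotely (an assumption, not a confirmed diagnosis) is which string ends up in the serialized model. The validator in the snippet stores `str(flytedir.path)` after the download, and for a remote URI that is typically a local download directory on whichever machine ran the validator. The stdlib-only sketch below uses a hypothetical `LazyDir` that mimics the lazy-download behavior described in the snippet's `os.listdir` comment: asking for its filesystem path triggers a fetch. Serializing the remote URI instead of `os.fspath(...)` keeps the JSON meaningful on another machine. `LazyDir` and the `fetch` callback are illustrative names, not flytekit APIs.

```python
import json
import os
import tempfile
from typing import Callable, Optional


class LazyDir(os.PathLike):
    """Hypothetical stand-in for a lazily downloaded remote directory."""

    def __init__(self, remote_uri: str, fetch: Callable[[str, str], None]) -> None:
        self.remote_uri = remote_uri
        self._fetch = fetch  # copies remote_uri into a local directory
        self._local: Optional[str] = None

    def __fspath__(self) -> str:
        # Asking for the filesystem path (e.g. via os.listdir) triggers
        # the download, but only the first time.
        if self._local is None:
            self._local = tempfile.mkdtemp()
            self._fetch(self.remote_uri, self._local)
        return self._local


def to_json(d: LazyDir) -> str:
    # Serialize the remote URI, not os.fspath(d): the latter would download
    # the data and record a temp path that only exists on this machine.
    return json.dumps({"flytedir": d.remote_uri})
```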
k
cc @Yee / @Kevin Su, hopefully we can use the streaming API here.
@Arthur Book firstly, welcome to the community. Great to hear that you are working on the pydantic support; a few folks share that interest. Your idea looks correct, but can you share what you mean by "it does not work with FlyteRemote"? Does it work otherwise, i.e. when executing
a
Thanks! It's fun to be here.
I rewrote and refactored this back and forth for a day and now have a solution that I'm happy with.
k
😄 Wow! cc @Fabio Grätz / @David Espejo (he/him) / @Eli Bixby
All of these folks are working on some part of it. cc @Kevin Su / @Eduardo Apolinario (eapolinario)
a
I need to see if I can open a PR for this.
k
@Arthur Book you should also join the contributor sync (we just had one today); see the #contribute channel. Please open a PR, and the community will cheer you on to get it merged 😄
a
🫡
e
Hey Arthur! You might want to look at https://github.com/flyteorg/flytekit/pull/1565, which I'm currently splitting into pieces. The first piece is submitted here: https://github.com/flyteorg/flytekit/pull/1615
I've also discussed with @Yee supporting pydantic more generally by adding explicit JSON serialization and deserialization to the `TypeTransformer` base class, because it is already implicitly present in the `pyflyte` CLI parsing code.
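The idea of explicit JSON hooks on the transformer base class could look roughly like the sketch below. It is stdlib-only and hypothetical: `JsonTransformer`, `register`, `parse_cli_arg`, and `Point` are illustrative names, not flytekit's `TypeTransformer` API. Each transformer declares `to_json`/`from_json`, and a CLI-style parser reuses the same registry to turn a JSON string argument into a typed value.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from typing import Any, Dict, Generic, Type, TypeVar

T = TypeVar("T")


class JsonTransformer(ABC, Generic[T]):
    """Hypothetical base class with explicit JSON hooks (not flytekit's API)."""

    def __init__(self, python_type: Type[T]) -> None:
        self.python_type = python_type

    @abstractmethod
    def to_json(self, value: T) -> str:
        """Serialize a Python value to a JSON string."""

    @abstractmethod
    def from_json(self, payload: str) -> T:
        """Rebuild a Python value from a JSON string."""


_REGISTRY: Dict[type, JsonTransformer[Any]] = {}


def register(transformer: JsonTransformer[Any]) -> None:
    _REGISTRY[transformer.python_type] = transformer


def parse_cli_arg(python_type: type, raw: str) -> Any:
    # A CLI can reuse the transformer's hook instead of its own JSON parsing.
    return _REGISTRY[python_type].from_json(raw)


@dataclass
class Point:
    x: int
    y: int


class PointTransformer(JsonTransformer[Point]):
    def to_json(self, value: Point) -> str:
        return json.dumps(asdict(value))

    def from_json(self, payload: str) -> Point:
        return Point(**json.loads(payload))


register(PointTransformer(Point))
```

Keeping both directions on one class means the CLI and any pydantic or dataclass handling could share a single serialization path rather than duplicating it.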
k
@Eli Bixby check the #contribute channel; there is a PR by Arthur.