Arthur Book
05/24/2023, 8:44 PM
from data_engine import (
pydantic_transformer,
) # code from <https://github.com/flyteorg/flytekit/pull/1620>
class Rax(pydantic.BaseModel):
    """Pydantic model carrying a single directory location, ``flytedir``.

    The field is run through ``make_flytedir`` during validation, and the
    stored value is the string form of the resulting object's ``path``.
    """

    # Directory location; rewritten by ``validate_hax`` on validation.
    flytedir: str

    @pydantic.validator("flytedir")
    def validate_hax(cls, v: str) -> str:
        """Resolve ``v`` with ``make_flytedir`` and return its ``path`` as ``str``."""
        resolved = make_flytedir(v)
        return str(resolved.path)

    @classmethod
    def from_json(cls, json: str) -> "Self":
        """Build an instance from a JSON string using ``parse_raw``."""
        return cls.parse_raw(json)
@flytekit.workflow
def test_wf(rax: Rax) -> str:
    """Workflow that forwards ``rax`` to the ``test`` task and returns its output."""
    listing = test(rax=rax)  # type: ignore
    return listing
@flytekit.task
def test(rax: Rax) -> str:
    """Task returning the string form of the listing of ``rax.flytedir``."""
    entries = os.listdir(rax.flytedir)
    return str(entries)
def make_flytedir(path: Union[str, os.PathLike]) -> directory.FlyteDirectory:
    """Turn ``path`` into a ``FlyteDirectory`` by way of a multipart blob literal.

    A literal is built for ``path`` with ``make_literal`` and converted to a
    Python value with ``FlyteDirToMultipartBlobTransformer.to_python_value``
    under the current Flyte context.

    Args:
        path: Location of the directory, as a string or path-like object.

    Returns:
        The object produced by the transformer for ``path``.
    """
    ctx = context_manager.FlyteContext.current_context()
    multipart = core_types.BlobType.BlobDimensionality.MULTIPART
    blob_literal = make_literal(uri=path, dimensionality=multipart)
    flyte_dir = directory_types.FlyteDirToMultipartBlobTransformer().to_python_value(
        ctx, blob_literal, directory.FlyteDirectory
    )
    # Listing the directory is deliberate: per the original author's note,
    # the directory is not synced unless this call is made.
    os.listdir(flyte_dir)
    return flyte_dir
def make_literal(
    uri: Union[str, os.PathLike],
    dimensionality,
) -> literals.Literal:
    """Wrap the blob scalar for ``uri`` in a ``literals.Literal``.

    Args:
        uri: Location to store in the blob.
        dimensionality: Blob dimensionality passed through to ``make_scalar``.

    Returns:
        A literal whose ``scalar`` is the result of ``make_scalar``.
    """
    return literals.Literal(scalar=make_scalar(uri, dimensionality))  # type: ignore
def make_scalar(
    uri: Union[str, os.PathLike],
    dimensionality,
) -> literals.Scalar:
    """Build a ``literals.Scalar`` holding a blob that points at ``uri``.

    The blob type is created with an empty ``format`` string and the given
    ``dimensionality``.

    Args:
        uri: Location to store in the blob.
        dimensionality: Dimensionality used for the blob type.

    Returns:
        A scalar whose ``blob`` carries the metadata and ``uri``.
    """
    blob_type = core_types.BlobType(format="", dimensionality=dimensionality)
    metadata = literals.BlobMetadata(type=blob_type)
    return literals.Scalar(blob=literals.Blob(metadata=metadata, uri=uri))  # type: ignore
Ketan (kumare3)
Arthur Book
05/26/2023, 12:30 AM
Ketan (kumare3)
Arthur Book
05/26/2023, 12:34 AM
Ketan (kumare3)
Arthur Book
05/26/2023, 12:35 AM
Eli Bixby
05/26/2023, 9:04 AM
TypeTransformer because it is already implicitly present in the pyflyte CLI parsing code.
Ketan (kumare3)