# ask-the-community

Eli Bixby

04/17/2023, 11:23 AM
@Ketan (kumare3) Is it possible to use the `FlyteFile[Format]` syntax with `StructuredDataset`s? It looks like they are backed by a different proto, so it's not clear to me how that works.

Ketan (kumare3)

04/17/2023, 1:09 PM
You can, for all dataframe types

Eli Bixby

04/17/2023, 1:34 PM
Is it just `FlyteFile[structured_dataset.PARQUET]`?

Ketan (kumare3)

04/17/2023, 1:58 PM
No, `StructuredDataset` is the base type, just like `FlyteFile`. So `pd.DataFrame`, Spark DataFrame, Polars, etc. can all be a `StructuredDataset`

Eli Bixby

04/17/2023, 2:03 PM
Is there a way to get a reference to the remote path of the parquet file for a structured dataset (without downloading it) in a task?
Specifically, the use case is creating a launch plan with a default/fixed-input `StructuredDataset` backed by a remote file. The way we've done this for e.g. models/tensors is to have the workflow take a `FlyteFile[PyTorchModule]` etc. and then create a `FlyteFile` when registering the launch plan, but I'm not sure if there's a way to do this with a `StructuredDataset`.
It looks like `FlyteFile[structured_dataset.PARQUET]` works fine as a launch plan parameter, but we can't figure out how to pass the promise to a task that takes a `StructuredDataset` as an input.
The thing that works for modules/tensors etc. is that we can use the `FlyteFile` promise directly in the workflow function and pass it to a task that takes the deserialized type (e.g. `nn.Module` or `np.ndarray`). That doesn't work for `StructuredDataset`: we get an error (will paste when I find it) that the input type doesn't match the expected type.
@Daniel Danciu who ran into this.

Kevin Su

04/17/2023, 3:48 PM
`StructuredDataset` should work as well. Sharing the error will be helpful. You can get the remote path from `StructuredDataset.uri`

Daniel Danciu

04/17/2023, 6:07 PM
Thanks for helping out, Kevin. Here's a sketch of what the code looks like:
```python
@task
def do_task(a: StructuredDataset) -> int:
    ...

@workflow
def do_workflow(a: FlyteFile[structured_dataset.PARQUET]):
    ...

LaunchPlan.create('PlanB', do_workflow, default_params={'a': FlyteFile('<gs://path_to_flyte_parquet_output>')})
```
And this is the error we are getting:
```
Error 0: Code: MismatchingTypes, Node Id: n1, Description: Variable [a] (type [blob:<format:"parquet" > ]) doesn't match expected type [structured_dataset_type:<> ].
```
Using `FlyteFile[NumpyArrayTransformer.NUMPY_ARRAY_FORMAT]` in the workflow and receiving `np.ndarray` in the task works fine.

Yee

04/17/2023, 8:56 PM
why is the workflow a file type?
why not make it a structured dataset type?
`default_params = {'a': StructuredDataset(path="gs://…")}`

Daniel Danciu

04/17/2023, 9:30 PM
This is what I originally tried, except that I used `uri` instead of `path`, i.e.
```python
default_inputs={'a': StructuredDataset(uri='gs://...', file_format=structured_dataset.PARQUET)}
```
This runs into the following error:
```
TypeError: int() argument must be a string, a bytes-like object or a number, not '_NoValueType'
```
when reading the received parameter in the task using `a.open(pd.DataFrame).all()` (I declared `a` as `StructuredDataset` in the task).
Maybe that's not the right way to read it?
To circle back on this: the `TypeError` was actually caused by a broken numpy installation. After fixing numpy the error went away, and I was able to receive an input of type `StructuredDataset`. The working snippet looks like this, FFR:
```python
@task
def do_task(a: Annotated[StructuredDataset, kwtypes(my_column=float)]) -> int:
    ...

@workflow
def do_workflow(a: StructuredDataset):
    ...

LaunchPlan.create('PlanB', do_workflow, default_inputs={'a': StructuredDataset(uri='gs://...', file_format=PARQUET)})
```
Obviously one can just use `StructuredDataset` instead of `Annotated[StructuredDataset, kwtypes(my_column=float)]`