# hacktoberfest-2022
r
Hi all, I have a question regarding the type annotations in workflow tasks. I am having an issue in https://github.com/flyteorg/flytekit/pull/1240 with the workflow test in `tests/flytekit/unit/extras/tensorflow/record/test_record.py`, which has tasks to generate TFRecordFile and TFRecordDirectory from tf.train.Example outputs and subsequently deserialise back to `tf.data.TFRecordDataset` if required. The bit that's causing an error is:
@task
def consume(dataset: Annotated[TFRecordDatasetV2, TFRecordDatasetConfig(name="testing")]):
This seems not to recognise the Annotated `TFRecordDatasetV2` type, although it should be the expected type in the transformers I have registered in flytekit/extras/tensorflow/record.py. I presume this is why it is defaulting to FlytePickle, as in the traceback (this is only my assumption, based on some other logic I have seen elsewhere in the codebase). Also, my other test, `tests/flytekit/unit/extras/tensorflow/record/test_transformations.py`, which tests the `to_python_value` and `to_literal` logic, works fine, so I feel something else is the issue, and I am finding it difficult to spot at the moment.
expected_python_type = typing.Annotated[flytekit.types.pickle.pickle.FlytePickle.__class_getitem__.<locals>._SpecificFormatClass, TFRecordDatasetConfig(compression_type=None, buffer_size=None, num_parallel_reads=None, name='testing')]

    def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
        uri = lv.scalar.blob.uri
        # Deserialize the pickle, and return data in the pickle,
        # and download pickle file to local first if file is not in the local file systems.
        if ctx.file_access.is_remote(uri):
            local_path = ctx.file_access.get_random_local_path()
            ctx.file_access.get_data(uri, local_path, False)
            uri = local_path
        with open(uri, "rb") as infile:
>           data = cloudpickle.load(infile)
E           _pickle.UnpicklingError: invalid load key, '5'.
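The `invalid load key` message at the bottom of the traceback is what `pickle` reports when the bytes it is handed were not written by a pickler. Below is a minimal sketch of that failure mode using only the standard library. The payload is a made-up stand-in, built so that its first byte is the ASCII character `5`. TFRecord files start each record with a little-endian 64-bit length, so a 53-byte record would begin this way, but that is only a guess about the file in the traceback.

```python
import io
import pickle
import struct

# Hypothetical stand-in for a non-pickle blob: an 8-byte little-endian
# length field holding 53 (ord("5") == 53), followed by filler bytes.
fake_blob = struct.pack("<Q", 53) + b"\x00" * 16


def try_unpickle(raw: bytes) -> str:
    """Describe what happens when `raw` is fed to pickle.load."""
    try:
        pickle.load(io.BytesIO(raw))
    except pickle.UnpicklingError as exc:
        return f"UnpicklingError: {exc}"
    return "ok"


result = try_unpickle(fake_blob)
print(result)
```

If this reading is right, the data on disk is fine and the problem is only which transformer was chosen to read it.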
k
@Ryan Nazareth we might be a little slow in responding over this holiday weekend. But help will come.
r
@Ketan (kumare3) No worries 🙂
s
@Ryan Nazareth, you are annotating `TFRecordDatasetV2` but not either of `TFRecordFile` or `TFRecordsDirectory`, for which you defined TypeTransformers, and hence it’s defaulting to pickle.
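The fallback described here can be pictured with a toy lookup table. This is a simplified sketch, not flytekit's actual type engine: the three classes, the `REGISTRY` dict, and `pick_transformer` are all hypothetical names introduced for illustration.

```python
from typing import Annotated, get_args, get_origin


# Hypothetical stand-ins for the types discussed in the thread.
class TFRecordFile: ...
class TFRecordsDirectory: ...
class TFRecordDatasetV2: ...


# Toy registry: only the two file-like types have a transformer.
REGISTRY = {
    TFRecordFile: "TensorFlow Record File",
    TFRecordsDirectory: "TensorFlow Record Directory",
}


def pick_transformer(python_type) -> str:
    """Look up the base type; anything unregistered falls back to pickle."""
    if get_origin(python_type) is Annotated:
        # Drop the metadata and keep only the underlying type.
        python_type = get_args(python_type)[0]
    return REGISTRY.get(python_type, "pickle fallback")


print(pick_transformer(Annotated[TFRecordDatasetV2, "config"]))
print(pick_transformer(Annotated[TFRecordFile, "config"]))
```

In this sketch the annotation metadata plays no part in the lookup, so wrapping an unregistered type in `Annotated` does not prevent the fallback.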
r
So this bit is confusing me. If I look at the design of `flytekit-onnx-tensorflow`, for example, it is doing something similar, i.e. a transformer is defined for TensorFlow2ONNX and converts to the ONNXFile type in `to_literal`: https://github.com/flyteorg/flytekit/blob/430795d9ee4aa48957554a6eb1446fa80993681a[…]ytekit-onnx-tensorflow/flytekitplugins/onnxtensorflow/schema.py. In the associated test `def test_tf_onnx():` here (https://github.com/flyteorg/flytekit/blob/430795d9ee4aa48957554a6eb1446fa80993681a/plugins/flytekit-onnx-tensorflow/tests/test_onnx_tf.py#L49-L75), the model promise object is annotated in the output of task `def train_and_predict()....` as:
Annotated[
                    TensorFlow2ONNX,
                    TensorFlow2ONNXConfig(
                        input_signature=(tf.TensorSpec((None, 224, 224, 3), tf.float32, name="input"),), opset=13
                    ),
                ],
which is then passed into the downstream task `def onnx_predict()...`, but the parameter there is annotated as the expected type `ONNXFile`. This is similar to what I'm trying to do here with `TFRecordFile` and `TFRecordsDirectory` (which I've annotated in the upstream task). I then need to handle the conversion to `tf.data.TFRecordDataset` when the value is passed into a new task, as per the design suggested by @Niels Bantilan in the last code block here: https://github.com/flyteorg/flytekit/pull/1240#issuecomment-1292623521
s
`ONNXFile` is a FlyteFile for which a TypeTransformer has already been defined. Hence, it won’t default to pickle. What is `TFRecordDatasetV2`? Can you tell me the type of it?
r
Oh I see, `TFRecordDatasetV2` is from `tensorflow.python.data.ops.readers`, so I would need to register this in the transformer I have defined, along with `TFRecordFile`. But can multiple types be registered in the same transformer via Union, like below?
TENSORFLOW_FORMAT = "TensorFlowRecord"

def __init__(self):
    super().__init__(name="TensorFlow Record", t=Union[TFRecordFile, TFRecordDatasetV2])
s
But why do you need `TFRecordDatasetV2`? Can’t you use `TFRecordFile` or `TFRecordDirectory`?
r
I thought we also wanted to handle the initialisation of the `TFRecordDatasetV2` constructor with filenames (maybe I misunderstood), as was discussed in https://github.com/flyteorg/flytekit/pull/1240#issuecomment-1293507708
s
Okay, so you’re using `TFRecordDatasetV2` to denote that the value is `tf.data.TFRecordDataset`, right? The reason I used two separate types in the ONNX Flyte type is that `to_literal` returns a file, and it made more sense to use `FlyteFile` as the type when the value is consumed in a task. Moreover, the `to_literal` type needed annotated metadata, while the other, `to_python_value`, doesn’t require any metadata because it’s just a file. So what’s the type of `TFRecordDatasetV2`? In ONNX, it was a file, but here, that isn’t the case. I didn’t add any additional type transformer for the file because it already exists. I just added a type, `ONNXFile`, similar to `TFRecordFile`.
Let me know if this makes sense to you. If not, I’ll think about it and let you know the solution.
r
OK, I see. I think I have the opposite issue, where I need to use annotated metadata in `to_python_value`. `TFRecordDatasetV2` inherits from `DatasetV2`, part of the TensorFlow dataset API, which is a wrapper and container for storing collections (in our case TensorFlow record files). I don't think it falls into any of the existing supported types in flytekit/types which I could use or create custom types from. Do you mean what `type(TFRecordDatasetV2)` returns? I think it's `ABCMeta`, but I can check.
Alternatively, is there an option to do something in between, e.g. annotate with `TFRecordFile` (which is a `FlyteFile` and supported) and also include metadata (`TFRecordDatasetConfig`) which the user would provide to instantiate `TFRecordDataset`, if that makes sense?
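On the `type(...)` question, a small sketch with stand-in classes may help separate two things: the metaclass that `type()` reports for a class object, and the class that an instance belongs to. The `DatasetV2` and `TFRecordDatasetV2` below are hypothetical minimal classes, not the TensorFlow ones, and the assumption that the real base class uses `abc.ABCMeta` comes from the guess in the message above.

```python
import abc


class DatasetV2(abc.ABC):
    """Hypothetical stand-in for an abstract dataset base class."""

    @abc.abstractmethod
    def element_count(self) -> int:
        ...


class TFRecordDatasetV2(DatasetV2):
    """Hypothetical stand-in holding a list of record file names."""

    def __init__(self, filenames):
        self.filenames = list(filenames)

    def element_count(self) -> int:
        return len(self.filenames)


# type() of the class object is its metaclass.
class_type = type(TFRecordDatasetV2)
# type() of an instance is the class itself.
instance_type = type(TFRecordDatasetV2(["a.tfrecord"]))

print(class_type.__name__, instance_type.__name__)
```

For a type annotation it is the class itself that appears in the signature, so the metaclass is unlikely to be what matters here.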
s
Hey. So I was thinking, how about we use `TFRecordFile`/`TFRecordsDirectory` both ways, as output and input type? So when sending output (`to_literal`), you’d say the type to be `TFRecordFile`, and while accepting it as an input (`to_python_value`), you’d say `Annotated[TFRecordFile, TFRecordDatasetConfig(…)]`. Let me know what you think. This should simplify implementing the transformer.
You’ll need to define your input and output representations in `to_python_value` and `to_literal`, respectively. Since the input will have metadata, that’s where you need to use the user-provided metadata.
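A rough sketch of how the proposed annotations could be pulled apart on the input side, using only the standard `typing` helpers. `TFRecordFile` and `TFRecordDatasetConfig` here are hypothetical stand-ins (the config fields mirror those visible in the traceback earlier in the thread), and `split_annotation` is an illustrative helper, not a flytekit function.

```python
from dataclasses import dataclass
from typing import Annotated, Optional, get_args, get_origin, get_type_hints


class TFRecordFile:
    """Hypothetical stand-in for the file type."""

    def __init__(self, path: str):
        self.path = path


@dataclass
class TFRecordDatasetConfig:
    """Stand-in config; field names taken from the traceback."""

    compression_type: Optional[str] = None
    buffer_size: Optional[int] = None
    num_parallel_reads: Optional[int] = None
    name: Optional[str] = None


def produce() -> TFRecordFile:
    # Output side: the plain type, no metadata needed.
    return TFRecordFile("/tmp/example.tfrecord")


def consume(dataset: Annotated[TFRecordFile, TFRecordDatasetConfig(name="testing")]) -> None:
    # Input side: the same type, plus user-provided metadata.
    ...


def split_annotation(expected_type):
    """Return (base_type, config), using a default config if none is attached."""
    if get_origin(expected_type) is Annotated:
        base, *metadata = get_args(expected_type)
        for item in metadata:
            if isinstance(item, TFRecordDatasetConfig):
                return base, item
        return base, TFRecordDatasetConfig()
    return expected_type, TFRecordDatasetConfig()


in_base, in_config = split_annotation(get_type_hints(consume, include_extras=True)["dataset"])
out_base, out_config = split_annotation(get_type_hints(produce, include_extras=True)["return"])
print(in_base.__name__, in_config.name, out_base.__name__)
```

Both sides resolve to the same base type, so a single transformer registered for it would cover them. Only the input side carries a config for building the dataset.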
r
@Samhita Alla thanks, yes, that was what I was thinking. I will try it later today and let you know.
Also, can you please confirm whether `tf.data.Dataset` needs to be supported here? I can see there is another issue created for supporting this as another transformer: https://github.com/flyteorg/flyte/issues/3038
s
We can tackle that in another PR. `tf.train.Example` should be good for now. 🙂
r
Good news, I think it is working now using `Annotated[TFRecordFile, TFRecordDatasetConfig(…)]` and `Annotated[TFRecordDirectory, TFRecordDatasetConfig(…)]`! I've added more tests as well and they all pass (the CI build failures are unrelated to this PR)! I've pushed the latest changes: https://github.com/flyteorg/flytekit/pull/1240/files
s
Awesome, thanks @Ryan Nazareth! Will take a look at the PR shortly. 🙂
r
@Samhita Alla OK, I've fixed CI now and it's all green.