Ryan Nazareth
11/23/2022, 2:28 PMtests/flytekit/unit/extras/tensorflow/record/test_record.py
, which has tasks to generate TFRecordFile and TFRecordDirectory from tf.train.Example outputs and subsequently deserialise back to tf.data.TFRecordDataset
if required.
The bit thats causing an error is
@task
def consume(dataset: Annotated[TFRecordDatasetV2, TFRecordDatasetConfig(name="testing")]):
which, seems to not be recognising the Annotated TFRecordDatasetV2 type although it should be the expected type in the transformers i have registered here flytekit/extras/tensorflow/record.py.
I presume this is why it is defaulting to FlytePickle as in the traceback (this is only my assumption and based on some other logic i have seen elsewhere in the codebase). Also, my other test tests/flytekit/unit/extras/tensorflow/record/test_transformations.py
which is testing the to_python_value
and to_literal
logic works fine so i feel something else is an issue which I am finding difficult to spot at the moment.
expected_python_type = typing.Annotated[flytekit.types.pickle.pickle.FlytePickle.__class_getitem__.<locals>._SpecificFormatClass, TFRecordDatasetConfig(compression_type=None, buffer_size=None, num_parallel_reads=None, name='testing')]
def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
uri = lv.scalar.blob.uri
# Deserialize the pickle, and return data in the pickle,
# and download pickle file to local first if file is not in the local file systems.
if ctx.file_access.is_remote(uri):
local_path = ctx.file_access.get_random_local_path()
ctx.file_access.get_data(uri, local_path, False)
uri = local_path
with open(uri, "rb") as infile:
> data = cloudpickle.load(infile)
E _pickle.UnpicklingError: invalid load key, '5'.
Ketan (kumare3)
Ryan Nazareth
11/23/2022, 5:20 PMSamhita Alla
TFRecordDatasetV2
but not either of TFRecordFile
or TFRecordsDirectory
for which you defined TypeTransformers, and hence it’s defaulting to pickle.Ryan Nazareth
11/24/2022, 1:28 PMflytekit-onxx-tensorflow
for example which is doing something similar ie. transformer defined for TensorFlow2ONNX and converting to ONNXFile type in `to_literal`: https://github.com/flyteorg/flytekit/blob/430795d9ee4aa48957554a6eb1446fa80993681a[…]ytekit-onnx-tensorflow/flytekitplugins/onnxtensorflow/schema.py
in the associated test def test_tf_onnx():
here
(https://github.com/flyteorg/flytekit/blob/430795d9ee4aa48957554a6eb1446fa80993681a/plugins/flytekit-onnx-tensorflow/tests/test_onnx_tf.py#L49-L75
the model promise object is is annotated in output from task def train_and_predict()....
Annotated[
TensorFlow2ONNX,
TensorFlow2ONNXConfig(
input_signature=(tf.TensorSpec((None, 224, 224, 3), tf.float32, name="input"),), opset=13
),
],
which is then subsequently passed into downstream task def onnx_predict()...
, but the parameter is annotated as the expected type ONNXFile
. This is similar to what im trying to do here with TFRecordFile
and TFRecordsDirectory
(which ive annoated in upstream task) and then need to handle conversion to tf.data.TFRecordDataset
when it is passed into new task, as per the design suggested by @Niels Bantilan in the last code block here https://github.com/flyteorg/flytekit/pull/1240#issuecomment-1292623521Samhita Alla
ONNXFile
is a FlyteFile for which a TypeTransformer has already been defined. Hence, it won’t default to pickle. What is TFRecordDatasetV2
? Can you tell me the type of it?Ryan Nazareth
11/25/2022, 12:54 PMTFRecordDatasetV2
is from tensorflow.python.data.ops.readers
. - so i would need to register this in the transformer i have defined along with TFRecordFile
but can multiple types be registered in same transformer via Union like below ?
TENSORFLOW_FORMAT = "TensorFlowRecord"
def __init__(self):
super().__init__(name="TensorFlow Record", t=Union[TFRecordFile, TFRecordDatasetV2])
Samhita Alla
TFRecordDatasetV2
? Can’t you use TFRecordFile
or TFRecordDirectory
?Ryan Nazareth
11/25/2022, 1:15 PMTFRecordDatasetV2
constuctor with filenames (maybe i misunderstood), as was discussed in https://github.com/flyteorg/flytekit/pull/1240#issuecomment-1293507708Samhita Alla
TFRecordDatasetV2
to denote that the value is tf.data.TFRecordDataset
, right? The reason I used two separate types in ONNX Flyte type is because to_literal
returns a file and it made more sense to use FlyteFile
as the type when the value is consumed in a task. Moreover, the to_literal
type needed annotated metadata while the other, to_python_value
doesn’t require any metadata cause it’s just a file.
So what’s the type of TypeRecordDatasetV2
? In ONNX, it was a file, but here, it isn’t case. I didn’t add any additional type transformer for the file cause it already exists. I just added a type: ONNXFile
, similar to TFRecordFile
.Ryan Nazareth
11/25/2022, 3:36 PMto_python_value
.
TFRecordDatasetV2
inherits from DatasetV2
, part of tensorflow dataset api which is wrapper and container for storing collections (in our case tensorflow record files). I don’t think it falls into any of the existing supported types in flytekit/types which i could use or create custom types from.
Do you mean what does type(TFRecordDatasetV2)
return ? I think its ABCMeta
but can check.TFRecordFile
(which is FlyteFile
and supported) and also include metatdata (TFRecordDatasetConfig
) which user would provide to instantiate TFRecordDataset
, if that makes sense ?Samhita Alla
to_literal
), you’d say the type to be TFRecordFile
and while accepting it as an input (to_python_value
), you’d say Annotated[TFRecordFile, TFRecordDatasetConfig(…)]
. Let me know what you think. This should simplify implementing the transformer.to_python_value
and to_literal
, respectively. Since input will have metadata, that’s where you need to use the user-provided metadata.Ryan Nazareth
11/28/2022, 5:37 AMtf.data.Dataset
needs to be supported here, as I can see there is another issue created for supporting this as another transformer https://github.com/flyteorg/flyte/issues/3038 ?Samhita Alla
tf.train.Example
should be good for now. 🙂Ryan Nazareth
11/28/2022, 11:59 PMAnnotated[TFRecordFile, TFRecordDatasetConfig(…)]
and Annotated[TFRecordDirectory, TFRecordDatasetConfig(…)]
! I’ve added more tests as well and they all pass (ci build failures unrelated to this PR) ! Ive pushed latest changes https://github.com/flyteorg/flytekit/pull/1240/filesSamhita Alla
Ryan Nazareth
11/29/2022, 9:45 PM