I wrote the following TypeTransformer for Light GB...
# ask-the-community
s
I wrote the following TypeTransformer for the LightGBM Dataset type and am seeing it default to pickle when registering the tasks and workflow that use a LightGBM Dataset. Would anyone be able to help point me in the right direction to troubleshoot this?
from typing import Type

import flytekit
import lightgbm as lgb
from flytekit import extend


class LGBMDatasetTransformer(extend.TypeTransformer[lgb.Dataset]):
    _TYPE_INFO = flytekit.BlobType(
        format="binary", dimensionality=flytekit.BlobType.BlobDimensionality.MULTIPART
    )

    def __init__(self) -> None:
        super(LGBMDatasetTransformer, self).__init__(name="lgbdataset-transform", t=lgb.Dataset)

    def get_literal_type(self, t: Type[lgb.Dataset]) -> flytekit.LiteralType:
        return flytekit.LiteralType(blob=self._TYPE_INFO)

    def to_literal(
        self,
        ctx: flytekit.FlyteContext,
        python_val: lgb.Dataset,
        python_type: Type[lgb.Dataset],
        expected: flytekit.LiteralType,
    ) -> flytekit.Literal:
        """
        Convert from the given python type object ``lgb.Dataset`` to the Literal representation.
        """
        # Step 1: Save the dataset as a binary file inside a fresh local directory,
        # then upload that directory to a remote place recommended by Flyte
        local_dir = ctx.file_access.get_random_local_directory()
        python_val.save_binary(f"{local_dir}/dataset.bin")

        remote_dir = ctx.file_access.get_random_remote_directory()
        ctx.file_access.upload_directory(local_dir, remote_dir)
        # Step 2: Return a pointer to this remote_dir in the form of a Literal
        return flytekit.Literal(
            scalar=flytekit.Scalar(
                blob=flytekit.Blob(
                    uri=remote_dir, metadata=flytekit.BlobMetadata(type=self._TYPE_INFO)
                )
            )
        )

    def to_python_value(
        self,
        ctx: flytekit.FlyteContext,
        lv: flytekit.Literal,
        expected_python_type: Type[lgb.Dataset],
    ) -> lgb.Dataset:
        """
        In this method, we want to be able to re-hydrate the custom object from Flyte Literal value.
        """
        # Step 1: Download remote data locally
        local_dir = ctx.file_access.get_random_local_directory()
        ctx.file_access.download_directory(lv.scalar.blob.uri, local_dir)
        # Step 2: Create the ``lgb.Dataset`` object from the downloaded binary file
        return lgb.Dataset(f"{local_dir}/dataset.bin").construct()


extend.TypeEngine.register(LGBMDatasetTransformer())
k
Does the registration get called?
Is this a separate library?
If it is a separate library, then you have to use the namespace loading in flytekit.
r
Hey @Ketan (kumare3) this is currently in our source tree (in the same file as some of our tasks), so not a separate library. Do you have any examples of defining a TypeTransformer alongside tasks/workflows that aren't in a separate library?
k
It should work
The important thing is, the register needs to be invoked
r
Serialization/registration of the tasks/workflows is succeeding, but we're encountering the following error:
[3/3] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/app/thm/flyte/workflows/model_training/lgbm_model_test.image.py.binary.runfiles/prod_flytekit/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/app/thm/flyte/workflows/model_training/lgbm_model_test.image.py.binary.runfiles/prod_flytekit/flytekit/core/base_task.py", line 473, in dispatch_execute
        native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
      File "/app/thm/flyte/workflows/model_training/lgbm_model_test.image.py.binary.runfiles/prod_flytekit/flytekit/core/type_engine.py", line 798, in literal_map_to_kwargs
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
      File "/app/thm/flyte/workflows/model_training/lgbm_model_test.image.py.binary.runfiles/prod_flytekit/flytekit/core/type_engine.py", line 798, in <dictcomp>
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
      File "/app/thm/flyte/workflows/model_training/lgbm_model_test.image.py.binary.runfiles/prod_flytekit/flytekit/core/type_engine.py", line 762, in to_python_value
        return transformer.to_python_value(ctx, lv, expected_python_type)
      File "/app/thm/flyte/workflows/model_training/lgbm_model_test.image.py.binary.runfiles/prod_flytekit/flytekit/types/pickle/pickle.py", line 59, in to_python_value
        with open(uri, "rb") as infile:

Message:

    [Errno 2] No such file or directory: '/tmp/flyte-ln4wnwph/raw/7ca592ea206ad1fc4fc3c279ac764d95/815803b1b0c20031e2708f676b4bc107'
How can we guarantee that register is properly invoked? (i.e., what's the correct place to call TypeEngine.register(LGBMDatasetTransformer())?)
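One way to check whether the register call actually ran in a given process is to ask the type engine which transformer it resolves for the type. The helper below is a minimal sketch, not something from this thread: the name resolved_transformer_name is hypothetical, and it assumes flytekit is installed and that TypeEngine.get_transformer is available in the flytekit version in use.

```python
def resolved_transformer_name(python_type: type) -> str:
    """Return the class name of the transformer flytekit resolves for python_type."""
    # Imported lazily so the helper can be defined even where flytekit is absent.
    from flytekit.extend import TypeEngine

    # get_transformer returns the transformer instance the engine would use;
    # we only report its class name for logging or debugging.
    return type(TypeEngine.get_transformer(python_type)).__name__
```

Logging resolved_transformer_name(lgb.Dataset) from inside the task container should print LGBMDatasetTransformer if the registration ran there. A pickle-related class name would suggest, consistent with the traceback above, that the register call was never executed in that process.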
k
Could you try registering the transformer at the top of the workflow file?
from flytekit import workflow
from flytekit.extend import TypeEngine
...
TypeEngine.register(LGBMDatasetTransformer())

@workflow
def wf():
  ...
Mind sharing your entire code? I can help debug it.
r
Hey @Kevin Su we can put together a self-contained repro of the bug and share it with you next week. Appreciate the offer!
k
great, thank you
k
Here is my guess: if the task runs from one file and the transformer is registered in a different file, it will not work at runtime, because we minimize module loading, so the module that calls register may never be imported.
This is why, when the transformer lives in an external library, we force-load it at flytekit init.
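The effect described here can be illustrated with a tiny stand-in registry. This is only a sketch under stated assumptions: _REGISTRY, register, get_transformer, and the Dataset placeholder are hypothetical names, and the code is not flytekit's implementation. It only models the idea that a lookup falls back to a default (pickle) until the register call has run in the current process.

```python
from typing import Dict

# Hypothetical stand-in for a type registry; not flytekit's real code.
_REGISTRY: Dict[type, str] = {}
_FALLBACK = "pickle"


def register(python_type: type, transformer_name: str) -> None:
    """Record a transformer for a type; runs only if this call is executed."""
    _REGISTRY[python_type] = transformer_name


def get_transformer(python_type: type) -> str:
    """Return the registered transformer name, or the fallback if none exists."""
    return _REGISTRY.get(python_type, _FALLBACK)


class Dataset:
    """Placeholder for a custom type such as lgb.Dataset."""


# Before the registering code has run, the lookup silently falls back.
before = get_transformer(Dataset)

# Importing the module that contains the register call has this side effect.
register(Dataset, "lgbdataset-transform")
after = get_transformer(Dataset)

print(before, after)
```

The point of the sketch is that nothing fails loudly when registration is skipped: the lookup still succeeds, just with the fallback, which matches the symptom of the pickle transformer appearing in the traceback.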
r
Hmm...would it be better for us to package the type transformers separately and then depend on them in our monorepo?
I think that's the core of the issue here, that given we're developing in a monorepo it's not clear where to initialize the transformer/ensure the module is loaded
k
You might not have to. You can always import the transformer in your top-level init file.
Or register it there.
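The suggestion to import the transformer from the top-level init file can be sketched with a throwaway package built in a temporary directory. Everything here is hypothetical (the package name demo_workflows, the module names, and the REGISTERED list standing in for the type engine). It relies only on standard Python import behavior: importing any submodule first runs the package's __init__.py, so a registration triggered there happens even when the task module itself never imports the transformer.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a hypothetical package on disk.
root = Path(tempfile.mkdtemp())
pkg = root / "demo_workflows"
pkg.mkdir()

# Stand-in for the type engine's registry.
(pkg / "registry.py").write_text("REGISTERED = []\n")

# Module whose import has the side effect of registering the transformer.
(pkg / "transformers.py").write_text(
    "from .registry import REGISTERED\n"
    "REGISTERED.append('lgbdataset-transform')\n"
)

# Task module that never imports transformers itself.
(pkg / "tasks.py").write_text("def train():\n    return 'ok'\n")

# Top-level init file: importing the transformer module here ensures the
# registration runs whenever anything in the package is imported.
(pkg / "__init__.py").write_text("from . import transformers  # noqa: F401\n")

sys.path.insert(0, str(root))

# Loading only the task module still triggers __init__.py first.
importlib.import_module("demo_workflows.tasks")
registered = importlib.import_module("demo_workflows.registry").REGISTERED
print(registered)
```

In a monorepo, the equivalent would be an import of the transformer module in the init file of the package that contains the tasks, assuming the task modules are loaded through that package at runtime.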
Also for namespace packages check out point 7 https://github.com/flyteorg/flytekit/blob/master/plugins/README.md