https://flyte.org logo
#ask-the-community
Title
# ask-the-community
s

Sundeep K

02/28/2024, 10:59 PM
Hi Community, I am trying out user container task plugins example from Flyte docs https://docs.flyte.org/en/latest/user_guide/extending/user_container_task_plugins.html. This works fine in local testing but, when I try it out on flyte remote (Flyte demo) it fails. This is the error:
Copy code
Workflow[flytesnacks:development:flyte_tmp.pre_built_container.my_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration
pod doesn't even launch. I am on flyte 1.10.2. All other examples work fine. Any idea what could be going wrong?
This seems to be a bug. Just checking to see if anyone has insights
import Copy code
import typing
from datetime import timedelta
from time import sleep

from flytekit import TaskMetadata, task, workflow
from flytekit.extend import Interface, PythonTask, context_manager

class WaitForObjectStoreFile(PythonTask):
    """
    Add documentation here for your plugin.
    This plugin creates an object store file sensor that waits and exits only when the file exists.
    """

    _VAR_NAME: str = "path"

    def __init__(
        self,
        name: str,
        poll_interval: timedelta = timedelta(seconds=10),
        **kwargs,
    ):
        super(WaitForObjectStoreFile, self).__init__(
            task_type="python-task",
            name=name,
            task_config=None,
            interface=Interface(inputs={self._VAR_NAME: str}, outputs={self._VAR_NAME: str}),
            **kwargs,
        )
        self._poll_interval = poll_interval

    def execute(self, **kwargs) -> typing.Any:
        # No need to check for existence, as that is guaranteed.
        path = kwargs[self._VAR_NAME]
        ctx = context_manager.FlyteContext.current_context()
        user_context = ctx.user_space_params
        while True:
            user_context.logging.info(f"Sensing file in path {path}...")
            if ctx.file_access.exists(path):
                user_context.logging.info(f"file in path {path} exists!")
                return path
            user_context.logging.warning(f"file in path {path} does not exists!")
            sleep(self._poll_interval.seconds)


sensor = WaitForObjectStoreFile(
    name="my-objectstore-sensor",
    metadata=TaskMetadata(retries=10, timeout=timedelta(minutes=20)),
    poll_interval=timedelta(seconds=1),
)


@task
def print_file(path: str) -> str:
    print(path)
    return path


@workflow
def my_workflow(path: str) -> str:
    return print_file(path=sensor(path=path))


if __name__ == "__main__":
    f = "/tmp/some-file"
    with open(f, "w") as w:
        w.write("Hello World!")

    print(my_workflow(path=f))
pyflyte Copy code
pyflyte run --remote  pre_built_container.py my_workflow --path some-file
above is the code ai m using and command to launch
10 Views