millions-judge-96324
02/28/2024, 10:59 PMWorkflow[flytesnacks:development:flyte_tmp.pre_built_container.my_workflow] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration
pod doesn't even launch. I am on flyte 1.10.2. All other examples work fine. Any idea what could be going wrong?millions-judge-96324
02/29/2024, 3:27 PMmillions-judge-96324
02/29/2024, 3:42 PMimport typing
from datetime import timedelta
from time import sleep
from flytekit import TaskMetadata, task, workflow
from flytekit.extend import Interface, PythonTask, context_manager
class WaitForObjectStoreFile(PythonTask):
"""
Add documentation here for your plugin.
This plugin creates an object store file sensor that waits and exits only when the file exists.
"""
_VAR_NAME: str = "path"
def __init__(
self,
name: str,
poll_interval: timedelta = timedelta(seconds=10),
**kwargs,
):
super(WaitForObjectStoreFile, self).__init__(
task_type="python-task",
name=name,
task_config=None,
interface=Interface(inputs={self._VAR_NAME: str}, outputs={self._VAR_NAME: str}),
**kwargs,
)
self._poll_interval = poll_interval
def execute(self, **kwargs) -> typing.Any:
# No need to check for existence, as that is guaranteed.
path = kwargs[self._VAR_NAME]
ctx = context_manager.FlyteContext.current_context()
user_context = ctx.user_space_params
while True:
user_context.logging.info(f"Sensing file in path {path}...")
if ctx.file_access.exists(path):
user_context.logging.info(f"file in path {path} exists!")
return path
user_context.logging.warning(f"file in path {path} does not exists!")
sleep(self._poll_interval.seconds)
sensor = WaitForObjectStoreFile(
name="my-objectstore-sensor",
metadata=TaskMetadata(retries=10, timeout=timedelta(minutes=20)),
poll_interval=timedelta(seconds=1),
)
@task
def print_file(path: str) -> str:
print(path)
return path
@workflow
def my_workflow(path: str) -> str:
return print_file(path=sensor(path=path))
if __name__ == "__main__":
f = "/tmp/some-file"
with open(f, "w") as w:
w.write("Hello World!")
print(my_workflow(path=f))
millions-judge-96324
02/29/2024, 3:43 PMpyflyte run --remote pre_built_container.py my_workflow --path some-file
millions-judge-96324
02/29/2024, 3:43 PM