square-agency-59624
09/18/2025, 5:55 PM
ancient-wolf-19325
09/18/2025, 5:55 PM
square-agency-59624
09/18/2025, 6:01 PM
ancient-wolf-19325
09/18/2025, 6:01 PM
from datetime import timedelta

from flytekit import TaskMetadata, task, workflow
from flytekit.extend import Interface, PythonTask, context_manager


class WaitForObjectStoreFile(PythonTask):
    _VAR_NAME: str = "path"

    def __init__(self, name: str, poll_interval: timedelta = timedelta(seconds=10), **kwargs):
        super().__init__(
            task_type="object-store-sensor",
            name=name,
            task_config=None,
            interface=Interface(inputs={self._VAR_NAME: str}, outputs={"result": str}),
            **kwargs,
        )
        self._poll_interval = poll_interval

    def execute(self, **kwargs):
        path = kwargs[self._VAR_NAME]
        ctx = context_manager.FlyteContext.current_context()
        user_context = ctx.user_space_params
        # Simulate sensing logic.
        # With a single declared output, return the bare value;
        # flytekit binds it to the "result" output name.
        if "trigger_a" in path:
            return "A"
        else:
            return "B"


sensor = WaitForObjectStoreFile(
    name="my-objectstore-sensor",
    metadata=TaskMetadata(retries=3, timeout=timedelta(minutes=10)),
    poll_interval=timedelta(seconds=1),
)
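The execute method above only simulates sensing. As a rough sketch of what a real polling loop might look like, here is a plain-Python version that uses the local filesystem as a stand-in for an object store. The names wait_for_file and route_for_path and the timeout parameter are hypothetical and are not part of flytekit; a real sensor would query the object store instead of calling os.path.exists.

```python
import os
import time
from datetime import timedelta


def wait_for_file(path: str, poll_interval: timedelta, timeout: timedelta) -> bool:
    """Poll until `path` exists; return False if `timeout` elapses first."""
    deadline = time.monotonic() + timeout.total_seconds()
    while True:
        # Assumption: a local existence check stands in for an object-store lookup.
        if os.path.exists(path):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval.total_seconds())


def route_for_path(path: str) -> str:
    """Same routing rule as the simulated sensor: "A" for trigger_a paths, else "B"."""
    return "A" if "trigger_a" in path else "B"
```

Inside execute, one could call wait_for_file with self._poll_interval first and only then return the routing value, so the task blocks until the file shows up or the timeout passes.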
Workflows with Conditional Branching:
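from flytekit import conditional, task, workflow


@task
def workflow_a() -> str:
    return "Workflow A triggered"


@task
def workflow_b() -> str:
    return "Workflow B triggered"


@workflow
def main_wf(path: str) -> str:
    sensor_result = sensor(path=path)
    # Inside @workflow, sensor_result is a promise, so a plain Python
    # if/else cannot branch on it; flytekit's conditional() can.
    return (
        conditional("route_on_sensor")
        .if_(sensor_result == "A")
        .then(workflow_a())
        .else_()
        .then(workflow_b())
    )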
This setup lets the sensor's output decide which downstream branch runs. Running main_wf(path="trigger_a_file") takes the A branch, and main_wf(path="other_file") takes the B branch.
Sources:
- user container task plugin example
- Flytesnacks user container example
square-agency-59624
09/18/2025, 6:03 PM
ancient-wolf-19325
09/18/2025, 6:03 PM
square-agency-59624
09/18/2025, 6:10 PM
freezing-airport-6809