square-agency-59624
09/15/2025, 7:42 PM
ancient-wolf-19325
09/15/2025, 7:42 PM
square-agency-59624
09/15/2025, 7:59 PM
ancient-wolf-19325
09/15/2025, 7:59 PM
square-agency-59624
09/15/2025, 8:18 PM
ancient-wolf-19325
09/15/2025, 8:18 PM
BaseSensor and implementing the poke method to check your messaging queue. The sensor can be used as a task in your workflow. Here's a minimal example for a custom sensor that polls a queue (you'll need to fill in the actual queue logic):
from flytekit.sensor.base_sensor import BaseSensor


class QueueSensor(BaseSensor):
    def __init__(self, name: str, config=None, **kwargs):
        super().__init__(name=name, sensor_config=config, **kwargs)

    async def poke(self, queue_name: str) -> bool:
        # Replace this with your queue polling logic
        # For example, check if there are messages in the queue
        # Return True if a message is found, False otherwise
        return check_queue_for_message(queue_name)
You can then use this sensor in your workflow as shown in the sensor plugin example and file sensor example.
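The example above calls check_queue_for_message, which is not defined anywhere in this thread. As a rough sketch only, here is one way that placeholder could look, assuming an in-memory registry of standard-library queue.Queue objects in place of a real message broker. The names get_queue, _QUEUES and wait_for_message are hypothetical, and the polling loop only illustrates how a poke-style check behaves when called repeatedly; it is not how Flyte schedules sensors.

```python
import asyncio
import queue

# Hypothetical in-memory registry standing in for a real message broker.
_QUEUES: dict[str, queue.Queue] = {}


def get_queue(queue_name: str) -> queue.Queue:
    """Return the named queue, creating it on first use."""
    return _QUEUES.setdefault(queue_name, queue.Queue())


def check_queue_for_message(queue_name: str) -> bool:
    """Return True if the named queue currently holds at least one message."""
    return not get_queue(queue_name).empty()


async def poke(queue_name: str) -> bool:
    """Same shape as QueueSensor.poke, without the flytekit dependency."""
    return check_queue_for_message(queue_name)


async def wait_for_message(queue_name: str, interval: float = 0.01, max_polls: int = 100) -> bool:
    """Call poke repeatedly until it returns True or max_polls is reached."""
    for _ in range(max_polls):
        if await poke(queue_name):
            return True
        await asyncio.sleep(interval)
    return False
```

With a real broker, only the body of check_queue_for_message would need to change, for example to a non-destructive "peek" call on the broker's client, so that the check itself does not consume the message the downstream task is waiting for.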
Would you like a more detailed, step-by-step guide or a full workflow example?
Sources:
- user container task plugins: sensor example
- Flyte 1.10: custom sensor example
square-agency-59624
09/15/2025, 9:23 PM
ancient-wolf-19325
09/15/2025, 9:23 PM