# flyte-support
g
Hey guys, this is Sally. I’m trying to test/use the agent service. I ran “pip install flytekit” in a brand new virtual environment and installed flytectl as well. However, my workflow, which tests a “time delta” sensor that I wrote myself, is failing with
`rpc error: code = Internal desc = failed to get sensor task with error: No module named 'sensor'.`
This is the stack trace:
{"json":{"routine":"agent-service-worker-0","src":"cache.go:84"},"level":"debug","msg":"Sync loop - processing resource with cache key [f677097c1e4564916ad6-n0-0]","ts":"2024-05-07T04:12:02Z"}
{"json":{"routine":"agent-service-worker-0","src":"cache.go:113"},"level":"debug","msg":"Querying AsyncPlugin for f677097c1e4564916ad6-n0-0","ts":"2024-05-07T04:12:02Z"}
{"json":{"routine":"agent-service-worker-0","src":"cache.go:116"},"level":"info","msg":"Error retrieving resource [f677097c1e4564916ad6-n0-0]. Error: rpc error: code = Internal desc = failed to get sensor task with error: No module named 'sensor'.","ts":"2024-05-07T04:12:02Z"}
This is the workflow with the sensor I’m testing; it’s very simple:
from flytekit import task, workflow
from flytekit.sensor.base_sensor import BaseSensor
import time


class TimeDeltaSensor(BaseSensor):
    def __init__(self, name: str, **kwargs):
        super().__init__(name=name, **kwargs)

    async def poke(self, timeDelta: int) -> bool:
        print("Before the sleep sensor")
        time.sleep(timeDelta)
        print("After the time sleep sensor")
        return True

time_delta_sensor = TimeDeltaSensor(name="Time_Delta_mySensor")


@task()
def t1():
    print("SUCCEEDED")


@workflow()
def wf():
    time_delta_sensor(timeDelta=11) >> t1()


if __name__ == "__main__":
    wf()
any idea why this is failing? any help is appreciated 🙏 cc: @glamorous-carpet-83516
👋 1
these are my dependencies:
(venv) yaou@yaou-mn1 sensor % pip list | grep flyte
flyteidl                 1.12.0
flytekit                 1.12.0
g
are you running this workflow in flytectl demo cluster?
g
correct
g
you have to create a new python package that contains your custom sensor, and install this python package in the dockerfile.
g
hmm, I do not quite understand ^^ I want to develop a sensor that’s similar to the file sensor, and it seems that you can just import the class with `from flytekit.sensor.file_sensor import FileSensor` and use it. Why can’t I do the same in my workflow?
g
the class `TimeDeltaSensor` doesn’t exist in the agent server, so the agent can’t import this class
the agent server has flytekit installed, so it can import FileSensor
g
Ok so it sounds like I can’t just use flytectl demo cluster to try this? What is the best way to quickly run a test using my TimeDeltaSensor?
g
you can run it locally to test your custom sensor: `pyflyte run` without `--remote`
To test it in the demo cluster, you have to build a new container image for the agent server. This image should install the new plugin that contains your `TimeDeltaSensor`
👀 1
🙏 1
g
Does this dockerfile build the “new” image I want? I’ll need to add a `pip install` line in here? Do I do `flytectl demo start` (which also starts the agent service) first, then run the dockerfile that would “restart” the agent?
f
cc @high-park-82026 deployment problem
g
run `flytectl demo start` first, and then replace the agent’s image with the image you built.
🙏 1
Try this
👀 1
This can help you understand it.
🙏 1
🙏 1
g
Thanks everybody! I’ll try it out tmrw, it’s getting late where I am 😴
Hey everyone, I have an urgent follow up on this. So I implemented a `flytekit-timesensor` plugin. I ran `pip install -e .` on the whole local flytekit and also pip installed the new plugin I developed. The logic of the timeDeltaSensor is the same as the one pasted above ^ I edited the Dockerfile.agent to look like this, pushed it and re-deployed the flyteagent:
FROM python:3.10-slim-bookworm as agent-slim

MAINTAINER Flyte Team <users@flyte.org>
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

RUN apt-get update && apt-get install build-essential -y

COPY ./plugins/flytekit-timesensor /root/timesensor
RUN pip install /root/timesensor

RUN pip install prometheus-client grpcio-health-checking
RUN pip install --no-cache-dir -U flytekit \
  flytekitplugins-airflow \
  flytekitplugins-bigquery \
  flytekitplugins-openai \
  flytekitplugins-snowflake \
  flytekitplugins-awssagemaker \
  && apt-get clean autoclean \
  && apt-get autoremove --yes \
  && rm -rf /var/lib/{apt,dpkg,cache,log}/ \
  && :

CMD pyflyte serve agent --port 8000
but when I tried to run a workflow that uses this timeDeltaSensor, it fails with the error `rpc error: code = DeadlineExceeded desc = context deadline exceeded`. Any advice? cc: @glamorous-carpet-83516 thank you 🙏
I was able to run it locally of course, that error showed up on remote ^ I used `flytectl demo start` for the whole setup
I created a PR here (not to be checked in) to show the code and process I used : https://github.com/flyteorg/flytekit/pull/2439 Let me know if you are able to reproduce or if you know what is wrong thanks cc: @glamorous-carpet-83516
d
@gray-ram-51379
hi
do you use your gitsha?
I mean you have to do something like
g
yes?
d
FROM python:3.9-slim-bookworm

RUN apt-get update && apt-get install build-essential git -y
RUN pip install prometheus-client grpcio-health-checking
RUN pip install --no-cache-dir -U \
	git+https://github.com/flyteorg/flytekit.git@<gitsha> \
    && apt-get clean autoclean \
    && apt-get autoremove --yes \
    && rm -rf /var/lib/{apt,dpkg,cache,log}/ \
    && :

CMD pyflyte serve agent --port 8000
git+https://github.com/flyteorg/flytekit.git@<gitsha>
focus on this one
you have to change it to your forked repo
so maybe like
github.com/sally/flytekit.git@gitsha
alias gitsha="git rev-parse HEAD";
you can get the gitsha like this
FROM python:3.9-slim-bookworm

RUN apt-get update && apt-get install build-essential git -y
RUN pip install prometheus-client grpcio-health-checking
RUN pip install --no-cache-dir -U \
	git+https://github.com/Sally-Yang-Jing-Ou/flytekit.git@a829b1893a86ecbaa64ede871de1a723ae329d3c \
    && apt-get clean autoclean \
    && apt-get autoremove --yes \
    && rm -rf /var/lib/{apt,dpkg,cache,log}/ \
    && :

CMD pyflyte serve agent --port 8000
use this one
FROM python:3.9-slim-bookworm

RUN apt-get update && apt-get install build-essential git -y
RUN pip install prometheus-client grpcio-health-checking
RUN pip install --no-cache-dir -U \
	git+https://github.com/Sally-Yang-Jing-Ou/flytekit.git@a829b1893a86ecbaa64ede871de1a723ae329d3c \
	git+https://github.com/Sally-Yang-Jing-Ou/flytekit.git@a829b1893a86ecbaa64ede871de1a723ae329d3c#subdirectory=plugins/flytekit-timesensor \
    && apt-get clean autoclean \
    && apt-get autoremove --yes \
    && rm -rf /var/lib/{apt,dpkg,cache,log}/ \
    && :

CMD pyflyte serve agent --port 8000
Updated: this is the one you can use
c
I think it’s fairly simple. Propeller has a default timeout/deadline of 10s on requests to the flyte agent. In your example you are sleeping 11 seconds, which is why the request from propeller times out.
🙏 1
g
@clean-glass-36808 Wow, I changed it to 5 seconds and it actually worked, thank you 🙏 !
d
oh ok
you can configure the timeout here
change the timeout of `GetTask`
g
If I create a sensor that needs to ping to see whether data exists, and that takes more than 10 seconds, I do not understand how it would work. For example, how does the file sensor work in this case, since it might take more than 10 seconds for the data/file to be available? @clean-glass-36808
d
and `CreateTask`
c
I am a Flyte noob so I don't even really know how sensors are used. I just went through the effort of setting up a flyte task/agent last week and fixing up the docs. Sorry I can't help more
🙏 1
🙏 1
g
I think it might be that I used time.sleep which is blocking instead of await asyncio.sleep(), I am going to test it out
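To illustrate the blocking versus non-blocking point, here is a minimal sketch using only the standard library. It does not use flytekit; `blocking_poke`, `non_blocking_poke`, and `timed` are hypothetical stand-ins, and the delays are scaled down to fractions of a second.

```python
import asyncio
import time


async def blocking_poke(delay: float) -> bool:
    # time.sleep blocks the event loop thread, so no other coroutine can run meanwhile.
    time.sleep(delay)
    return True


async def non_blocking_poke(delay: float) -> bool:
    # asyncio.sleep hands control back to the event loop while waiting.
    await asyncio.sleep(delay)
    return True


async def timed(poke, delay: float, copies: int = 3) -> float:
    """Run `copies` pokes concurrently and return the wall-clock time taken."""
    start = time.perf_counter()
    await asyncio.gather(*(poke(delay) for _ in range(copies)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"blocking:     {asyncio.run(timed(blocking_poke, 0.2)):.2f}s")
    print(f"non-blocking: {asyncio.run(timed(non_blocking_poke, 0.2)):.2f}s")
```

The blocking version runs the three calls one after another, while the non-blocking version overlaps them. Note that switching to `await asyncio.sleep(11)` would not by itself shorten a single call: one poke would still take 11 seconds, which is longer than the 10 second deadline mentioned above.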
d
I think you have to update the timeout setting in the propeller
if you need longer time to wait
👍 1
Did I misunderstand your meaning?
@gray-ram-51379
g
No, it’s all good, that makes sense, thank you, Han Ru 🙏 !
d
ok, feel free to ping me, thank you
For more details: in propeller we create a context with a timeout and use it to send requests to the agent server. So if the task takes 15 seconds to finish in the agent server and the timeout in propeller is 10 seconds, propeller will always fail to get the response
👍 1
🙏 1
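The per-request deadline described above can be sketched with plain asyncio. This is not propeller or flytekit code: `DEADLINE`, `slow_poke`, `QuickPoke`, and `poll_until_done` are hypothetical names, the times are scaled down, and the sketch assumes (not confirmed in this thread) that a sensor is polled repeatedly, with each poll being a separate request that gets its own deadline.

```python
import asyncio

DEADLINE = 0.2  # stand-in for the 10 s deadline discussed above, scaled down


async def slow_poke() -> bool:
    # Waits inside a single call for longer than the deadline.
    await asyncio.sleep(DEADLINE * 2)
    return True


class QuickPoke:
    """Checks a condition and returns at once; reports True on the third call."""

    def __init__(self, ready_after: int = 3):
        self.calls = 0
        self.ready_after = ready_after

    async def __call__(self) -> bool:
        self.calls += 1
        return self.calls >= self.ready_after


async def poll_until_done(poke, max_polls: int = 10) -> str:
    """Hypothetical caller: every request to `poke` gets its own deadline."""
    for _ in range(max_polls):
        try:
            done = await asyncio.wait_for(poke(), timeout=DEADLINE)
        except asyncio.TimeoutError:
            return "deadline exceeded"
        if done:
            return "succeeded"
        await asyncio.sleep(0.01)  # short pause between polls
    return "still running"


if __name__ == "__main__":
    print(asyncio.run(poll_until_done(slow_poke)))    # deadline exceeded
    print(asyncio.run(poll_until_done(QuickPoke())))  # succeeded
```

Under that assumption, a poke that returns `False` quickly and is asked again later can wait for data far longer than the deadline in total, whereas a poke that sleeps inside one call hits the deadline every time.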