# ask-the-community
c
Hi all! I'm getting started with Flyte for the first time, and am trying to understand the best practice around tasks that need to make API requests. I have a workflow where I need to place a request to an internal API to request some additional processing, and continue the workflow once that processing is done (either by polling the API, or using a sensor). Is the best approach to develop a custom Sync Agent for the internal API? Or just make the API calls in the tasks directly? Is anyone able to point me to some code examples of a similar setup?
l
A sync agent might be one of the best practices here.
Check out this series; it will help you.

https://youtu.be/ah8Q5mSeikE?si=m6mR_FghCQPqgCxi

This video is also great.
Spotify is one of the users of sync agents.
This talk can help you understand how sync agents work.

https://youtu.be/IS6gi4jR7h0?si=6Ehut-asTjOIvvi2

This doc will help you too.
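As a rough mental model of a sync agent: the agent handles the whole request in one call and returns the outputs directly, with no job id to store and nothing to poll. The stdlib-only sketch below uses hypothetical names (`ToySyncAgent`, `call_api`); in recent flytekit versions the corresponding base class is `SyncAgentBase`, whose method is also called `do`, but this code is not that API.

```python
from typing import Callable, Dict


class ToySyncAgent:
    """Hypothetical sync agent: one call in, outputs out, no state kept between calls."""

    def __init__(self, call_api: Callable[[str], str]):
        self.call_api = call_api  # stand-in for a fast request/response API

    def do(self, custom: Dict[str, str], inputs: Dict[str, str]) -> Dict[str, str]:
        # The whole request completes inside this call, so there is nothing to poll later.
        prompt = f"{custom['prefix']}{inputs['message']}"
        return {"o0": self.call_api(prompt)}


agent = ToySyncAgent(call_api=lambda text: text.upper())
print(agent.do({"prefix": "echo: "}, {"message": "hello"}))  # {'o0': 'ECHO: HELLO'}
```

This shape fits APIs that respond quickly; for processing that has to be polled until it finishes, the create/get shape of an async agent is the better match.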
c
Great, thanks very much @L godlike! I've been working through the docs and will take a look at the videos 🚀 I'll report back if I get stuck on any specific points 👍
k
@Charlie Moriarty you probably need an async agent, since it's polling and waiting for something.
But to start with, you can simply write it as a task. The async agent is great for scaling, for letting other folks reuse it, and for reducing cost too.
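A minimal sketch of the "start with a plain task" approach: submit a request, then poll until the external processing finishes. It is stdlib-only so it runs on its own; `submit_and_wait`, `submit`, `get_status`, and the `"SUCCEEDED"`/`"FAILED"` status strings are hypothetical stand-ins for your internal API, and in a real workflow you would call this from inside a flytekit `@task` function.

```python
import time
from typing import Callable


def submit_and_wait(
    submit: Callable[[], str],
    get_status: Callable[[str], str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> str:
    """Submit a job to a (hypothetical) internal API and block until it finishes.

    Returns the job id on success; raises on failure or timeout.
    """
    job_id = submit()  # one request to kick off the external processing
    deadline = time.monotonic() + timeout
    while True:
        status = get_status(job_id)  # poll the (hypothetical) status endpoint
        if status == "SUCCEEDED":
            return job_id
        if status == "FAILED":
            raise RuntimeError(f"job {job_id} failed")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {status} after {timeout}s")
        time.sleep(poll_interval)  # the task keeps running (and holding resources) while it waits


# Usage with a fake client that reports success on the third poll.
_statuses = iter(["PENDING", "RUNNING", "SUCCEEDED"])
result = submit_and_wait(lambda: "job-123", lambda _id: next(_statuses), poll_interval=0.0)
print(result)  # job-123
```

The `time.sleep` loop is where the cost comes from: the task stays busy for the whole wait, which is what an async agent's create/get split avoids.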
c
Thanks @Ketan (kumare3) - very handy to know that this is still possible in tasks as a way of starting simple 👍 And good point about Async agents - I didn't spot at first that these support create/get methods, which does sound like the pattern we might need 🙏
k
You can also check out the implementation of the BigQuery agent; it's an async agent: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L34-L94
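To illustrate why an async agent is split into `create` and `get`: the agent does not hold state between calls, so `create` starts the work and returns small serialized metadata (for BigQuery, the job id along with project and location), and each later `get` rebuilds what it needs from that metadata. This stdlib-only sketch models the idea; `JobMetadata`, `FakeAPI`, and `ToyAsyncAgent` are hypothetical, not flytekit classes (the linked agent subclasses flytekit's `AsyncAgentBase` and uses a `ResourceMeta` dataclass instead).

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class JobMetadata:
    """Everything get() needs to find the job again (hypothetical fields)."""
    job_id: str

    def encode(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> "JobMetadata":
        return cls(**json.loads(data.decode("utf-8")))


class FakeAPI:
    """In-memory stand-in for an internal API: each job finishes after N status checks."""

    def __init__(self, polls_until_done: int = 2):
        self.polls_until_done = polls_until_done
        self.polls: dict = {}

    def start(self, payload: dict) -> str:
        job_id = f"job-{len(self.polls) + 1}"
        self.polls[job_id] = 0
        return job_id

    def status(self, job_id: str) -> str:
        self.polls[job_id] += 1
        return "SUCCEEDED" if self.polls[job_id] >= self.polls_until_done else "RUNNING"


class ToyAsyncAgent:
    def __init__(self, api: FakeAPI):
        self.api = api

    def create(self, payload: dict) -> bytes:
        # Start the external work and return immediately; no waiting here.
        job_id = self.api.start(payload)
        return JobMetadata(job_id=job_id).encode()

    def get(self, encoded: bytes) -> str:
        # Rebuild state purely from the metadata, then ask the API once.
        meta = JobMetadata.decode(encoded)
        return self.api.status(meta.job_id)


agent = ToyAsyncAgent(FakeAPI(polls_until_done=2))
meta = agent.create({"input": "abc"})
print(meta)             # b'{"job_id": "job-1"}'
print(agent.get(meta))  # RUNNING
print(agent.get(meta))  # SUCCEEDED
```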
c
Thanks @Kevin Su (and all)! I've got my example working using plain tasks, and now I'm seeing if I can create a custom Async agent to do the API calls more efficiently. I've been working off the BigQuery example, but there are a few things that are a little unclear... Is the custom Task class that implements `AsyncAgentExecutorMixin` only needed if the agent server isn't running? If I wanted my development environment to be as close to production as possible, I could run a `CustomAPIAgent` server (e.g. the instructions here) and I wouldn't need to implement `CustomAPIAgentTask(AsyncAgentExecutorMixin)`? How do I link tasks to agents in that scenario and call the create/get methods? Are there some code examples of tasks/workflows that use agents in a production-like environment?
l
> How do I link tasks to agents in that scenario and call the create/get methods - are there some code examples of tasks/workflows that use agents in a production-like environment?

You have to implement `agent.py` and `task.py` (see https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-bigquery/flytekitplugins/bigquery), so that the BigQuery agent can get the metadata it needs to execute the query job.
In https://github.com/flyteorg/flytekit/tree/master/flytekit/extend/backend, `agent_service.py` is for remote execution and `base_agent.py` is for local execution. You can see the lifecycle of executing agent tasks by reading `base_agent.py`:
https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L277
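A rough model of the local lifecycle described above: the task's `get_custom()` output is passed to the agent's `create`, and `get` is then called in a loop until the job reaches a terminal phase. The sketch is stdlib-only and deliberately simplified (the real code also converts Flyte literals and handles more phases); `Phase`, `EchoAgent`, and `run_locally` are hypothetical names.

```python
import time
from enum import Enum
from typing import Any, Dict, Optional, Tuple


class Phase(Enum):
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


class EchoAgent:
    """Hypothetical agent: the job 'finishes' on the second get() call."""

    def __init__(self) -> None:
        self._calls = 0

    def create(self, custom: Dict[str, Any], inputs: Dict[str, Any]) -> Dict[str, Any]:
        # custom plays the role of what the task's get_custom() put in the task template.
        return {"job": custom["endpoint"], "inputs": inputs}

    def get(self, metadata: Dict[str, Any]) -> Tuple[Phase, Optional[Dict[str, Any]]]:
        self._calls += 1
        if self._calls < 2:
            return Phase.RUNNING, None
        return Phase.SUCCEEDED, {"o0": f"{metadata['job']} done"}


def run_locally(agent: EchoAgent, custom: Dict[str, Any], inputs: Dict[str, Any],
                poll_interval: float = 1.0) -> Dict[str, Any]:
    """Create once, then poll get() until a terminal phase (a model of local execution)."""
    metadata = agent.create(custom, inputs)
    while True:
        phase, outputs = agent.get(metadata)
        if phase is Phase.SUCCEEDED:
            return outputs or {}
        if phase is Phase.FAILED:
            raise RuntimeError("agent job failed")
        time.sleep(poll_interval)


print(run_locally(EchoAgent(), {"endpoint": "my-api"}, {"message": "hi"}, poll_interval=0.0))
# {'o0': 'my-api done'}
```

In a remote run, the same create/get calls are made by Flyte's backend through the agent service (`agent_service.py`) rather than by a local loop.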
c
Cool, thanks! I'll have a deeper look through the examples 👀 I got a little confused, because the docstring for the `AsyncAgentExecutorMixin` class says:
> This mixin class is used to run the async task locally, and it's only used for local execution.
> Task should inherit from this class if the task can be run in the agent.
But if I understand correctly, a `task` implementation that inherits from this class is still needed in production, it just executes differently?
l
Wait, you're right, it's possible not to inherit from `AsyncAgentExecutorMixin`.
For example:
```python
from typing import Any, Dict

from flytekit.configuration import SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.interface import Interface


# No agent executor mixin here: the task still runs remotely through the agent,
# but it cannot be executed locally.
class ChatGPTTask(PythonTask):
    """
    This is the simplest form of a ChatGPT Task, you can define the model and the input you want.
    """

    _TASK_TYPE = "chatgpt"

    def __init__(self, name: str, openai_organization: str, chatgpt_config: Dict[str, Any], **kwargs):
        """
        Args:
            name: Name of this task, should be unique in the project
            openai_organization: OpenAI Organization. String can be found here. <https://platform.openai.com/docs/api-reference/organization-optional>
            chatgpt_config: ChatGPT job configuration. Config structure can be found here. <https://platform.openai.com/docs/api-reference/completions/create>
        """

        if "model" not in chatgpt_config:
            raise ValueError("The 'model' configuration variable is required in chatgpt_config")

        task_config = {"openai_organization": openai_organization, "chatgpt_config": chatgpt_config}

        inputs = {"message": str}
        outputs = {"o0": str}

        super().__init__(
            task_type=self._TASK_TYPE,
            name=name,
            task_config=task_config,
            interface=Interface(inputs=inputs, outputs=outputs),
            **kwargs,
        )

    def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
        # Serialized into the task template's `custom` field, which the agent reads at execution time.
        return {
            "openai_organization": self.task_config["openai_organization"],
            "chatgpt_config": self.task_config["chatgpt_config"],
        }
```
This is my `chatgpt/task.py`, and it works remotely.
Here is the source code; you can see that I removed `SyncAgentExecutorMixin` and it still works: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-openai/flytekitplugins/openai/chatgpt/task.py
c
Thanks!! That's a very helpful example - I'll have a look and see how I get on 🙂
l
> But if I understand correctly, a `task` implementation that inherits from this class is still needed in production, it just executes differently?

For production, if you only execute remotely, you don't need to inherit from the agent executor mixin (`AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin`), but you have to accept a tradeoff: you can't execute the task locally.
• Local: `pyflyte run task.py xxx`
• Remote: `pyflyte run --remote task.py xxx`
Thanks, feel free to ask more.
c
Thanks again for all the help so far - I've worked through the examples and have tried creating a local async agent in a plugins sub-folder in my main codebase.
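```python
# agent.py
from flytekit.extend.backend.base_agent import AsyncAgentBase


class MyCustomAPIAgent(AsyncAgentBase):
    def __init__(self):
        # CustomAPIMetadata: this agent's ResourceMeta subclass (definition elided)
        super().__init__(task_type_name="custom_api", metadata_type=CustomAPIMetadata)
    ...

# task.py
from flytekit.core.base_task import PythonTask
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin


class MyCustomAPITask(AsyncAgentExecutorMixin, PythonTask):
    _TASK_TYPE = "custom_api"
    ...

# workflows.py
my_example_task = MyCustomAPITask(name="my_example_task")
```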
My implementation is similar to the BigQuery/ChatGPT examples, and I now want to test that it's working OK, but I'm hitting a few issues with the documentation:
1. I tried seeing if I can just test the task using `pyflyte run workflows.py my_example_task` (e.g. this example), but get an error:
```
SYSTEM:AgentNotFound: error=Cannot find agent for task category: custom_api_v0
```
So I'm not sure if there might be an extra step needed? Or if this method only works for existing Flyte plugins 🤔
2. I tried seeing if I can follow these instructions to run the agent alongside the Flyte demo cluster, but hit a few points of confusion:
• `pyflyte serve agent` doesn't work unless I do an additional `pip install prometheus-client grpcio-health-checking`
• I wasn't sure if `flyte-single-binary-local.yaml` is an existing file I should be modifying, or a new file I should be creating?
• The instructions mention `make compile`, suggesting an existing Makefile. Where is this located, if I'm running the `flytectl` demo cluster on macOS from a brew installation?
• The note at the bottom of the page mentions "You must build an image that includes the plugin for the task". Should this be one of the main walkthrough steps, or is this only needed in some situations?

Thanks, lots of info there - I won't be working on this again until Monday 😄
Straight after posting this I solved question 1 😂 I needed to import `MyCustomAPIAgent` (whose module included the `AgentRegistry.register` step) into `workflows.py`, even though it's not used, to ensure the agent is registered at the point of execution 👍 Still working on the demo cluster execution, but I have a basic local test workflow now 🚀
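The `AgentNotFound` error and the fix (importing the agent module even though it is unused) fit a register-on-import pattern: the agent module registers the agent at import time, and lookup is keyed by task type plus version. This stdlib-only sketch models that pattern with hypothetical names (`REGISTRY`, `register`, `lookup`, `ToyAgent`); it is not flytekit's `AgentRegistry` code, and the `<type>_v<version>` key format is inferred from the `custom_api_v0` in the error message.

```python
from typing import Dict


class AgentNotFound(Exception):
    pass


# Process-wide registry, populated as a side effect of importing agent modules.
REGISTRY: Dict[str, object] = {}


def category(task_type: str, version: int = 0) -> str:
    # Mirrors the "custom_api_v0" key seen in the error message (format inferred).
    return f"{task_type}_v{version}"


class ToyAgent:
    def __init__(self, task_type_name: str, task_type_version: int = 0):
        self.task_type_name = task_type_name
        self.task_type_version = task_type_version


def register(agent: ToyAgent) -> None:
    REGISTRY[category(agent.task_type_name, agent.task_type_version)] = agent


def lookup(task_type: str, version: int = 0) -> object:
    key = category(task_type, version)
    if key not in REGISTRY:
        raise AgentNotFound(f"Cannot find agent for task category: {key}")
    return REGISTRY[key]


# Before the agent module has been imported, lookup fails like the original error.
try:
    lookup("custom_api")
except AgentNotFound as e:
    print(e)  # Cannot find agent for task category: custom_api_v0

# Importing the agent module would run a line like this at module level:
register(ToyAgent("custom_api"))
print(type(lookup("custom_api")).__name__)  # ToyAgent
```

Python runs a module's top-level statements the first time it is imported, so an otherwise-unused import is enough to trigger the registration call.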
l
β€’ we install at here when we want to use agent in the sandbox: https://github.com/flyteorg/flytekit/blob/master/Dockerfile.agent β€’ flyte-single-binary-local.yaml is from here: https://github.com/flyteorg/flyte/blob/master/flyte-single-binary-local.yaml β€’
make compile
needs to be done under the
flyte
directory: https://github.com/flyteorg/flyte for more information, you can see here: https://docs.flyte.org/en/latest/community/contribute.html#how-to-setup-dev-environment-for-flyteidl-f[…]-flytepropeller-datacatalog-and-flytestdlib β€’ in sandbox mode, you need to build an agent image to use it, in dev mode, you don't need, and the docs you mentioned is in the dev mode
c
Thank you! I think I was adding extra confusion for myself by looking at the wrong docs page - I think what I'm actually trying to do is deploy my custom agent to the flyte sandbox. If I've now understood correctly, testing agents in a local development cluster is a different workflow, used if I want to run existing flyte agents locally. My custom agent is working nicely when I test it in a local Python environment - but I haven't been able to run it on the sandbox yet. I'm now following the instructions here, and I've completed steps 1-4 no problem. In step 5, when I try to test my agent remotely, I get the error message:
Workflow[flytesnacksdevelopment.flytegen.my_custom_task] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[mycustomtaskname]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration
Does something jump out as a step I might have missed? I've tried supplying an additional `--image localhost:30000/flyteagent:example` flag to the CLI command but get the same error.
l
Did you run
```shell
kubectl rollout restart deployment flyte-sandbox -n flyte
```
to restart your sandbox after configuring your new image?
c
Yep, I've run the command to restart the sandbox but I still get the error. I can confirm the restart occurs, as there's a brief moment of sandbox downtime before it becomes available for remote executions again. I've also tested the Docker image I've built that includes my agent code, and it prints the expected "Starting the agent service..." log when it runs. I've tried running both the pure task and a workflow that contains it on the restarted sandbox, but get the same `[BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration` error each time.