f
Hi everyone! I’m trying out Flyte to develop a custom agent. I am trying to create a dummy agent and tried to test the demo agent using the guide. However, I do not understand how to do step 4 of the guide. What am I missing? Here is the code of the sample agent:
from flytekit.extend.backend.base_agent import AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from typing import Optional

class MyAgentMetadata(ResourceMeta):
    def __init__(self, job_id: str):
        self.job_id = job_id

class MyAsyncAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="my_async_task", metadata_type=MyAgentMetadata)

    def create(self, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, **kwargs) -> MyAgentMetadata:
        # Simulate a job creation
        job_id = "12345"  # Simulated job ID
        return MyAgentMetadata(job_id=job_id)

    def get(self, resource_meta: MyAgentMetadata, **kwargs) -> Resource:
        # Simulate checking the job status
        status = "completed"  # Simulated status
        if status == "completed":
            return Resource(phase="SUCCEEDED", outputs={"result": "Job completed successfully"})
        else:
            return Resource(phase="RUNNING")

    def delete(self, resource_meta: MyAgentMetadata, **kwargs):
        # Simulate job cancellation
        pass

# Register the agent
from flytekit.extend.backend import AgentRegistry
AgentRegistry.register(MyAsyncAgent())
a
cc @damp-lion-88352
a
Is step 4 referring to "Start the Flyte server with the single binary config file:", or to the entire "Configuring the agent service in development mode" section?
f
I would appreciate it if you could help me get the agent working in development mode.
g
is step 4 referring to "Start the Flyte server with the single binary config file:"?
Yes. Check out the flyte repo and run `make compile` to build the binary. Then run `flyte start …` to start the Flyte server.
f
Thanks for your response! I will try it out
I have another question: I am trying out cloud events using this guide. Is it possible to try it out on my local Flyte cluster? If yes, how can I apply cloud_events.yaml to my flyteadmin?
d
I think it's possible. You have to put your config in your yml file: https://github.com/flyteorg/flyte/blob/master/flyte-single-binary-local.yaml. For the related flyteadmin yaml, you can look at https://github.com/flyteorg/flyte/blob/master/flyteadmin/flyteadmin_config.yaml to see how the related keys are nested.
f
Thanks! I tried editing the `flyte-single-binary-local.yaml` (`config-sandbox.yaml` in the .flyte directory), but I was not able to find the `cloudEvents` key that I want to enable. I searched the repository for this key and found that it appears in the ConfigMap `flyte-admin-base-config`, but when I look for that ConfigMap on my local Flyte cluster, I do not find it.
a
@faint-accountant-77873 `cloudevents` is a top-level section in the flyteadmin config, which means you should be able to set it in your values file at the "root" level. What I don't think will work is setting it in the `config-sandbox` file, as that affects only client-side configuration. So in summary, you could create a new `cloudevents` section in your `$HOME/.flyte/sandbox/config.yaml` like:
cloudevents:
  ...
f
Great! I added the configuration and it seems to be working. However, I am not able to see the messages on my Kafka broker. I looked at the logs on the pod but did not find any issue with the connection. Is there a way to see in the logs whether the connection to the broker is successful? I tried the following to retrieve the logs:
kubectl logs flyte-sandbox-996ff6658-nmbv4 > log.txt
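With the logs dumped to `log.txt` as above, a small filter can make it easier to spot anything related to the publisher. This is only a sketch: the keywords `kafka` and `cloudevent` are guesses at what relevant lines might contain, not strings Flyte is known to log, and the helper names are made up for illustration.

```python
from typing import Iterable, List, Sequence


def filter_log_lines(
    lines: Iterable[str],
    keywords: Sequence[str] = ("kafka", "cloudevent"),  # assumed keywords, adjust as needed
) -> List[str]:
    """Return the lines that contain any keyword, case-insensitively."""
    lowered = [k.lower() for k in keywords]
    return [
        line.rstrip("\n")
        for line in lines
        if any(k in line.lower() for k in lowered)
    ]


def filter_log_file(path: str = "log.txt") -> List[str]:
    """Apply the same filter to a log file saved with `kubectl logs ... > log.txt`."""
    with open(path, encoding="utf-8", errors="replace") as fh:
        return filter_log_lines(fh)
```

Calling `filter_log_file("log.txt")` returns the matching lines; an empty list only means none of the assumed keywords appeared, not that the connection succeeded.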
a
I'm not sure if a `kafka` publisher type will work. Could you share the config you're using?
f
Sure!
cloudEvents:
  enable: true
  kafka:
    brokers: host.docker.internal:9092
  eventsPublisher:
    eventTypes:
    - all
    topicName: myTopic
  type: kafka
a
Could you verify that you have this section in your config?
logger:
  show-source: true
  level: 5
Also, judging by this snippet and how the flyte-core chart does it, I think you need to set the brokers as a list and probably specify the Kafka version.
f
Sure! I added the logger group and also the cluster config. However, I still do not find any error in the logs related to the Kafka connection. What could be the issue?
admin:
  # For GRPC endpoints you might want to use dns:///flyte.myexample.com
  endpoint: dns:///localhost:30080
  insecure: true

logger:
  show-source: true
  level: 5
cloudEvents:
  enable: true
  version: "3.9.0"
  kafka:
    brokers:
      - host.docker.internal:9092
  eventsPublisher:
    eventTypes:
    - all
    topicName: myTopic
  type: kafka
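Since the logs show no connection error, one thing that can be ruled out separately is plain network reachability of the broker address. The sketch below only checks whether a TCP connection can be opened from wherever the script runs; it does not speak the Kafka protocol, and `broker_reachable` is a hypothetical helper name. Note that `host.docker.internal` may resolve differently (or not at all) outside a container, so the result is not necessarily what the sandbox pod sees.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

For the config above, the call would be `broker_reachable("host.docker.internal", 9092)`.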
About the agent service, I have a question: I want to register a new custom agent. How can I run that `AgentRegistry.register` call so the agent is served by `pyflyte serve agent`? Here is the sample code:
from flytekit.extend.backend.base_agent import AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from typing import Optional

class CustomAgentMetadata(ResourceMeta):
    def __init__(self, job_id: str):
        self.job_id = job_id

class CustomAsyncAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="custom_async_task", metadata_type=CustomAgentMetadata)

    def create(self, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, **kwargs) -> CustomAgentMetadata:
        # Simulate a job creation
        job_id = "12345"  # Simulated job ID
        return CustomAgentMetadata(job_id=job_id)

    def get(self, resource_meta: CustomAgentMetadata, **kwargs) -> Resource:
        # Simulate checking the job status
        status = "completed"  # Simulated status
        if status == "completed":
            return Resource(phase="SUCCEEDED", outputs={"result": "Job completed successfully"})
        else:
            return Resource(phase="RUNNING")

    def delete(self, resource_meta: CustomAgentMetadata, **kwargs):
        # Simulate job cancellation
        pass

# Register the agent
from flytekit.extend.backend import AgentRegistry
AgentRegistry.register(CustomAsyncAgent())
a
I think you need to get the entry_point set up correctly either in `setup.py` or `pyproject.toml`. Is this what you're looking for - https://github.com/flyteorg/flytekit/blob/ac906ca50b7631bfdf5a6689d9342f562a989663/plugins/flytekit-perian/setup.py#L37
example w/ `pyproject.toml`:
[project.entry-points."flytekit.plugins"]
custom_agent = "foo.bar:CustomAgent"
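To check whether an entry point like the one above is actually visible after installing the package, the standard library can list what is registered under a group. This is a minimal sketch that assumes plugins are picked up from the `flytekit.plugins` group, as the snippet suggests; `list_plugin_entry_points` is a hypothetical helper, not part of flytekit.

```python
from importlib.metadata import entry_points
from typing import List


def list_plugin_entry_points(group: str = "flytekit.plugins") -> List[str]:
    """Return 'name = value' strings for every installed entry point in the group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ returns an object with a select() method.
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9 return a dict keyed by group name.
        selected = eps.get(group, [])
    return sorted(f"{ep.name} = {ep.value}" for ep in selected)
```

Running `print(list_plugin_entry_points())` in the same environment as `pyflyte serve agent` shows which plugin entry points are installed there; if the custom package is missing from the list, it was probably not installed into that environment.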
f
Thanks! I will try it out
I am trying to load the sample plugin flytekit-perian. How can I load it into my `pyflyte serve agent`?
d
You have to install it in your environment, then just call `pyflyte serve agent`.
a
@damp-lion-88352 is the entry point part required?
(it's been a couple months since I did this and don't remember)
d
I think only setup.py is needed
@faint-accountant-77873 I have an agent template repo, feel free to try it out https://github.com/Future-Outlier/flyte-custom-agent-template
f
Great!! I will try it out, thank you both!