faint-accountant-77873
02/10/2025, 7:36 PM
from dataclasses import dataclass
from typing import Optional

from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class MyAgentMetadata(ResourceMeta):
    # Declared as a dataclass field so the metadata can be serialized between create/get/delete calls
    job_id: str


class MyAsyncAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="my_async_task", metadata_type=MyAgentMetadata)

    def create(self, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, **kwargs) -> MyAgentMetadata:
        # Simulate a job creation
        job_id = "12345"  # Simulated job ID
        return MyAgentMetadata(job_id=job_id)

    def get(self, resource_meta: MyAgentMetadata, **kwargs) -> Resource:
        # Simulate checking the job status
        status = "completed"  # Simulated status
        if status == "completed":
            # Resource.phase expects a TaskExecution phase, not a plain string
            return Resource(phase=TaskExecution.SUCCEEDED, outputs={"result": "Job completed successfully"})
        else:
            return Resource(phase=TaskExecution.RUNNING)

    def delete(self, resource_meta: MyAgentMetadata, **kwargs):
        # Simulate job cancellation
        pass


# Register the agent
AgentRegistry.register(MyAsyncAgent())
average-finland-92144
02/10/2025, 8:17 PM
alert-oil-1341
02/10/2025, 8:18 PM
"Start the Flyte server with the single binary config file:"?
alert-oil-1341
02/10/2025, 8:19 PM
Configuring the agent service in development mode
faint-accountant-77873
02/10/2025, 9:04 PM
glamorous-carpet-83516
02/10/2025, 10:25 PM
is step 4 referring to "Start the Flyte server with the single binary config file:"?
yes. Check out the flyte repo and run make compile to build the binary. Then, run flyte start … to start the flyte server
faint-accountant-77873
02/11/2025, 1:41 PM
faint-accountant-77873
02/11/2025, 1:43 PM
damp-lion-88352
02/11/2025, 1:48 PM
faint-accountant-77873
02/11/2025, 2:31 PM
flyte-single-binary-local.yaml (config-sandbox.yaml in the .flyte directory), but I was not able to find the cloudEvents key that I want to enable. I searched the repository for this key and found that it appears in the ConfigMap flyte-admin-base-config, but when I look for that ConfigMap on my local Flyte cluster I cannot find it.
average-finland-92144
02/11/2025, 3:38 PM
cloudevents is a top-level section in the flyteadmin config, which means you should be able to set it in your values file at the "root" level.
What I don't think will work is setting it in the config-sandbox file, as that affects only client-side configuration.
So in summary, you could create a new cloudevents section in your $HOME/.flyte/sandbox/config.yaml like
cloudevents:
  ...
faint-accountant-77873
02/11/2025, 5:37 PM
kubectl logs flyte-sandbox-996ff6658-nmbv4 > log.txt
average-finland-92144
02/11/2025, 6:00 PM
kafka publisher type will work
Could you share the config you're using?
faint-accountant-77873
02/11/2025, 6:04 PM
cloudEvents:
  enable: true
  kafka:
    brokers: host.docker.internal:9092
  eventsPublisher:
    eventTypes:
      - all
    topicName: myTopic
  type: kafka
average-finland-92144
02/11/2025, 9:08 PM
logger:
  show-source: true
  level: 5
average-finland-92144
02/11/2025, 9:11 PM
faint-accountant-77873
02/12/2025, 1:20 PM
admin:
  # For GRPC endpoints you might want to use dns:///flyte.myexample.com
  endpoint: dns:///localhost:30080
  insecure: true
logger:
  show-source: true
  level: 5
cloudEvents:
  enable: true
  version: "3.9.0"
  kafka:
    brokers:
      - host.docker.internal:9092
  eventsPublisher:
    eventTypes:
      - all
    topicName: myTopic
  type: kafka
faint-accountant-77873
02/12/2025, 2:43 PM
AgentRegistry.register to serve it with pyflyte serve agent?
Here is the sample code
from dataclasses import dataclass
from typing import Optional

from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


@dataclass
class CustomAgentMetadata(ResourceMeta):
    # Declared as a dataclass field so the metadata can be serialized between create/get/delete calls
    job_id: str


class CustomAsyncAgent(AsyncAgentBase):
    def __init__(self):
        super().__init__(task_type_name="custom_async_task", metadata_type=CustomAgentMetadata)

    def create(self, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, **kwargs) -> CustomAgentMetadata:
        # Simulate a job creation
        job_id = "12345"  # Simulated job ID
        return CustomAgentMetadata(job_id=job_id)

    def get(self, resource_meta: CustomAgentMetadata, **kwargs) -> Resource:
        # Simulate checking the job status
        status = "completed"  # Simulated status
        if status == "completed":
            # outputs must be a dict; Resource.phase expects a TaskExecution phase, not a plain string
            return Resource(phase=TaskExecution.SUCCEEDED, outputs={"result": "Job completed successfully"})
        else:
            return Resource(phase=TaskExecution.RUNNING)

    def delete(self, resource_meta: CustomAgentMetadata, **kwargs):
        # Simulate job cancellation
        pass


# Register the agent
AgentRegistry.register(CustomAsyncAgent())
alert-oil-1341
02/12/2025, 3:08 PM
setup.py or pyproject.toml. Is this what you're looking for - https://github.com/flyteorg/flytekit/blob/ac906ca50b7631bfdf5a6689d9342f562a989663/plugins/flytekit-perian/setup.py#L37
alert-oil-1341
02/12/2025, 3:09 PM
[project.entry-points."flytekit.plugins"]
custom_agent = "foo.bar:CustomAgent"
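To check that the entry point above is actually visible after the package is installed, something like the following rough sketch might help. It assumes plugins are discovered through the flytekit.plugins entry-point group shown above. list_plugin_entry_points is a hypothetical helper name, not part of flytekit, and only the standard library is used.

```python
from importlib.metadata import entry_points


def list_plugin_entry_points(group: str = "flytekit.plugins") -> list:
    """Return sorted (name, target) pairs for every installed entry point in `group`."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: the returned collection supports select(group=...)
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9: entry_points() returns a dict keyed by group name
        selected = eps.get(group, [])
    return sorted((ep.name, ep.value) for ep in selected)


if __name__ == "__main__":
    # Print every plugin entry point registered in the current environment
    for name, target in list_plugin_entry_points():
        print(f"{name} -> {target}")
```

If custom_agent is missing from the output, the package is probably not installed in the environment where pyflyte serve agent runs.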
faint-accountant-77873
02/12/2025, 3:21 PM
faint-accountant-77873
02/12/2025, 3:58 PM
pyflyte serve agent?
damp-lion-88352
02/12/2025, 4:04 PM
damp-lion-88352
02/12/2025, 4:04 PM
pyflyte serve agent
alert-oil-1341
02/12/2025, 4:05 PM
alert-oil-1341
02/12/2025, 4:06 PM
damp-lion-88352
02/12/2025, 4:06 PM
damp-lion-88352
02/12/2025, 4:07 PM
damp-lion-88352
02/12/2025, 4:09 PM
entrypoint in setup.py matters
https://github.com/Future-Outlier/flyte-custom-agent-template/blob/main/flytekit-bigquery/setup.py#L39
faint-accountant-77873
02/12/2025, 4:12 PM