abundant-judge-84756
04/10/2024, 1:37 PMdamp-lion-88352
04/10/2024, 1:58 PMdamp-lion-88352
04/10/2024, 1:59 PMdamp-lion-88352
04/10/2024, 1:59 PMdamp-lion-88352
04/10/2024, 1:59 PMdamp-lion-88352
04/10/2024, 1:59 PMdamp-lion-88352
04/10/2024, 2:01 PMdamp-lion-88352
04/10/2024, 2:03 PMdamp-lion-88352
04/10/2024, 2:03 PMabundant-judge-84756
04/10/2024, 2:07 PMfreezing-airport-6809
freezing-airport-6809
abundant-judge-84756
04/10/2024, 2:15 PMglamorous-carpet-83516
04/11/2024, 9:29 PMabundant-judge-84756
04/12/2024, 7:55 AMAsyncAgentExecutorMixin
only needed if the agent server isn't running? If I wanted my development environment to be as close to production as possible, I could run a CustomAPIAgent server (eg. the instructions here) and I don't need to implement CustomAPIAgentTask(AsyncAgentExecutorMixin)? How do I link tasks to agents in that scenario and call the create/get methods - are there some code examples of tasks/workflows that use agents in a production-like environment?damp-lion-88352
04/12/2024, 8:06 AMdo I link tasks to agents in that scenario and call the create/get methods - are there some code examples of tasks/workflows that use agents in a production-like environment?https://github.com/flyteorg/flytekit/tree/master/plugins/flytekit-bigquery/flytekitplugins/bigquery You have to implement
agent.py
and task.py
, so that bigquery agent can get the metadata it needs to execute query job.damp-lion-88352
04/12/2024, 8:11 AMagent_service.py
is for remote execution.
base_agent.py
is for local execution.
https://github.com/flyteorg/flytekit/tree/master/flytekit/extend/backend
You can see the lifecycle of executing agent tasks by reading base_agent.py
https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L277abundant-judge-84756
04/12/2024, 8:17 AMAsyncAgentExecutorMixin
class say:
This mixin class is used to run the async task locally, and it's only used for local execution.
Task should inherit from this class if the task can be run in the agent.But if I understand correctly, a
task
implementation that inherits from this class is still needed in production, it just executes differently?damp-lion-88352
04/12/2024, 8:19 AMAsyncAgentExecutorMixin
.damp-lion-88352
04/12/2024, 8:20 AMAsyncAgentBase
's method.
https://github.com/flyteorg/flytekit/blob/master/flytekit/extend/backend/base_agent.py#L127-L175damp-lion-88352
04/12/2024, 8:23 AMclass ChatGPTTask(PythonTask):
"""
This is the simplest form of a ChatGPT Task, you can define the model and the input you want.
"""
_TASK_TYPE = "chatgpt"
def __init__(self, name: str, openai_organization: str, chatgpt_config: Dict[str, Any], **kwargs):
"""
Args:
name: Name of this task, should be unique in the project
openai_organization: OpenAI Organization. String can be found here. <https://platform.openai.com/docs/api-reference/organization-optional>
chatgpt_config: ChatGPT job configuration. Config structure can be found here. <https://platform.openai.com/docs/api-reference/completions/create>
"""
if "model" not in chatgpt_config:
raise ValueError("The 'model' configuration variable is required in chatgpt_config")
task_config = {"openai_organization": openai_organization, "chatgpt_config": chatgpt_config}
inputs = {"message": str}
outputs = {"o0": str}
super().__init__(
task_type=self._TASK_TYPE,
name=name,
task_config=task_config,
interface=Interface(inputs=inputs, outputs=outputs),
**kwargs,
)
def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
return {
"openai_organization": self.task_config["openai_organization"],
"chatgpt_config": self.task_config["chatgpt_config"],
}
this is my chatgpt/task.py
, it can work remotely.damp-lion-88352
04/12/2024, 8:24 AMSyncAgentExecutorMixin
and it still works.
https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-openai/flytekitplugins/openai/chatgpt/task.pyabundant-judge-84756
04/12/2024, 8:24 AMdamp-lion-88352
04/12/2024, 8:25 AMBut if I understand correctly, aFor production, if there's just remote execution, you will don't need to inheritimplementation that inherits from this class is still needed in production, it just executes differently?task
AgentExecutorMixin
, but you need to face the tradeoff, which is you can't execute locally.damp-lion-88352
04/12/2024, 8:26 AMpyflyte run task.py xxx
remote: pyflyte run --remote task.py xxx
damp-lion-88352
04/12/2024, 8:26 AMabundant-judge-84756
04/12/2024, 4:34 PM# agent.py
class MyCustomAPIAgent(AsyncAgentBase):
def __init__(self):
super().__init__(task_type_name="custom_api", metadata_type=CustomAPIMetadata)
....
# task.py
class MyCustomAPITask(AsyncAgentExecutorMixin, PythonTask):
_TASK_TYPE = "custom_api"
....
#workflows.py
my_example_task = MyCustomAPITask(name="my_example_task")
abundant-judge-84756
04/12/2024, 4:34 PMpyflyte run workflows.py my_example_task
(eg. this example) but get an error:
SYSTEM:AgentNotFound: error=Cannot find agent for task category: custom_api_v0
So I'm not sure if there might be an extra step needed? Or if this method only works for existing flyte plugins π€
2. I tried seeing if I can follow these instructions to run the agent alongside the flyte demo cluster, but hit a few confusion points:
β’ pyflyte serve agent
doesn't work unless I do an additional pip install prometheus-client grpcio-health-checking
β’ I wasn't sure if flyte-single-binary-local.yaml
is an existing file I should be modifying, or a new file I should be creating?
β’ The instructions mention make compile
, suggesting an existing Makefile. Where is this located, if I'm running the `flytectl`demo cluster on MacOS from a brew installation?
β’ The note at the bottom of the page mentions "You must build an image that includes the plugin for the task". Should this be one of the main walkthrough steps, or is this only needed in some situations?
Thanks, lots of info there - I won't be working on this again til Monday πabundant-judge-84756
04/12/2024, 4:45 PMMyCustomAPIAgent
(which included the AgentRegister step) into workflows.py
, even though it's not used, to ensure the agent is registered at the point of execution π Still working on the demo cluster execution, but I have a basic local test workflow now πdamp-lion-88352
04/15/2024, 2:30 PMmake compile
needs to be done under the flyte
directory: https://github.com/flyteorg/flyte
for more information, you can see here: https://docs.flyte.org/en/latest/community/contribute.html#how-to-setup-dev-environment-for-flyteidl-f[β¦]-flytepropeller-datacatalog-and-flytestdlib
β’ in sandbox mode, you need to build an agent image to use it, in dev mode, you don't need, and the docs you mentioned is in the dev modeabundant-judge-84756
04/15/2024, 3:52 PMWorkflow[flytesnacksdevelopment.flytegen.my_custom_task] failed. RuntimeExecutionError: max number of system retry attempts [11/10] exhausted. Last known status message: failed at Node[mycustomtaskname]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configurationDoes something jump out as a step I might have missed? I've tried supplying an additional
--image localhost:30000/flyteagent:example
flag to the CLI command but get the same error.damp-lion-88352
04/16/2024, 4:19 PMkubectl rollout restart deployment flyte-sandbox -n flyte
to restart your sandbox after configured your new image?abundant-judge-84756
04/17/2024, 8:02 AM[BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration
error each time