Hi, community, I am a huge fan of agent services ...
# ask-the-community
l
Hi, community, I am a huge fan of agent services in flyte. I want to support for the agent service feature and here's my question. In agent_base.py (in flytekit)
Copy code
class AsyncAgentExecutorMixin:
    """
    This mixin class is used to run the agent task locally, and it's only used for local execution.
    Task should inherit from this class if the task can be run in the agent.
    """

    def execute(self, **kwargs) -> typing.Any:
        from unittest.mock import MagicMock

        from flytekit.tools.translator import get_serializable

        entity = typing.cast(PythonTask, self)
        m: OrderedDict = OrderedDict()
        dummy_context = MagicMock(spec=grpc.ServicerContext)
        cp_entity = get_serializable(m, settings=SerializationSettings(ImageConfig()), entity=entity)
        agent = AgentRegistry.get_agent(dummy_context, cp_entity.template.type)

        if agent is None:
            raise Exception("Cannot run the task locally, please mock.")
        literals = {}
        ctx = FlyteContext.current_context()
        for k, v in kwargs.items():
            literals[k] = TypeEngine.to_literal(ctx, v, type(v), entity.interface.inputs[k].type)
        inputs = LiteralMap(literals) if literals else None
        output_prefix = ctx.file_access.get_random_local_directory()
        cp_entity = get_serializable(m, settings=SerializationSettings(ImageConfig()), entity=entity)
        res = agent.create(dummy_context, output_prefix, cp_entity.template, inputs)
        state = RUNNING
        metadata = res.resource_meta
        progress = Progress(transient=True)
        task = progress.add_task(f"[cyan]Running Task {entity.name}...", total=None)
        with progress:
            while not is_terminal_state(state):
                progress.start_task(task)
                time.sleep(1)
                res = agent.get(dummy_context, metadata)
                state = res.resource.state
                <http://logger.info|logger.info>(f"Task state: {state}")

        if state != SUCCEEDED:
            raise Exception(f"Failed to run the task {entity.name}")

        return LiteralMap.from_flyte_idl(res.resource.outputs)
Does it calls by the code below when I use the example code in bigquery agent ?
Copy code
bigquery_task_templatized_query = BigQueryTask(
    name="sql.bigquery.w_io",
    # Define inputs as well as their types that can be used to customize the query.
    inputs=kwtypes(version=int),
    output_structured_dataset_type=DogeCoinDataset,
    task_config=BigQueryConfig(ProjectID="aesthetic-way-394507"),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)


bigquery_task_templatized_query(version=version)
To clarify, I am asking "bigquery_task_templatized_query(version=version)" will call the "execute()" function in agen_base.py Thanks a lot
i solved it by myself, now I am researching how flyte_entity_call_handler works
h
flyte_entity_call_handler
define the work mode(work environment) of the workflow, then execute the workflow.