damp-lion-88352
08/01/2023, 1:18 AM
class AsyncAgentExecutorMixin:
"""
This mixin class is used to run the agent task locally, and it's only used for local execution.
Task should inherit from this class if the task can be run in the agent.
"""
def execute(self, **kwargs) -> typing.Any:
    """
    Run this task locally by driving its agent from creation to a terminal state.

    The task is serialized to obtain its template, the agent registered for the
    template's type is looked up, the keyword arguments are converted to Flyte
    literals, and the agent is polled once per second until it reports a
    terminal state.

    :param kwargs: Python-native task inputs, keyed by the input names declared
        in the task interface.
    :return: The ``LiteralMap`` built from the outputs of the agent's last response.
    :raises Exception: If no agent is registered for the task type, or if the
        task finishes in any state other than ``SUCCEEDED``.
    """
    from unittest.mock import MagicMock

    from flytekit.tools.translator import get_serializable

    entity = typing.cast(PythonTask, self)
    m: OrderedDict = OrderedDict()
    # No real gRPC request exists during local execution, so a mock stands in
    # for the servicer context that the agent methods take.
    dummy_context = MagicMock(spec=grpc.ServicerContext)
    cp_entity = get_serializable(m, settings=SerializationSettings(ImageConfig()), entity=entity)
    agent = AgentRegistry.get_agent(dummy_context, cp_entity.template.type)
    if agent is None:
        raise Exception("Cannot run the task locally, please mock.")

    # Convert every Python value to a literal using the type declared for that input.
    ctx = FlyteContext.current_context()
    literals = {
        k: TypeEngine.to_literal(ctx, v, type(v), entity.interface.inputs[k].type)
        for k, v in kwargs.items()
    }
    inputs = LiteralMap(literals) if literals else None
    output_prefix = ctx.file_access.get_random_local_directory()

    # The task was already serialized above with the same arguments, so that
    # template is reused instead of serializing a second time.
    res = agent.create(dummy_context, output_prefix, cp_entity.template, inputs)
    state = RUNNING
    metadata = res.resource_meta

    progress = Progress(transient=True)
    task = progress.add_task(f"[cyan]Running Task {entity.name}...", total=None)
    with progress:
        # Poll once per second until the agent reports a terminal state.
        while not is_terminal_state(state):
            progress.start_task(task)
            time.sleep(1)
            res = agent.get(dummy_context, metadata)
            state = res.resource.state
            logger.info(f"Task state: {state}")

    if state != SUCCEEDED:
        raise Exception(f"Failed to run the task {entity.name}")

    return LiteralMap.from_flyte_idl(res.resource.outputs)
Is it called by the code below when I use the example code for the BigQuery agent?
# The declared inputs (names and types) are the values that can be used to
# customize the query; here `version` fills the `@version` placeholder.
bigquery_task_templatized_query = BigQueryTask(
    name="sql.bigquery.w_io",
    inputs=kwtypes(version=int),
    output_structured_dataset_type=DogeCoinDataset,
    task_config=BigQueryConfig(ProjectID="aesthetic-way-394507"),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
)
bigquery_task_templatized_query(version=version)
To clarify, I am asking whether "bigquery_task_templatized_query(version=version)" will call the "execute()" function in agent_base.py.
Thanks a lot
damp-lion-88352
08/01/2023, 2:40 AM
refined-doctor-1380
08/01/2023, 3:26 AM
flyte_entity_call_handler
defines the work mode (work environment) of the workflow, then executes the workflow.