Cyrus — 05/03/2023, 5:00 PM
Kevin Su — 05/03/2023, 5:04 PM
GOOGLE_APPLICATION_CREDENTIALS in propeller
Cyrus — 05/03/2023, 5:29 PM
Kevin Su — 05/03/2023, 5:41 PM
GOOGLE_APPLICATION_CREDENTIALS. Just need to set IAM for propeller.
karthikraj — 05/03/2023, 6:03 PM
GOOGLE_APPLICATION_CREDENTIALS should be a path to the credentials.json file. Where do we need to maintain this file in the Flyte environment?
Kevin Su — 05/03/2023, 6:05 PM
karthikraj
05/03/2023, 6:26 PM
Workflow[flytesnacks:development:workflows.bigquery_test.no_io_wf] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [xxx-go-asia-data-analytics:.ffd4bcdd3b02241998f0-n0-0], caused by: googleapi: Error 404: Not found: Job xxx-go-asia-data-analytics:ffd4bcdd3b02241998f0-n0-0, notFound
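For a quick local check, GOOGLE_APPLICATION_CREDENTIALS just needs to point at the downloaded service-account JSON before the BigQuery client is constructed; a minimal sketch (the path is illustrative, not from this thread):

```shell
# Point Application Default Credentials at the service-account key
# (path is illustrative; use wherever you keep the downloaded JSON).
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/credentials.json"
# The BigQuery client reads this variable when it is constructed.
echo "$GOOGLE_APPLICATION_CREDENTIALS"
```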
This is the code.
try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT * FROM `xxx-go-asia-data-analytics.yyyyyyyyy.table-name` LIMIT 10;",
    task_config=BigQueryConfig(ProjectID="xxx-go-asia-data-analytics"),
)

@workflow
def no_io_wf():
    return bigquery_task_no_io()
[xxx-xx-asia-data-analytics:asia-southeast1.f46318ceb47a1428cad4-n0-0]
-> This is ProjectID:Location.<ExecutionID of Flyte>
execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [xxx-xx-asia-data-analytics:asia-southeast1.f46318ceb47a1428cad4-n0-0], caused by: googleapi: Error 404: Not found: Job xxx-xx-asia-data-analytics:f46318ceb47a1428cad4-n0-0, notFound
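The failing job reference follows the ProjectID:Location.JobID shape described above. A small illustrative parser (not part of flyteplugins) makes the failure mode visible: when Location was never set, the segment between ":" and "." is empty, as in the very first error in this thread.

```python
def parse_job_reference(ref: str):
    """Split a 'ProjectID:Location.JobID' reference; Location may be empty."""
    project, _, rest = ref.partition(":")
    location, _, job_id = rest.partition(".")
    return project, location or None, job_id

# Location present (this error):
print(parse_job_reference("xxx-xx-asia-data-analytics:asia-southeast1.f46318ceb47a1428cad4-n0-0"))
# Location missing (first error in the thread) -> location is None:
print(parse_job_reference("xxx-go-asia-data-analytics:.ffd4bcdd3b02241998f0-n0-0"))
```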
@Kevin Su
Kevin Su — 05/18/2023, 5:55 PM
karthikraj — 05/21/2023, 4:22 AM
GOOGLE_APPLICATION_CREDENTIALS
E0601 16:29:04.631057 1 workers.go:102] error syncing 'flytesnacks-development/f72ec61ca7acf430e9ef': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.f72ec61ca7acf430e9ef-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:f72ec61ca7acf430e9ef-n0-0, notFound
I don't have admin privileges, so I couldn't see the jobs in the console, but I am able to access them through the Python API.
Here is what I am trying:
• I am trying to retrieve the jobs using the code below:
import google.cloud.bigquery as bq
client = bq.Client.from_service_account_json("./credentials.json")
location = "asia-southeast1"
job_id = "f72ec61ca7acf430e9ef-n0-0"
job = client.get_job(job_id, location=location)
print(f"{job.location}:{job.job_id}")
print(f"Type: {job.job_type}")
print(f"State: {job.state}")
print(f"Created: {job.created.isoformat()}")
Output:
asia-southeast1:f72ec61ca7acf430e9ef-n0-0
Type: query
State: DONE
Created: 2023-06-01T16:28:54.349000+00:00
If you compare the error and the code, I have given exactly the same job ID and location as input, with the same credentials.
The state says DONE in the Python output, which indicates the query executed; through the Python API, I am able to retrieve it.
Kevin Su — 06/01/2023, 9:54 PM
hbo-go-asia-data-analytics? Is that the project name?
karthikraj — 06/01/2023, 9:57 PM
Kevin Su — 06/02/2023, 10:08 PM
Prafulla Mahindrakar — 06/04/2023, 9:29 PM
bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT 1",
    task_config=BigQueryConfig(ProjectID="projectID", Location="US"),
)
The one that didn't work was:
bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT 1",
    task_config=BigQueryConfig(ProjectID="projectID", Location="us-east-1"),
)
where us-east-1 is the default location of the ProjectID.
karthikraj — 06/05/2023, 3:55 AM
Prafulla Mahindrakar — 06/08/2023, 10:56 AM
karthikraj — 06/08/2023, 1:36 PM
Prafulla Mahindrakar — 06/12/2023, 8:31 PM
karthikraj — 06/13/2023, 5:01 AM
E0613 04:56:16.355225 1 workers.go:102] error syncing 'flytesnacks-development/f9aa547d5533a411f844': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [kkr-project-july:.f9aa547d5533a411f844-n0-0], caused by: googleapi: Error 404: Not found: Job kkr-project-july:f9aa547d5533a411f844-n0-0, notFound
{"json":{"exec_id":"f3aaafe9d7b8c887b000","ns":"marketing-ds-development","routine":"worker-12"},"level":"warning","msg":"Workflow namespace[marketing-ds-development]/name[f3aaafe9d7b8c887b000] has already been terminated.","ts":"2023-06-13T04:56:32Z"}
Prafulla Mahindrakar — 06/13/2023, 5:11 AM
Kevin Su — 06/13/2023, 6:21 PM
Prafulla Mahindrakar — 06/14/2023, 10:45 PM
karthikraj — 06/20/2023, 5:59 PM
Kevin Su — 06/20/2023, 7:27 PM
Set GOOGLE_APPLICATION_CREDENTIALS in your local environment variables.
import pandas as pd
from flytekit import kwtypes, StructuredDataset, workflow, task
from flytekitplugins.bigquery import BigQueryTask, BigQueryConfig
bigquery_doge_coin = BigQueryTask(
    name="bigquery.doge_coin",
    inputs=kwtypes(version=int),
    # query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
    query_template="SELECT 1;",
    output_structured_dataset_type=StructuredDataset,
    task_config=BigQueryConfig(ProjectID="flyte-test-340607"),  # <- change the project ID
)
@task
def t1(sd: StructuredDataset):
    print(sd.open(pd.DataFrame).all())

@workflow
def wf():
    sd = bigquery_doge_coin(version=1)
    bigquery_doge_coin(version=1)
    t1(sd=sd)

if __name__ == '__main__':
    wf()
karthikraj — 06/20/2023, 8:08 PM
pip3 list | grep flyte
flyteidl 1.5.11
flytekit 1.7.0
flytekitplugins-bigquery 1.7.0
Plugin is executing the query; I can see an entry in the BigQuery console every time I run the workflow.
But the Flyte task remains in the Queued state. I have tried every way I know to debug this.
SELECT 1 without giving any location parameter works fine, but not when querying the table.
Kevin Su — 06/20/2023, 8:18 PM
select * from transactions where name==bra limit 10;
karthikraj — 06/20/2023, 8:27 PM
Kevin Su — 06/20/2023, 8:34 PM
karthikraj — 06/21/2023, 12:26 AM
Kevin Su — 06/21/2023, 12:30 AM
karthikraj — 06/21/2023, 12:34 AM
Kevin Su — 06/21/2023, 12:37 AM
Replying to "I tried to debug locally without including --remote":
I see, that might be the issue. Could you try adding location to flytekit and test it again? If it works, we should add location to flyteplugins (Golang) as well.
karthikraj — 06/21/2023, 2:29 AM
Failed with Unknown Exception <class 'AssertionError'> Reason: Encountered error while executing workflow 'workflows.bigquery_test_kkr.no_io_wf':
Error encountered while executing 'no_io_wf':
Length difference 0 1
Kevin Su — 06/21/2023, 2:35 AM
karthikraj — 06/21/2023, 2:37 AM
try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT * FROM `kkr_dataset.crypto_dogecoin_transactions` LIMIT 10;",
    task_config=BigQueryConfig(ProjectID="kkr-project-july", Location="asia-southeast1"),
)

@workflow
def no_io_wf():
    return bigquery_task_no_io()
Kevin Su — 06/21/2023, 4:44 AM
pip install git+https://github.com/flyteorg/flytekit@540c704fcd2ba165a76d691a4f077e553c902273
karthikraj — 06/26/2023, 2:39 AM
Kevin Su — 06/26/2023, 2:39 AM
karthikraj — 06/27/2023, 2:17 AM
Kevin Su — 06/27/2023, 3:58 AM
Frank Shen — 06/27/2023, 5:06 PM
flytekit==1.7.0
flytekitplugins-snowflake==1.7.0
flytekitplugins-spark==1.7.0
flytekitplugins-polars==1.7.0
How do I pick up the change you just made? flytekitplugins-bigquery==1.7.0 <- 0 or ?
Kevin Su — 06/27/2023, 5:12 PM
Frank Shen — 06/27/2023, 5:15 PM
Kevin Su — 06/27/2023, 5:16 PM
Frank Shen — 06/27/2023, 5:18 PM
flytekitplugins-bigquery==1.7.0
Kevin Su — 06/27/2023, 5:18 PM
Frank Shen — 06/27/2023, 5:21 PM
flytekit==1.7.1b0
Kevin Su — 06/27/2023, 5:23 PM
Frank Shen — 06/27/2023, 5:23 PM
Kevin Su — 06/27/2023, 5:48 PM
pip install git+https://github.com/flyteorg/flytekit@master
karthikraj — 06/27/2023, 6:07 PM
Kevin Su — 06/27/2023, 6:09 PM
Frank Shen — 06/27/2023, 6:33 PM
pip install git+https://github.com/flyteorg/flytekit@master, will it replace all 4 below?
flytekit==1.7.0
flytekitplugins-snowflake==1.7.0
flytekitplugins-spark==1.7.0
flytekitplugins-polars==1.7.0
Kevin Su — 06/27/2023, 6:36 PM
pip install "git+https://github.com/flyteorg/flytekit@master#egg=flytekitplugins-bigquery&subdirectory=plugins/flytekit-bigquery"
Frank Shen — 06/27/2023, 6:49 PM
Kevin Su — 06/27/2023, 9:01 PM
Frank Shen — 06/27/2023, 9:09 PM
git+https://github.com/flyteorg/flytekit@releases/tag/v1.7.1b1
flytekitplugins-snowflake==1.7.0
flytekitplugins-spark==1.7.0
flytekitplugins-polars==1.7.0
git+https://github.com/flyteorg/flytekit@releases/tag/v1.7.1b1
flytekitplugins-snowflake==1.7.1b1
flytekitplugins-spark==1.7.1b1
flytekitplugins-polars==1.7.1b1
Kevin Su — 06/27/2023, 9:12 PM
flytekit==1.7.1b1
flytekitplugins-snowflake==1.7.1b1
flytekitplugins-spark==1.7.1b1
flytekitplugins-polars==1.7.1b1
Frank Shen — 06/27/2023, 9:14 PM
Waiting for a runner to pick up this job...
Kevin Su — 06/27/2023, 10:00 PM
Frank Shen — 06/27/2023, 10:27 PM
karthikraj — 06/29/2023, 5:27 PM
Kevin Su — 06/29/2023, 5:55 PM
Frank Shen — 06/29/2023, 6:43 PM
karthikraj — 06/29/2023, 8:28 PM
Yee
karthikraj — 06/29/2023, 10:17 PM
Yee
karthikraj — 07/06/2023, 6:25 PM
Yee
karthikraj — 07/10/2023, 9:35 PM
Yee
karthikraj — 07/10/2023, 9:47 PM
pip3 list | grep flytekit
flytekit 1.7.1b1
flytekitplugins-bigquery 1.7.0
flytekitplugins-mlflow 1.6.2
Eduardo Apolinario (eapolinario) — 07/10/2023, 9:56 PM
flytekitplugins-bigquery==1.7.1b1
karthikraj — 07/10/2023, 10:19 PM
E0710 22:17:18.918807 1 workers.go:102] error syncing 'flytesnacks-development/al8s5rx4t5fs7v8thwkg': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.al8s5rx4t5fs7v8thwkg-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:al8s5rx4t5fs7v8thwkg-n0-0, notFound
@Frank Shen
Frank Shen — 07/10/2023, 10:25 PM
bigquery_task_1 = BigQueryTask(
    name="sql.bigquery.test_frank",
    inputs={},
    query_template="SELECT * FROM evergent_aggregated.subs_daily LIMIT 2;",
    task_config=BigQueryConfig(ProjectID="hbo-go-asia-data-analytics", Location="asia-southeast1"),
    output_structured_dataset_type=pd.DataFrame,
)
And the code passed without the above error when running locally.
I have the following packages locally:
flyteidl 1.5.12
flytekit 1.7.1b1
flytekitplugins-bigquery 1.7.1b1
google-api-core 2.11.1
google-auth 2.21.0
google-auth-oauthlib 1.0.0
google-cloud-bigquery 3.11.3
google-cloud-bigquery-storage 2.22.0
google-cloud-core 2.3.2
google-cloud-storage 2.10.0
Yee
Kevin Su — 07/11/2023, 1:15 AM
Replying to "Tried installing this version. All I am getting is":
This is the same error you ran into before, right? You are able to create a job, but fail to get the job status?
E0710 22:17:18.918807 1 workers.go:102] error syncing 'flytesnacks-development/al8s5rx4t5fs7v8thwkg': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.al8s5rx4t5fs7v8thwkg-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:al8s5rx4t5fs7v8thwkg-n0-0, notFound
Frank Shen — 07/11/2023, 4:09 PM
Yee
Kevin Su — 07/11/2023, 4:11 PM
Frank Shen — 07/11/2023, 4:28 PM
Kevin Su — 07/11/2023, 4:29 PM
Frank Shen — 07/11/2023, 5:23 PM
Kevin Su — 07/11/2023, 5:24 PM
Frank Shen — 07/11/2023, 5:40 PM
(env_flyte_1_7) ➜  dai-mle-workflow git:(feature/git-actions-flyte) ✗ pyflyte run ml_project_1/test_bigquery.py wf
dt country business_unit_code payment_method account_id order_id last_pay_day ... new_deleted_status cancel_subs paid_subs new_subs ended_subs user_type subscriber_status
0 2020-11-29 HK NOWE Operator Billing 386234080 1384111902 None ... None None None None None None active_subs
1 2020-11-29 HK NOWE Operator Billing 284808126 716821474 None ... None None None None None None active_subs
Kevin Su — 07/11/2023, 5:41 PM
Frank Shen — 07/11/2023, 5:42 PM
Kevin Su — 07/11/2023, 5:43 PM
package main

import (
	"context"
	"fmt"

	"google.golang.org/api/bigquery/v2"
	"google.golang.org/api/option"
)

func main() {
	client, err := newBigQueryClient(context.Background())
	if err != nil {
		panic(err)
	}
	job, err := client.Jobs.Get("flyte-test-340607", "14d65dc3-6c6a-461d-9a39-c866d53c8907").Location("US").Do()
	if err != nil {
		panic(err)
	}
	fmt.Printf("job status %v\n", job.Status.State)
}

func newBigQueryClient(ctx context.Context) (*bigquery.Service, error) {
	options := []option.ClientOption{
		option.WithScopes("https://www.googleapis.com/auth/bigquery"),
		option.WithUserAgent(fmt.Sprintf("%s/%s", "flytepropeller", "LATEST")),
	}
	return bigquery.NewService(ctx, options...)
}
(base) ➜ test_bq git:(databricks-bug) ✗ ls
test test.go
(base) ➜ test_bq git:(databricks-bug) ✗ go build test.go
(base) ➜ test_bq git:(databricks-bug) ✗ go run .
job status DONE
(base) ➜ test_bq git:(databricks-bug) ✗
karthikraj — 07/11/2023, 5:59 PM
]$ ls
test.go
]$ go build test.go
test.go:6:2: no required module provides package google.golang.org/api/bigquery/v2: go.mod file not found in current directory or any parent directory; see 'go help modules'
test.go:7:2: no required module provides package google.golang.org/api/option: go.mod file not found in current directory or any parent directory; see 'go help modules'
Kevin Su — 07/11/2023, 6:20 PM
karthikraj — 07/11/2023, 6:35 PM
]$ go run .
job status DONE
job, err := client.Jobs.Get("hbo-go-asia-data-analytics", "al8s5rx4t5fs7v8thwkg-n0-0").Location("asia-southeast1").Do()
Kevin Su — 07/12/2023, 3:50 AM
karthikraj — 07/12/2023, 2:16 PM
]$ kubectl logs flyteagent-695c856b98-svhps -n flyte
Starting the agent service...
If I start running workflows, I think it is still using the Go plugin; I can see the same behavior. The workflow is queueing and not running. What am I missing?
Frank Shen — 07/13/2023, 6:54 PM
Kevin Su — 07/13/2023, 6:56 PM
karthikraj — 07/13/2023, 7:00 PM
enabled_plugins.yaml: |
  agent-service:
    defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
    endpointForTaskTypes:
      - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
    supportedTaskTypes:
      - custom_task
  tasks:
    task-plugins:
      default-for-task-types:
        bigquery_query_job_task: agent-service
        container: container
        container_array: k8s-array
        custom_task: agent-service
        ray: ray
        sidecar: sidecar
        snowflake: snowflake
        spark: spark
      enabled-plugins:
        - container
        - sidecar
        - k8s-array
        - snowflake
        - spark
        - ray
        - bigquery
        - agent-service
Kevin Su — 07/13/2023, 7:02 PM
Replying to "the workflow is queueing and not running":
Is there any error in the propeller?
karthikraj — 07/13/2023, 7:05 PM
enabled_plugins.yaml: |
  agent-service:
    defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
    endpointForTaskTypes:
      - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
    supportedTaskTypes:
      - custom_task
      - bigquery_query_job_task
  tasks:
    task-plugins:
      default-for-task-types:
        bigquery_query_job_task: agent-service
        container: container
        container_array: k8s-array
        custom_task: agent-service
        ray: ray
        sidecar: sidecar
        snowflake: snowflake
        spark: spark
      enabled-plugins:
        - container
        - sidecar
        - k8s-array
        - snowflake
        - spark
        - ray
        - bigquery
        - agent-service
Kevin Su — 07/13/2023, 7:24 PM
karthikraj — 07/13/2023, 7:24 PM
Kevin Su — 07/13/2023, 7:24 PM
karthikraj — 07/13/2023, 7:26 PM
{"json":{"exec_id":"fe1daf082b3d34e39827","ns":"flytesnacks-development","res_ver":"303417783","routine":"worker-4","wf":"flytesnacks:development:workflows.bigquery_test.no_io_wf"},"level":"error","msg":"Error when trying to reconcile workflow. Error [failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.fe1daf082b3d34e39827-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:fe1daf082b3d34e39827-n0-0, notFound]. Error Type[*errors.NodeErrorWithCause]","ts":"2023-07-13T19:17:51Z"}
E0713 19:17:51.508148 1 workers.go:102] error syncing 'flytesnacks-development/fe1daf082b3d34e39827': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.fe1daf082b3d34e39827-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:fe1daf082b3d34e39827-n0-0, notFound
Kevin Su — 07/13/2023, 7:30 PM
enable-plugins
Frank Shen — 07/13/2023, 8:18 PM
karthikraj — 07/13/2023, 8:20 PM
{
"json": {
"exec_id": "arxfl7s76ns8nd9mxch7",
"ns": "flytesnacks-development",
"res_ver": "303447643",
"routine": "worker-30",
"wf": "flytesnacks:development:workflows.bigquery_test.no_io_wf"
},
"level": "error",
"msg": "Error when trying to reconcile workflow. Error [failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration]. Error Type[*errors.NodeErrorWithCause]",
"ts": "2023-07-13T20:04:47Z"
}
Kevin Su — 07/13/2023, 8:30 PM
karthikraj — 07/13/2023, 9:15 PM
Frank Shen — 07/13/2023, 10:29 PM
Kevin Su — 07/14/2023, 2:57 PM
flyte-propeller-config?
it should look something like:
...
plugins:
  agent-service:
    defaultGrpcEndpoint: flyteagent.flyte.svc.cluster.local:8000
    supportedTaskTypes:
      - bigquery_query_job_task
enabled_plugins.yaml: |
  agent-service:
    defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
    endpointForTaskTypes:
      - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
should be
enabled_plugins.yaml: |
  plugins:
    agent-service:
      defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
      endpointForTaskTypes:
        - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
karthikraj — 07/14/2023, 3:00 PM
tasks:
  task-plugins:
    enabled-plugins:
      - agent-service
    default-for-task-types:
      - bigquery_query_job_task: agent-service
      - custom_task: agent-service
k8s:
plugins:
  agent-service:
    defaultGrpcEndpoint: flyteagent.flyte.svc.cluster.local:8000
    supportedTaskTypes:
      - bigquery_query_job_task
One thing I am seeing here is that the logs are showing empty. Do I need to make any additional configuration?
Kevin Su — 07/18/2023, 7:21 PM
Replying to "Do you think this document needs to be updated as below or is it only me who didn't know about configuring it correctly?":
Yes, a PR is welcome.
Replying to "credentials within this Image":
Were you trying to set GOOGLE_APPLICATION_CREDENTIALS in the Dockerfile?
karthikraj — 07/18/2023, 8:16 PM
Kevin Su — 07/18/2023, 8:39 PM
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flyteagent
  namespace: flyte
  labels:
    app.kubernetes.io/name: flyteagent
    app.kubernetes.io/instance: flyte
    helm.sh/chart: flyte-v0.1.10
    app.kubernetes.io/managed-by: Helm
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: flyteagent
      app.kubernetes.io/instance: flyte
  template:
    metadata:
      annotations:
      labels:
        app.kubernetes.io/name: flyteagent
        app.kubernetes.io/instance: flyte
        helm.sh/chart: flyte-v0.1.10
        app.kubernetes.io/managed-by: Helm
    spec:
      containers:
        - command:
            - pyflyte
            - serve
          image: "ghcr.io/flyteorg/flyteagent:1.6.2b1"
          volumeMounts:
            - mountPath: /var/secrets/google
              name: google-cloud-key
          imagePullPolicy: "IfNotPresent"
          env:
            - name: GOOGLE_APPLICATION_CREDENTIALS
              value: /var/secrets/google/flyte-test-credentials.json
          name: flyteagent
          ports:
            - containerPort: 8000
              name: agent-grpc
          resources:
            limits:
              cpu: 500m
              ephemeral-storage: 100Mi
              memory: 500Mi
            requests:
              cpu: 10m
              ephemeral-storage: 50Mi
              memory: 50Mi
      volumes:
        - name: google-cloud-key
          secret:
            defaultMode: 420
            secretName: gcp-key
      serviceAccountName: flyteagent
karthikraj — 07/18/2023, 9:44 PM
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
    meta.helm.sh/release-name: max-ml-flyte
    meta.helm.sh/release-namespace: flyte
  creationTimestamp: "2023-07-14T19:43:35Z"
  generation: 1
  labels:
    app.kubernetes.io/instance: max-ml-flyte
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flyteagent
    helm.sh/chart: flyte-core-v0.1.10
  name: flyteagent
  namespace: flyte
  resourceVersion: "304315314"
  uid: 526ab0fb-3c09-4510-a5c4-821d591e0837
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/instance: max-ml-flyte
      app.kubernetes.io/name: flyteagent
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app.kubernetes.io/instance: max-ml-flyte
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/name: flyteagent
        helm.sh/chart: flyte-core-v0.1.10
    spec:
      containers:
        - command:
            - pyflyte
            - serve
          env:
            - name: GOOGLE_APPLICATION_CREDENTIALS
              value: /secrets/credentials.json
          image: 613630599026.dkr.ecr.us-east-1.amazonaws.com/max-ml-flyte-agent-dev:latest
          imagePullPolicy: IfNotPresent
          name: flyteagent
          ports:
            - containerPort: 8000
              name: agent-grpc
              protocol: TCP
          resources:
            limits:
              cpu: 500m
              ephemeral-storage: 100Mi
              memory: 500Mi
            requests:
              cpu: 10m
              ephemeral-storage: 50Mi
              memory: 50Mi
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          volumeMounts:
            - mountPath: /secrets
              name: service-account-credentials
              readOnly: true
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: flyteagent
      serviceAccountName: flyteagent
      terminationGracePeriodSeconds: 30
      volumes:
        - name: service-account-credentials
          secret:
            defaultMode: 420
            secretName: bigquery-secret
status:
  availableReplicas: 1
  conditions:
    - lastTransitionTime: "2023-07-14T19:43:37Z"
      lastUpdateTime: "2023-07-14T19:43:37Z"
      message: Deployment has minimum availability.
      reason: MinimumReplicasAvailable
      status: "True"
      type: Available
    - lastTransitionTime: "2023-07-14T19:43:35Z"
      lastUpdateTime: "2023-07-14T19:43:37Z"
      message: ReplicaSet "flyteagent-66f4c86b" has successfully progressed.
      reason: NewReplicaSetAvailable
      status: "True"
      type: Progressing
  observedGeneration: 1
  readyReplicas: 1
  replicas: 1
  updatedReplicas: 1
Kevin Su — 07/18/2023, 9:47 PM
karthikraj — 07/18/2023, 9:48 PM
Kevin Su — 07/18/2023, 9:51 PM
Frank Shen — 07/18/2023, 10:02 PM
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

@task
def load_input_data(dt: str) -> pd.DataFrame:
    get_input_data = BigQueryTask(
        ...,
        task_config=BigQueryConfig(ProjectID="ddbd-data-warehouse-prod", Location="asia-southeast1"),
        ...
    )
    return get_input_data()

I need to include flytekitplugins-bigquery==1.7.1b1 in my custom image's requirements file.
Also in my Dockerfile, I have to include:
# copy bigquery_credentials.json to the image
COPY bigquery_credentials.json /root/bigquery_credentials.json
# set GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json as an env variable for the Flyte BigQuery agent to use
ENV GOOGLE_APPLICATION_CREDENTIALS=/root/bigquery_credentials.json
Then I submit the job via the command:
pyflyte register -p examples --image 613630599026.dkr.ecr.us-east-1.amazonaws.com/dai-mlp-flyte-1_7:latest dai_mle_apac_churn/inference/inference_wf.py
and it works.
However, if I remove the credentials above from the Dockerfile, the bigquery task fails with:
File "/opt/venv/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
return wrapped(*args, **kwargs)
File "/root/dai_mle_apac_churn/train/load_input_data.py", line 902, in load_input_data
return get_input_data()
File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 304, in __call__
return flyte_entity_call_handler(self, *args, **kwargs) # type: ignore
File "/opt/venv/lib/python3.9/site-packages/flytekit/core/promise.py", line 1022, in flyte_entity_call_handler
result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs)
File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 285, in local_execute
outputs_literal_map = self.sandbox_execute(ctx, input_literal_map)
File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 351, in sandbox_execute
return self.dispatch_execute(ctx, input_literal_map)
File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 540, in dispatch_execute
raise e
File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 537, in dispatch_execute
native_outputs = self.execute(**native_inputs)
File "/opt/venv/lib/python3.9/site-packages/flytekit/extend/backend/base_agent.py", line 150, in execute
res = agent.create(dummy_context, output_prefix, cp_entity.template, inputs)
File "/opt/venv/lib/python3.9/site-packages/flytekitplugins/bigquery/agent.py", line 73, in create
client = bigquery.Client(project=project, location=location)
File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery/client.py", line 245, in __init__
super(Client, self).__init__(
File "/opt/venv/lib/python3.9/site-packages/google/cloud/client/__init__.py", line 321, in __init__
Client.__init__(
File "/opt/venv/lib/python3.9/site-packages/google/cloud/client/__init__.py", line 178, in __init__
credentials, _ = google.auth.default(scopes=scopes)
File "/opt/venv/lib/python3.9/site-packages/google/auth/_default.py", line 692, in default
raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
Message:
Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
User error.
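The traceback bottoms out in google.auth.default(), which resolves Application Default Credentials; its first step is checking GOOGLE_APPLICATION_CREDENTIALS, which is why baking the key into the image (or mounting it) makes the error go away. A stdlib-only sketch of just that first lookup (an illustration, not the real google-auth code):

```python
import os

def adc_first_lookup(environ=None):
    """Mimic only the first ADC step: honor GOOGLE_APPLICATION_CREDENTIALS
    if it is set and the file exists; otherwise fall through (return None)."""
    environ = os.environ if environ is None else environ
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if path and os.path.exists(path):
        return path
    return None  # google-auth would next try gcloud config, then the metadata server
```

If this returns None and no fallback source exists either, google-auth raises the DefaultCredentialsError seen above.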
Kevin Su — 07/18/2023, 10:09 PM
bigquery_task = BigQueryTask(
    name="flytekit.demo.bigquery_task.query",
    inputs=kwtypes(ds=str),
    task_config=BigQueryConfig(
        ProjectID="Flyte", Location="Asia", QueryJobConfig=QueryJobConfig(allow_large_results=True)
    ),
    query_template=query_template,
    output_structured_dataset_type=StructuredDataset,
)
Frank Shen — 07/18/2023, 10:15 PM
import pandas as pd
from flytekit import task
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

@task
def load_input_data(dt: str) -> pd.DataFrame:
    formatting_dict = {"dt": dt}
    sql_str = """
    multiple sql statements with temp tables, joins and over 900 lines long with parameters like {dt} then return select * from the last temp table
    """.format(**formatting_dict)
    print(sql_str)
    get_input_data = BigQueryTask(
        name=f"bigquery.train.input_data.{dt}",
        # query_template="SELECT * FROM evergent_aggregated.subs_daily LIMIT 2;",
        query_template=sql_str,
        task_config=BigQueryConfig(ProjectID="ddbd-data-warehouse-prod", Location="asia-southeast1"),
        inputs={},
        # inputs=kwtypes(version=int),
        output_structured_dataset_type=pd.DataFrame,
    )
    return get_input_data()
Kevin Su — 07/18/2023, 10:18 PM
Frank Shen — 07/19/2023, 12:32 AM
get_input_data = BigQueryTask(
    name=f"bigquery.apac.inference.input_data.{dt}",
    # query_template="SELECT * FROM evergent_aggregated.subs_daily LIMIT 2;",
    query_template=sql_str,
    task_config=BigQueryConfig(ProjectID="ddbd-data-warehouse-prod", Location="asia-southeast1"),
    inputs={},
    # inputs=kwtypes(version=int),
    output_structured_dataset_type=StructuredDataset,
)
And in the downstream task I did:
@task
def get_input_data_by_country(sd: StructuredDataset, country: str) -> pd.DataFrame:
    df = sd.open(pd.DataFrame).all()
The workflow works locally, but it still fails on Flyte remote with the same error as above when I remove the credentials from the custom image.
karthikraj — 07/19/2023, 10:16 PM
get_input_data = BigQueryTask(
    name=f"bigquery.apac.inference.input_data",
    inputs=kwtypes(dt=str),
    # inputs={},
    query_template=inf_input_sql_template,
    task_config=BigQueryConfig(ProjectID="xxxxxxxxxxxxxxxxxxxxxx", Location="asia-southeast1"),
    output_structured_dataset_type=StructuredDataset,
)

@task
def read_data(sd: StructuredDataset) -> None:
    df = sd.open(pd.DataFrame).all()
    print(f'inference input data from bigquery df.shape: {df.shape}')
    print(df.head())

@workflow
def wf(
    team: str = "zzzzzzzzz",
    project: str = "yyyyyyy",
    version: str = "1",
    # dt: str = datetime.now().strftime("%Y-%m-%d"),
    dt: str = "2023-07-18",
) -> None:
    sd = get_input_data(dt=dt)
    read_data(sd=sd)
Error:
File "/root/dai_mle_apac_churn/inference/inference_wf.py", line 25, in read_data
df = sd.open(pd.DataFrame).all()
File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 105, in all
return flyte_dataset_transformer.open_as(ctx, self.literal, self._dataframe_type, self.metadata)
File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 791, in open_as
result = decoder.decode(ctx, sd, updated_metadata)
File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/bigquery.py", line 82, in decode
return _read_from_bq(flyte_value, current_task_metadata)
File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/bigquery.py", line 36, in _read_from_bq
client = bigquery_storage.BigQueryReadClient()
File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py", line 481, in __init__
self._transport = Transport(
File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", line 151, in __init__
super().__init__(
File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py", line 102, in __init__
credentials, _ = google.auth.default(
File "/opt/venv/lib/python3.9/site-packages/google/auth/_default.py", line 692, in default
raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
Frank Shen — 07/19/2023, 10:43 PM
Kevin Su — 07/19/2023, 10:52 PM
karthikraj — 07/19/2023, 10:52 PM
Kevin Su — 07/19/2023, 10:56 PM
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/etc/secrets/gcp/bigquery"

@task(container_image=image, disable_deck=False, secret_requests=[Secret(group="gcp", key="bigquery")])
def t1(size: int) -> typing.List[typing.Any]:
    ...
karthikraj — 07/19/2023, 10:57 PM
Replying to "when you add the secret in the @task, flyte will dynamically mount the secret to the pod":
Meaning, will it mount from the secret available in the namespace flytesnacks-development or from the propeller mount?
Kevin Su — 07/19/2023, 10:59 PM
Replying to "So do we need to create secrets in all the namespaces?":
Yes, if you use K8s secrets. You can also use AWS Secrets Manager or Vault Agent Injector; with those secret managers, you don't need to create the secret in every namespace.
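With the K8s secrets backend described above, a secret matching Secret(group="gcp", key="bigquery") must exist in every project-domain namespace that runs the task. A hedged sketch of one such secret; the namespace and the key contents are illustrative, and the name/key mirror the snippet above:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: gcp                          # matches Secret(group="gcp", ...)
  namespace: flytesnacks-development # repeat per project-domain namespace
type: Opaque
stringData:
  bigquery: |                        # matches Secret(..., key="bigquery")
    {"type": "service_account", "project_id": "..."}
```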
karthikraj — 07/19/2023, 11:01 PM
Frank Shen — 07/19/2023, 11:24 PM
Kevin Su — 07/19/2023, 11:40 PM
Frank Shen — 07/20/2023, 12:09 AM
karthikraj — 07/20/2023, 12:17 AM
Kevin Su — 07/20/2023, 12:24 AM
karthikraj — 07/20/2023, 10:18 PM
arn:aws:secretsmanager:us-east-1:1234567890:secret:flyte-bigquery-creds-lnC41z
2) Added a webhook config as given in the document, and I can see the webhook config as below in the propeller config map.
core.yaml:
----
manager:
pod-application: flytepropeller
pod-template-container-name: flytepropeller
pod-template-name: flytepropeller-template
propeller:
downstream-eval-duration: 30s
enable-admin-launcher: true
gc-interval: 12h
kube-client-config:
burst: 25
qps: 100
timeout: 30s
leader-election:
enabled: true
lease-duration: 15s
lock-config-map:
name: propeller-leader
namespace: flyte
renew-deadline: 10s
retry-period: 2s
limit-namespace: all
max-workflow-retries: 50
metadata-prefix: metadata/propeller
metrics-prefix: flyte
prof-port: 10254
queue:
batch-size: -1
batching-interval: 2s
queue:
base-delay: 5s
capacity: 1000
max-delay: 120s
rate: 100
type: maxof
sub-queue:
capacity: 1000
rate: 100
type: bucket
type: batch
rawoutput-prefix: s3://dev-xx-yyy-zz-flyte-us-east-1/
workers: 40
workflow-reeval-duration: 30s
webhook:
certDir: /etc/webhook/certs
resources:
limits:
cpu: 200m
memory: 500Mi
requests:
cpu: 200m
memory: 500Mi
secretManagerType: AWS
serviceName: flyte-pod-webhook
sidecarImage: docker.io/amazon/aws-secrets-manager-secret-sidecar:v0.1.4
I am kind of stuck here. Now, where should I mention the AWS secret name or ARN? And where is it going to be mounted as a file?