#ask-the-community
c

Cyrus

05/03/2023, 5:00 PM
what's the best way to query bigquery from aws? it seems the plugin only supports querying from GCP
k

Kevin Su

05/03/2023, 5:04 PM
you could query from anywhere. just need to add
GOOGLE_APPLICATION_CREDENTIALS
in propeller
propeller does not create a new pod for a bigquery task; it sends an HTTP request to BQ instead. therefore, propeller itself needs those credentials
c

Cyrus

05/03/2023, 5:29 PM
@karthikraj
k

Kevin Su

05/03/2023, 5:41 PM
btw, if you run on gcp, you don’t need
GOOGLE_APPLICATION_CREDENTIALS
. just need to set IAM for propeller.
k

karthikraj

05/03/2023, 6:03 PM
@Kevin Su Looks like the value of
GOOGLE_APPLICATION_CREDENTIALS
should be a path to credentials.json file. Where do we need to maintain this file in Flyte environment?
k

Kevin Su

05/03/2023, 6:05 PM
add it to the k8s secret, and mount to the propeller
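A minimal sketch of a sanity check for the mounted key, assuming the secret is mounted as a standard service account JSON key file. The helper name `check_credentials_file` is hypothetical and not part of Flyte or any Google library; it only confirms that `GOOGLE_APPLICATION_CREDENTIALS` points at a readable key file.

```python
import json
import os


def check_credentials_file(env=None):
    """Return the service account email from the key file the env var points to.

    Hypothetical helper for illustration; not part of Flyte.
    """
    env = os.environ if env is None else env
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    if not os.path.isfile(path):
        raise RuntimeError(f"no file found at {path}")
    with open(path) as fh:
        key = json.load(fh)
    # Service account key files normally carry these fields.
    missing = [f for f in ("type", "project_id", "client_email") if f not in key]
    if missing:
        raise RuntimeError(f"key file is missing fields: {missing}")
    return key["client_email"]
```

Running something like this inside the propeller pod (for example via `kubectl exec`) is one way to tell a mount problem apart from a permissions problem.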
k

karthikraj

05/03/2023, 6:26 PM
okay.. Let me try and get back if any questions.
Hi @Kevin Su, I have mounted the credentials and ran select 1; it succeeded. But if I query data from one of the tables, the task gets queued and is finally aborted with the below error. What could be the reason?
Copy code
Workflow[flytesnacks:development:workflows.bigquery_test.no_io_wf] failed. RuntimeExecutionError: max number of system retry attempts [51/50] exhausted. Last known status message: failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [xxx-go-asia-data-analytics:.ffd4bcdd3b02241998f0-n0-0], caused by: googleapi: Error 404: Not found: Job xxx-go-asia-data-analytics:ffd4bcdd3b02241998f0-n0-0, notFound
This is the code.
Copy code
try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask


bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT * FROM `xxx-go-asia-data-analytics.yyyyyyyyy.table-name` LIMIT 10;",
    task_config=BigQueryConfig(ProjectID="xxx-go-asia-data-analytics"),
)


@workflow
def no_io_wf():
    return bigquery_task_no_io()
Is it supposed to take execution id as job id?
[xxx-xx-asia-data-analytics:asia-southeast1.f46318ceb47a1428cad4-n0-0]
-> This is ProjectID.Location.<ExecutionId of flyte>
Copy code
execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [xxx-xx-asia-data-analytics:asia-southeast1.f46318ceb47a1428cad4-n0-0], caused by: googleapi: Error 404: Not found: Job xxx-xx-asia-data-analytics:f46318ceb47a1428cad4-n0-0, notFound
@Kevin Su
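A small sketch for reading the job references in these errors, assuming the bracketed value has the form `project:location.job_id` as inferred from the messages above (the format is not confirmed by the plugin source here). `parse_job_reference` is a hypothetical helper, and it assumes plain project IDs without a domain prefix.

```python
def parse_job_reference(ref):
    """Split 'project:location.job_id' into (project, location, job_id).

    Assumed format, inferred from the log lines in this thread.
    """
    project, sep, rest = ref.partition(":")
    if not sep:
        raise ValueError(f"expected 'project:...' but got {ref!r}")
    location, sep, job_id = rest.partition(".")
    if not sep:
        # No dot at all: the reference carries no location segment.
        location, job_id = "", rest
    return project, location, job_id


# First error in the thread: the location segment is empty.
print(parse_job_reference("xxx-go-asia-data-analytics:.ffd4bcdd3b02241998f0-n0-0"))
# Second error: the location segment is asia-southeast1.
print(parse_job_reference("xxx-xx-asia-data-analytics:asia-southeast1.f46318ceb47a1428cad4-n0-0"))
```

Parsed this way, the first failure shows an empty location while the second carries `asia-southeast1`, yet the googleapi 404 text prints only `project:job_id` in both cases.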
k

Kevin Su

05/18/2023, 5:55 PM
seems like it can create a bigquery task, but failed to get the status. do you know the exact jobID in bigquery console?
k

karthikraj

05/21/2023, 4:22 AM
@Kevin Su The task is going into Queued state and never running. Ensured that the credentials are properly mounted and the file path is referenced in
GOOGLE_APPLICATION_CREDENTIALS
Hi @Kevin Su Let me know if you have any thoughts on this.
Hi @Kevin Su, It looks like the query is being executed, but it couldn't get the status as you said. Error from flyte logs:
Copy code
E0601 16:29:04.631057       1 workers.go:102] error syncing 'flytesnacks-development/f72ec61ca7acf430e9ef': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.f72ec61ca7acf430e9ef-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:f72ec61ca7acf430e9ef-n0-0, notFound
I don't have admin privileges, so I couldn't see the jobs in the console, but I am able to access them through the Python API. Here is what I am trying: I retrieve the job using the code below.
Copy code
import google.cloud.bigquery as bq
client = bq.Client.from_service_account_json("./credentials.json")

location = "asia-southeast1"
job_id = "f72ec61ca7acf430e9ef-n0-0"
job = client.get_job(job_id, location=location)

print(f"{job.location}:{job.job_id}")
print(f"Type: {job.job_type}")
print(f"State: {job.state}")
print(f"Created: {job.created.isoformat()}")
Output:
Copy code
asia-southeast1:f72ec61ca7acf430e9ef-n0-0
Type: query
State: DONE
Created: 2023-06-01T16:28:54.349000+00:00
If you compare the error and the code, I have given exactly the same job id and location as input, with the same credentials. The state says DONE in the Python output, which indicates the query was executed, and I am able to retrieve the job through the Python API.
k

Kevin Su

06/01/2023, 9:54 PM
what is
hbo-go-asia-data-analytics
? project name?
k

karthikraj

06/01/2023, 9:57 PM
Yes , that is the project name..
k

Kevin Su

06/02/2023, 10:08 PM
cc @Prafulla Mahindrakar
@karthikraj which version of propeller are you using? I just tested latest propeller, it worked for me.
cc @Evan Sadler also, did you see this error before?
p

Prafulla Mahindrakar

06/04/2023, 9:29 PM
I faced the same issue. The problem was the location in my BigQueryConfig; once I corrected that, the bq client was able to look up the job correctly. e.g.:
Copy code
bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT 1",
    task_config=BigQueryConfig(ProjectID="projectID", Location="US"),
)
The one that didn’t work was
Copy code
bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT 1",
    task_config=BigQueryConfig(ProjectID="projectID", Location="us-east-1"),
)
Where us-east-1 is default location of the projectID
k

karthikraj

06/05/2023, 3:55 AM
@Prafulla Mahindrakar Thank you for checking this. Can you please try querying any table from BQ?🤔 The SELECT 1; query works for me as well when not giving a Location or giving US as you mentioned, but when querying the table, I am facing the issue.😞 My table info is below. For this table, I believe the Location should be "asia-southeast1". Also, the Python API retrieved the job details when passing this region.
image.png
@Kevin Su I am using the propeller image -> ghcr.io/flyteorg/flytepropeller-release:v1.5.0
Hi @Prafulla Mahindrakar , Did you get a chance to try it?
p

Prafulla Mahindrakar

06/08/2023, 10:56 AM
Yes it did work for me . I used the following example where I removed the location field from the bigqueryconfig https://github.com/flyteorg/flytesnacks/blob/master/cookbook/integrations/gcp/bigquery/bigquery.py Which calls into the dogecoin transactions table
k

karthikraj

06/08/2023, 1:36 PM
@Adedeji Ayinde
p

Prafulla Mahindrakar

06/12/2023, 8:31 PM
@karthikraj did you try without passing the location, so that it uses the default location configured for the project? I have tried this example with no location info, and that's what's checked in to the latest examples. The docs are not yet updated to reflect the code unfortunately, but it would be great to know what happens here.
k

karthikraj

06/13/2023, 5:01 AM
@Prafulla Mahindrakar I tried without the Location too, the error is same. But it created the job in BigQuery console.
Copy code
E0613 04:56:16.355225       1 workers.go:102] error syncing 'flytesnacks-development/f9aa547d5533a411f844': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [kkr-project-july:.f9aa547d5533a411f844-n0-0], caused by: googleapi: Error 404: Not found: Job kkr-project-july:f9aa547d5533a411f844-n0-0, notFound
{"json":{"exec_id":"f3aaafe9d7b8c887b000","ns":"marketing-ds-development","routine":"worker-12"},"level":"warning","msg":"Workflow namespace[marketing-ds-development]/name[f3aaafe9d7b8c887b000] has already been terminated.","ts":"2023-06-13T04:56:32Z"}
p

Prafulla Mahindrakar

06/13/2023, 5:11 AM
@Kevin Su would this be due to older versions. Do you know of any changes around bigquery since you have been working on porting it to agent service.
k

Kevin Su

06/13/2023, 6:21 PM
hmm, we didn’t add any change to bq plugins since 1.1 or 1.2.
p

Prafulla Mahindrakar

06/14/2023, 10:45 PM
@Kevin Su since the new flyte release comes with flyte agents, should we have users start using that instead of the inbuilt bigquery plugin? If yes, then maybe you can point to the docs for OSS users, or it might be as simple as upgrading to the latest flyte release and enabling the agent plugins.
k

karthikraj

06/20/2023, 5:59 PM
@Prafulla Mahindrakar Do we have other alternatives for this? @Frank Shen Adding you to the thread to keep you updated..
k

Kevin Su

06/20/2023, 7:27 PM
@karthikraj we added flyte agent in flyte 1.7. we have a BigQuery agent written in python, and it allows you to test a BQ task without running a cluster. you need to upgrade flytekit and add
GOOGLE_APPLICATION_CREDENTIALS
to your local environment variable.
you could run below example to test it.
Copy code
import pandas as pd

from flytekit import kwtypes, StructuredDataset, workflow, task
from flytekitplugins.bigquery import BigQueryTask, BigQueryConfig

bigquery_doge_coin = BigQueryTask(
    name=f"bigquery.doge_coin",
    inputs=kwtypes(version=int),
    # query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
    query_template="SELECT 1;",
    output_structured_dataset_type=StructuredDataset,
    task_config=BigQueryConfig(ProjectID="flyte-test-340607"),  # <- change project ID
)


@task
def t1(sd: StructuredDataset):
    print(sd.open(pd.DataFrame).all())


@workflow
def wf():
    sd = bigquery_doge_coin(version=1)
    bigquery_doge_coin(version=1)
    t1(sd=sd)


if __name__ == '__main__':
    wf()
if you want to run agent in your flyte cluster, check this doc
k

karthikraj

06/20/2023, 8:08 PM
We would like to go with the inbuilt BigQuery plugin. I have an updated flytekit on my local machine.
Copy code
pip3 list | grep flyte
flyteidl                     1.5.11
flytekit                     1.7.0
flytekitplugins-bigquery     1.7.0
Plugin is executing the query, I can see an entry in BigQuery console everytime i run the workflow. But Flyte task is remaining in Queued state. I tried all possible ways that i know to debug this. SELECT 1 without giving any location parameter is working fine, but not when querying the table.
k

Kevin Su

06/20/2023, 8:18 PM
you mean the task fails when run other query? like
select * from transactions where name==bra limit 10;
k

karthikraj

06/20/2023, 8:27 PM
yeah, right..
k

Kevin Su

06/20/2023, 8:34 PM
could you check your dataset location?
and maybe check the permission, make sure you have permission to access the dataset in that project
k

Kevin Su

06/21/2023, 12:30 AM
That’s my mistake, but you are using flyteplugins, not agent, right?
k

karthikraj

06/21/2023, 12:34 AM
I tried to debug locally without including --remote. It threw the same error, so I started debugging it. So, you are saying there are no such issues with the go google api? only local should fail and remote should work?
i am not an expert in golang, but from this line it seems to me that we are not passing the location to it. so, wanted to confirm with you. https://github.com/flyteorg/flyteplugins/blob/dfdf6f95aef7bebff160d6660f5c62f5832c39e4/go/tasks/plugins/webapi/bigquery/plugin.go#L223
k

Kevin Su

06/21/2023, 12:37 AM
I tried to debug locally without including --remote.
I see, that might be the issue. could you try to add location to flytekit, and test it again? If it works, we should add location to flyteplugins (golang) as well.
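To make the role of the location concrete, here is a minimal sketch of how a job lookup URL can be built for the BigQuery REST `jobs.get` endpoint, where `location` is an optional query parameter. `jobs_get_url` is a hypothetical helper; it only builds the URL and makes no request. That a missing location is what produced the 404s in this thread is the working hypothesis at this point in the conversation, not something this sketch proves.

```python
from urllib.parse import quote, urlencode

BASE = "https://bigquery.googleapis.com/bigquery/v2"


def jobs_get_url(project_id, job_id, location=None):
    """Build the jobs.get URL, adding ?location=... only when one is given."""
    url = f"{BASE}/projects/{quote(project_id, safe='')}/jobs/{quote(job_id, safe='')}"
    if location:
        url += "?" + urlencode({"location": location})
    return url


# Lookup without a location, then with the dataset's region.
print(jobs_get_url("kkr-project-july", "f9aa547d5533a411f844-n0-0"))
print(jobs_get_url("kkr-project-july", "f9aa547d5533a411f844-n0-0", "asia-southeast1"))
```

The two URLs differ only in the query string, which matches the observation above that the same job ID is found by the Python client when the region is passed.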
k

karthikraj

06/21/2023, 2:29 AM
I am getting different error when adding location, but i think this error is due to logical errors. I am not getting any google api related errors.
Copy code
Failed with Unknown Exception <class 'AssertionError'> Reason: Encountered error while executing workflow 'workflows.bigquery_test_kkr.no_io_wf':
  Error encountered while executing 'no_io_wf':
  Length difference 0 1
k

Kevin Su

06/21/2023, 2:35 AM
could you share your bigquery task code? I want to test it as well
k

karthikraj

06/21/2023, 2:37 AM
okay.. this is my workflow code. Command i am executing: pyflyte run <script_name>.py no_io_wf
Copy code
try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated

import pandas as pd
from flytekit import StructuredDataset, kwtypes, task, workflow
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

bigquery_task_no_io = BigQueryTask(
    name="sql.bigquery.no_io",
    inputs={},
    query_template="SELECT * FROM `kkr_dataset.crypto_dogecoin_transactions` LIMIT 10;",
    task_config=BigQueryConfig(ProjectID="kkr-project-july", Location="asia-southeast1"),
)


@workflow
def no_io_wf():
    return bigquery_task_no_io()
k

Kevin Su

06/21/2023, 4:44 AM
I can reproduce now. fixing the bug
@karthikraj https://github.com/flyteorg/flytekit/pull/1702, could you help me test it?
Copy code
pip install git+https://github.com/flyteorg/flytekit@540c704fcd2ba165a76d691a4f077e553c902273
if query fails, it should show the error message.
If this addresses your issue, I’ll update flyteplugins as well.
k

karthikraj

06/26/2023, 2:39 AM
@Kevin Su This change seems to fix the issue. Can you please move the changes to plugins as well?
k

Kevin Su

06/26/2023, 2:39 AM
Thanks for testing it. Will do shortly
k

karthikraj

06/27/2023, 2:17 AM
Let us know once this merged. We will start our testing. cc: @Frank Shen
k

Kevin Su

06/27/2023, 3:58 AM
Merged it
f

Frank Shen

06/27/2023, 5:06 PM
Hi @Kevin Su, When will the flyteplugins latest changes be released in pypi? I am using pypi to download dependencies. Right now I am using:
Copy code
flytekit==1.7.0
flytekitplugins-snowflake==1.7.0
flytekitplugins-spark==1.7.0
flytekitplugins-polars==1.7.0
How do I pick up the change you just made?
Do I have to add:
Copy code
flytekitplugins-bigquery==1.7.0 <- 0 or ?
k

Kevin Su

06/27/2023, 5:12 PM
wait, you are using flyteplugins in propeller, right?
f

Frank Shen

06/27/2023, 5:15 PM
@Kevin Su, yes. However, I need to include flytekit, flytekitplugins, etc. in the custom image we build. Also I need to install them in my local machine for development.
k

Kevin Su

06/27/2023, 5:16 PM
if so, you only need to update the flytepropeller.
f

Frank Shen

06/27/2023, 5:18 PM
OK, thanks. I want to confirm that for the custom image and local development, I still need to include this, correct?
Copy code
flytekitplugins-bigquery==1.7.0
k

Kevin Su

06/27/2023, 5:18 PM
no, you don’t need to.
There are two ways to run bigquery tasks for now. one is to use agent in flytekit, another way is to use propeller. Both of them have some issues, I just fixed it. you aren’t using agent, so you only need to upgrade propeller.
for local development, you can upgrade to latest flytekit, so you are able to run bigquery task locally.
f

Frank Shen

06/27/2023, 5:21 PM
For local development (pyflyte run locally), how do I pick up your fix?
Copy code
flytekit==1.7.1b0
Is the latest in pypi, but it may not have the fix.
k

Kevin Su

06/27/2023, 5:23 PM
one sec, cutting a beta release.
f

Frank Shen

06/27/2023, 5:23 PM
Thanks!
k

Kevin Su

06/27/2023, 5:48 PM
sorry, waiting for CI to pass. will let you know once we cut a release.
if you want to test latest flytekit locally, you can also install
Copy code
pip install git+https://github.com/flyteorg/flytekit@master
k

karthikraj

06/27/2023, 6:07 PM
@Kevin Su Thank you. Is this the place we need to look for the new version once you release? https://github.com/flyteorg/flytepropeller/pkgs/container/flytepropeller-release
k

Kevin Su

06/27/2023, 6:09 PM
f

Frank Shen

06/27/2023, 6:33 PM
Hi @Kevin Su, If I do
Copy code
pip install git+https://github.com/flyteorg/flytekit@master
, will it replace all 4 below?
Copy code
flytekit==1.7.0
flytekitplugins-snowflake==1.7.0
flytekitplugins-spark==1.7.0
flytekitplugins-polars==1.7.0
k

Kevin Su

06/27/2023, 6:36 PM
only replace flytekit
if you want to install plugin
Copy code
pip install "git+https://github.com/flyteorg/flytekit@master#egg=flytekitplugins-bigquery&subdirectory=plugins/flytekit-bigquery"
f

Frank Shen

06/27/2023, 6:49 PM
Great, thanks. @Kevin Su
k

Kevin Su

06/27/2023, 9:01 PM
f

Frank Shen

06/27/2023, 9:09 PM
Thanks @Kevin Su, For our custom image, will the below work?
Copy code
git+https://github.com/flyteorg/flytekit@releases/tag/v1.7.1b1
flytekitplugins-snowflake==1.7.0
flytekitplugins-spark==1.7.0
flytekitplugins-polars==1.7.0
Or it should be like this?
Copy code
git+https://github.com/flyteorg/flytekit@releases/tag/v1.7.1b1
flytekitplugins-snowflake==1.7.1b1
flytekitplugins-spark==1.7.1b1
flytekitplugins-polars==1.7.1b1
k

Kevin Su

06/27/2023, 9:12 PM
Copy code
flytekit==1.7.1b1
flytekitplugins-snowflake==1.7.1b1
flytekitplugins-spark==1.7.1b1
flytekitplugins-polars==1.7.1b1
Our CI will publish pypi package soon, so you don’t need to install from git link
f

Frank Shen

06/27/2023, 9:14 PM
Thanks @Kevin Su!
@Kevin Su, do you know why the git actions is still in ?
Copy code
Waiting for a runner to pick up this job...
for 35 minutes?
k

Kevin Su

06/27/2023, 10:00 PM
Too many PRs 😂. I just stopped them.
image.png
f

Frank Shen

06/27/2023, 10:27 PM
Thanks @Kevin Su! I see it in pypi now.
@Kevin Su, do you have an estimate when the flyte release will be (1.7.1)?
k

karthikraj

06/29/2023, 5:27 PM
Hello, Our stakeholders are waiting for this integration to be completed. Once you complete, please let us know if we can start running our workflows.
k

Kevin Su

06/29/2023, 5:55 PM
cc @Yee @Eduardo Apolinario (eapolinario) could we cut a release?
f

Frank Shen

06/29/2023, 6:43 PM
Thanks a lot @Kevin Su, and the Flyte team! A new Flyte release will be much appreciated.
k

karthikraj

06/29/2023, 8:28 PM
@Kevin Su Sorry for asking too many questions before you release it. At this moment, is it possible by any chance to upgrade our propeller pods with this beta release so that our Flyte remote workflows can run with this change?
y

Yee

06/29/2023, 9:52 PM
the propeller pr is still building sorry
we’ll post once that’s done and the propeller image is out.
k

karthikraj

06/29/2023, 10:17 PM
Thank you so much 😇
Hello, Do we have any ETA on releasing the new propeller image?
y

Yee

07/06/2023, 5:01 PM
sorry this has been out for a bit. https://github.com/flyteorg/flytepropeller/releases/tag/v1.1.100 is what you’re looking for?
we’ll push out the 1.8 general flyte release today as well.
k

karthikraj

07/06/2023, 6:25 PM
I still see the same issues. Is this correct propeller image that has fix for BigQuery? image: cr.flyte.org/flyteorg/flytepropeller:v1.1.100
y

Yee

07/06/2023, 10:34 PM
sorry… completely missed this.
feel free to use a later one.
v1.1.104
what happened was the merge to master to pick that up failed because of a flaky test, so goreleaser never ran
and the latest version that i used wasn’t actually the latest version, we’re fixing that now.
k

karthikraj

07/10/2023, 9:35 PM
We have tried the latest propeller image, v1.1.104, and the issue still seems to exist. Are we still waiting for the new version with the fix for this issue? @Frank Shen
y

Yee

07/10/2023, 9:39 PM
this thread is a bit old actually, are we still talking about the original problem fixed by https://github.com/flyteorg/flyteplugins/pull/365?
and what version of flytekit are you on?
k

karthikraj

07/10/2023, 9:47 PM
My local has below packages.
Copy code
pip3 list | grep flytekit
flytekit                     1.7.1b1
flytekitplugins-bigquery     1.7.0
flytekitplugins-mlflow       1.6.2
Do i need to change the versions of any other component such as flyteadmin, scheduler.etc?
e

Eduardo Apolinario (eapolinario)

07/10/2023, 9:56 PM
@karthikraj, you should use
flytekitplugins-bigquery==1.7.1b1
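A quick way to spot this kind of version skew is to compare every installed `flytekit*` distribution against the `flytekit` version. This is a minimal sketch using only the standard library; `installed_versions` and `mismatched` are hypothetical helper names, and whether every plugin has to match `flytekit` exactly is an assumption taken from the advice above, not a documented rule.

```python
from importlib import metadata


def installed_versions(prefix="flytekit"):
    """Map installed distribution names starting with `prefix` to their versions."""
    found = {}
    for dist in metadata.distributions():
        name = dist.metadata["Name"]
        if name and name.lower().startswith(prefix):
            found[name] = dist.version
    return found


def mismatched(versions, reference="flytekit"):
    """Return the packages whose version differs from the reference package."""
    ref = versions[reference]
    return {name: ver for name, ver in versions.items() if ver != ref}


# Versions reported in the pip3 list output above.
reported = {
    "flytekit": "1.7.1b1",
    "flytekitplugins-bigquery": "1.7.0",
    "flytekitplugins-mlflow": "1.6.2",
}
print(mismatched(reported))
```

In a real environment, `mismatched(installed_versions())` would run the same comparison against what is actually installed.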
k

karthikraj

07/10/2023, 10:19 PM
No luck 😞 Tried installing this version. All I am getting is
Copy code
E0710 22:17:18.918807       1 workers.go:102] error syncing 'flytesnacks-development/al8s5rx4t5fs7v8thwkg': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.al8s5rx4t5fs7v8thwkg-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:al8s5rx4t5fs7v8thwkg-n0-0, notFound
@Frank Shen
f

Frank Shen

07/10/2023, 10:25 PM
@Kevin Su, @Yee @Eduardo Apolinario (eapolinario), This is the code that caused the above error.
Copy code
bigquery_task_1 = BigQueryTask(
    name="sql.bigquery.test_frank",
    inputs={},
    query_template="SELECT * FROM evergent_aggregated.subs_daily LIMIT 2;",
    task_config=BigQueryConfig(ProjectID="hbo-go-asia-data-analytics", Location="asia-southeast1"),
    output_structured_dataset_type=pd.DataFrame

)
And the code passes without the above error when running locally. I have the following packages locally:
Copy code
flyteidl                      1.5.12
flytekit                      1.7.1b1
flytekitplugins-bigquery      1.7.1b1
google-api-core               2.11.1
google-auth                   2.21.0
google-auth-oauthlib          1.0.0
google-cloud-bigquery         3.11.3
google-cloud-bigquery-storage 2.22.0
google-cloud-core             2.3.2
google-cloud-storage          2.10.0
And locally I have bigquery_credentials.json configured.
Could it be in flyte remote, flyte is not accessing the same bigquery_credentials.json correctly?
y

Yee

07/10/2023, 11:25 PM
@Kevin Su will help out in a bit i think… let’s see if he can do any more digging on his end.
k

Kevin Su

07/11/2023, 1:15 AM
looking
Tried installing this version. All I am getting is
Copy code
E0710 22:17:18.918807       1 workers.go:102] error syncing 'flytesnacks-development/al8s5rx4t5fs7v8thwkg': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.al8s5rx4t5fs7v8thwkg-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:al8s5rx4t5fs7v8thwkg-n0-0, notFound
this is the same error you ran into before, right? you are able to create a job, but failed to get the job status?
f

Frank Shen

07/11/2023, 4:09 PM
Hi @Kevin Su, Yes, that's the error we got when running the workflow on flyte remote. But I don't have the error when running locally.
y

Yee

07/11/2023, 4:10 PM
is this using the flyte agent framework kevin?
k

Kevin Su

07/11/2023, 4:11 PM
no, they are using flytepropeller to run bq task
@Frank Shen are you able to jump on a call and share screen? happy to debug together
f

Frank Shen

07/11/2023, 4:28 PM
Hi @Kevin Su, I am in a meeting. I will ping you when I am available in 30ish minutes. Will that work for you?
k

Kevin Su

07/11/2023, 4:29 PM
ok, np
f

Frank Shen

07/11/2023, 5:23 PM
Hi @Kevin Su , I can talk now.
f

Frank Shen

07/11/2023, 5:40 PM
Update, I can run bigquery locally and retrieve data with the above code.
Copy code
(env_flyte_1_7) ➜  dai-mle-workflow git:(feature/git-actions-flyte) ✗ pyflyte run ml_project_1/test_bigquery.py wf
           dt country business_unit_code    payment_method account_id    order_id last_pay_day  ...  new_deleted_status cancel_subs paid_subs new_subs ended_subs user_type subscriber_status
0  2020-11-29      HK               NOWE  Operator Billing  386234080  1384111902         None  ...                None        None      None     None       None      None       active_subs
1  2020-11-29      HK               NOWE  Operator Billing  284808126   716821474         None  ...                None        None      None     None       None      None       active_subs
f

Frank Shen

07/11/2023, 5:42 PM
@karthikraj , if you can use agent on flyte server, I believe we can run bigquery task on the server as well.
@karthikraj could you try to run this file locally.
Copy code
package main

import (
	"context"
	"fmt"
	"google.golang.org/api/bigquery/v2"
	"google.golang.org/api/option"
)

func main() {
	client, err := newBigQueryClient(context.Background())
	if err != nil {
		panic(err)
	}
	job, err := client.Jobs.Get("flyte-test-340607", "14d65dc3-6c6a-461d-9a39-c866d53c8907").Location("US").Do()
	if err != nil {
		panic(err)
	}
	fmt.Printf("job status %v\n", job.Status.State)
}

func newBigQueryClient(ctx context.Context) (*bigquery.Service, error) {
	options := []option.ClientOption{
		option.WithScopes("https://www.googleapis.com/auth/bigquery"),
		option.WithUserAgent(fmt.Sprintf("%s/%s", "flytepropeller", "LATEST")),
	}

	return bigquery.NewService(ctx, options...)
}
Copy code
(base) ➜  test_bq git:(databricks-bug) ✗ ls
test    test.go
(base) ➜  test_bq git:(databricks-bug) ✗ go build test.go
(base) ➜  test_bq git:(databricks-bug) ✗ go run .        
job status DONE
(base) ➜  test_bq git:(databricks-bug) ✗
btw, you need to update the project, location and jobID
k

karthikraj

07/11/2023, 5:59 PM
ok Let me try..
I have installed go and created the above file. Tried to execute the code. I am getting the below error. What am i missing @Kevin Su
Copy code
]$ ls
test.go
]$ go build test.go
test.go:6:2: no required module provides package google.golang.org/api/bigquery/v2: go.mod file not found in current directory or any parent directory; see 'go help modules'
test.go:7:2: no required module provides package google.golang.org/api/option: go.mod file not found in current directory or any parent directory; see 'go help modules'
do i need to install any package like python pip?
k

Kevin Su

07/11/2023, 6:20 PM
Try to run go mod init locally first, then build the binary
k

karthikraj

07/11/2023, 6:35 PM
okay.. I passed in our bq project, job id and location. It is able to connect and get the results.
Copy code
]$ go run .
job status DONE
Copy code
job, err := client.Jobs.Get("hbo-go-asia-data-analytics", "al8s5rx4t5fs7v8thwkg-n0-0").Location("asia-southeast1").Do()
did you get a chance to use flyte agent?
k

karthikraj

07/12/2023, 2:16 PM
@Kevin Su we are trying.. we will let you know..
@Kevin Su I have followed this page agent_service.html#build-a-new-image. I have built an image and passed to the agent deployment and redeployed the chart. I could see the agent deployment and pods running.
Copy code
]$ kubectl logs flyteagent-695c856b98-svhps -n flyte
Starting the agent service...
If I start running workflows, I think it is still using the go plugin; I see the same behavior. The workflow is queueing and not running. What am I missing?
f

Frank Shen

07/13/2023, 6:54 PM
Hi @Kevin Su, @karthikraj, are there any findings?
k

Kevin Su

07/13/2023, 6:56 PM
Did you change the plug-in config
k

karthikraj

07/13/2023, 7:00 PM
yeah, this is done too...
Copy code
enabled_plugins.yaml: |
    agent-service:
      defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
      endpointForTaskTypes:
      - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
      supportedTaskTypes:
      - custom_task
    tasks:
      task-plugins:
        default-for-task-types:
          bigquery_query_job_task: agent-service
          container: container
          container_array: k8s-array
          custom_task: agent-service
          ray: ray
          sidecar: sidecar
          snowflake: snowflake
          spark: spark
        enabled-plugins:
        - container
        - sidecar
        - k8s-array
        - snowflake
        - spark
        - ray
        - bigquery
        - agent-service
k

Kevin Su

07/13/2023, 7:02 PM
the workflow is queueing and not running
Is there any error in the propeller
You have to add bigquery_query_job_task to supportedTaskTypes
supportedTaskTypes: - bigquery_query_job_task
k

karthikraj

07/13/2023, 7:05 PM
ohhh.. got it.. let me do..
I am still getting the same error. Does this look good? This is in the propeller config. Can you confirm?
Copy code
enabled_plugins.yaml: |
    agent-service:
      defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
      endpointForTaskTypes:
      - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
      supportedTaskTypes:
      - custom_task
      - bigquery_query_job_task
    tasks:
      task-plugins:
        default-for-task-types:
          bigquery_query_job_task: agent-service
          container: container
          container_array: k8s-array
          custom_task: agent-service
          ray: ray
          sidecar: sidecar
          snowflake: snowflake
          spark: spark
        enabled-plugins:
        - container
        - sidecar
        - k8s-array
        - snowflake
        - spark
        - ray
        - bigquery
        - agent-service
k

Kevin Su

07/13/2023, 7:24 PM
Lgtm, did you restart propeller after updating config
k

karthikraj

07/13/2023, 7:24 PM
yeah yeah, I am redeploying entire application everytime.
k

Kevin Su

07/13/2023, 7:24 PM
Could you share propeller’s log
k

karthikraj

07/13/2023, 7:26 PM
Copy code
{"json":{"exec_id":"fe1daf082b3d34e39827","ns":"flytesnacks-development","res_ver":"303417783","routine":"worker-4","wf":"flytesnacks:development:workflows.bigquery_test.no_io_wf"},"level":"error","msg":"Error when trying to reconcile workflow. Error [failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.fe1daf082b3d34e39827-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:fe1daf082b3d34e39827-n0-0, notFound]. Error Type[*errors.NodeErrorWithCause]","ts":"2023-07-13T19:17:51Z"}
E0713 19:17:51.508148       1 workers.go:102] error syncing 'flytesnacks-development/fe1daf082b3d34e39827': failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [bigquery]: [RuntimeFailure] failed to get job [hbo-go-asia-data-analytics:asia-southeast1.fe1daf082b3d34e39827-n0-0], caused by: googleapi: Error 404: Not found: Job hbo-go-asia-data-analytics:fe1daf082b3d34e39827-n0-0, notFound
I am getting this error if rerun the workflow
k

Kevin Su

07/13/2023, 7:30 PM
It’s not running on agent. Could you remove bigquery from the
enabled-plugins
f

Frank Shen

07/13/2023, 8:18 PM
CC: @karthikraj
k

karthikraj

07/13/2023, 8:20 PM
Yeah, I was checking that.. This is the propeller error i am getting now
Copy code
{
  "json": {
    "exec_id": "arxfl7s76ns8nd9mxch7",
    "ns": "flytesnacks-development",
    "res_ver": "303447643",
    "routine": "worker-30",
    "wf": "flytesnacks:development:workflows.bigquery_test.no_io_wf"
  },
  "level": "error",
  "msg": "Error when trying to reconcile workflow. Error [failed at Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [container]: [BadTaskSpecification] invalid TaskSpecification, unable to determine Pod configuration]. Error Type[*errors.NodeErrorWithCause]",
  "ts": "2023-07-13T20:04:47Z"
}
k

Kevin Su

07/13/2023, 8:30 PM
Hmm, you are using propeller v1.1.104, right
k

karthikraj

07/13/2023, 9:15 PM
yeah, it is v1.1.104.. 😞
f

Frank Shen

07/13/2023, 10:29 PM
@Kevin Su, hmm, any idea?
k

Kevin Su

07/14/2023, 2:57 PM
@karthikraj could you share the output of kubectl get cm
flyte-propeller-config
? it should look something like:
Copy code
...
    plugins:
      agent-service:
        defaultGrpcEndpoint: flyteagent.flyte.svc.cluster.local:8000
        supportedTaskTypes:
        - bigquery_query_job_task
I think this is wrong:
enabled_plugins.yaml: |
    agent-service:
      defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
      endpointForTaskTypes:
      - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
It should be:
enabled_plugins.yaml: |
    plugins:
      agent-service:
        defaultGrpcEndpoint: dns:///flyteagent.flyte.svc.cluster.local:8000
        endpointForTaskTypes:
        - custom_task: dns:///your-agent.flyte.svc.cluster.local:8000
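The difference between the two snippets is only the nesting: the agent-service section has to sit under a top-level `plugins` key. As a rough sketch, with hand-written dicts standing in for the parsed YAML and a hypothetical helper `agent_service_config` (this is not how propeller itself loads its config), the check looks like this:

```python
def agent_service_config(cfg: dict):
    """Return the agent-service section only if it sits under 'plugins'."""
    return cfg.get("plugins", {}).get("agent-service")


# Stand-ins for the two YAML fragments above, written out as plain dicts.
agent_section = {
    "defaultGrpcEndpoint": "dns:///flyteagent.flyte.svc.cluster.local:8000",
    "endpointForTaskTypes": [
        {"custom_task": "dns:///your-agent.flyte.svc.cluster.local:8000"}
    ],
}
wrong = {"agent-service": agent_section}              # missing the 'plugins' level
right = {"plugins": {"agent-service": agent_section}}

print(agent_service_config(wrong))               # None
print(agent_service_config(right) is not None)   # True
```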
k

karthikraj

07/14/2023, 3:00 PM
Thank you, checking on my end.
@Kevin Su I believe it's working. My workflow succeeded after making the above changes. I will ask @Frank Shen to test from his end as well. Do you think this document needs to be updated as below, or is it only me who didn't know how to configure it correctly?
tasks:
    task-plugins:
      enabled-plugins:
        - agent-service
      default-for-task-types:
        - bigquery_query_job_task: agent-service
        - custom_task: agent-service

  k8s:
    plugins:
       agent-service:
        defaultGrpcEndpoint: flyteagent.flyte.svc.cluster.local:8000
        supportedTaskTypes:
         - bigquery_query_job_task
One thing I am seeing here is that the logs are empty. Do I need to make any additional configuration?
@Kevin Su Frank is trying to use his custom image, which has other dependencies, while submitting the workflow. The Flyte task is trying to search for credentials within this image instead of from the agent. Any suggestions?
I have already mounted credentials.json as a secret and set the env var GOOGLE_APPLICATION_CREDENTIALS in both the propeller and agent deployments.
@Kevin Su Please do let us know if you have any thoughts on this.
k

Kevin Su

07/18/2023, 7:21 PM
@karthikraj sorry for the late reply.
"Do you think this document needs to be updated as below, or is it only me who didn't know how to configure it correctly?"
Yes, a PR is welcome.
"credentials within this image"
Were you trying to set GOOGLE_APPLICATION_CREDENTIALS in the Dockerfile?
I think you only need to set the env var in the agent deployment.
k

karthikraj

07/18/2023, 8:16 PM
Yeah, I have already set the env variable in both agent and propeller, and I verified the env vars by entering the running pod too. It works if we build a custom image that copies in the credentials file and sets this env variable, and then pass --image <img_uri> when running the pyflyte command. It seems Flyte always checks the image we pass rather than the agent/propeller env vars.
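Since the Google auth library reads GOOGLE_APPLICATION_CREDENTIALS in whichever process creates the client, it can help to run the same check inside each container involved (agent, propeller, and the task pod built from the custom image). A minimal sketch using only the standard library; `check_adc_env` is a hypothetical helper, not part of flytekit or the Google SDK:

```python
import os


def check_adc_env(env=os.environ):
    """Report whether GOOGLE_APPLICATION_CREDENTIALS points at a readable file."""
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return "GOOGLE_APPLICATION_CREDENTIALS is not set"
    if not os.path.isfile(path):
        return f"GOOGLE_APPLICATION_CREDENTIALS points to a missing file: {path}"
    return f"credentials file found: {path}"


# Run this inside the container you want to inspect.
print(check_adc_env())
```

If the agent pod reports a file but the task pod does not, the failing client is being created in the task pod.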
k

Kevin Su

07/18/2023, 8:39 PM
hmm, looking, let me test it again
It works for me. Here is my deployment YAML:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flyteagent
  namespace: flyte
  labels:
    app.kubernetes.io/name: flyteagent
    app.kubernetes.io/instance: flyte
    helm.sh/chart: flyte-v0.1.10
    app.kubernetes.io/managed-by: Helm
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: flyteagent
      app.kubernetes.io/instance: flyte
  template:
    metadata:
      annotations:
      labels:
        app.kubernetes.io/name: flyteagent
        app.kubernetes.io/instance: flyte
        helm.sh/chart: flyte-v0.1.10
        app.kubernetes.io/managed-by: Helm
    spec:
      containers:
      - command:
        - pyflyte
        - serve
        image: "ghcr.io/flyteorg/flyteagent:1.6.2b1"
        volumeMounts:
        - mountPath: /var/secrets/google
          name: google-cloud-key
        imagePullPolicy: "IfNotPresent"
        env:
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /var/secrets/google/flyte-test-credentials.json
        name: flyteagent
        ports:
        - containerPort: 8000
          name: agent-grpc
        resources:
          limits:
            cpu: 500m
            ephemeral-storage: 100Mi
            memory: 500Mi
          requests:
            cpu: 10m
            ephemeral-storage: 50Mi
            memory: 50Mi
      volumes:
        - name: google-cloud-key
          secret:
            defaultMode: 420
            secretName: gcp-key
      serviceAccountName: flyteagent
I tested 1.8.1 as well, and it works
k

karthikraj

07/18/2023, 9:44 PM
@Kevin Su Are you passing --image <custom image uri> in the pyflyte command?
This is my agent deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
    meta.helm.sh/release-name: max-ml-flyte
    meta.helm.sh/release-namespace: flyte
  creationTimestamp: "2023-07-14T19:43:35Z"
  generation: 1
  labels:
    app.kubernetes.io/instance: max-ml-flyte
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: flyteagent
    helm.sh/chart: flyte-core-v0.1.10
  name: flyteagent
  namespace: flyte
  resourceVersion: "304315314"
  uid: 526ab0fb-3c09-4510-a5c4-821d591e0837
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/instance: max-ml-flyte
      app.kubernetes.io/name: flyteagent
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app.kubernetes.io/instance: max-ml-flyte
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/name: flyteagent
        helm.sh/chart: flyte-core-v0.1.10
    spec:
      containers:
      - command:
        - pyflyte
        - serve
        env:
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /secrets/credentials.json
        image: 613630599026.dkr.ecr.us-east-1.amazonaws.com/max-ml-flyte-agent-dev:latest
        imagePullPolicy: IfNotPresent
        name: flyteagent
        ports:
        - containerPort: 8000
          name: agent-grpc
          protocol: TCP
        resources:
          limits:
            cpu: 500m
            ephemeral-storage: 100Mi
            memory: 500Mi
          requests:
            cpu: 10m
            ephemeral-storage: 50Mi
            memory: 50Mi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /secrets
          name: service-account-credentials
          readOnly: true
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: flyteagent
      serviceAccountName: flyteagent
      terminationGracePeriodSeconds: 30
      volumes:
      - name: service-account-credentials
        secret:
          defaultMode: 420
          secretName: bigquery-secret
status:
  availableReplicas: 1
  conditions:
  - lastTransitionTime: "2023-07-14T19:43:37Z"
    lastUpdateTime: "2023-07-14T19:43:37Z"
    message: Deployment has minimum availability.
    reason: MinimumReplicasAvailable
    status: "True"
    type: Available
  - lastTransitionTime: "2023-07-14T19:43:35Z"
    lastUpdateTime: "2023-07-14T19:43:37Z"
    message: ReplicaSet "flyteagent-66f4c86b" has successfully progressed.
    reason: NewReplicaSetAvailable
    status: "True"
    type: Progressing
  observedGeneration: 1
  readyReplicas: 1
  replicas: 1
  updatedReplicas: 1
It works for me if we don't pass any custom image. Did you try passing a custom image that doesn't have the creds?
k

Kevin Su

07/18/2023, 9:47 PM
Hmm, the bigquery task always runs in the agent, so it doesn't require an image.
You mean the task fails if we use pyflyte run --image …?
k

karthikraj

07/18/2023, 9:48 PM
yeah right.. @Frank Shen needs to pass this image with other dependencies for his workflows.
k

Kevin Su

07/18/2023, 9:51 PM
I just specified an image, and it also works for me.
What errors did you see when passing the image config?
f

Frank Shen

07/18/2023, 10:02 PM
@Kevin Su I can answer that. In order for me to run code like:
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

@task
def load_input_data(dt: str) -> pd.DataFrame:
    get_input_data = BigQueryTask(
        ...,
        task_config=BigQueryConfig(ProjectID="ddbd-data-warehouse-prod", Location="asia-southeast1"),
        ...
    )
    return get_input_data()
I need to include flytekitplugins-bigquery==1.7.1b1 in my custom image's requirements file. Also, in my Dockerfile, I have to include:
# copy bigquery_credentials.json to image 
COPY bigquery_credentials.json /root/bigquery_credentials.json
# set GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json as env variable for the flyte bigquery agent to use
ENV GOOGLE_APPLICATION_CREDENTIALS=/root/bigquery_credentials.json
Then I submit the job via the command:
pyflyte register -p examples --image 613630599026.dkr.ecr.us-east-1.amazonaws.com/dai-mlp-flyte-1_7:latest dai_mle_apac_churn/inference/inference_wf.py
and it works. However, if I remove the credentials above from the Dockerfile, the bigquery task fails with:
File "/opt/venv/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/dai_mle_apac_churn/train/load_input_data.py", line 902, in load_input_data
        return get_input_data()
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 304, in __call__
        return flyte_entity_call_handler(self, *args, **kwargs)  # type: ignore
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/promise.py", line 1022, in flyte_entity_call_handler
        result = cast(LocallyExecutable, entity).local_execute(child_ctx, **kwargs)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 285, in local_execute
        outputs_literal_map = self.sandbox_execute(ctx, input_literal_map)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 351, in sandbox_execute
        return self.dispatch_execute(ctx, input_literal_map)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 540, in dispatch_execute
        raise e
      File "/opt/venv/lib/python3.9/site-packages/flytekit/core/base_task.py", line 537, in dispatch_execute
        native_outputs = self.execute(**native_inputs)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/extend/backend/base_agent.py", line 150, in execute
        res = agent.create(dummy_context, output_prefix, cp_entity.template, inputs)
      File "/opt/venv/lib/python3.9/site-packages/flytekitplugins/bigquery/agent.py", line 73, in create
        client = bigquery.Client(project=project, location=location)
      File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery/client.py", line 245, in __init__
        super(Client, self).__init__(
      File "/opt/venv/lib/python3.9/site-packages/google/cloud/client/__init__.py", line 321, in __init__
        Client.__init__(
      File "/opt/venv/lib/python3.9/site-packages/google/cloud/client/__init__.py", line 178, in __init__
        credentials, _ = google.auth.default(scopes=scopes)
      File "/opt/venv/lib/python3.9/site-packages/google/auth/_default.py", line 692, in default
        raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)

Message:

    Your default credentials were not found. To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

User error.
k

Kevin Su

07/18/2023, 10:09 PM
Ohh, I see. You are using a structured dataset to convert BQ results to pandas. In this case, you have to add the credentials to the Dockerfile.
Btw, Flyte doesn't support calling a task inside a @task.
BigQueryTask is also a Flyte task, so you should not use it inside a @task.
You can create a BQ task that produces a structured dataset output and pass it to a downstream task:
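from flytekit import StructuredDataset, kwtypes
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask
from google.cloud.bigquery import QueryJobConfig

# Placeholder query (hypothetical table); assumes inputs are bound as named parameters such as @ds.
query_template = "SELECT * FROM my_dataset.my_table WHERE ds = @ds"

bigquery_task = BigQueryTask(
    name="flytekit.demo.bigquery_task.query",
    inputs=kwtypes(ds=str),
    task_config=BigQueryConfig(
        ProjectID="Flyte", Location="Asia", QueryJobConfig=QueryJobConfig(allow_large_results=True)
    ),
    query_template=query_template,
    output_structured_dataset_type=StructuredDataset,
)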
f

Frank Shen

07/18/2023, 10:15 PM
@Kevin Su, this is why I have to use BigQueryTask inside another task:
import pandas as pd
from flytekit import task
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask

@task
def load_input_data(dt: str) -> pd.DataFrame:
    formatting_dict = {"dt": dt}
    sql_str = """
multiple sql statements with temp tables, joins and over 900 lines long with parameters like {dt} then return select * from the last temp table
    """.format(**formatting_dict)
    print(sql_str)
    get_input_data = BigQueryTask(
        name=f"bigquery.train.input_data.{dt}",
        # query_template="SELECT * FROM evergent_aggregated.subs_daily LIMIT 2;",
        query_template=sql_str,
        task_config=BigQueryConfig(ProjectID="ddbd-data-warehouse-prod", Location="asia-southeast1"),
        inputs={},
        # inputs=kwtypes(version=int),
        output_structured_dataset_type=pd.DataFrame
    )
    return get_input_data()
I have a parameterized SQL string to format before I pass it to the BigQueryTask.
k

Kevin Su

07/18/2023, 10:18 PM
I see. Could you use @dynamic instead?
f

Frank Shen

07/19/2023, 12:32 AM
@Kevin Su, I changed it to a structured dataset:
get_input_data = BigQueryTask(
        name=f"bigquery.apac.inference.input_data.{dt}",
        # query_template="SELECT * FROM evergent_aggregated.subs_daily LIMIT 2;",
        query_template=sql_str,
        task_config=BigQueryConfig(ProjectID="ddbd-data-warehouse-prod", Location="asia-southeast1"),
        inputs={},
        # inputs=kwtypes(version=int),
        output_structured_dataset_type=StructuredDataset
    )
And in the downstream task I did:
@task
def get_input_data_by_country(sd: StructuredDataset, country: str) -> pd.DataFrame:
    df = sd.open(pd.DataFrame).all()
The workflow works locally but still fails on the remote Flyte cluster with the same error as above when I remove the credentials from the custom image.
k

karthikraj

07/19/2023, 10:16 PM
Hi @Kevin Su, how can we make the GOOGLE_APPLICATION_CREDENTIALS set in the flytepropeller deployment available to the individual workflow task pods? Frank is running the code below, but it is not able to find the creds. I have verified that the GCP credential is available in the propeller and agent pods.
get_input_data = BigQueryTask(
    name=f"bigquery.apac.inference.input_data",
    inputs=kwtypes(dt=str),
    # inputs={},    
    query_template=inf_input_sql_template,
    task_config=BigQueryConfig(ProjectID="xxxxxxxxxxxxxxxxxxxxxx", Location="asia-southeast1"),
    output_structured_dataset_type=StructuredDataset
)

@task
def read_data(sd: StructuredDataset) -> None:
    df = sd.open(pd.DataFrame).all()
    print(f'inference input data from bigquery df.shape: {df.shape}')
    print(df.head())

@workflow
def wf(
    team: str = "zzzzzzzzz",
    project: str = "yyyyyyy",
    version: str = "1",
    # dt: str = datetime.now().strftime("%Y-%m-%d"),
    dt: str = "2023-07-18"
) -> None:
    sd= get_input_data(dt=dt)
    read_data(sd=sd)
Error:
File "/root/dai_mle_apac_churn/inference/inference_wf.py", line 25, in read_data
        df = sd.open(pd.DataFrame).all()
      File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 105, in all
        return flyte_dataset_transformer.open_as(ctx, self.literal, self._dataframe_type, self.metadata)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/structured_dataset.py", line 791, in open_as
        result = decoder.decode(ctx, sd, updated_metadata)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/bigquery.py", line 82, in decode
        return _read_from_bq(flyte_value, current_task_metadata)
      File "/opt/venv/lib/python3.9/site-packages/flytekit/types/structured/bigquery.py", line 36, in _read_from_bq
        client = bigquery_storage.BigQueryReadClient()
      File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py", line 481, in __init__
        self._transport = Transport(
      File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/grpc.py", line 151, in __init__
        super().__init__(
      File "/opt/venv/lib/python3.9/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/transports/base.py", line 102, in __init__
        credentials, _ = google.auth.default(
      File "/opt/venv/lib/python3.9/site-packages/google/auth/_default.py", line 692, in default
        raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
Basically, the pods created should share the mount available in the propeller.
f

Frank Shen

07/19/2023, 10:43 PM
Thanks @karthikraj!
k

Kevin Su

07/19/2023, 10:52 PM
You have to use the webhook to mount the secret.
IIRC, you are using AWS EKS, right?
k

karthikraj

07/19/2023, 10:52 PM
yea, right
k

Kevin Su

07/19/2023, 10:56 PM
You could add the secret to the @task like this:
import os
import typing
from flytekit import Secret, task

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/etc/secrets/gcp/bigquery"
@task(container_image=image, disable_deck=False, secret_requests=[Secret(group="gcp", key="bigquery")])
def t1(size: int) -> typing.List[typing.Any]:
    ...
you have to create a secret in the flytesnacks-development namespace
when you add the secret in the @task, flyte will dynamically mount the secret to the pod
k

karthikraj

07/19/2023, 10:57 PM
Got it. We are using multiple project-domain pairs. So do we need to create secrets in all the namespaces?
"when you add the secret in the @task, flyte will dynamically mount the secret to the pod"
Meaning, will it mount from the secret available in the flytesnacks-development namespace or from the propeller mount?
k

Kevin Su

07/19/2023, 10:59 PM
From the secret available in the flytesnacks-development namespace.
"So do we need to create secrets in all the namespaces?"
Yes, if you use K8s secrets. You can also use AWS Secrets Manager or Vault Agent Injector; with these secret managers, you don't need to create the secret in every namespace.
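For the K8s-secrets route, the per-namespace step can be scripted. The sketch below assumes the default `{project}-{domain}` namespace naming (as in flytesnacks-development) and that a `Secret(group="gcp", key="bigquery")` request maps to a K8s secret named `gcp` with a key `bigquery`. The project and domain lists, the credentials.json file name, and the `task_namespaces` helper are placeholders for illustration.

```python
from itertools import product


def task_namespaces(projects, domains):
    """Build one namespace name per project-domain pair."""
    return [f"{project}-{domain}" for project, domain in product(projects, domains)]


# Placeholder project and domain lists; replace with your own.
namespaces = task_namespaces(["flytesnacks", "examples"], ["development", "staging"])

# Print one kubectl command per namespace instead of running it.
for ns in namespaces:
    print(f"kubectl create secret generic gcp --from-file=bigquery=credentials.json -n {ns}")
```

Printing the commands rather than executing them leaves room to review the list before touching the cluster.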
k

karthikraj

07/19/2023, 11:01 PM
Thank you so much. We will have a look and try. 🙂
f

Frank Shen

07/19/2023, 11:24 PM
Hi @Kevin Su, just to add the missing context on why a normal Python task needs to access Google: BigQueryTask's output is stored in Google at bq://[project_id]/… Therefore the next Python task (read_data) needs to access it from bq://… Could we configure the BigQueryTask to write its output to the normal Flyte S3 location, so that we don't need the Google secrets in the Python task?
k

Kevin Su

07/19/2023, 11:40 PM
No, BigQuery writes the result to another BQ table, so the downstream task must have Google secrets to read it. BigQuery can't write the result table to S3.
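To see which table a downstream task would end up reading, the structured dataset URI can be split into its parts. This is only a sketch: it assumes the URI has the form bq://project:dataset.table (the thread itself only shows bq://[project_id]/…), and `split_bq_uri` is a hypothetical helper, not a flytekit function.

```python
import re


def split_bq_uri(uri: str):
    """Split an assumed bq://project:dataset.table URI into its three parts."""
    match = re.fullmatch(r"bq://([^:/]+):([^.]+)\.(.+)", uri)
    if match is None:
        raise ValueError(f"not a bq://project:dataset.table URI: {uri}")
    return match.groups()


# Made-up URI for illustration only.
print(split_bq_uri("bq://my-project:my_dataset.my_table"))
# ('my-project', 'my_dataset', 'my_table')
```

Whatever the exact URI format, the data stays in BigQuery, which is why the reading task needs Google credentials of its own.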
f

Frank Shen

07/20/2023, 12:09 AM
Got it, thanks @Kevin Su!
k

karthikraj

07/20/2023, 12:17 AM
@Kevin Su sorry again. I can use this config along with the sidecar image, but do you know how to add secrets here? https://github.com/flyteorg/flyte/blob/b4bf416df907353ec8c55304ffda67e9198e42e0/rs[…]ts/deployment/configuration/generated/flytepropeller_config.rst
k

Kevin Su

07/20/2023, 12:24 AM
k

karthikraj

07/20/2023, 10:18 PM
Hi @Kevin Su, please check this when you get some time. I still have a problem mounting the secrets via the webhook. I have gone through the documentation page multiple times, but I am unsure how to mount the secrets. These are the steps I have done:
1) I created a secret in AWS Secrets Manager. My secret ARN is arn:aws:secretsmanager:us-east-1:1234567890:secret:flyte-bigquery-creds-lnC41z
2) I added a webhook config as given in the document, and I can see the webhook config below in the propeller config map.
core.yaml:
----
manager:
  pod-application: flytepropeller
  pod-template-container-name: flytepropeller
  pod-template-name: flytepropeller-template
propeller:
  downstream-eval-duration: 30s
  enable-admin-launcher: true
  gc-interval: 12h
  kube-client-config:
    burst: 25
    qps: 100
    timeout: 30s
  leader-election:
    enabled: true
    lease-duration: 15s
    lock-config-map:
      name: propeller-leader
      namespace: flyte
    renew-deadline: 10s
    retry-period: 2s
  limit-namespace: all
  max-workflow-retries: 50
  metadata-prefix: metadata/propeller
  metrics-prefix: flyte
  prof-port: 10254
  queue:
    batch-size: -1
    batching-interval: 2s
    queue:
      base-delay: 5s
      capacity: 1000
      max-delay: 120s
      rate: 100
      type: maxof
    sub-queue:
      capacity: 1000
      rate: 100
      type: bucket
    type: batch
  rawoutput-prefix: s3://dev-xx-yyy-zz-flyte-us-east-1/
  workers: 40
  workflow-reeval-duration: 30s
webhook:
  certDir: /etc/webhook/certs
  nolan: oppenheimer
  resources:
    limits:
      cpu: 200m
      memory: 500Mi
    requests:
      cpu: 200m
      memory: 500Mi
  secretManagerType: AWS
  serviceName: flyte-pod-webhook
  sidecarImage: docker.io/amazon/aws-secrets-manager-secret-sidecar:v0.1.4
I am kind of stuck here. Where should I specify the AWS secret name or ARN? Where is it going to be mounted as a file?
Hi @Kevin Su, the secret mount is not a blocker for our development, since we are copying the credentials into the custom image and using them, but we would like to mount it through the webhook as mentioned above to avoid security issues. Whenever you get a chance, please help me with the above query.