Frank Shen
06/29/2023, 10:13 PM
/venv/lib/python3.8/site-packages/fsspec/registry.py:209 in get_filesystem_class
if protocol not in registry:
if protocol not in known_implementations:
raise ValueError("Protocol not known: %s" % protocol)
My code:
# from typing import Tuple
# ``Annotated`` is part of the standard ``typing`` module from Python 3.9 on;
# older interpreters fall back to the ``typing_extensions`` backport.
try:
    from typing import Annotated
except ImportError:
    from typing_extensions import Annotated
import pandas as pd
from flytekit import task, workflow, StructuredDataset, kwtypes
from flytekitplugins.bigquery import BigQueryConfig, BigQueryTask
import google.cloud.bigquery
# Flyte BigQuery task: takes an integer ``version`` input, substitutes it for
# ``@version`` in the query template, and declares ``pd.DataFrame`` as its
# structured-dataset output type. ``ProjectID`` is left empty in this snippet.
bigquery_task = BigQueryTask(
    name="sql.bigquery.test",
    inputs=kwtypes(version=int),
    query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 2;",
    task_config=BigQueryConfig(ProjectID=""),
    output_structured_dataset_type=pd.DataFrame,
)
@task
def preproc(df: pd.DataFrame) -> None:
    """Print the first rows of ``df``.

    Args:
        df: DataFrame to preview; in this snippet it is supplied by the
            output of ``bigquery_task``.
    """
    print(df.head())
@workflow
def wf(version: int = 1) -> None:
    """Run the BigQuery task for ``version`` and preview the result.

    Args:
        version: Value passed to ``bigquery_task`` as its ``version`` input.
            Defaults to 1.
    """
    preproc(df=bigquery_task(version=version))
Do you have any idea?
bigquery_task = BigQueryTask(
... output_structured_dataset_type=StructuredDataset
)
@task
def preproc(sd: StructuredDataset) -> None:
    """Materialise ``sd`` as a pandas DataFrame and print its first rows.

    Args:
        sd: Structured dataset to read; it is opened as ``pd.DataFrame`` and
            fully loaded with ``.all()`` before previewing.
    """
    df = sd.open(pd.DataFrame).all()
    print(df.head())
@workflow
def wf(version: int = 1) -> None:
    """Run the BigQuery task for ``version`` and hand its dataset to ``preproc``.

    Args:
        version: Value passed to the BigQuery task as its ``version`` input.
            Defaults to 1.
    """
    # The task in this snippet is named ``bigquery_task``; the original call
    # referenced ``bigquery_task_1``, which is not defined here.
    sd = bigquery_task(version=version)
    preproc(sd=sd)
Samhita Alla
Frank Shen
06/30/2023, 4:26 PM
Kevin Su
07/01/2023, 8:01 PM
Frank Shen
07/06/2023, 6:36 PM
(env_flyte_1_7) ➜ hbo-code pip list | grep google
google-api-core 2.11.1
google-auth 2.21.0
google-auth-oauthlib 1.0.0
google-cloud-bigquery 3.11.3
google-cloud-core 2.3.2
google-cloud-storage 2.10.0
google-crc32c 1.5.0
google-resumable-media 2.5.0
googleapis-common-protos 1.59.1
Kevin Su
07/06/2023, 10:47 PM