acoustic-carpenter-78188
02/27/2023, 3:38 PM
DuckDBQuery task plugin that runs queries using DuckDB as the DBMS.
Type
☐ Bug Fix
☐ Feature
☑︎ Plugin
Are all requirements met?
☐ Code completed
☐ Smoke tested
☑︎ Unit tests added
☑︎ Code documentation added
☐ Any pending items have an associated Issue
Complete description
Capturing the crucial assumptions I made while building the task plugin:
• The parameters a user can send to the DuckDBQuery task include query and, optionally, inputs.
• query can be a list of queries that will be run sequentially. The last query must be a SELECT query.
• inputs can include a structured dataset or a list of parameters to be sent to the queries.
• The output is a pyarrow table, which can be converted to any structured-dataset-compatible type.
• The connection mode is set to :memory:, i.e., the data is always stored in an in-memory, non-persistent database. The connection could instead point to a file, but making that file accessible to different DuckDBQuery pods is difficult, and a persistent file only makes sense if it can actually be shared and reused across them.
Example:
import pandas as pd
from flytekit import kwtypes, workflow
from flytekitplugins.duckdb import DuckDBQuery

duckdb_query = DuckDBQuery(
    name="read_parquet",
    query=[
        "INSTALL httpfs",
        "LOAD httpfs",
        """SELECT hour(lpep_pickup_datetime) AS hour, count(*) AS count
           FROM READ_PARQUET(?) GROUP BY hour""",
    ],
    inputs=kwtypes(params=list[str]),
)


@workflow
def parquet_wf(parquet_file: str) -> pd.DataFrame:
    return duckdb_query(params=[parquet_file])


assert isinstance(
    parquet_wf(parquet_file="https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-02.parquet"),
    pd.DataFrame,
)
Tracking Issue
Fixes flyteorg/flyte#3246
Follow-up issue
NA
flyteorg/flytekit
✅ All checks have passed
30/30 successful checks