# flyte-github
#1419 DuckDB plugin
Pull request opened by samhita-alla

Signed-off-by: Samhita Alla aallasamhita@gmail.com

TL;DR
This PR adds a `DuckDBQuery` task plugin that runs queries using DuckDB as the DBMS.

Type
☐ Bug Fix
☐ Feature
☑︎ Plugin

Are all requirements met?
☐ Code completed
☐ Smoke tested
☑︎ Unit tests added
☑︎ Code documentation added
☐ Any pending items have an associated Issue

Complete description
Capturing the crucial assumptions I made while building the task plugin:
• The `DuckDBQuery` task requires a `query` argument from the user and can optionally take `inputs`.
• `query` can include a set of queries that are run sequentially. The last query needs to be a SELECT query.
• `inputs` can include a structured dataset or a list of parameters to be sent to the queries.
• The `output` is a pyarrow table, which can be converted to any structured-dataset-compatible type.
• The connection mode is set to `:memory:`, i.e., the data is always stored in an in-memory, non-persistent database. It can be set to a file, but it is difficult to make that file accessible to different `DuckDBQuery` pods, and a file-backed database makes sense only if its persistence is actually leveraged.

Example:
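```python
import pandas as pd
from flytekit import kwtypes, workflow
from flytekitplugins.duckdb import DuckDBQuery

# The first two statements set up the httpfs extension; the final SELECT
# produces the task output. The `?` placeholder is bound from `params`.
duckdb_query = DuckDBQuery(
    name="read_parquet",
    query=[
        "INSTALL httpfs",
        "LOAD httpfs",
        """SELECT hour(lpep_pickup_datetime) AS hour, count(*) AS count FROM READ_PARQUET(?) GROUP BY hour""",
    ],
    inputs=kwtypes(params=list[str]),
)


@workflow
def parquet_wf(parquet_file: str) -> pd.DataFrame:
    # The pyarrow output is converted to a pandas DataFrame via the return annotation.
    return duckdb_query(params=[parquet_file])


assert isinstance(
    parquet_wf(parquet_file="https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-02.parquet"),
    pd.DataFrame,
)
```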
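
To make the assumptions above concrete, here is a minimal sketch of the "run the queries sequentially, return the result of the final SELECT" behaviour on a non-persistent in-memory connection. It is not the plugin's implementation: `run_queries` is a hypothetical helper, binding `params` only to the last query is an assumption, and the standard library's `sqlite3` stands in for DuckDB so the sketch runs without extra dependencies. DuckDB's Python connection offers a comparable `execute(...)`/`fetchall()` interface, so the same helper should work with `duckdb.connect(":memory:")`, though that is untested here.

```python
import sqlite3
from typing import Any, List, Optional, Sequence


def run_queries(con: Any, queries: Sequence[str], params: Optional[List[Any]] = None) -> List[tuple]:
    """Run queries in order on one connection and return the last query's rows.

    Hypothetical helper, not the plugin's code. Assumes `params` are bound
    only to the final SELECT.
    """
    if not queries:
        raise ValueError("at least one query is required")
    if not queries[-1].lstrip().upper().startswith("SELECT"):
        raise ValueError("the last query must be a SELECT")
    # Setup statements run first, in order; their results are discarded.
    for statement in queries[:-1]:
        con.execute(statement)
    # Only the final SELECT produces the output.
    return con.execute(queries[-1], params or []).fetchall()


# sqlite3 is a stand-in for DuckDB here (assumption for illustration only).
con = sqlite3.connect(":memory:")
rows = run_queries(
    con,
    [
        "CREATE TABLE trips (pickup_hour INTEGER)",
        "INSERT INTO trips VALUES (7), (7), (9)",
        "SELECT pickup_hour, count(*) FROM trips "
        "WHERE pickup_hour >= ? GROUP BY pickup_hour ORDER BY pickup_hour",
    ],
    params=[7],
)
print(rows)  # [(7, 2), (9, 1)]

# A second in-memory connection starts empty: nothing is persisted or shared.
other = sqlite3.connect(":memory:")
try:
    other.execute("SELECT * FROM trips")
    shared = True
except sqlite3.OperationalError:
    shared = False
```

The last few lines show why the in-memory mode does not help separate pods share state: each `:memory:` connection gets its own empty database, so `shared` ends up `False`.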

Tracking Issue
Fixes flyteorg/flyte#3246

Follow-up issue
NA OR https://github.com/flyteorg/flyte/issues/

flyteorg/flytekit
All checks have passed
30/30 successful checks