# announcements
m
Hi, everyone. I'm trying to test out the BigQuery plugin using the example in the documentation, but when I try to convert the StructuredDataset into a DataFrame, I get this error:
Traceback (most recent call last):
      ...
      File "/home/flyte/.venv/lib/python3.8/site-packages/flytekit/types/structured/basic_dfs.py", line 63, in decode
        ctx.file_access.get_data(path, local_dir, is_multipart=True)
      File "/home/flyte/.venv/lib/python3.8/site-packages/flytekit/core/data_persistence.py", line 427, in get_data
        raise FlyteAssertion(

Message:

    Failed to get data from  to /tmp/flyteltak17mx/local_flytekit/85e6844a8f8283f8841994437be73626 (recursive=True).

Original exception: cannot copy tree '': not a directory

User error.
There's no path for the resulting query file. Could it be that there was an error executing the task? Flyte says the task succeeded, but I can't check in the GCP console (I don't have the necessary permissions).
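For context, the empty source path in that error is what triggers the failure. A toy sketch (not flytekit's actual implementation; the names `get_data` and `FlyteAssertion` are only mimicked for illustration) reproduces the symptom:

```python
# Illustrative only: why an empty remote path produces
# "cannot copy tree '': not a directory" wrapped in a FlyteAssertion.
import os
import shutil


class FlyteAssertion(Exception):
    pass


def get_data(remote_path: str, local_dir: str, is_multipart: bool = True) -> None:
    """Toy version of a multipart download: copying from '' can only fail."""
    try:
        if not os.path.isdir(remote_path):
            # '' is rejected as a source directory
            raise NotADirectoryError(f"cannot copy tree '{remote_path}': not a directory")
        shutil.copytree(remote_path, local_dir)
    except Exception as exc:
        raise FlyteAssertion(
            f"Failed to get data from {remote_path!r} to {local_dir} "
            f"(recursive={is_multipart}). Original exception: {exc}"
        ) from exc


# An empty uri reproduces the traceback's message:
try:
    get_data("", "/tmp/local_dir")
except FlyteAssertion as err:
    print(err)
```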
k
Mind sharing a snippet of code?
m
Yeah! This is the query definition:
GET_OPERATION_FLIGHT_OPTIONS_QUERY = """
    SELECT status_name, status_id, count(*) as qty
    FROM `...`  # Redacted because I don't think I can share the table name
    WHERE status_name <> "{{ .inputs.ignore }}"
    GROUP BY status_name, status_id
""".strip()


# Annotate a StructuredDataset type to conform with Flyte's strong typing
FlightOptionsCountDataset = Annotated[
    StructuredDataset, kwtypes(status_name=str, status_id=int, qty=int)
]

# Define the BigQuery task
get_operation_flight_options = BigQueryTask(
    name="tasks.bigquery.get_operation_flight_options",
    inputs=kwtypes(ignore=str),
    output_structured_dataset_type=FlightOptionsCountDataset,
    query_template=GET_OPERATION_FLIGHT_OPTIONS_QUERY,
    task_config=BIG_QUERY_CONFIG,
)
So, a friend of mine checked it out and the substitution wasn't applied
In this example, it looks like the substitution should be made using the @ operator, but the filter didn't make sense to me
so I did some digging and thought that the query had to be built using
{{ .inputs.<input_name> }}
, like what happens in this interpolate_query function https://github.com/flyteorg/flytekit/blob/386626356ff2cb810733a29a51cf3d6351c8424d/flytekit/core/base_sql_task.py
but that doesn't seem to be the case, right?
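The interpolation that function performs can be sketched roughly like this (a simplified stand-in, not flytekit's real code):

```python
# Rough sketch of the {{ .inputs.<name> }} substitution that flytekit's
# interpolate_query performs for SQL task templates; simplified for illustration.
import re


def interpolate_query(query_template: str, **inputs) -> str:
    """Replace {{ .inputs.<name> }} tokens with the given input values."""
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        return str(inputs[name])

    return re.sub(r"{{\s*\.inputs\.(\w+)\s*}}", _sub, query_template)


query = 'SELECT * FROM t WHERE status_name <> "{{ .inputs.ignore }}"'
print(interpolate_query(query, ignore="Rascunho"))
# → SELECT * FROM t WHERE status_name <> "Rascunho"
```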
k
BQ has special syntax, so we have to use @ in the query.
m
ohhh ok, perfect, thank you!
oh, by the way. if I wanted to pass a Tuple of values, can I pass them as a list?
I tried defining inputs as a Tuple but Flytekit said it was a protected type
k
We don't support tuples as task inputs right now; you can use a dataclass instead.
We are trying to fix this issue
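The workaround can be sketched like this (field names are made up for illustration; depending on your flytekit version, the dataclass may also need to be JSON-serializable, e.g. via dataclasses_json):

```python
# Sketch of the suggested workaround: instead of passing a Tuple input,
# wrap the values in a dataclass. Names here are hypothetical.
from dataclasses import dataclass
from typing import List


@dataclass
class FilterParams:
    ignore: str
    status_ids: List[int]


params = FilterParams(ignore="Rascunho", status_ids=[1, 2, 3])
print(params.ignore, params.status_ids)
```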
m
ok, perfect!
k
Same error?
m
yeah
I thought it worked so I deleted the message
but it did work
but the convert task doesn't work
@task
def convert_query_to_dataframe(dataset: StructuredDataset) -> pd.DataFrame:
    """Convert a BigQuery table to a pandas DataFrame."""
    return dataset.open(pd.DataFrame).all()
the exact same as the example
@workflow
def test_bigquery_task() -> None:
    """Test the BigQuery task plugin."""
    bq_dataset = tasks.bigquery.get_operation_flight_options(ignore="Rascunho")
    dataframe = tasks.bigquery.convert_query_to_dataframe(dataset=bq_dataset)
that's the workflow
k
What version of propeller are you using?
m
the latest one, I think
let me check
tag: v1.1.15
SELECT status_name, status_id, count(*) as qty
    FROM `...`
    WHERE status_name <> @ignore
    GROUP BY status_name, status_id
this is how the query appears in the BQ console. should
@ignore
be present?
k
Yes, I think it should. BQ will replace it at runtime.
Any error in the BQ console?
m
no, everything ran fine
I can run it myself and it works, the
@ignore
parameter was indeed passed
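The binding BQ does server-side can be made visible with a purely illustrative toy (this is not how BigQuery works internally, just a sketch of named-parameter substitution):

```python
# Purely illustrative binder showing how a named parameter like @ignore
# receives a value at run time. BigQuery resolves this server-side;
# this toy only makes the substitution visible.
import re


def bind_params(query: str, **params) -> str:
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        value = params[name]
        # Quote string values the way a string literal would appear
        return f'"{value}"' if isinstance(value, str) else str(value)

    return re.sub(r"@(\w+)", _sub, query)


query = "SELECT status_name FROM t WHERE status_name <> @ignore"
print(bind_params(query, ignore="Rascunho"))
# → SELECT status_name FROM t WHERE status_name <> "Rascunho"
```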
wait I think I discovered what the problem is
nope... I thought the problem was with the fact that I was setting the type of the convert task to
StructuredDataset
, not
FlightOptionsCountDataset
but the same error is still happening. empty filepath for the query result
I don't know if this is anything, but the task input for the converter is
{
  "dataset": {
    "format": "parquet",
    "columns": [
      {
        "status_name": "string"
      },
      {
        "status_id": "integer"
      },
      {
        "qty": "integer"
      }
    ]
  }
}
There's no uri, maybe that's the problem. But why is it missing?
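That observation can be checked directly: the literal carries the format and columns but no uri, and the empty uri is exactly the empty path the decode step later tries to copy from. A small check over the task input shown above:

```python
# Parse the task input above and confirm the StructuredDataset
# literal is missing its uri field.
import json

task_input = json.loads("""
{
  "dataset": {
    "format": "parquet",
    "columns": [
      {"status_name": "string"},
      {"status_id": "integer"},
      {"qty": "integer"}
    ]
  }
}
""")

dataset = task_input["dataset"]
print("uri" in dataset)        # → False
print(dataset.get("uri", ""))  # empty string: the path decode tries to copy from
```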
y
will look in a bit.
but which flytekit version?
m
flytekit 1.1.0
y
we’re not actively trying to add tuples. we’re trying to add them as syntactic-sugar in flytekit only, but not as a fundamental data type (which would require a change to flyteidl), so yes, please continue using a list.
also this looks complicated, esp. since I don't have any BigQuery data to test against. can we do a screenshare in a bit?
when’s good for you
m
hey, we can try to screenshare later, but I'm still trying to debug it
I can't even run the examples from the documentation
when I try to run either of the two workflows, I get a "ProjectId and DatasetId must be non-empty" error
I was trying to debug using these examples to see how Flyte generates the output for a BigQuery task. As I said before, I think the URI is missing from the output. When a DataFrame is passed, we get the path for the parquet file, right? The output for the BQ task is just the names of the columns and a
"type": "parquet"
.
y
more on this for those interested. this is part of the output being returned by admin for the bq query node.
"full_outputs": {
		"literals": {
			"results": {
				"scalar": {
					"structured_dataset": {
						"metadata": {
							"structured_dataset_type": {
								"columns": [{
									"name": "status_name",
									"literal_type": {
										"simple": "STRING"
									}
								}, {
									"name": "status_id",
									"literal_type": {
										"simple": "INTEGER"
									}
								}, {
									"name": "qty",
									"literal_type": {
										"simple": "INTEGER"
									}
								}],
								"format": "parquet"
							}
						}
					}
				}
			}
		}
	}
the
/api/v1/data/node_executions/
endpoint for n0
the issue is that the uri is somehow missing
not sure how this can happen
this here is where it should be extracted.
and this here is where it should be used.
somehow it’s getting an empty string
@Ketan (kumare3) this feels like a bug in the bigquery plugin
we should also debug log the query location.
k
I just figured out the BQ bug: BigQuery caches the query result, so when we create a BQ task, the job status comes back as "Done" instead of "Running". Because the status is already Done, propeller never invokes Get, which is what constructs the outputLocation for the structured dataset. That's why the uri in the structured dataset ends up empty.
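A toy model of that control flow (hypothetical names and paths, just to illustrate the race with the cache, not propeller's actual code):

```python
# Toy model of the bug described above: when BigQuery serves a cached
# result, the very first status poll already reports DONE, so the branch
# that would record the output location never executes.
def poll_job(statuses):
    """Walk through job statuses; capture the output uri only while RUNNING."""
    output_uri = None
    for status in statuses:
        if status == "RUNNING":
            output_uri = "gs://bucket/query-results/example"  # hypothetical location
        if status == "DONE":
            return output_uri  # stays None if we never observed RUNNING
    return output_uri


print(poll_job(["RUNNING", "DONE"]))  # normal job: uri gets set
print(poll_job(["DONE"]))             # cached result: uri is None → empty in the literal
```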
sorry, will create a pr soon
m
perfect, thank you!!
sorry I couldn't help more, I don't really know golang, so it's hard for me to understand what some components are doing 😅