#announcements

Fabio Grätz

08/30/2022, 4:24 PM
Hey everyone 🙂 I have a problem configuring a
StructuredDataset
to pass a Spark DataFrame around between tasks. I’m following this guide and get this error:
Copy code
{
  "asctime": "2022-08-30 16:15:09,048",
  "name": "flytekit",
  "levelname": "ERROR",
  "message": "Failed to convert return value for var o0 with error <class 'ValueError'>: Failed to find a handler for <class 'pyspark.sql.dataframe.DataFrame'>, protocol gs, fmt parquet"
}
I added this to my spark config but it doesn’t solve the problem:
Copy code
spark-config-default:
  - "spark.jars.packages": "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2"
  - "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
  - "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
  - "spark.hadoop.google.cloud.auth.service.account.enable": "true"
Would be great to get some pointers in case somebody has seen this error before, thanks!
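(For context, the StructuredDataset-with-Spark pattern being followed from the docs looks roughly like the sketch below; the column names and Spark config values are illustrative, not the actual code:)
from typing import Annotated

import flytekit
import pyspark.sql
from flytekit import kwtypes, task
from flytekit.types.structured import StructuredDataset
from flytekitplugins.spark import Spark  # importing this registers the Spark StructuredDataset handlers

columns = kwtypes(name=str, age=int)


@task(task_config=Spark(spark_conf={"spark.driver.memory": "1000M"}))
def create_df() -> Annotated[StructuredDataset, columns]:
    # Build a Spark DataFrame and hand it to flytekit as a StructuredDataset;
    # flytekit then has to find an encoder for the configured blob store protocol (gs here).
    sess = flytekit.current_context().spark_session
    df = sess.createDataFrame([("Alice", 5), ("Bob", 10)], ["name", "age"])
    return StructuredDataset(dataframe=df)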

Yee

08/30/2022, 4:28 PM
which version are you on?

Fabio Grätz

08/30/2022, 10:15 PM
v1.1.0

Yee

08/30/2022, 10:52 PM
do you think you could bump to 1.1.1 please?
we noticed an issue with the transformer on gcp. should be updated in 1.1.1 to read the protocol from the raw output prefix instead.
also you have
from flytekitplugins.spark import Spark
somewhere in your code right? or something that imports from that module.
(the init file there triggers the loading of the transformer)

Fabio Grätz

08/31/2022, 10:48 AM
Updated to 1.2.0-b1 as the 1.1.1 chart appears not to exist. I am importing
from flytekitplugins.spark import Spark
but still see the same error.
I solved my problem now by simply writing the dataframe to a parquet file using the GCS connector. Using structured datasets would have been nicer, I feel, but the solution works well, so no more problem here.
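(Roughly, that workaround looks like the sketch below; the bucket and paths are made up, and the write goes through the Hadoop GCS connector configured in the Spark conf above:)
import pyspark.sql


def write_df_to_gcs(df: pyspark.sql.DataFrame, uri: str = "gs://some-bucket/datasets/urm") -> str:
    # Persist the DataFrame as parquet directly on GCS and pass the URI between tasks
    # instead of returning a StructuredDataset.
    df.write.mode("overwrite").parquet(uri)
    return uri


def read_df_from_gcs(sess: pyspark.sql.SparkSession, uri: str) -> pyspark.sql.DataFrame:
    # The downstream task reads the parquet back into a Spark DataFrame.
    return sess.read.parquet(uri)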

Samhita Alla

09/01/2022, 6:55 AM
@Yee, is this a bug, since the latest flytekit version doesn’t resolve the issue?

Yee

09/01/2022, 4:34 PM
sorry, didn’t see this. yeah using the structured dataset pattern and just returning the dataframe would’ve been nice. we will look into this.
would you mind copy/pasting a bit of redacted code? primarily interested in the task signature and return statement.

Fabio Grätz

09/05/2022, 1:07 PM
Copy code
import logging
from datetime import timedelta
from typing import Annotated, Tuple

import pyspark.sql
import pyspark.sql.functions as F
import flytekit
import flytekitplugins.hydra  # noqa F401  # This is an internal package with a type transformer for omegaconf DictConfig objects

from flytekit import Resources, task, workflow, kwtypes
from flytekitplugins.spark import Spark
from flytekit.types.structured import StructuredDataset
from omegaconf import DictConfig, OmegaConf

urm_columns = kwtypes(user_id=int, item_id=int, rating=float)
id_mapping_columns = kwtypes(user_id=int, user_pseudo_id=int, ...)


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "5000M",
            "spark.executor.memory": "5000M",
            "spark.executor.cores": "4",
            "spark.executor.instances": "3",
            "spark.driver.cores": "2",
        }
    ),
    retries=1,
    timeout=timedelta(hours=1),
    cache=True,
    cache_version="1.8",
    requests=Resources(mem="1Gi", cpu="1"),
    container_image="{{.images.spark.fqn}}:{{.images.default.version}}",
)
def access_and_validate_data(
    cfg: DictConfig,
) -> Tuple[
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, urm_columns],
    Annotated[StructuredDataset, id_mapping_columns],
]:
    """
    Args:
        cfg (DictConfig): data accessor hydra configuration.

    Returns:
       ...
    """
    sess = flytekit.current_context().spark_session  # noqa F841
    logging.info(OmegaConf.to_yaml(cfg))

    online_df = bq_to_spark(...)
    offline_df = bq_to_spark(...)

    # Data validation with great expectations
    # Feature engineering
    # Train test split

    ds_online_train = StructuredDataset(dataframe=online_df_train, uri=df_online_train_uri)
    ds_online_test = StructuredDataset(dataframe=online_df_test, uri=df_online_test_uri)
    ds_offline_train = StructuredDataset(dataframe=offline_df_train, uri=df_offline_train_uri)
    ds_offline_test = StructuredDataset(dataframe=offline_df_test, uri=df_offline_test_uri)
    ds_id_mapping = StructuredDataset(dataframe=id_mapping_df, uri=id_mapping_df_uri)
    
    return (
        ds_online_train,
        ds_online_test,
        ds_offline_train,
        ds_offline_test,
        ds_id_mapping,
    )
Does this help? (I had to redact quite a bit of things but am happy to answer any questions you might have)
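(For completeness, a downstream task would presumably open these outputs with the standard StructuredDataset.open() pattern, sketched below; this is not part of the redacted code:)
from typing import Annotated

import pyspark.sql
from flytekit import kwtypes, task
from flytekit.types.structured import StructuredDataset
from flytekitplugins.spark import Spark

urm_columns = kwtypes(user_id=int, item_id=int, rating=float)


@task(task_config=Spark(spark_conf={"spark.executor.instances": "2"}))
def consume_urm(urm_train: Annotated[StructuredDataset, urm_columns]) -> int:
    # open() with a dataframe type asks flytekit to decode the stored parquet
    # back into a Spark DataFrame.
    df = urm_train.open(pyspark.sql.DataFrame).all()
    return df.count()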

Yee

09/06/2022, 6:50 PM
can you run it again with
FLYTE_SDK_LOGGING_LEVEL=10
?
you can add it to the task definition
flytekit has logging that should display what’s happening
i’m wondering if maybe the problem is that the data persistence layer isn’t picking up gcs as an option
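(A sketch of one way to do that: the environment parameter on @task sets container environment variables, so the flag can be attached to the task definition; the task below is just an illustration:)
from flytekit import task


@task(environment={"FLYTE_SDK_LOGGING_LEVEL": "10"})  # 10 = DEBUG-level flytekit logging
def debug_me(x: int) -> int:
    return x + 1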