Thread
#announcements
    Fabio Grätz

    4 weeks ago
    Hey everyone 🙂 I have a problem configuring a StructuredDataset to pass a Spark DataFrame around between tasks. I’m following this guide and get this error:
    {
      "asctime": "2022-08-30 16:15:09,048",
      "name": "flytekit",
      "levelname": "ERROR",
      "message": "Failed to convert return value for var o0 with error <class 'ValueError'>: Failed to find a handler for <class 'pyspark.sql.dataframe.DataFrame'>, protocol gs, fmt parquet"
    }
    I added this to my spark config but it doesn’t solve the problem:
    spark-config-default:
              - "spark.jars.packages": "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.2"
              - "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
              - "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
              - "spark.hadoop.google.cloud.auth.service.account.enable": "true"
    Would be great to get some pointers in case somebody has seen this error before, thanks!
    Yee

    4 weeks ago
    which version are you on?
    Fabio Grätz

    3 weeks ago
    v1.1.0
    Yee

    3 weeks ago
    do you think you could bump to 1.1.1 please?
    we noticed an issue with the transformer on gcp. should be updated in 1.1.1 to read the protocol from the raw output prefix instead.
    also you have
    from flytekitplugins.spark import Spark
    somewhere in your code right? or something that imports from that module.
    (the init file there triggers the loading of the transformer)
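    roughly like this — just a minimal sketch (the task body and names here are made up), the point being that importing anything from flytekitplugins.spark runs its __init__ and registers the pyspark DataFrame StructuredDataset handlers:
    import flytekit
    from flytekit import task
    from flytekit.types.structured import StructuredDataset
    # importing from flytekitplugins.spark triggers its __init__, which registers
    # the pyspark DataFrame StructuredDataset handlers
    from flytekitplugins.spark import Spark


    @task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
    def make_dataset() -> StructuredDataset:
        # hypothetical body, only to show the return pattern
        sess = flytekit.current_context().spark_session
        df = sess.createDataFrame([(1, 1, 5.0)], ["user_id", "item_id", "rating"])
        return StructuredDataset(dataframe=df)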
    Fabio Grätz

    3 weeks ago
    Updated to 1.2.0-b1 as the 1.1.1 chart appears not to exist. I am importing
    from flytekitplugins.spark import Spark
    but still see the same error.
    I solved my problem now by simply writing the DataFrame to a Parquet file using the GCS connector. Using structured datasets would have been nicer, I feel, but the solution works well, so no more problem here.
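    For reference, the workaround is roughly this (sketch only; the helper name and bucket path are made up, and it relies on the GCS connector jars from the spark config above):
    import pyspark.sql


    def write_parquet_to_gcs(df: pyspark.sql.DataFrame, uri: str) -> str:
        """Write a Spark DataFrame to GCS as Parquet and hand the URI to downstream tasks."""
        df.write.mode("overwrite").parquet(uri)
        return uri


    # e.g. write_parquet_to_gcs(online_df_train, "gs://<bucket>/recsys/urm_train.parquet")
    # a downstream Spark task can read it back with spark.read.parquet(uri)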
    Samhita Alla

    3 weeks ago
    @Yee, is this a bug, since the latest flytekit version doesn’t resolve the issue?
    Yee

    3 weeks ago
    sorry, didn’t see this. yeah using the structured dataset pattern and just returning the dataframe would’ve been nice. we will look into this.
    would you mind copy/pasting a bit of redacted code? primarily interested in the task signature and return statement.
    Fabio Grätz

    3 weeks ago
    import logging
    from datetime import timedelta
    from typing import Tuple

    import pyspark.sql
    import flytekit
    import flytekitplugins.hydra  # noqa F401  # This is an internal package with a type transformer for omegaconf DictConfig objects

    from flytekit import Resources, task, workflow, kwtypes
    from flytekitplugins.spark import Spark
    from flytekit.types.structured import StructuredDataset
    from omegaconf import DictConfig, OmegaConf
    from typing_extensions import Annotated
    import pyspark.sql.functions as F
    
    urm_columns = kwtypes(user_id=int, item_id=int, rating=float)
    id_mapping_columns = kwtypes(user_id=int, user_pseudo_id=int, ...)
    
    
    @task(
        task_config=Spark(
            # this configuration is applied to the spark cluster
            spark_conf={
                "spark.driver.memory": "5000M",
                "spark.executor.memory": "5000M",
                "spark.executor.cores": "4",
                "spark.executor.instances": "3",
                "spark.driver.cores": "2",
            }
        ),
        retries=1,
        timeout=timedelta(hours=1),
        cache=True,
        cache_version="1.8",
        requests=Resources(mem="1Gi", cpu="1"),
        container_image="{{.images.spark.fqn}}:{{.images.default.version}}",
    )
    def access_and_validate_data(
        cfg: DictConfig,
    ) -> Tuple[
        Annotated[StructuredDataset, urm_columns],
        Annotated[StructuredDataset, urm_columns],
        Annotated[StructuredDataset, urm_columns],
        Annotated[StructuredDataset, urm_columns],
        Annotated[StructuredDataset, id_mapping_columns],
    ]:
        """
        Args:
            cfg (DictConfig): data accessor hydra configuration.
    
        Returns:
           ...
        """
        sess = flytekit.current_context().spark_session  # noqa F841
        <http://logging.info|logging.info>(OmegaConf.to_yaml(cfg))
    
        online_df = bq_to_spark(...)
        offline_df = bq_to_spark(...)
    
        # Data validation with great expectations
        # Feature engineering
        # Train test split
    
        ds_online_train = StructuredDataset(dataframe=online_df_train, uri=df_online_train_uri)
        ds_online_test = StructuredDataset(dataframe=online_df_test, uri=df_online_test_uri)
        ds_offline_train = StructuredDataset(dataframe=offline_df_train, uri=df_offline_train_uri)
        ds_offline_test = StructuredDataset(dataframe=offline_df_test, uri=df_offline_test_uri)
        ds_id_mapping = StructuredDataset(dataframe=id_mapping_df, uri=id_mapping_df_uri)
        
        return (
            ds_online_train,
            ds_online_test,
            ds_offline_train,
            ds_offline_test,
            ds_id_mapping,
        )
    Does this help? (I had to redact quite a bit, but I’m happy to answer any questions you might have.)
    Yee

    3 weeks ago
    can you run it again with FLYTE_SDK_LOGGING_LEVEL=10?
    you can add it to the task definition
    flytekit has logging that should display what’s happening
    i’m wondering if maybe the problem is that the data persistence layer isn’t picking up gcs as an option
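    e.g. something like this (a sketch, assuming the standard environment parameter on @task; the task itself is just a placeholder):
    from flytekit import task


    @task(environment={"FLYTE_SDK_LOGGING_LEVEL": "10"})  # 10 == logging.DEBUG
    def debug_me() -> int:
        return 1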