Evan Sadler
04/29/2023, 1:37 AM
Failed to convert return value for var o0 for function src.workflow.t1 with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o49.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
The fix was to map the bare s3 scheme to the s3a filesystem implementation and handle auth in a special way (see spark-on-k8s-operator/issues/1041).
The following code worked:
import flytekit
import pandas as pd
from flytekit import task, StructuredDataset
from flytekitplugins.spark import Spark

def configure_s3(spark):
    # Point the "s3" scheme at S3A and use the web-identity (IRSA) credentials provider.
    hadoop_conf = spark._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
@task(
    container_image="ghcr.io/flyteorg/flytecookbook:k8s_spark-latest",
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.instances": "3",
            "spark.driver.cores": "1",
            "spark.executor.cores": "1",
        }
    ),
)
def t1() -> StructuredDataset:
    spark = flytekit.current_context().spark_session
    configure_s3(spark)
    df = pd.DataFrame({"a": [1, 2, 3]})
    df = spark.createDataFrame(df)
    return StructuredDataset(dataframe=df)
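As an alternative to calling a helper at runtime, the same Hadoop settings can likely be declared up front in the task's `spark_conf`, since Spark copies any property prefixed with `spark.hadoop.` into the Hadoop Configuration. This is a sketch under that assumption, not verified on this particular Flyte setup:

```python
# Sketch: declare the s3 -> s3a mapping and the web-identity credentials
# provider statically, instead of mutating hadoopConfiguration() inside the task.
# Spark strips the "spark.hadoop." prefix and applies the rest to its Hadoop conf.
spark_conf = {
    "spark.driver.memory": "1000M",
    "spark.executor.memory": "1000M",
    "spark.executor.instances": "3",
    "spark.driver.cores": "1",
    "spark.executor.cores": "1",
    # Map the bare "s3" scheme onto the S3A filesystem implementation.
    "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # Authenticate via the mounted web-identity token (IRSA),
    # as discussed in spark-on-k8s-operator/issues/1041.
    "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
}
```

If this works, `configure_s3` becomes unnecessary and the config travels with the task definition.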