# ask-the-community
f
Hello, I deployed spark-operator in an EKS cluster for Flyte. When I run a Spark task for the first time in this newly set up Flyte cluster, the spark-operator pod gave the following error. Does anyone have an idea how to debug or resolve this? I am on the latest spark-operator ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.8-3.1.1, and the Flyte version is 1.10.
t
I just set up the spark operator today. Do you see a pod named
<flyte-execution-id>-driver
? Also looking at the sparkApplication custom resource can provide some useful information.
f
Awesome. I see the error now.
Copy code
FlyteAssertion: Failed to get data from
s3://mlp-flyte-artifact/flytesnacks/development/EHLFTFV7HVOQI33UCL6FVKH4GQ======
/fast8dd8536de9cac7436adec9036b591317.tar.gz to /root/ (recursive=False).
So the spark-operator pod doesn’t have S3 permission. @Thomas Newton, thanks a lot! Do you know where in the helm chart to specify the AWS IAM role that has permission to access S3?
t
Not really - I'm on Azure. Probably it will be in
spark-config-default
. You will probably need to research Hadoop's S3A configuration to find the right option to use. Are you familiar with Spark?
Actually, looking more carefully at that error, I think it's
flytekit
failing to download something before starting the task. It's not Spark that's failing to authenticate. I guess you can configure this auth similar to how you would for a simple Python container task.
f
@Thomas Newton, I can run a python task without issues.
t
I would suggest comparing the pod spec for a normal python task and the spark driver pod. I expect there is some missing auth thing on the spark driver.
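(Side note: a minimal way to do that comparison, assuming kubectl access to the execution namespace; the pod names are placeholders for the ones from your run:)
Copy code
# Dump both pod specs and diff them; the Environment sections are usually the interesting part.
kubectl get pod <python-task-pod> -n flytesnacks-development -o yaml > python-task.yaml
kubectl get pod <execution-id>-driver -n flytesnacks-development -o yaml > spark-driver.yaml
diff python-task.yaml spark-driver.yaml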
f
Thanks
@Thomas Newton, comparing the Flyte python task pod details with the Spark task pod details, I noticed a difference under Environment: the Spark task doesn’t have AWS_ROLE_ARN set. Python task:
Copy code
Environment:
......
      ENV:                                dev
      AWS_STS_REGIONAL_ENDPOINTS:         regional
      AWS_DEFAULT_REGION:                 us-east-1
      AWS_REGION:                         us-east-1
      AWS_ROLE_ARN:                       arn:aws:iam::245085526351:role/flyte-role
      AWS_WEB_IDENTITY_TOKEN_FILE:        /var/run/secrets/eks.amazonaws.com/serviceaccount/token
spark task:
Copy code
Environment:
....
      ENV:                                dev
      FLYTE_INTERNAL_PROJECT:             flytesnacks
      FLYTE_INTERNAL_EXECUTION_DOMAIN:    development
      FLYTE_INTERNAL_EXECUTION_PROJECT:   flytesnacks
      ML_S3_BUCKET_PREFIX:                mlp-flyte-workflow
      FLYTE_INTERNAL_TASK_PROJECT:        flytesnacks
      FLYTE_MAX_ATTEMPTS:                 1
      SPARK_DRIVER_BIND_ADDRESS:           (v1:status.podIP)
      PYSPARK_PYTHON:                     /opt/venv/bin/python3
      PYSPARK_DRIVER_PYTHON:              /opt/venv/bin/python3
      SPARK_LOCAL_DIRS:                   /var/data/spark-42162206-e11d-481c-a491-a85b80f9f7eb
      SPARK_CONF_DIR:                     /opt/spark/conf
Could you please share the ENV sections of your python / spark task pods from your Flyte setup?
t
For me the important part is
Copy code
- name: FLYTE_AZURE_STORAGE_ACCOUNT_NAME
      value: storageaccountname
    - name: FLYTE_AZURE_STORAGE_ACCOUNT_KEY
      value: storageaccountkey
This is present on both our normal python task pods and spark stuff because we use a default pod template to apply it. I don't think this would be the best approach on S3 though.
f
@Thomas Newton, I added a python hello_world() task to the same flyte workflow where the spark task is. And the python task failed too with the same access denied error:
Copy code
FlyteAssertion: Failed to get data from
s3://mlp-flyte-artifact/flytesnacks/development/QNU7PAABKUIUMHOHEZIXITVXZE======
/fastad3548af2ffa5d39c0465f040927e549.tar.gz to /root/ (recursive=False).
It would have succeeded if it were in a non-Spark workflow. I'm starting to think it's not because the tasks don't have access to S3, but because they cannot copy to /root on the pod. What do you think?
t
Are you registering the workflows the same way in both cases? I think this
/fastad3548af2ffa5d39c0465f040927e549.tar.gz
is flyte's fast execute thing where it packages up your code and dependencies then unpacks it on top of the docker image on the pod.
f
yes
I did
Copy code
pyflyte register ....
I know the cause now. The spark task is automatically assigned service account spark, even if I don’t specify it on the command line with --service-account spark. And that service account doesn't have permission to access S3.
Any idea how to fix it? Thanks
t
Probably you need to apply some AWS IAM role to the spark service account or configure flyte to use the same service account it uses for python tasks. If you go for the latter option you will need to add k8s role assignments so that this service account has the required permissions for running SparkApplications.
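(A quick, hedged way to test the first option without touching the chart: annotate the spark SA with the same IRSA role the default SA uses, then re-run the workflow. The account ID is a placeholder; this is only a sketch, not the chart-managed fix discussed later in the thread.)
Copy code
# Manual quick test only; the cluster resource manager may overwrite this on its next sync.
kubectl annotate serviceaccount spark -n flytesnacks-development \
  eks.amazonaws.com/role-arn=arn:aws:iam::<account-id>:role/flyte-role --overwrite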
f
thanks
d
@Frank Shen can you share the output of
kubectl describe sa spark -n <your-flyte-namespace>
?
f
Hi @David Espejo (he/him),
Copy code
kubectl describe sa spark -n flytesnacks-development
Name:                spark
Namespace:           flytesnacks-development
Labels:              <none>
Annotations:         <none>
Image pull secrets:  <none>
Mountable secrets:   <none>
Tokens:              <none>
Events:              <none>
d
are you using
flyte-binary
?
f
I am using flyte-core
And for default sa:
Copy code
kubectl describe sa default -n flytesnacks-development
Name:                default
Namespace:           flytesnacks-development
Labels:              <none>
Annotations:         eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
Image pull secrets:  artifactory-da-reader-token
Mountable secrets:   default-token-x5dbz
Tokens:              <none>
Events:              <none>
I confirm that python tasks using the default sa do have access to S3.
d
yeah, I think the
spark
SA is missing the annotation for the IAM Role. Try adding the following to your Helm chart:
Copy code
templates:
  - key: 011_spark_service_account 
    value: |
      apiVersion: v1
      kind: ServiceAccount
      metadata:
        name: spark
        namespace: '{{ namespace }}'
        annotations:
          eks.amazonaws.com/role-arn: '{{ defaultIamRole }}'
and do a
helm upgrade ...
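(A minimal sketch of that upgrade, assuming the chart repo is added as flyteorg, the release is named flyte in the flyte namespace, and your overrides live in values-override.yaml:)
Copy code
helm repo update
helm upgrade flyte flyteorg/flyte-core -n flyte -f values-override.yaml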
f
I found another difference in this helm chart section. Do you think it matters? I am using the older code:
Copy code
sparkoperator:
  # --- enable or disable Sparkoperator deployment installation
  enabled: false
  # -- Spark plugin configuration
  plugin_config:
    plugins:
      spark:
        # -- Spark default configuration
        spark-config-default:
          # We override the default credentials chain provider for Hadoop so that
          # it can use the serviceAccount based IAM role or ec2 metadata based.
          # This is more in line with how AWS works
          - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
          - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
          - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
          - spark.blacklist.enabled: "true"
          - spark.blacklist.timeout: "5m"
          - spark.task.maxfailures: "8"
which is different from https://docs.flyte.org/en/latest/deployment/plugins/k8s/index.html
Copy code
sparkoperator:
  enabled: true
  plugin_config:
    plugins:
      spark:
        # Edit the Spark configuration as you see fit
        spark-config-default:
          - spark.driver.cores: "1"
          - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
          - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.network.timeout: 600s
          - spark.executorEnv.KUBERNETES_REQUEST_TIMEOUT: 100000
          - spark.executor.heartbeatInterval: 60s
d
right, I think there's missing content in that section of the docs for
flyte-core
sparkoperator.enabled
should be
true
f
I set
sparkoperator.enabled to true in the overridden values file.
What else is wrong?
d
what
templates
do you have?
f
Our org has a custom way of using helm charts. In our main chart:
Copy code
repositories:
      spark-operator:
        enabled: false
        url: https://googlecloudplatform.github.io/spark-on-k8s-operator
    components:
      flyte:
        enabled: false
        namespace: flyte
        helm:
          releaseName: flyte
          repository: helm-charts
          chart: flyte-core
          revision: 3.0.1
      spark-operator:
        enabled: false
        namespace: spark-operator
        helm:
          releaseName: spark-operator
          alias: sparkoperator
          repository: spark-operator
          chart: spark-operator
          revision: 1.1.27
Therefore I don’t have the template below in our code base.
Copy code
templates
  - key: 011_spark_service_account
d
according to docs, you'll need these templates in your config:
Copy code
templates:
    # -- Template for namespaces resources
    - key: aa_namespace
      value: |
        apiVersion: v1
        kind: Namespace
        metadata:
          name: {{ namespace }}
        spec:
          finalizers:
          - kubernetes

    - key: ab_project_resource_quota
      value: |
        apiVersion: v1
        kind: ResourceQuota
        metadata:
          name: project-quota
          namespace: {{ namespace }}
        spec:
          hard:
            limits.cpu: {{ projectQuotaCpu }}
            limits.memory: {{ projectQuotaMemory }}

    - key: ac_spark_role
      value: |
        apiVersion: rbac.authorization.k8s.io/v1beta1
        kind: Role
        metadata:
          name: spark-role
          namespace: {{ namespace }}
        rules:
        - apiGroups: ["*"]
          resources:
          - pods
          verbs:
          - '*'
        - apiGroups: ["*"]
          resources:
          - services
          verbs:
          - '*'
        - apiGroups: ["*"]
          resources:
          - configmaps
          verbs:
          - '*'

    - key: ad_spark_service_account
      value: |
        apiVersion: v1
        kind: ServiceAccount
        metadata:
          name: spark
          namespace: {{ namespace }}

    - key: ae_spark_role_binding
      value: |
        apiVersion: rbac.authorization.k8s.io/v1beta1
        kind: RoleBinding
        metadata:
          name: spark-role-binding
          namespace: {{ namespace }}
        roleRef:
          apiGroup: rbac.authorization.k8s.io
          kind: Role
          name: spark-role
        subjects:
        - kind: ServiceAccount
          name: spark
          namespace: {{ namespace }}
f
@David Espejo (he/him), I do have those in my chart. However, I had to make one change, because our org doesn't use rbac.authorization.k8s.io/v1beta1, so it won't be found and will error.
Copy code
apiVersion: rbac.authorization.k8s.io/v1 instead of apiVersion: rbac.authorization.k8s.io/v1beta1
@David Espejo (he/him), Do you think adding the annotations in the following template section will fix my issue?
Copy code
- key: ad_spark_service_account
      value: |
        apiVersion: v1
        kind: ServiceAccount
        metadata:
          name: spark
          namespace: {{ namespace }}
          annotations:
            eks.amazonaws.com/role-arn: '{{ some iam-role that has been defined }}'
d
it should
Copy code
rbac.authorization.k8s.io/v1beta1
indeed, I think we should update the spec in those templates
f
Thanks!
@David Espejo (he/him), I did make progress, but having a new issue. The variable is not being resolved.
Copy code
kubectl describe sa spark -n flytesnacks-development
Name:                spark
Namespace:           flytesnacks-development
Labels:              <none>
Annotations:         eks.amazonaws.com/role-arn: arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role
My template:
Copy code
templates:
              - key: ad_spark_service_account
                value: |
                  apiVersion: v1
                  kind: ServiceAccount
                  metadata:
                    name: spark
                    namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role
However, the variable can be resolved elsewhere, e.g.:
Copy code
flyteadmin:
            serviceAccount:
              annotations:
                eks.amazonaws.com/role-arn: arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role
I think it's because the templates section can't resolve a Helm variable like a normal value.
Copy code
kubectl describe sa flyteadmin -n flyte
Name:                flyteadmin
Namespace:           flyte
Labels:              app.kubernetes.io/instance=flyte
                     app.kubernetes.io/managed-by=Helm
                     app.kubernetes.io/name=flyteadmin
                     helm.sh/chart=flyte-core-3.0.1
                     helm.toolkit.fluxcd.io/name=flyte
                     helm.toolkit.fluxcd.io/namespace=flyte
Annotations:         eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
How can I use a variable with templates?
Hi @David Espejo (he/him), I hard-coded the iam role and it resolved the S3 access denied issue for spark tasks. However, I’ve encountered another issue for spark tasks.
Copy code
An error occurred while calling o122.parquet.
: java.nio.file.AccessDeniedException: mlp-flyte-workflow: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by DefaultAWSCredentialsProviderChain : com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), com.amazonaws.auth.profile.ProfileCredentialsProvider@6b5f17ee: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@3b3fb958: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null)]
...
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), com.amazonaws.auth.profile.ProfileCredentialsProvider@6b5f17ee: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@3b3fb958: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null)]
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
	at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
	... 52 more
I am using the same spark config as in the doc.
Copy code
sparkoperator:
  enabled: true
  plugin_config:
    plugins:
      spark:
        # Edit the Spark configuration as you see fit
        spark-config-default:
          - spark.driver.cores: "1"
          - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
How can I set the following env vars in the Spark task pod/container in flytesnacks-development?
Copy code
AWS_ACCESS_KEY
AWS_SECRET_ACCESS_KEY
I think it shouldn't ask for AWS credentials, as it will use the node role (EC2 instance role) of EKS... Not really sure why this is expecting the keys.
@Kevin Su, @Samhita Alla, @Ketan (kumare3) We are trying to deploy flyte for production use and this is a blocking issue. Please help. Thanks a lot!
CC: @Cyrus
k
I think it shouldn’t ask for AWS credentials, as it will use the node role(ec2 instance role) of EKS.
you are right, you should not
could you describe the pod that runs the Spark task? kubectl describe pod <name> -n flytesnacks-development
f
Copy code
Name:             am92dvzqctxvhr74572q-n1-0-driver
Namespace:        flytesnacks-development
Priority:         0
Service Account:  spark
Node:             ip-100-66-52-246.ec2.internal/100.66.52.246
Start Time:       Mon, 04 Dec 2023 11:39:52 -0800
Labels:           domain=development
                  execution-id=am92dvzqctxvhr74572q
                  interruptible=false
                  node-id=n1
                  project=flytesnacks
                  shard-key=10
                  spark-app-selector=spark-6b8c4186d5d64363a8b358e1a0c256d6
                  spark-role=driver
                  sparkoperator.k8s.io/app-name=am92dvzqctxvhr74572q-n1-0
                  sparkoperator.k8s.io/launched-by-spark-operator=true
                  sparkoperator.k8s.io/submission-id=20022915-c6a7-4eca-ab06-505a2b1aec9d
                  task-name=test-spark-read-s3-load-s3-data
                  workflow-name=test-spark-read-s3-wf
Annotations:      cluster-autoscaler.kubernetes.io/safe-to-evict: false
Status:           Succeeded
IP:               172.22.69.107
IPs:
  IP:  172.22.69.107
Containers:
  spark-kubernetes-driver:
    Container ID:  containerd://f1f752e54d5a45ecf0dc410c9ad90e520a0522436c628a1b87408dbb7ac59b02
    Image:         876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.1.0
    Image ID:      876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte@sha256:eadbc0b33b14a825649ef0f809794d79e1bc55eb07560911b845f17e2476d3be
    Ports:         7078/TCP, 7079/TCP, 4040/TCP
    Host Ports:    0/TCP, 0/TCP, 0/TCP
    Args:
      driver
      --properties-file
      /opt/spark/conf/spark.properties
      --class
      org.apache.spark.deploy.PythonRunner
      local:///opt/venv/bin/entrypoint.py
      pyflyte-fast-execute
      --additional-distribution
      <s3://mlp-flyte-artifact/flytesnacks/development/2QSUGV7OX6IXKD5N55OGGKTGQY======/fast7706dac240fa1194072d4ac2dd1037b4.tar.gz>
      --dest-dir
      /root
      --
      pyflyte-execute
      --inputs
      <s3://mlp-flyte-artifact/metadata/propeller/flytesnacks-development-am92dvzqctxvhr74572q/n1/data/inputs.pb>
      --output-prefix
      <s3://mlp-flyte-artifact/metadata/propeller/flytesnacks-development-am92dvzqctxvhr74572q/n1/data/0>
      --raw-output-data-prefix
      <s3://mlp-flyte-artifact/43/am92dvzqctxvhr74572q-n1-0>
      --checkpoint-path
      <s3://mlp-flyte-artifact/43/am92dvzqctxvhr74572q-n1-0/_flytecheckpoints>
      --prev-checkpoint
      ""
      --resolver
      flytekit.core.python_auto_container.default_task_resolver
      --
      task-module
      test_spark_read_s3
      task-name
      load_s3_data
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Mon, 04 Dec 2023 11:39:53 -0800
      Finished:     Mon, 04 Dec 2023 11:40:13 -0800
    Ready:          False
    Restart Count:  0
    Limits:
      cpu:     1
      memory:  2800Mi
    Requests:
      cpu:     1
      memory:  2800Mi
    Environment:
      SPARK_USER:                         root
      SPARK_APPLICATION_ID:               spark-6b8c4186d5d64363a8b358e1a0c256d6
      FLYTE_INTERNAL_TASK_DOMAIN:         development
      FLYTE_INTERNAL_VERSION:             -VnmQ0GG2CM2WRwW7ij7Aw==
      FLYTE_INTERNAL_EXECUTION_WORKFLOW:  flytesnacks:development:test_spark_read_s3.wf
      FLYTE_INTERNAL_TASK_VERSION:        -VnmQ0GG2CM2WRwW7ij7Aw==
      MLFLOW_TRACKING_URI:                <https://mlflow.dev.api.discomax.com>
      FLYTE_INTERNAL_EXECUTION_ID:        am92dvzqctxvhr74572q
      FLYTE_ATTEMPT_NUMBER:               0
      FLYTE_START_TIME:                   1701718789074
      FLYTE_INTERNAL_TASK_NAME:           test_spark_read_s3.load_s3_data
      FLYTE_INTERNAL_NAME:                test_spark_read_s3.load_s3_data
      FLYTE_INTERNAL_DOMAIN:              development
      ENV:                                dev
      FLYTE_INTERNAL_PROJECT:             flytesnacks
      FLYTE_INTERNAL_EXECUTION_DOMAIN:    development
      FLYTE_INTERNAL_EXECUTION_PROJECT:   flytesnacks
      ML_S3_BUCKET_PREFIX:                mlp-flyte-workflow
      FLYTE_INTERNAL_TASK_PROJECT:        flytesnacks
      FLYTE_MAX_ATTEMPTS:                 1
      SPARK_DRIVER_BIND_ADDRESS:           (v1:status.podIP)
      PYSPARK_PYTHON:                     /opt/venv/bin/python3
      PYSPARK_DRIVER_PYTHON:              /opt/venv/bin/python3
      SPARK_LOCAL_DIRS:                   /var/data/spark-80ed1c93-4743-437a-aa8f-980006c8fd44
      SPARK_CONF_DIR:                     /opt/spark/conf
      AWS_STS_REGIONAL_ENDPOINTS:         regional
      AWS_DEFAULT_REGION:                 us-east-1
      AWS_REGION:                         us-east-1
      AWS_ROLE_ARN:                       arn:aws:iam::245085526351:role/flyte-role
      AWS_WEB_IDENTITY_TOKEN_FILE:        /var/run/secrets/eks.amazonaws.com/serviceaccount/token
    Mounts:
      /opt/spark/conf from spark-conf-volume-driver (rw)
      /var/data/spark-80ed1c93-4743-437a-aa8f-980006c8fd44 from spark-local-dir-1 (rw)
      /var/run/secrets/eks.amazonaws.com/serviceaccount from aws-iam-token (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-xzhwj (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             False 
  ContainersReady   False 
  PodScheduled      True 
Volumes:
  aws-iam-token:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  86400
  spark-local-dir-1:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:     
    SizeLimit:  <unset>
  spark-conf-volume-driver:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      spark-drv-c5ad328c365813af-conf-map
    Optional:  false
  kube-api-access-xzhwj:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   Guaranteed
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason       Age   From               Message
  ----     ------       ----  ----               -------
  Normal   Scheduled    40m   default-scheduler  Successfully assigned flytesnacks-development/am92dvzqctxvhr74572q-n1-0-driver to ip-100-66-52-246.ec2.internal
  Warning  FailedMount  40m   kubelet            MountVolume.SetUp failed for volume "spark-conf-volume-driver" : configmap "spark-drv-c5ad328c365813af-conf-map" not found
  Normal   Pulled       40m   kubelet            Container image "876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.1.0" already present on machine
  Normal   Created      40m   kubelet            Created container spark-kubernetes-driver
  Normal   Started      40m   kubelet            Started container spark-kubernetes-driver
k
are you able to hardcode it temporarily and test it?
Copy code
namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role
change it to eks.amazonaws.com/role-arn: arn:aws:iam::123456:role/flyte-role
f
@Kevin Su, I already did:
Copy code
templates:
              - key: ad_spark_service_account
                value: |
                  apiVersion: v1
                  kind: ServiceAccount
                  metadata:
                    name: spark
                    namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
Where else do you want me to override?
This is the sa:
Copy code
kubectl describe sa spark -n flytesnacks-development
Name:                spark
Namespace:           flytesnacks-development
Labels:              <none>
Annotations:         eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
Image pull secrets:  <none>
Mountable secrets:   <none>
Tokens:              <none>
Events:              <none>
It’s the same for the default sa:
Copy code
kubectl describe sa default -n flytesnacks-development
Name:                default
Namespace:           flytesnacks-development
Labels:              <none>
Annotations:         eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
Image pull secrets:  artifactory-da-reader-token
Mountable secrets:   default-token-x5dbz
Tokens:              <none>
Events:              <none>
And the python task in the same workflow as the spark task can access S3.
This is the pod description for python task in the same workflow as spark task:
Copy code
Name:             am92dvzqctxvhr74572q-n0-0
Namespace:        flytesnacks-development
Priority:         0
Service Account:  spark
Node:             ip-100-66-50-125.ec2.internal/100.66.50.125
Start Time:       Mon, 04 Dec 2023 11:39:49 -0800
Labels:           domain=development
                  execution-id=am92dvzqctxvhr74572q
                  interruptible=false
                  node-id=n0
                  project=flytesnacks
                  shard-key=10
                  task-name=test-spark-read-s3-say-hello
                  workflow-name=test-spark-read-s3-wf
Annotations:      cluster-autoscaler.kubernetes.io/safe-to-evict: false
                  primary_container_name: am92dvzqctxvhr74572q-n0-0
Status:           Succeeded
IP:               172.20.5.58
IPs:
  IP:           172.20.5.58
Controlled By:  flyteworkflow/am92dvzqctxvhr74572q
Containers:
  am92dvzqctxvhr74572q-n0-0:
    Container ID:  containerd://c01887733aff00bdf1a93f5a7bc40b0e264eda3c34e626c13e8107a541900651
    Image:         876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.1.0
    Image ID:      876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte@sha256:eadbc0b33b14a825649ef0f809794d79e1bc55eb07560911b845f17e2476d3be
    Port:          <none>
    Host Port:     <none>
    Args:
      pyflyte-fast-execute
      --additional-distribution
      <s3://mlp-flyte-artifact/flytesnacks/development/2QSUGV7OX6IXKD5N55OGGKTGQY======/fast7706dac240fa1194072d4ac2dd1037b4.tar.gz>
      --dest-dir
      /root
      --
      pyflyte-execute
      --inputs
      <s3://mlp-flyte-artifact/metadata/propeller/flytesnacks-development-am92dvzqctxvhr74572q/n0/data/inputs.pb>
      --output-prefix
      <s3://mlp-flyte-artifact/metadata/propeller/flytesnacks-development-am92dvzqctxvhr74572q/n0/data/0>
      --raw-output-data-prefix
      <s3://mlp-flyte-artifact/6o/am92dvzqctxvhr74572q-n0-0>
      --checkpoint-path
      <s3://mlp-flyte-artifact/6o/am92dvzqctxvhr74572q-n0-0/_flytecheckpoints>
      --prev-checkpoint
      ""
      --resolver
      flytekit.core.python_auto_container.default_task_resolver
      --
      task-module
      test_spark_read_s3
      task-name
      say_hello
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Mon, 04 Dec 2023 11:39:49 -0800
      Finished:     Mon, 04 Dec 2023 11:39:53 -0800
    Ready:          False
    Restart Count:  0
    Limits:
      cpu:     2
      memory:  1Gi
    Requests:
      cpu:     2
      memory:  1Gi
    Environment:
      FLYTE_INTERNAL_EXECUTION_WORKFLOW:  flytesnacks:development:test_spark_read_s3.wf
      FLYTE_INTERNAL_EXECUTION_ID:        am92dvzqctxvhr74572q
      FLYTE_INTERNAL_EXECUTION_PROJECT:   flytesnacks
      FLYTE_INTERNAL_EXECUTION_DOMAIN:    development
      FLYTE_ATTEMPT_NUMBER:               0
      FLYTE_INTERNAL_TASK_PROJECT:        flytesnacks
      FLYTE_INTERNAL_TASK_DOMAIN:         development
      FLYTE_INTERNAL_TASK_NAME:           test_spark_read_s3.say_hello
      FLYTE_INTERNAL_TASK_VERSION:        -VnmQ0GG2CM2WRwW7ij7Aw==
      FLYTE_INTERNAL_PROJECT:             flytesnacks
      FLYTE_INTERNAL_DOMAIN:              development
      FLYTE_INTERNAL_NAME:                test_spark_read_s3.say_hello
      FLYTE_INTERNAL_VERSION:             -VnmQ0GG2CM2WRwW7ij7Aw==
      MLFLOW_TRACKING_URI:                <https://mlflow.dev.api.discomax.com>
      ENV:                                dev
      ML_S3_BUCKET_PREFIX:                mlp-flyte-workflow
      AWS_STS_REGIONAL_ENDPOINTS:         regional
      AWS_DEFAULT_REGION:                 us-east-1
      AWS_REGION:                         us-east-1
      AWS_ROLE_ARN:                       arn:aws:iam::245085526351:role/flyte-role
      AWS_WEB_IDENTITY_TOKEN_FILE:        /var/run/secrets/eks.amazonaws.com/serviceaccount/token
    Mounts:
      /var/run/secrets/eks.amazonaws.com/serviceaccount from aws-iam-token (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-sz9h9 (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             False 
  ContainersReady   False 
  PodScheduled      True 
Volumes:
  aws-iam-token:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  86400
  kube-api-access-sz9h9:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   Guaranteed
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  53m   default-scheduler  Successfully assigned flytesnacks-development/am92dvzqctxvhr74572q-n0-0 to ip-100-66-50-125.ec2.internal
  Normal  Pulled     53m   kubelet            Container image "876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.1.0" already present on machine
  Normal  Created    53m   kubelet            Created container am92dvzqctxvhr74572q-n0-0
  Normal  Started    53m   kubelet            Started container am92dvzqctxvhr74572q-n0-0
Spark task has the extra:
Copy code
Environment:
      SPARK_USER:                         root
@Kevin Su
k
so you got the same error after you override it?
Copy code
namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role
Spark task has the extra:
``` Environment:
SPARK_USER: root```
That should be fine
f
Yes. Same error
here is my override values:
Copy code
cluster_resource_manager:
             templates:
              - key: ad_spark_service_account
                value: |
                  apiVersion: v1
                  kind: ServiceAccount
                  metadata:
                    name: spark
                    namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
k
could you also describe the sparkJob. kubectl describe sparkjob <name>
f
@Kevin Su, Is sparkjob a pod? What’s the command to get the sparkjob name?
k
kubectl get sparkjob
is a k8s custom resource
f
Copy code
kubectl get sparkjob --all-namespaces
error: the server doesn't have a resource type "sparkjob"
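(For reference, the CRD installed by the Spark operator is SparkApplication rather than sparkjob, so, assuming the operator's CRDs are installed, something like this should work; the resource name is a placeholder based on the execution ID:)
Copy code
kubectl api-resources | grep -i spark
kubectl get sparkapplications -n flytesnacks-development
kubectl describe sparkapplication <execution-id>-n1-0 -n flytesnacks-development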
d
@Frank Shen in the `templates` section, you should use
{{ defaultIamRole }}
as the variable. Assuming that you have also specified
Copy code
cluster_resource_manager:
  config:
     cluster_resources:
      customData:
        - production:
          - defaultIamRole:
              value: <FLYTE_USER_IAM_ARN>
        - staging:
          - defaultIamRole:
              value: <FLYTE_USER_IAM_ARN>
        - development:
          - defaultIamRole:
              value: <FLYTE_USER_IAM_ARN>
f
@David Espejo (he/him), That’s correct:
Copy code
cluster_resource_manager:
            # -- Enables the Cluster resource manager component
            enabled: true
            # -- Starts the cluster resource manager in standalone mode with requisite auth credentials to call flyteadmin service endpoints
            standalone_deploy: false
            config:
              cluster_resources:
                refreshInterval: 5m
                customData:
                  - development:
                      - projectQuotaCpu:
                          value: "800"
                      - projectQuotaMemory:
                          value: "3200Gi"
                      - defaultIamRole:
                          value: "arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role"
                refresh: 5m
            templates:
              - key: ad_spark_service_account
                value: |
                  apiVersion: v1
                  kind: ServiceAccount
                  metadata:
                    name: spark
                    namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: arn:aws:iam::245085526351:role/flyte-role
How can I verify the default role is set correctly?
I checked the helm release for flyte:
Copy code
Values:
    cluster_resource_manager:
      Config:
        cluster_resources:
          Custom Data:
            Development:
              Project Quota Cpu:
                Value:  800
              Project Quota Memory:
                Value:  3200Gi
              Default Iam Role:
                Value:       arn:aws:iam::{{ .Values.userSettings.accountNumber }}:role/flyte-role
d
if your template is defined in this way:
Copy code
- key: ad_spark_service_account
                value: |
                  apiVersion: v1
                  kind: ServiceAccount
                  metadata:
                    name: spark
                    namespace: {{ namespace }}
                    annotations:
          eks.amazonaws.com/role-arn: {{ defaultIamRole }}
then your
spark
SA should end up with an annotation pointing to the IAM Role's ARN
f
What’s the difference if I hard-code it, which I did?
d
probably won't change the behavior on the additional errors you're experiencing, but it helps in case you change the IAM role in the future
f
Sure, @David Espejo (he/him). Do you have a fix for the current error?
s
hey @jeev, my apologies for bringing you into this thread. could you assist with resolving this error that's occurring in the Spark task?
Copy code
An error occurred while calling o122.parquet.
: java.nio.file.AccessDeniedException: mlp-flyte-workflow: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by DefaultAWSCredentialsProviderChain : com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), com.amazonaws.auth.profile.ProfileCredentialsProvider@6b5f17ee: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@3b3fb958: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null)]
...
Caused by: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), com.amazonaws.auth.profile.ProfileCredentialsProvider@6b5f17ee: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@3b3fb958: Unauthorized (Service: null; Status Code: 401; Error Code: null; Request ID: null)]
	at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
	at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
	... 52 more
here's the pod description: https://flyte-org.slack.com/archives/CP2HDHKE1/p1701721256154039?thread_ts=1701456433.448079&cid=CP2HDHKE1
does
spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
credentials provider look correct to you?
d
I'll also have a call with Frank to investigate more
f
new error with com.amazonaws.auth.WebIdentityTokenCredentialsProvider:
Copy code
Traceback (most recent call last):

      File "/opt/venv/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/test_spark_read_s3.py", line 35, in load_s3_data
        spark_df = spark.read.parquet(INPUT_PATH)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 364, in parquet
        return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
        return f(*a, **kw)
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(

Message:

    An error occurred while calling o122.parquet.
: java.io.IOException: From option fs.s3a.aws.credentials.provider java.lang.ClassNotFoundException: Class com.amazonaws.auth.WebIdentityTokenCredentialsProvider not found
	at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:631)
	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:597)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:268)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:752)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:750)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:579)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: Class com.amazonaws.auth.WebIdentityTokenCredentialsProvider not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClasses(Configuration.java:2663)
	at org.apache.hadoop.fs.s3a.S3AUtils.loadAWSProviderClasses(S3AUtils.java:628)
	... 30 more


User error.
@David Espejo (he/him), This is what my dockerfile looks like (for the AWS Java SDK version ref):
Copy code
FROM apache/spark-py:3.3.1

ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive
ARG spark_uid=1001

USER root

RUN apt-get update && apt-get install -y python3 python3-venv make build-essential libssl-dev python3-pip curl wget git


WORKDIR /opt
RUN curl https://sdk.cloud.google.com > install.sh
RUN bash /opt/install.sh --install-dir=/opt
ENV PATH $PATH:/opt/google-cloud-sdk/bin
WORKDIR /root

ENV VENV /opt/venv
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"
RUN pip3 install wheel

RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.2/hadoop-aws-3.2.2.jar -P /opt/spark/jars && \
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar -P /opt/spark/jars

COPY in_container.mk /root/Makefile

# RUN chown -R ${spark_uid}:${spark_uid} /root


# Install the AWS cli separately to prevent issues with boto being written over
RUN pip3 install awscli

# Similarly, if you're using GCP be sure to update this command to install gsutil
# RUN apt-get install -y curl
# RUN curl -sSL <https://sdk.cloud.google.com> | bash
# ENV PATH="$PATH:/root/google-cloud-sdk/bin"


# Install Python dependencies

# Create the application directory
RUN mkdir /root/mlp_lib

# Set the working directory
WORKDIR /root/mlp_lib

# Copy the pyproject.toml and all other necessary files
COPY . /root/mlp_lib

# Now install the package, as all necessary files are in place
RUN pip3 install .

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag

ENV PYTHONPATH "/root/mlp_lib:${PYTHONPATH}:"

ENTRYPOINT ["/opt/entrypoint.sh"]
ENV HOME /root
# USER ${spark_uid}
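(One hedged sanity check against this image for the Class not found error: confirm which hadoop-aws / aws-java-sdk jars actually end up on the driver and whether the provider class is inside the bundle. Paths follow the wget lines above; unzip is assumed to be available in the container.)
Copy code
# Run inside the driver pod or a container started from the same image.
ls /opt/spark/jars | grep -E 'hadoop-aws|aws-java-sdk'
unzip -l /opt/spark/jars/aws-java-sdk-bundle-1.12.262.jar \
  | grep WebIdentityTokenCredentialsProvider || echo "class not found in bundle"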
@David Espejo (he/him), Additional information. My flyte spark workflow code:
Copy code
import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark


@task
def say_hello() -> int:
    return 777

@task(
    #container_image="613630599026.dkr.ecr.us-east-1.amazonaws.com/dai-mlp-flyte-spark-root-user:latest",
    #container_image="{{.image.spark.fqn }}:{{.image.spark.version}}",
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "2000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        },
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def load_s3_data() -> int:
    spark = flytekit.current_context().spark_session
    INPUT_PATH = f's3a://mlp-flyte-workflow/flyte-workflows/team=mlp-demo/project=test/version=1.0/inference/latest/input/raw'
    spark_df = spark.read.parquet(INPUT_PATH)
    
    df = spark_df.toPandas()
    print(df.head(5))
    return df.shape[0]

@workflow
def wf() -> int:
    say_hello()
    return load_s3_data()
My pyflyte command:
Copy code
pyflyte register mlp-demo/test_spark_read_s3.py  --image 876262748715.dkr.ecr.us-east-1.amazonaws.com/mlforge/flyte:0.1.0 --service-account spark
The spark task additional logs:
Copy code
23/12/04 23:16:45 WARN FileSystem: Failed to initialize fileystem s3a://mlp-flyte-workflow/flyte-workflows/team=mlp-demo/project=test/version=1.0/inference/latest/input/raw: java.nio.file.AccessDeniedException: mlp-flyte-workflow
That means it failed on these two lines of code:
Copy code
INPUT_PATH = f's3a://mlp-flyte-workflow/flyte-workflows/team=mlp-demo/project=test/version=1.0/inference/latest/input/raw'
spark_df = spark.read.parquet(INPUT_PATH)
s
com.amazonaws.auth.WebIdentityTokenCredentialsProvider not found
is the error. I don't think this provider is necessary unless you are specifically aiming for the Spark history server. https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html should work as well. Regarding the Dockerfile, you can use ImageSpec, as it comes with all the Spark dependencies pre-installed: https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/k8s_spark_plugin/pyspark_pi.html.
@David Espejo (he/him) have you been able to identify the root cause of the aws permissions issue?
d
@Frank Shen can you check the Trust Relationship in your IAM Role? The
spark
SA should be there, otherwise it won't get a WebIdentityToken. It doesn't seem to be the cause of the
Class not found
error you're getting though. Also, is that the same IAM Role you're using in the other cluster where you don't get errors?
f
Hi @David Espejo (he/him), it is not the IAM role from the other flyte cluster. The other flyte cluster is set up in a completely different AWS account, company platform, and environment.
d
ok, we should check that the Trust Relationship associated with the IAM Role, includes not only the
default
SA but also the
spark
one, otherwise, it won't work in the end
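(For context, an IRSA trust policy is often scoped to specific service accounts via a :sub condition in addition to :aud. A hedged sketch of what that could look like when both the default and spark SAs need the role; the account ID and OIDC provider ID are placeholders:)
Copy code
{
  "Effect": "Allow",
  "Principal": {
    "Federated": "arn:aws:iam::<account-id>:oidc-provider/oidc.eks.us-east-1.amazonaws.com/id/<OIDC_ID>"
  },
  "Action": "sts:AssumeRoleWithWebIdentity",
  "Condition": {
    "StringEquals": {
      "oidc.eks.us-east-1.amazonaws.com/id/<OIDC_ID>:aud": "sts.amazonaws.com",
      "oidc.eks.us-east-1.amazonaws.com/id/<OIDC_ID>:sub": [
        "system:serviceaccount:flytesnacks-development:default",
        "system:serviceaccount:flytesnacks-development:spark"
      ]
    }
  }
}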
f
@David Espejo (he/him), The other cluster where spark tasks work didn’t even have an iam role associated with the spark SA:
Copy code
- key: ad_spark_service_account
      value: |
        apiVersion: v1
        kind: ServiceAccount
        metadata:
          name: spark
          namespace: {{ namespace }}
Whereas in my cluster, I need to add the IAM role to get the python task (in the same workflow as the spark task) to work. The python task will use the spark SA because of the pyflyte param:
Copy code
--service-account spark
Copy code
- key: ad_spark_service_account
                value: |
                  apiVersion: v1
                  kind: ServiceAccount
                  metadata:
                    name: spark
                    namespace: {{ namespace }}
                    annotations:
                      eks.amazonaws.com/role-arn: '{{ defaultIamRole }}'
The other cluster’s IAM role’s trust relationship:
Copy code
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "<http://ec2.amazonaws.com|ec2.amazonaws.com>"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "<http://eks.amazonaws.com|eks.amazonaws.com>"
      },
      "Action": "sts:AssumeRole"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::506334935535:oidc-provider/oidc.eks.us-east-1.amazonaws.com/id/4A5CA24198DD293C4668FFCC4255D11E"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "<http://oidc.eks.us-east-1.amazonaws.com/id/4A5CA24198DD293C4668FFCC4255D11E:aud|oidc.eks.us-east-1.amazonaws.com/id/4A5CA24198DD293C4668FFCC4255D11E:aud>": "<http://sts.amazonaws.com|sts.amazonaws.com>"
        }
      }
    }
  ]
}
My cluster’s IAM role’s trust relationship:
Copy code
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::245085526351:oidc-provider/oidc.eks.us-east-1.amazonaws.com/id/C9E033A44FFBE3986BC2224BBA1D6579"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringEquals": {
          "<http://oidc.eks.us-east-1.amazonaws.com/id/C9E033A44FFBE3986BC2224BBA1D6579:aud|oidc.eks.us-east-1.amazonaws.com/id/C9E033A44FFBE3986BC2224BBA1D6579:aud>": "<http://sts.amazonaws.com|sts.amazonaws.com>"
        }
      }
    }
  ]
}
It’s missing:
Copy code
{
            "Sid": "",
            "Effect": "Allow",
            "Principal": {
                "Service": "<http://ec2.amazonaws.com|ec2.amazonaws.com>"
            },
            "Action": "sts:AssumeRole"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "<http://eks.amazonaws.com|eks.amazonaws.com>"
            },
            "Action": "sts:AssumeRole"
        },
d
@Frank Shen I'll work to reproduce this issue and will let you know
f
@David Espejo (he/him), Awesome, thanks! The other cluster is using
Copy code
- spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
@David Espejo (he/him) @Kevin Su, in a desperate effort since it's not working, I tried the following according to the doc https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/assumed_roles.html :
Copy code
- spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider"
          - spark.hadoop.fs.s3a.assumed.role.arn: "arn:aws:iam::245085526351:role/flyte-role"
And got error:
Copy code
Traceback (most recent call last):

      File "/opt/venv/lib/python3.9/site-packages/flytekit/exceptions/scopes.py", line 206, in user_entry_point
        return wrapped(*args, **kwargs)
      File "/root/test_spark_read_s3.py", line 66, in spark_load_s3_data
        spark_df = spark.read.parquet(INPUT_PATH)
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 364, in parquet
        return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
        return f(*a, **kw)
      File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(

Message:

    An error occurred while calling o123.parquet.
: java.nio.file.AccessDeniedException: : org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider : org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:187)
	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:713)
	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet(S3AUtils.java:605)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:268)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3469)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:752)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:750)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:579)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:562)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider : org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
	at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:159)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1225)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:801)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:751)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.doInvoke(AWSSecurityTokenServiceClient.java:1389)
	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1356)
	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.invoke(AWSSecurityTokenServiceClient.java:1345)
	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.executeAssumeRole(AWSSecurityTokenServiceClient.java:528)
	at com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient.assumeRole(AWSSecurityTokenServiceClient.java:500)
	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.newSession(STSAssumeRoleSessionCredentialsProvider.java:321)
	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.access$000(STSAssumeRoleSessionCredentialsProvider.java:37)
	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:76)
	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider$1.call(STSAssumeRoleSessionCredentialsProvider.java:73)
	at com.amazonaws.auth.RefreshableTask.refreshValue(RefreshableTask.java:257)
	at com.amazonaws.auth.RefreshableTask.blockingRefresh(RefreshableTask.java:213)
	at com.amazonaws.auth.RefreshableTask.getValue(RefreshableTask.java:154)
	at com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider.getCredentials(STSAssumeRoleSessionCredentialsProvider.java:299)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:285)
	at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.getCredentials(AssumedRoleCredentialProvider.java:169)
	at org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider.<init>(AssumedRoleCredentialProvider.java:158)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
	at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
	at org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProvider(S3AUtils.java:674)
	... 30 more
Caused by: org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
	at org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.getCredentials(SimpleAWSCredentialsProvider.java:68)
	at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
	... 61 more


User error.
We don’t store AWS access keys in the k8s env vars, which is apparently what SimpleAWSCredentialsProvider expects. Please advise. Thanks
d
@Frank Shen I've spent a bunch of time trying to reproduce this issue and I'm not there yet. I think the Default Credentials provider is fine: I've been using it with a Trust Relationship like yours, and at least it can generate the signed URLs and upload the code.
f
Hi @David Espejo (he/him), thank you for the effort. One thing I want to clarify: the spark driver pod can access the flyte metadata S3 location via the assumed IAM role just fine. It is the spark code reading the dataset from S3 that lacks permission, because that read goes through the AWS Java SDK, and the Java side is not using the IAM role. That is the cause of the issue.
Still, I don’t know how to fix it.
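To make the distinction concrete, here is a hypothetical sketch of the two code paths inside a single Spark task (the bucket name and dataset path are placeholders, not from this thread): the same pod can read the flyte metadata bucket yet fail on the s3a:// read, because the two paths use different credential chains.
Copy code
import flytekit
from flytekit import task
from flytekitplugins.spark import Spark


@task(task_config=Spark(spark_conf={"spark.executor.instances": "2"}))
def spark_load_s3_data() -> int:
    # Path 1: flytekit fetches the fast-registered code and task inputs from
    # the metadata bucket with its own Python S3 client, which honours the
    # AWS_ROLE_ARN / AWS_WEB_IDENTITY_TOKEN_FILE env vars injected by IRSA.
    sess = flytekit.current_context().spark_session
    # Path 2: this read goes through hadoop-aws and the AWS Java SDK, which
    # authenticate via fs.s3a.aws.credentials.provider and, by default, do not
    # look at the web-identity env vars above.
    df = sess.read.parquet("s3a://my-data-bucket/some/dataset/")
    return df.count()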
@David Espejo (he/him), @Kevin Su, @Thomas Newton, @Samhita Alla, I have found the root cause of the spark job not being able to access S3 in the flyte cluster. com.amazonaws.auth.DefaultAWSCredentialsProviderChain doesn’t invoke com.amazonaws.auth.WebIdentityTokenCredentialsProvider, which is what I need it to invoke. The versions of the hadoop and aws java jars in the image do not support com.amazonaws.auth.WebIdentityTokenCredentialsProvider directly. I found a combination of jar versions that works by trial and error. Here are the jar versions that work in the custom flyte image I built.
Copy code
RUN wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.2.4/hadoop-aws-3.2.4.jar -P /opt/spark/jars && \
    wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar -P /opt/spark/jars
Note: the latest hadoop-aws jar and its companion aws-java-sdk-bundle jar did not work for me.
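With those jars on the classpath, the s3a credentials provider can also be pointed at the web-identity provider explicitly. A minimal sketch, assuming flytekitplugins-spark and that IRSA injects AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE into the driver and executor pods (the same key could presumably also go into spark-config-default in the Helm values); the dataset path is a placeholder:
Copy code
from flytekit import current_context, task
from flytekitplugins.spark import Spark


@task(
    task_config=Spark(
        spark_conf={
            # hadoop-aws's default s3a provider list has no web-identity
            # provider, so name it explicitly to use the IRSA role.
            "spark.hadoop.fs.s3a.aws.credentials.provider":
                "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
        }
    )
)
def read_dataset() -> int:
    df = current_context().spark_session.read.parquet("s3a://my-data-bucket/dataset/")
    return df.count()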
t
Good to hear you've made progress. I think it's best to use the jar versions referenced in the
<https://github.com/apache/spark/blob/v3.5.0/pom.xml>
for the version of spark you use. It's a bit orthogonal to Flyte, but possibly the defaults from flyte are misleading? Personally I avoided all this because I build the docker image my own way using our bazel build system, and I don't use any of the recommended
spark.hadoop
configs for S3 because we use Azure.
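One way to pick a matching hadoop-aws version is to ask the Spark distribution in the image which Hadoop it was built against; a small sketch (it relies on PySpark's internal _jvm accessor):
Copy code
from pyspark.sql import SparkSession

# Report the Hadoop version bundled with this PySpark distribution so the
# hadoop-aws jar you add can match it exactly.
spark = SparkSession.builder.master("local[1]").getOrCreate()
print(spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())
spark.stop()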
k
d
thanks for sharing @Frank Shen!
f
@Kevin Su it’s the docker image that needs the changes. I have created a PR https://github.com/flyteorg/flytesnacks/pull/1331
s
@Frank Shen have you tried using the latest apache/spark-py image (apache/spark-py:v3.4.0) with the recently released versions of hadoop-aws and aws-java-sdk-bundle?
f
Hi @Samhita Alla, Do you have the link to the dockerfile of the apache/spark-py:v3.4.0 image? I need to compare the versions of the hadoop-aws jar, etc.
f
@Kevin Su, thanks. However, this image doesn’t include hadoop-aws jars, etc.
k
yes, so we added that to our dockerfile, and use apache/spark-py as our base image. https://github.com/flyteorg/flytekit/blob/6f613e778103aa9b802c2176456e774ff2bffb12/plugins/flytekit-spark/Dockerfile#L12-L19
f
@Samhita Alla, I tried with hadoop-aws 3.3.6 and it didn’t work for me.