# ask-the-community
c
Having trouble running the `k8s_spark.dataframe_passing.my_smart_structured_dataset` example. I've been able to run the other spark example, `pyspark_pi`. I've set up the K8s Spark Operator and built the Docker image based on the Dockerfile in the `cookbook/integrations/kubernetes/k8s_spark` folder. Getting this error about s3:
[3/3] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 527, in dispatch_execute
        raise TypeError(

Message:

    Failed to convert return value for var o0 for function k8s_spark.dataframe_passing.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o41.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FiInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:781)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


SYSTEM ERROR! Contact platform administrators.
k
@Chase Grisham you need the s3:// driver for spark
you can simply add it to the backend as a config
here is a possible config
you can also add it to your task config:
from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        spark_conf={
            # map the "s3" scheme onto the S3A filesystem implementation
            "spark.hadoop.fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        }
    )
)
def foo():
    pass
c
pretty sure I added the config on the backend using the instructions here: https://docs.flyte.org/en/latest/deployment/plugin_setup/k8s/index.html#deployment-plugin-setup-k8s (what would be the way to confirm?). I'll attempt putting them inline in the task config. Does the task config take all spark config settings? I was thinking of passing them inline but wasn't sure if they'd be treated the same.
k
ohh you have to restart flytepropeller
that's the only thing
you can look at the config-map
kubectl -n flyte get cm flyte-propeller-config
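for example, to confirm the spark section actually landed in the rendered config (assuming the sandbox's flyte namespace):
# dump the full rendered config and search for the spark plugin section
kubectl -n flyte get cm flyte-propeller-config -o yaml | grep -B 2 -A 8 spark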
c
how to restart flytepropeller?
y
kubectl -n flyte rollout restart deploy flytepropeller
c
thanks I'll give this all a try and report back. Appreciate the awesome responsiveness!
No change after adding the s3 configuration to the task config, nor after restarting flytepropeller with the backend config in place
k
ohh sorry i missed it
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
you have not installed an S3 driver for spark in the container
c
any guidance on the best way to do that? Locally when I run spark I can pass `.config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0')` in the spark config (tried that with the task config to no avail) or when I start the spark app. And EMR handles it in production. What is the recommended method for adding it to the container?
k
Let me send some instructions
Sorry I was busy today
@Samhita Alla if you can help
c
No worries, I appreciate any help or even just pointing me in a direction to figure it out / learn. Really love the product's potential, so I'm just trying to work out all the nuances to get a demo with real code running.
k
Yes, we're very grateful to you for bearing through these problems. Trust us, we want to help you be successful.
@Chase Grisham did you follow the dockerfile exactly, or did you write it on your own? This matters because you have to install the hadoop-aws libraries, as is done in the packaged script: https://github.com/flyteorg/flytekit/blob/353df9b48858707267ede22fdf4e4852a3fc5389/plugins/flytekit-spark/scripts/flytekit_install_spark3.sh#L46
the script is called
flytekit_install_spark3
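roughly, the relevant part of that script just pulls the hadoop-aws jar (and a matching aws-java-sdk) into Spark's jars directory. A sketch with illustrative versions; the exact ones live in the script itself:
# illustrative sketch of the S3-relevant step in flytekit_install_spark3.sh
HADOOP_AWS_VERSION=3.2.0   # must match the Hadoop build Spark ships with
curl -fSL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_AWS_VERSION}/hadoop-aws-${HADOOP_AWS_VERSION}.jar" \
  -o "${SPARK_HOME}/jars/hadoop-aws-${HADOOP_AWS_VERSION}.jar"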
@Samhita Alla can you probably point him to docs on how to setup correctly
s
Hello @Chase Grisham! To enable Spark in your deployment, follow the steps outlined under “Spark Operator” on the https://docs.flyte.org/en/latest/deployment/plugin_setup/k8s/index.html#deployment-plugin-setup-k8s page. Also, the Dockerfile that Ketan has sent should resolve the s3 issue. Can you share the detailed pod log to debug further?
c
I used this Dockerfile from the repo https://github.com/flyteorg/flytesnacks/blob/master/cookbook/integrations/kubernetes/k8s_spark/Dockerfile. It has the install script in there
RUN flytekit_install_spark3.sh
s
Can you share the detailed pod log?
c
@Samhita Alla This is the log file from the pod. It does look like the hadoop-aws jar is being loaded during the Docker image build; I can find it at
./var/lib/docker/overlay2/de24159d93f1ddb20b418e05ba33ceb3f545008bebfd370439553a34ea53aa29/diff/opt/spark/jars/hadoop-aws-3.2.0.jar
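a simpler check than digging through overlay2, assuming the image was tagged `examples:v1` as in the build step:
# list the Spark jars baked into the image and look for the AWS ones
docker run --rm --entrypoint ls examples:v1 /opt/spark/jars | grep -i aws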
s
I’m assuming you’re using flytekitplugins-spark==0.32.6. Can you bump it to 1.0.0 in the requirements? The change that I’ve pushed should be available in the latest.
You should then have hadoop-aws 3.3.1 and aws-java-sdk 1.11.901.
c
Still no go after updating the flytekitplugins-spark package in the requirements
s
Somehow hadoop-aws and aws-java-sdk aren’t being recognised by the plugin. Have you followed the steps outlined here? Let me try to run this tomorrow. I’ll let you know the status.
c
yes, I've run through setting up the k8s spark operator, and as best I can tell it succeeds properly
Is there some way to confirm the setup by checking configuration files somewhere? This was posted earlier:
kubectl -n flyte get cm flyte-propeller-config
but it just seems to tell me the size of the config and how long it's lived.
Here are the steps I am running.
And here is the task I'm trying to run. The only thing I've changed is that I had to add resource limits to stop getting OOM errors.
s
@Ketan (kumare3) & @Chase Grisham, I’m seeing the same S3 error (if the spark task resource limit is not increased, the error would be `Answer from Java side is empty`). When I inspected the config map through `kubectl -n flyte get cm flyte-propeller-config`, spark wasn’t present. When I asked @Yuvraj about it, he said we ought to have `flyte` as the key in the values-override.yaml file, meaning the config has to be present under a `flyte` key:
flyte:
  cluster_resource_manager:
    # -- Enables the Cluster resource manager component
    enabled: true
    # -- Configmap for ClusterResource parameters
    config:
...
But this isn’t successfully bringing up all the pods after following https://docs.flyte.org/en/latest/deployment/plugin_setup/k8s/index.html#deployment-plugin-setup-k8s:
flyte-kubernetes-dashboard-7fd989b99d-9dj2r   1/1     Running                 0          7m40s
flyteconsole-668f9ccdd8-bj8pf                 1/1     Running                 0          7m40s
flyteadmin-7db9b49c6f-24dq6                   1/1     Running                 0          7m40s
syncresources-27519190-4ws6k                  0/1     Completed               0          6m45s
syncresources-27519191-rkhhn                  0/1     Completed               0          5m45s
syncresources-27519192-jtc7q                  0/1     Completed               0          4m45s
postgres-5b4ccdcd68-9gpv8                     1/1     Running                 0          3m51s
minio-999cb6d9b-ltnzn                         1/1     Running                 0          3m51s
flyte-contour-contour-7cfc9f6fb5-g842s        1/1     Running                 0          3m51s
flytepropeller-cfcdd6bf5-krtw2                1/1     Running                 0          3m51s
datacatalog-7cc7d996d5-s4dmv                  1/1     Running                 0          3m51s
flyte-pod-webhook-67dfc889df-wsxqd            1/1     Running                 0          3m51s
flytescheduler-6d6f79d89-6lqt6                1/1     Running                 3          3m51s
flyte-contour-envoy-pcwjh                     2/2     Running                 0          2m45s
flyteadmin-8c8b86d46-v2jxw                    0/2     Init:CrashLoopBackOff   4          3m51s
syncresources-27519193-shg2c                  0/1     CrashLoopBackOff        4          3m45s
syncresources-27519194-w86t7                  0/1     Error                   4          2m45s
flyteconsole-85df86887d-ftcjg                 0/1     ErrImagePull            0          3m51s
syncresources-27519195-l77td                  0/1     Error                   3          105s
syncresources-27519196-kcj2p                  1/1     Running                 2          45s
Notes:
• `flyteconsole` is failing because it’s trying to pull `cr.flyte.org/flyteorg/flyteconsole-release:v0.19.0`, which doesn’t exist.
• `flyte`’s version on running `helm upgrade ..` is 0.19.0, which needs to be upgraded to 1.0.0.
• In the `flyteadmin` pod log, I see `Back-off restarting failed container`; here’s the event log:
Type     Reason     Age                   From               Message
  ----     ------     ----                  ----               -------
  Normal   Scheduled  13m                   default-scheduler  Successfully assigned flyte/flyteadmin-8c8b86d46-v2jxw to eba2bc8245ba
  Normal   Pulled     13m                   kubelet            Container image "ecr.flyte.org/ubuntu/postgres:13-21.04_beta" already present on machine
  Normal   Created    13m                   kubelet            Created container check-db-ready
  Normal   Started    13m                   kubelet            Started container check-db-ready
  Normal   Pulling    13m                   kubelet            Pulling image "cr.flyte.org/flyteorg/flyteadmin-release:v0.19.0"
  Normal   Pulled     13m                   kubelet            Successfully pulled image "cr.flyte.org/flyteorg/flyteadmin-release:v0.19.0" in 34.417862933s
  Normal   Created    13m                   kubelet            Created container run-migrations
  Normal   Started    13m                   kubelet            Started container run-migrations
  Normal   Pulled     13m                   kubelet            Container image "cr.flyte.org/flyteorg/flyteadmin-release:v0.19.0" already present on machine
  Normal   Created    13m                   kubelet            Created container seed-projects
  Normal   Started    13m                   kubelet            Started container seed-projects
  Normal   Pulled     11m (x4 over 13m)     kubelet            Container image "cr.flyte.org/flyteorg/flyteadmin-release:v0.19.0" already present on machine
  Normal   Created    11m (x4 over 13m)     kubelet            Created container sync-cluster-resources
  Normal   Started    11m (x4 over 13m)     kubelet            Started container sync-cluster-resources
  Warning  BackOff    3m39s (x37 over 12m)  kubelet            Back-off restarting failed container
Not sure if this would resolve the issue, but IMO, we’ll have to fix this first.
k
I'm running into this same issue as well. I fixed the issue I'd been running into before with spark by manually creating the service accounts and started seeing this issue this week.
k
What - @Yuvraj / @Haytham Abuelfutuh something is wrong with the helm chart
@Samhita Alla can you help. I will be in a flight soon
s
Sure Ketan.
h
@Yuvraj can you repro this on sandbox?
s
Pods have come up successfully after I ran `helm dep update`. Will continue debugging the s3 issue.
Somehow, I’ve arrived at this error,
Failed to convert return value for var o0 for function k8s_spark.dataframe_passing.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o140.parquet.
: java.io.FileNotFoundException: PUT 0-byte object  on r3/adggpq6ghqkppp2jjj9m-n0-3/8addd767922501d67338242d43f12759/_temporary/0/: com.amazonaws.services.s3.model.AmazonS3Exception: Not Found (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request ID: null; S3 Extended Request ID: null; Proxy: minio.flyte), S3 Extended Request ID: null:404 Not Found
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:260)
	at org.apache.hadoop.fs.s3a.Invoker.once(I$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5227)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5173)
	at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:415)
	at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6289)
	at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1834)
	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1794)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2432)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$22(S3AFileSystem.java:4098)
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:115)
	... 54 more
with help from @Yuvraj. I guess the problem now concerns providing the required access. I’m seeing this error after adding:
- spark.hadoop.fs.s3a.proxy.host: minio.flyte
- spark.hadoop.fs.s3a.proxy.port: 9000
to the spark conf in the values-override.yaml file, plus minio creds to the Dockerfile, along with a couple of other changes. But if there’s no proxy, I see the following error:
Failed to convert return value for var o0 for function k8s_spark.dataframe_passing.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o138.parquet.
: java.nio.file.AccessDeniedException: s3://my-s3-bucket/3d/aphcqdxts4ccpdbzqjjr-n0-3/ff104636f156a8f5de8ef1378b0c81a8: getFileStatus on s3://my-s3-bucket/3d/aphcqdxts4ccpdbzqjjr-n0-3/ff104636f156a8f5de8ef1378b0c81a8: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 91ZXJZRMJPSHJ7QY; S3 Extended Request ID: pOYFp8feFwlpc0fSnTDoQ75RklZu51sa69plw+kORlJbqjSazc/o5+MuUZCBGNINRMDi61IqMmU=; Proxy: null),pClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5227)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5173)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1360)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$6(S3AFileSystem.java:2066)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:412)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:375)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2056)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2032)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3273)
	... 46 more


SYSTEM ERROR! Contact platform administrators.
Can someone help me resolve this issue? @User / @User
k
Ohh, access denied
Are you using the right service account?
y
@Ketan (kumare3) why do you think it’s a service account issue? It’s the sandbox, and we use env variables for s3 auth. Samhita added the AWS env variables in her Dockerfile.
c
@Samhita Alla any further updates on investigating this issue?
k
@Chase Grisham sorry for the delay. Most of us were at PyCon and got back today. Someone will take a look.
s
I haven’t worked on this today, @Chase Grisham, hoping for some pointers. I’ll take a look at it tomorrow.
Here’s a PR that has all the changes we introduced: https://github.com/flyteorg/flytesnacks/pull/746. There’s also values-override.yaml specifying the backend config.
Re: “Are you using the right service account?” Btw Ketan, I’ve used the “spark” service account.
@Chase Grisham @Katrina P @Ketan (kumare3) FINALLY! I see successful executions of the spark examples on the sandbox. Here are the two PRs which have the required changes: https://github.com/flyteorg/flyte/pull/2445 & https://github.com/flyteorg/flytesnacks/pull/746. The solution for the s3 access issue is to have the minio-related config in the spark config section. Please find the values-override.yaml config on this page. @Chase Grisham & @Katrina P, we’ll have to use `flyte` as the base key in the values-override.yaml file (example) until this PR is merged, which @Yuvraj is working on.
c
Awesome! Thank you for working through this @Samhita Alla!
k
Thanks! I appreciate it. I should be able to find some time to run through setting everything up from scratch today and try it out.
k
Thank you so much. @Samhita Alla you rock
This was only for sandbox. So I was confused. I thought this was on cloud
@Chase Grisham / @Katrina P thank you for the patience. These have been merged
c
I synced the repo changes and replaced my values-override.yaml with the new minio configurations, and I'm still getting the same error =/
k
are you running this locally, with minio, right?
TBH it's weird to run spark locally
c
lol I won't argue with you on the last point. Yes, I'm running this locally, however; first time ever using minio though. I was hoping to get a local demo working to pitch to my team before spinning up additional resources.
k
fair
🙂
can you share the error
I know @Samhita Alla said she got it working
she is working in different hours, so she must be asleep now, if you don't mind
c
ya, it's the same s3 error:
[3/3] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/opt/venv/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.8/site-packages/flytekit/core/base_task.py", line 527, in dispatch_execute
        raise TypeError(

Message:

    Failed to convert return value for var o0 for function k8s_spark.dataframe_passing.create_spark_df with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o41.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FiInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:781)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)


SYSTEM ERROR! Contact platform administrators.
y
@Chase Grisham can you post your values-override.yaml?
c
Seeing this when poking through the kubernetes logs:
pods "flyte-sparkoperator-5f8b4845f8-" is forbidden: error looking up
        service account flyte/flyte-sparkoperator: serviceaccount
        "flyte-sparkoperator" not found
y
@Chase Grisham Sorry for the inconvenience. You can use these values for the sandbox:
flyte:
  cluster_resource_manager:
    # -- Enables the Cluster resource manager component
    enabled: true
    # -- Configmap for ClusterResource parameters
    config:
      # -- ClusterResource parameters
      # Refer to the [structure](https://pkg.go.dev/github.com/lyft/flyteadmin@v0.3.37/pkg/runtime/interfaces#ClusterResourceConfig) to customize.
      cluster_resources:
        refreshInterval: 5m
        templatePath: "/etc/flyte/clusterresource/templates"
        customData:
          - production:
              - projectQuotaCpu:
                  value: "5"
              - projectQuotaMemory:
                  value: "4000Mi"
          - staging:
              - projectQuotaCpu:
                  value: "2"
              - projectQuotaMemory:
                  value: "3000Mi"
          - development:
              - projectQuotaCpu:
                  value: "4"
              - projectQuotaMemory:
                  value: "5000Mi"
        refresh: 5m
  
    # -- Resource templates that should be applied
    templates:
      # -- Template for namespaces resources
      - key: aa_namespace
        value: |
          apiVersion: v1
          kind: Namespace
          metadata:
            name: {{ namespace }}
          spec:
            finalizers:
            - kubernetes
  
      - key: ab_project_resource_quota
        value: |
          apiVersion: v1
          kind: ResourceQuota
          metadata:
            name: project-quota
            namespace: {{ namespace }}
          spec:
            hard:
              limits.cpu: {{ projectQuotaCpu }}
              limits.memory: {{ projectQuotaMemory }}
  
      - key: ac_spark_role
        value: |
          apiVersion: rbac.authorization.k8s.io/v1beta1
          kind: Role
          metadata:
            name: spark-role
            namespace: {{ namespace }}
          rules:
          - apiGroups: ["*"]
            resources: ["pods"]
            verbs: ["*"]
          - apiGroups: ["*"]
            resources: ["services"]
            verbs: ["*"]
          - apiGroups: ["*"]
            resources: ["configmaps", "persistentvolumeclaims"]
            verbs: ["*"]
  
      - key: ad_spark_service_account
        value: |
          apiVersion: v1
          kind: ServiceAccount
          metadata:
            name: spark
            namespace: {{ namespace }}
  
      - key: ae_spark_role_binding
        value: |
          apiVersion: rbac.authorization.k8s.io/v1beta1
          kind: RoleBinding
          metadata:
            name: spark-role-binding
            namespace: {{ namespace }}
          roleRef:
            apiGroup: rbac.authorization.k8s.io
            kind: Role
            name: spark-role
          subjects:
          - kind: ServiceAccount
            name: spark
            namespace: {{ namespace }}
  
  sparkoperator:
    enabled: true
    plugin_config:
      plugins:
        spark:
          # -- Spark default configuration
          spark-config-default:
            # We override the default credentials chain provider for Hadoop so that
            # it can use the serviceAccount based IAM role or ec2 metadata based.
            # This is more in line with how AWS works
            - spark.hadoop.fs.s3a.aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
            - spark.hadoop.fs.s3a.endpoint: "http://minio.flyte.svc.cluster.local:9000"
            - spark.hadoop.fs.s3a.access.key: "minio"
            - spark.hadoop.fs.s3a.secret.key: "miniostorage"
            - spark.hadoop.fs.s3a.path.style.access: "true"
            - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
            - spark.kubernetes.allocation.batch.size: "50"
            - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
            - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
            - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
            - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
            - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
            - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
            - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
            - spark.hadoop.fs.s3a.multipart.threshold: "536870912"
            - spark.excludeOnFailure.enabled: "true"
            - spark.excludeOnFailure.timeout: "5m"
            - spark.task.maxfailures: "8"
  configmap:
    enabled_plugins:
      # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig)
      tasks:
        # -- Plugins configuration, [structure](<https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig>)
        task-plugins:
          # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend
          # plugins
          enabled-plugins:
            - container
            - sidecar
            - k8s-array
            - spark
          default-for-task-types:
            container: container
            sidecar: sidecar
            container_array: k8s-array
            spark: spark
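to apply the override (assuming the chart was installed as release `flyte` in the `flyte` namespace, as in the sandbox setup), upgrade and then bounce propeller:
helm upgrade flyte flyteorg/flyte -f values-override.yaml -n flyte
kubectl -n flyte rollout restart deploy flytepropeller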
c
really wish I could say that worked =/
[1/1] currentAttempt done. Last Error: USER::Spark Job  Submission Failed with Error: failed to run spark-submit for SparkApplication flytesnacks-development/a9sgv4zlx2nqhljrrnd2-n0-0: WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/05/04 22:04:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/05/04 22:04:19 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
22/05/04 22:04:20 INFO KerberosConfDriverFeatureStep: You have not specified a krb5.conf file locally or via a ConfigMap. Make sure that you have the krb5.conf locally on the driver image.
Exception in thread "main" org.apache.spark.SparkException: Please specify spark.kubernetes.file.upload.path property.
	at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:299)
	at org.apache.spark.deploy.k8s.KubernetesUtils$.renameMainAppResource(KubernetesUtils.scala:270)
	at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configureForPython(DriverCommandFeatureStep.scala:109)
	at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configurePod(DriverCommandFeatureStep.scala:44)
	at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:59)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:89)
	at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)
	at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2611)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
22/05/04 22:04:20 INFO ShutdownHookManager: Shutdown hook called
22/05/04 22:04:20 INFO ShutdownHookManager: Deleting directory /tmp/spark-857af52f-ce40-4c26-893a-44478cd6dd98
I'm starting to understand a bit more how this is all working together so I'll keep trying to debug it.
s
It works for me, @Chase Grisham. I tried running the example today as well with the config Yuvraj sent. Are you using the “spark” service account? And can you tear down your sandbox and follow the steps right from the start after pulling the latest flytesnacks changes? Can you also update your helm dependencies by running `helm dep update` in the `flyte/charts/flyte` directory after you clone https://github.com/flyteorg/flyte?
Also, please build the image and register the spark examples:
## BUILD
flytectl sandbox exec -- docker build . --tag "examples:v1" --file k8s_spark/Dockerfile
pyflyte --pkgs k8s_spark package --image "examples:v1" 
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v1
Hi @Chase Grisham! Are you still seeing issues or are your runs successful?
c
Thanks for following up @Samhita Alla I have not been able to get them to run yet. I've been a bit tied up, but I'm going to remove everything flyte related and start from scratch to see if that'll get it working for me. I'll follow up again after that.
k
this is really sad
please let us know how we can help
c
You all have been super helpful and seem confident you got it working, so I want to make sure I didn't bungle something up while messing around trying different things to get it working. I'll hopefully get around to retesting it in the next day or so.
k
No worries, please take your time. Just let us know if you find any more problems
c
@Ketan (kumare3) @Samhita Alla after starting from scratch and going through all the steps again, I'm still ending up at this error. What I find strange is that I can run the `pyspark_pi` example just fine without installing the k8s spark operator, but I cannot run the `dataframe_passing` example. As soon as I install the k8s spark operator, neither of them will run due to the error below.
[1/1] currentAttempt done. Last Error: USER::Spark Job  Submission Failed with Error: failed to run spark-submit for SparkApplication flytesnacks-development/a78dhklnqtwphd22bm8v-n0-0: WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
22/05/14 12:05:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/05/14 12:05:04 INFO SparkKubernetesClientFactory: Auto-configuring K8S client using current context from users K8S config file
22/05/14 12:05:05 INFO KerberosConfDriverFeatureStep: You have not specified a krb5.conf file locally or via a ConfigMap. Make sure that you have the krb5.conf locally on the driver image.
Exception in thread "main" org.apache.spark.SparkException: Please specify spark.kubernetes.file.upload.path property.
	at org.apache.spark.deploy.k8s.KubernetesUtils$.uploadFileUri(KubernetesUtils.scala:299)
	at org.apache.spark.deploy.k8s.KubernetesUtils$.renameMainAppResource(KubernetesUtils.scala:270)
	at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configureForPython(DriverCommandFeatureStep.scala:109)
	at org.apache.spark.deploy.k8s.features.DriverCommandFeatureStep.configurePod(DriverCommandFeatureStep.scala:44)
	at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.$anonfun$buildFromFeatures$3(KubernetesDriverBuilder.scala:59)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:89)
	at org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder.buildFromFeatures(KubernetesDriverBuilder.scala:58)
	at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:106)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3(KubernetesClientApplication.scala:213)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.$anonfun$run$3$adapted(KubernetesClientApplication.scala:207)
	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2611)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:207)
	at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:179)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
22/05/14 12:05:05 INFO ShutdownHookManager: Shutdown hook called
22/05/14 12:05:05 INFO ShutdownHookManager: Deleting directory /tmp/spark-c50b58af-9177-4c36-877e-0ed7b7149eaf
s
@Chase Grisham, could you please share your driver pod log, including the spark-submit command?
Also, when you run `kubectl -n flyte get cm flyte-propeller-config -o yaml`, do you see the spark config?
c
@Samhita Alla
• Yes, my install script is the same as the one you provided
• Here is the spark config I see from the flyte-propeller-config
• Also attached are the logs that contain the spark-submit command
s
@Chase Grisham, the pod log looks different from the one I’m getting. What’s the pod name your log is associated with?
Also, is this your Dockerfile?
FROM ubuntu:focal
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytesnacks

WORKDIR /root
ENV VENV /opt/venv
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
ENV PYTHONPATH /root
ENV DEBIAN_FRONTEND=noninteractive

# Install Python3 and other basics
RUN apt-get update && apt-get install -y python3.8 python3.8-venv make build-essential libssl-dev python3-pip curl

# Install AWS CLI to run on AWS (for GCS install GSutil). This will be removed
# in future versions to make it completely portable
RUN pip3 install awscli

WORKDIR /opt
RUN curl https://sdk.cloud.google.com > install.sh
RUN bash /opt/install.sh --install-dir=/opt
ENV PATH $PATH:/opt/google-cloud-sdk/bin
WORKDIR /root

ENV VENV /opt/venv
# Virtual environment
RUN python3 -m venv ${VENV}
ENV PATH="${VENV}/bin:$PATH"

RUN pip3 install wheel

# Install Python dependencies
COPY k8s_spark/requirements.txt /root
RUN pip install -r /root/requirements.txt

RUN flytekit_install_spark3.sh

# Adding Tini support for the spark pods
RUN wget https://github.com/krallin/tini/releases/download/v0.18.0/tini && \
    cp tini /sbin/tini && cp tini /usr/bin/tini && \
    chmod a+x /sbin/tini && chmod a+x /usr/bin/tini

# Setup Spark environment
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64
ENV SPARK_HOME /opt/spark
ENV SPARK_VERSION 3.2.1
ENV PYSPARK_PYTHON ${VENV}/bin/python3
ENV PYSPARK_DRIVER_PYTHON ${VENV}/bin/python3

# Copy the makefile targets to expose on the container. This makes it easier to register.
# Delete this after we update CI
COPY in_container.mk /root/Makefile

# Delete this after we update CI to not serialize inside the container
COPY k8s_spark/sandbox.config /root

# Copy the actual code
COPY k8s_spark/ /root/k8s_spark

# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
ARG tag
ENV FLYTE_INTERNAL_IMAGE $tag

# Copy over the helper script that the SDK relies on
RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
RUN chmod a+x /usr/local/bin/flytekit_venv

# For spark we want to use the default entrypoint which is part of the
# distribution, also enable the virtualenv for this image.
ENTRYPOINT ["/opt/entrypoint.sh"]
What’s your Kubernetes version?
c
@Samhita Alla
• that is the pod log for the spark_operator, I believe. I assumed that was where to find the spark-submit. Do you need the log for a different pod?
• Yes, my Dockerfile is identical to that
• Kubernetes v1.24.0
s
Is that the driver pod log? Can you share the kubectl command with me?
Is your K8s 1.21 in the sandbox? Can you run `kubectl version --short` in your docker sandbox?
Also, I’m assuming you’re using the “spark” service account.
FYI, here are the commands I’m running in the `kubernetes` directory in flytesnacks:
flytectl sandbox start --source .
# export kubeconfig and flytectl config commands

helm repo add flyteorg https://flyteorg.github.io/flyte
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm dep update (in flyte repo; flyte/charts/flyte directory)
helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace
helm upgrade flyte flyteorg/flyte -f values-override.yaml -n flyte

# wait for a few seconds until all the pods are up and running; check the status using kubectl get pods -n flyte

flytectl sandbox exec -- docker build . --tag "examples:v1" --file k8s_spark/Dockerfile
pyflyte --pkgs k8s_spark package --image "examples:v1" 
flytectl register files --project flytesnacks --domain development --archive flyte-package.tgz --version v1
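and to sanity-check that the operator itself came up after the helm install:
kubectl get pods -n spark-operator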
c
Yes, I'm using the spark service account (though interestingly enough I get the same error even if I don't use the spark service account). Those look like the same commands and order I'm using, but I'll double-check them and the K8s version in the sandbox later this evening or tomorrow. I'm not using a `kubectl` command to pull the log; I'm just exporting it from the kubernetes console. If there's a better way I should be doing it, let me know; still learning kubernetes.
Again, I just want to express my appreciation and gratitude for your patience and willingness to help resolve this.
s
Of course! We thank you for your patience. 🙂 I’m unable to repro your issue, and hence it’s taking longer than expected. Please run the `kubectl`-related commands once you open bash for your docker container using `docker exec -it <container-id> bash` (run `docker ps` to get the container ID), although the K8s console log should be the same as the pod log. In any case, can you please send me the pod log? Run `kubectl get pods -n flytesnacks-development` first, get the pod name (it’s usually of the format `<…>-n0-0-driver`, but if you don’t see this, it should be `<…>-n0-0`), and run `kubectl logs <pod-name> -n flytesnacks-development`. Also, please double-check the order of your commands and your K8s version and let me know. Make sure you wait a few seconds until all the pods are COMPLETED or RUNNING after you run the `helm upgrade …` command; to check the pods’ status, run `kubectl get pods -n flyte` in the docker container bash. IMO, `spark.kubernetes.file.upload.path` should be available automatically. Your code’s failing even before the spark submission, which is why you aren’t seeing any service-account-related error. I’m presuming the spark operator isn’t getting installed properly. Let’s see if you’re able to circumvent the issue after you run the commands I’ve sent.
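Putting those debug steps together in one place (the container ID and pod name are placeholders):
docker ps                                            # find the sandbox container ID
docker exec -it <container-id> bash                  # open a shell inside the sandbox
kubectl get pods -n flyte                            # all flyte pods should be Running/Completed
kubectl get pods -n flytesnacks-development          # find the driver pod (<…>-n0-0-driver or <…>-n0-0)
kubectl logs <pod-name> -n flytesnacks-development   # grab its log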
c
@Samhita Alla Sorry for the delay getting back to you.
• my sandbox is on K8s version 1.21
• When I run `kubectl get pods -n flytesnacks-development`, I get "No resources found in flytesnacks-development namespace"
• Here are all the pods that it says are running. This is after replicating the error by trying to run the workflow.
s
@Chase Grisham, you should be able to find pods in the `flytesnacks-development` namespace after you run the workflow. Let me DM you.