# ask-the-community
m
Hey, we are using a Spark task and encountered an implicit retry that we never configured explicitly. When we run
kubectl get sparkapplication ...
we see
FLYTE_MAX_ATTEMPTS: "1"
FLYTE_ATTEMPT_NUMBER: "3"
Is this some default behavior with Spark tasks or a potential bug?
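For anyone who wants to spot this condition programmatically, here is a minimal sketch that compares the two values shown above. The helper name `attempt_exceeds_declared_max` is hypothetical, and the strict "greater than" comparison is an assumption chosen because it holds whether the attempt number is counted from 0 or from 1.

```python
from typing import Mapping


def attempt_exceeds_declared_max(env: Mapping[str, str]) -> bool:
    """Return True when FLYTE_ATTEMPT_NUMBER is strictly greater than FLYTE_MAX_ATTEMPTS.

    Hypothetical helper: it only compares the two values and makes no claim
    about how Flyte itself counts or schedules attempts.
    """
    max_attempts = int(env["FLYTE_MAX_ATTEMPTS"])
    attempt_number = int(env["FLYTE_ATTEMPT_NUMBER"])
    return attempt_number > max_attempts


# Values copied from the SparkApplication spec quoted above.
observed = {"FLYTE_MAX_ATTEMPTS": "1", "FLYTE_ATTEMPT_NUMBER": "3"}
print(attempt_exceeds_declared_max(observed))
```

Inside a running container the same check could be applied to `os.environ`, assuming both variables are set there.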
k
This could be a system retry, but would love to know more. A system retry can happen if the job never launched.
m
This is the execution error. It looks like the error is related to our code, but the UI says that it is a system error. 1. How do we distinguish a system error from a user error? 2. Is it the system error that triggers the retries? 3. If so, is there a way to configure retries globally for system/user errors?
Traceback (most recent call last):

      File "/opt/venv/lib/python3.10/site-packages/flytekit/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/opt/venv/lib/python3.10/site-packages/flytekit/core/base_task.py", line 572, in dispatch_execute
        raise TypeError(

Message:

    Failed to convert return value for var o0 for function my_app.tasks.read_data with error <class 'py4j.protocol.Py4JJavaError'>: An error occurred while calling o148.parquet.
: java.lang.ClassNotFoundException: org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
	at java.base/java.net.URLClassLoader.findClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Unknown Source)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:225)
	at org.apache.spark.internal.io.FileCommitProtocol$.instantiate(FileCommitProtocol.scala:213)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)


SYSTEM ERROR! Contact platform administrators.
y
@Ketan (kumare3) Hey, just wanted to check in and see if you could respond to the questions above, that would be super helpful for us, thanks in advance!
k
Hey folks, Flyte is conservative here: when it cannot figure out whether an error is a system or a user error, it defaults to marking it as a system error. But a platform admin can resolve it to being a user error.
y
I see. Then a follow-up question: is it possible to configure the number of retries for these system errors? And for user errors, apart from setting the retries in the task decorator, is it possible to configure the number of retries globally for the whole project?
k
For user errors, no, but the number of system retries can be configured.
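To illustrate the distinction between the two retry limits, here is a toy model in which user failures and system failures are counted against separate budgets. This is only a sketch under that assumption, not Flyte's implementation: the class `RetryBudget`, its field names, and the example values are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class RetryBudget:
    """Toy model: user and system failures draw from separate retry budgets."""

    user_retries: int       # hypothetical: retries set in the task decorator
    system_retries: int     # hypothetical: platform-wide system retry limit
    user_failures: int = 0
    system_failures: int = 0

    def record_failure(self, kind: str) -> bool:
        """Record one failure and return True if another attempt is allowed."""
        if kind == "user":
            self.user_failures += 1
            return self.user_failures <= self.user_retries
        if kind == "system":
            self.system_failures += 1
            return self.system_failures <= self.system_retries
        raise ValueError(f"unknown failure kind: {kind!r}")


# Illustrative values only: no user retries, three system retries.
budget = RetryBudget(user_retries=0, system_retries=3)
outcomes = [budget.record_failure("system") for _ in range(4)]
print(outcomes)
```

In this model a task with no user retries can still be attempted several times if every failure is classified as a system error, which is one way the attempt number could exceed the configured maximum.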
y
Do you mind pointing out where it can be configured? Thanks🙇‍♀️
k
That is configured in flytepropeller. Check the config docs (afk); it should say system retries.
y
I’ll check it out, thanks!!