Evan Sadler
07/11/2022, 4:15 PM
I run into an error as soon as I add spark.jars.packages to the config (see example below). There is some kind of connection refused error: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused.
I imagine this has to do with some configuration that I don't quite understand in the backend. Any help is much appreciated. I tested it out with another package and it had the same error.
import datetime
import random
from operator import add

import flytekit
from flytekit import Resources, task, workflow
from flytekitplugins.spark import Spark


def f(_):
    # Monte Carlo sample: 1 if a random point in the [-1, 1] square lands
    # inside the unit circle, else 0. (The snippet calls f but its definition
    # was cut off in the paste; this is the standard Flyte pi-example helper.)
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x ** 2 + y ** 2 <= 1 else 0


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "8g",
            "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
            "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5",  # adding this causes problems
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val
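(For anyone reproducing this: a Flyte task can be called like a plain Python function when executed locally. A minimal driver sketch, assuming the file above is the spark_test.py mentioned later in the thread and that partitions=10 is an arbitrary choice:)

if __name__ == "__main__":
    # Run the task locally; flytekitplugins-spark should stand up a local
    # Spark session for the duration of the call.
    print(hello_spark(partitions=10))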
Samhita Alla
Can you try setting "spark.driver.userClassPathFirst": "true" in your config?
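(For reference, that flag would sit alongside the existing entries in the task's spark_conf. A minimal sketch that merges the suggestion into the snippet above; nothing here is confirmed to fix the error:)

from flytekit import Resources, task
from flytekitplugins.spark import Spark


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "8g",
            "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
            "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5",
            # suggested addition: have the driver load user-supplied jars
            # ahead of Spark's own classpath
            "spark.driver.userClassPathFirst": "true",
        }
    ),
    limits=Resources(mem="2000M"),
)
def hello_spark(partitions: int) -> float:
    ...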
Evan Sadler
07/12/2022, 2:52 PM
It runs when I set the variable before launching the script locally:
SPARK_LOCAL_IP=localhost python spark_test.py
I don't know why setting spark.jars.packages causes problems, but it seems like there are connection issues, possibly centered around localhost not being used (I am just guessing). Here are highlights of the many, many logs printed when SPARK_LOCAL_IP is not set.
22/07/12 10:48:45 ERROR Utils: Aborting task
java.io.IOException: Failed to connect to c02dq9v9md6m-c02dq9v9md6m.twi.com/10.217.166.73:64938
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:399)
at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$openChannel$4(NettyRpcEnv.scala:367)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:366)
at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:762)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:549)
at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13(Executor.scala:962)
at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13$adapted(Executor.scala:954)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:954)
at org.apache.spark.executor.Executor.<init>(Executor.scala:247)
at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: c02dq9v9md6m-c02dq9v9md6m.twi.com/10.217.166.73:64938
Caused by: java.net.ConnectException: Connection refused
Samhita Alla
SPARK_LOCAL_IP by default is set to the loopback address. I believe that when running locally, the env variable has to be set to localhost.
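(A minimal sketch of baking that into a local run instead of prefixing the command, assuming the env var only needs to be in place before the driver JVM starts:)

import os

# Assumption: setting SPARK_LOCAL_IP from Python before the driver JVM
# launches is equivalent to the SPARK_LOCAL_IP=localhost prefix used above.
os.environ.setdefault("SPARK_LOCAL_IP", "localhost")

from pyspark.sql import SparkSession  # imported after the env var is set

# Throwaway local session just to confirm the driver binds to loopback.
spark = SparkSession.builder.master("local[*]").appName("local-ip-check").getOrCreate()
print(spark.sparkContext.getConf().get("spark.driver.host", "unset"))
spark.stop()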