# announcements
e
Hello! I am trying to use the Flyte Spark plugin with the SynapseML package on the local runner. It works until I add `spark.jars.packages` to the config (see example below), at which point I get a connection-refused error: `io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused`. I imagine this has to do with some backend configuration that I don’t quite understand. Any help is much appreciated. I tested it with another package and got the same error.
Copy code
import random
from operator import add

import flytekit
from flytekit import Resources, task
from flytekitplugins.spark import Spark


def f(_: int) -> int:
    # Helper used by map() below: standard Monte Carlo pi sample --
    # 1 if a random point lands inside the unit circle, else 0.
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(
    task_config=Spark(
        # this configuration is applied to the spark cluster
        spark_conf={
            "spark.driver.memory": "8g",
            "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
            "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5",  # adding this causes problems
        }
    ),
    limits=Resources(mem="2000M"),
    cache_version="1",
)
def hello_spark(partitions: int) -> float:
    print("Starting Spark with Partitions: {}".format(partitions))

    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return pi_val
s
Hi, @Evan Sadler! Have you tried setting `"spark.driver.userClassPathFirst": "true"` in your config?
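For reference, that key goes in the same `spark_conf` dict as the rest of your task config; a minimal sketch, with the other values copied from your example above:

```python
# spark_conf as in the task above, plus the suggested key:
# "spark.driver.userClassPathFirst" makes the driver JVM prefer
# user-supplied jars over Spark's bundled classes.
spark_conf = {
    "spark.driver.memory": "8g",
    "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
    "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5",
    "spark.driver.userClassPathFirst": "true",  # the suggested addition
}
```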
Also, the Spark operator configuration is available here in (4).
Could you share with me the full task log?
e
Thank you for the help! I actually found a solution, but I do want to surface it because it was interesting. I am running this in local mode (not deploying on Flyte). What fixed it was setting this environment variable:
Copy code
SPARK_LOCAL_IP=localhost python spark_test.py
I don’t know why setting `spark.jars.packages` causes problems, but it seems like there are connection issues, possibly centered around localhost not being used (I am just guessing). Here are highlights from the many, many logs printed when `SPARK_LOCAL_IP` is not set.
Copy code
22/07/12 10:48:45 ERROR Utils: Aborting task
java.io.IOException: Failed to connect to c02dq9v9md6m-c02dq9v9md6m.twi.com/10.217.166.73:64938
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:399)
	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$openChannel$4(NettyRpcEnv.scala:367)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
	at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:366)
	at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:762)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:549)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13(Executor.scala:962)
	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13$adapted(Executor.scala:954)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
	at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:954)
	at org.apache.spark.executor.Executor.<init>(Executor.scala:247)
	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: c02dq9v9md6m-c02dq9v9md6m.twi.com/10.217.166.73:64938
Caused by: java.net.ConnectException: Connection refused
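In case it is useful, the same fix can be applied from inside the script rather than as a shell prefix; a sketch, assuming it runs before anything has started a JVM:

```python
import os

# Must happen before the SparkContext / JVM is created, i.e. at the very
# top of the script -- equivalent to `SPARK_LOCAL_IP=localhost python spark_test.py`.
os.environ["SPARK_LOCAL_IP"] = "localhost"
```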
s
I guess `SPARK_LOCAL_IP` by default is set to the loopback address. I believe that, when running locally, the env variable has to be set to `localhost`.
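An alternative (untested here, just a sketch) would be to pin the driver addresses through Spark's standard network settings in the conf instead of the environment variable:

```python
# Standard Spark network settings; the values are assumptions for a
# purely local run, not something verified against this exact setup.
spark_conf = {
    "spark.driver.bindAddress": "127.0.0.1",  # address the driver binds to
    "spark.driver.host": "localhost",         # address executors use to reach the driver
}
```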