    Evan Sadler

    2 months ago
    Hello! I am trying to use the Flyte Spark plugin with the SynapseML package with the local runner. It works until I try to add spark.jars.packages to the config (see example below). There is some kind of connection-refused error: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused. I imagine this has to do with some backend configuration that I don’t quite understand. Any help is much appreciated. I tested it out with another package and it had the same error.
    import datetime
    import random
    from operator import add

    import flytekit
    from flytekit import Resources, task, workflow

    from flytekitplugins.spark import Spark


    def f(_: int) -> int:
        # referenced by the map() below but missing from the original snippet:
        # the standard Monte Carlo pi sampler from the Flyte Spark example
        x = random.random() * 2 - 1
        y = random.random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0


    @task(
        task_config=Spark(
            # this configuration is applied to the spark cluster
            spark_conf={
                "spark.driver.memory": "8g",
                "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
                "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5",  # adding this causes problems
            }
        ),
        limits=Resources(mem="2000M"),
        cache_version="1",
    )
    def hello_spark(partitions: int) -> float:
        print("Starting Spark with Partitions: {}".format(partitions))

        n = 100000 * partitions
        sess = flytekit.current_context().spark_session
        count = (
            sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        )
        pi_val = 4.0 * count / n
        print("Pi val is: {}".format(pi_val))
        return pi_val
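    For reference, under the local runner a @task-decorated function can be called like a plain Python function; a minimal sketch of an entry point, assuming the snippet above is saved as spark_test.py (partitions=4 is an arbitrary illustrative value):
    if __name__ == "__main__":
        # hypothetical local entry point; flytekit runs the task in-process,
        # with the Spark plugin creating a local SparkSession for it
        print(hello_spark(partitions=4))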
    Samhita Alla

    2 months ago
    Hi, @Evan Sadler! Have you tried setting "spark.driver.userClassPathFirst": "true" in your config?
    Also, the spark operator configuration is available here in (4).
    Could you share the full task log with me?
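    A minimal sketch of the suggested change, merging that flag into the spark_conf from the original snippet:
    spark_conf={
        "spark.driver.memory": "8g",
        "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
        "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.9.5",
        "spark.driver.userClassPathFirst": "true",  # load user-supplied jars ahead of Spark's own classpath
    }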
    Evan Sadler

    2 months ago
    Thank you for the help! I actually found a solution, but I do want to surface it because it was interesting. I am running this in local mode (not deploying on Flyte). What fixed it was setting this environment variable:
    SPARK_LOCAL_IP=localhost python spark_test.py
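    (Presumably the same thing can be done from inside the script, as long as the variable is in the process environment before any SparkContext is created; a minimal sketch:)
    import os

    # hedged sketch: must run before any SparkContext/SparkSession is created,
    # since the JVM launched by PySpark inherits this process's environment
    os.environ["SPARK_LOCAL_IP"] = "localhost"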
    I don’t know why setting spark.jars.packages causes problems, but it seems like there are connection issues, possibly centered around localhost not being used (I am just guessing). Here are highlights of the many, many logs printed when SPARK_LOCAL_IP is not set:
    22/07/12 10:48:45 ERROR Utils: Aborting task
    java.io.IOException: Failed to connect to c02dq9v9md6m-c02dq9v9md6m.twi.com/10.217.166.73:64938
    	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
    	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
    	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
    	at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:399)
    	at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$openChannel$4(NettyRpcEnv.scala:367)
    	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
    	at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:366)
    	at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:762)
    	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:549)
    	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13(Executor.scala:962)
    	at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13$adapted(Executor.scala:954)
    	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    	at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
    	at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    	at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    	at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
    	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:954)
    	at org.apache.spark.executor.Executor.<init>(Executor.scala:247)
    	at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
    	at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
    	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
    	at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
    	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at py4j.Gateway.invoke(Gateway.java:238)
    	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: c02dq9v9md6m-c02dq9v9md6m.twi.com/10.217.166.73:64938
    Caused by: java.net.ConnectException: Connection refused
    Samhita Alla

    2 months ago
    I guess SPARK_LOCAL_IP by default resolves to the machine’s own network address rather than the loopback address. I believe, when running locally, the env variable has to be set to localhost.
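    One way to check which address the driver actually bound to, using the standard PySpark conf API (a quick sketch, outside of Flyte):
    from pyspark.sql import SparkSession

    # print the address the driver bound to; with SPARK_LOCAL_IP=localhost this
    # should be the loopback address instead of the machine's network hostname
    spark = SparkSession.builder.master("local[*]").getOrCreate()
    print(spark.sparkContext.getConf().get("spark.driver.host", "not set"))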