https://flyte.org logo
#announcements
Title
# announcements
h

Hafsa Junaid

03/17/2022, 7:39 PM
Hey! Why is it necessary to install flytekit in virtual environment. ??? Flytekit not found error occurs even if the module is installed. But it works fine if flytekit is installed in venv and that activated venv is used for workflows and programs.
y

Yee

03/17/2022, 7:59 PM
i don’t think it’s strictly necessary to install it inside a virtual env, it’s just necessary that the right python environment is available when a task runs.
when a task runs, the container is given the command
pyflyte-execute
which is a mini bash script that invokes the rest of the python code.
can you show a redacted version of your dockerfile?
maybe i can play around with it and see what’s happening
h

Hafsa Junaid

03/17/2022, 8:04 PM
for now, I am using flyte in my python program, creating a workflow of two different functions. And I am running it with command eg: python3 myapp.py
y

Yee

03/17/2022, 8:07 PM
hmm..
it’s not quite set up to do that. do you have a backend set up?
unless you’re looking for local execution… in which case you should be able to do that assuming the py is well formed?
do you mind copy/pasting that here? along with
pip freeze
and
which python
?
h

Hafsa Junaid

03/17/2022, 8:18 PM
https://docs.flyte.org/en/latest/getting_started/build.html Following this link, I executed my python program instead of example. Here it is mentioned to create venv for further processes. And if I am not doing that, flytekit module not found error occurs.
The reason I am getting into this debate is that, I am trying to debug the problem I am facing while submitting python task with flyte workflow as dataproc job.
I can create venv for flyte and run the program successfully on local machine. But when I have to submit this program as dataproc to GCP I am having error of flytekit module not found.
y

Yee

03/17/2022, 8:33 PM
not sure about dataproc will have to learn more about that first. most of my experience is in aws. is dataproc a container running service?
when you run locally, not on dataproc, does it work without a virtual env?
a

austin

03/17/2022, 8:35 PM
in broad strokes, GCP dataproc = AWS emr
y

Yee

03/17/2022, 8:36 PM
oic
it sounds like there might be a bit of a mismatch then. maybe @Ketan (kumare3) can chime in when he gets a chance. Flyte sits at a layer above aws emr/gcp dataproc
a

austin

03/17/2022, 8:46 PM
Yes, @Yee — this is the case. @Hafsa Junaid I think would need a bit more information about what you’re trying to do, as this doesn’t sound like well covered in our documentation or well known use-cases/patterns ( ex: there might be another path to help you get where you’re going, and/or can find a way to ensure there is a successful way to accomplish this ).
h

Hafsa Junaid

03/17/2022, 9:22 PM
Okay so let me try to elaborate on what I am doing exactly: I have an app.py file of a pyspark example. It imports different modules and packages. All of them run locally with the correct output. I am uploading that app.py with all dependent files on GCP bucket (cloud storage). Using gs bucket link, I submit the app.py as a dataproc pyspark job. The successful output of a successful job is up there. (screen shot: 01) BUT When I am using flytekit module for task and workflow pipeline in the same app.py and follow the same above-mentioned procedure, the job fails and the output says that flytekit not found. (screenshot: 02) Whereas the same python file with flyte workflow is successfully executed locally.
screenshot: 01
screenshot: 02
I was trying to solve this issue and I realized that flytekit import gives an error if I am trying to install flytekit outside a virtual environment. That's why I wanted to gain more information regarding this.
a

austin

03/17/2022, 9:34 PM
hmmmm; does the dataproc cluster have flytekit installed? Also, trying to undersrtand why you would want flytekit ‘in’ the dataproc cluster?
h

Hafsa Junaid

03/17/2022, 9:34 PM
Another possibility of dataproc job failing could be that flytekit is not supported or accepted by GCP. Comment on this possibility @austin @Yee so that I can be more specific to my problem statement.
@austin If the program has all dependencies installed then there's no need for installation on the cluster. Or at least I don't do for any module. (As far as my knowledge) So my task is to submit app.py project as a dataproc job. For pipelining functions in app.py, I am using flytekit import task , workflow
a

austin

03/17/2022, 9:39 PM
https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/init-actions <- it is possible to install python packages and/or lots of things in a dataproc cluster.
Though, I still don’t get why dataproc would/should want to know about flytekit
h

Hafsa Junaid

03/17/2022, 9:45 PM
Because "flytekit" is used in the code (app.py) dataproc has to run as a job.
a

austin

03/17/2022, 9:46 PM
the contents of your app.py are quite unclear — perhaps that is something you can share ( certainly change any sensitive info )?
h

Hafsa Junaid

03/17/2022, 9:49 PM
Copy code
from pyspark.sql import SparkSession
spark1=SparkSession.builder.appName('KMeanClustering').getOrCreate()
import pandas
import pyspark
from pyspark.sql.functions import pandas_udf
from pyspark.ml.clustering import KMeans , KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from datetime import datetime
from datetime import timedelta
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeansModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector  
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
spark1=SparkSession.builder.appName('KMeanClustering').getOrCreate()
SQLContext(sparkContext=spark1.sparkContext, sparkSession=spark1)
ssc = StreamingContext(spark1.sparkContext, 1)

import typing
from flytekit import task, workflow

def ExtractData():
    providers_file='csv/providers.csv'
    #try:
    providers=spark1.read.csv(providers_file,inferSchema=True,header=True)
        #ProviderData=providers.select('LAT','LON','UTILIZATION')
    ProviderData=providers.select('UTILIZATION')
    assembler=VectorAssembler(inputCols= ProviderData.columns , outputCol='features')
    dataset_final=assembler.transform(ProviderData)
    scaler = StandardScaler(inputCol='features',outputCol='scaledFeatures', withStd=True , withMean=False)
    ScaledModel = scaler.fit(dataset_final)
    dataset_final=ScaledModel.transform(dataset_final)
    #dataset_final.show()
    return dataset_final
     
@task
def modelTrack():
    dataset=ExtractData()
    K_NumberOfClusters=5
    Seed=2
    kmeans = KMeans().setK(K_NumberOfClusters).setSeed(Seed)
    model = kmeans.fit(dataset)

    # Make predictions
    predictions = model.transform(dataset)


    # Evaluate clustering by computing Silhouette score
    evaluator = ClusteringEvaluator()

    silhouette = evaluator.evaluate(predictions)
    print("==================================================================================")
    print("Squared euclidean distance  from the center within the cluster= " + str(silhouette))
    print("==================================================================================")

    # Shows the result.
    centers = model.clusterCenters()
    print("===============CENTROIDS==============: ")
    for center in centers:
        print(center)
        #mlflow.log_metric("Centroid",center)
    print("=======================================: ")
    #Output
    clust_output=model.transform(dataset).select('UTILIZATION','prediction')
    rows=clust_output.collect()
    print("===========PREDICTION OF A CLUSTER FOR EACH PROVIDER=========== ")
    predictions.select('UTILIZATION','prediction').show(truncate=False)
    #predictions.select('UTILIZATION','prediction').describe().show(truncate=False)
    
    print("===========NUMBER OF PROVIDERS PER CLUSTER=============== ")
    pred2=model.transform(dataset).groupBy('prediction').count()
    pred2.show()
    


@task
def StreamingKmeansModel():
    datasetS=ExtractData()
    initCenters = [[0],[999],[1500]]
    initWeights = [1.0 , 1.0 , 1.0]
    stkm = StreamingKMeansModel(initCenters, initWeights)

    print('==Initial Centers==')
    initCenters=stkm.centers
    print(str(initCenters))
    
    dataset=datasetS.select('scaledFeatures')
    data_list = dataset.collect()
    data_list = [DenseVector(row.scaledFeatures) for row in data_list]

    decayFactor = 0.0
    data = spark1.sparkContext.parallelize(data_list)

    stkm1 = stkm.update(data, 1, "points")
    ClusterCenter=stkm1.centers
    KmeanCost=stkm1.computeCost(data)

    ccl=[ClusterCenter]
    cclS = ssc.queueStream(ccl)
    cclS.pprint()
    print('==Final Centers==')
    ssc.start()
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
    
    print("====================================================================================")
    print('K-means cost(sum of squared distances of points to their nearest center): ',KmeanCost)
    print("====================================================================================")
    return ('K-means cost(sum of squared distances of points to their nearest center): ',KmeanCost)
#modelTrack(K_NumberOfClusters=5,Seed=2)
#StreamingKmeansModel()    
@workflow
def my_wf():
    
    res = modelTrack()
    return res

@workflow
def my_wf2():
    
    res2 = StreamingKmeansModel()
    return res2

if __name__ == "__main__":
    print("===Workflow: 01=== \nData extraction, Clustering and Prediction \n")
    my_wf()
    print("===Workflow: 02=== \nStreaming clustering, kmeans cost evaluation \n")
    my_wf2()
The content of app.py
a

austin

03/17/2022, 9:51 PM
interesting … I generally wouldn’t think of submitting something like this ( annotated with tasks/workflows ) to dataproc / pyspark.
I would generally submit an app.py to a flyte cluster, which in turn would run ( separate ) pyspark computations as tasks. dataproc is not generally the orchestration.
I haven’t seen templates yet for submitting jobs directly as tasks via something like https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow (AWS EMR case) or https://cloud.google.com/dataproc/docs/samples/dataproc-submit-pyspark-job (GCP Dataflow case). I could see there being a dataflow backend plugin [ but does not exist yet ] https://docs.flyte.org/projects/cookbook/en/latest/integrations.html
h

Hafsa Junaid

03/17/2022, 9:59 PM
Yess.. I got your point and this is exactly what my next task is in the to-do list. But for now, I am supposed to submit app.py as a dataproc job and the problem statement is that it gives a module not found error.
a

austin

03/17/2022, 10:01 PM
I guess if you are submitting an app.py directly to dataproc — why do you need flyte?
h

Hafsa Junaid

03/17/2022, 10:01 PM
Alright then, seems like I should start working on https://docs.flyte.org/projects/cookbook/en/latest/auto/integrations/kubernetes/k8s_spark/index.html to get this task done. Thank you so much for the assistance.
a

austin

03/17/2022, 10:05 PM
People have been successful with that, so not a bad road to head down.
👍 1
and, do share how it goes ( if docs should be improved, etc ).
179 Views