Hafsa Junaid
03/17/2022, 7:39 PM

Yee
pyflyte-execute
which is a mini bash script that invokes the rest of the python code.
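A minimal sketch of how to check what that wrapper actually contains, assuming flytekit is installed and pyflyte-execute is on PATH (the wrapper is the console entry point pip generates for flytekit, so its exact contents vary by install and OS):

# Illustrative only: peek at the pyflyte-execute wrapper to confirm it is just
# a thin launcher that hands off to flytekit's Python entrypoint.
import shutil

wrapper = shutil.which("pyflyte-execute")  # assumes flytekit is on PATH
print(wrapper)
if wrapper:
    with open(wrapper) as f:   # on Windows this is an .exe rather than a readable script
        print(f.read())        # typically a few lines that import and call flytekit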
Hafsa Junaid
03/17/2022, 8:04 PM

Yee
pip freeze
and which python?
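A small sketch of the same checks from inside Python, assuming the point of pip freeze / which python is to confirm that the interpreter running the tasks is the one that actually has flytekit and pyspark installed (the two package names are just the ones used in the code shared later in the thread):

# Illustrative environment check: which interpreter is active, and which
# flytekit/pyspark versions it can see (a targeted slice of `pip freeze`).
import sys
import importlib.metadata as md

print(sys.executable)  # the interpreter that `which python` should point to
for pkg in ("flytekit", "pyspark"):
    try:
        print(pkg, md.version(pkg))
    except md.PackageNotFoundError:
        print(pkg, "not installed for this interpreter")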
Hafsa Junaid
03/17/2022, 8:18 PM

Yee

austin
03/17/2022, 8:35 PM

Yee

austin
03/17/2022, 8:46 PM

Hafsa Junaid
03/17/2022, 9:22 PM

austin
03/17/2022, 9:34 PM

Hafsa Junaid
03/17/2022, 9:34 PM

austin
03/17/2022, 9:39 PM

Hafsa Junaid
03/17/2022, 9:45 PM

austin
03/17/2022, 9:46 PM

Hafsa Junaid
03/17/2022, 9:49 PM
from pyspark.sql import SparkSession
spark1=SparkSession.builder.appName('KMeanClustering').getOrCreate()
import pandas
import pyspark
from pyspark.sql.functions import pandas_udf
from pyspark.ml.clustering import KMeans , KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from datetime import datetime
from datetime import timedelta
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeansModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
spark1=SparkSession.builder.appName('KMeanClustering').getOrCreate()
SQLContext(sparkContext=spark1.sparkContext, sparkSession=spark1)
ssc = StreamingContext(spark1.sparkContext, 1)
import typing
from flytekit import task, workflow
def ExtractData():
    # Load the providers CSV and build a scaled, single-column feature vector.
    providers_file = 'csv/providers.csv'
    #try:
    providers = spark1.read.csv(providers_file, inferSchema=True, header=True)
    #ProviderData=providers.select('LAT','LON','UTILIZATION')
    ProviderData = providers.select('UTILIZATION')
    assembler = VectorAssembler(inputCols=ProviderData.columns, outputCol='features')
    dataset_final = assembler.transform(ProviderData)
    scaler = StandardScaler(inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=False)
    ScaledModel = scaler.fit(dataset_final)
    dataset_final = ScaledModel.transform(dataset_final)
    #dataset_final.show()
    return dataset_final
@task
def modelTrack():
    # Cluster provider utilization with k-means and report predictions per cluster.
    dataset = ExtractData()
    K_NumberOfClusters = 5
    Seed = 2
    kmeans = KMeans().setK(K_NumberOfClusters).setSeed(Seed)
    model = kmeans.fit(dataset)
    # Make predictions
    predictions = model.transform(dataset)
    # Evaluate clustering by computing the silhouette score
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("==================================================================================")
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    print("==================================================================================")
    # Show the cluster centers.
    centers = model.clusterCenters()
    print("===============CENTROIDS==============: ")
    for center in centers:
        print(center)
        #mlflow.log_metric("Centroid",center)
    print("=======================================: ")
    # Output
    clust_output = model.transform(dataset).select('UTILIZATION', 'prediction')
    rows = clust_output.collect()
    print("===========PREDICTION OF A CLUSTER FOR EACH PROVIDER=========== ")
    predictions.select('UTILIZATION', 'prediction').show(truncate=False)
    #predictions.select('UTILIZATION','prediction').describe().show(truncate=False)
    print("===========NUMBER OF PROVIDERS PER CLUSTER=============== ")
    pred2 = model.transform(dataset).groupBy('prediction').count()
    pred2.show()
@task
def StreamingKmeansModel():
    # Fit a streaming k-means model on the scaled features and report its cost.
    datasetS = ExtractData()
    initCenters = [[0], [999], [1500]]
    initWeights = [1.0, 1.0, 1.0]
    stkm = StreamingKMeansModel(initCenters, initWeights)
    print('==Initial Centers==')
    initCenters = stkm.centers
    print(str(initCenters))
    dataset = datasetS.select('scaledFeatures')
    data_list = dataset.collect()
    data_list = [DenseVector(row.scaledFeatures) for row in data_list]
    decayFactor = 0.0
    data = spark1.sparkContext.parallelize(data_list)
    stkm1 = stkm.update(data, 1, "points")
    ClusterCenter = stkm1.centers
    KmeanCost = stkm1.computeCost(data)
    ccl = [ClusterCenter]
    cclS = ssc.queueStream(ccl)
    cclS.pprint()
    print('==Final Centers==')
    ssc.start()
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
    print("====================================================================================")
    print('K-means cost (sum of squared distances of points to their nearest center): ', KmeanCost)
    print("====================================================================================")
    return ('K-means cost (sum of squared distances of points to their nearest center): ', KmeanCost)
#modelTrack(K_NumberOfClusters=5,Seed=2)
#StreamingKmeansModel()

@workflow
def my_wf():
    res = modelTrack()
    return res

@workflow
def my_wf2():
    res2 = StreamingKmeansModel()
    return res2

if __name__ == "__main__":
    print("===Workflow: 01=== \nData extraction, Clustering and Prediction \n")
    my_wf()
    print("===Workflow: 02=== \nStreaming clustering, kmeans cost evaluation \n")
    my_wf2()
austin
03/17/2022, 9:51 PM

Hafsa Junaid
03/17/2022, 9:59 PM

austin
03/17/2022, 10:01 PM

Hafsa Junaid
03/17/2022, 10:01 PM

austin
03/17/2022, 10:05 PM