creamy-baker-21593
03/17/2022, 7:39 PMthankful-minister-83577
thankful-minister-83577
`pyflyte-execute`, which is a mini bash script that invokes the rest of the Python code.
thankful-minister-83577
thankful-minister-83577
creamy-baker-21593
03/17/2022, 8:04 PMthankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
`pip freeze` and `which python`?
creamy-baker-21593
03/17/2022, 8:18 PMcreamy-baker-21593
03/17/2022, 8:19 PMcreamy-baker-21593
03/17/2022, 8:21 PMthankful-minister-83577
thankful-minister-83577
echoing-translator-95395
03/17/2022, 8:35 PMthankful-minister-83577
thankful-minister-83577
echoing-translator-95395
03/17/2022, 8:46 PMcreamy-baker-21593
03/17/2022, 9:22 PMcreamy-baker-21593
03/17/2022, 9:25 PMcreamy-baker-21593
03/17/2022, 9:26 PMcreamy-baker-21593
03/17/2022, 9:30 PMechoing-translator-95395
03/17/2022, 9:34 PMechoing-translator-95395
03/17/2022, 9:34 PMcreamy-baker-21593
03/17/2022, 9:34 PMcreamy-baker-21593
03/17/2022, 9:39 PMechoing-translator-95395
03/17/2022, 9:39 PMechoing-translator-95395
03/17/2022, 9:40 PMcreamy-baker-21593
03/17/2022, 9:45 PMechoing-translator-95395
03/17/2022, 9:46 PMcreamy-baker-21593
03/17/2022, 9:49 PM
from pyspark.sql import SparkSession
# Create (or fetch) the Spark session named 'KMeanClustering'.
# NOTE(review): this runs before the remaining imports, and the identical call
# appears again once the imports are finished.
spark1=SparkSession.builder.appName('KMeanClustering').getOrCreate()
import pandas
import pyspark
from pyspark.sql.functions import pandas_udf
from pyspark.ml.clustering import KMeans , KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from datetime import datetime
from datetime import timedelta
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeansModel
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
# Module-level Spark setup used by the functions defined below.
# getOrCreate() is expected to hand back the session already created earlier
# in this file rather than start a second one.
spark1=SparkSession.builder.appName('KMeanClustering').getOrCreate()
# The SQLContext is constructed here but never bound to a name.
SQLContext(sparkContext=spark1.sparkContext, sparkSession=spark1)
# Streaming context on the same SparkContext; the second argument (1) is the
# batch duration (seconds, per the PySpark StreamingContext API).
ssc = StreamingContext(spark1.sparkContext, 1)
import typing
from flytekit import task, workflow
def ExtractData():
    """Load the providers CSV and return it with raw and scaled feature vectors.

    Reads ``csv/providers.csv`` (header row, inferred schema) through the
    module-level ``spark1`` session, keeps only the ``UTILIZATION`` column,
    packs it into a ``features`` vector column and appends a
    ``scaledFeatures`` column produced by a ``StandardScaler`` that scales by
    standard deviation without centering on the mean.

    Returns:
        The transformed DataFrame with ``UTILIZATION``, ``features`` and
        ``scaledFeatures`` columns.
    """
    source_path = 'csv/providers.csv'
    raw_frame = spark1.read.csv(source_path, inferSchema=True, header=True)

    # Only UTILIZATION feeds the clustering; every selected column becomes
    # part of the assembled feature vector.
    utilization_frame = raw_frame.select('UTILIZATION')
    feature_builder = VectorAssembler(
        inputCols=utilization_frame.columns,
        outputCol='features',
    )
    assembled = feature_builder.transform(utilization_frame)

    std_scaler = StandardScaler(
        inputCol='features',
        outputCol='scaledFeatures',
        withStd=True,
        withMean=False,
    )
    return std_scaler.fit(assembled).transform(assembled)
@task
def modelTrack():
    """Fit a 5-cluster KMeans model on the provider data and print a report.

    The dataset comes from ``ExtractData()``. The report written to stdout
    contains the evaluator's silhouette score, the cluster centers, the
    cluster assigned to each ``UTILIZATION`` value and the number of rows per
    cluster. Nothing is returned.
    """
    dataset = ExtractData()
    K_NumberOfClusters = 5
    Seed = 2

    # No features column is configured, so KMeans runs on its default input
    # column. NOTE(review): presumably that is 'features', not
    # 'scaledFeatures' -- confirm this is intended.
    kmeans = KMeans().setK(K_NumberOfClusters).setSeed(Seed)
    model = kmeans.fit(dataset)

    # Score the data once and reuse the result for every section of the
    # report instead of re-running model.transform for each one.
    predictions = model.transform(dataset)

    # Evaluate clustering by computing the Silhouette score.
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("==================================================================================")
    # The value is the evaluator's silhouette score, so label it as such.
    print("Silhouette score (squared euclidean distance)= " + str(silhouette))
    print("==================================================================================")

    # Show the fitted cluster centers.
    centers = model.clusterCenters()
    print("===============CENTROIDS==============: ")
    for center in centers:
        print(center)
    print("=======================================: ")

    print("===========PREDICTION OF A CLUSTER FOR EACH PROVIDER=========== ")
    predictions.select('UTILIZATION', 'prediction').show(truncate=False)

    print("===========NUMBER OF PROVIDERS PER CLUSTER=============== ")
    pred2 = predictions.groupBy('prediction').count()
    pred2.show()
@task
def StreamingKmeansModel():
    """Apply one streaming-KMeans update to the scaled data and report its cost.

    Builds a ``StreamingKMeansModel`` from three fixed initial centers with
    equal weights, updates it once with every ``scaledFeatures`` vector from
    ``ExtractData()``, pushes the resulting centers through the module-level
    streaming context so they are printed, and prints the cost of the updated
    model on the same data.

    Returns:
        A 2-tuple ``(label, cost)`` where ``label`` is the fixed description
        string and ``cost`` is the value from ``computeCost``.
        NOTE(review): the task declares no return annotation -- confirm the
        tuple is actually surfaced as a task output.
    """
    datasetS = ExtractData()

    initCenters = [[0], [999], [1500]]
    initWeights = [1.0, 1.0, 1.0]
    stkm = StreamingKMeansModel(initCenters, initWeights)
    print('==Initial Centers==')
    print(str(stkm.centers))

    # Bring the scaled vectors to the driver, convert them to mllib dense
    # vectors and redistribute them as an RDD for the mllib model.
    rows = datasetS.select('scaledFeatures').collect()
    data_list = [DenseVector(row.scaledFeatures) for row in rows]
    data = spark1.sparkContext.parallelize(data_list)

    # Single update with decay factor 1 and time unit "points". The unused
    # local `decayFactor = 0.0` was dropped because it never reached this call.
    stkm1 = stkm.update(data, 1, "points")
    ClusterCenter = stkm1.centers
    KmeanCost = stkm1.computeCost(data)

    # Queue the final centers as a one-element stream so pprint() emits them
    # once the streaming context runs.
    cclS = ssc.queueStream([ClusterCenter])
    cclS.pprint()
    print('==Final Centers==')
    ssc.start()
    # NOTE(review): stopSparkContext=True also shuts down the SparkContext
    # behind the module-level `spark1`; later Spark work in this process
    # would presumably fail -- confirm this is intended.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

    print("====================================================================================")
    print('K-means cost(sum of squared distances of points to their nearest center): ', KmeanCost)
    print("====================================================================================")
    return ('K-means cost(sum of squared distances of points to their nearest center): ', KmeanCost)
#modelTrack(K_NumberOfClusters=5,Seed=2)
#StreamingKmeansModel()
@workflow
def my_wf():
    """Workflow that invokes the ``modelTrack`` task and returns what the call yields."""
    return modelTrack()
@workflow
def my_wf2():
    """Workflow that invokes the ``StreamingKmeansModel`` task and returns what the call yields."""
    return StreamingKmeansModel()
if __name__ == "__main__":
    # Run both workflows in order, printing a banner before each one.
    for banner, run_workflow in (
        ("===Workflow: 01=== \nData extraction, Clustering and Prediction \n", my_wf),
        ("===Workflow: 02=== \nStreaming clustering, kmeans cost evaluation \n", my_wf2),
    ):
        print(banner)
        run_workflow()
creamy-baker-21593
03/17/2022, 9:49 PMechoing-translator-95395
03/17/2022, 9:51 PMechoing-translator-95395
03/17/2022, 9:52 PMechoing-translator-95395
03/17/2022, 9:59 PMcreamy-baker-21593
03/17/2022, 9:59 PMechoing-translator-95395
03/17/2022, 10:01 PMcreamy-baker-21593
03/17/2022, 10:01 PMechoing-translator-95395
03/17/2022, 10:05 PMechoing-translator-95395
03/18/2022, 1:19 AM