from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.feature import VectorAssembler
# https://spark.apache.org/docs/latest/ml-clustering.html#k-means
from pyspark.ml.clustering import KMeans

# Create Spark session
spark = SparkSession.builder.appName("DataFrameApp").master("local").getOrCreate()

# Read the numbers as doubles; otherwise they will be inferred as strings
# and KMeans will not run
schema = StructType([
    StructField("_c0", DoubleType()),
    StructField("_c1", DoubleType()),
    StructField("_c2", DoubleType())])

# Load and parse the data
data = (spark.read
        .option("header", "false")
        .option("delimiter", " ")
        .schema(schema)
        .csv("file:///usr/local/spark/data/mllib/kmeans_data.txt"))
data.show()

# We need to gather all feature columns into a single vector column
# called "features", which is the input format KMeans expects
assembler = VectorAssembler(inputCols=["_c0", "_c1", "_c2"], outputCol="features")
assembled = assembler.transform(data).select("features")

# Examine the contents of the DataFrame
assembled.show()

# Build the model
# (k, initMode='k-means||', seed=None, initSteps=2, maxIter=20)
kmeans = KMeans(k=2, initMode='k-means||', initSteps=2, seed=None, maxIter=20)

# Train the model (cluster the data) --> the fitted model holds the
# learned cluster centers
model = kmeans.fit(assembled)

# Print the centers of the 2 clusters after training
centers = model.clusterCenters()
for center in centers:
    print(center)
# It will print:
# [0.1 0.1 0.1]
# [9.1 9.1 9.1]

# Save and load model
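# A minimal sketch of the save/load step named above; the path
# "/tmp/kmeans_model" is an assumption for illustration -- any writable
# location works. Note that save() fails if the path already exists;
# use model.write().overwrite().save(path) to overwrite instead.
from pyspark.ml.clustering import KMeansModel

model.save("file:///tmp/kmeans_model")
loaded_model = KMeansModel.load("file:///tmp/kmeans_model")

# Sanity check: the reloaded model carries the same cluster centers
for center in loaded_model.clusterCenters():
    print(center)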
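# Optional illustration (not in the original script): transform() assigns
# each row to a cluster by adding a "prediction" column holding the cluster id
predictions = loaded_model.transform(assembled)
predictions.show()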