#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from numpy import array
from pyspark import SparkContext
from pyspark.mllib._common import \
    _get_unmangled_double_vector_rdd, _squared_distance, \
    _deserialize_double_matrix
from pyspark.mllib.linalg import SparseVector
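# The pyspark.mllib._common imports are private serialization glue:
# _get_unmangled_double_vector_rdd converts an RDD of NumPy/Sparse vectors
# into the byte-array form the JVM-side PythonMLLibAPI consumes,
# _squared_distance compares a point against a center in either
# representation, and _deserialize_double_matrix decodes the JVM's reply.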


class KMeansModel(object):

    """A clustering model derived from the k-means method.

    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
    >>> model = KMeans.train(
    ...     sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
    >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
    True
    >>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
    True
    >>> model = KMeans.train(sc.parallelize(data), 2)
    >>> sparse_data = [
    ...     SparseVector(3, {1: 1.0}),
    ...     SparseVector(3, {1: 1.1}),
    ...     SparseVector(3, {2: 1.0}),
    ...     SparseVector(3, {2: 1.1})
    ... ]
    >>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
    >>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
    True
    >>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
    True
    >>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
    True
    >>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
    True
    >>> type(model.clusterCenters)
    <type 'list'>
    """

    def __init__(self, centers):
        self.centers = centers

    @property
    def clusterCenters(self):
        """Get the cluster centers, represented as a list of NumPy arrays."""
        return self.centers

    def predict(self, x):
        """Find the cluster to which x belongs in this model."""
        best = 0
        best_distance = float("inf")
        # Linear scan over the k centers, keeping the index of the nearest
        # one under squared Euclidean distance.
        for i in range(len(self.centers)):
            distance = _squared_distance(x, self.centers[i])
            if distance < best_distance:
                best = i
                best_distance = distance
        return best
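
    # _squared_distance (imported from pyspark.mllib._common) accepts dense
    # NumPy arrays and SparseVectors alike, which is what lets the doctests
    # above mix the two representations when calling predict().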


class KMeans(object):

    @classmethod
    def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
        """Train a k-means clustering model."""
        sc = data.context
        dataBytes = _get_unmangled_double_vector_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
            dataBytes._jrdd, k, maxIterations, runs, initializationMode)
        if len(ans) != 1:
            raise RuntimeError("JVM call result had unexpected length")
        elif type(ans[0]) != bytearray:
            raise RuntimeError("JVM call result had first element of type "
                               + type(ans[0]).__name__ + " which is not bytearray")
        matrix = _deserialize_double_matrix(ans[0])
        return KMeansModel([row for row in matrix])
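
    # A minimal usage sketch (assumes a live SparkContext named `sc`, as in
    # the doctests above):
    #
    #     points = sc.parallelize([array([0.0, 0.0]), array([1.0, 1.0]),
    #                              array([9.0, 8.0]), array([8.0, 9.0])])
    #     model = KMeans.train(points, 2, maxIterations=10)
    #     model.predict(array([0.5, 0.5]))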


def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()