1   
  2   
  3   
  4   
  5   
  6   
  7   
  8   
  9   
 10   
 11   
 12   
 13   
 14   
 15   
 16   
 17   
 18  """ 
 19  Python package for statistical functions in MLlib. 
 20  """ 
 21   
 22  from pyspark.mllib._common import \ 
 23      _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \ 
 24      _serialize_double, _serialize_double_vector, \ 
 25      _deserialize_double, _deserialize_double_matrix, _deserialize_double_vector 
 29   
 30      """ 
 31      Trait for multivariate statistical summary of a data matrix. 
 32      """ 
 33   
 35          """ 
 36          :param sc:  Spark context 
 37          :param java_summary:  Handle to Java summary object 
 38          """ 
 39          self._sc = sc 
 40          self._java_summary = java_summary 
  41   
 44   
 46          return _deserialize_double_vector(self._java_summary.mean()) 
  47   
 49          return _deserialize_double_vector(self._java_summary.variance()) 
  50   
 52          return self._java_summary.count() 
  53   
 55          return _deserialize_double_vector(self._java_summary.numNonzeros()) 
  56   
 58          return _deserialize_double_vector(self._java_summary.max()) 
  59   
 61          return _deserialize_double_vector(self._java_summary.min()) 
   62   

    # NOTE(review): the enclosing 'class Statistics' header and the
    # 'def colStats(X):' line appear to be missing from this chunk --
    # the docstring and body below belong to that static method.
    # Confirm against the full file.
    @staticmethod
        """
        Computes column-wise summary statistics for the input RDD[Vector].

        >>> from linalg import Vectors
        >>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
        ...                       Vectors.dense([4, 5, 0,  3]),
        ...                       Vectors.dense([6, 7, 0,  8])])
        >>> cStats = Statistics.colStats(rdd)
        >>> cStats.mean()
        array([ 4.,  4.,  0.,  3.])
        >>> cStats.variance()
        array([  4.,  13.,   0.,  25.])
        >>> cStats.count()
        3L
        >>> cStats.numNonzeros()
        array([ 3.,  2.,  0.,  3.])
        >>> cStats.max()
        array([ 6.,  7.,  0.,  8.])
        >>> cStats.min()
        array([ 2.,  0.,  0., -2.])
        """
        # Serialize the Python RDD of vectors, hand it to the JVM-side
        # implementation, and wrap the returned Java summary handle.
        sc = X.ctx
        Xser = _get_unmangled_double_vector_rdd(X)
        cStats = sc._jvm.PythonMLLibAPI().colStats(Xser._jrdd)
        return MultivariateStatisticalSummary(sc, cStats)
  93   
 94      @staticmethod 
 95 -    def corr(x, y=None, method=None): 
  96          """ 
 97          Compute the correlation (matrix) for the input RDD(s) using the 
 98          specified method. 
 99          Methods currently supported: I{pearson (default), spearman}. 
100   
101          If a single RDD of Vectors is passed in, a correlation matrix 
102          comparing the columns in the input RDD is returned. Use C{method=} 
103          to specify the method to be used for single RDD inout. 
104          If two RDDs of floats are passed in, a single float is returned. 
105   
106          >>> x = sc.parallelize([1.0, 0.0, -2.0], 2) 
107          >>> y = sc.parallelize([4.0, 5.0, 3.0], 2) 
108          >>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2) 
109          >>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7 
110          True 
111          >>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson") 
112          True 
113          >>> Statistics.corr(x, y, "spearman") 
114          0.5 
115          >>> from math import isnan 
116          >>> isnan(Statistics.corr(x, zeros)) 
117          True 
118          >>> from linalg import Vectors 
119          >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]), 
120          ...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])]) 
121          >>> pearsonCorr = Statistics.corr(rdd) 
122          >>> print str(pearsonCorr).replace('nan', 'NaN') 
123          [[ 1.          0.05564149         NaN  0.40047142] 
124           [ 0.05564149  1.                 NaN  0.91359586] 
125           [        NaN         NaN  1.                 NaN] 
126           [ 0.40047142  0.91359586         NaN  1.        ]] 
127          >>> spearmanCorr = Statistics.corr(rdd, method="spearman") 
128          >>> print str(spearmanCorr).replace('nan', 'NaN') 
129          [[ 1.          0.10540926         NaN  0.4       ] 
130           [ 0.10540926  1.                 NaN  0.9486833 ] 
131           [        NaN         NaN  1.                 NaN] 
132           [ 0.4         0.9486833          NaN  1.        ]] 
133          >>> try: 
134          ...     Statistics.corr(rdd, "spearman") 
135          ...     print "Method name as second argument without 'method=' shouldn't be allowed." 
136          ... except TypeError: 
137          ...     pass 
138          """ 
139          sc = x.ctx 
140           
141           
142           
143          if type(y) == str: 
144              raise TypeError("Use 'method=' to specify method name.") 
145          if not y: 
146              try: 
147                  Xser = _get_unmangled_double_vector_rdd(x) 
148              except TypeError: 
149                  raise TypeError("corr called on a single RDD not consisted of Vectors.") 
150              resultMat = sc._jvm.PythonMLLibAPI().corr(Xser._jrdd, method) 
151              return _deserialize_double_matrix(resultMat) 
152          else: 
153              xSer = _get_unmangled_rdd(x, _serialize_double) 
154              ySer = _get_unmangled_rdd(y, _serialize_double) 
155              result = sc._jvm.PythonMLLibAPI().corr(xSer._jrdd, ySer._jrdd, method) 
156              return result 
  157   
    # NOTE(review): the enclosing 'def _test():' header line is not
    # visible in this chunk -- the lines below are the doctest-runner
    # body. Confirm against the full file.
    import doctest
    from pyspark import SparkContext
    # Copy module globals so the doctests get a namespace that includes
    # a live SparkContext bound to 'sc'.
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        # Non-zero exit status signals doctest failures to the caller.
        exit(-1)
 168   
169   
170  if __name__ == "__main__": 
171      _test() 
172