#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _have_scipy, _scipy_issparse
from pyspark.mllib.linalg import SparseVector, Vectors


class LabeledPoint(object):
    """
    The features and labels of a data point.

    @param label: Label for this data point.
    @param features: Vector of features for this point (NumPy array, list,
        pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
    """

    def __init__(self, label, features):
        self.label = label
        if (type(features) == ndarray or type(features) == SparseVector
                or (_have_scipy and _scipy_issparse(features))):
            self.features = features
        elif type(features) == list:
            self.features = array(features)
        else:
            raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
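
# Illustrative construction (a sketch, not part of the module's API surface):
# dense features may be given as a Python list or NumPy array, sparse features
# as a pyspark.mllib.linalg.SparseVector.
#
#     dense = LabeledPoint(1.0, [1.0, 0.0, 3.0])
#     sparse = LabeledPoint(0.0, SparseVector(3, {0: 1.0, 2: 3.0}))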


class LinearModel(object):
    """A linear model that has a vector of coefficients and an intercept."""

    def __init__(self, weights, intercept):
        self._coeff = weights
        self._intercept = intercept
    @property
    def weights(self):
        return self._coeff

    @property
    def intercept(self):
        return self._intercept


class LinearRegressionModelBase(LinearModel):
    """A linear regression model.

    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
    True
    >>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
    True
    """

    def predict(self, x):
        """Predict the value of the dependent variable given a vector x
        containing values for the independent variables."""
        _linear_predictor_typecheck(x, self._coeff)
        return _dot(x, self._coeff) + self._intercept


class LinearRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class LinearRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=1.0, regType=None, intercept=False):
        """
        Train a linear regression model on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD
                                  (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration (default: 1.0).
        @param initialWeights:    The initial weights (default: None).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param regType:           The type of regularizer used for training
                                  our model.
                                  Allowed values: "l1" for using L1Updater,
                                                  "l2" for using
                                                       SquaredL2Updater,
                                                  "none" for no regularizer.
                                  (default: "none")
        @param intercept:         Boolean parameter which indicates whether
                                  to use an augmented representation of the
                                  training data (i.e. whether to activate
                                  bias features).
        """
        sc = data.context
        if regType is None:
            regType = "none"
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
        return _regression_train_wrapper(sc, train_f, LinearRegressionModel, data, initialWeights)
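
# Illustrative call (a sketch, assuming a live SparkContext `sc` and a list of
# LabeledPoints `data` as in the doctests above): regType="l2" selects
# SquaredL2Updater and intercept=True adds a bias term.
#
#     model = LinearRegressionWithSGD.train(sc.parallelize(data),
#                                           iterations=200, step=0.1,
#                                           regType="l2", regParam=0.01,
#                                           intercept=True)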


class LassoModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit with an
    l_1 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class LassoWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a Lasso regression model on the given data."""
        sc = data.context
        train_f = lambda d, i: sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_f, LassoModel, data, initialWeights)
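
# Illustrative call (a sketch, assuming `sc` and `data` as in the doctests
# above): regParam controls the strength of the L1 penalty.
#
#     model = LassoWithSGD.train(sc.parallelize(data), iterations=200,
#                                step=0.1, regParam=0.01)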


class RidgeRegressionModel(LinearRegressionModelBase):
    """A linear regression model derived from a least-squares fit with an
    l_2 penalty term.

    >>> from pyspark.mllib.regression import LabeledPoint
    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(3.0, [2.0]),
    ...     LabeledPoint(2.0, [3.0])
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(array([1.0])) - 1) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    >>> data = [
    ...     LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
    ...     LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
    ...     LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
    ... ]
    >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
    >>> abs(lrm.predict(array([0.0])) - 0) < 0.5
    True
    >>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
    True
    """


class RidgeRegressionWithSGD(object):
    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None):
        """Train a ridge regression model on the given data."""
        sc = data.context
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i)
        return _regression_train_wrapper(sc, train_func, RidgeRegressionModel, data, initialWeights)
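
# Illustrative call (a sketch, assuming `sc` and `data` as in the doctests
# above): regParam controls the strength of the L2 penalty.
#
#     model = RidgeRegressionWithSGD.train(sc.parallelize(data),
#                                          iterations=200, step=0.1,
#                                          regParam=0.01)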


def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()