import numpy as np
import warnings

from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
from pyspark.rdd import RDD
from pyspark.serializers import NoOpSerializer


class MLUtils:

    """
    Helper methods to load, save and pre-process data used in MLlib.
    """

    @staticmethod
    def _parse_libsvm_line(line, multiclass=None):
        """
        Parses a line in LIBSVM format into (label, indices, values).
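
        A small doctest sketch (the input line below is illustrative, not
        taken from any shipped test data):

        >>> label, indices, values = MLUtils._parse_libsvm_line("1.0 1:1.5 3:-2.0")
        >>> label
        1.0
        >>> indices.tolist()
        [0, 2]
        >>> values.tolist()
        [1.5, -2.0]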
        """
        if multiclass is not None:
            # The extra argument is deprecated and has no effect on parsing.
            warnings.warn("deprecated", DeprecationWarning)
        items = line.split(None)
        label = float(items[0])
        nnz = len(items) - 1
        indices = np.zeros(nnz, dtype=np.int32)
        values = np.zeros(nnz)
        for i in xrange(nnz):
            index, value = items[1 + i].split(":")
            # LIBSVM indices are one-based; store them zero-based.
            indices[i] = int(index) - 1
            values[i] = float(value)
        return label, indices, values

    @staticmethod
    def _convert_labeled_point_to_libsvm(p):
        """
        Converts a LabeledPoint to a string in LIBSVM format.
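
        A doctest sketch covering both the sparse and dense branches (the
        points below are illustrative):

        >>> MLUtils._convert_labeled_point_to_libsvm(
        ...     LabeledPoint(1.0, Vectors.sparse(3, [(0, 2.5)])))
        '1.0 1:2.5'
        >>> MLUtils._convert_labeled_point_to_libsvm(
        ...     LabeledPoint(0.0, Vectors.dense([1.5, 3.0])))
        '0.0 1:1.5 2:3.0'
        """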
        items = [str(p.label)]
        v = _convert_vector(p.features)
        if type(v) == np.ndarray:
            for i in xrange(len(v)):
                items.append(str(i + 1) + ":" + str(v[i]))
        elif type(v) == SparseVector:
            nnz = len(v.indices)
            for i in xrange(nnz):
                items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
        else:
            raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector"
                            " but got %s" % type(v))
        return " ".join(items)

    @staticmethod
    def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None, multiclass=None):
        """
        Loads labeled data in the LIBSVM format into an RDD of
        LabeledPoint. The LIBSVM format is a text-based format used by
        LIBSVM and LIBLINEAR. Each line represents a labeled sparse
        feature vector using the following format:

        label index1:value1 index2:value2 ...

        where the indices are one-based and in ascending order. This
        method parses each line into a LabeledPoint, where the feature
        indices are converted to zero-based.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param numFeatures: number of features, which will be determined
                            from the input data if a nonpositive value
                            is given. This is useful when the dataset is
                            already split into multiple files and you
                            want to load them separately, because some
                            features may not be present in certain files,
                            which leads to inconsistent feature
                            dimensions.
        @param minPartitions: min number of partitions
        @param multiclass: deprecated and has no effect; labels are
                           returned exactly as they appear in the input
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
        >>> tempFile.flush()
        >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
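
        Passing numFeatures explicitly skips the extra pass over the data
        that infers the dimension (illustrative; any value of at least 6
        fits this file):

        >>> padded = MLUtils.loadLibSVMFile(sc, tempFile.name, numFeatures=10).collect()
        >>> print padded[0]
        (1.0,(10,[0,2,4],[1.0,2.0,3.0]))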
        >>> tempFile.close()
        >>> type(examples[0]) == LabeledPoint
        True
        >>> print examples[0]
        (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
        >>> type(examples[1]) == LabeledPoint
        True
        >>> print examples[1]
        (-1.0,(6,[],[]))
        >>> type(examples[2]) == LabeledPoint
        True
        >>> print examples[2]
        (-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
        """
        if multiclass is not None:
            # The flag is deprecated and ignored; labels are parsed as-is.
            warnings.warn("deprecated", DeprecationWarning)
        lines = sc.textFile(path, minPartitions)
        parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l))
        if numFeatures <= 0:
            # Infer the dimension from the data: the largest zero-based index
            # plus one. Lines with no features contribute -1.
            parsed.cache()
            numFeatures = parsed.map(lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
        return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))

    @staticmethod
    def saveAsLibSVMFile(data, dir):
        """
        Save labeled data in LIBSVM format.

        @param data: an RDD of LabeledPoint to be saved
        @param dir: directory to save the data

        >>> from tempfile import NamedTemporaryFile
        >>> from fileinput import input
        >>> from glob import glob
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n'
        """
        lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
        lines.saveAsTextFile(dir)

    @staticmethod
    def loadLabeledPoints(sc, path, minPartitions=None):
        """
        Load labeled points saved using RDD.saveAsTextFile.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param minPartitions: min number of partitions
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
        >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
        >>> type(loaded[0]) == LabeledPoint
        True
        >>> print loaded[0]
        (1.1,(3,[0,2],[-1.23,4.56e-07]))
        >>> type(loaded[1]) == LabeledPoint
        True
        >>> print loaded[1]
        (0.0,[1.01,2.02,3.03])
        """
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
        serialized = RDD(jSerialized, sc, NoOpSerializer())
        # Each record arrives as raw bytes from the JVM; deserialize in Python.
        return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()