from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq
import bisect
from random import Random
from math import sqrt, log, isinf, isnan

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
    PickleSerializer, pack_long, CompressedSerializer
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
    get_used_memory

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]


def portable_hash(x):
    """
    This function returns a consistent hash code for builtin types, especially
    for None and tuples containing None.

    The algorithm is similar to the one used by CPython 2.7.

    >>> portable_hash(None)
    0
    >>> portable_hash((None, 1))
    219750521
    """
    if x is None:
        return 0
    if isinstance(x, tuple):
        h = 0x345678
        for i in x:
            h ^= portable_hash(i)
            h *= 1000003
            h &= 0xffffffff
        h ^= len(x)
        if h == -1:
            h = -2
        return h
    return hash(x)


def _extract_concise_traceback():
    """
    This function returns the traceback info for a callsite as a
    C{Callsite} namedtuple containing the function name, file name
    and line number.
    """
    tb = traceback.extract_stack()
    callsite = namedtuple("Callsite", "function file linenum")
    if len(tb) == 0:
        return None
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return callsite(function=fun, file=file, linenum=line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return callsite(function=sfun, file=ufile, linenum=uline)
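
# For reference (hypothetical values): a user script that triggers a job from
# line 42 of /path/to/user_script.py would yield something like
#     Callsite(function='collect', file='/path/to/user_script.py', linenum=42)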


_spark_stack_depth = 0


class _JavaStackTrace(object):

    def __init__(self, sc):
        tb = _extract_concise_traceback()
        if tb is not None:
            self._traceback = "%s at %s:%s" % (
                tb.function, tb.file, tb.linenum)
        else:
            self._traceback = "Error! Could not extract traceback info"
        self._context = sc

    def __enter__(self):
        # Record this Python callsite as the Java call site while jobs run
        # inside the outermost `with` block (tracked via _spark_stack_depth).
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)


class MaxHeapQ(object):

    """
    An implementation of MaxHeap.

    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # q[0] is a sentinel; real elements start at q[1], so the children
        # of q[k] are q[2 * k] and q[2 * k + 1].
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k / 2] < self.q[k]):
            self._swap(k, k / 2)
            k = k / 2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # pick the larger child, then stop if the parent already
            # dominates it; otherwise swap and keep sinking.
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if (self.q[k] > self.q[j]):
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if (self.size()) < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if (self.q[1] > value):
            self.q[1] = value
            self._sink(1)
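
    # Usage sketch (illustrative, mirrors the doctests above): to keep the
    # N *smallest* items of a stream, insert everything into a MaxHeapQ(N).
    # Once full, _replaceRoot() evicts the current maximum whenever a smaller
    # value arrives, so only the N smallest survive:
    #
    #     q = MaxHeapQ(3)
    #     for v in [7, 1, 9, 4, 2]:
    #         q.insert(v)
    #     sorted(q.getElements())   # -> [1, 2, 4]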


def _parse_memory(s):
    """
    Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
    return the value in MB.

    >>> _parse_memory("256m")
    256
    >>> _parse_memory("2g")
    2048
    """
    units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
    if s[-1].lower() not in units:
        raise ValueError("invalid format: " + s)
    return int(float(s[:-1]) * units[s[-1].lower()])


class RDD(object):

    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer
        self._id = jrdd.id()

    def _toPickleSerialization(self):
        if (self._jrdd_deserializer == PickleSerializer() or
                self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
            return self
        else:
            return self._reserialize(BatchedSerializer(PickleSerializer(), 10))

    def id(self):
        """
        A unique ID for this RDD (within its SparkContext).
        """
        return self._id

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
        """
        self.is_cached = True
        self.persist(StorageLevel.MEMORY_ONLY_SER)
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.

        >>> rdd = sc.parallelize(["b", "a", "c"])
        >>> sorted(rdd.map(lambda x: (x, 1)).collect())
        [('a', 1), ('b', 1), ('c', 1)]
        """
        def func(_, iterator):
            return imap(f, iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator):
            return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator):
            return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def getNumPartitions(self):
        """
        Returns the number of partitions in this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> rdd.getNumPartitions()
        2
        """
        return self._jrdd.partitions().size()

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator):
            return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed=None):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on the default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Negative fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed=None):
        """
        Return a fixed-size sampled subset of this RDD (currently requires
        numpy).

        >>> rdd = sc.parallelize(range(0, 10))
        >>> len(rdd.takeSample(True, 20, 1))
        20
        >>> len(rdd.takeSample(False, 5, 2))
        5
        >>> len(rdd.takeSample(False, 15, 3))
        10
        """
        numStDev = 10.0

        if num < 0:
            raise ValueError("Sample size cannot be negative.")
        elif num == 0:
            return []

        initialCount = self.count()
        if initialCount == 0:
            return []

        rand = Random(seed)

        if (not withReplacement) and num >= initialCount:
            # shuffle the whole RDD and return it
            samples = self.collect()
            rand.shuffle(samples)
            return samples

        maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
        if num > maxSampleSize:
            raise ValueError(
                "Sample size cannot be greater than %d." % maxSampleSize)

        fraction = RDD._computeFractionForSampleSize(
            num, initialCount, withReplacement)
        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample didn't turn out large enough, keep trying to
        # take samples; this shouldn't happen often because we use an
        # oversized sampling fraction for the initial attempt.
        while len(samples) < num:
            seed = rand.randint(0, sys.maxint)
            samples = self.sample(withReplacement, fraction, seed).collect()

        rand.shuffle(samples)

        return samples[0:num]

    @staticmethod
    def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
        """
        Returns a sampling rate that guarantees a sample of
        size >= sampleSizeLowerBound 99.99% of the time.

        How the sampling rate is determined:
        Let p = num / total, where num is the sample size and total is the
        total number of data points in the RDD. We're trying to compute
        q > p such that
          - when sampling with replacement, we're drawing each data point
            with prob_i ~ Pois(q), where we want to guarantee
            Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
            total), i.e. the failure rate of not having a sufficiently large
            sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
            to guarantee 0.9999 success rate for num > 12, but we need a
            slightly larger q (9 empirically determined).
          - when sampling without replacement, we're drawing each data point
            with prob_i ~ Binomial(total, fraction) and our choice of q
            guarantees 1-delta, or 0.9999 success rate, where success rate is
            defined the same as in sampling with replacement.
        """
        fraction = float(sampleSizeLowerBound) / total
        if withReplacement:
            numStDev = 5
            if (sampleSizeLowerBound < 12):
                numStDev = 9
            return fraction + numStDev * sqrt(fraction / total)
        else:
            delta = 0.00005
            gamma = - log(delta) / total
            return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
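
    # Worked example (illustrative): for a sample of 5 items out of 100
    # without replacement, fraction = 0.05 and gamma = -log(5e-5) / 100,
    # which is roughly 0.099, so the returned rate is about
    # 0.05 + 0.099 + sqrt(0.099**2 + 2 * 0.099 * 0.05), i.e. around 0.29.
    # That is much larger than 0.05, which is what makes an undersized
    # first sample in takeSample() unlikely.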

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer first.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def intersection(self, other):
        """
        Return the intersection of this RDD and another one. The output will
        not contain any duplicate elements, even if the input RDDs did.

        Note that this method performs a shuffle internally.

        >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
        >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
        >>> rdd1.intersection(rdd2).collect()
        [1, 2, 3]
        """
        return self.map(lambda v: (v, None)) \
            .cogroup(other.map(lambda v: (v, None))) \
            .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
            .keys()

    def _reserialize(self, serializer=None):
        serializer = serializer or self.ctx.serializer
        if self._jrdd_deserializer == serializer:
            return self
        else:
            converted = self.map(lambda x: x, preservesPartitioning=True)
            converted._jrdd_deserializer = serializer
            return converted

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.
        # noqa

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey().first()
        ('1', 3)
        >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        def sortPartition(iterator):
            return iter(sorted(iterator, key=lambda (k, v): keyfunc(k), reverse=not ascending))

        if numPartitions == 1:
            if self.getNumPartitions() > 1:
                self = self.coalesce(1)
            return self.mapPartitions(sortPartition)

        # first compute the boundary of each part via sampling: we want to
        # partition the key-space into bins such that the bins have roughly
        # the same number of (key, value) pairs falling into them
        rddSize = self.count()
        maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
        fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
        samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
        samples = sorted(samples, reverse=(not ascending), key=keyfunc)

        # we have numPartitions many parts but one of them has an implicit
        # boundary, so only numPartitions - 1 bounds are needed
        bounds = [samples[len(samples) * (i + 1) / numPartitions]
                  for i in range(0, numPartitions - 1)]

        def rangePartitioner(k):
            p = bisect.bisect_left(bounds, keyfunc(k))
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
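
    # Partitioning sketch (illustrative values): with ascending=True,
    # numPartitions=3 and sampled, sorted keys ['b', 'd', 'f', 'k', 'p', 'x'],
    # the computed bounds are ['f', 'p']; rangePartitioner then sends keys
    # <= 'f' to partition 0, keys in ('f', 'p'] to partition 1 and keys > 'p'
    # to partition 2, after which each partition is sorted locally by
    # sortPartition.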

    def sortBy(self, keyfunc, ascending=True, numPartitions=None):
        """
        Sorts this RDD by the given keyfunc.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
        [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        """
        return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator):
            yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Due to batching, we can't use the Java cartesian method.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
        ['1', '2', '', '3']
        """
        def func(iterator):
            pipe = Popen(
                shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)

            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def foreachPartition(self, f):
        """
        Applies a function to each partition of this RDD.

        >>> def f(iterator):
        ...      for x in iterator:
        ...           print x
        ...      yield None
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
        """
        self.mapPartitions(f).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient. Instead, we'll dump the data to a
        # temporary file and read it back.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data into Python and deserialize it:
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator. Currently reduces partitions locally.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def aggregate(self, zeroValue, seqOp, combOp):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using the given combine functions and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        The first function (seqOp) can return a different result type, U, than
        the type of this RDD. Thus, we need one operation for merging a T into
        a U and one operation for merging two U's.

        >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
        >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
        >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
        (10, 4)
        >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
        (0, 0)
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = seqOp(acc, obj)
            yield acc

        return self.mapPartitions(func).fold(zeroValue, combOp)

    def max(self):
        """
        Find the maximum item in this RDD.

        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
        43.0
        """
        return self.reduce(max)

    def min(self):
        """
        Find the minimum item in this RDD.

        >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
        1.0
        """
        return self.reduce(min)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def histogram(self, buckets):
        """
        Compute a histogram using the provided buckets. The buckets
        are all open to the right except for the last, which is closed.
        e.g. [1, 10, 20, 50] means the buckets are [1, 10) [10, 20) [20, 50],
        which means 1<=x<10, 10<=x<20, 20<=x<=50. On an input of 1 and 50
        the histogram would be 1, 0, 1.

        If your buckets are evenly spaced (e.g. [0, 10, 20, 30]), bucket
        lookup can be switched from an O(log n) insertion to O(1) per
        element (where n is the number of buckets).

        Buckets must be sorted, must not contain any duplicates, and must
        have at least two elements.

        If `buckets` is a number, it will generate buckets which are
        evenly spaced between the minimum and maximum of the RDD. For
        example, if the min value is 0 and the max is 100, given buckets
        as 2, the resulting buckets will be [0, 50) [50, 100]. buckets must
        be at least 1. An exception is raised if the RDD contains infinity
        (NaN elements are ignored). If the elements in the RDD do not vary
        (max == min), a single bucket is always returned.

        The return value is a tuple of buckets and histogram.

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
        ([0, 25, 50], [25, 26])
        >>> rdd.histogram([0, 5, 25, 50])
        ([0, 5, 25, 50], [5, 20, 26])
        >>> rdd.histogram([0, 15, 30, 45, 60])  # evenly spaced buckets
        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
        >>> rdd.histogram(("a", "b", "c"))
        (('a', 'b', 'c'), [2, 2])
        """

        if isinstance(buckets, (int, long)):
            if buckets < 1:
                raise ValueError("number of buckets must be >= 1")

            # filter out non-comparable elements
            def comparable(x):
                if x is None:
                    return False
                if type(x) is float and isnan(x):
                    return False
                return True

            filtered = self.filter(comparable)

            # faster than stats()
            def minmax(a, b):
                return min(a[0], b[0]), max(a[1], b[1])
            try:
                minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
            except TypeError as e:
                if " empty " in str(e):
                    raise ValueError("can not generate buckets from empty RDD")
                raise

            if minv == maxv or buckets == 1:
                return [minv, maxv], [filtered.count()]

            try:
                inc = (maxv - minv) / buckets
            except TypeError:
                raise TypeError("Can not generate buckets with non-number in RDD")

            if isinf(inc):
                raise ValueError("Can not generate buckets with infinite value")

            # keep the increment as an integer only if it divides evenly
            if inc * buckets != maxv - minv:
                inc = (maxv - minv) * 1.0 / buckets

            buckets = [i * inc + minv for i in range(buckets)]
            buckets.append(maxv)  # fix accumulated error
            even = True

        elif isinstance(buckets, (list, tuple)):
            if len(buckets) < 2:
                raise ValueError("buckets should have more than one value")

            if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
                raise ValueError("can not have None or NaN in buckets")

            if sorted(buckets) != list(buckets):
                raise ValueError("buckets should be sorted")

            if len(set(buckets)) != len(buckets):
                raise ValueError("buckets should not contain duplicated values")

            minv = buckets[0]
            maxv = buckets[-1]
            even = False
            inc = None
            try:
                steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
            except TypeError:
                pass  # objects in buckets do not support '-'
            else:
                if max(steps) - min(steps) < 1e-10:  # handle precision errors
                    even = True
                    inc = (maxv - minv) / (len(buckets) - 1)

        else:
            raise TypeError("buckets should be a list or tuple or number(int or long)")

        def histogram(iterator):
            counters = [0] * len(buckets)
            for i in iterator:
                if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
                    continue
                t = (int((i - minv) / inc) if even
                     else bisect.bisect_right(buckets, i) - 1)
                counters[t] += 1
            # fold the last counter into the previous one, so the final
            # bucket is closed on the right
            last = counters.pop()
            counters[-1] += last
            return [counters]

        def mergeCounters(a, b):
            return [i + j for i, j in zip(a, b)]

        return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
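
    # Bucket-index sketch (illustrative): with buckets [0, 15, 30, 45, 60] the
    # spacing is even (inc = 15), so element 44 lands in int((44 - 0) / 15) == 2,
    # i.e. the [30, 45) bucket, without a binary search; an uneven bucket list
    # falls back to bisect_right(). The final pop()/merge step is what makes
    # the last bucket closed, so 60 itself is counted into [45, 60].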

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from a RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
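
    # Why a *min*-heap here (illustrative): q keeps at most `num` elements and
    # heappushpop() always discards the smallest of q + [k], so after one pass
    # q holds the `num` largest values seen; e.g. for [10, 4, 2, 12, 3] with
    # num=2, q ends up as a heap over {10, 12}.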

    def takeOrdered(self, num, key=None):
        """
        Get the N elements from a RDD ordered in ascending order or as
        specified by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
        """

        def topNKeyedElems(iterator, key_=None):
            q = MaxHeapQ(num)
            for k in iterator:
                if key_ is not None:
                    k = (key_(k), k)
                q.insert(k)
            yield q.getElements()

        def unKey(x, key_=None):
            if key_ is not None:
                x = [i[1] for i in x]
            return x

        def merge(a, b):
            return next(topNKeyedElems(a + b))
        result = self.mapPartitions(
            lambda i: topNKeyedElems(i, key)).reduce(merge)
        return sorted(unKey(result, key), key=key)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        It works by first scanning one partition, and then using the results
        from that partition to estimate the number of additional partitions
        needed to satisfy the limit.

        Translated from the Scala implementation in RDD#take().

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
        [91, 92, 93]
        """
        items = []
        totalParts = self._jrdd.partitions().size()
        partsScanned = 0

        while len(items) < num and partsScanned < totalParts:
            # The number of partitions to try in this iteration. It is ok for
            # this to exceed totalParts because we cap it when building the
            # partition range below.
            numPartsToTry = 1
            if partsScanned > 0:
                # If we didn't find any rows in the previous iteration,
                # quadruple the number of partitions to try; otherwise
                # interpolate from the yield so far, overestimating by 50%.
                if len(items) == 0:
                    numPartsToTry = partsScanned * 4
                else:
                    numPartsToTry = int(1.5 * num * partsScanned / len(items))

            left = num - len(items)

            def takeUpToNumLeft(iterator):
                taken = 0
                while taken < left:
                    yield next(iterator)
                    taken += 1

            p = range(
                partsScanned, min(partsScanned + numPartsToTry, totalParts))
            res = self.context.runJob(self, takeUpToNumLeft, p, True)

            items += res
            partsScanned += numPartsToTry

        return items[:num]
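
    # Scan-growth sketch (illustrative): take(3) on a sparsely matching RDD
    # first runs one partition; if that yields nothing, the next round scans
    # 4x as many partitions (numPartsToTry = partsScanned * 4), and once some
    # rows have been found the estimate switches to roughly
    # 1.5 * num * partsScanned / len(items), so densely matching data stops
    # the scan quickly.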

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
        converted for output using either user specified converters or, by default,
        L{org.apache.spark.api.python.JavaToWritableConverter}.

        @param conf: Hadoop job configuration, passed in as a dict
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._toPickleSerialization()
        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
                                                    keyConverter, valueConverter, True)

    def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                               keyConverter=None, valueConverter=None, conf=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
        will be inferred if not specified. Keys and values are converted for output using either
        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

        @param path: path to Hadoop file
        @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.Text", None by default)
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop job configuration, passed in as a dict (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._toPickleSerialization()
        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
        self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
                                                       outputFormatClass,
                                                       keyClass, valueClass,
                                                       keyConverter, valueConverter, jconf)

    def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
        converted for output using either user specified converters or, by default,
        L{org.apache.spark.api.python.JavaToWritableConverter}.

        @param conf: Hadoop job configuration, passed in as a dict
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._toPickleSerialization()
        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
        self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
                                                    keyConverter, valueConverter, False)

    def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
                         keyConverter=None, valueConverter=None, conf=None,
                         compressionCodecClass=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the old Hadoop OutputFormat API (mapred package). Key and value types
        will be inferred if not specified. Keys and values are converted for output using either
        user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
        C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
        of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.

        @param path: path to Hadoop file
        @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
               (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.IntWritable", None by default)
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.Text", None by default)
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: (None by default)
        @param compressionCodecClass: (None by default)
        """
        jconf = self.ctx._dictToJavaMap(conf)
        pickledRDD = self._toPickleSerialization()
        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
        self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
                                                 outputFormatClass,
                                                 keyClass, valueClass,
                                                 keyConverter, valueConverter,
                                                 jconf, compressionCodecClass)

    def saveAsSequenceFile(self, path, compressionCodecClass=None):
        """
        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
        system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
        RDD's key and value types. The mechanism is as follows:
            1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
            2. Keys and values of this Java RDD are converted to Writables and written out.

        @param path: path to sequence file
        @param compressionCodecClass: (None by default)
        """
        pickledRDD = self._toPickleSerialization()
        batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
        self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
                                                   path, compressionCodecClass)

    def saveAsPickleFile(self, path, batchSize=10):
        """
        Save this RDD as a SequenceFile of serialized objects. The serializer
        used is L{pyspark.serializers.PickleSerializer}; the default batch size
        is 10.

        >>> tmpFile = NamedTemporaryFile(delete=True)
        >>> tmpFile.close()
        >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
        >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
        [1, 2, 'rdd', 'spark']
        """
        self._reserialize(BatchedSerializer(PickleSerializer(),
                                            batchSize))._jrdd.saveAsObjectFile(path)

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

        Empty lines are tolerated when saving to text files.

        >>> tempFile2 = NamedTemporaryFile(delete=True)
        >>> tempFile2.close()
        >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
        '\\n\\n\\nbar\\nfoo\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                if isinstance(x, unicode):
                    x = x.encode("utf-8")
                yield x
        keyed = self.mapPartitionsWithIndex(func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def keys(self):
        """
        Return an RDD with the keys of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
        >>> m.collect()
        [1, 3]
        """
        return self.map(lambda (k, v): k)

    def values(self):
        """
        Return an RDD with the values of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
        >>> m.collect()
        [2, 4]
        """
        return self.map(lambda (k, v): v)

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m

        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()
 1412   
1413 -    def join(self, other, numPartitions=None): 
 1414          """ 
1415          Return an RDD containing all pairs of elements with matching keys in 
1416          C{self} and C{other}. 
1417   
1418          Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 
1419          (k, v1) is in C{self} and (k, v2) is in C{other}. 
1420   
1421          Performs a hash join across the cluster. 
1422   
1423          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1424          >>> y = sc.parallelize([("a", 2), ("a", 3)]) 
1425          >>> sorted(x.join(y).collect()) 
1426          [('a', (1, 2)), ('a', (1, 3))] 
1427          """ 
1428          return python_join(self, other, numPartitions) 
 1429   
1431          """ 
1432          Perform a left outer join of C{self} and C{other}. 
1433   
1434          For each element (k, v) in C{self}, the resulting RDD will either 
1435          contain all pairs (k, (v, w)) for w in C{other}, or the pair 
1436          (k, (v, None)) if no elements in C{other} have key k.
1437   
1438          Hash-partitions the resulting RDD into the given number of partitions. 
1439   
1440          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1441          >>> y = sc.parallelize([("a", 2)]) 
1442          >>> sorted(x.leftOuterJoin(y).collect()) 
1443          [('a', (1, 2)), ('b', (4, None))] 
1444          """ 
1445          return python_left_outer_join(self, other, numPartitions) 
 1446   
1447      def rightOuterJoin(self, other, numPartitions=None):
1448          """
1449          Perform a right outer join of C{self} and C{other}. 
1450   
1451          For each element (k, w) in C{other}, the resulting RDD will either 
1452          contain all pairs (k, (v, w)) for v in C{self}, or the pair (k, (None, w))
1453          if no elements in C{self} have key k. 
1454   
1455          Hash-partitions the resulting RDD into the given number of partitions. 
1456   
1457          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1458          >>> y = sc.parallelize([("a", 2)]) 
1459          >>> sorted(y.rightOuterJoin(x).collect()) 
1460          [('a', (2, 1)), ('b', (None, 4))] 
1461          """ 
1462          return python_right_outer_join(self, other, numPartitions) 
 1463   
1464       
1465       
1466       
1467      def partitionBy(self, numPartitions, partitionFunc=portable_hash):
 1468          """ 
1469          Return a copy of the RDD partitioned using the specified partitioner. 
1470   
1471          >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 
1472          >>> sets = pairs.partitionBy(2).glom().collect() 
1473          >>> set(sets[0]).intersection(set(sets[1])) 
1474          set([]) 
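
        A custom C{partitionFunc} can also be supplied (an illustrative
        sketch, reusing the C{pairs} RDD above):

        >>> pairs.partitionBy(2, partitionFunc=lambda x: x % 2).getNumPartitions()
        2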
1475          """ 
1476          if numPartitions is None: 
1477              numPartitions = self._defaultReducePartitions() 
1478   
1479          # Transferring every element to the JVM one at a time would be too
1480          # expensive, so the hash buckets are built in Python and only
1481          # O(numPartitions) serialized chunks are shipped to Java.  Each
1482          # chunk is a (partition id, [objects]) pair, and large buckets are
1483          # flushed in batches to bound memory usage.
1484
1485          outputSerializer = self.ctx._unbatched_serializer 
1486   
1487          limit = (_parse_memory(self.ctx._conf.get( 
1488              "spark.python.worker.memory", "512m")) / 2) 
1489   
1490          def add_shuffle_key(split, iterator): 
1491   
1492              buckets = defaultdict(list) 
1493              c, batch = 0, min(10 * numPartitions, 1000) 
1494   
1495              for (k, v) in iterator: 
1496                  buckets[partitionFunc(k) % numPartitions].append((k, v)) 
1497                  c += 1 
1498   
1499                  # flush the buckets once memory use crosses the limit or
1500                  # enough items have been buffered
1500                  if (c % 1000 == 0 and get_used_memory() > limit 
1501                          or c > batch): 
1502                      n, size = len(buckets), 0 
1503                      for split in buckets.keys(): 
1504                          yield pack_long(split) 
1505                          d = outputSerializer.dumps(buckets[split]) 
1506                          del buckets[split] 
1507                          yield d 
1508                          size += len(d) 
1509   
1510                      avg = (size / n) >> 20 
1511                      # keep the average serialized chunk between roughly 1MB and 10MB
1512                      if avg < 1: 
1513                          batch *= 1.5 
1514                      elif avg > 10: 
1515                          batch = max(batch / 1.5, 1) 
1516                      c = 0 
1517   
1518              for (split, items) in buckets.iteritems(): 
1519                  yield pack_long(split) 
1520                  yield outputSerializer.dumps(items) 
 1521   
1522          keyed = self.mapPartitionsWithIndex(add_shuffle_key) 
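        # add_shuffle_key already yields serialized bytes (a packed partition
        # id followed by a pickled chunk), so skip Python-side serialization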
1523          keyed._bypass_serializer = True 
1524          with _JavaStackTrace(self.context) as st: 
1525              pairRDD = self.ctx._jvm.PairwiseRDD( 
1526                  keyed._jrdd.rdd()).asJavaPairRDD() 
1527              partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 
1528                                                            id(partitionFunc)) 
1529          jrdd = pairRDD.partitionBy(partitioner).values() 
1530          rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 
1531          # hold a reference to partitionFunc so its id() (used by
1532          # PythonPartitioner above) is not reused, even when it is a lambda
1533          rdd._partitionFunc = partitionFunc 
1534          return rdd 
1535   
1536       
1537      def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
1538                       numPartitions=None): 
 1539          """ 
1540          Generic function to combine the elements for each key using a custom 
1541          set of aggregation functions. 
1542   
1543          Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 
1544          type" C.  Note that V and C can be different -- for example, one might 
1545          group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 
1546   
1547          Users provide three functions: 
1548   
1549              - C{createCombiner}, which turns a V into a C (e.g., creates 
1550                a one-element list) 
1551              - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 
1552                a list) 
1553              - C{mergeCombiners}, to combine two C's into a single one. 
1554   
1555          In addition, users can control the partitioning of the output RDD. 
1556   
1557          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1558          >>> def f(x): return x 
1559          >>> def add(a, b): return a + str(b) 
1560          >>> sorted(x.combineByKey(str, add, add).collect()) 
1561          [('a', '11'), ('b', '1')] 
1562          """ 
1563          if numPartitions is None: 
1564              numPartitions = self._defaultReducePartitions() 
1565   
1566          serializer = self.ctx.serializer 
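        # honour spark.shuffle.spill / spark.python.worker.memory: use
        # ExternalMerger (which can spill to disk) when spilling is enabled,
        # otherwise merge purely in memory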
1567          spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() 
1568                   == 'true') 
1569          memory = _parse_memory(self.ctx._conf.get( 
1570              "spark.python.worker.memory", "512m")) 
1571          agg = Aggregator(createCombiner, mergeValue, mergeCombiners) 
1572   
1573          def combineLocally(iterator): 
1574              merger = ExternalMerger(agg, memory * 0.9, serializer) \ 
1575                  if spill else InMemoryMerger(agg) 
1576              merger.mergeValues(iterator) 
1577              return merger.iteritems() 
 1578   
1579          locally_combined = self.mapPartitions(combineLocally) 
1580          shuffled = locally_combined.partitionBy(numPartitions) 
1581   
1582          def _mergeCombiners(iterator): 
1583              merger = ExternalMerger(agg, memory, serializer) \ 
1584                  if spill else InMemoryMerger(agg) 
1585              merger.mergeCombiners(iterator) 
1586              return merger.iteritems() 
1587   
1588          return shuffled.mapPartitions(_mergeCombiners) 
1589   
1590      def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
 1591          """ 
1592          Aggregate the values of each key, using given combine functions and a neutral 
1593          "zero value". This function can return a different result type, U, than the type 
1594          of the values in this RDD, V. Thus, we need one operation for merging a V into 
1595          a U and one operation for merging two U's. The former operation is used for merging
1596          values within a partition, and the latter is used for merging values between 
1597          partitions. To avoid memory allocation, both of these functions are 
1598          allowed to modify and return their first argument instead of creating a new U. 
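
        A minimal illustrative example (assuming the usual doctest C{sc} and
        C{operator.add} for both functions):

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.aggregateByKey(0, add, add).collect())
        [('a', 2), ('b', 1)]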
1599          """ 
1600          def createZero(): 
1601              return copy.deepcopy(zeroValue) 
 1602   
1603          return self.combineByKey( 
1604              lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions) 
1605   
1606      def foldByKey(self, zeroValue, func, numPartitions=None):
 1607          """ 
1608          Merge the values for each key using an associative function "func" 
1609          and a neutral "zeroValue" which may be added to the result an 
1610          arbitrary number of times, and must not change the result 
1611          (e.g., 0 for addition, or 1 for multiplication).
1612   
1613          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1614          >>> from operator import add 
1615          >>> rdd.foldByKey(0, add).collect() 
1616          [('a', 2), ('b', 1)] 
1617          """ 
1618          def createZero(): 
1619              return copy.deepcopy(zeroValue) 
 1620   
1621          return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions) 
1622   
1623       
1624      def groupByKey(self, numPartitions=None):
1625          """
1626          Group the values for each key in the RDD into a single sequence. 
1627          Hash-partitions the resulting RDD into numPartitions partitions.
1628   
1629          Note: If you are grouping in order to perform an aggregation (such as a 
1630          sum or average) over each key, using reduceByKey will provide much 
1631          better performance. 
1632   
1633          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1634          >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 
1635          [('a', [1, 1]), ('b', [1])] 
1636          """ 
1637   
1638          def createCombiner(x): 
1639              return [x] 
 1640   
1641          def mergeValue(xs, x): 
1642              xs.append(x) 
1643              return xs 
1644   
1645          def mergeCombiners(a, b): 
1646              a.extend(b) 
1647              return a 
1648   
1649          return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 
1650                                   numPartitions).mapValues(lambda x: ResultIterable(x)) 
1651   
1652       
1653      def flatMapValues(self, f):
1654          """
1655          Pass each value in the key-value pair RDD through a flatMap function 
1656          without changing the keys; this also retains the original RDD's 
1657          partitioning. 
1658   
1659          >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 
1660          >>> def f(x): return x 
1661          >>> x.flatMapValues(f).collect() 
1662          [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 
1663          """ 
1664          flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 
1665          return self.flatMap(flat_map_fn, preservesPartitioning=True) 
 1666   
1667      def mapValues(self, f):
1668          """
1669          Pass each value in the key-value pair RDD through a map function 
1670          without changing the keys; this also retains the original RDD's 
1671          partitioning. 
1672   
1673          >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 
1674          >>> def f(x): return len(x) 
1675          >>> x.mapValues(f).collect() 
1676          [('a', 3), ('b', 1)] 
1677          """ 
1678          map_values_fn = lambda (k, v): (k, f(v)) 
1679          return self.map(map_values_fn, preservesPartitioning=True) 
 1680   
1681      def groupWith(self, other, *others):
1682          """
1683          Alias for cogroup but with support for multiple RDDs. 
1684   
1685          >>> w = sc.parallelize([("a", 5), ("b", 6)]) 
1686          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1687          >>> y = sc.parallelize([("a", 2)]) 
1688          >>> z = sc.parallelize([("b", 42)]) 
1689          >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \ 
1690                  sorted(list(w.groupWith(x, y, z).collect()))) 
1691          [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))] 
1692   
1693          """ 
1694          return python_cogroup((self, other) + others, numPartitions=None) 
 1695   
1696       
1697      def cogroup(self, other, numPartitions=None):
 1698          """ 
1699          For each key k in C{self} or C{other}, return a resulting RDD that 
1700          contains a tuple with the list of values for that key in C{self} as 
1701          well as C{other}. 
1702   
1703          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1704          >>> y = sc.parallelize([("a", 2)]) 
1705          >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 
1706          [('a', ([1], [2])), ('b', ([4], []))] 
1707          """ 
1708          return python_cogroup((self, other), numPartitions) 
 1709   
1710      def sampleByKey(self, withReplacement, fractions, seed=None):
 1711          """ 
1712          Return a subset of this RDD sampled by key (via stratified sampling). 
1713          Create a sample of this RDD using variable sampling rates for 
1714          different keys as specified by fractions, a key to sampling rate map. 
1715   
1716          >>> fractions = {"a": 0.2, "b": 0.1} 
1717          >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000))) 
1718          >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect()) 
1719          >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150 
1720          True 
1721          >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0 
1722          True 
1723          >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0 
1724          True 
1725          """ 
1726          for fraction in fractions.values(): 
1727              assert fraction >= 0.0, "Negative fraction value: %s" % fraction 
1728          return self.mapPartitionsWithIndex( 
1729              RDDStratifiedSampler(withReplacement, fractions, seed).func, True) 
 1730   
1731      def subtractByKey(self, other, numPartitions=None):
1732          """
1733          Return each (key, value) pair in C{self} that has no pair with matching 
1734          key in C{other}. 
1735   
1736          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 
1737          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1738          >>> sorted(x.subtractByKey(y).collect()) 
1739          [('b', 4), ('b', 5)] 
1740          """ 
1741          def filter_func((key, vals)): 
1742              return len(vals[0]) > 0 and len(vals[1]) == 0 
 1743          map_func = lambda (key, vals): [(key, val) for val in vals[0]] 
1744          return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) 
1745   
1746      def subtract(self, other, numPartitions=None):
 1747          """ 
1748          Return each value in C{self} that is not contained in C{other}. 
1749   
1750          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 
1751          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1752          >>> sorted(x.subtract(y).collect()) 
1753          [('a', 1), ('b', 4), ('b', 5)] 
1754          """ 
1755          # True here is only a placeholder value; subtractByKey looks at keys
1756          rdd = other.map(lambda x: (x, True)) 
1757          return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) 
 1758   
1759      def keyBy(self, f):
1760          """
1761          Creates tuples of the elements in this RDD by applying C{f}. 
1762   
1763          >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 
1764          >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 
1765          >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 
1766          [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 
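
        An additional illustrative check (showing the (f(x), x) pairs
        directly):

        >>> x.collect()
        [(0, 0), (1, 1), (4, 2)]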
1767          """ 
1768          return self.map(lambda x: (f(x), x)) 
 1769   
1770      def repartition(self, numPartitions):
1771          """
1772           Return a new RDD that has exactly numPartitions partitions. 
1773   
1774           Can increase or decrease the level of parallelism in this RDD. 
1775           Internally, this uses a shuffle to redistribute data. 
1776           If you are decreasing the number of partitions in this RDD, consider 
1777           using `coalesce`, which can avoid performing a shuffle. 
1778   
1779           >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 
1780           >>> sorted(rdd.glom().collect()) 
1781           [[1], [2, 3], [4, 5], [6, 7]] 
1782           >>> len(rdd.repartition(2).glom().collect()) 
1783           2 
1784           >>> len(rdd.repartition(10).glom().collect()) 
1785           10 
1786          """ 
1787          jrdd = self._jrdd.repartition(numPartitions) 
1788          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1789   
1790      def coalesce(self, numPartitions, shuffle=False):
 1791          """ 
1792          Return a new RDD that is reduced into `numPartitions` partitions. 
1793   
1794          >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 
1795          [[1], [2, 3], [4, 5]] 
1796          >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 
1797          [[1, 2, 3, 4, 5]] 
1798          """ 
1799          # pass the shuffle flag through to the JVM-side coalesce
1800          jrdd = self._jrdd.coalesce(numPartitions, shuffle)
1800          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1801   
1802      def zip(self, other):
 1803          """ 
1804          Zips this RDD with another one, returning key-value pairs with the 
1805          first element in each RDD, second element in each RDD, etc. Assumes
1806          that the two RDDs have the same number of partitions and the same 
1807          number of elements in each partition (e.g. one was made through 
1808          a map on the other). 
1809   
1810          >>> x = sc.parallelize(range(0,5)) 
1811          >>> y = sc.parallelize(range(1000, 1005)) 
1812          >>> x.zip(y).collect() 
1813          [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 
1814          """ 
1815          if self.getNumPartitions() != other.getNumPartitions(): 
1816              raise ValueError("Can only zip with RDD which has the same number of partitions") 
1817   
1818          def get_batch_size(ser): 
1819              if isinstance(ser, BatchedSerializer): 
1820                  return ser.batchSize 
1821              return 0 
 1822   
1823          def batch_as(rdd, batchSize): 
1824              ser = rdd._jrdd_deserializer 
1825              if isinstance(ser, BatchedSerializer): 
1826                  ser = ser.serializer 
1827              return rdd._reserialize(BatchedSerializer(ser, batchSize)) 
1828   
1829          my_batch = get_batch_size(self._jrdd_deserializer) 
1830          other_batch = get_batch_size(other._jrdd_deserializer) 
1831          if my_batch != other_batch: 
1832              # re-batch the smaller side so both RDDs use the same (larger) batch size
1833              if my_batch > other_batch: 
1834                  other = batch_as(other, my_batch) 
1835              else: 
1836                  self = batch_as(self, other_batch) 
1837   
1838          # the JVM-side zip assumes each pair of partitions has the same
1839          # number of elements and will raise an exception otherwise
1840          pairRDD = self._jrdd.zip(other._jrdd) 
1841          deserializer = PairDeserializer(self._jrdd_deserializer, 
1842                                          other._jrdd_deserializer) 
1843          return RDD(pairRDD, self.ctx, deserializer) 
1844   
1845      def zipWithIndex(self):
1846          """
1847          Zips this RDD with its element indices. 
1848   
1849          The ordering is first based on the partition index and then the 
1850          ordering of items within each partition. So the first item in 
1851          the first partition gets index 0, and the last item in the last 
1852          partition receives the largest index. 
1853   
1854          This method needs to trigger a spark job when this RDD contains 
1855          more than one partition.
1856   
1857          >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect() 
1858          [('a', 0), ('b', 1), ('c', 2), ('d', 3)] 
1859          """ 
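        # count the items in every partition first (this is the extra Spark
        # job) so each partition knows the index at which its items start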
1860          starts = [0] 
1861          if self.getNumPartitions() > 1: 
1862              nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect() 
1863              for i in range(len(nums) - 1): 
1864                  starts.append(starts[-1] + nums[i]) 
1865   
1866          def func(k, it): 
1867              for i, v in enumerate(it, starts[k]): 
1868                  yield v, i 
 1869   
1870          return self.mapPartitionsWithIndex(func) 
1871   
1872      def zipWithUniqueId(self):
1873          """
1874          Zips this RDD with generated unique Long ids. 
1875   
1876          Items in the kth partition will get ids k, n+k, 2*n+k, ..., where 
1877          n is the number of partitions. So there may exist gaps, but this 
1878          method won't trigger a spark job, which is different from 
1879          L{zipWithIndex}.
1880   
1881          >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() 
1882          [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] 
1883          """ 
1884          n = self.getNumPartitions() 
1885   
1886          def func(k, it): 
1887              for i, v in enumerate(it): 
1888                  yield v, i * n + k 
 1889   
1890          return self.mapPartitionsWithIndex(func) 
1891   
1892      def name(self):
1893          """
1894          Return the name of this RDD. 
1895          """ 
1896          name_ = self._jrdd.name() 
1897          if not name_: 
1898              return None 
1899          return name_.encode('utf-8') 
 1900   
1901      def setName(self, name):
1902          """
1903          Assign a name to this RDD. 
1904   
1905          >>> rdd1 = sc.parallelize([1,2]) 
1906          >>> rdd1.setName('RDD1') 
1907          >>> rdd1.name() 
1908          'RDD1' 
1909          """ 
1910          self._jrdd.setName(name) 
 1911   
1912      def toDebugString(self):
1913          """
1914          A description of this RDD and its recursive dependencies for debugging. 
1915          """ 
1916          debug_string = self._jrdd.toDebugString() 
1917          if not debug_string: 
1918              return None 
1919          return debug_string.encode('utf-8') 
 1920   
1921      def getStorageLevel(self):
1922          """
1923          Get the RDD's current storage level. 
1924   
1925          >>> rdd1 = sc.parallelize([1,2]) 
1926          >>> rdd1.getStorageLevel() 
1927          StorageLevel(False, False, False, False, 1) 
1928          >>> print(rdd1.getStorageLevel()) 
1929          Serialized 1x Replicated 
1930          """ 
1931          java_storage_level = self._jrdd.getStorageLevel() 
1932          storage_level = StorageLevel(java_storage_level.useDisk(), 
1933                                       java_storage_level.useMemory(), 
1934                                       java_storage_level.useOffHeap(), 
1935                                       java_storage_level.deserialized(), 
1936                                       java_storage_level.replication()) 
1937          return storage_level 
 1938   
1939      def _defaultReducePartitions(self):
1940          """
1941          Returns the default number of partitions to use during reduce tasks (e.g., groupBy). 
1942          If spark.default.parallelism is set, then we'll use the value from SparkContext 
1943          defaultParallelism, otherwise we'll use the number of partitions in this RDD. 
1944   
1945          This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce 
1946          the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will 
1947          be inherent. 
1948          """ 
1949          if self.ctx._conf.contains("spark.default.parallelism"): 
1950              return self.ctx.defaultParallelism 
1951          else: 
1952              return self.getNumPartitions() 
 1953   
1961  class PipelinedRDD(RDD):
1962      """ 
1963      Pipelined maps: 
1964   
1965      >>> rdd = sc.parallelize([1, 2, 3, 4]) 
1966      >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 
1967      [4, 8, 12, 16] 
1968      >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 
1969      [4, 8, 12, 16] 
1970   
1971      Pipelined reduces: 
1972      >>> from operator import add 
1973      >>> rdd.map(lambda x: 2 * x).reduce(add) 
1974      20 
1975      >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 
1976      20 
1977      """ 
1978   
1979      def __init__(self, prev, func, preservesPartitioning=False):
 1980          if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 
1981              # prev is not pipelinable, so start a fresh pipeline from its JavaRDD
1982              self.func = func 
1983              self.preservesPartitioning = preservesPartitioning 
1984              self._prev_jrdd = prev._jrdd 
1985              self._prev_jrdd_deserializer = prev._jrdd_deserializer 
1986          else: 
1987              prev_func = prev.func 
1988   
1989              def pipeline_func(split, iterator): 
1990                  return func(split, prev_func(split, iterator)) 
 1991              self.func = pipeline_func 
1992              self.preservesPartitioning = \ 
1993                  prev.preservesPartitioning and preservesPartitioning 
1994              self._prev_jrdd = prev._prev_jrdd   
1995              self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 
1996          self.is_cached = False 
1997          self.is_checkpointed = False 
1998          self.ctx = prev.ctx 
1999          self.prev = prev 
2000          self._jrdd_val = None 
2001          self._jrdd_deserializer = self.ctx.serializer 
2002          self._bypass_serializer = False 
 2003   
2004      @property 
2005      def _jrdd(self):
2006          if self._jrdd_val:
2007              return self._jrdd_val 
2008          if self._bypass_serializer: 
2009              self._jrdd_deserializer = NoOpSerializer() 
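        # pickle the pipelined function together with its input and output
        # serializers; the bytes are handed to the JVM-side PythonRDD below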
2010          command = (self.func, self._prev_jrdd_deserializer, 
2011                     self._jrdd_deserializer) 
2012          ser = CloudPickleSerializer() 
2013          pickled_command = ser.dumps(command) 
2014          broadcast_vars = ListConverter().convert( 
2015              [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 
2016              self.ctx._gateway._gateway_client) 
2017          self.ctx._pickled_broadcast_vars.clear() 
2018          env = MapConverter().convert(self.ctx.environment, 
2019                                       self.ctx._gateway._gateway_client) 
2020          includes = ListConverter().convert(self.ctx._python_includes, 
2021                                             self.ctx._gateway._gateway_client) 
2022          python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 
2023                                               bytearray(pickled_command), 
2024                                               env, includes, self.preservesPartitioning, 
2025                                               self.ctx.pythonExec, 
2026                                               broadcast_vars, self.ctx._javaAccumulator) 
2027          self._jrdd_val = python_rdd.asJavaRDD() 
2028          return self._jrdd_val 
 2029   
2030      def _is_pipelinable(self):
2031          return not (self.is_cached or self.is_checkpointed)
 2032   
2034  def _test():
2035      import doctest
2036      from pyspark.context import SparkContext 
2037      globs = globals().copy() 
2038      # use a small batch size so these tiny doctests still exercise
2039      # multiple batches
2040      globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 
2041      (failure_count, test_count) = doctest.testmod( 
2042          globs=globs, optionflags=doctest.ELLIPSIS) 
2043      globs['sc'].stop() 
2044      if failure_count: 
2045          exit(-1) 
 2046   
2047   
2048  if __name__ == "__main__": 
2049      _test() 
2050