  18  from base64 import standard_b64encode as b64enc 
  19  import copy 
  20  from collections import defaultdict 
  21  from collections import namedtuple 
  22  from itertools import chain, ifilter, imap 
  23  import operator 
  24  import os 
  25  import sys 
  26  import shlex 
  27  import traceback 
  28  from subprocess import Popen, PIPE 
  29  from tempfile import NamedTemporaryFile 
  30  from threading import Thread 
  31  import warnings 
  32  import heapq 
  33  from random import Random 
  34   
  35  from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ 
  36      BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long 
  37  from pyspark.join import python_join, python_left_outer_join, \ 
  38      python_right_outer_join, python_cogroup 
  39  from pyspark.statcounter import StatCounter 
  40  from pyspark.rddsampler import RDDSampler 
  41  from pyspark.storagelevel import StorageLevel 
  42  from pyspark.resultiterable import ResultIterable 
  43   
  44  from py4j.java_collections import ListConverter, MapConverter 
  45   
  46  __all__ = ["RDD"] 
def portable_hash(x):
    """
    This function returns a consistent hash code for builtin types, especially
    for None and for tuples containing None.

    The algorithm is similar to the one used by CPython 2.7.
  57   
  58      >>> portable_hash(None) 
  59      0 
  60      >>> portable_hash((None, 1)) 
  61      219750521 
  62      """ 
  63      if x is None: 
  64          return 0 
  65      if isinstance(x, tuple): 
  66          h = 0x345678 
  67          for i in x: 
  68              h ^= portable_hash(i) 
  69              h *= 1000003 
  70              h &= 0xffffffff 
  71          h ^= len(x) 
  72          if h == -1: 
  73              h = -2 
  74          return h 
  75      return hash(x) 
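# Note (illustrative, not from the original source): the builtin hash() is not
# stable across worker processes for some values (e.g. hash(None) falls back to
# an address-based hash in CPython 2), so key partitioning below uses
# portable_hash to get the same bucket for the same key on every worker.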
   76   
def _extract_concise_traceback():
    """
    This function returns the traceback info of a callsite as a namedtuple
    of (function name, file name, line number).
    """
  83      tb = traceback.extract_stack() 
  84      callsite = namedtuple("Callsite", "function file linenum") 
  85      if len(tb) == 0: 
  86          return None 
  87      file, line, module, what = tb[len(tb) - 1] 
  88      sparkpath = os.path.dirname(file) 
  89      first_spark_frame = len(tb) - 1 
  90      for i in range(0, len(tb)): 
  91          file, line, fun, what = tb[i] 
  92          if file.startswith(sparkpath): 
  93              first_spark_frame = i 
  94              break 
  95      if first_spark_frame == 0: 
  96          file, line, fun, what = tb[0] 
  97          return callsite(function=fun, file=file, linenum=line) 
  98      sfile, sline, sfun, swhat = tb[first_spark_frame] 
  99      ufile, uline, ufun, uwhat = tb[first_spark_frame-1] 
 100      return callsite(function=sfun, file=ufile, linenum=uline) 
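# Example of what _extract_concise_traceback() returns (illustrative, not from
# the source): if user code at my_script.py:12 calls rdd.collect(), the first
# PySpark frame is collect() in pyspark/rdd.py, so the result is roughly
# Callsite(function='collect', file='my_script.py', linenum=12).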
  101   
_spark_stack_depth = 0


class _JavaStackTrace(object):
    def __init__(self, sc):
 106          tb = _extract_concise_traceback() 
 107          if tb is not None: 
 108              self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum) 
 109          else: 
 110              self._traceback = "Error! Could not extract traceback info" 
 111          self._context = sc 

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)
 118   
class MaxHeapQ(object):
    """
    A bounded max-heap that retains the C{maxsize} smallest elements
    inserted into it.
 128      >>> import pyspark.rdd 
 129      >>> heap = pyspark.rdd.MaxHeapQ(5) 
 130      >>> [heap.insert(i) for i in range(10)] 
 131      [None, None, None, None, None, None, None, None, None, None] 
 132      >>> sorted(heap.getElements()) 
 133      [0, 1, 2, 3, 4] 
 134      >>> heap = pyspark.rdd.MaxHeapQ(5) 
 135      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 136      [None, None, None, None, None, None, None, None, None, None] 
 137      >>> sorted(heap.getElements()) 
 138      [0, 1, 2, 3, 4] 
 139      >>> heap = pyspark.rdd.MaxHeapQ(1) 
 140      >>> [heap.insert(i) for i in range(9, -1, -1)] 
 141      [None, None, None, None, None, None, None, None, None, None] 
 142      >>> heap.getElements() 
 143      [0] 
 144      """ 
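
    # Because the largest retained element sits at the root, insert() can reject
    # any candidate that is not smaller than the current maximum in O(1);
    # takeOrdered() relies on this to keep only the N smallest elements per partition.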
 145   
    def __init__(self, maxsize):
        # We start the queue at q[1]; a 1-indexed array makes the children of
        # node k simply 2 * k and 2 * k + 1.
 148          self.q = [0] 
 149          self.maxsize = maxsize 

    def _swim(self, k):
 152          while (k > 1) and (self.q[k/2] < self.q[k]): 
 153              self._swap(k, k/2) 
 154              k = k/2 

    def _swap(self, i, j):
 157          t = self.q[i] 
 158          self.q[i] = self.q[j] 
 159          self.q[j] = t 

    def _sink(self, k):
 162          N = self.size() 
 163          while 2 * k <= N: 
 164              j = 2 * k 
            # Pick the larger of the two children, then swap it with the
            # parent unless the parent is already the larger value.
 167              if j < N and self.q[j] < self.q[j + 1]: 
 168                  j = j + 1 
            if self.q[k] > self.q[j]:
 170                  break 
 171              self._swap(k, j) 
 172              k = j 

    def size(self):
 175          return len(self.q) - 1 

    def insert(self, value):
        if self.size() < self.maxsize:
 179              self.q.append(value) 
 180              self._swim(self.size()) 
 181          else: 
 182              self._replaceRoot(value) 

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        # Only accept the new value if it is smaller than the current maximum
        # (the root); sinking it restores the heap property.
        if self.q[1] > value:
 189              self.q[1] = value 
 190              self._sink(1) 
   191   
class RDD(object):
    """
 194      A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. 
 195      Represents an immutable, partitioned collection of elements that can be 
 196      operated on in parallel. 
 197      """ 
 198   
    def __init__(self, jrdd, ctx, jrdd_deserializer):
  200          self._jrdd = jrdd 
 201          self.is_cached = False 
 202          self.is_checkpointed = False 
 203          self.ctx = ctx 
 204          self._jrdd_deserializer = jrdd_deserializer 
 205          self._id = jrdd.id() 
  206   
 208          """ 
 209          A unique ID for this RDD (within its SparkContext). 
 210          """ 
 211          return self._id 
  212   
 214          return self._jrdd.toString() 
  215   
    @property
    def context(self):
 218          """ 
 219          The L{SparkContext} that this RDD was created on. 
 220          """ 
 221          return self.ctx 
  222   
 224          """ 
 225          Persist this RDD with the default storage level (C{MEMORY_ONLY}). 
 226          """ 
 227          self.is_cached = True 
 228          self._jrdd.cache() 
 229          return self 
  230   
 232          """ 
 233          Set this RDD's storage level to persist its values across operations after the first time 
 234          it is computed. This can only be used to assign a new storage level if the RDD does not 
 235          have a storage level set yet. 
 236          """ 
 237          self.is_cached = True 
 238          javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel) 
 239          self._jrdd.persist(javaStorageLevel) 
 240          return self 
  241   
 243          """ 
 244          Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 
 245          """ 
 246          self.is_cached = False 
 247          self._jrdd.unpersist() 
 248          return self 
  249   
 251          """ 
 252          Mark this RDD for checkpointing. It will be saved to a file inside the 
 253          checkpoint directory set with L{SparkContext.setCheckpointDir()} and 
 254          all references to its parent RDDs will be removed. This function must 
 255          be called before any job has been executed on this RDD. It is strongly 
 256          recommended that this RDD is persisted in memory, otherwise saving it 
 257          on a file will require recomputation. 
 258          """ 
 259          self.is_checkpointed = True 
 260          self._jrdd.rdd().checkpoint() 
  261   
 263          """ 
 264          Return whether this RDD has been checkpointed or not 
 265          """ 
 266          return self._jrdd.rdd().isCheckpointed() 
  267   
 269          """ 
 270          Gets the name of the file to which this RDD was checkpointed 
 271          """ 
 272          checkpointFile = self._jrdd.rdd().getCheckpointFile() 
 273          if checkpointFile.isDefined(): 
 274              return checkpointFile.get() 
 275          else: 
 276              return None 
  277   
    def map(self, f, preservesPartitioning=False):
  279          """ 
 280          Return a new RDD by applying a function to each element of this RDD. 
 281           
 282          >>> rdd = sc.parallelize(["b", "a", "c"]) 
 283          >>> sorted(rdd.map(lambda x: (x, 1)).collect()) 
 284          [('a', 1), ('b', 1), ('c', 1)] 
 285          """ 
 286          def func(split, iterator): return imap(f, iterator) 
 287          return PipelinedRDD(self, func, preservesPartitioning) 
  288   
    def flatMap(self, f, preservesPartitioning=False):
  290          """ 
 291          Return a new RDD by first applying a function to all elements of this 
 292          RDD, and then flattening the results. 
 293   
 294          >>> rdd = sc.parallelize([2, 3, 4]) 
 295          >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) 
 296          [1, 1, 1, 2, 2, 3] 
 297          >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) 
 298          [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)] 
 299          """ 
 300          def func(s, iterator): return chain.from_iterable(imap(f, iterator)) 
 301          return self.mapPartitionsWithIndex(func, preservesPartitioning) 
  302   
 304          """ 
 305          Return a new RDD by applying a function to each partition of this RDD. 
 306   
 307          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 308          >>> def f(iterator): yield sum(iterator) 
 309          >>> rdd.mapPartitions(f).collect() 
 310          [3, 7] 
 311          """ 
 312          def func(s, iterator): return f(iterator) 
 313          return self.mapPartitionsWithIndex(func) 
  314   
 316          """ 
 317          Return a new RDD by applying a function to each partition of this RDD, 
 318          while tracking the index of the original partition. 
 319   
 320          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 321          >>> def f(splitIndex, iterator): yield splitIndex 
 322          >>> rdd.mapPartitionsWithIndex(f).sum() 
 323          6 
 324          """ 
 325          return PipelinedRDD(self, f, preservesPartitioning) 
  326   
 328          """ 
 329          Deprecated: use mapPartitionsWithIndex instead. 
 330   
 331          Return a new RDD by applying a function to each partition of this RDD, 
 332          while tracking the index of the original partition. 
 333   
 334          >>> rdd = sc.parallelize([1, 2, 3, 4], 4) 
 335          >>> def f(splitIndex, iterator): yield splitIndex 
 336          >>> rdd.mapPartitionsWithSplit(f).sum() 
 337          6 
 338          """ 
 339          warnings.warn("mapPartitionsWithSplit is deprecated; " 
 340              "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2) 
 341          return self.mapPartitionsWithIndex(f, preservesPartitioning) 
  342   
 344          """ 
 345          Return a new RDD containing only the elements that satisfy a predicate. 
 346   
 347          >>> rdd = sc.parallelize([1, 2, 3, 4, 5]) 
 348          >>> rdd.filter(lambda x: x % 2 == 0).collect() 
 349          [2, 4] 
 350          """ 
 351          def func(iterator): return ifilter(f, iterator) 
 352          return self.mapPartitions(func) 
  353   
 355          """ 
 356          Return a new RDD containing the distinct elements in this RDD. 
 357   
 358          >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) 
 359          [1, 2, 3] 
 360          """ 
 361          return self.map(lambda x: (x, None)) \ 
 362                     .reduceByKey(lambda x, _: x) \ 
 363                     .map(lambda (x, _): x) 
  364   
    def sample(self, withReplacement, fraction, seed=None):
  366          """ 
 367          Return a sampled subset of this RDD (relies on numpy and falls back 
 368          on default random generator if numpy is unavailable). 
 369   
 370          >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP 
 371          [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98] 
 372          """ 
 373          assert fraction >= 0.0, "Invalid fraction value: %s" % fraction 
 374          return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) 
  375   
    # this is ported from scala/spark/RDD.scala
    def takeSample(self, withReplacement, num, seed=None):
  378          """ 
 379          Return a fixed-size sampled subset of this RDD (currently requires numpy). 
 380   
 381          >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP 
 382          [4, 2, 1, 8, 2, 7, 0, 4, 1, 4] 
 383          """ 
 384   
 385          fraction = 0.0 
 386          total = 0 
 387          multiplier = 3.0 
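        # Oversampling: sample() is asked for roughly multiplier * num elements
        # so that a single pass usually returns at least ``total`` items and the
        # retry loop below rarely triggers.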
 388          initialCount = self.count() 
 389          maxSelected = 0 
 390   
        if num < 0:
            raise ValueError("Sample size cannot be negative")

        if initialCount == 0:
            return []
 396   
 397          if initialCount > sys.maxint - 1: 
 398              maxSelected = sys.maxint - 1 
 399          else: 
 400              maxSelected = initialCount 
 401   
 402          if num > initialCount and not withReplacement: 
 403              total = maxSelected 
 404              fraction = multiplier * (maxSelected + 1) / initialCount 
 405          else: 
 406              fraction = multiplier * (num + 1) / initialCount 
 407              total = num 
 408   
 409          samples = self.sample(withReplacement, fraction, seed).collect() 
 410   
        # If the first sample didn't turn out large enough, keep trying to take
        # samples; this shouldn't happen often because we use a big multiplier
        # for the initial size.
 414          rand = Random(seed) 
 415          while len(samples) < total: 
 416              samples = self.sample(withReplacement, fraction, rand.randint(0, sys.maxint)).collect() 
 417   
 418          sampler = RDDSampler(withReplacement, fraction, rand.randint(0, sys.maxint)) 
 419          sampler.shuffle(samples) 
 420          return samples[0:total] 
  421   
 423          """ 
 424          Return the union of this RDD and another one. 
 425   
 426          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 427          >>> rdd.union(rdd).collect() 
 428          [1, 1, 2, 3, 1, 1, 2, 3] 
 429          """ 
 430          if self._jrdd_deserializer == other._jrdd_deserializer: 
 431              rdd = RDD(self._jrdd.union(other._jrdd), self.ctx, 
 432                        self._jrdd_deserializer) 
 433              return rdd 
 434          else: 
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
 437              self_copy = self._reserialize() 
 438              other_copy = other._reserialize() 
 439              return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, 
 440                         self.ctx.serializer) 
  441   
 443          """ 
 444          Return the intersection of this RDD and another one. The output will not  
 445          contain any duplicate elements, even if the input RDDs did. 
 446           
 447          Note that this method performs a shuffle internally. 
 448   
 449          >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) 
 450          >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) 
 451          >>> rdd1.intersection(rdd2).collect() 
 452          [1, 2, 3] 
 453          """ 
 454          return self.map(lambda v: (v, None)) \ 
 455              .cogroup(other.map(lambda v: (v, None))) \ 
 456              .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ 
 457              .keys() 
  458   
 460          if self._jrdd_deserializer == self.ctx.serializer: 
 461              return self 
 462          else: 
 463              return self.map(lambda x: x, preservesPartitioning=True) 
  464   
 466          """ 
 467          Return the union of this RDD and another one. 
 468   
 469          >>> rdd = sc.parallelize([1, 1, 2, 3]) 
 470          >>> (rdd + rdd).collect() 
 471          [1, 1, 2, 3, 1, 1, 2, 3] 
 472          """ 
 473          if not isinstance(other, RDD): 
 474              raise TypeError 
 475          return self.union(other) 
  476   
    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
  478          """ 
 479          Sorts this RDD, which is assumed to consist of (key, value) pairs. 
 480   
 481          >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] 
 482          >>> sc.parallelize(tmp).sortByKey(True, 2).collect() 
 483          [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] 
 484          >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] 
 485          >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) 
 486          >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() 
 487          [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] 
 488          """ 
 489          if numPartitions is None: 
 490              numPartitions = self.ctx.defaultParallelism 
 491   
 492          bounds = list() 
 493   
        # First compute the boundary of each partition via sampling: we want to
        # partition the key-space into bins such that each bin receives roughly
        # the same number of (key, value) pairs.
 497          if numPartitions > 1: 
 498              rddSize = self.count() 
            maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
 500              fraction = min(maxSampleSize / max(rddSize, 1), 1.0) 
 501   
 502              samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect() 
 503              samples = sorted(samples, reverse=(not ascending), key=keyfunc) 
 504   
            # We need numPartitions - 1 boundary values; the final partition's
            # upper bound is implicit.
 507              for i in range(0, numPartitions - 1): 
 508                  index = (len(samples) - 1) * (i + 1) / numPartitions 
 509                  bounds.append(samples[index]) 
 510   
 511          def rangePartitionFunc(k): 
 512              p = 0 
 513              while p < len(bounds) and keyfunc(k) > bounds[p]: 
 514                  p += 1 
 515              if ascending: 
 516                  return p 
 517              else: 
 518                  return numPartitions-1-p 
  519   
 520          def mapFunc(iterator): 
 521              yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k)) 
  522   
 523          return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc) 
                    .mapPartitions(mapFunc, preservesPartitioning=True)
 525                      .flatMap(lambda x: x, preservesPartitioning=True)) 
 526   
 528          """ 
 529          Return an RDD created by coalescing all elements within each partition 
 530          into a list. 
 531   
 532          >>> rdd = sc.parallelize([1, 2, 3, 4], 2) 
 533          >>> sorted(rdd.glom().collect()) 
 534          [[1, 2], [3, 4]] 
 535          """ 
 536          def func(iterator): yield list(iterator) 
 537          return self.mapPartitions(func) 
  538   
 540          """ 
 541          Return the Cartesian product of this RDD and another one, that is, the 
 542          RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and 
 543          C{b} is in C{other}. 
 544   
 545          >>> rdd = sc.parallelize([1, 2]) 
 546          >>> sorted(rdd.cartesian(rdd).collect()) 
 547          [(1, 1), (1, 2), (2, 1), (2, 2)] 
 548          """ 
        # Due to batching, we can't use the Java cartesian method.
 550          deserializer = CartesianDeserializer(self._jrdd_deserializer, 
 551                                               other._jrdd_deserializer) 
 552          return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer) 
  553   
    def groupBy(self, f, numPartitions=None):
  555          """ 
 556          Return an RDD of grouped items. 
 557   
 558          >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8]) 
 559          >>> result = rdd.groupBy(lambda x: x % 2).collect() 
 560          >>> sorted([(x, sorted(y)) for (x, y) in result]) 
 561          [(0, [2, 8]), (1, [1, 1, 3, 5])] 
 562          """ 
 563          return self.map(lambda x: (f(x), x)).groupByKey(numPartitions) 
  564   
    def pipe(self, command, env={}):
  566          """ 
 567          Return an RDD created by piping elements to a forked external process. 
 568   
 569          >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect() 
 570          ['1', '2', '', '3'] 
 571          """ 
 572          def func(iterator): 
 573              pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE) 
 574              def pipe_objs(out): 
 575                  for obj in iterator: 
 576                      out.write(str(obj).rstrip('\n') + '\n') 
 577                  out.close() 
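            # Feed the partition to the child process from a separate thread so that
            # reading its stdout below cannot deadlock on a full stdin pipe buffer.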
  578              Thread(target=pipe_objs, args=[pipe.stdin]).start() 
 579              return (x.rstrip('\n') for x in iter(pipe.stdout.readline, '')) 
 580          return self.mapPartitions(func) 
 581   
 583          """ 
 584          Applies a function to all elements of this RDD. 
 585   
 586          >>> def f(x): print x 
 587          >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f) 
 588          """ 
 589          def processPartition(iterator): 
 590              for x in iterator: 
 591                  f(x) 
 592              yield None 
        self.mapPartitions(processPartition).collect()  # Force evaluation
 594   
 596          """ 
 597          Applies a function to each partition of this RDD. 
 598   
 599          >>> def f(iterator):  
 600          ...      for x in iterator:  
 601          ...           print x  
 602          ...      yield None 
 603          >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) 
 604          """ 
        self.mapPartitions(f).collect()  # Force evaluation
  606           
 608          """ 
 609          Return a list that contains all of the elements in this RDD. 
 610          """ 
 611          with _JavaStackTrace(self.context) as st: 
 612            bytesInJava = self._jrdd.collect().iterator() 
 613          return list(self._collect_iterator_through_file(bytesInJava)) 
  614   
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient. Instead, we dump the data to a
        # temporary file and read it back.
 619          tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir) 
 620          tempFile.close() 
 621          self.ctx._writeToFile(iterator, tempFile.name) 
        # Read the data into Python and deserialize it:
 623          with open(tempFile.name, 'rb') as tempFile: 
 624              for item in self._jrdd_deserializer.load_stream(tempFile): 
 625                  yield item 
 626          os.unlink(tempFile.name) 
  627   
 629          """ 
 630          Reduces the elements of this RDD using the specified commutative and 
 631          associative binary operator. Currently reduces partitions locally. 
 632   
 633          >>> from operator import add 
 634          >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 
 635          15 
 636          >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 
 637          10 
 638          """ 
 639          def func(iterator): 
 640              acc = None 
 641              for obj in iterator: 
 642                  if acc is None: 
 643                      acc = obj 
 644                  else: 
 645                      acc = f(obj, acc) 
 646              if acc is not None: 
 647                  yield acc 
  648          vals = self.mapPartitions(func).collect() 
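        # If the RDD is empty, every partition yields nothing, vals is empty, and
        # the builtin reduce() below raises a TypeError.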
 649          return reduce(f, vals) 
 650   
    def fold(self, zeroValue, op):
  652          """ 
 653          Aggregate the elements of each partition, and then the results for all 
 654          the partitions, using a given associative function and a neutral "zero 
 655          value." 
 656   
 657          The function C{op(t1, t2)} is allowed to modify C{t1} and return it 
 658          as its result value to avoid object allocation; however, it should not 
 659          modify C{t2}. 
 660   
 661          >>> from operator import add 
 662          >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add) 
 663          15 
 664          """ 
 665          def func(iterator): 
 666              acc = zeroValue 
 667              for obj in iterator: 
 668                  acc = op(obj, acc) 
 669              yield acc 
  670          vals = self.mapPartitions(func).collect() 
 671          return reduce(op, vals, zeroValue) 
 672   
    def aggregate(self, zeroValue, seqOp, combOp):
  674          """ 
 675          Aggregate the elements of each partition, and then the results for all 
 676          the partitions, using a given combine functions and a neutral "zero 
 677          value." 
 678   
        The functions C{seqOp(t1, t2)} and C{combOp(t1, t2)} are allowed to
        modify C{t1} and return it as their result value to avoid object
        allocation; however, they should not modify C{t2}.

        The first function (seqOp) can return a different result type, U, than
        the type of this RDD. Thus, we need one operation for merging a T into
        a U and one operation for merging two U's.
 686   
 687          >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1)) 
 688          >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1])) 
 689          >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp) 
 690          (10, 4) 
 691          >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp) 
 692          (0, 0) 
 693          """ 
 694          def func(iterator): 
 695              acc = zeroValue 
 696              for obj in iterator: 
 697                  acc = seqOp(acc, obj) 
 698              yield acc 
  699   
 700          return self.mapPartitions(func).fold(zeroValue, combOp) 
 701           
 702   
 704          """ 
 705          Find the maximum item in this RDD. 
 706   
 707          >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max() 
 708          43.0 
 709          """ 
 710          return self.reduce(max) 
  711   
 713          """ 
        Find the minimum item in this RDD.
 715   
 716          >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min() 
 717          1.0 
 718          """ 
 719          return self.reduce(min) 
  720       
 722          """ 
 723          Add up the elements in this RDD. 
 724   
 725          >>> sc.parallelize([1.0, 2.0, 3.0]).sum() 
 726          6.0 
 727          """ 
 728          return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add) 
  729   
 731          """ 
 732          Return the number of elements in this RDD. 
 733   
 734          >>> sc.parallelize([2, 3, 4]).count() 
 735          3 
 736          """ 
 737          return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
  738   
 740          """ 
 741          Return a L{StatCounter} object that captures the mean, variance 
 742          and count of the RDD's elements in one operation. 
 743          """ 
 744          def redFunc(left_counter, right_counter): 
 745              return left_counter.mergeStats(right_counter) 
  746   
 747          return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc) 
 748   
 750          """ 
 751          Compute the mean of this RDD's elements. 
 752   
 753          >>> sc.parallelize([1, 2, 3]).mean() 
 754          2.0 
 755          """ 
 756          return self.stats().mean() 
  757   
 759          """ 
 760          Compute the variance of this RDD's elements. 
 761   
 762          >>> sc.parallelize([1, 2, 3]).variance() 
 763          0.666... 
 764          """ 
 765          return self.stats().variance() 
  766   
 768          """ 
 769          Compute the standard deviation of this RDD's elements. 
 770   
 771          >>> sc.parallelize([1, 2, 3]).stdev() 
 772          0.816... 
 773          """ 
 774          return self.stats().stdev() 
  775   
 777          """ 
 778          Compute the sample standard deviation of this RDD's elements (which corrects for bias in 
 779          estimating the standard deviation by dividing by N-1 instead of N). 
 780   
 781          >>> sc.parallelize([1, 2, 3]).sampleStdev() 
 782          1.0 
 783          """ 
 784          return self.stats().sampleStdev() 
  785   
 787          """ 
 788          Compute the sample variance of this RDD's elements (which corrects for bias in 
 789          estimating the variance by dividing by N-1 instead of N). 
 790   
 791          >>> sc.parallelize([1, 2, 3]).sampleVariance() 
 792          1.0 
 793          """ 
 794          return self.stats().sampleVariance() 
  795   
 797          """ 
 798          Return the count of each unique value in this RDD as a dictionary of 
 799          (value, count) pairs. 
 800   
 801          >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) 
 802          [(1, 2), (2, 3)] 
 803          """ 
 804          def countPartition(iterator): 
 805              counts = defaultdict(int) 
 806              for obj in iterator: 
 807                  counts[obj] += 1 
 808              yield counts 
  809          def mergeMaps(m1, m2): 
 810              for (k, v) in m2.iteritems(): 
 811                  m1[k] += v 
 812              return m1 
 813          return self.mapPartitions(countPartition).reduce(mergeMaps) 
 814       
    def top(self, num):
  816          """ 
        Get the top N elements from an RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
 821          [12] 
 822          >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2) 
 823          [6, 5] 
 824          """ 
 825          def topIterator(iterator): 
 826              q = [] 
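            # Keep a min-heap capped at ``num`` entries: heappushpop() evicts the
            # smallest element, so after the loop q holds the num largest items
            # of this partition.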
 827              for k in iterator: 
 828                  if len(q) < num: 
 829                      heapq.heappush(q, k) 
 830                  else: 
 831                      heapq.heappushpop(q, k) 
 832              yield q 
  833   
 834          def merge(a, b): 
 835              return next(topIterator(a + b)) 
 836   
 837          return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True) 
 838   
 840          """ 
        Get the N elements from an RDD ordered in ascending order or as
        specified by the optional key function.
 843   
 844          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6) 
 845          [1, 2, 3, 4, 5, 6] 
 846          >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x) 
 847          [10, 9, 7, 6, 5, 4] 
 848          """ 
 849   
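        # MaxHeapQ keeps the ``num`` smallest items seen so far; when a key function
        # is supplied, elements are wrapped as (key(x), x) pairs so that comparisons
        # and the final sort are driven by the key, and unKey() strips the wrapper.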
 850          def topNKeyedElems(iterator, key_=None): 
 851              q = MaxHeapQ(num) 
 852              for k in iterator: 
                if key_ is not None:
 854                      k = (key_(k), k) 
 855                  q.insert(k) 
 856              yield q.getElements() 
  857   
 858          def unKey(x, key_=None): 
            if key_ is not None:
 860                  x = [i[1] for i in x] 
 861              return x 
 862           
 863          def merge(a, b): 
 864              return next(topNKeyedElems(a + b)) 
 865          result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge) 
 866          return sorted(unKey(result, key), key=key) 
 867   
 868   
    def take(self, num):
  870          """ 
 871          Take the first num elements of the RDD. 
 872   
 873          This currently scans the partitions *one by one*, so it will be slow if 
 874          a lot of partitions are required. In that case, use L{collect} to get 
 875          the whole RDD instead. 
 876   
 877          >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2) 
 878          [2, 3] 
 879          >>> sc.parallelize([2, 3, 4, 5, 6]).take(10) 
 880          [2, 3, 4, 5, 6] 
 881          """ 
 882          def takeUpToNum(iterator): 
 883              taken = 0 
 884              while taken < num: 
 885                  yield next(iterator) 
 886                  taken += 1 
  887           
 888          mapped = self.mapPartitions(takeUpToNum) 
 889          items = [] 
        # Scan partitions one at a time, collecting each partition through a
        # temporary file, and stop as soon as at least ``num`` items have been
        # gathered.
 893          with _JavaStackTrace(self.context) as st: 
 894              for partition in range(mapped._jrdd.splits().size()): 
 895                  partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1) 
 896                  partitionsToTake[0] = partition 
 897                  iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator() 
 898                  items.extend(mapped._collect_iterator_through_file(iterator)) 
 899                  if len(items) >= num: 
 900                      break 
 901          return items[:num] 
 902   
 904          """ 
 905          Return the first element in this RDD. 
 906   
 907          >>> sc.parallelize([2, 3, 4]).first() 
 908          2 
 909          """ 
 910          return self.take(1)[0] 
  911   
    def saveAsTextFile(self, path):
  913          """ 
 914          Save this RDD as a text file, using string representations of elements. 
 915   
 916          >>> tempFile = NamedTemporaryFile(delete=True) 
 917          >>> tempFile.close() 
 918          >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name) 
 919          >>> from fileinput import input 
 920          >>> from glob import glob 
 921          >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) 
 922          '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' 
 923   
 924          Empty lines are tolerated when saving to text files. 
 925   
 926          >>> tempFile2 = NamedTemporaryFile(delete=True) 
 927          >>> tempFile2.close() 
 928          >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) 
 929          >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) 
 930          '\\n\\n\\nbar\\nfoo\\n' 
 931          """ 
 932          def func(split, iterator): 
 933              for x in iterator: 
 934                  if not isinstance(x, basestring): 
 935                      x = unicode(x) 
 936                  yield x.encode("utf-8") 
  937          keyed = PipelinedRDD(self, func) 
 938          keyed._bypass_serializer = True 
 939          keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path) 
 940   
    # Pair functions

    def collectAsMap(self):
 944          """ 
 945          Return the key-value pairs in this RDD to the master as a dictionary. 
 946   
 947          >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 
 948          >>> m[1] 
 949          2 
 950          >>> m[3] 
 951          4 
 952          """ 
 953          return dict(self.collect()) 
  954   
 956          """ 
 957          Return an RDD with the keys of each tuple. 
 958          >>> m = sc.parallelize([(1, 2), (3, 4)]).keys() 
 959          >>> m.collect() 
 960          [1, 3] 
 961          """ 
 962          return self.map(lambda (k, v): k) 
  963   
 965          """ 
 966          Return an RDD with the values of each tuple. 
 967          >>> m = sc.parallelize([(1, 2), (3, 4)]).values() 
 968          >>> m.collect() 
 969          [2, 4] 
 970          """ 
 971          return self.map(lambda (k, v): v) 
  972   
 974          """ 
 975          Merge the values for each key using an associative reduce function. 
 976   
 977          This will also perform the merging locally on each mapper before 
 978          sending results to a reducer, similarly to a "combiner" in MapReduce. 
 979   
 980          Output will be hash-partitioned with C{numPartitions} partitions, or 
 981          the default parallelism level if C{numPartitions} is not specified. 
 982   
 983          >>> from operator import add 
 984          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
 985          >>> sorted(rdd.reduceByKey(add).collect()) 
 986          [('a', 2), ('b', 1)] 
 987          """ 
 988          return self.combineByKey(lambda x: x, func, func, numPartitions) 
  989   
 991          """ 
 992          Merge the values for each key using an associative reduce function, but 
 993          return the results immediately to the master as a dictionary. 
 994   
 995          This will also perform the merging locally on each mapper before 
 996          sending results to a reducer, similarly to a "combiner" in MapReduce. 
 997   
 998          >>> from operator import add 
 999          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1000          >>> sorted(rdd.reduceByKeyLocally(add).items()) 
1001          [('a', 2), ('b', 1)] 
1002          """ 
1003          def reducePartition(iterator): 
1004              m = {} 
1005              for (k, v) in iterator: 
1006                  m[k] = v if k not in m else func(m[k], v) 
1007              yield m 
 1008          def mergeMaps(m1, m2): 
1009              for (k, v) in m2.iteritems(): 
1010                  m1[k] = v if k not in m1 else func(m1[k], v) 
1011              return m1 
1012          return self.mapPartitions(reducePartition).reduce(mergeMaps) 
1013   
1015          """ 
1016          Count the number of elements for each key, and return the result to the 
1017          master as a dictionary. 
1018   
1019          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1020          >>> sorted(rdd.countByKey().items()) 
1021          [('a', 2), ('b', 1)] 
1022          """ 
1023          return self.map(lambda x: x[0]).countByValue() 
 1024   
    def join(self, other, numPartitions=None):
 1026          """ 
1027          Return an RDD containing all pairs of elements with matching keys in 
1028          C{self} and C{other}. 
1029   
1030          Each pair of elements will be returned as a (k, (v1, v2)) tuple, where 
1031          (k, v1) is in C{self} and (k, v2) is in C{other}. 
1032   
1033          Performs a hash join across the cluster. 
1034   
1035          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1036          >>> y = sc.parallelize([("a", 2), ("a", 3)]) 
1037          >>> sorted(x.join(y).collect()) 
1038          [('a', (1, 2)), ('a', (1, 3))] 
1039          """ 
1040          return python_join(self, other, numPartitions) 
 1041   
1043          """ 
1044          Perform a left outer join of C{self} and C{other}. 
1045   
1046          For each element (k, v) in C{self}, the resulting RDD will either 
1047          contain all pairs (k, (v, w)) for w in C{other}, or the pair 
1048          (k, (v, None)) if no elements in other have key k. 
1049   
1050          Hash-partitions the resulting RDD into the given number of partitions. 
1051   
1052          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1053          >>> y = sc.parallelize([("a", 2)]) 
1054          >>> sorted(x.leftOuterJoin(y).collect()) 
1055          [('a', (1, 2)), ('b', (4, None))] 
1056          """ 
1057          return python_left_outer_join(self, other, numPartitions) 
 1058   
1060          """ 
1061          Perform a right outer join of C{self} and C{other}. 
1062   
1063          For each element (k, w) in C{other}, the resulting RDD will either 
1064          contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) 
1065          if no elements in C{self} have key k. 
1066   
1067          Hash-partitions the resulting RDD into the given number of partitions. 
1068   
1069          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1070          >>> y = sc.parallelize([("a", 2)]) 
1071          >>> sorted(y.rightOuterJoin(x).collect()) 
1072          [('a', (2, 1)), ('b', (None, 4))] 
1073          """ 
1074          return python_right_outer_join(self, other, numPartitions) 
 1075   
1076       
1077       
1078       
    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
 1080          """ 
1081          Return a copy of the RDD partitioned using the specified partitioner. 
1082   
1083          >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x)) 
1084          >>> sets = pairs.partitionBy(2).glom().collect() 
1085          >>> set(sets[0]).intersection(set(sets[1])) 
1086          set([]) 
1087          """ 
1088          if numPartitions is None: 
1089              numPartitions = self.ctx.defaultParallelism 
1090   
        # Transferring O(n) objects to Java is too expensive.  Instead, we'll
        # form the hash buckets in Python, transferring O(numPartitions) objects
        # to Java.  Each object is a (splitNumber, [objects]) pair.
1094          outputSerializer = self.ctx._unbatched_serializer 
1095          def add_shuffle_key(split, iterator): 
1096   
1097              buckets = defaultdict(list) 
1098   
1099              for (k, v) in iterator: 
1100                  buckets[partitionFunc(k) % numPartitions].append((k, v)) 
1101              for (split, items) in buckets.iteritems(): 
1102                  yield pack_long(split) 
1103                  yield outputSerializer.dumps(items) 
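        # The byte stream produced above alternates a packed long (the target
        # partition id) with a pickled batch of the (key, value) pairs bound for
        # that partition; the JVM-side PairwiseRDD reassembles them into pairs.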
 1104          keyed = PipelinedRDD(self, add_shuffle_key) 
1105          keyed._bypass_serializer = True 
1106          with _JavaStackTrace(self.context) as st: 
1107              pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD() 
1108              partitioner = self.ctx._jvm.PythonPartitioner(numPartitions, 
1109                                                            id(partitionFunc)) 
1110          jrdd = pairRDD.partitionBy(partitioner).values() 
1111          rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) 
        # Keep a reference to the partition function so that id(partitionFunc)
        # stays unique for the lifetime of this RDD, even if it is a lambda:
1114          rdd._partitionFunc = partitionFunc 
1115          return rdd 
1116   
1117       
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
1119                       numPartitions=None): 
 1120          """ 
1121          Generic function to combine the elements for each key using a custom 
1122          set of aggregation functions. 
1123   
1124          Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined 
1125          type" C.  Note that V and C can be different -- for example, one might 
1126          group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). 
1127   
1128          Users provide three functions: 
1129   
1130              - C{createCombiner}, which turns a V into a C (e.g., creates 
1131                a one-element list) 
1132              - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of 
1133                a list) 
1134              - C{mergeCombiners}, to combine two C's into a single one. 
1135   
1136          In addition, users can control the partitioning of the output RDD. 
1137   
1138          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1139          >>> def f(x): return x 
1140          >>> def add(a, b): return a + str(b) 
1141          >>> sorted(x.combineByKey(str, add, add).collect()) 
1142          [('a', '11'), ('b', '1')] 
1143          """ 
1144          if numPartitions is None: 
1145              numPartitions = self.ctx.defaultParallelism 
1146          def combineLocally(iterator): 
1147              combiners = {} 
1148              for x in iterator: 
1149                  (k, v) = x 
1150                  if k not in combiners: 
1151                      combiners[k] = createCombiner(v) 
1152                  else: 
1153                      combiners[k] = mergeValue(combiners[k], v) 
1154              return combiners.iteritems() 
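        # Combine map-side within each partition, shuffle by key, then merge the
        # per-partition combiners on the reduce side.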
 1155          locally_combined = self.mapPartitions(combineLocally) 
1156          shuffled = locally_combined.partitionBy(numPartitions) 
1157          def _mergeCombiners(iterator): 
1158              combiners = {} 
1159              for (k, v) in iterator: 
                if k not in combiners:
1161                      combiners[k] = v 
1162                  else: 
1163                      combiners[k] = mergeCombiners(combiners[k], v) 
1164              return combiners.iteritems() 
1165          return shuffled.mapPartitions(_mergeCombiners) 
1166       
    def foldByKey(self, zeroValue, func, numPartitions=None):
 1168          """ 
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" that may be added to the result an arbitrary
        number of times without changing it (e.g., 0 for addition, or 1 for
        multiplication).
1172   
1173          >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1174          >>> from operator import add 
1175          >>> rdd.foldByKey(0, add).collect() 
1176          [('a', 2), ('b', 1)] 
1177          """ 
1178          return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions) 
 1179       
1180       
1181       
1183          """ 
1184          Group the values for each key in the RDD into a single sequence. 
        Hash-partitions the resulting RDD into numPartitions partitions.
1186   
1187          Note: If you are grouping in order to perform an aggregation (such as a 
1188          sum or average) over each key, using reduceByKey will provide much better 
1189          performance. 
1190   
1191          >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) 
1192          >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect())) 
1193          [('a', [1, 1]), ('b', [1])] 
1194          """ 
1195   
1196          def createCombiner(x): 
1197              return [x] 
 1198   
1199          def mergeValue(xs, x): 
1200              xs.append(x) 
1201              return xs 
1202   
1203          def mergeCombiners(a, b): 
1204              return a + b 
1205   
1206          return self.combineByKey(createCombiner, mergeValue, mergeCombiners, 
1207                  numPartitions).mapValues(lambda x: ResultIterable(x)) 
1208   
1209       
1211          """ 
1212          Pass each value in the key-value pair RDD through a flatMap function 
1213          without changing the keys; this also retains the original RDD's 
1214          partitioning. 
1215   
1216          >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) 
1217          >>> def f(x): return x 
1218          >>> x.flatMapValues(f).collect() 
1219          [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')] 
1220          """ 
1221          flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) 
1222          return self.flatMap(flat_map_fn, preservesPartitioning=True) 
 1223   
1225          """ 
1226          Pass each value in the key-value pair RDD through a map function 
1227          without changing the keys; this also retains the original RDD's 
1228          partitioning. 
1229   
1230          >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) 
1231          >>> def f(x): return len(x) 
1232          >>> x.mapValues(f).collect() 
1233          [('a', 3), ('b', 1)] 
1234          """ 
1235          map_values_fn = lambda (k, v): (k, f(v)) 
1236          return self.map(map_values_fn, preservesPartitioning=True) 
 1237   
1238       
1240          """ 
1241          Alias for cogroup. 
1242          """ 
1243          return self.cogroup(other) 
 1244   
1245       
    def cogroup(self, other, numPartitions=None):
 1247          """ 
1248          For each key k in C{self} or C{other}, return a resulting RDD that 
1249          contains a tuple with the list of values for that key in C{self} as well 
1250          as C{other}. 
1251   
1252          >>> x = sc.parallelize([("a", 1), ("b", 4)]) 
1253          >>> y = sc.parallelize([("a", 2)]) 
1254          >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect()))) 
1255          [('a', ([1], [2])), ('b', ([4], []))] 
1256          """ 
1257          return python_cogroup(self, other, numPartitions) 
 1258   
1260          """ 
1261          Return each (key, value) pair in C{self} that has no pair with matching key 
1262          in C{other}. 
1263   
1264          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) 
1265          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1266          >>> sorted(x.subtractByKey(y).collect()) 
1267          [('b', 4), ('b', 5)] 
1268          """ 
1269          filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0 
1270          map_func = lambda (key, vals): [(key, val) for val in vals[0]] 
1271          return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func) 
 1272   
    def subtract(self, other, numPartitions=None):
 1274          """ 
1275          Return each value in C{self} that is not contained in C{other}. 
1276   
1277          >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) 
1278          >>> y = sc.parallelize([("a", 3), ("c", None)]) 
1279          >>> sorted(x.subtract(y).collect()) 
1280          [('a', 1), ('b', 4), ('b', 5)] 
1281          """ 
1282          rdd = other.map(lambda x: (x, True))  
1283          return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])  
 1284   
1286          """ 
1287          Creates tuples of the elements in this RDD by applying C{f}. 
1288   
1289          >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) 
1290          >>> y = sc.parallelize(zip(range(0,5), range(0,5))) 
1291          >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect())) 
1292          [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))] 
1293          """ 
1294          return self.map(lambda x: (f(x), x)) 
 1295   
1297          """ 
1298           Return a new RDD that has exactly numPartitions partitions. 
1299             
1300           Can increase or decrease the level of parallelism in this RDD. Internally, this uses 
1301           a shuffle to redistribute data. 
1302           If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 
1303           which can avoid performing a shuffle. 
1304           >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4) 
1305           >>> sorted(rdd.glom().collect()) 
1306           [[1], [2, 3], [4, 5], [6, 7]] 
1307           >>> len(rdd.repartition(2).glom().collect()) 
1308           2 
1309           >>> len(rdd.repartition(10).glom().collect()) 
1310           10 
1311          """ 
1312          jrdd = self._jrdd.repartition(numPartitions) 
1313          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1314   
    def coalesce(self, numPartitions, shuffle=False):
 1316          """ 
1317          Return a new RDD that is reduced into `numPartitions` partitions. 
1318          >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect() 
1319          [[1], [2, 3], [4, 5]] 
1320          >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 
1321          [[1, 2, 3, 4, 5]] 
1322          """ 
1323          jrdd = self._jrdd.coalesce(numPartitions) 
1324          return RDD(jrdd, self.ctx, self._jrdd_deserializer) 
 1325   
    def zip(self, other):
 1327          """ 
        Zips this RDD with another one, returning key-value pairs consisting of
        the first element of each RDD paired together, then the second elements,
        and so on. Assumes that the two RDDs have the same number of partitions
        and the same number of elements in each partition (e.g. one was made
        through a map on the other).
1332   
1333          >>> x = sc.parallelize(range(0,5)) 
1334          >>> y = sc.parallelize(range(1000, 1005)) 
1335          >>> x.zip(y).collect() 
1336          [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)] 
1337          """ 
1338          pairRDD = self._jrdd.zip(other._jrdd) 
1339          deserializer = PairDeserializer(self._jrdd_deserializer, 
1340                                               other._jrdd_deserializer) 
1341          return RDD(pairRDD, self.ctx, deserializer) 
 1342   
1344          """ 
1345          Return the name of this RDD. 
1346          """ 
1347          name_ = self._jrdd.name() 
1348          if not name_: 
1349              return None 
1350          return name_.encode('utf-8') 
 1351   
1353          """ 
1354          Assign a name to this RDD. 
1355          >>> rdd1 = sc.parallelize([1,2]) 
1356          >>> rdd1.setName('RDD1') 
1357          >>> rdd1.name() 
1358          'RDD1' 
1359          """ 
1360          self._jrdd.setName(name) 
 1361   
1363          """ 
1364          A description of this RDD and its recursive dependencies for debugging. 
1365          """ 
1366          debug_string = self._jrdd.toDebugString() 
1367          if not debug_string: 
1368              return None 
1369          return debug_string.encode('utf-8') 
 1370   
1372          """ 
1373          Get the RDD's current storage level. 
1374          >>> rdd1 = sc.parallelize([1,2]) 
1375          >>> rdd1.getStorageLevel() 
1376          StorageLevel(False, False, False, False, 1) 
1377          """ 
1378          java_storage_level = self._jrdd.getStorageLevel() 
1379          storage_level = StorageLevel(java_storage_level.useDisk(), 
1380                                       java_storage_level.useMemory(), 
1381                                       java_storage_level.useOffHeap(), 
1382                                       java_storage_level.deserialized(), 
1383                                       java_storage_level.replication()) 
1384          return storage_level 
 1385   
class PipelinedRDD(RDD):
    """
1393      Pipelined maps: 
1394      >>> rdd = sc.parallelize([1, 2, 3, 4]) 
1395      >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect() 
1396      [4, 8, 12, 16] 
1397      >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect() 
1398      [4, 8, 12, 16] 
1399   
1400      Pipelined reduces: 
1401      >>> from operator import add 
1402      >>> rdd.map(lambda x: 2 * x).reduce(add) 
1403      20 
1404      >>> rdd.flatMap(lambda x: [x, x]).reduce(add) 
1405      20 
1406      """ 
    def __init__(self, prev, func, preservesPartitioning=False):
 1408          if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable(): 
            # This transformation is the first in its stage:
1410              self.func = func 
1411              self.preservesPartitioning = preservesPartitioning 
1412              self._prev_jrdd = prev._jrdd 
1413              self._prev_jrdd_deserializer = prev._jrdd_deserializer 
1414          else: 
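            # The parent is an un-materialized PipelinedRDD (not cached or
            # checkpointed), so we can pipeline: compose the parent's function with
            # ours and reuse the parent's source JVM RDD, letting the whole chain of
            # transformations run as a single Python task.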
1415              prev_func = prev.func 
1416              def pipeline_func(split, iterator): 
1417                  return func(split, prev_func(split, iterator)) 
 1418              self.func = pipeline_func 
1419              self.preservesPartitioning = \ 
1420                  prev.preservesPartitioning and preservesPartitioning 
1421              self._prev_jrdd = prev._prev_jrdd   
1422              self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer 
1423          self.is_cached = False 
1424          self.is_checkpointed = False 
1425          self.ctx = prev.ctx 
1426          self.prev = prev 
1427          self._jrdd_val = None 
1428          self._jrdd_deserializer = self.ctx.serializer 
1429          self._bypass_serializer = False 
 1430   
    @property
    def _jrdd(self):
1433          if self._jrdd_val: 
1434              return self._jrdd_val 
1435          if self._bypass_serializer: 
1436              serializer = NoOpSerializer() 
1437          else: 
1438              serializer = self.ctx.serializer 
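        # The command shipped to the JVM-side PythonRDD is the pipelined Python
        # function plus the serializers for its input and output; it is pickled
        # with cloudpickle so that closures and lambdas survive the round trip.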
1439          command = (self.func, self._prev_jrdd_deserializer, serializer) 
1440          pickled_command = CloudPickleSerializer().dumps(command) 
1441          broadcast_vars = ListConverter().convert( 
1442              [x._jbroadcast for x in self.ctx._pickled_broadcast_vars], 
1443              self.ctx._gateway._gateway_client) 
1444          self.ctx._pickled_broadcast_vars.clear() 
1445          class_tag = self._prev_jrdd.classTag() 
1446          env = MapConverter().convert(self.ctx.environment, 
1447                                       self.ctx._gateway._gateway_client) 
1448          includes = ListConverter().convert(self.ctx._python_includes, 
1449                                       self.ctx._gateway._gateway_client) 
1450          python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), 
1451              bytearray(pickled_command), env, includes, self.preservesPartitioning, 
1452              self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator, 
1453              class_tag) 
1454          self._jrdd_val = python_rdd.asJavaRDD() 
1455          return self._jrdd_val 

    def _is_pipelinable(self):
1458          return not (self.is_cached or self.is_checkpointed) 


def _test():
1462      import doctest 
1463      from pyspark.context import SparkContext 
1464      globs = globals().copy() 
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
1467      globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) 
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
1469      globs['sc'].stop() 
1470      if failure_count: 
1471          exit(-1) 
 1472   
1473   
1474  if __name__ == "__main__": 
1475      _test() 
1476