#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import shutil
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
from collections import namedtuple

from pyspark import accumulators
from pyspark.accumulators import Accumulator
from pyspark.broadcast import Broadcast
from pyspark.conf import SparkConf
from pyspark.files import SparkFiles
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
    PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD

from py4j.java_collections import ListConverter


# Default config values used by PySpark; they are applied with setIfMissing(),
# so they never override settings supplied by the user.
DEFAULT_CONFIGS = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.serializer.objectStreamReset": 100,
    "spark.rdd.compress": True,
}

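
# Illustrative sketch (comment only): because the defaults above are applied
# with setIfMissing(), a value the user sets explicitly wins.  The master,
# app name and serializer below are assumptions chosen for the example.
#
#   conf = (SparkConf().setMaster("local[2]")
#                      .setAppName("override-example")
#                      .set("spark.serializer",
#                           "org.apache.spark.serializer.JavaSerializer"))
#   sc = SparkContext(conf=conf)   # keeps JavaSerializer, not the Kryo default
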
class SparkContext(object):

    """
    Main entry point for Spark functionality. A SparkContext represents the
    connection to a Spark cluster, and can be used to create L{RDD}s and
    broadcast variables on that cluster.
    """

    _gateway = None
    _jvm = None
    _writeToFile = None
    _next_accum_id = 0
    _active_spark_context = None
    _lock = Lock()
    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
    _default_batch_size_for_serialized_input = 10
    def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                 environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
                 gateway=None):
        """
        Create a new SparkContext. At least the master and app name should be set,
        either through the named parameters here or through C{conf}.

        @param master: Cluster URL to connect to
               (e.g. mesos://host:port, spark://host:port, local[4]).
        @param appName: A name for your job, to display on the cluster web UI.
        @param sparkHome: Location where Spark is installed on cluster nodes.
        @param pyFiles: Collection of .zip or .py files to send to the cluster
               and add to PYTHONPATH.  These can be paths on the local file
               system or HDFS, HTTP, HTTPS, or FTP URLs.
        @param environment: A dictionary of environment variables to set on
               worker nodes.
        @param batchSize: The number of Python objects represented as a single
               Java object.  Set 1 to disable batching or -1 to use an
               unlimited batch size.
        @param serializer: The serializer for RDDs.
        @param conf: A L{SparkConf} object setting Spark properties.
        @param gateway: Use an existing gateway and JVM, otherwise a new JVM
               will be instantiated.


        >>> from pyspark.context import SparkContext
        >>> sc = SparkContext('local', 'test')

        >>> sc2 = SparkContext('local', 'test2') # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
            ...
        ValueError:...
        """
        if rdd._extract_concise_traceback() is not None:
            self._callsite = rdd._extract_concise_traceback()
        else:
            tempNamedTuple = namedtuple("Callsite", "function file linenum")
            self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
        SparkContext._ensure_initialized(self, gateway=gateway)
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                          conf)
        except:
            # If an error occurs, clean up in order to allow future SparkContext creation:
            self.stop()
            raise

    def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                 conf):
        self.environment = environment or {}
        self._conf = conf or SparkConf(_jvm=self._jvm)
        self._batchSize = batchSize  # -1 represents an unlimited batch size
        self._unbatched_serializer = serializer
        if batchSize == 1:
            self.serializer = self._unbatched_serializer
        else:
            self.serializer = BatchedSerializer(self._unbatched_serializer,
                                                batchSize)

        # Set any parameters passed directly to us on the conf
        if master:
            self._conf.setMaster(master)
        if appName:
            self._conf.setAppName(appName)
        if sparkHome:
            self._conf.setSparkHome(sparkHome)
        if environment:
            for key, value in environment.iteritems():
                self._conf.setExecutorEnv(key, value)
        for key, value in DEFAULT_CONFIGS.items():
            self._conf.setIfMissing(key, value)

        # Check that we have at least the required parameters
        if not self._conf.contains("spark.master"):
            raise Exception("A master URL must be set in your configuration")
        if not self._conf.contains("spark.app.name"):
            raise Exception("An application name must be set in your configuration")

        # Read back our properties from the conf in case we loaded some of them from
        # the classpath or an external config file
        self.master = self._conf.get("spark.master")
        self.appName = self._conf.get("spark.app.name")
        self.sparkHome = self._conf.get("spark.home", None)
        for (k, v) in self._conf.getAll():
            if k.startswith("spark.executorEnv."):
                varName = k[len("spark.executorEnv."):]
                self.environment[varName] = v

        # Create the Java SparkContext through Py4J
        self._jsc = self._initialize_context(self._conf._jconf)

        # Create a single Accumulator in Java that we'll send all our updates through;
        # they will be passed back to us through a TCP server
        self._accumulatorServer = accumulators._start_update_server()
        (host, port) = self._accumulatorServer.server_address
        self._javaAccumulator = self._jsc.accumulator(
            self._jvm.java.util.ArrayList(),
            self._jvm.PythonAccumulatorParam(host, port))

        self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')

        # Broadcast's __reduce__ method stores Broadcast instances here.
        # This allows other code to determine which Broadcast instances have
        # been pickled, so it can determine which Java broadcast objects to
        # send.
        self._pickled_broadcast_vars = set()

        SparkFiles._sc = self
        root_dir = SparkFiles.getRootDirectory()
        sys.path.append(root_dir)

        # Deploy any code dependencies specified in the constructor
        self._python_includes = list()
        for path in (pyFiles or []):
            self.addPyFile(path)

        # Deploy code dependencies set by spark-submit; these will already have been added
        # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
            if path != "":
                (dirname, filename) = os.path.split(path)
                self._python_includes.append(filename)
                sys.path.append(path)
                if dirname not in sys.path:
                    sys.path.append(dirname)

        # Create a temporary directory inside spark.local.dir:
        local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
        self._temp_dir = \
            self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

    def _initialize_context(self, jconf):
        """
        Initialize SparkContext in function to allow subclass specific initialization
        """
        return self._jvm.JavaSparkContext(jconf)

    @classmethod
    def _ensure_initialized(cls, instance=None, gateway=None):
        """
        Checks whether a SparkContext is initialized or not.
        Throws error if a SparkContext is already running.
        """
        with SparkContext._lock:
            if not SparkContext._gateway:
                SparkContext._gateway = gateway or launch_gateway()
                SparkContext._jvm = SparkContext._gateway.jvm
                SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile

            if instance:
                if (SparkContext._active_spark_context and
                        SparkContext._active_spark_context != instance):
                    currentMaster = SparkContext._active_spark_context.master
                    currentAppName = SparkContext._active_spark_context.appName
                    callsite = SparkContext._active_spark_context._callsite

                    # Raise error if there is already a running Spark context
                    raise ValueError(
                        "Cannot run multiple SparkContexts at once; "
                        "existing SparkContext(app=%s, master=%s)"
                        " created by %s at %s:%s "
                        % (currentAppName, currentMaster,
                            callsite.function, callsite.file, callsite.linenum))
                else:
                    SparkContext._active_spark_context = instance

    @classmethod
    def setSystemProperty(cls, key, value):
        """
        Set a Java system property, such as spark.executor.memory. This must
        be invoked before instantiating SparkContext.
        """
        SparkContext._ensure_initialized()
        SparkContext._jvm.java.lang.System.setProperty(key, value)

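    # Illustrative usage sketch (comment only): setting a system property
    # before any SparkContext exists.  The property value and app name are
    # assumptions chosen for the example.
    #
    #   SparkContext.setSystemProperty("spark.executor.memory", "2g")
    #   sc = SparkContext("local", "system-property-example")
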
    @property
    def version(self):
        """
        The version of Spark on which this application is running.
        """
        return self._jsc.version()

    @property
    def defaultParallelism(self):
        """
        Default level of parallelism to use when not given by user (e.g. for
        reduce tasks)
        """
        return self._jsc.sc().defaultParallelism()

    @property
    def defaultMinPartitions(self):
        """
        Default min number of partitions for Hadoop RDDs when not given by user
        """
        return self._jsc.sc().defaultMinPartitions()

    def stop(self):
        """
        Shut down the SparkContext.
        """
        if getattr(self, "_jsc", None):
            self._jsc.stop()
            self._jsc = None
        if getattr(self, "_accumulatorServer", None):
            self._accumulatorServer.shutdown()
            self._accumulatorServer = None
        with SparkContext._lock:
            SparkContext._active_spark_context = None

    def parallelize(self, c, numSlices=None):
        """
        Distribute a local Python collection to form an RDD.

        >>> sc.parallelize(range(5), 5).glom().collect()
        [[0], [1], [2], [3], [4]]
        """
        numSlices = numSlices or self.defaultParallelism
        # Calling the Java parallelize() method with an ArrayList is too slow,
        # because it sends O(n) Py4J commands.  As an alternative, serialized
        # objects are written to a file and loaded through textFile().
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        # Make sure we distribute data evenly if it's smaller than self.batchSize
        if "__len__" not in dir(c):
            c = list(c)    # Make it a list so we can compute its length
        batchSize = min(len(c) // numSlices, self._batchSize)
        if batchSize > 1:
            serializer = BatchedSerializer(self._unbatched_serializer,
                                           batchSize)
        else:
            serializer = self._unbatched_serializer
        serializer.dump_stream(c, tempFile)
        tempFile.close()
        readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
        jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
        return RDD(jrdd, self, serializer)

    def pickleFile(self, name, minPartitions=None):
        """
        Load an RDD previously saved using L{RDD.saveAsPickleFile} method.

        >>> tmpFile = NamedTemporaryFile(delete=True)
        >>> tmpFile.close()
        >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
        >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.objectFile(name, minPartitions), self,
                   BatchedSerializer(PickleSerializer()))

    def textFile(self, name, minPartitions=None):
        """
        Read a text file from HDFS, a local file system (available on all
        nodes), or any Hadoop-supported file system URI, and return it as an
        RDD of Strings.

        >>> path = os.path.join(tempdir, "sample-text.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("Hello world!")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello world!']
        """
        minPartitions = minPartitions or min(self.defaultParallelism, 2)
        return RDD(self._jsc.textFile(name, minPartitions), self,
                   UTF8Deserializer())

    def wholeTextFiles(self, path, minPartitions=None):
        """
        Read a directory of text files from HDFS, a local file system
        (available on all nodes), or any Hadoop-supported file system
        URI. Each file is read as a single record and returned in a
        key-value pair, where the key is the path of each file, the
        value is the content of each file.

        For example, if you have the following files::

          hdfs://a-hdfs-path/part-00000
          hdfs://a-hdfs-path/part-00001
          ...
          hdfs://a-hdfs-path/part-nnnnn

        Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
        then C{rdd} contains::

          (a-hdfs-path/part-00000, its content)
          (a-hdfs-path/part-00001, its content)
          ...
          (a-hdfs-path/part-nnnnn, its content)

        NOTE: Small files are preferred, as each file will be loaded
        fully in memory.

        >>> dirPath = os.path.join(tempdir, "files")
        >>> os.mkdir(dirPath)
        >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
        ...    file1.write("1")
        >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
        ...    file2.write("2")
        >>> textFiles = sc.wholeTextFiles(dirPath)
        >>> sorted(textFiles.collect())
        [(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
        """
        minPartitions = minPartitions or self.defaultMinPartitions
        return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

    def _dictToJavaMap(self, d):
        jm = self._jvm.java.util.HashMap()
        if not d:
            d = {}
        for k, v in d.iteritems():
            jm[k] = v
        return jm

    def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None,
                     valueConverter=None, minSplits=None, batchSize=None):
        """
        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is as follows:
            1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
               and value Writable classes
            2. Serialization is attempted via Pyrolite pickling
            3. If this fails, the fallback is to call 'toString' on each key and value
            4. C{PickleSerializer} is used to deserialize pickled objects on the Python side

        @param path: path to sequencefile
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param minSplits: minimum splits in dataset
               (default min(2, sc.defaultParallelism))
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        minSplits = minSplits or min(self.defaultParallelism, 2)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
                                                keyConverter, valueConverter, minSplits, batchSize)
        return RDD(jrdd, self, ser)

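    # Illustrative usage sketch (kept as a comment so it is not executed; the
    # path and Writable class names below are assumptions for the example,
    # and `sc` is an already-running SparkContext):
    #
    #   counts = sc.sequenceFile("hdfs://namenode:8020/data/word-counts",
    #                            keyClass="org.apache.hadoop.io.Text",
    #                            valueClass="org.apache.hadoop.io.LongWritable")
    #   counts.take(2)  # e.g. [(u'apple', 3), (u'banana', 7)]
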
    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                         valueConverter=None, conf=None, batchSize=None):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is the same as for sc.sequenceFile.

        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
        Configuration in Java.

        @param path: path to Hadoop file
        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                                    valueClass, keyConverter, valueConverter,
                                                    jconf, batchSize)
        return RDD(jrdd, self, ser)

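    # Illustrative usage sketch (comment only; the path and class names are
    # assumptions chosen for the example, and `sc` is an active SparkContext):
    #
    #   lines = sc.newAPIHadoopFile(
    #       "hdfs://namenode:8020/logs/2014-08-01",
    #       "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    #       "org.apache.hadoop.io.LongWritable",
    #       "org.apache.hadoop.io.Text")
    #   lines.values().take(1)  # text of the first line; the key is the byte offset
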
    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                        valueConverter=None, conf=None, batchSize=None):
        """
        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
        Hadoop configuration, which is passed in as a Python dict.
        This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
                                                   valueClass, keyConverter, valueConverter,
                                                   jconf, batchSize)
        return RDD(jrdd, self, ser)

    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None,
                   valueConverter=None, conf=None, batchSize=None):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
        a local file system (available on all nodes), or any Hadoop-supported file system URI.
        The mechanism is the same as for sc.sequenceFile.

        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
        Configuration in Java.

        @param path: path to Hadoop file
        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapred.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
                                              valueClass, keyConverter, valueConverter,
                                              jconf, batchSize)
        return RDD(jrdd, self, ser)

    def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
                  valueConverter=None, conf=None, batchSize=None):
        """
        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
        Hadoop configuration, which is passed in as a Python dict.
        This will be converted into a Configuration in Java.
        The mechanism is the same as for sc.sequenceFile.

        @param inputFormatClass: fully qualified classname of Hadoop InputFormat
               (e.g. "org.apache.hadoop.mapred.TextInputFormat")
        @param keyClass: fully qualified classname of key Writable class
               (e.g. "org.apache.hadoop.io.Text")
        @param valueClass: fully qualified classname of value Writable class
               (e.g. "org.apache.hadoop.io.LongWritable")
        @param keyConverter: (None by default)
        @param valueConverter: (None by default)
        @param conf: Hadoop configuration, passed in as a dict
               (None by default)
        @param batchSize: The number of Python objects represented as a single
               Java object. (default sc._default_batch_size_for_serialized_input)
        """
        jconf = self._dictToJavaMap(conf)
        batchSize = max(1, batchSize or self._default_batch_size_for_serialized_input)
        ser = BatchedSerializer(PickleSerializer()) if (batchSize > 1) else PickleSerializer()
        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
                                             valueClass, keyConverter, valueConverter,
                                             jconf, batchSize)
        return RDD(jrdd, self, ser)

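    # Illustrative usage sketch (comment only; the configuration key and class
    # names are assumptions chosen for the example, and `sc` is an active
    # SparkContext):
    #
    #   hadoop_conf = {"mapred.input.dir": "hdfs://namenode:8020/logs"}
    #   lines = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
    #                        "org.apache.hadoop.io.LongWritable",
    #                        "org.apache.hadoop.io.Text",
    #                        conf=hadoop_conf)
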
    def _checkpointFile(self, name, input_deserializer):
        jrdd = self._jsc.checkpointFile(name)
        return RDD(jrdd, self, input_deserializer)

    def union(self, rdds):
        """
        Build the union of a list of RDDs.

        This supports unions() of RDDs with different serialized formats,
        although this forces them to be reserialized using the default
        serializer:

        >>> path = os.path.join(tempdir, "union-text.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("Hello")
        >>> textFile = sc.textFile(path)
        >>> textFile.collect()
        [u'Hello']
        >>> parallelized = sc.parallelize(["World!"])
        >>> sorted(sc.union([textFile, parallelized]).collect())
        [u'Hello', 'World!']
        """
        first_jrdd_deserializer = rdds[0]._jrdd_deserializer
        if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
            rdds = [x._reserialize() for x in rdds]
        first = rdds[0]._jrdd
        rest = [x._jrdd for x in rdds[1:]]
        rest = ListConverter().convert(rest, self._gateway._gateway_client)
        return RDD(self._jsc.union(first, rest), self, rdds[0]._jrdd_deserializer)

    def broadcast(self, value):
        """
        Broadcast a read-only variable to the cluster, returning a
        L{Broadcast<pyspark.broadcast.Broadcast>}
        object for reading it in distributed functions. The variable will
        be sent to each cluster only once.
        """
        ser = CompressedSerializer(PickleSerializer())
        # Passing a large object through Py4J is slow and memory hungry, so
        # write the pickled value to a file and let the JVM read it from there.
        tempFile = NamedTemporaryFile(delete=False, dir=self._temp_dir)
        ser.dump_stream([value], tempFile)
        tempFile.close()
        jbroadcast = self._jvm.PythonRDD.readBroadcastFromFile(self._jsc, tempFile.name)
        return Broadcast(jbroadcast.id(), None, jbroadcast,
                         self._pickled_broadcast_vars, tempFile.name)

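    # Illustrative usage sketch (comment only; `sc` is an active SparkContext
    # and the lookup table is a made-up example):
    #
    #   lookup = sc.broadcast({"a": 1, "b": 2})
    #   sc.parallelize(["a", "b", "a"]).map(lambda k: lookup.value[k]).collect()
    #   # -> [1, 2, 1]
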
    def accumulator(self, value, accum_param=None):
        """
        Create an L{Accumulator} with the given initial value, using a given
        L{AccumulatorParam} helper object to define how to add values of the
        data type if provided. Default AccumulatorParams are used for integers
        and floating-point numbers if you do not provide one. For other types,
        a custom AccumulatorParam can be used.
        """
        if accum_param is None:
            if isinstance(value, int):
                accum_param = accumulators.INT_ACCUMULATOR_PARAM
            elif isinstance(value, float):
                accum_param = accumulators.FLOAT_ACCUMULATOR_PARAM
            elif isinstance(value, complex):
                accum_param = accumulators.COMPLEX_ACCUMULATOR_PARAM
            else:
                raise Exception("No default accumulator param for type %s" % type(value))
        SparkContext._next_accum_id += 1
        return Accumulator(SparkContext._next_accum_id - 1, value, accum_param)

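    # Illustrative usage sketch (comment only): a custom AccumulatorParam that
    # sums Python lists element-wise.  The class name and data are assumptions
    # chosen for the example; `sc` is an active SparkContext.
    #
    #   from pyspark.accumulators import AccumulatorParam
    #
    #   class VectorParam(AccumulatorParam):
    #       def zero(self, value):
    #           return [0.0] * len(value)
    #       def addInPlace(self, v1, v2):
    #           return [a + b for a, b in zip(v1, v2)]
    #
    #   vec = sc.accumulator([0.0, 0.0], VectorParam())
    #   sc.parallelize([[1.0, 2.0], [3.0, 4.0]]).foreach(lambda x: vec.add(x))
    #   vec.value  # -> [4.0, 6.0]
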
    def addFile(self, path):
        """
        Add a file to be downloaded with this Spark job on every node.
        The C{path} passed can be either a local file, a file in HDFS
        (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
        FTP URI.

        To access the file in Spark jobs, use
        L{SparkFiles.get(path)<pyspark.files.SparkFiles.get>} to find its
        download location.

        >>> from pyspark import SparkFiles
        >>> path = os.path.join(tempdir, "test.txt")
        >>> with open(path, "w") as testFile:
        ...    testFile.write("100")
        >>> sc.addFile(path)
        >>> def func(iterator):
        ...    with open(SparkFiles.get("test.txt")) as testFile:
        ...        fileVal = int(testFile.readline())
        ...        return [x * fileVal for x in iterator]
        >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
        [100, 200, 300, 400]
        """
        self._jsc.sc().addFile(path)

    def clearFiles(self):
        """
        Clear the job's list of files added by L{addFile} or L{addPyFile} so
        that they do not get downloaded to any new nodes.
        """
        self._jsc.sc().clearFiles()

    def addPyFile(self, path):
        """
        Add a .py or .zip dependency for all tasks to be executed on this
        SparkContext in the future.  The C{path} passed can be either a local
        file, a file in HDFS (or other Hadoop-supported filesystems), or an
        HTTP, HTTPS or FTP URI.
        """
        self.addFile(path)
        (dirname, filename) = os.path.split(path)  # dirname may be directory or HDFS/S3 prefix

        if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
            self._python_includes.append(filename)
            # Also add the downloaded archive to the driver's sys.path so it
            # can be imported locally (e.g. in local mode and tests).
            sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename))

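    # Illustrative usage sketch (comment only; the egg path and module name
    # are assumptions chosen for the example, and `sc` is an active
    # SparkContext):
    #
    #   sc.addPyFile("/path/to/dependencies.egg")
    #   def use_dep(x):
    #       from mymodule import transform   # importable on every worker now
    #       return transform(x)
    #   sc.parallelize(range(3)).map(use_dep).collect()
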
    def setCheckpointDir(self, dirName):
        """
        Set the directory under which RDDs are going to be checkpointed. The
        directory must be an HDFS path if running on a cluster.
        """
        self._jsc.sc().setCheckpointDir(dirName)

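    # Illustrative usage sketch (comment only; the directory is an assumption
    # chosen for the example, and `sc` is an active SparkContext):
    #
    #   sc.setCheckpointDir("hdfs://namenode:8020/tmp/checkpoints")
    #   rdd = sc.parallelize(range(100)).map(lambda x: x * x)
    #   rdd.checkpoint()   # materialized to the checkpoint dir on the next action
    #   rdd.count()
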
    def _getJavaStorageLevel(self, storageLevel):
        """
        Returns a Java StorageLevel based on a pyspark.StorageLevel.
        """
        if not isinstance(storageLevel, StorageLevel):
            raise Exception("storageLevel must be of type pyspark.StorageLevel")

        newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
        return newStorageLevel(storageLevel.useDisk,
                               storageLevel.useMemory,
                               storageLevel.useOffHeap,
                               storageLevel.deserialized,
                               storageLevel.replication)

    def setJobGroup(self, groupId, description, interruptOnCancel=False):
        """
        Assigns a group ID to all the jobs started by this thread until the group ID is set to a
        different value or cleared.

        Often, a unit of execution in an application consists of multiple Spark actions or jobs.
        Application programmers can use this method to group all those jobs together and give a
        group description. Once set, the Spark web UI will associate such jobs with this group.

        The application can use L{SparkContext.cancelJobGroup} to cancel all
        running jobs in this group.

        >>> import thread, threading
        >>> from time import sleep
        >>> result = "Not Set"
        >>> lock = threading.Lock()
        >>> def map_func(x):
        ...     sleep(100)
        ...     raise Exception("Task should have been cancelled")
        >>> def start_job(x):
        ...     global result
        ...     try:
        ...         sc.setJobGroup("job_to_cancel", "some description")
        ...         result = sc.parallelize(range(x)).map(map_func).collect()
        ...     except Exception as e:
        ...         result = "Cancelled"
        ...     lock.release()
        >>> def stop_job():
        ...     sleep(5)
        ...     sc.cancelJobGroup("job_to_cancel")
        >>> suppress = lock.acquire()
        >>> suppress = thread.start_new_thread(start_job, (10,))
        >>> suppress = thread.start_new_thread(stop_job, tuple())
        >>> suppress = lock.acquire()
        >>> print result
        Cancelled

        If interruptOnCancel is set to true for the job group, then job cancellation will result
        in Thread.interrupt() being called on the job's executor threads. This is useful to help
        ensure that the tasks are actually stopped in a timely manner, but is off by default due
        to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
        """
        self._jsc.setJobGroup(groupId, description, interruptOnCancel)

    def setLocalProperty(self, key, value):
        """
        Set a local property that affects jobs submitted from this thread, such as the
        Spark fair scheduler pool.
        """
        self._jsc.setLocalProperty(key, value)

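    # Illustrative usage sketch (comment only; the pool name is an assumption
    # chosen for the example, and it only matters when the fair scheduler is
    # enabled on the cluster):
    #
    #   sc.setLocalProperty("spark.scheduler.pool", "production")
    #   sc.parallelize(range(10)).count()      # runs in the "production" pool
    #   sc.setLocalProperty("spark.scheduler.pool", None)  # back to the default pool
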
    def getLocalProperty(self, key):
        """
        Get a local property set in this thread, or None if it is missing. See
        L{setLocalProperty}
        """
        return self._jsc.getLocalProperty(key)

    def sparkUser(self):
        """
        Get SPARK_USER for user who is running SparkContext.
        """
        return self._jsc.sc().sparkUser()

    def cancelJobGroup(self, groupId):
        """
        Cancel active jobs for the specified group. See L{SparkContext.setJobGroup}
        for more information.
        """
        self._jsc.sc().cancelJobGroup(groupId)

    def cancelAllJobs(self):
        """
        Cancel all jobs that have been scheduled or are running.
        """
        self._jsc.sc().cancelAllJobs()

    def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
        """
        Executes the given partitionFunc on the specified set of partitions,
        returning the result as an array of elements.

        If 'partitions' is not specified, this will run over all partitions.

        >>> myRDD = sc.parallelize(range(6), 3)
        >>> sc.runJob(myRDD, lambda part: [x * x for x in part])
        [0, 1, 4, 9, 16, 25]

        >>> myRDD = sc.parallelize(range(6), 3)
        >>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
        [0, 1, 16, 25]
        """
        if partitions is None:
            partitions = range(rdd._jrdd.partitions().size())
        javaPartitions = ListConverter().convert(partitions, self._gateway._gateway_client)

        # Implementation note: partitionFunc is applied via mapPartitions and
        # the job is then run on the resulting Java RDD; the returned iterator
        # of results is written to a file by the JVM and read back on the
        # Python side through _collect_iterator_through_file.
        mappedRDD = rdd.mapPartitions(partitionFunc)
        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
        return list(mappedRDD._collect_iterator_through_file(it))


def _test():
    import atexit
    import doctest
    import tempfile
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    globs['tempdir'] = tempfile.mkdtemp()
    atexit.register(lambda: shutil.rmtree(globs['tempdir']))
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()