#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
>>> from pyspark.context import SparkContext
>>> sc = SparkContext('local', 'test')
>>> b = sc.broadcast([1, 2, 3, 4, 5])
>>> b.value
[1, 2, 3, 4, 5]
>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
>>> b.unpersist()

>>> large_broadcast = sc.broadcast(list(range(10000)))
"""
import os

from pyspark.serializers import CompressedSerializer, PickleSerializer


# Holds broadcasted data received from Java, keyed by broadcast id.
_broadcastRegistry = {}
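# A sketch of how this registry gets populated on the worker side
# (hypothetical; in Spark the actual population happens in pyspark/worker.py
# when a task deserializes its broadcasts), so that _from_id() below can
# resolve a Broadcast that was pickled by id:
#
#     _broadcastRegistry[bid] = Broadcast(bid, value)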


def _from_id(bid):
    from pyspark.broadcast import _broadcastRegistry
    if bid not in _broadcastRegistry:
        raise Exception("Broadcast variable '%s' not loaded!" % bid)
    return _broadcastRegistry[bid]


class Broadcast(object):

    """
    A broadcast variable created with
    L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
    Access its value through C{.value}.
    """

    def __init__(self, bid, value, java_broadcast=None,
                 pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use
        L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}
        instead.
        """
        self.bid = bid
        # When constructed with a path, the value is loaded lazily from
        # disk on first access (see __getattr__ below); otherwise it is
        # kept in memory on this instance.
        if path is None:
            self.value = value
        self._jbroadcast = java_broadcast
        self._pickle_registry = pickle_registry
        self.path = path

    def unpersist(self, blocking=False):
        """
        Delete cached copies of this broadcast on the executors and remove
        the serialized copy written to local disk.
        """
        self._jbroadcast.unpersist(blocking)
        os.unlink(self.path)

    def __reduce__(self):
        # Register with the pickle registry and serialize only the
        # broadcast id; workers rebuild the object via _from_id().
        self._pickle_registry.add(self)
        return (_from_id, (self.bid,))
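
    # A minimal sketch of the round trip __reduce__ enables (hypothetical
    # driver/worker split; `pickle_registry` can be any set-like object):
    #
    #     import pickle
    #     b = Broadcast(0, [1, 2, 3], pickle_registry=set())
    #     payload = pickle.dumps(b)          # pickles only (_from_id, (0,))
    #     _broadcastRegistry[0] = b          # done by the worker in reality
    #     restored = pickle.loads(payload)   # calls _from_id(0)
    #     assert restored is b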

    def __getattr__(self, item):
        if item == 'value' and self.path is not None:
            # Lazily load and cache the value the first time it is read;
            # once self.value is set, __getattr__ is no longer consulted.
            ser = CompressedSerializer(PickleSerializer())
            value = ser.load_stream(open(self.path, 'rb')).next()
            self.value = value
            return value

        raise AttributeError(item)
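

# A minimal sketch of the lazy-load path above (hypothetical file name; in
# Spark the serialized dump is written out by SparkContext.broadcast()):
#
#     ser = CompressedSerializer(PickleSerializer())
#     with open('/tmp/broadcast_demo', 'wb') as f:
#         ser.dump_stream([list(range(10000))], f)
#     b = Broadcast(0, None, path='/tmp/broadcast_demo')
#     b.value[:3]   # first access reads the file -> [0, 1, 2]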


if __name__ == "__main__":
    import doctest
    doctest.testmod()