#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
18 """
19 >>> from pyspark.context import SparkContext
20 >>> sc = SparkContext('local', 'test')
21 >>> b = sc.broadcast([1, 2, 3, 4, 5])
22 >>> b.value
23 [1, 2, 3, 4, 5]
24 >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
25 [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
26 >>> b.unpersist()
27
28 >>> large_broadcast = sc.broadcast(list(range(10000)))
29 """
import os

from pyspark.serializers import CompressedSerializer, PickleSerializer
35 _broadcastRegistry = {}

def _from_id(bid):
    """
    Resolve a broadcast id back to the registered Broadcast object.

    Used as the reconstruction callable returned by
    C{Broadcast.__reduce__}; the worker side must populate
    C{_broadcastRegistry} before unpickling, otherwise this raises.
    """
    from pyspark.broadcast import _broadcastRegistry
    if bid not in _broadcastRegistry:
        raise Exception("Broadcast variable '%s' not loaded!" % bid)
    return _broadcastRegistry[bid]


class Broadcast(object):

    """
    A broadcast variable created with
    L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}.
    Access its value through C{.value}.
    """

    def __init__(self, bid, value, java_broadcast=None,
                 pickle_registry=None, path=None):
        """
        Should not be called directly by users -- use
        L{SparkContext.broadcast()<pyspark.context.SparkContext.broadcast>}
        instead.
        """
        self.bid = bid
        if path is None:
            # Small broadcast: keep the value in memory.  When a path is
            # given (large broadcast), the value is loaded lazily from
            # disk on first access via __getattr__ instead.
            self.value = value
        self._jbroadcast = java_broadcast
        self._pickle_registry = pickle_registry
        self.path = path

    def unpersist(self, blocking=False):
        """
        Delete cached copies of this broadcast on the executors.

        :param blocking: whether to block until unpersisting has completed
        """
        self._jbroadcast.unpersist(blocking)
        if self.path is not None:
            # Only large broadcasts spill to disk; in-memory broadcasts
            # have no file to remove (os.unlink(None) would raise).
            os.unlink(self.path)

    def __reduce__(self):
        # Register with the driver's pickle registry so the broadcast is
        # kept alive, then pickle only the id; the worker re-resolves the
        # id through _from_id when the task is deserialized.
        self._pickle_registry.add(self)
        return (_from_id, (self.bid, ))

    def __getattr__(self, item):
        # Called only when normal attribute lookup fails: lazily load a
        # large broadcast value from its on-disk file on first access.
        if item == 'value' and self.path is not None:
            ser = CompressedSerializer(PickleSerializer())
            # Binary mode for pickled data; next() (not .next()) works on
            # both Python 2.6+ and 3; 'with' guarantees the file is closed.
            with open(self.path, 'rb') as stream:
                value = next(ser.load_stream(stream))
            # Cache on the instance so __getattr__ is not hit again.
            self.value = value
            return value

        raise AttributeError(item)

if __name__ == "__main__":
    # Run the doctests in the module docstring (requires a local Spark
    # installation, since they create a SparkContext).
    import doctest
    doctest.testmod()