18 """
19 PySpark supports custom serializers for transferring data; this can improve
20 performance.
21
22 By default, PySpark uses L{PickleSerializer} to serialize objects using Python's
23 C{cPickle} serializer, which can serialize nearly any Python object.
24 Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be
25 faster.
26
27 The serializer is chosen when creating L{SparkContext}:
28
29 >>> from pyspark.context import SparkContext
30 >>> from pyspark.serializers import MarshalSerializer
31 >>> sc = SparkContext('local', 'test', serializer=MarshalSerializer())
32 >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
33 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
34 >>> sc.stop()
35
36 By default, PySpark serialize objects in batches; the batch size can be
37 controlled through SparkContext's C{batchSize} parameter
38 (the default size is 1024 objects):
39
40 >>> sc = SparkContext('local', 'test', batchSize=2)
41 >>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
42
43 Behind the scenes, this creates a JavaRDD with four partitions, each of
44 which contains two batches of two objects:
45
46 >>> rdd.glom().collect()
47 [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
48 >>> rdd._jrdd.count()
49 8L
50 >>> sc.stop()
51
52 A batch size of -1 uses an unlimited batch size, and a size of 1 disables
53 batching:
54
55 >>> sc = SparkContext('local', 'test', batchSize=1)
56 >>> rdd = sc.parallelize(range(16), 4).map(lambda x: x)
57 >>> rdd.glom().collect()
58 [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
59 >>> rdd._jrdd.count()
60 16L
61 """

import cPickle
from itertools import chain, izip, product
import marshal
import struct
from pyspark import cloudpickle


__all__ = ["PickleSerializer", "MarshalSerializer"]


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3


class Serializer(object):

    def dump_stream(self, iterator, stream):
        """
        Serialize an iterator of objects to the output stream.
        """
        raise NotImplementedError

    def load_stream(self, stream):
        """
        Return an iterator of deserialized objects from the input stream.
        """
        raise NotImplementedError

    def _load_stream_without_unbatching(self, stream):
        return self.load_stream(stream)

    # Note: our notion of "equality" is that output generated by
    # equal serializers can be deserialized using the same serializer.

    # This default implementation handles the simple cases;
    # subclasses should override __eq__ as appropriate.

    def __eq__(self, other):
        return isinstance(other, self.__class__)

    def __ne__(self, other):
        return not self.__eq__(other)


class FramedSerializer(Serializer):
    """
    Serializer that writes objects as a stream of (length, data) pairs,
    where C{length} is a 32-bit integer and data is C{length} bytes.
    """

    def dump_stream(self, iterator, stream):
        for obj in iterator:
            self._write_with_length(obj, stream)

    def load_stream(self, stream):
        while True:
            try:
                yield self._read_with_length(stream)
            except EOFError:
                return

    def _write_with_length(self, obj, stream):
        serialized = self.dumps(obj)
        write_int(len(serialized), stream)
        stream.write(serialized)

    def _read_with_length(self, stream):
        length = read_int(stream)
        obj = stream.read(length)
        if obj == "":
            raise EOFError
        return self.loads(obj)

    def dumps(self, obj):
        """
        Serialize an object into a byte array.
        When batching is used, this will be called with an array of objects.
        """
        raise NotImplementedError

    def loads(self, obj):
        """
        Deserialize an object from a byte array.
        """
        raise NotImplementedError
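

# Illustrative sketch (not part of the original module): a subclass only has to
# provide dumps()/loads() on single byte strings; framing and stream handling
# are inherited from FramedSerializer.  io.BytesIO is assumed here as an
# in-memory stand-in for the stream PySpark would normally read and write.
def _framed_serializer_example():
    from io import BytesIO

    class _ReprSerializer(FramedSerializer):
        """Hypothetical toy serializer that stores each object's repr()."""
        def dumps(self, obj): return repr(obj)
        def loads(self, obj): return eval(obj)

    buf = BytesIO()
    ser = _ReprSerializer()
    ser.dump_stream(iter([1, u"a", [2, 3]]), buf)  # three (length, data) frames
    buf.seek(0)
    return list(ser.load_stream(buf))              # [1, u'a', [2, 3]]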


class BatchedSerializer(Serializer):
    """
    Serializes a stream of objects in batches by calling its wrapped
    Serializer with streams of objects.
    """

    UNLIMITED_BATCH_SIZE = -1

    def __init__(self, serializer, batchSize=UNLIMITED_BATCH_SIZE):
        self.serializer = serializer
        self.batchSize = batchSize

    def _batched(self, iterator):
        if self.batchSize == self.UNLIMITED_BATCH_SIZE:
            yield list(iterator)
        else:
            items = []
            count = 0
            for item in iterator:
                items.append(item)
                count += 1
                if count == self.batchSize:
                    yield items
                    items = []
                    count = 0
            if items:
                yield items

    def dump_stream(self, iterator, stream):
        self.serializer.dump_stream(self._batched(iterator), stream)

    def load_stream(self, stream):
        return chain.from_iterable(self._load_stream_without_unbatching(stream))

    def _load_stream_without_unbatching(self, stream):
        return self.serializer.load_stream(stream)

    def __eq__(self, other):
        return isinstance(other, BatchedSerializer) and \
            other.serializer == self.serializer

    def __str__(self):
        return "BatchedSerializer<%s>" % str(self.serializer)
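

# Illustrative sketch (not part of the original module): wrapping a
# PickleSerializer in a BatchedSerializer of size 3 writes eight objects as
# three framed lists ([0, 1, 2], [3, 4, 5], [6, 7]), while load_stream() still
# yields the flat sequence.  io.BytesIO is assumed as the in-memory stream.
def _batched_serializer_example():
    from io import BytesIO
    buf = BytesIO()
    ser = BatchedSerializer(PickleSerializer(), 3)
    ser.dump_stream(iter(range(8)), buf)
    batches = list(ser._load_stream_without_unbatching(BytesIO(buf.getvalue())))
    buf.seek(0)
    flat = list(ser.load_stream(buf))
    return batches, flat   # ([[0, 1, 2], [3, 4, 5], [6, 7]], [0, 1, ..., 7])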


class CartesianDeserializer(FramedSerializer):
    """
    Deserializes the JavaRDD cartesian() of two PythonRDDs.
    """

    def __init__(self, key_ser, val_ser):
        self.key_ser = key_ser
        self.val_ser = val_ser

    def load_stream(self, stream):
        key_stream = self.key_ser._load_stream_without_unbatching(stream)
        val_stream = self.val_ser._load_stream_without_unbatching(stream)
        key_is_batched = isinstance(self.key_ser, BatchedSerializer)
        val_is_batched = isinstance(self.val_ser, BatchedSerializer)
        for (keys, vals) in izip(key_stream, val_stream):
            keys = keys if key_is_batched else [keys]
            vals = vals if val_is_batched else [vals]
            for pair in product(keys, vals):
                yield pair

    def __eq__(self, other):
        return isinstance(other, CartesianDeserializer) and \
            self.key_ser == other.key_ser and self.val_ser == other.val_ser

    def __str__(self):
        return "CartesianDeserializer<%s, %s>" % \
            (str(self.key_ser), str(self.val_ser))
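

# Illustrative sketch (not part of the original module): the deserializer
# expects a stream that alternates one key-batch frame with one value-batch
# frame; izip() pulls them in turn from the same stream and product() expands
# each pair of batches.  The hand-built stream below assumes io.BytesIO.
def _cartesian_deserializer_example():
    from io import BytesIO
    key_ser = BatchedSerializer(PickleSerializer(), 2)
    val_ser = BatchedSerializer(PickleSerializer(), 2)
    buf = BytesIO()
    key_ser.serializer.dump_stream(iter([[1, 2]]), buf)       # one key batch
    val_ser.serializer.dump_stream(iter([['a', 'b']]), buf)   # one value batch
    buf.seek(0)
    deser = CartesianDeserializer(key_ser, val_ser)
    return list(deser.load_stream(buf))
    # [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]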


class NoOpSerializer(FramedSerializer):

    def loads(self, obj): return obj
    def dumps(self, obj): return obj


class PickleSerializer(FramedSerializer):
    """
    Serializes objects using Python's cPickle serializer:

        http://docs.python.org/2/library/pickle.html

    This serializer supports nearly any Python object, but may
    not be as fast as more specialized serializers.
    """

    def dumps(self, obj): return cPickle.dumps(obj, 2)
    loads = cPickle.loads
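

# Illustrative sketch (not part of the original module): dumps()/loads() are
# plain cPickle protocol 2, so nested builtin structures survive a round trip.
def _pickle_serializer_example():
    ser = PickleSerializer()
    data = {'counts': [1, 2, 3], 'name': u'spark'}
    return ser.loads(ser.dumps(data)) == data   # True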


class CloudPickleSerializer(PickleSerializer):

    def dumps(self, obj): return cloudpickle.dumps(obj, 2)


class MarshalSerializer(FramedSerializer):
    """
    Serializes objects using Python's Marshal serializer:

        http://docs.python.org/2/library/marshal.html

    This serializer is faster than PickleSerializer but supports fewer datatypes.
    """

    dumps = marshal.dumps
    loads = marshal.loads
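

# Illustrative sketch (not part of the original module): marshal only handles
# builtin types, so a datetime round-trips through PickleSerializer but makes
# MarshalSerializer.dumps raise ValueError.
def _marshal_vs_pickle_example():
    import datetime
    now = datetime.datetime(2014, 1, 1)
    assert PickleSerializer().loads(PickleSerializer().dumps(now)) == now
    try:
        MarshalSerializer().dumps(now)
    except ValueError:
        return "marshal cannot serialize datetime objects"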


class UTF8Deserializer(Serializer):
    """
    Deserializes streams of length-prefixed UTF-8 strings, as written by
    String.getBytes on the Java side.
    """

    def loads(self, stream):
        length = read_int(stream)
        return stream.read(length).decode('utf8')

    def load_stream(self, stream):
        while True:
            try:
                yield self.loads(stream)
            except struct.error:
                return
            except EOFError:
                return
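

# Illustrative sketch (not part of the original module): each record is a
# 4-byte big-endian length followed by UTF-8 bytes, so a hand-built frame in an
# io.BytesIO buffer decodes back to a unicode string.
def _utf8_deserializer_example():
    from io import BytesIO
    payload = u'caf\xe9'.encode('utf8')
    buf = BytesIO(struct.pack("!i", len(payload)) + payload)
    return list(UTF8Deserializer().load_stream(buf))   # [u'caf\xe9']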


def read_long(stream):
    length = stream.read(8)
    if length == "":
        raise EOFError
    return struct.unpack("!q", length)[0]


def write_long(value, stream):
    stream.write(struct.pack("!q", value))


def pack_long(value):
    return struct.pack("!q", value)


def read_int(stream):
    length = stream.read(4)
    if length == "":
        raise EOFError
    return struct.unpack("!i", length)[0]


def write_int(value, stream):
    stream.write(struct.pack("!i", value))


def write_with_length(obj, stream):
    write_int(len(obj), stream)
    stream.write(obj)
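

# Illustrative sketch (not part of the original module): the helpers above
# write big-endian fixed-width integers and length-prefixed byte strings, so
# values round-trip through an in-memory io.BytesIO stream.
def _framing_helpers_example():
    from io import BytesIO
    buf = BytesIO()
    write_int(7, buf)
    write_long(2 ** 40, buf)
    write_with_length("abc", buf)
    buf.seek(0)
    return read_int(buf), read_long(buf), buf.read(read_int(buf))
    # (7, 1099511627776, 'abc')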