from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]

def _extract_concise_traceback():
    # Walk the current stack and return a concise "<function> at <file>:<line>"
    # description of the user call site that invoked PySpark code.
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return "%s at %s:%d" % (sfun, ufile, uline)

_spark_stack_depth = 0

class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)
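
# Usage sketch (illustrative, not additional API): RDD actions below wrap their
# Py4J calls in this context manager so the Scala side can report the Python
# call site, e.g.:
#
#     with _JavaStackTrace(self.context) as st:
#         bytesInJava = self._jrdd.collect().iterator()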

class MaxHeapQ(object):
    """
    An implementation of MaxHeap.
    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # The queue starts at q[1] so that a node's children are simply
        # q[2 * k] and q[2 * k + 1].
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k / 2] < self.q[k]):
            self._swap(k, k / 2)
            k = k / 2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # If both children are present, sink towards the larger one.
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if self.q[k] > self.q[j]:
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if self.q[1] > value:
            self.q[1] = value
            self._sink(1)

class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self
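
    # Example sketch (illustrative; assumes `sc` is an existing SparkContext):
    #
    #     from pyspark.storagelevel import StorageLevel
    #     rdd = sc.parallelize(range(100))
    #     rdd.persist(StorageLevel.MEMORY_ONLY_SER)
    #     rdd.count()      # the first action computes and caches the partitions
    #     rdd.unpersist()  # drop the cached blocks when no longer needed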

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None
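
    # Example sketch (illustrative; assumes `sc` is an existing SparkContext
    # and that /tmp/spark-checkpoints is a writable directory):
    #
    #     sc.setCheckpointDir("/tmp/spark-checkpoints")
    #     rdd = sc.parallelize(range(10)).map(lambda x: x * x).cache()
    #     rdd.checkpoint()
    #     rdd.count()                # running a job materializes the checkpoint
    #     rdd.isCheckpointed()       # -> True
    #     rdd.getCheckpointFile()    # -> a path inside the checkpoint directory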

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """
        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if num < 0:
            raise ValueError("num must be non-negative")

        if initialCount == 0:
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

        # If the first sample did not turn out large enough, keep trying with
        # a new seed; this should be rare because of the oversampling
        # multiplier above.
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError("can only add another RDD to an RDD")
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

        # For multiple partitions, determine the boundary of each partition by
        # sampling the RDD: we want bins that receive roughly the same number
        # of (key, value) pairs.
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            # There are numPartitions partitions, but the last one has a free
            # upper bound, so we only need numPartitions - 1 boundary keys.
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        # Due to batching, we can't use the Java cartesian method directly.
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))
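
    # Example sketch (illustrative; assumes `sc` is an existing SparkContext):
    #
    #     sc.parallelize([1, 2, 3]).map(lambda x: x * 10).collect()
    #     # -> [10, 20, 30]
    #
    # collect() brings the whole dataset back to the driver, so it is best
    # reserved for small results; prefer take(n) when only a sample is needed.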

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient. Instead, we dump the data to a
        # temporary file and read it back with the RDD's deserializer.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
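
    # Example sketch (illustrative; assumes `sc` is an existing SparkContext):
    #
    #     s = sc.parallelize([1.0, 2.0, 3.0, 4.0]).stats()
    #     s.count(), s.mean(), s.stdev()   # -> (4, 2.5, 1.118033988749895)
    #
    # stats() computes all of these in a single pass over the data, whereas
    # calling mean(), variance(), etc. separately re-scans the RDD each time.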

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from a RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)

    def takeOrdered(self, num, key=None):
        """
        Get the N elements from a RDD ordered in ascending order or as
        specified by the optional key function.

        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
        [1, 2, 3, 4, 5, 6]
        >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
        [10, 9, 7, 6, 5, 4]
        """
        def topNKeyedElems(iterator, key_=None):
            q = MaxHeapQ(num)
            for k in iterator:
                if key_ is not None:
                    k = (key_(k), k)
                q.insert(k)
            yield q.getElements()

        def unKey(x, key_=None):
            if key_ is not None:
                x = [i[1] for i in x]
            return x

        def merge(a, b):
            return next(topNKeyedElems(a + b))

        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
        return sorted(unKey(result, key), key=key)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1

        mapped = self.mapPartitions(takeUpToNum)
        items = []
        # Collect the partitions one at a time, stopping as soon as enough
        # elements have been gathered.
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)
841
843 """
844 Merge the values for each key using an associative reduce function, but
845 return the results immediately to the master as a dictionary.
846
847 This will also perform the merging locally on each mapper before
848 sending results to a reducer, similarly to a "combiner" in MapReduce.
849
850 >>> from operator import add
851 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
852 >>> sorted(rdd.reduceByKeyLocally(add).items())
853 [('a', 2), ('b', 1)]
854 """
855 def reducePartition(iterator):
856 m = {}
857 for (k, v) in iterator:
858 m[k] = v if k not in m else func(m[k], v)
859 yield m
860 def mergeMaps(m1, m2):
861 for (k, v) in m2.iteritems():
862 m1[k] = v if k not in m1 else func(m1[k], v)
863 return m1
864 return self.mapPartitions(reducePartition).reduce(mergeMaps)
865
867 """
868 Count the number of elements for each key, and return the result to the
869 master as a dictionary.
870
871 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
872 >>> sorted(rdd.countByKey().items())
873 [('a', 2), ('b', 1)]
874 """
875 return self.map(lambda x: x[0]).countByValue()
876
877 - def join(self, other, numPartitions=None):
878 """
879 Return an RDD containing all pairs of elements with matching keys in
880 C{self} and C{other}.
881
882 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
883 (k, v1) is in C{self} and (k, v2) is in C{other}.
884
885 Performs a hash join across the cluster.
886
887 >>> x = sc.parallelize([("a", 1), ("b", 4)])
888 >>> y = sc.parallelize([("a", 2), ("a", 3)])
889 >>> sorted(x.join(y).collect())
890 [('a', (1, 2)), ('a', (1, 3))]
891 """
892 return python_join(self, other, numPartitions)
893
895 """
896 Perform a left outer join of C{self} and C{other}.
897
898 For each element (k, v) in C{self}, the resulting RDD will either
899 contain all pairs (k, (v, w)) for w in C{other}, or the pair
900 (k, (v, None)) if no elements in other have key k.
901
902 Hash-partitions the resulting RDD into the given number of partitions.
903
904 >>> x = sc.parallelize([("a", 1), ("b", 4)])
905 >>> y = sc.parallelize([("a", 2)])
906 >>> sorted(x.leftOuterJoin(y).collect())
907 [('a', (1, 2)), ('b', (4, None))]
908 """
909 return python_left_outer_join(self, other, numPartitions)
910
912 """
913 Perform a right outer join of C{self} and C{other}.
914
915 For each element (k, w) in C{other}, the resulting RDD will either
916 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
917 if no elements in C{self} have key k.
918
919 Hash-partitions the resulting RDD into the given number of partitions.
920
921 >>> x = sc.parallelize([("a", 1), ("b", 4)])
922 >>> y = sc.parallelize([("a", 2)])
923 >>> sorted(y.rightOuterJoin(x).collect())
924 [('a', (2, 1)), ('b', (None, 4))]
925 """
926 return python_right_outer_join(self, other, numPartitions)
927
928

    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        # Transferring O(n) objects to Java is too expensive. Instead, we form
        # the hash buckets in Python, transferring O(numPartitions) objects to
        # Java. Each object is a (splitNumber, [objects]) pair.
        outputSerializer = self.ctx._unbatched_serializer
        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
            jrdd = pairRDD.partitionBy(partitioner).values()
            rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # This is required so that id(partitionFunc) remains unique, even if
        # partitionFunc is a lambda:
        rdd._partitionFunc = partitionFunc
        return rdd

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C. Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)
        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)
1015
1016 - def foldByKey(self, zeroValue, func, numPartitions=None):
1017 """
1018 Merge the values for each key using an associative function "func" and a neutral "zeroValue"
1019 which may be added to the result an arbitrary number of times, and must not change
1020 the result (e.g., 0 for addition, or 1 for multiplication.).
1021
1022 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1023 >>> from operator import add
1024 >>> rdd.foldByKey(0, add).collect()
1025 [('a', 2), ('b', 1)]
1026 """
1027 return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)
1028
1029
1030

    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """
        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)
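
    # Example sketch (illustrative; assumes `sc` is an existing SparkContext):
    #
    #     pairs = sc.parallelize([("a", [1, 2]), ("b", [3])])
    #     pairs.mapValues(len).collect()              # -> [('a', 2), ('b', 1)]
    #     pairs.flatMapValues(lambda v: v).collect()  # -> [('a', 1), ('a', 2), ('b', 3)]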

    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)
1093
1095 """
1096 Return each (key, value) pair in C{self} that has no pair with matching key
1097 in C{other}.
1098
1099 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
1100 >>> y = sc.parallelize([("a", 3), ("c", None)])
1101 >>> sorted(x.subtractByKey(y).collect())
1102 [('b', 4), ('b', 5)]
1103 """
1104 filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
1105 map_func = lambda (key, vals): [(key, val) for val in vals[0]]
1106 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1107
1108 - def subtract(self, other, numPartitions=None):
1109 """
1110 Return each value in C{self} that is not contained in C{other}.
1111
1112 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
1113 >>> y = sc.parallelize([("a", 3), ("c", None)])
1114 >>> sorted(x.subtract(y).collect())
1115 [('a', 1), ('b', 4), ('b', 5)]
1116 """
1117 rdd = other.map(lambda x: (x, True))
1118 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
1119
1121 """
1122 Creates tuples of the elements in this RDD by applying C{f}.
1123
1124 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
1125 >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
1126 >>> sorted(x.cogroup(y).collect())
1127 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
1128 """
1129 return self.map(lambda x: (f(x), x))
1130
1132 """
1133 Return a new RDD that has exactly numPartitions partitions.
1134
1135 Can increase or decrease the level of parallelism in this RDD. Internally, this uses
1136 a shuffle to redistribute data.
1137 If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
1138 which can avoid performing a shuffle.
1139 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
1140 >>> sorted(rdd.glom().collect())
1141 [[1], [2, 3], [4, 5], [6, 7]]
1142 >>> len(rdd.repartition(2).glom().collect())
1143 2
1144 >>> len(rdd.repartition(10).glom().collect())
1145 10
1146 """
1147 jrdd = self._jrdd.repartition(numPartitions)
1148 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1149
1150 - def coalesce(self, numPartitions, shuffle=False):
1151 """
1152 Return a new RDD that is reduced into `numPartitions` partitions.
1153 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
1154 [[1], [2, 3], [4, 5]]
1155 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
1156 [[1, 2, 3, 4, 5]]
1157 """
1158 jrdd = self._jrdd.coalesce(numPartitions)
1159 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1160
1161 - def zip(self, other):
1162 """
1163 Zips this RDD with another one, returning key-value pairs with the first element in each RDD
1164 second element in each RDD, etc. Assumes that the two RDDs have the same number of
1165 partitions and the same number of elements in each partition (e.g. one was made through
1166 a map on the other).
1167
1168 >>> x = sc.parallelize(range(0,5))
1169 >>> y = sc.parallelize(range(1000, 1005))
1170 >>> x.zip(y).collect()
1171 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
1172 """
1173 pairRDD = self._jrdd.zip(other._jrdd)
1174 deserializer = PairDeserializer(self._jrdd_deserializer,
1175 other._jrdd_deserializer)
1176 return RDD(pairRDD, self.ctx, deserializer)
1177
1179 """
1180 Return the name of this RDD.
1181 """
1182 name_ = self._jrdd.name()
1183 if not name_:
1184 return None
1185 return name_.encode('utf-8')
1186
1188 """
1189 Assign a name to this RDD.
1190 >>> rdd1 = sc.parallelize([1,2])
1191 >>> rdd1.setName('RDD1')
1192 >>> rdd1.name()
1193 'RDD1'
1194 """
1195 self._jrdd.setName(name)
1196
1198 """
1199 A description of this RDD and its recursive dependencies for debugging.
1200 """
1201 debug_string = self._jrdd.toDebugString()
1202 if not debug_string:
1203 return None
1204 return debug_string.encode('utf-8')
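
    # Example sketch (illustrative; assumes `sc` is an existing SparkContext):
    #
    #     rdd = sc.parallelize(range(10)).map(str).filter(lambda s: s != "3")
    #     print rdd.toDebugString()
    #
    # This prints the RDD's lineage as seen by the JVM (the chain of parent
    # RDDs), which helps when reasoning about where shuffles and caching occur.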
1205
1207 """
1208 Get the RDD's current storage level.
1209 >>> rdd1 = sc.parallelize([1,2])
1210 >>> rdd1.getStorageLevel()
1211 StorageLevel(False, False, False, 1)
1212 """
1213 java_storage_level = self._jrdd.getStorageLevel()
1214 storage_level = StorageLevel(java_storage_level.useDisk(),
1215 java_storage_level.useMemory(),
1216 java_storage_level.deserialized(),
1217 java_storage_level.replication())
1218 return storage_level
1219

class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            # Pipeline this transformation onto the previous one, so that both
            # run in a single pass over each partition.
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)
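
    # Illustrative note (not a doctest): because of this check, a chain such as
    #
    #     rdd.map(f).filter(g).map(h)
    #
    # is fused into a single PythonRDD and evaluated in one pass over each
    # partition, whereas inserting .cache() or .checkpoint() between steps
    # breaks the pipeline so the cached or checkpointed result can be reused.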
1294
1297 import doctest
1298 from pyspark.context import SparkContext
1299 globs = globals().copy()
1300
1301
1302 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
1303 (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
1304 globs['sc'].stop()
1305 if failure_count:
1306 exit(-1)
1307

if __name__ == "__main__":
    _test()