from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler

from py4j.java_collections import ListConverter, MapConverter


__all__ = ["RDD"]

def _extract_concise_traceback():
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"

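    # Walk the stack to find the first frame that lives inside the pyspark
    # package; the frame just above it is the user code that invoked Spark.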
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
    return "%s at %s:%d" % (sfun, ufile, uline)

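# Tracks nesting of _JavaStackTrace blocks so that the Java-side call site is
# only set by the outermost Python operation and cleared when it finishes.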
_spark_stack_depth = 0


class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed. This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        to a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use L{mapPartitionsWithIndex} instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if num < 0:
            raise ValueError

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()

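        # If the first sample didn't turn out large enough, keep trying to
        # take samples; this shouldn't happen often because we use a big
        # multiplier for the initial size.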
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed + 1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
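            # These RDDs contain data in different serialized formats, so we
            # must normalize them to the default serializer.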
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()

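        # First compute the boundary of each part via sampling: we want to
        # partition the key-space into numPartitions pieces with roughly
        # equal sizes.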
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

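            # We have numPartitions many parts, but one of them has an
            # implicit boundary, so only numPartitions - 1 bounds are stored.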
            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
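        # Because partitions are serialized in batches, the pairs coming back
        # from the Java cartesian product are pairs of batches; unpack them on
        # the Python side with a CartesianDeserializer.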
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
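        # Feed this partition's elements to the subprocess on a background
        # thread so that lazily reading its stdout below cannot deadlock.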
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
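        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient.  Instead, we dump the data to a
        # temporary file and read it back.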
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)

        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1

        mapped = self.mapPartitions(takeUpToNum)
        items = []

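        # Collect one partition at a time through the JVM, stopping as soon as
        # enough elements have been gathered.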
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

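    # Pair (key-value) functions
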
    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def partitionBy(self, numPartitions, partitionFunc=hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

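        # Transferring O(n) objects to Java is too expensive.  Instead, we'll
        # form the hash buckets in Python, transferring O(numPartitions)
        # objects to Java.  Each object is a (splitNumber, [objects]) pair.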
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)

            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)
        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
        jrdd = pairRDD.partitionBy(partitioner).values()
        rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
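        # Keep a reference on the Python side so that id(partitionFunc), which
        # the Java partitioner was keyed on, remains unique even if
        # partitionFunc is a lambda: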
        rdd._partitionFunc = partitionFunc
        return rdd

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.  Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

            - C{createCombiner}, which turns a V into a C (e.g., creates
              a one-element list)
            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
              a list)
            - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)
        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))


class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
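            # This transformation is the first in its stage: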
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
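    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples: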
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()