18 from base64 import standard_b64encode as b64enc
19 import copy
20 from collections import defaultdict
21 from collections import namedtuple
22 from itertools import chain, ifilter, imap
23 import operator
24 import os
25 import sys
26 import shlex
27 import traceback
28 from subprocess import Popen, PIPE
29 from tempfile import NamedTemporaryFile
30 from threading import Thread
31 import warnings
32 import heapq
33 import bisect
34 from random import Random
35 from math import sqrt, log, isinf, isnan
36
37 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
38 BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
39 PickleSerializer, pack_long, CompressedSerializer
40 from pyspark.join import python_join, python_left_outer_join, \
41 python_right_outer_join, python_cogroup
42 from pyspark.statcounter import StatCounter
43 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
44 from pyspark.storagelevel import StorageLevel
45 from pyspark.resultiterable import ResultIterable
46 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
47 get_used_memory
48
49 from py4j.java_collections import ListConverter, MapConverter
50
51 __all__ = ["RDD"]
56 def portable_hash(x):
57 """
58 This function returns a consistent hash code for builtin types, especially
59 for None and tuples containing None.
60
61 The algorithm is similar to the one used by CPython 2.7.
62
63 >>> portable_hash(None)
64 0
65 >>> portable_hash((None, 1))
66 219750521
67 """
68 if x is None:
69 return 0
70 if isinstance(x, tuple):
71 h = 0x345678
72 for i in x:
73 h ^= portable_hash(i)
74 h *= 1000003
75 h &= 0xffffffff
76 h ^= len(x)
77 if h == -1:
78 h = -2
79 return h
80 return hash(x)
81
83 def _extract_concise_traceback():
84 """
85 This function returns the traceback info for a callsite as a dict
86 with the function name, file name and line number.
87 """
88 tb = traceback.extract_stack()
89 callsite = namedtuple("Callsite", "function file linenum")
90 if len(tb) == 0:
91 return None
92 file, line, module, what = tb[len(tb) - 1]
93 sparkpath = os.path.dirname(file)
94 first_spark_frame = len(tb) - 1
95 for i in range(0, len(tb)):
96 file, line, fun, what = tb[i]
97 if file.startswith(sparkpath):
98 first_spark_frame = i
99 break
100 if first_spark_frame == 0:
101 file, line, fun, what = tb[0]
102 return callsite(function=fun, file=file, linenum=line)
103 sfile, sline, sfun, swhat = tb[first_spark_frame]
104 ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
105 return callsite(function=sfun, file=ufile, linenum=uline)
106
107 _spark_stack_depth = 0
110 class _JavaStackTrace(object):
111
112 def __init__(self, sc):
113 tb = _extract_concise_traceback()
114 if tb is not None:
115 self._traceback = "%s at %s:%s" % (
116 tb.function, tb.file, tb.linenum)
117 else:
118 self._traceback = "Error! Could not extract traceback info"
119 self._context = sc
120
121 def __enter__(self):
122 global _spark_stack_depth
123 if _spark_stack_depth == 0:
124 self._context._jsc.setCallSite(self._traceback)
125 _spark_stack_depth += 1
126
127 def __exit__(self, type, value, tb):
128 global _spark_stack_depth
129 _spark_stack_depth -= 1
130 if _spark_stack_depth == 0:
131 self._context._jsc.setCallSite(None)
132
134 class MaxHeapQ(object):
135
136 """
137 An implementation of MaxHeap.
138
139 >>> import pyspark.rdd
140 >>> heap = pyspark.rdd.MaxHeapQ(5)
141 >>> [heap.insert(i) for i in range(10)]
142 [None, None, None, None, None, None, None, None, None, None]
143 >>> sorted(heap.getElements())
144 [0, 1, 2, 3, 4]
145 >>> heap = pyspark.rdd.MaxHeapQ(5)
146 >>> [heap.insert(i) for i in range(9, -1, -1)]
147 [None, None, None, None, None, None, None, None, None, None]
148 >>> sorted(heap.getElements())
149 [0, 1, 2, 3, 4]
150 >>> heap = pyspark.rdd.MaxHeapQ(1)
151 >>> [heap.insert(i) for i in range(9, -1, -1)]
152 [None, None, None, None, None, None, None, None, None, None]
153 >>> heap.getElements()
154 [0]
155 """
156
157 def __init__(self, maxsize):
158
159 self.q = [0]
160 self.maxsize = maxsize
161
162 def _swim(self, k):
163 while (k > 1) and (self.q[k / 2] < self.q[k]):
164 self._swap(k, k / 2)
165 k = k / 2
166
167 def _swap(self, i, j):
168 t = self.q[i]
169 self.q[i] = self.q[j]
170 self.q[j] = t
171
172 def _sink(self, k):
173 N = self.size()
174 while 2 * k <= N:
175 j = 2 * k
176
177
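# Pick the larger of the two children and stop sinking once the parent is larger.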
178 if j < N and self.q[j] < self.q[j + 1]:
179 j = j + 1
180 if(self.q[k] > self.q[j]):
181 break
182 self._swap(k, j)
183 k = j
184
185 def size(self):
186 return len(self.q) - 1
187
188 def insert(self, value):
189 if (self.size()) < self.maxsize:
190 self.q.append(value)
191 self._swim(self.size())
192 else:
193 self._replaceRoot(value)
194
195 def getElements(self):
196 return self.q[1:]
197
198 def _replaceRoot(self, value):
199 if(self.q[1] > value):
200 self.q[1] = value
201 self._sink(1)
202
204 def _parse_memory(s):
205 """
206 Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
207 return the value in MB
208
209 >>> _parse_memory("256m")
210 256
211 >>> _parse_memory("2g")
212 2048
213 """
214 units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
215 if s[-1] not in units:
216 raise ValueError("invalid format: " + s)
217 return int(float(s[:-1]) * units[s[-1].lower()])
218
219
220 class RDD(object):
221
222 """
223 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
224 Represents an immutable, partitioned collection of elements that can be
225 operated on in parallel.
226 """
227
228 def __init__(self, jrdd, ctx, jrdd_deserializer):
229 self._jrdd = jrdd
230 self.is_cached = False
231 self.is_checkpointed = False
232 self.ctx = ctx
233 self._jrdd_deserializer = jrdd_deserializer
234 self._id = jrdd.id()
235
236 def _toPickleSerialization(self):
237 if (self._jrdd_deserializer == PickleSerializer() or
238 self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
239 return self
240 else:
241 return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
242
243 def id(self):
244 """
245 A unique ID for this RDD (within its SparkContext).
246 """
247 return self._id
248
249 def __repr__(self):
250 return self._jrdd.toString()
251
252 @property
253 def context(self):
254 """
255 The L{SparkContext} that this RDD was created on.
256 """
257 return self.ctx
258
259 def cache(self):
260 """
261 Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
262 """
263 self.is_cached = True
264 self.persist(StorageLevel.MEMORY_ONLY_SER)
265 return self
266
267 def persist(self, storageLevel):
268 """
269 Set this RDD's storage level to persist its values across operations
270 after the first time it is computed. This can only be used to assign
271 a new storage level if the RDD does not have a storage level set yet.
272 """
273 self.is_cached = True
274 javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
275 self._jrdd.persist(javaStorageLevel)
276 return self
277
278 def unpersist(self):
279 """
280 Mark the RDD as non-persistent, and remove all blocks for it from
281 memory and disk.
282 """
283 self.is_cached = False
284 self._jrdd.unpersist()
285 return self
286
287 def checkpoint(self):
288 """
289 Mark this RDD for checkpointing. It will be saved to a file inside the
290 checkpoint directory set with L{SparkContext.setCheckpointDir()} and
291 all references to its parent RDDs will be removed. This function must
292 be called before any job has been executed on this RDD. It is strongly
293 recommended that this RDD is persisted in memory, otherwise saving it
294 on a file will require recomputation.
295 """
296 self.is_checkpointed = True
297 self._jrdd.rdd().checkpoint()
298
299 def isCheckpointed(self):
300 """
301 Return whether this RDD has been checkpointed or not
302 """
303 return self._jrdd.rdd().isCheckpointed()
304
305 def getCheckpointFile(self):
306 """
307 Gets the name of the file to which this RDD was checkpointed
308 """
309 checkpointFile = self._jrdd.rdd().getCheckpointFile()
310 if checkpointFile.isDefined():
311 return checkpointFile.get()
312 else:
313 return None
314
315 def map(self, f, preservesPartitioning=False):
316 """
317 Return a new RDD by applying a function to each element of this RDD.
318
319 >>> rdd = sc.parallelize(["b", "a", "c"])
320 >>> sorted(rdd.map(lambda x: (x, 1)).collect())
321 [('a', 1), ('b', 1), ('c', 1)]
322 """
323 def func(_, iterator):
324 return imap(f, iterator)
325 return self.mapPartitionsWithIndex(func, preservesPartitioning)
326
327 def flatMap(self, f, preservesPartitioning=False):
328 """
329 Return a new RDD by first applying a function to all elements of this
330 RDD, and then flattening the results.
331
332 >>> rdd = sc.parallelize([2, 3, 4])
333 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
334 [1, 1, 1, 2, 2, 3]
335 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
336 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
337 """
338 def func(s, iterator):
339 return chain.from_iterable(imap(f, iterator))
340 return self.mapPartitionsWithIndex(func, preservesPartitioning)
341
342 def mapPartitions(self, f, preservesPartitioning=False):
343 """
344 Return a new RDD by applying a function to each partition of this RDD.
345
346 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
347 >>> def f(iterator): yield sum(iterator)
348 >>> rdd.mapPartitions(f).collect()
349 [3, 7]
350 """
351 def func(s, iterator):
352 return f(iterator)
353 return self.mapPartitionsWithIndex(func)
354
355 def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
356 """
357 Return a new RDD by applying a function to each partition of this RDD,
358 while tracking the index of the original partition.
359
360 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
361 >>> def f(splitIndex, iterator): yield splitIndex
362 >>> rdd.mapPartitionsWithIndex(f).sum()
363 6
364 """
365 return PipelinedRDD(self, f, preservesPartitioning)
366
367 def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
368 """
369 Deprecated: use mapPartitionsWithIndex instead.
370
371 Return a new RDD by applying a function to each partition of this RDD,
372 while tracking the index of the original partition.
373
374 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
375 >>> def f(splitIndex, iterator): yield splitIndex
376 >>> rdd.mapPartitionsWithSplit(f).sum()
377 6
378 """
379 warnings.warn("mapPartitionsWithSplit is deprecated; "
380 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
381 return self.mapPartitionsWithIndex(f, preservesPartitioning)
382
383 def getNumPartitions(self):
384 """
385 Returns the number of partitions in this RDD.
386
387 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
388 >>> rdd.getNumPartitions()
389 2
390 """
391 return self._jrdd.partitions().size()
392
393 def filter(self, f):
394 """
395 Return a new RDD containing only the elements that satisfy a predicate.
396
397 >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
398 >>> rdd.filter(lambda x: x % 2 == 0).collect()
399 [2, 4]
400 """
401 def func(iterator):
402 return ifilter(f, iterator)
403 return self.mapPartitions(func)
404
405 def distinct(self):
406 """
407 Return a new RDD containing the distinct elements in this RDD.
408
409 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
410 [1, 2, 3]
411 """
412 return self.map(lambda x: (x, None)) \
413 .reduceByKey(lambda x, _: x) \
414 .map(lambda (x, _): x)
415
416 def sample(self, withReplacement, fraction, seed=None):
417 """
418 Return a sampled subset of this RDD (relies on numpy and falls back
419 on default random generator if numpy is unavailable).
420 """
421 assert fraction >= 0.0, "Negative fraction value: %s" % fraction
422 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
423
424
425 def takeSample(self, withReplacement, num, seed=None):
426 """
427 Return a fixed-size sampled subset of this RDD (currently requires
428 numpy).
429
430 >>> rdd = sc.parallelize(range(0, 10))
431 >>> len(rdd.takeSample(True, 20, 1))
432 20
433 >>> len(rdd.takeSample(False, 5, 2))
434 5
435 >>> len(rdd.takeSample(False, 15, 3))
436 10
437 """
438 numStDev = 10.0
439
440 if num < 0:
441 raise ValueError("Sample size cannot be negative.")
442 elif num == 0:
443 return []
444
445 initialCount = self.count()
446 if initialCount == 0:
447 return []
448
449 rand = Random(seed)
450
451 if (not withReplacement) and num >= initialCount:
452
453 samples = self.collect()
454 rand.shuffle(samples)
455 return samples
456
457 maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
458 if num > maxSampleSize:
459 raise ValueError(
460 "Sample size cannot be greater than %d." % maxSampleSize)
461
462 fraction = RDD._computeFractionForSampleSize(
463 num, initialCount, withReplacement)
464 samples = self.sample(withReplacement, fraction, seed).collect()
465
466
467
468
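# If the first sample did not turn out large enough, keep resampling with a new
# seed until at least num elements are collected; the fraction is chosen with a
# large safety margin, so this loop should rarely run more than once.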
469 while len(samples) < num:
470
471 seed = rand.randint(0, sys.maxint)
472 samples = self.sample(withReplacement, fraction, seed).collect()
473
474 rand.shuffle(samples)
475
476 return samples[0:num]
477
478 @staticmethod
479 def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
480 """
481 Returns a sampling rate that guarantees a sample of
482 size >= sampleSizeLowerBound 99.99% of the time.
483
484 How the sampling rate is determined:
485 Let p = num / total, where num is the sample size and total is the
486 total number of data points in the RDD. We're trying to compute
487 q > p such that
488 - when sampling with replacement, we're drawing each data point
489 with prob_i ~ Pois(q), where we want to guarantee
490 Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
491 total), i.e. the failure rate of not having a sufficiently large
492 sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
493 to guarantee 0.9999 success rate for num > 12, but we need a
494 slightly larger q (9 empirically determined).
495 - when sampling without replacement, we're drawing each data point
496 with prob_i ~ Binomial(total, fraction) and our choice of q
497 guarantees 1-delta, or 0.9999 success rate, where success rate is
498 defined the same as in sampling with replacement.
499 """
500 fraction = float(sampleSizeLowerBound) / total
501 if withReplacement:
502 numStDev = 5
503 if (sampleSizeLowerBound < 12):
504 numStDev = 9
505 return fraction + numStDev * sqrt(fraction / total)
506 else:
507 delta = 0.00005
508 gamma = - log(delta) / total
509 return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
510
511 def union(self, other):
512 """
513 Return the union of this RDD and another one.
514
515 >>> rdd = sc.parallelize([1, 1, 2, 3])
516 >>> rdd.union(rdd).collect()
517 [1, 1, 2, 3, 1, 1, 2, 3]
518 """
519 if self._jrdd_deserializer == other._jrdd_deserializer:
520 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
521 self._jrdd_deserializer)
522 return rdd
523 else:
524
525
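# The two RDDs use different serializers, so reserialize both to the default
# serializer before performing the union on the JVM side.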
526 self_copy = self._reserialize()
527 other_copy = other._reserialize()
528 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
529 self.ctx.serializer)
530
531 def intersection(self, other):
532 """
533 Return the intersection of this RDD and another one. The output will
534 not contain any duplicate elements, even if the input RDDs did.
535
536 Note that this method performs a shuffle internally.
537
538 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
539 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
540 >>> rdd1.intersection(rdd2).collect()
541 [1, 2, 3]
542 """
543 return self.map(lambda v: (v, None)) \
544 .cogroup(other.map(lambda v: (v, None))) \
545 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
546 .keys()
547
548 def _reserialize(self, serializer=None):
549 serializer = serializer or self.ctx.serializer
550 if self._jrdd_deserializer == serializer:
551 return self
552 else:
553 converted = self.map(lambda x: x, preservesPartitioning=True)
554 converted._jrdd_deserializer = serializer
555 return converted
556
557 def __add__(self, other):
558 """
559 Return the union of this RDD and another one.
560
561 >>> rdd = sc.parallelize([1, 1, 2, 3])
562 >>> (rdd + rdd).collect()
563 [1, 1, 2, 3, 1, 1, 2, 3]
564 """
565 if not isinstance(other, RDD):
566 raise TypeError
567 return self.union(other)
568
569 def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
570 """
571 Sorts this RDD, which is assumed to consist of (key, value) pairs.
572 # noqa
573
574 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
575 >>> sc.parallelize(tmp).sortByKey().first()
576 ('1', 3)
577 >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
578 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
579 >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
580 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
581 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
582 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
583 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
584 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
585 """
586 if numPartitions is None:
587 numPartitions = self._defaultReducePartitions()
588
589 def sortPartition(iterator):
590 return iter(sorted(iterator, key=lambda (k, v): keyfunc(k), reverse=not ascending))
591
592 if numPartitions == 1:
593 if self.getNumPartitions() > 1:
594 self = self.coalesce(1)
595 return self.mapPartitions(sortPartition)
596
597
598
599
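# Compute range-partition boundaries by sampling the keys: the goal is to split
# the key space into numPartitions ranges that hold roughly the same number of
# (key, value) pairs.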
600 rddSize = self.count()
601 if not rddSize:
602 return self
603 maxSampleSize = numPartitions * 20.0
604 fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
605 samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
606 samples = sorted(samples, reverse=(not ascending), key=keyfunc)
607
608
609
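# Take numPartitions - 1 boundary keys from the sorted sample; the upper
# boundary of the last partition is implicit.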
610 bounds = [samples[len(samples) * (i + 1) / numPartitions]
611 for i in range(0, numPartitions - 1)]
612
613 def rangePartitioner(k):
614 p = bisect.bisect_left(bounds, keyfunc(k))
615 if ascending:
616 return p
617 else:
618 return numPartitions - 1 - p
619
620 return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
621
622 def sortBy(self, keyfunc, ascending=True, numPartitions=None):
623 """
624 Sorts this RDD by the given keyfunc
625
626 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
627 >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
628 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
629 >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
630 [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
631 """
632 return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
633
634 def glom(self):
635 """
636 Return an RDD created by coalescing all elements within each partition
637 into a list.
638
639 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
640 >>> sorted(rdd.glom().collect())
641 [[1, 2], [3, 4]]
642 """
643 def func(iterator):
644 yield list(iterator)
645 return self.mapPartitions(func)
646
647 def cartesian(self, other):
648 """
649 Return the Cartesian product of this RDD and another one, that is, the
650 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
651 C{b} is in C{other}.
652
653 >>> rdd = sc.parallelize([1, 2])
654 >>> sorted(rdd.cartesian(rdd).collect())
655 [(1, 1), (1, 2), (2, 1), (2, 2)]
656 """
657
658 deserializer = CartesianDeserializer(self._jrdd_deserializer,
659 other._jrdd_deserializer)
660 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
661
662 def groupBy(self, f, numPartitions=None):
663 """
664 Return an RDD of grouped items.
665
666 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
667 >>> result = rdd.groupBy(lambda x: x % 2).collect()
668 >>> sorted([(x, sorted(y)) for (x, y) in result])
669 [(0, [2, 8]), (1, [1, 1, 3, 5])]
670 """
671 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
672
673 def pipe(self, command, env={}):
674 """
675 Return an RDD created by piping elements to a forked external process.
676
677 >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
678 ['1', '2', '', '3']
679 """
680 def func(iterator):
681 pipe = Popen(
682 shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
683
684 def pipe_objs(out):
685 for obj in iterator:
686 out.write(str(obj).rstrip('\n') + '\n')
687 out.close()
688 Thread(target=pipe_objs, args=[pipe.stdin]).start()
689 return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
690 return self.mapPartitions(func)
691
692 def foreach(self, f):
693 """
694 Applies a function to all elements of this RDD.
695
696 >>> def f(x): print x
697 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
698 """
699 def processPartition(iterator):
700 for x in iterator:
701 f(x)
702 yield None
703 self.mapPartitions(processPartition).collect()
704
705 def foreachPartition(self, f):
706 """
707 Applies a function to each partition of this RDD.
708
709 >>> def f(iterator):
710 ... for x in iterator:
711 ... print x
712 ... yield None
713 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
714 """
715 self.mapPartitions(f).collect()
716
717 def collect(self):
718 """
719 Return a list that contains all of the elements in this RDD.
720 """
721 with _JavaStackTrace(self.context) as st:
722 bytesInJava = self._jrdd.collect().iterator()
723 return list(self._collect_iterator_through_file(bytesInJava))
724
725 def _collect_iterator_through_file(self, iterator):
726
727
728
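# Dump the serialized data to a temporary file on the JVM side and read it back
# here; this is much faster than pulling each record through the Py4J socket.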
729 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
730 tempFile.close()
731 self.ctx._writeToFile(iterator, tempFile.name)
732
733 with open(tempFile.name, 'rb') as tempFile:
734 for item in self._jrdd_deserializer.load_stream(tempFile):
735 yield item
736 os.unlink(tempFile.name)
737
738 def reduce(self, f):
739 """
740 Reduces the elements of this RDD using the specified commutative and
741 associative binary operator. Currently reduces partitions locally.
742
743 >>> from operator import add
744 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
745 15
746 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
747 10
748 """
749 def func(iterator):
750 acc = None
751 for obj in iterator:
752 if acc is None:
753 acc = obj
754 else:
755 acc = f(obj, acc)
756 if acc is not None:
757 yield acc
758 vals = self.mapPartitions(func).collect()
759 return reduce(f, vals)
760
761 def fold(self, zeroValue, op):
762 """
763 Aggregate the elements of each partition, and then the results for all
764 the partitions, using a given associative function and a neutral "zero
765 value."
766
767 The function C{op(t1, t2)} is allowed to modify C{t1} and return it
768 as its result value to avoid object allocation; however, it should not
769 modify C{t2}.
770
771 >>> from operator import add
772 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
773 15
774 """
775 def func(iterator):
776 acc = zeroValue
777 for obj in iterator:
778 acc = op(obj, acc)
779 yield acc
780 vals = self.mapPartitions(func).collect()
781 return reduce(op, vals, zeroValue)
782
783 def aggregate(self, zeroValue, seqOp, combOp):
784 """
785 Aggregate the elements of each partition, and then the results for all
786 the partitions, using a given combine functions and a neutral "zero
787 value."
788
789 The function C{op(t1, t2)} is allowed to modify C{t1} and return it
790 as its result value to avoid object allocation; however, it should not
791 modify C{t2}.
792
793 The first function (seqOp) can return a different result type, U, than
794 the type of this RDD. Thus, we need one operation for merging a T into
795 a U and one operation for merging two U's.
796
797 >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
798 >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
799 >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
800 (10, 4)
801 >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
802 (0, 0)
803 """
804 def func(iterator):
805 acc = zeroValue
806 for obj in iterator:
807 acc = seqOp(acc, obj)
808 yield acc
809
810 return self.mapPartitions(func).fold(zeroValue, combOp)
811
812 def max(self):
813 """
814 Find the maximum item in this RDD.
815
816 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
817 43.0
818 """
819 return self.reduce(max)
820
821 def min(self):
822 """
823 Find the minimum item in this RDD.
824
825 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
826 1.0
827 """
828 return self.reduce(min)
829
830 def sum(self):
831 """
832 Add up the elements in this RDD.
833
834 >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
835 6.0
836 """
837 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
838
839 def count(self):
840 """
841 Return the number of elements in this RDD.
842
843 >>> sc.parallelize([2, 3, 4]).count()
844 3
845 """
846 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
847
848 def stats(self):
849 """
850 Return a L{StatCounter} object that captures the mean, variance
851 and count of the RDD's elements in one operation.
852 """
853 def redFunc(left_counter, right_counter):
854 return left_counter.mergeStats(right_counter)
855
856 return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
857
858 def histogram(self, buckets):
859 """
860 Compute a histogram using the provided buckets. The buckets
861 are all open to the right except for the last which is closed.
862 e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
863 which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
864 and 50 we would have a histogram of 1,0,1.
865
866 If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
867 this can be switched from an O(log n) insertion to O(1) per
868 element (where n = # buckets).
869
870 Buckets must be sorted, must not contain any duplicates, and must
871 have at least two elements.
872
873 If `buckets` is a number, it will generate buckets that are
874 evenly spaced between the minimum and maximum of the RDD. For
875 example, if the min value is 0 and the max is 100, given buckets
876 as 2, the resulting buckets will be [0,50) [50,100]. buckets must
877 be at least 1. An exception is raised if the RDD contains infinity;
878 NaN values are ignored. If the elements in the RDD do not vary
879 (max == min), a single bucket is always returned.
880
881 It will return a tuple of buckets and histogram.
882
883 >>> rdd = sc.parallelize(range(51))
884 >>> rdd.histogram(2)
885 ([0, 25, 50], [25, 26])
886 >>> rdd.histogram([0, 5, 25, 50])
887 ([0, 5, 25, 50], [5, 20, 26])
888 >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
889 ([0, 15, 30, 45, 60], [15, 15, 15, 6])
890 >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
891 >>> rdd.histogram(("a", "b", "c"))
892 (('a', 'b', 'c'), [2, 2])
893 """
894
895 if isinstance(buckets, (int, long)):
896 if buckets < 1:
897 raise ValueError("number of buckets must be >= 1")
898
899
900 def comparable(x):
901 if x is None:
902 return False
903 if type(x) is float and isnan(x):
904 return False
905 return True
906
907 filtered = self.filter(comparable)
908
909
910 def minmax(a, b):
911 return min(a[0], b[0]), max(a[1], b[1])
912 try:
913 minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
914 except TypeError as e:
915 if " empty " in str(e):
916 raise ValueError("can not generate buckets from empty RDD")
917 raise
918
919 if minv == maxv or buckets == 1:
920 return [minv, maxv], [filtered.count()]
921
922 try:
923 inc = (maxv - minv) / buckets
924 except TypeError:
925 raise TypeError("Can not generate buckets with non-number in RDD")
926
927 if isinf(inc):
928 raise ValueError("Can not generate buckets with infinite value")
929
930
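# Integer division may have truncated the increment; recompute it as a float so
# that the buckets exactly span [minv, maxv].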
931 if inc * buckets != maxv - minv:
932 inc = (maxv - minv) * 1.0 / buckets
933
934 buckets = [i * inc + minv for i in range(buckets)]
935 buckets.append(maxv)
936 even = True
937
938 elif isinstance(buckets, (list, tuple)):
939 if len(buckets) < 2:
940 raise ValueError("buckets should have more than one value")
941
942 if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
943 raise ValueError("can not have None or NaN in buckets")
944
945 if sorted(buckets) != list(buckets):
946 raise ValueError("buckets should be sorted")
947
948 if len(set(buckets)) != len(buckets):
949 raise ValueError("buckets should not contain duplicated values")
950
951 minv = buckets[0]
952 maxv = buckets[-1]
953 even = False
954 inc = None
955 try:
956 steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
957 except TypeError:
958 pass
959 else:
960 if max(steps) - min(steps) < 1e-10:
961 even = True
962 inc = (maxv - minv) / (len(buckets) - 1)
963
964 else:
965 raise TypeError("buckets should be a list or tuple or number(int or long)")
966
967 def histogram(iterator):
968 counters = [0] * len(buckets)
969 for i in iterator:
970 if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
971 continue
972 t = (int((i - minv) / inc) if even
973 else bisect.bisect_right(buckets, i) - 1)
974 counters[t] += 1
975
976 last = counters.pop()
977 counters[-1] += last
978 return [counters]
979
980 def mergeCounters(a, b):
981 return [i + j for i, j in zip(a, b)]
982
983 return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
984
985 def mean(self):
986 """
987 Compute the mean of this RDD's elements.
988
989 >>> sc.parallelize([1, 2, 3]).mean()
990 2.0
991 """
992 return self.stats().mean()
993
994 def variance(self):
995 """
996 Compute the variance of this RDD's elements.
997
998 >>> sc.parallelize([1, 2, 3]).variance()
999 0.666...
1000 """
1001 return self.stats().variance()
1002
1003 def stdev(self):
1004 """
1005 Compute the standard deviation of this RDD's elements.
1006
1007 >>> sc.parallelize([1, 2, 3]).stdev()
1008 0.816...
1009 """
1010 return self.stats().stdev()
1011
1012 def sampleStdev(self):
1013 """
1014 Compute the sample standard deviation of this RDD's elements (which
1015 corrects for bias in estimating the standard deviation by dividing by
1016 N-1 instead of N).
1017
1018 >>> sc.parallelize([1, 2, 3]).sampleStdev()
1019 1.0
1020 """
1021 return self.stats().sampleStdev()
1022
1023 def sampleVariance(self):
1024 """
1025 Compute the sample variance of this RDD's elements (which corrects
1026 for bias in estimating the variance by dividing by N-1 instead of N).
1027
1028 >>> sc.parallelize([1, 2, 3]).sampleVariance()
1029 1.0
1030 """
1031 return self.stats().sampleVariance()
1032
1033 def countByValue(self):
1034 """
1035 Return the count of each unique value in this RDD as a dictionary of
1036 (value, count) pairs.
1037
1038 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
1039 [(1, 2), (2, 3)]
1040 """
1041 def countPartition(iterator):
1042 counts = defaultdict(int)
1043 for obj in iterator:
1044 counts[obj] += 1
1045 yield counts
1046
1047 def mergeMaps(m1, m2):
1048 for (k, v) in m2.iteritems():
1049 m1[k] += v
1050 return m1
1051 return self.mapPartitions(countPartition).reduce(mergeMaps)
1052
1053 def top(self, num):
1054 """
1055 Get the top N elements from a RDD.
1056
1057 Note: It returns the list sorted in descending order.
1058 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
1059 [12]
1060 >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
1061 [6, 5]
1062 """
1063 def topIterator(iterator):
1064 q = []
1065 for k in iterator:
1066 if len(q) < num:
1067 heapq.heappush(q, k)
1068 else:
1069 heapq.heappushpop(q, k)
1070 yield q
1071
1072 def merge(a, b):
1073 return next(topIterator(a + b))
1074
1075 return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
1076
1077 def takeOrdered(self, num, key=None):
1078 """
1079 Get the N elements from a RDD ordered in ascending order or as
1080 specified by the optional key function.
1081
1082 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
1083 [1, 2, 3, 4, 5, 6]
1084 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
1085 [10, 9, 7, 6, 5, 4]
1086 """
1087
1088 def topNKeyedElems(iterator, key_=None):
1089 q = MaxHeapQ(num)
1090 for k in iterator:
1091 if key_ is not None:
1092 k = (key_(k), k)
1093 q.insert(k)
1094 yield q.getElements()
1095
1096 def unKey(x, key_=None):
1097 if key_ is not None:
1098 x = [i[1] for i in x]
1099 return x
1100
1101 def merge(a, b):
1102 return next(topNKeyedElems(a + b))
1103 result = self.mapPartitions(
1104 lambda i: topNKeyedElems(i, key)).reduce(merge)
1105 return sorted(unKey(result, key), key=key)
1106
1107 def take(self, num):
1108 """
1109 Take the first num elements of the RDD.
1110
1111 It works by first scanning one partition, and using the results from
1112 that partition to estimate the number of additional partitions needed
1113 to satisfy the limit.
1114
1115 Translated from the Scala implementation in RDD#take().
1116
1117 >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
1118 [2, 3]
1119 >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
1120 [2, 3, 4, 5, 6]
1121 >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
1122 [91, 92, 93]
1123 """
1124 items = []
1125 totalParts = self._jrdd.partitions().size()
1126 partsScanned = 0
1127
1128 while len(items) < num and partsScanned < totalParts:
1129
1130
1131
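# Decide how many partitions to scan in this pass; the estimate is capped at
# totalParts when the partition range is built below.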
1132 numPartsToTry = 1
1133 if partsScanned > 0:
1134
1135
1136
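# If the previous pass found nothing, scan four times the number of partitions
# tried so far; otherwise estimate the partitions needed from the current hit
# rate and overestimate by 50%.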
1137 if len(items) == 0:
1138 numPartsToTry = partsScanned * 4
1139 else:
1140 numPartsToTry = int(1.5 * num * partsScanned / len(items))
1141
1142 left = num - len(items)
1143
1144 def takeUpToNumLeft(iterator):
1145 taken = 0
1146 while taken < left:
1147 yield next(iterator)
1148 taken += 1
1149
1150 p = range(
1151 partsScanned, min(partsScanned + numPartsToTry, totalParts))
1152 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1153
1154 items += res
1155 partsScanned += numPartsToTry
1156
1157 return items[:num]
1158
1159 def first(self):
1160 """
1161 Return the first element in this RDD.
1162
1163 >>> sc.parallelize([2, 3, 4]).first()
1164 2
1165 """
1166 return self.take(1)[0]
1167
1168 def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
1169 """
1170 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1171 system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
1172 converted for output using either user specified converters or, by default,
1173 L{org.apache.spark.api.python.JavaToWritableConverter}.
1174
1175 @param conf: Hadoop job configuration, passed in as a dict
1176 @param keyConverter: (None by default)
1177 @param valueConverter: (None by default)
1178 """
1179 jconf = self.ctx._dictToJavaMap(conf)
1180 pickledRDD = self._toPickleSerialization()
1181 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1182 self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
1183 keyConverter, valueConverter, True)
1184
1185 def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
1186 keyConverter=None, valueConverter=None, conf=None):
1187 """
1188 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1189 system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
1190 will be inferred if not specified. Keys and values are converted for output using either
1191 user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
1192 C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
1193 of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
1194
1195 @param path: path to Hadoop file
1196 @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
1197 (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
1198 @param keyClass: fully qualified classname of key Writable class
1199 (e.g. "org.apache.hadoop.io.IntWritable", None by default)
1200 @param valueClass: fully qualified classname of value Writable class
1201 (e.g. "org.apache.hadoop.io.Text", None by default)
1202 @param keyConverter: (None by default)
1203 @param valueConverter: (None by default)
1204 @param conf: Hadoop job configuration, passed in as a dict (None by default)
1205 """
1206 jconf = self.ctx._dictToJavaMap(conf)
1207 pickledRDD = self._toPickleSerialization()
1208 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1209 self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
1210 outputFormatClass,
1211 keyClass, valueClass,
1212 keyConverter, valueConverter, jconf)
1213
1214 def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
1215 """
1216 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1217 system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
1218 converted for output using either user specified converters or, by default,
1219 L{org.apache.spark.api.python.JavaToWritableConverter}.
1220
1221 @param conf: Hadoop job configuration, passed in as a dict
1222 @param keyConverter: (None by default)
1223 @param valueConverter: (None by default)
1224 """
1225 jconf = self.ctx._dictToJavaMap(conf)
1226 pickledRDD = self._toPickleSerialization()
1227 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1228 self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
1229 keyConverter, valueConverter, False)
1230
1231 def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
1232 keyConverter=None, valueConverter=None, conf=None,
1233 compressionCodecClass=None):
1234 """
1235 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1236 system, using the old Hadoop OutputFormat API (mapred package). Key and value types
1237 will be inferred if not specified. Keys and values are converted for output using either
1238 user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
1239 C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
1240 of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
1241
1242 @param path: path to Hadoop file
1243 @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
1244 (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
1245 @param keyClass: fully qualified classname of key Writable class
1246 (e.g. "org.apache.hadoop.io.IntWritable", None by default)
1247 @param valueClass: fully qualified classname of value Writable class
1248 (e.g. "org.apache.hadoop.io.Text", None by default)
1249 @param keyConverter: (None by default)
1250 @param valueConverter: (None by default)
1251 @param conf: (None by default)
1252 @param compressionCodecClass: (None by default)
1253 """
1254 jconf = self.ctx._dictToJavaMap(conf)
1255 pickledRDD = self._toPickleSerialization()
1256 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1257 self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
1258 outputFormatClass,
1259 keyClass, valueClass,
1260 keyConverter, valueConverter,
1261 jconf, compressionCodecClass)
1262
1263 def saveAsSequenceFile(self, path, compressionCodecClass=None):
1264 """
1265 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1266 system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
1267 RDD's key and value types. The mechanism is as follows:
1268 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
1269 2. Keys and values of this Java RDD are converted to Writables and written out.
1270
1271 @param path: path to sequence file
1272 @param compressionCodecClass: (None by default)
1273 """
1274 pickledRDD = self._toPickleSerialization()
1275 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1276 self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
1277 path, compressionCodecClass)
1278
1279 def saveAsPickleFile(self, path, batchSize=10):
1280 """
1281 Save this RDD as a SequenceFile of serialized objects. The serializer
1282 used is L{pyspark.serializers.PickleSerializer}, default batch size
1283 is 10.
1284
1285 >>> tmpFile = NamedTemporaryFile(delete=True)
1286 >>> tmpFile.close()
1287 >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
1288 >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
1289 [1, 2, 'rdd', 'spark']
1290 """
1291 self._reserialize(BatchedSerializer(PickleSerializer(),
1292 batchSize))._jrdd.saveAsObjectFile(path)
1293
1294 def saveAsTextFile(self, path):
1295 """
1296 Save this RDD as a text file, using string representations of elements.
1297
1298 >>> tempFile = NamedTemporaryFile(delete=True)
1299 >>> tempFile.close()
1300 >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
1301 >>> from fileinput import input
1302 >>> from glob import glob
1303 >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
1304 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
1305
1306 Empty lines are tolerated when saving to text files.
1307
1308 >>> tempFile2 = NamedTemporaryFile(delete=True)
1309 >>> tempFile2.close()
1310 >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
1311 >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
1312 '\\n\\n\\nbar\\nfoo\\n'
1313 """
1314 def func(split, iterator):
1315 for x in iterator:
1316 if not isinstance(x, basestring):
1317 x = unicode(x)
1318 if isinstance(x, unicode):
1319 x = x.encode("utf-8")
1320 yield x
1321 keyed = self.mapPartitionsWithIndex(func)
1322 keyed._bypass_serializer = True
1323 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
1324
1325
1326
1327 def collectAsMap(self):
1328 """
1329 Return the key-value pairs in this RDD to the master as a dictionary.
1330
1331 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
1332 >>> m[1]
1333 2
1334 >>> m[3]
1335 4
1336 """
1337 return dict(self.collect())
1338
1339 def keys(self):
1340 """
1341 Return an RDD with the keys of each tuple.
1342
1343 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
1344 >>> m.collect()
1345 [1, 3]
1346 """
1347 return self.map(lambda (k, v): k)
1348
1349 def values(self):
1350 """
1351 Return an RDD with the values of each tuple.
1352
1353 >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
1354 >>> m.collect()
1355 [2, 4]
1356 """
1357 return self.map(lambda (k, v): v)
1358
1359 def reduceByKey(self, func, numPartitions=None):
1360 """
1361 Merge the values for each key using an associative reduce function.
1362
1363 This will also perform the merging locally on each mapper before
1364 sending results to a reducer, similarly to a "combiner" in MapReduce.
1365
1366 Output will be hash-partitioned with C{numPartitions} partitions, or
1367 the default parallelism level if C{numPartitions} is not specified.
1368
1369 >>> from operator import add
1370 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1371 >>> sorted(rdd.reduceByKey(add).collect())
1372 [('a', 2), ('b', 1)]
1373 """
1374 return self.combineByKey(lambda x: x, func, func, numPartitions)
1375
1376 def reduceByKeyLocally(self, func):
1377 """
1378 Merge the values for each key using an associative reduce function, but
1379 return the results immediately to the master as a dictionary.
1380
1381 This will also perform the merging locally on each mapper before
1382 sending results to a reducer, similarly to a "combiner" in MapReduce.
1383
1384 >>> from operator import add
1385 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1386 >>> sorted(rdd.reduceByKeyLocally(add).items())
1387 [('a', 2), ('b', 1)]
1388 """
1389 def reducePartition(iterator):
1390 m = {}
1391 for (k, v) in iterator:
1392 m[k] = v if k not in m else func(m[k], v)
1393 yield m
1394
1395 def mergeMaps(m1, m2):
1396 for (k, v) in m2.iteritems():
1397 m1[k] = v if k not in m1 else func(m1[k], v)
1398 return m1
1399 return self.mapPartitions(reducePartition).reduce(mergeMaps)
1400
1401 def countByKey(self):
1402 """
1403 Count the number of elements for each key, and return the result to the
1404 master as a dictionary.
1405
1406 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1407 >>> sorted(rdd.countByKey().items())
1408 [('a', 2), ('b', 1)]
1409 """
1410 return self.map(lambda x: x[0]).countByValue()
1411
1412 def join(self, other, numPartitions=None):
1413 """
1414 Return an RDD containing all pairs of elements with matching keys in
1415 C{self} and C{other}.
1416
1417 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
1418 (k, v1) is in C{self} and (k, v2) is in C{other}.
1419
1420 Performs a hash join across the cluster.
1421
1422 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1423 >>> y = sc.parallelize([("a", 2), ("a", 3)])
1424 >>> sorted(x.join(y).collect())
1425 [('a', (1, 2)), ('a', (1, 3))]
1426 """
1427 return python_join(self, other, numPartitions)
1428
1429 def leftOuterJoin(self, other, numPartitions=None):
1430 """
1431 Perform a left outer join of C{self} and C{other}.
1432
1433 For each element (k, v) in C{self}, the resulting RDD will either
1434 contain all pairs (k, (v, w)) for w in C{other}, or the pair
1435 (k, (v, None)) if no elements in other have key k.
1436
1437 Hash-partitions the resulting RDD into the given number of partitions.
1438
1439 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1440 >>> y = sc.parallelize([("a", 2)])
1441 >>> sorted(x.leftOuterJoin(y).collect())
1442 [('a', (1, 2)), ('b', (4, None))]
1443 """
1444 return python_left_outer_join(self, other, numPartitions)
1445
1446 def rightOuterJoin(self, other, numPartitions=None):
1447 """
1448 Perform a right outer join of C{self} and C{other}.
1449
1450 For each element (k, w) in C{other}, the resulting RDD will either
1451 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
1452 if no elements in C{self} have key k.
1453
1454 Hash-partitions the resulting RDD into the given number of partitions.
1455
1456 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1457 >>> y = sc.parallelize([("a", 2)])
1458 >>> sorted(y.rightOuterJoin(x).collect())
1459 [('a', (2, 1)), ('b', (None, 4))]
1460 """
1461 return python_right_outer_join(self, other, numPartitions)
1462
1463
1464
1465
1466 def partitionBy(self, numPartitions, partitionFunc=portable_hash):
1467 """
1468 Return a copy of the RDD partitioned using the specified partitioner.
1469
1470 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
1471 >>> sets = pairs.partitionBy(2).glom().collect()
1472 >>> set(sets[0]).intersection(set(sets[1]))
1473 set([])
1474 """
1475 if numPartitions is None:
1476 numPartitions = self._defaultReducePartitions()
1477
1478
1479
1480
1481
1482
1483
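# Build the hash buckets in Python and ship them to the JVM as
# (partition number, serialized chunk) pairs, so that a small number of large
# serialized chunks cross the Py4J boundary instead of one object at a time.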
1484 outputSerializer = self.ctx._unbatched_serializer
1485
1486 limit = (_parse_memory(self.ctx._conf.get(
1487 "spark.python.worker.memory", "512m")) / 2)
1488
1489 def add_shuffle_key(split, iterator):
1490
1491 buckets = defaultdict(list)
1492 c, batch = 0, min(10 * numPartitions, 1000)
1493
1494 for (k, v) in iterator:
1495 buckets[partitionFunc(k) % numPartitions].append((k, v))
1496 c += 1
1497
1498
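# Flush the buckets once memory usage crosses the limit or the current batch is full.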
1499 if (c % 1000 == 0 and get_used_memory() > limit
1500 or c > batch):
1501 n, size = len(buckets), 0
1502 for split in buckets.keys():
1503 yield pack_long(split)
1504 d = outputSerializer.dumps(buckets[split])
1505 del buckets[split]
1506 yield d
1507 size += len(d)
1508
1509 avg = (size / n) >> 20
1510
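# Adjust the batch size so that the average serialized chunk stays roughly
# between 1MB and 10MB.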
1511 if avg < 1:
1512 batch *= 1.5
1513 elif avg > 10:
1514 batch = max(batch / 1.5, 1)
1515 c = 0
1516
1517 for (split, items) in buckets.iteritems():
1518 yield pack_long(split)
1519 yield outputSerializer.dumps(items)
1520
1521 keyed = self.mapPartitionsWithIndex(add_shuffle_key)
1522 keyed._bypass_serializer = True
1523 with _JavaStackTrace(self.context) as st:
1524 pairRDD = self.ctx._jvm.PairwiseRDD(
1525 keyed._jrdd.rdd()).asJavaPairRDD()
1526 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
1527 id(partitionFunc))
1528 jrdd = pairRDD.partitionBy(partitioner).values()
1529 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
1530
1531
1532 rdd._partitionFunc = partitionFunc
1533 return rdd
1534
1535
1536 def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
1537 numPartitions=None):
1538 """
1539 Generic function to combine the elements for each key using a custom
1540 set of aggregation functions.
1541
1542 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
1543 type" C. Note that V and C can be different -- for example, one might
1544 group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
1545
1546 Users provide three functions:
1547
1548 - C{createCombiner}, which turns a V into a C (e.g., creates
1549 a one-element list)
1550 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
1551 a list)
1552 - C{mergeCombiners}, to combine two C's into a single one.
1553
1554 In addition, users can control the partitioning of the output RDD.
1555
1556 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1557 >>> def f(x): return x
1558 >>> def add(a, b): return a + str(b)
1559 >>> sorted(x.combineByKey(str, add, add).collect())
1560 [('a', '11'), ('b', '1')]
1561 """
1562 if numPartitions is None:
1563 numPartitions = self._defaultReducePartitions()
1564
1565 serializer = self.ctx.serializer
1566 spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
1567 == 'true')
1568 memory = _parse_memory(self.ctx._conf.get(
1569 "spark.python.worker.memory", "512m"))
1570 agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
1571
1572 def combineLocally(iterator):
1573 merger = ExternalMerger(agg, memory * 0.9, serializer) \
1574 if spill else InMemoryMerger(agg)
1575 merger.mergeValues(iterator)
1576 return merger.iteritems()
1577
1578 locally_combined = self.mapPartitions(combineLocally)
1579 shuffled = locally_combined.partitionBy(numPartitions)
1580
1581 def _mergeCombiners(iterator):
1582 merger = ExternalMerger(agg, memory, serializer) \
1583 if spill else InMemoryMerger(agg)
1584 merger.mergeCombiners(iterator)
1585 return merger.iteritems()
1586
1587 return shuffled.mapPartitions(_mergeCombiners)
1588
1589 def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
1590 """
1591 Aggregate the values of each key, using given combine functions and a neutral
1592 "zero value". This function can return a different result type, U, than the type
1593 of the values in this RDD, V. Thus, we need one operation for merging a V into
1594 a U and one operation for merging two U's. The former operation is used for merging
1595 values within a partition, and the latter is used for merging values between
1596 partitions. To avoid memory allocation, both of these functions are
1597 allowed to modify and return their first argument instead of creating a new U.
1598 """
1599 def createZero():
1600 return copy.deepcopy(zeroValue)
1601
1602 return self.combineByKey(
1603 lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
1604
1605 def foldByKey(self, zeroValue, func, numPartitions=None):
1606 """
1607 Merge the values for each key using an associative function "func"
1608 and a neutral "zeroValue" which may be added to the result an
1609 arbitrary number of times, and must not change the result
1610 (e.g., 0 for addition, or 1 for multiplication).
1611
1612 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1613 >>> from operator import add
1614 >>> rdd.foldByKey(0, add).collect()
1615 [('a', 2), ('b', 1)]
1616 """
1617 def createZero():
1618 return copy.deepcopy(zeroValue)
1619
1620 return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
1621
1622
1623 def groupByKey(self, numPartitions=None):
1624 """
1625 Group the values for each key in the RDD into a single sequence.
1626 Hash-partitions the resulting RDD into numPartitions partitions.
1627
1628 Note: If you are grouping in order to perform an aggregation (such as a
1629 sum or average) over each key, using reduceByKey will provide much
1630 better performance.
1631
1632 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1633 >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
1634 [('a', [1, 1]), ('b', [1])]
1635 """
1636
1637 def createCombiner(x):
1638 return [x]
1639
1640 def mergeValue(xs, x):
1641 xs.append(x)
1642 return xs
1643
1644 def mergeCombiners(a, b):
1645 a.extend(b)
1646 return a
1647
1648 return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
1649 numPartitions).mapValues(lambda x: ResultIterable(x))
1650
1651
1652 def flatMapValues(self, f):
1653 """
1654 Pass each value in the key-value pair RDD through a flatMap function
1655 without changing the keys; this also retains the original RDD's
1656 partitioning.
1657
1658 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
1659 >>> def f(x): return x
1660 >>> x.flatMapValues(f).collect()
1661 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
1662 """
1663 flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
1664 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1665
1666 def mapValues(self, f):
1667 """
1668 Pass each value in the key-value pair RDD through a map function
1669 without changing the keys; this also retains the original RDD's
1670 partitioning.
1671
1672 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
1673 >>> def f(x): return len(x)
1674 >>> x.mapValues(f).collect()
1675 [('a', 3), ('b', 1)]
1676 """
1677 map_values_fn = lambda (k, v): (k, f(v))
1678 return self.map(map_values_fn, preservesPartitioning=True)
1679
1680 def groupWith(self, other, *others):
1681 """
1682 Alias for cogroup but with support for multiple RDDs.
1683
1684 >>> w = sc.parallelize([("a", 5), ("b", 6)])
1685 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1686 >>> y = sc.parallelize([("a", 2)])
1687 >>> z = sc.parallelize([("b", 42)])
1688 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
1689 sorted(list(w.groupWith(x, y, z).collect())))
1690 [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
1691
1692 """
1693 return python_cogroup((self, other) + others, numPartitions=None)
1694
1695
1696 def cogroup(self, other, numPartitions=None):
1697 """
1698 For each key k in C{self} or C{other}, return a resulting RDD that
1699 contains a tuple with the list of values for that key in C{self} as
1700 well as C{other}.
1701
1702 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1703 >>> y = sc.parallelize([("a", 2)])
1704 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
1705 [('a', ([1], [2])), ('b', ([4], []))]
1706 """
1707 return python_cogroup((self, other), numPartitions)
1708
1709 def sampleByKey(self, withReplacement, fractions, seed=None):
1710 """
1711 Return a subset of this RDD sampled by key (via stratified sampling).
1712 Create a sample of this RDD using variable sampling rates for
1713 different keys as specified by fractions, a key to sampling rate map.
1714
1715 >>> fractions = {"a": 0.2, "b": 0.1}
1716 >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
1717 >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
1718 >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
1719 True
1720 >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
1721 True
1722 >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
1723 True
1724 """
1725 for fraction in fractions.values():
1726 assert fraction >= 0.0, "Negative fraction value: %s" % fraction
1727 return self.mapPartitionsWithIndex(
1728 RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
1729
1730 def subtractByKey(self, other, numPartitions=None):
1731 """
1732 Return each (key, value) pair in C{self} that has no pair with matching
1733 key in C{other}.
1734
1735 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
1736 >>> y = sc.parallelize([("a", 3), ("c", None)])
1737 >>> sorted(x.subtractByKey(y).collect())
1738 [('b', 4), ('b', 5)]
1739 """
1740 def filter_func((key, vals)):
1741 return len(vals[0]) > 0 and len(vals[1]) == 0
1742 map_func = lambda (key, vals): [(key, val) for val in vals[0]]
1743 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1744
1745 def subtract(self, other, numPartitions=None):
1746 """
1747 Return each value in C{self} that is not contained in C{other}.
1748
1749 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
1750 >>> y = sc.parallelize([("a", 3), ("c", None)])
1751 >>> sorted(x.subtract(y).collect())
1752 [('a', 1), ('b', 4), ('b', 5)]
1753 """
1754
1755 rdd = other.map(lambda x: (x, True))
1756 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
1757
1758 def keyBy(self, f):
1759 """
1760 Creates tuples of the elements in this RDD by applying C{f}.
1761
1762 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
1763 >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
1764 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
1765 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
1766 """
1767 return self.map(lambda x: (f(x), x))
1768
1769 def repartition(self, numPartitions):
1770 """
1771 Return a new RDD that has exactly numPartitions partitions.
1772
1773 Can increase or decrease the level of parallelism in this RDD.
1774 Internally, this uses a shuffle to redistribute data.
1775 If you are decreasing the number of partitions in this RDD, consider
1776 using `coalesce`, which can avoid performing a shuffle.
1777
1778 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
1779 >>> sorted(rdd.glom().collect())
1780 [[1], [2, 3], [4, 5], [6, 7]]
1781 >>> len(rdd.repartition(2).glom().collect())
1782 2
1783 >>> len(rdd.repartition(10).glom().collect())
1784 10
1785 """
1786 jrdd = self._jrdd.repartition(numPartitions)
1787 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1788
1789 def coalesce(self, numPartitions, shuffle=False):
1790 """
1791 Return a new RDD that is reduced into `numPartitions` partitions.
1792
1793 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
1794 [[1], [2, 3], [4, 5]]
1795 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
1796 [[1, 2, 3, 4, 5]]
1797 """
1798 jrdd = self._jrdd.coalesce(numPartitions)
1799 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1800
1801 def zip(self, other):
1802 """
1803 Zips this RDD with another one, returning key-value pairs with the
1804 first element in each RDD, second element in each RDD, etc. Assumes
1805 that the two RDDs have the same number of partitions and the same
1806 number of elements in each partition (e.g. one was made through
1807 a map on the other).
1808
1809 >>> x = sc.parallelize(range(0,5))
1810 >>> y = sc.parallelize(range(1000, 1005))
1811 >>> x.zip(y).collect()
1812 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
1813 """
1814 if self.getNumPartitions() != other.getNumPartitions():
1815 raise ValueError("Can only zip with RDD which has the same number of partitions")
1816
1817 def get_batch_size(ser):
1818 if isinstance(ser, BatchedSerializer):
1819 return ser.batchSize
1820 return 0
1821
1822 def batch_as(rdd, batchSize):
1823 ser = rdd._jrdd_deserializer
1824 if isinstance(ser, BatchedSerializer):
1825 ser = ser.serializer
1826 return rdd._reserialize(BatchedSerializer(ser, batchSize))
1827
1828 my_batch = get_batch_size(self._jrdd_deserializer)
1829 other_batch = get_batch_size(other._jrdd_deserializer)
1830 if my_batch != other_batch:
1831
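# Align the batch sizes of the two serialized streams so that corresponding
# elements end up in matching batches when zipped on the JVM side.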
1832 if my_batch > other_batch:
1833 other = batch_as(other, my_batch)
1834 else:
1835 self = batch_as(self, other_batch)
1836
1837
1838
1839 pairRDD = self._jrdd.zip(other._jrdd)
1840 deserializer = PairDeserializer(self._jrdd_deserializer,
1841 other._jrdd_deserializer)
1842 return RDD(pairRDD, self.ctx, deserializer)
1843
1844 def zipWithIndex(self):
1845 """
1846 Zips this RDD with its element indices.
1847
1848 The ordering is first based on the partition index and then the
1849 ordering of items within each partition. So the first item in
1850 the first partition gets index 0, and the last item in the last
1851 partition receives the largest index.
1852
1853 This method needs to trigger a spark job when this RDD contains
1854 more than one partition.
1855
1856 >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
1857 [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
1858 """
1859 starts = [0]
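# Compute the starting index of each partition as the running sum of the sizes
# of all earlier partitions (counting them triggers a job).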
1860 if self.getNumPartitions() > 1:
1861 nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
1862 for i in range(len(nums) - 1):
1863 starts.append(starts[-1] + nums[i])
1864
1865 def func(k, it):
1866 for i, v in enumerate(it, starts[k]):
1867 yield v, i
1868
1869 return self.mapPartitionsWithIndex(func)
1870
1872 """
1873 Zips this RDD with generated unique Long ids.
1874
1875 Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
1876 n is the number of partitions. So there may exist gaps, but this
1877 method won't trigger a spark job, which is different from
1878 L{zipWithIndex}.
1879
1880 >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
1881 [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
1882 """
1883 n = self.getNumPartitions()
1884
1885 def func(k, it):
1886 for i, v in enumerate(it):
1887 yield v, i * n + k
1888
1889 return self.mapPartitionsWithIndex(func)
1890
1891 def name(self):
1892 """
1893 Return the name of this RDD.
1894 """
1895 name_ = self._jrdd.name()
1896 if not name_:
1897 return None
1898 return name_.encode('utf-8')
1899
1900 def setName(self, name):
1901 """
1902 Assign a name to this RDD.
1903
1904 >>> rdd1 = sc.parallelize([1,2])
1905 >>> rdd1.setName('RDD1')
1906 >>> rdd1.name()
1907 'RDD1'
1908 """
1909 self._jrdd.setName(name)
1910
1911 def toDebugString(self):
1912 """
1913 A description of this RDD and its recursive dependencies for debugging.
1914 """
1915 debug_string = self._jrdd.toDebugString()
1916 if not debug_string:
1917 return None
1918 return debug_string.encode('utf-8')
1919
1920 def getStorageLevel(self):
1921 """
1922 Get the RDD's current storage level.
1923
1924 >>> rdd1 = sc.parallelize([1,2])
1925 >>> rdd1.getStorageLevel()
1926 StorageLevel(False, False, False, False, 1)
1927 >>> print(rdd1.getStorageLevel())
1928 Serialized 1x Replicated
1929 """
1930 java_storage_level = self._jrdd.getStorageLevel()
1931 storage_level = StorageLevel(java_storage_level.useDisk(),
1932 java_storage_level.useMemory(),
1933 java_storage_level.useOffHeap(),
1934 java_storage_level.deserialized(),
1935 java_storage_level.replication())
1936 return storage_level
1937
1938 def _defaultReducePartitions(self):
1939 """
1940 Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
1941 If spark.default.parallelism is set, then we'll use the value from SparkContext
1942 defaultParallelism, otherwise we'll use the number of partitions in this RDD.
1943
1944 This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
1945 the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
1946 be inherent.
1947 """
1948 if self.ctx._conf.contains("spark.default.parallelism"):
1949 return self.ctx.defaultParallelism
1950 else:
1951 return self.getNumPartitions()
1952
1959 class PipelinedRDD(RDD):
1960
1961 """
1962 Pipelined maps:
1963
1964 >>> rdd = sc.parallelize([1, 2, 3, 4])
1965 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
1966 [4, 8, 12, 16]
1967 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
1968 [4, 8, 12, 16]
1969
1970 Pipelined reduces:
1971 >>> from operator import add
1972 >>> rdd.map(lambda x: 2 * x).reduce(add)
1973 20
1974 >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
1975 20
1976 """
1977
1978 def __init__(self, prev, func, preservesPartitioning=False):
1979 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
1980
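# The previous RDD cannot be pipelined into (it is cached, checkpointed, or not
# a PipelinedRDD), so this transformation starts from its JavaRDD directly.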
1981 self.func = func
1982 self.preservesPartitioning = preservesPartitioning
1983 self._prev_jrdd = prev._jrdd
1984 self._prev_jrdd_deserializer = prev._jrdd_deserializer
1985 else:
1986 prev_func = prev.func
1987
1988 def pipeline_func(split, iterator):
1989 return func(split, prev_func(split, iterator))
1990 self.func = pipeline_func
1991 self.preservesPartitioning = \
1992 prev.preservesPartitioning and preservesPartitioning
1993 self._prev_jrdd = prev._prev_jrdd
1994 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
1995 self.is_cached = False
1996 self.is_checkpointed = False
1997 self.ctx = prev.ctx
1998 self.prev = prev
1999 self._jrdd_val = None
2000 self._jrdd_deserializer = self.ctx.serializer
2001 self._bypass_serializer = False
2002
2003 @property
2004 def _jrdd(self):
2005 if self._jrdd_val:
2006 return self._jrdd_val
2007 if self._bypass_serializer:
2008 self._jrdd_deserializer = NoOpSerializer()
2009 command = (self.func, self._prev_jrdd_deserializer,
2010 self._jrdd_deserializer)
2011 ser = CloudPickleSerializer()
2012 pickled_command = ser.dumps(command)
2013 broadcast_vars = ListConverter().convert(
2014 [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
2015 self.ctx._gateway._gateway_client)
2016 self.ctx._pickled_broadcast_vars.clear()
2017 env = MapConverter().convert(self.ctx.environment,
2018 self.ctx._gateway._gateway_client)
2019 includes = ListConverter().convert(self.ctx._python_includes,
2020 self.ctx._gateway._gateway_client)
2021 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2022 bytearray(pickled_command),
2023 env, includes, self.preservesPartitioning,
2024 self.ctx.pythonExec,
2025 broadcast_vars, self.ctx._javaAccumulator)
2026 self._jrdd_val = python_rdd.asJavaRDD()
2027 return self._jrdd_val
2028
2029 def _is_pipelinable(self):
2030 return not (self.is_cached or self.is_checkpointed)
2031
2033 def _test():
2034 import doctest
2035 from pyspark.context import SparkContext
2036 globs = globals().copy()
2037
2038
2039 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
2040 (failure_count, test_count) = doctest.testmod(
2041 globs=globs, optionflags=doctest.ELLIPSIS)
2042 globs['sc'].stop()
2043 if failure_count:
2044 exit(-1)
2045
2046
2047 if __name__ == "__main__":
2048 _test()
2049