18 from base64 import standard_b64encode as b64enc
19 import copy
20 from collections import defaultdict
21 from collections import namedtuple
22 from itertools import chain, ifilter, imap
23 import operator
24 import os
25 import sys
26 import shlex
27 import traceback
28 from subprocess import Popen, PIPE
29 from tempfile import NamedTemporaryFile
30 from threading import Thread
31 import warnings
32 import heapq
33 import bisect
34 from random import Random
35 from math import sqrt, log, isinf, isnan
36
37 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
38 BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
39 PickleSerializer, pack_long, CompressedSerializer
40 from pyspark.join import python_join, python_left_outer_join, \
41 python_right_outer_join, python_cogroup
42 from pyspark.statcounter import StatCounter
43 from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler
44 from pyspark.storagelevel import StorageLevel
45 from pyspark.resultiterable import ResultIterable
46 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
47 get_used_memory
48
49 from py4j.java_collections import ListConverter, MapConverter
50
51 __all__ = ["RDD"]
57 """
58     This function returns a consistent hash code for builtin types, especially
59     for None and tuples containing None.
60
61     The algorithm is similar to the one used by CPython 2.7
62
63 >>> portable_hash(None)
64 0
65 >>> portable_hash((None, 1))
66 219750521
67 """
68 if x is None:
69 return 0
70 if isinstance(x, tuple):
71 h = 0x345678
72 for i in x:
73 h ^= portable_hash(i)
74 h *= 1000003
75 h &= 0xffffffff
76 h ^= len(x)
77 if h == -1:
78 h = -2
79 return h
80 return hash(x)
81
84 """
85     This function returns the traceback info for a callsite as a namedtuple
86     of (function name, file name, line number).
87 """
88 tb = traceback.extract_stack()
89 callsite = namedtuple("Callsite", "function file linenum")
90 if len(tb) == 0:
91 return None
92 file, line, module, what = tb[len(tb) - 1]
93 sparkpath = os.path.dirname(file)
94 first_spark_frame = len(tb) - 1
95 for i in range(0, len(tb)):
96 file, line, fun, what = tb[i]
97 if file.startswith(sparkpath):
98 first_spark_frame = i
99 break
100 if first_spark_frame == 0:
101 file, line, fun, what = tb[0]
102 return callsite(function=fun, file=file, linenum=line)
103 sfile, sline, sfun, swhat = tb[first_spark_frame]
104 ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
105 return callsite(function=sfun, file=ufile, linenum=uline)
106
107 _spark_stack_depth = 0
111
113 tb = _extract_concise_traceback()
114 if tb is not None:
115 self._traceback = "%s at %s:%s" % (
116 tb.function, tb.file, tb.linenum)
117 else:
118 self._traceback = "Error! Could not extract traceback info"
119 self._context = sc
120
126
132
135
136 """
137     A bounded max-heap that keeps only the smallest C{maxsize} inserted elements.
138
139 >>> import pyspark.rdd
140 >>> heap = pyspark.rdd.MaxHeapQ(5)
141 >>> [heap.insert(i) for i in range(10)]
142 [None, None, None, None, None, None, None, None, None, None]
143 >>> sorted(heap.getElements())
144 [0, 1, 2, 3, 4]
145 >>> heap = pyspark.rdd.MaxHeapQ(5)
146 >>> [heap.insert(i) for i in range(9, -1, -1)]
147 [None, None, None, None, None, None, None, None, None, None]
148 >>> sorted(heap.getElements())
149 [0, 1, 2, 3, 4]
150 >>> heap = pyspark.rdd.MaxHeapQ(1)
151 >>> [heap.insert(i) for i in range(9, -1, -1)]
152 [None, None, None, None, None, None, None, None, None, None]
153 >>> heap.getElements()
154 [0]
155 """
156
158
159 self.q = [0]
160 self.maxsize = maxsize
161
163 while (k > 1) and (self.q[k / 2] < self.q[k]):
164 self._swap(k, k / 2)
165 k = k / 2
166
168 t = self.q[i]
169 self.q[i] = self.q[j]
170 self.q[j] = t
171
173 N = self.size()
174 while 2 * k <= N:
175 j = 2 * k
176
177
178 if j < N and self.q[j] < self.q[j + 1]:
179 j = j + 1
180 if(self.q[k] > self.q[j]):
181 break
182 self._swap(k, j)
183 k = j
184
186 return len(self.q) - 1
187
189 if (self.size()) < self.maxsize:
190 self.q.append(value)
191 self._swim(self.size())
192 else:
193 self._replaceRoot(value)
194
197
199 if(self.q[1] > value):
200 self.q[1] = value
201 self._sink(1)
202
205 """
206 Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
207 return the value in MB
208
209 >>> _parse_memory("256m")
210 256
211 >>> _parse_memory("2g")
212 2048
213 """
214 units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
215     if s[-1].lower() not in units:
216 raise ValueError("invalid format: " + s)
217 return int(float(s[:-1]) * units[s[-1].lower()])
218
219
220 class RDD(object):
221
222 """
223 A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
224 Represents an immutable, partitioned collection of elements that can be
225 operated on in parallel.
226 """
227
228     def __init__(self, jrdd, ctx, jrdd_deserializer):
229 self._jrdd = jrdd
230 self.is_cached = False
231 self.is_checkpointed = False
232 self.ctx = ctx
233 self._jrdd_deserializer = jrdd_deserializer
234 self._id = jrdd.id()
235
242
244 """
245 A unique ID for this RDD (within its SparkContext).
246 """
247 return self._id
248
250 return self._jrdd.toString()
251
252 @property
254 """
255 The L{SparkContext} that this RDD was created on.
256 """
257 return self.ctx
258
260 """
261 Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
262 """
263 self.is_cached = True
264 self.persist(StorageLevel.MEMORY_ONLY_SER)
265 return self
266
268 """
269 Set this RDD's storage level to persist its values across operations
270 after the first time it is computed. This can only be used to assign
271 a new storage level if the RDD does not have a storage level set yet.
272 """
273 self.is_cached = True
274 javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
275 self._jrdd.persist(javaStorageLevel)
276 return self
277
279 """
280 Mark the RDD as non-persistent, and remove all blocks for it from
281 memory and disk.
282 """
283 self.is_cached = False
284 self._jrdd.unpersist()
285 return self
286
288 """
289 Mark this RDD for checkpointing. It will be saved to a file inside the
290 checkpoint directory set with L{SparkContext.setCheckpointDir()} and
291 all references to its parent RDDs will be removed. This function must
292 be called before any job has been executed on this RDD. It is strongly
293 recommended that this RDD is persisted in memory, otherwise saving it
294 on a file will require recomputation.
295 """
296 self.is_checkpointed = True
297 self._jrdd.rdd().checkpoint()
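    # A hedged usage sketch (not part of the original source): checkpointing
    # only takes effect after a checkpoint directory has been configured on the
    # SparkContext and an action is subsequently run. "/tmp/spark-checkpoints"
    # below is a hypothetical path.
    #
    #   sc.setCheckpointDir("/tmp/spark-checkpoints")
    #   rdd = sc.parallelize(range(100)).map(lambda x: x * x).cache()
    #   rdd.checkpoint()
    #   rdd.count()              # running an action materializes the checkpoint
    #   rdd.isCheckpointed()     # True once the job has finished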
298
300 """
301 Return whether this RDD has been checkpointed or not
302 """
303 return self._jrdd.rdd().isCheckpointed()
304
306 """
307 Gets the name of the file to which this RDD was checkpointed
308 """
309 checkpointFile = self._jrdd.rdd().getCheckpointFile()
310 if checkpointFile.isDefined():
311 return checkpointFile.get()
312 else:
313 return None
314
315     def map(self, f, preservesPartitioning=False):
316 """
317 Return a new RDD by applying a function to each element of this RDD.
318
319 >>> rdd = sc.parallelize(["b", "a", "c"])
320 >>> sorted(rdd.map(lambda x: (x, 1)).collect())
321 [('a', 1), ('b', 1), ('c', 1)]
322 """
323 def func(_, iterator):
324 return imap(f, iterator)
325 return self.mapPartitionsWithIndex(func, preservesPartitioning)
326
327     def flatMap(self, f, preservesPartitioning=False):
328 """
329 Return a new RDD by first applying a function to all elements of this
330 RDD, and then flattening the results.
331
332 >>> rdd = sc.parallelize([2, 3, 4])
333 >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
334 [1, 1, 1, 2, 2, 3]
335 >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
336 [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
337 """
338 def func(s, iterator):
339 return chain.from_iterable(imap(f, iterator))
340 return self.mapPartitionsWithIndex(func, preservesPartitioning)
341
343 """
344 Return a new RDD by applying a function to each partition of this RDD.
345
346 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
347 >>> def f(iterator): yield sum(iterator)
348 >>> rdd.mapPartitions(f).collect()
349 [3, 7]
350 """
351 def func(s, iterator):
352 return f(iterator)
353 return self.mapPartitionsWithIndex(func)
354
356 """
357 Return a new RDD by applying a function to each partition of this RDD,
358 while tracking the index of the original partition.
359
360 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
361 >>> def f(splitIndex, iterator): yield splitIndex
362 >>> rdd.mapPartitionsWithIndex(f).sum()
363 6
364 """
365 return PipelinedRDD(self, f, preservesPartitioning)
366
368 """
369 Deprecated: use mapPartitionsWithIndex instead.
370
371 Return a new RDD by applying a function to each partition of this RDD,
372 while tracking the index of the original partition.
373
374 >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
375 >>> def f(splitIndex, iterator): yield splitIndex
376 >>> rdd.mapPartitionsWithSplit(f).sum()
377 6
378 """
379 warnings.warn("mapPartitionsWithSplit is deprecated; "
380 "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
381 return self.mapPartitionsWithIndex(f, preservesPartitioning)
382
384 """
385         Returns the number of partitions in this RDD.
386
387 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
388 >>> rdd.getNumPartitions()
389 2
390 """
391 return self._jrdd.partitions().size()
392
394 """
395 Return a new RDD containing only the elements that satisfy a predicate.
396
397 >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
398 >>> rdd.filter(lambda x: x % 2 == 0).collect()
399 [2, 4]
400 """
401 def func(iterator):
402 return ifilter(f, iterator)
403 return self.mapPartitions(func)
404
406 """
407 Return a new RDD containing the distinct elements in this RDD.
408
409 >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
410 [1, 2, 3]
411 """
412 return self.map(lambda x: (x, None)) \
413 .reduceByKey(lambda x, _: x) \
414 .map(lambda (x, _): x)
415
416     def sample(self, withReplacement, fraction, seed=None):
417 """
418         Return a sampled subset of this RDD (relies on numpy and falls back
419         to the default random generator if numpy is unavailable).
420
421 >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
422 [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
423 """
424 assert fraction >= 0.0, "Negative fraction value: %s" % fraction
425 return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
426
427
428     def takeSample(self, withReplacement, num, seed=None):
429 """
430 Return a fixed-size sampled subset of this RDD (currently requires
431 numpy).
432
433 >>> rdd = sc.parallelize(range(0, 10))
434 >>> len(rdd.takeSample(True, 20, 1))
435 20
436 >>> len(rdd.takeSample(False, 5, 2))
437 5
438 >>> len(rdd.takeSample(False, 15, 3))
439 10
440 """
441 numStDev = 10.0
442
443 if num < 0:
444 raise ValueError("Sample size cannot be negative.")
445 elif num == 0:
446 return []
447
448 initialCount = self.count()
449 if initialCount == 0:
450 return []
451
452 rand = Random(seed)
453
454 if (not withReplacement) and num >= initialCount:
455
456 samples = self.collect()
457 rand.shuffle(samples)
458 return samples
459
460 maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
461 if num > maxSampleSize:
462 raise ValueError(
463 "Sample size cannot be greater than %d." % maxSampleSize)
464
465 fraction = RDD._computeFractionForSampleSize(
466 num, initialCount, withReplacement)
467 samples = self.sample(withReplacement, fraction, seed).collect()
468
469
470
471
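        # If the first sample did not turn out large enough, keep sampling with
        # the same fraction (and a fresh seed) until we have at least num items;
        # this should be rare because the fraction is computed with a large
        # safety margin above.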
472 while len(samples) < num:
473
474 seed = rand.randint(0, sys.maxint)
475 samples = self.sample(withReplacement, fraction, seed).collect()
476
477 rand.shuffle(samples)
478
479 return samples[0:num]
480
481 @staticmethod
483 """
484 Returns a sampling rate that guarantees a sample of
485 size >= sampleSizeLowerBound 99.99% of the time.
486
487 How the sampling rate is determined:
488 Let p = num / total, where num is the sample size and total is the
489 total number of data points in the RDD. We're trying to compute
490 q > p such that
491 - when sampling with replacement, we're drawing each data point
492 with prob_i ~ Pois(q), where we want to guarantee
493 Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
494 total), i.e. the failure rate of not having a sufficiently large
495 sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
496 to guarantee 0.9999 success rate for num > 12, but we need a
497 slightly larger q (9 empirically determined).
498 - when sampling without replacement, we're drawing each data point
499 with prob_i ~ Binomial(total, fraction) and our choice of q
500 guarantees 1-delta, or 0.9999 success rate, where success rate is
501 defined the same as in sampling with replacement.
502 """
503 fraction = float(sampleSizeLowerBound) / total
504 if withReplacement:
505 numStDev = 5
506 if (sampleSizeLowerBound < 12):
507 numStDev = 9
508 return fraction + numStDev * sqrt(fraction / total)
509 else:
510 delta = 0.00005
511 gamma = - log(delta) / total
512 return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
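    # A rough worked illustration (added here, not from the original source):
    # without replacement, for sampleSizeLowerBound=10 and total=1000 the naive
    # fraction is 0.01; gamma = -log(5e-05) / 1000 is roughly 0.0099, so the
    # returned rate is about 0.01 + 0.0099 + sqrt(0.0099**2 + 2 * 0.0099 * 0.01),
    # which is approximately 0.037, i.e. noticeably more than 1% so that a
    # shortfall is very unlikely.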
513
515 """
516 Return the union of this RDD and another one.
517
518 >>> rdd = sc.parallelize([1, 1, 2, 3])
519 >>> rdd.union(rdd).collect()
520 [1, 1, 2, 3, 1, 1, 2, 3]
521 """
522 if self._jrdd_deserializer == other._jrdd_deserializer:
523 rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
524 self._jrdd_deserializer)
525 return rdd
526 else:
527
528
529 self_copy = self._reserialize()
530 other_copy = other._reserialize()
531 return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
532 self.ctx.serializer)
533
535 """
536 Return the intersection of this RDD and another one. The output will
537 not contain any duplicate elements, even if the input RDDs did.
538
539 Note that this method performs a shuffle internally.
540
541 >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
542 >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
543 >>> rdd1.intersection(rdd2).collect()
544 [1, 2, 3]
545 """
546 return self.map(lambda v: (v, None)) \
547 .cogroup(other.map(lambda v: (v, None))) \
548 .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
549 .keys()
550
552 serializer = serializer or self.ctx.serializer
553 if self._jrdd_deserializer == serializer:
554 return self
555 else:
556 converted = self.map(lambda x: x, preservesPartitioning=True)
557 converted._jrdd_deserializer = serializer
558 return converted
559
561 """
562 Return the union of this RDD and another one.
563
564 >>> rdd = sc.parallelize([1, 1, 2, 3])
565 >>> (rdd + rdd).collect()
566 [1, 1, 2, 3, 1, 1, 2, 3]
567 """
568 if not isinstance(other, RDD):
569 raise TypeError
570 return self.union(other)
571
572     def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
573 """
574 Sorts this RDD, which is assumed to consist of (key, value) pairs.
575 # noqa
576
577 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
578 >>> sc.parallelize(tmp).sortByKey().first()
579 ('1', 3)
580 >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
581 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
582 >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
583 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
584 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
585 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
586 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
587 [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
588 """
589 if numPartitions is None:
590 numPartitions = self._defaultReducePartitions()
591
592 def sortPartition(iterator):
593 return iter(sorted(iterator, key=lambda (k, v): keyfunc(k), reverse=not ascending))
594
595 if numPartitions == 1:
596 if self.getNumPartitions() > 1:
597 self = self.coalesce(1)
598 return self.mapPartitions(sortPartition)
599
600
601
602
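        # first compute the boundaries of each partition by sampling: we want
        # to partition the key-space into bins that receive roughly the same
        # number of (key, value) pairs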
603 rddSize = self.count()
604 maxSampleSize = numPartitions * 20.0
605 fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
606 samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
607 samples = sorted(samples, reverse=(not ascending), key=keyfunc)
608
609
610
611 bounds = [samples[len(samples) * (i + 1) / numPartitions]
612 for i in range(0, numPartitions - 1)]
613
614 def rangePartitioner(k):
615 p = bisect.bisect_left(bounds, keyfunc(k))
616 if ascending:
617 return p
618 else:
619 return numPartitions - 1 - p
620
621 return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
622
623     def sortBy(self, keyfunc, ascending=True, numPartitions=None):
624 """
625 Sorts this RDD by the given keyfunc
626
627 >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
628 >>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
629 [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
630 >>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
631 [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
632 """
633 return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
634
636 """
637 Return an RDD created by coalescing all elements within each partition
638 into a list.
639
640 >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
641 >>> sorted(rdd.glom().collect())
642 [[1, 2], [3, 4]]
643 """
644 def func(iterator):
645 yield list(iterator)
646 return self.mapPartitions(func)
647
649 """
650 Return the Cartesian product of this RDD and another one, that is, the
651 RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
652 C{b} is in C{other}.
653
654 >>> rdd = sc.parallelize([1, 2])
655 >>> sorted(rdd.cartesian(rdd).collect())
656 [(1, 1), (1, 2), (2, 1), (2, 2)]
657 """
658
659 deserializer = CartesianDeserializer(self._jrdd_deserializer,
660 other._jrdd_deserializer)
661 return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
662
663     def groupBy(self, f, numPartitions=None):
664 """
665 Return an RDD of grouped items.
666
667 >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
668 >>> result = rdd.groupBy(lambda x: x % 2).collect()
669 >>> sorted([(x, sorted(y)) for (x, y) in result])
670 [(0, [2, 8]), (1, [1, 1, 3, 5])]
671 """
672 return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
673
674     def pipe(self, command, env={}):
675 """
676 Return an RDD created by piping elements to a forked external process.
677
678 >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
679 ['1', '2', '', '3']
680 """
681 def func(iterator):
682 pipe = Popen(
683 shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
684
685 def pipe_objs(out):
686 for obj in iterator:
687 out.write(str(obj).rstrip('\n') + '\n')
688 out.close()
689 Thread(target=pipe_objs, args=[pipe.stdin]).start()
690 return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
691 return self.mapPartitions(func)
692
694 """
695 Applies a function to all elements of this RDD.
696
697 >>> def f(x): print x
698 >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
699 """
700 def processPartition(iterator):
701 for x in iterator:
702 f(x)
703 yield None
704 self.mapPartitions(processPartition).collect()
705
707 """
708 Applies a function to each partition of this RDD.
709
710 >>> def f(iterator):
711 ... for x in iterator:
712 ... print x
713 ... yield None
714 >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
715 """
716 self.mapPartitions(f).collect()
717
719 """
720 Return a list that contains all of the elements in this RDD.
721 """
722 with _JavaStackTrace(self.context) as st:
723 bytesInJava = self._jrdd.collect().iterator()
724 return list(self._collect_iterator_through_file(bytesInJava))
725
727
728
729
730 tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
731 tempFile.close()
732 self.ctx._writeToFile(iterator, tempFile.name)
733
734 with open(tempFile.name, 'rb') as tempFile:
735 for item in self._jrdd_deserializer.load_stream(tempFile):
736 yield item
737 os.unlink(tempFile.name)
738
740 """
741 Reduces the elements of this RDD using the specified commutative and
742 associative binary operator. Currently reduces partitions locally.
743
744 >>> from operator import add
745 >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
746 15
747 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
748 10
749 """
750 def func(iterator):
751 acc = None
752 for obj in iterator:
753 if acc is None:
754 acc = obj
755 else:
756 acc = f(obj, acc)
757 if acc is not None:
758 yield acc
759 vals = self.mapPartitions(func).collect()
760 return reduce(f, vals)
761
762     def fold(self, zeroValue, op):
763 """
764 Aggregate the elements of each partition, and then the results for all
765 the partitions, using a given associative function and a neutral "zero
766 value."
767
768 The function C{op(t1, t2)} is allowed to modify C{t1} and return it
769 as its result value to avoid object allocation; however, it should not
770 modify C{t2}.
771
772 >>> from operator import add
773 >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
774 15
775 """
776 def func(iterator):
777 acc = zeroValue
778 for obj in iterator:
779 acc = op(obj, acc)
780 yield acc
781 vals = self.mapPartitions(func).collect()
782 return reduce(op, vals, zeroValue)
783
784     def aggregate(self, zeroValue, seqOp, combOp):
785 """
786 Aggregate the elements of each partition, and then the results for all
787 the partitions, using a given combine functions and a neutral "zero
788 value."
789
790         The function C{op(t1, t2)} is allowed to modify C{t1} and return it
791 as its result value to avoid object allocation; however, it should not
792 modify C{t2}.
793
794 The first function (seqOp) can return a different result type, U, than
795 the type of this RDD. Thus, we need one operation for merging a T into
796         a U and one operation for merging two U's.
797
798 >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
799 >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
800 >>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
801 (10, 4)
802 >>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
803 (0, 0)
804 """
805 def func(iterator):
806 acc = zeroValue
807 for obj in iterator:
808 acc = seqOp(acc, obj)
809 yield acc
810
811 return self.mapPartitions(func).fold(zeroValue, combOp)
812
814 """
815 Find the maximum item in this RDD.
816
817 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
818 43.0
819 """
820 return self.reduce(max)
821
823 """
824 Find the minimum item in this RDD.
825
826 >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
827 1.0
828 """
829 return self.reduce(min)
830
832 """
833 Add up the elements in this RDD.
834
835 >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
836 6.0
837 """
838 return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
839
841 """
842 Return the number of elements in this RDD.
843
844 >>> sc.parallelize([2, 3, 4]).count()
845 3
846 """
847 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
848
850 """
851 Return a L{StatCounter} object that captures the mean, variance
852 and count of the RDD's elements in one operation.
853 """
854 def redFunc(left_counter, right_counter):
855 return left_counter.mergeStats(right_counter)
856
857 return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
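    # A hedged usage sketch (not part of the original docstring): the returned
    # StatCounter exposes count(), mean(), stdev(), min() and max(), e.g.
    #
    #   st = sc.parallelize([1.0, 2.0, 3.0]).stats()
    #   st.count(), st.mean(), st.stdev()    # (3, 2.0, 0.816...)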
858
860 """
861         Compute a histogram using the provided buckets. The buckets
862         are all open to the right except for the last, which is closed.
863         e.g. [1, 10, 20, 50] means the buckets are [1, 10) [10, 20) [20, 50],
864         which means 1<=x<10, 10<=x<20, 20<=x<=50. For an input of 1
865         and 50, the resulting histogram would be 1, 0, 1.
866
867         If your buckets are evenly spaced (e.g. [0, 10, 20, 30]),
868         bucketing can be switched from an O(log n) insertion to O(1) per
869         element (where n is the number of buckets).
870
871         Buckets must be sorted, must not contain any duplicates, and must
872         have at least two elements.
873
874         If `buckets` is a number, it will generate buckets that are
875         evenly spaced between the minimum and maximum of the RDD. For
876         example, if the min value is 0 and the max is 100, given 2
877         buckets, the resulting buckets will be [0, 50) [50, 100]. `buckets`
878         must be at least 1. An exception is raised if the RDD contains
879         infinity or NaN. If the elements in the RDD do not vary
880         (max == min), a single bucket is always returned.
881
882         It returns a tuple of buckets and histogram.
883
884 >>> rdd = sc.parallelize(range(51))
885 >>> rdd.histogram(2)
886 ([0, 25, 50], [25, 26])
887 >>> rdd.histogram([0, 5, 25, 50])
888 ([0, 5, 25, 50], [5, 20, 26])
889 >>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
890 ([0, 15, 30, 45, 60], [15, 15, 15, 6])
891 >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
892 >>> rdd.histogram(("a", "b", "c"))
893 (('a', 'b', 'c'), [2, 2])
894 """
895
896 if isinstance(buckets, (int, long)):
897 if buckets < 1:
898 raise ValueError("number of buckets must be >= 1")
899
900
901 def comparable(x):
902 if x is None:
903 return False
904 if type(x) is float and isnan(x):
905 return False
906 return True
907
908 filtered = self.filter(comparable)
909
910
911 def minmax(a, b):
912 return min(a[0], b[0]), max(a[1], b[1])
913 try:
914 minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
915 except TypeError as e:
916 if " empty " in str(e):
917 raise ValueError("can not generate buckets from empty RDD")
918 raise
919
920 if minv == maxv or buckets == 1:
921 return [minv, maxv], [filtered.count()]
922
923 try:
924 inc = (maxv - minv) / buckets
925 except TypeError:
926 raise TypeError("Can not generate buckets with non-number in RDD")
927
928 if isinf(inc):
929 raise ValueError("Can not generate buckets with infinite value")
930
931
932 if inc * buckets != maxv - minv:
933 inc = (maxv - minv) * 1.0 / buckets
934
935 buckets = [i * inc + minv for i in range(buckets)]
936 buckets.append(maxv)
937 even = True
938
939 elif isinstance(buckets, (list, tuple)):
940 if len(buckets) < 2:
941 raise ValueError("buckets should have more than one value")
942
943 if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
944 raise ValueError("can not have None or NaN in buckets")
945
946 if sorted(buckets) != list(buckets):
947 raise ValueError("buckets should be sorted")
948
949 if len(set(buckets)) != len(buckets):
950 raise ValueError("buckets should not contain duplicated values")
951
952 minv = buckets[0]
953 maxv = buckets[-1]
954 even = False
955 inc = None
956 try:
957 steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
958 except TypeError:
959 pass
960 else:
961 if max(steps) - min(steps) < 1e-10:
962 even = True
963 inc = (maxv - minv) / (len(buckets) - 1)
964
965 else:
966 raise TypeError("buckets should be a list or tuple or number(int or long)")
967
968 def histogram(iterator):
969 counters = [0] * len(buckets)
970 for i in iterator:
971 if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
972 continue
973 t = (int((i - minv) / inc) if even
974 else bisect.bisect_right(buckets, i) - 1)
975 counters[t] += 1
976
977 last = counters.pop()
978 counters[-1] += last
979 return [counters]
980
981 def mergeCounters(a, b):
982 return [i + j for i, j in zip(a, b)]
983
984 return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
985
987 """
988 Compute the mean of this RDD's elements.
989
990 >>> sc.parallelize([1, 2, 3]).mean()
991 2.0
992 """
993 return self.stats().mean()
994
996 """
997 Compute the variance of this RDD's elements.
998
999 >>> sc.parallelize([1, 2, 3]).variance()
1000 0.666...
1001 """
1002 return self.stats().variance()
1003
1005 """
1006 Compute the standard deviation of this RDD's elements.
1007
1008 >>> sc.parallelize([1, 2, 3]).stdev()
1009 0.816...
1010 """
1011 return self.stats().stdev()
1012
1014 """
1015 Compute the sample standard deviation of this RDD's elements (which
1016 corrects for bias in estimating the standard deviation by dividing by
1017 N-1 instead of N).
1018
1019 >>> sc.parallelize([1, 2, 3]).sampleStdev()
1020 1.0
1021 """
1022 return self.stats().sampleStdev()
1023
1025 """
1026 Compute the sample variance of this RDD's elements (which corrects
1027 for bias in estimating the variance by dividing by N-1 instead of N).
1028
1029 >>> sc.parallelize([1, 2, 3]).sampleVariance()
1030 1.0
1031 """
1032 return self.stats().sampleVariance()
1033
1035 """
1036 Return the count of each unique value in this RDD as a dictionary of
1037 (value, count) pairs.
1038
1039 >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
1040 [(1, 2), (2, 3)]
1041 """
1042 def countPartition(iterator):
1043 counts = defaultdict(int)
1044 for obj in iterator:
1045 counts[obj] += 1
1046 yield counts
1047
1048 def mergeMaps(m1, m2):
1049 for (k, v) in m2.iteritems():
1050 m1[k] += v
1051 return m1
1052 return self.mapPartitions(countPartition).reduce(mergeMaps)
1053
1054     def top(self, num):
1055 """
1056 Get the top N elements from a RDD.
1057
1058 Note: It returns the list sorted in descending order.
1059 >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
1060 [12]
1061 >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
1062 [6, 5]
1063 """
1064 def topIterator(iterator):
1065 q = []
1066 for k in iterator:
1067 if len(q) < num:
1068 heapq.heappush(q, k)
1069 else:
1070 heapq.heappushpop(q, k)
1071 yield q
1072
1073 def merge(a, b):
1074 return next(topIterator(a + b))
1075
1076 return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
1077
1079 """
1080 Get the N elements from a RDD ordered in ascending order or as
1081 specified by the optional key function.
1082
1083 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
1084 [1, 2, 3, 4, 5, 6]
1085 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
1086 [10, 9, 7, 6, 5, 4]
1087 """
1088
1089 def topNKeyedElems(iterator, key_=None):
1090 q = MaxHeapQ(num)
1091 for k in iterator:
1092 if key_ is not None:
1093 k = (key_(k), k)
1094 q.insert(k)
1095 yield q.getElements()
1096
1097 def unKey(x, key_=None):
1098 if key_ is not None:
1099 x = [i[1] for i in x]
1100 return x
1101
1102 def merge(a, b):
1103 return next(topNKeyedElems(a + b))
1104 result = self.mapPartitions(
1105 lambda i: topNKeyedElems(i, key)).reduce(merge)
1106 return sorted(unKey(result, key), key=key)
1107
1108     def take(self, num):
1109 """
1110 Take the first num elements of the RDD.
1111
1112         It works by first scanning one partition, and using the results from
1113 that partition to estimate the number of additional partitions needed
1114 to satisfy the limit.
1115
1116 Translated from the Scala implementation in RDD#take().
1117
1118 >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
1119 [2, 3]
1120 >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
1121 [2, 3, 4, 5, 6]
1122 >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
1123 [91, 92, 93]
1124 """
1125 items = []
1126 totalParts = self._jrdd.partitions().size()
1127 partsScanned = 0
1128
1129 while len(items) < num and partsScanned < totalParts:
1130
1131
1132
1133 numPartsToTry = 1
1134 if partsScanned > 0:
1135
1136
1137
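                # If no rows were found so far, quadruple the number of
                # partitions to scan; otherwise interpolate how many more
                # partitions are likely needed, padded by 50%.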
1138 if len(items) == 0:
1139 numPartsToTry = partsScanned * 4
1140 else:
1141 numPartsToTry = int(1.5 * num * partsScanned / len(items))
1142
1143 left = num - len(items)
1144
1145 def takeUpToNumLeft(iterator):
1146 taken = 0
1147 while taken < left:
1148 yield next(iterator)
1149 taken += 1
1150
1151 p = range(
1152 partsScanned, min(partsScanned + numPartsToTry, totalParts))
1153 res = self.context.runJob(self, takeUpToNumLeft, p, True)
1154
1155 items += res
1156 partsScanned += numPartsToTry
1157
1158 return items[:num]
1159
1161 """
1162 Return the first element in this RDD.
1163
1164 >>> sc.parallelize([2, 3, 4]).first()
1165 2
1166 """
1167 return self.take(1)[0]
1168
1170 """
1171 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1172 system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
1173 converted for output using either user specified converters or, by default,
1174 L{org.apache.spark.api.python.JavaToWritableConverter}.
1175
1176 @param conf: Hadoop job configuration, passed in as a dict
1177 @param keyConverter: (None by default)
1178 @param valueConverter: (None by default)
1179 """
1180 jconf = self.ctx._dictToJavaMap(conf)
1181 pickledRDD = self._toPickleSerialization()
1182 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1183 self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
1184 keyConverter, valueConverter, True)
1185
1186     def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
1187 keyConverter=None, valueConverter=None, conf=None):
1188 """
1189 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1190 system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
1191 will be inferred if not specified. Keys and values are converted for output using either
1192 user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
1193 C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
1194 of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
1195
1196 @param path: path to Hadoop file
1197 @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
1198 (e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
1199 @param keyClass: fully qualified classname of key Writable class
1200 (e.g. "org.apache.hadoop.io.IntWritable", None by default)
1201 @param valueClass: fully qualified classname of value Writable class
1202 (e.g. "org.apache.hadoop.io.Text", None by default)
1203 @param keyConverter: (None by default)
1204 @param valueConverter: (None by default)
1205 @param conf: Hadoop job configuration, passed in as a dict (None by default)
1206 """
1207 jconf = self.ctx._dictToJavaMap(conf)
1208 pickledRDD = self._toPickleSerialization()
1209 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1210 self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
1211 outputFormatClass,
1212 keyClass, valueClass,
1213 keyConverter, valueConverter, jconf)
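    # A hedged usage sketch (illustrative only, reusing the class names from the
    # docstring above; "/tmp/seq-out" is a hypothetical path):
    #
    #   rdd = sc.parallelize([(1, "a"), (2, "b")])
    #   rdd.saveAsNewAPIHadoopFile(
    #       "/tmp/seq-out",
    #       "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
    #       keyClass="org.apache.hadoop.io.IntWritable",
    #       valueClass="org.apache.hadoop.io.Text")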
1214
1216 """
1217 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1218 system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
1219 converted for output using either user specified converters or, by default,
1220 L{org.apache.spark.api.python.JavaToWritableConverter}.
1221
1222 @param conf: Hadoop job configuration, passed in as a dict
1223 @param keyConverter: (None by default)
1224 @param valueConverter: (None by default)
1225 """
1226 jconf = self.ctx._dictToJavaMap(conf)
1227 pickledRDD = self._toPickleSerialization()
1228 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1229 self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, batched, jconf,
1230 keyConverter, valueConverter, False)
1231
1232     def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
1233 keyConverter=None, valueConverter=None, conf=None,
1234 compressionCodecClass=None):
1235 """
1236 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1237 system, using the old Hadoop OutputFormat API (mapred package). Key and value types
1238 will be inferred if not specified. Keys and values are converted for output using either
1239 user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The
1240 C{conf} is applied on top of the base Hadoop conf associated with the SparkContext
1241 of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
1242
1243 @param path: path to Hadoop file
1244 @param outputFormatClass: fully qualified classname of Hadoop OutputFormat
1245 (e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
1246 @param keyClass: fully qualified classname of key Writable class
1247 (e.g. "org.apache.hadoop.io.IntWritable", None by default)
1248 @param valueClass: fully qualified classname of value Writable class
1249 (e.g. "org.apache.hadoop.io.Text", None by default)
1250 @param keyConverter: (None by default)
1251 @param valueConverter: (None by default)
1252 @param conf: (None by default)
1253 @param compressionCodecClass: (None by default)
1254 """
1255 jconf = self.ctx._dictToJavaMap(conf)
1256 pickledRDD = self._toPickleSerialization()
1257 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1258 self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
1259 outputFormatClass,
1260 keyClass, valueClass,
1261 keyConverter, valueConverter,
1262 jconf, compressionCodecClass)
1263
1265 """
1266 Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file
1267 system, using the L{org.apache.hadoop.io.Writable} types that we convert from the
1268 RDD's key and value types. The mechanism is as follows:
1269 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
1270 2. Keys and values of this Java RDD are converted to Writables and written out.
1271
1272 @param path: path to sequence file
1273 @param compressionCodecClass: (None by default)
1274 """
1275 pickledRDD = self._toPickleSerialization()
1276 batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
1277 self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, batched,
1278 path, compressionCodecClass)
1279
1281 """
1282 Save this RDD as a SequenceFile of serialized objects. The serializer
1283 used is L{pyspark.serializers.PickleSerializer}, default batch size
1284 is 10.
1285
1286 >>> tmpFile = NamedTemporaryFile(delete=True)
1287 >>> tmpFile.close()
1288 >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
1289 >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
1290 [1, 2, 'rdd', 'spark']
1291 """
1292 self._reserialize(BatchedSerializer(PickleSerializer(),
1293 batchSize))._jrdd.saveAsObjectFile(path)
1294
1295     def saveAsTextFile(self, path):
1296 """
1297 Save this RDD as a text file, using string representations of elements.
1298
1299 >>> tempFile = NamedTemporaryFile(delete=True)
1300 >>> tempFile.close()
1301 >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
1302 >>> from fileinput import input
1303 >>> from glob import glob
1304 >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
1305 '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
1306
1307 Empty lines are tolerated when saving to text files.
1308
1309 >>> tempFile2 = NamedTemporaryFile(delete=True)
1310 >>> tempFile2.close()
1311 >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
1312 >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
1313 '\\n\\n\\nbar\\nfoo\\n'
1314 """
1315 def func(split, iterator):
1316 for x in iterator:
1317 if not isinstance(x, basestring):
1318 x = unicode(x)
1319 if isinstance(x, unicode):
1320 x = x.encode("utf-8")
1321 yield x
1322 keyed = self.mapPartitionsWithIndex(func)
1323 keyed._bypass_serializer = True
1324 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
1325
1326
1327
1329 """
1330 Return the key-value pairs in this RDD to the master as a dictionary.
1331
1332 >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
1333 >>> m[1]
1334 2
1335 >>> m[3]
1336 4
1337 """
1338 return dict(self.collect())
1339
1341 """
1342 Return an RDD with the keys of each tuple.
1343
1344 >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
1345 >>> m.collect()
1346 [1, 3]
1347 """
1348 return self.map(lambda (k, v): k)
1349
1351 """
1352 Return an RDD with the values of each tuple.
1353
1354 >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
1355 >>> m.collect()
1356 [2, 4]
1357 """
1358 return self.map(lambda (k, v): v)
1359
1361 """
1362 Merge the values for each key using an associative reduce function.
1363
1364 This will also perform the merging locally on each mapper before
1365 sending results to a reducer, similarly to a "combiner" in MapReduce.
1366
1367 Output will be hash-partitioned with C{numPartitions} partitions, or
1368 the default parallelism level if C{numPartitions} is not specified.
1369
1370 >>> from operator import add
1371 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1372 >>> sorted(rdd.reduceByKey(add).collect())
1373 [('a', 2), ('b', 1)]
1374 """
1375 return self.combineByKey(lambda x: x, func, func, numPartitions)
1376
1378 """
1379 Merge the values for each key using an associative reduce function, but
1380 return the results immediately to the master as a dictionary.
1381
1382 This will also perform the merging locally on each mapper before
1383 sending results to a reducer, similarly to a "combiner" in MapReduce.
1384
1385 >>> from operator import add
1386 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1387 >>> sorted(rdd.reduceByKeyLocally(add).items())
1388 [('a', 2), ('b', 1)]
1389 """
1390 def reducePartition(iterator):
1391 m = {}
1392 for (k, v) in iterator:
1393 m[k] = v if k not in m else func(m[k], v)
1394 yield m
1395
1396 def mergeMaps(m1, m2):
1397 for (k, v) in m2.iteritems():
1398 m1[k] = v if k not in m1 else func(m1[k], v)
1399 return m1
1400 return self.mapPartitions(reducePartition).reduce(mergeMaps)
1401
1403 """
1404 Count the number of elements for each key, and return the result to the
1405 master as a dictionary.
1406
1407 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1408 >>> sorted(rdd.countByKey().items())
1409 [('a', 2), ('b', 1)]
1410 """
1411 return self.map(lambda x: x[0]).countByValue()
1412
1413     def join(self, other, numPartitions=None):
1414 """
1415 Return an RDD containing all pairs of elements with matching keys in
1416 C{self} and C{other}.
1417
1418 Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
1419 (k, v1) is in C{self} and (k, v2) is in C{other}.
1420
1421 Performs a hash join across the cluster.
1422
1423 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1424 >>> y = sc.parallelize([("a", 2), ("a", 3)])
1425 >>> sorted(x.join(y).collect())
1426 [('a', (1, 2)), ('a', (1, 3))]
1427 """
1428 return python_join(self, other, numPartitions)
1429
1431 """
1432 Perform a left outer join of C{self} and C{other}.
1433
1434 For each element (k, v) in C{self}, the resulting RDD will either
1435 contain all pairs (k, (v, w)) for w in C{other}, or the pair
1436 (k, (v, None)) if no elements in other have key k.
1437
1438 Hash-partitions the resulting RDD into the given number of partitions.
1439
1440 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1441 >>> y = sc.parallelize([("a", 2)])
1442 >>> sorted(x.leftOuterJoin(y).collect())
1443 [('a', (1, 2)), ('b', (4, None))]
1444 """
1445 return python_left_outer_join(self, other, numPartitions)
1446
1448 """
1449 Perform a right outer join of C{self} and C{other}.
1450
1451 For each element (k, w) in C{other}, the resulting RDD will either
1452 contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
1453 if no elements in C{self} have key k.
1454
1455 Hash-partitions the resulting RDD into the given number of partitions.
1456
1457 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1458 >>> y = sc.parallelize([("a", 2)])
1459 >>> sorted(y.rightOuterJoin(x).collect())
1460 [('a', (2, 1)), ('b', (None, 4))]
1461 """
1462 return python_right_outer_join(self, other, numPartitions)
1463
1464
1465
1466
1467     def partitionBy(self, numPartitions, partitionFunc=portable_hash):
1468 """
1469 Return a copy of the RDD partitioned using the specified partitioner.
1470
1471 >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
1472 >>> sets = pairs.partitionBy(2).glom().collect()
1473 >>> set(sets[0]).intersection(set(sets[1]))
1474 set([])
1475 """
1476 if numPartitions is None:
1477 numPartitions = self._defaultReducePartitions()
1478
1479
1480
1481
1482
1483
1484
1485 outputSerializer = self.ctx._unbatched_serializer
1486
1487 limit = (_parse_memory(self.ctx._conf.get(
1488 "spark.python.worker.memory", "512m")) / 2)
1489
1490 def add_shuffle_key(split, iterator):
1491
1492 buckets = defaultdict(list)
1493 c, batch = 0, min(10 * numPartitions, 1000)
1494
1495 for (k, v) in iterator:
1496 buckets[partitionFunc(k) % numPartitions].append((k, v))
1497 c += 1
1498
1499
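                # periodically check memory usage: when the worker exceeds its
                # memory limit (or the batch fills up), flush the buckets to the
                # JVM and adapt the batch size to the average serialized size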
1500 if (c % 1000 == 0 and get_used_memory() > limit
1501 or c > batch):
1502 n, size = len(buckets), 0
1503 for split in buckets.keys():
1504 yield pack_long(split)
1505 d = outputSerializer.dumps(buckets[split])
1506 del buckets[split]
1507 yield d
1508 size += len(d)
1509
1510 avg = (size / n) >> 20
1511
1512 if avg < 1:
1513 batch *= 1.5
1514 elif avg > 10:
1515 batch = max(batch / 1.5, 1)
1516 c = 0
1517
1518 for (split, items) in buckets.iteritems():
1519 yield pack_long(split)
1520 yield outputSerializer.dumps(items)
1521
1522 keyed = self.mapPartitionsWithIndex(add_shuffle_key)
1523 keyed._bypass_serializer = True
1524 with _JavaStackTrace(self.context) as st:
1525 pairRDD = self.ctx._jvm.PairwiseRDD(
1526 keyed._jrdd.rdd()).asJavaPairRDD()
1527 partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
1528 id(partitionFunc))
1529 jrdd = pairRDD.partitionBy(partitioner).values()
1530 rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
1531
1532
1533 rdd._partitionFunc = partitionFunc
1534 return rdd
1535
1536
1537     def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
1538 numPartitions=None):
1539 """
1540 Generic function to combine the elements for each key using a custom
1541 set of aggregation functions.
1542
1543 Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
1544 type" C. Note that V and C can be different -- for example, one might
1545 group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).
1546
1547 Users provide three functions:
1548
1549 - C{createCombiner}, which turns a V into a C (e.g., creates
1550 a one-element list)
1551 - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
1552 a list)
1553 - C{mergeCombiners}, to combine two C's into a single one.
1554
1555 In addition, users can control the partitioning of the output RDD.
1556
1557 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1558 >>> def f(x): return x
1559 >>> def add(a, b): return a + str(b)
1560 >>> sorted(x.combineByKey(str, add, add).collect())
1561 [('a', '11'), ('b', '1')]
1562 """
1563 if numPartitions is None:
1564 numPartitions = self._defaultReducePartitions()
1565
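        # When spilling is enabled (spark.shuffle.spill, on by default), use an
        # ExternalMerger bounded by spark.python.worker.memory so large
        # aggregations can spill to disk; otherwise merge everything in memory.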
1566 serializer = self.ctx.serializer
1567 spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
1568 == 'true')
1569 memory = _parse_memory(self.ctx._conf.get(
1570 "spark.python.worker.memory", "512m"))
1571 agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
1572
1573 def combineLocally(iterator):
1574 merger = ExternalMerger(agg, memory * 0.9, serializer) \
1575 if spill else InMemoryMerger(agg)
1576 merger.mergeValues(iterator)
1577 return merger.iteritems()
1578
1579 locally_combined = self.mapPartitions(combineLocally)
1580 shuffled = locally_combined.partitionBy(numPartitions)
1581
1582 def _mergeCombiners(iterator):
1583 merger = ExternalMerger(agg, memory, serializer) \
1584 if spill else InMemoryMerger(agg)
1585 merger.mergeCombiners(iterator)
1586 return merger.iteritems()
1587
1588 return shuffled.mapPartitions(_mergeCombiners)
1589
1590     def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
1591 """
1592 Aggregate the values of each key, using given combine functions and a neutral
1593 "zero value". This function can return a different result type, U, than the type
1594 of the values in this RDD, V. Thus, we need one operation for merging a V into
1595         a U and one operation for merging two U's. The former operation is used for merging
1596 values within a partition, and the latter is used for merging values between
1597 partitions. To avoid memory allocation, both of these functions are
1598 allowed to modify and return their first argument instead of creating a new U.
1599 """
1600 def createZero():
1601 return copy.deepcopy(zeroValue)
1602
1603 return self.combineByKey(
1604 lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
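    # A hedged usage sketch (not part of the original docstring), mirroring the
    # aggregate() example above but applied per key:
    #
    #   rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 3)])
    #   seqOp = lambda acc, v: (acc[0] + v, acc[1] + 1)
    #   combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])
    #   sorted(rdd.aggregateByKey((0, 0), seqOp, combOp).collect())
    #   # -> [('a', (4, 2)), ('b', (1, 1))]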
1605
1606     def foldByKey(self, zeroValue, func, numPartitions=None):
1607 """
1608 Merge the values for each key using an associative function "func"
1609 and a neutral "zeroValue" which may be added to the result an
1610 arbitrary number of times, and must not change the result
1611         (e.g., 0 for addition, or 1 for multiplication).
1612
1613 >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1614 >>> from operator import add
1615 >>> rdd.foldByKey(0, add).collect()
1616 [('a', 2), ('b', 1)]
1617 """
1618 def createZero():
1619 return copy.deepcopy(zeroValue)
1620
1621 return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
1622
1623
1625 """
1626 Group the values for each key in the RDD into a single sequence.
1627         Hash-partitions the resulting RDD into numPartitions partitions.
1628
1629 Note: If you are grouping in order to perform an aggregation (such as a
1630 sum or average) over each key, using reduceByKey will provide much
1631 better performance.
1632
1633 >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
1634 >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
1635 [('a', [1, 1]), ('b', [1])]
1636 """
1637
1638 def createCombiner(x):
1639 return [x]
1640
1641 def mergeValue(xs, x):
1642 xs.append(x)
1643 return xs
1644
1645 def mergeCombiners(a, b):
1646 a.extend(b)
1647 return a
1648
1649 return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
1650 numPartitions).mapValues(lambda x: ResultIterable(x))
1651
1652
1654 """
1655 Pass each value in the key-value pair RDD through a flatMap function
1656 without changing the keys; this also retains the original RDD's
1657 partitioning.
1658
1659 >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
1660 >>> def f(x): return x
1661 >>> x.flatMapValues(f).collect()
1662 [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
1663 """
1664 flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
1665 return self.flatMap(flat_map_fn, preservesPartitioning=True)
1666
1668 """
1669 Pass each value in the key-value pair RDD through a map function
1670 without changing the keys; this also retains the original RDD's
1671 partitioning.
1672
1673 >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
1674 >>> def f(x): return len(x)
1675 >>> x.mapValues(f).collect()
1676 [('a', 3), ('b', 1)]
1677 """
1678 map_values_fn = lambda (k, v): (k, f(v))
1679 return self.map(map_values_fn, preservesPartitioning=True)
1680
1682 """
1683 Alias for cogroup but with support for multiple RDDs.
1684
1685 >>> w = sc.parallelize([("a", 5), ("b", 6)])
1686 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1687 >>> y = sc.parallelize([("a", 2)])
1688 >>> z = sc.parallelize([("b", 42)])
1689 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
1690 sorted(list(w.groupWith(x, y, z).collect())))
1691 [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
1692
1693 """
1694 return python_cogroup((self, other) + others, numPartitions=None)
1695
1696
1697     def cogroup(self, other, numPartitions=None):
1698 """
1699 For each key k in C{self} or C{other}, return a resulting RDD that
1700 contains a tuple with the list of values for that key in C{self} as
1701 well as C{other}.
1702
1703 >>> x = sc.parallelize([("a", 1), ("b", 4)])
1704 >>> y = sc.parallelize([("a", 2)])
1705 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
1706 [('a', ([1], [2])), ('b', ([4], []))]
1707 """
1708 return python_cogroup((self, other), numPartitions)
1709
1710     def sampleByKey(self, withReplacement, fractions, seed=None):
1711 """
1712 Return a subset of this RDD sampled by key (via stratified sampling).
1713 Create a sample of this RDD using variable sampling rates for
1714 different keys as specified by fractions, a key to sampling rate map.
1715
1716 >>> fractions = {"a": 0.2, "b": 0.1}
1717 >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
1718 >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
1719 >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
1720 True
1721 >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
1722 True
1723 >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
1724 True
1725 """
1726 for fraction in fractions.values():
1727 assert fraction >= 0.0, "Negative fraction value: %s" % fraction
1728 return self.mapPartitionsWithIndex(
1729 RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
1730
1732 """
1733 Return each (key, value) pair in C{self} that has no pair with matching
1734 key in C{other}.
1735
1736 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
1737 >>> y = sc.parallelize([("a", 3), ("c", None)])
1738 >>> sorted(x.subtractByKey(y).collect())
1739 [('b', 4), ('b', 5)]
1740 """
1741 def filter_func((key, vals)):
1742 return len(vals[0]) > 0 and len(vals[1]) == 0
1743 map_func = lambda (key, vals): [(key, val) for val in vals[0]]
1744 return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
1745
1746     def subtract(self, other, numPartitions=None):
1747 """
1748 Return each value in C{self} that is not contained in C{other}.
1749
1750 >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
1751 >>> y = sc.parallelize([("a", 3), ("c", None)])
1752 >>> sorted(x.subtract(y).collect())
1753 [('a', 1), ('b', 4), ('b', 5)]
1754 """
1755
1756 rdd = other.map(lambda x: (x, True))
1757 return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
1758
1760 """
1761 Creates tuples of the elements in this RDD by applying C{f}.
1762
1763 >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
1764 >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
1765 >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
1766 [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
1767 """
1768 return self.map(lambda x: (f(x), x))
1769
1771 """
1772 Return a new RDD that has exactly numPartitions partitions.
1773
1774 Can increase or decrease the level of parallelism in this RDD.
1775 Internally, this uses a shuffle to redistribute data.
1776 If you are decreasing the number of partitions in this RDD, consider
1777 using `coalesce`, which can avoid performing a shuffle.
1778
1779 >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
1780 >>> sorted(rdd.glom().collect())
1781 [[1], [2, 3], [4, 5], [6, 7]]
1782 >>> len(rdd.repartition(2).glom().collect())
1783 2
1784 >>> len(rdd.repartition(10).glom().collect())
1785 10
1786 """
1787 jrdd = self._jrdd.repartition(numPartitions)
1788 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1789
1790     def coalesce(self, numPartitions, shuffle=False):
1791 """
1792 Return a new RDD that is reduced into `numPartitions` partitions.
1793
1794 >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
1795 [[1], [2, 3], [4, 5]]
1796 >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
1797 [[1, 2, 3, 4, 5]]
1798 """
1799 jrdd = self._jrdd.coalesce(numPartitions)
1800 return RDD(jrdd, self.ctx, self._jrdd_deserializer)
1801
1802     def zip(self, other):
1803 """
1804 Zips this RDD with another one, returning key-value pairs with the
1805         first element in each RDD, second element in each RDD, etc. Assumes
1806 that the two RDDs have the same number of partitions and the same
1807 number of elements in each partition (e.g. one was made through
1808 a map on the other).
1809
1810 >>> x = sc.parallelize(range(0,5))
1811 >>> y = sc.parallelize(range(1000, 1005))
1812 >>> x.zip(y).collect()
1813 [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
1814 """
1815 if self.getNumPartitions() != other.getNumPartitions():
1816 raise ValueError("Can only zip with RDD which has the same number of partitions")
1817
1818 def get_batch_size(ser):
1819 if isinstance(ser, BatchedSerializer):
1820 return ser.batchSize
1821 return 0
1822
1823 def batch_as(rdd, batchSize):
1824 ser = rdd._jrdd_deserializer
1825 if isinstance(ser, BatchedSerializer):
1826 ser = ser.serializer
1827 return rdd._reserialize(BatchedSerializer(ser, batchSize))
1828
1829 my_batch = get_batch_size(self._jrdd_deserializer)
1830 other_batch = get_batch_size(other._jrdd_deserializer)
1831 if my_batch != other_batch:
1832
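            # re-batch the side with the smaller batch size so both RDDs use the
            # larger one and corresponding elements line up when zipped in the JVM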
1833 if my_batch > other_batch:
1834 other = batch_as(other, my_batch)
1835 else:
1836 self = batch_as(self, other_batch)
1837
1838
1839
1840 pairRDD = self._jrdd.zip(other._jrdd)
1841 deserializer = PairDeserializer(self._jrdd_deserializer,
1842 other._jrdd_deserializer)
1843 return RDD(pairRDD, self.ctx, deserializer)
1844
1846 """
1847 Zips this RDD with its element indices.
1848
1849 The ordering is first based on the partition index and then the
1850 ordering of items within each partition. So the first item in
1851 the first partition gets index 0, and the last item in the last
1852 partition receives the largest index.
1853
1854 This method needs to trigger a spark job when this RDD contains
1855         more than one partition.
1856
1857 >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
1858 [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
1859 """
1860 starts = [0]
1861 if self.getNumPartitions() > 1:
1862 nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
1863 for i in range(len(nums) - 1):
1864 starts.append(starts[-1] + nums[i])
1865
1866 def func(k, it):
1867 for i, v in enumerate(it, starts[k]):
1868 yield v, i
1869
1870 return self.mapPartitionsWithIndex(func)
1871
1873 """
1874 Zips this RDD with generated unique Long ids.
1875
1876 Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
1877 n is the number of partitions. So there may exist gaps, but this
1878 method won't trigger a spark job, which is different from
1879         L{zipWithIndex}.
1880
1881 >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
1882 [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
1883 """
1884 n = self.getNumPartitions()
1885
1886 def func(k, it):
1887 for i, v in enumerate(it):
1888 yield v, i * n + k
1889
1890 return self.mapPartitionsWithIndex(func)
1891
1893 """
1894 Return the name of this RDD.
1895 """
1896 name_ = self._jrdd.name()
1897 if not name_:
1898 return None
1899 return name_.encode('utf-8')
1900
1902 """
1903 Assign a name to this RDD.
1904
1905 >>> rdd1 = sc.parallelize([1,2])
1906 >>> rdd1.setName('RDD1')
1907 >>> rdd1.name()
1908 'RDD1'
1909 """
1910 self._jrdd.setName(name)
1911
1913 """
1914 A description of this RDD and its recursive dependencies for debugging.
1915 """
1916 debug_string = self._jrdd.toDebugString()
1917 if not debug_string:
1918 return None
1919 return debug_string.encode('utf-8')
1920
1922 """
1923 Get the RDD's current storage level.
1924
1925 >>> rdd1 = sc.parallelize([1,2])
1926 >>> rdd1.getStorageLevel()
1927 StorageLevel(False, False, False, False, 1)
1928 >>> print(rdd1.getStorageLevel())
1929 Serialized 1x Replicated
1930 """
1931 java_storage_level = self._jrdd.getStorageLevel()
1932 storage_level = StorageLevel(java_storage_level.useDisk(),
1933 java_storage_level.useMemory(),
1934 java_storage_level.useOffHeap(),
1935 java_storage_level.deserialized(),
1936 java_storage_level.replication())
1937 return storage_level
1938
1940 """
1941 Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
1942 If spark.default.parallelism is set, then we'll use the value from SparkContext
1943 defaultParallelism, otherwise we'll use the number of partitions in this RDD.
1944
1945 This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
1946 the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
1947 be inherent.
1948 """
1949 if self.ctx._conf.contains("spark.default.parallelism"):
1950 return self.ctx.defaultParallelism
1951 else:
1952 return self.getNumPartitions()
1953
1961
1962 """
1963 Pipelined maps:
1964
1965 >>> rdd = sc.parallelize([1, 2, 3, 4])
1966 >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
1967 [4, 8, 12, 16]
1968 >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
1969 [4, 8, 12, 16]
1970
1971 Pipelined reduces:
1972 >>> from operator import add
1973 >>> rdd.map(lambda x: 2 * x).reduce(add)
1974 20
1975 >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
1976 20
1977 """
1978
1979     def __init__(self, prev, func, preservesPartitioning=False):
1980 if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
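            # This transformation is the first in its stage: wrap the parent
            # RDD directly instead of extending an existing pipeline.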
1981
1982 self.func = func
1983 self.preservesPartitioning = preservesPartitioning
1984 self._prev_jrdd = prev._jrdd
1985 self._prev_jrdd_deserializer = prev._jrdd_deserializer
1986 else:
1987 prev_func = prev.func
1988
1989 def pipeline_func(split, iterator):
1990 return func(split, prev_func(split, iterator))
1991 self.func = pipeline_func
1992 self.preservesPartitioning = \
1993 prev.preservesPartitioning and preservesPartitioning
1994 self._prev_jrdd = prev._prev_jrdd
1995 self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
1996 self.is_cached = False
1997 self.is_checkpointed = False
1998 self.ctx = prev.ctx
1999 self.prev = prev
2000 self._jrdd_val = None
2001 self._jrdd_deserializer = self.ctx.serializer
2002 self._bypass_serializer = False
2003
2004 @property
2006 if self._jrdd_val:
2007 return self._jrdd_val
2008 if self._bypass_serializer:
2009 self._jrdd_deserializer = NoOpSerializer()
2010 command = (self.func, self._prev_jrdd_deserializer,
2011 self._jrdd_deserializer)
2012 ser = CloudPickleSerializer()
2013 pickled_command = ser.dumps(command)
2014 broadcast_vars = ListConverter().convert(
2015 [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
2016 self.ctx._gateway._gateway_client)
2017 self.ctx._pickled_broadcast_vars.clear()
2018 env = MapConverter().convert(self.ctx.environment,
2019 self.ctx._gateway._gateway_client)
2020 includes = ListConverter().convert(self.ctx._python_includes,
2021 self.ctx._gateway._gateway_client)
2022 python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
2023 bytearray(pickled_command),
2024 env, includes, self.preservesPartitioning,
2025 self.ctx.pythonExec,
2026 broadcast_vars, self.ctx._javaAccumulator)
2027 self._jrdd_val = python_rdd.asJavaRDD()
2028 return self._jrdd_val
2029
2031 return not (self.is_cached or self.is_checkpointed)
2032
2035 import doctest
2036 from pyspark.context import SparkContext
2037 globs = globals().copy()
2038
2039
2040 globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
2041 (failure_count, test_count) = doctest.testmod(
2042 globs=globs, optionflags=doctest.ELLIPSIS)
2043 globs['sc'].stop()
2044 if failure_count:
2045 exit(-1)
2046
2047
2048 if __name__ == "__main__":
2049 _test()
2050