from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
import operator
import os
import sys
import shlex
import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
import warnings
import heapq

from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
    BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel

from py4j.java_collections import ListConverter, MapConverter

__all__ = ["RDD"]


def _extract_concise_traceback():
    tb = traceback.extract_stack()
    if len(tb) == 0:
        return "I'm lost!"
    # Scan from the outermost frame inwards for the first frame that lives
    # inside pyspark; the frame just before it is the user code that called
    # into Spark, which is what we report.
    file, line, module, what = tb[len(tb) - 1]
    sparkpath = os.path.dirname(file)
    first_spark_frame = len(tb) - 1
    for i in range(0, len(tb)):
        file, line, fun, what = tb[i]
        if file.startswith(sparkpath):
            first_spark_frame = i
            break
    if first_spark_frame == 0:
        file, line, fun, what = tb[0]
        return "%s at %s:%d" % (fun, file, line)
    sfile, sline, sfun, swhat = tb[first_spark_frame]
    ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
    return "%s at %s:%d" % (sfun, ufile, uline)

_spark_stack_depth = 0
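
# _JavaStackTrace below is a context manager wrapped around operations that
# launch Java jobs (collect, take, partitionBy, ...).  On entering the
# outermost such block it pushes the concise "function at file:line" call
# site computed above into the JVM, so the Scala side can attribute the job
# to the PySpark call that triggered it; leaving the outermost block clears
# the call site again.  _spark_stack_depth tracks the nesting.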

class _JavaStackTrace(object):
    def __init__(self, sc):
        self._traceback = _extract_concise_traceback()
        self._context = sc

    def __enter__(self):
        global _spark_stack_depth
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(self._traceback)
        _spark_stack_depth += 1

    def __exit__(self, type, value, tb):
        global _spark_stack_depth
        _spark_stack_depth -= 1
        if _spark_stack_depth == 0:
            self._context._jsc.setCallSite(None)
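
# MaxHeapQ is a small bounded max-heap: it keeps at most `maxsize` elements
# and, once full, only accepts a new value if it is smaller than the current
# root (the largest element kept).  After a whole stream has been fed through
# insert(), getElements() therefore holds the `maxsize` smallest items seen,
# which is what takeOrdered() needs from each partition.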

class MaxHeapQ(object):
    """
    An implementation of MaxHeap.
    >>> import pyspark.rdd
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(10)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(5)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> sorted(heap.getElements())
    [0, 1, 2, 3, 4]
    >>> heap = pyspark.rdd.MaxHeapQ(1)
    >>> [heap.insert(i) for i in range(9, -1, -1)]
    [None, None, None, None, None, None, None, None, None, None]
    >>> heap.getElements()
    [0]
    """

    def __init__(self, maxsize):
        # The heap starts at q[1], so the children of q[k] are q[2 * k] and
        # q[2 * k + 1].
        self.q = [0]
        self.maxsize = maxsize

    def _swim(self, k):
        while (k > 1) and (self.q[k/2] < self.q[k]):
            self._swap(k, k/2)
            k = k/2

    def _swap(self, i, j):
        t = self.q[i]
        self.q[i] = self.q[j]
        self.q[j] = t

    def _sink(self, k):
        N = self.size()
        while 2 * k <= N:
            j = 2 * k
            # Pick the larger of the two children; stop if the parent already
            # dominates it, otherwise swap and keep sinking.
            if j < N and self.q[j] < self.q[j + 1]:
                j = j + 1
            if (self.q[k] > self.q[j]):
                break
            self._swap(k, j)
            k = j

    def size(self):
        return len(self.q) - 1

    def insert(self, value):
        if self.size() < self.maxsize:
            self.q.append(value)
            self._swim(self.size())
        else:
            self._replaceRoot(value)

    def getElements(self):
        return self.q[1:]

    def _replaceRoot(self, value):
        if (self.q[1] > value):
            self.q[1] = value
            self._sink(1)


class RDD(object):
    """
    A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
    Represents an immutable, partitioned collection of elements that can be
    operated on in parallel.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        self._jrdd = jrdd
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = ctx
        self._jrdd_deserializer = jrdd_deserializer

    def __repr__(self):
        return self._jrdd.toString()

    @property
    def context(self):
        """
        The L{SparkContext} that this RDD was created on.
        """
        return self.ctx

    def cache(self):
        """
        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
        """
        self.is_cached = True
        self._jrdd.cache()
        return self

    def persist(self, storageLevel):
        """
        Set this RDD's storage level to persist its values across operations
        after the first time it is computed.  This can only be used to assign
        a new storage level if the RDD does not have a storage level set yet.
        """
        self.is_cached = True
        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
        self._jrdd.persist(javaStorageLevel)
        return self

    def unpersist(self):
        """
        Mark the RDD as non-persistent, and remove all blocks for it from
        memory and disk.
        """
        self.is_cached = False
        self._jrdd.unpersist()
        return self

    def checkpoint(self):
        """
        Mark this RDD for checkpointing. It will be saved to a file inside the
        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
        all references to its parent RDDs will be removed. This function must
        be called before any job has been executed on this RDD. It is strongly
        recommended that this RDD is persisted in memory, otherwise saving it
        on a file will require recomputation.
        """
        self.is_checkpointed = True
        self._jrdd.rdd().checkpoint()

    def isCheckpointed(self):
        """
        Return whether this RDD has been checkpointed or not.
        """
        return self._jrdd.rdd().isCheckpointed()

    def getCheckpointFile(self):
        """
        Gets the name of the file to which this RDD was checkpointed.
        """
        checkpointFile = self._jrdd.rdd().getCheckpointFile()
        if checkpointFile.isDefined():
            return checkpointFile.get()
        else:
            return None

    def map(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each element of this RDD.
        """
        def func(split, iterator): return imap(f, iterator)
        return PipelinedRDD(self, func, preservesPartitioning)

    def flatMap(self, f, preservesPartitioning=False):
        """
        Return a new RDD by first applying a function to all elements of this
        RDD, and then flattening the results.

        >>> rdd = sc.parallelize([2, 3, 4])
        >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
        [1, 1, 1, 2, 2, 3]
        >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
        [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
        """
        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
        return self.mapPartitionsWithIndex(func, preservesPartitioning)

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> def f(iterator): yield sum(iterator)
        >>> rdd.mapPartitions(f).collect()
        [3, 7]
        """
        def func(s, iterator): return f(iterator)
        return self.mapPartitionsWithIndex(func)

    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithIndex(f).sum()
        6
        """
        return PipelinedRDD(self, f, preservesPartitioning)

    def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
        """
        Deprecated: use mapPartitionsWithIndex instead.

        Return a new RDD by applying a function to each partition of this RDD,
        while tracking the index of the original partition.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
        >>> def f(splitIndex, iterator): yield splitIndex
        >>> rdd.mapPartitionsWithSplit(f).sum()
        6
        """
        warnings.warn("mapPartitionsWithSplit is deprecated; "
                      "use mapPartitionsWithIndex instead",
                      DeprecationWarning, stacklevel=2)
        return self.mapPartitionsWithIndex(f, preservesPartitioning)

    def filter(self, f):
        """
        Return a new RDD containing only the elements that satisfy a predicate.

        >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
        >>> rdd.filter(lambda x: x % 2 == 0).collect()
        [2, 4]
        """
        def func(iterator): return ifilter(f, iterator)
        return self.mapPartitions(func)

    def distinct(self):
        """
        Return a new RDD containing the distinct elements in this RDD.

        >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
        [1, 2, 3]
        """
        return self.map(lambda x: (x, None)) \
                   .reduceByKey(lambda x, _: x) \
                   .map(lambda (x, _): x)

    def sample(self, withReplacement, fraction, seed):
        """
        Return a sampled subset of this RDD (relies on numpy and falls back
        on default random generator if numpy is unavailable).

        >>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
        [2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
        """
        assert fraction >= 0.0, "Invalid fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)

    def takeSample(self, withReplacement, num, seed):
        """
        Return a fixed-size sampled subset of this RDD (currently requires numpy).

        >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
        [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
        """

        fraction = 0.0
        total = 0
        multiplier = 3.0
        initialCount = self.count()
        maxSelected = 0

        if num < 0:
            raise ValueError

        if initialCount == 0:
            return list()

        if initialCount > sys.maxint - 1:
            maxSelected = sys.maxint - 1
        else:
            maxSelected = initialCount

        if num > initialCount and not withReplacement:
            total = maxSelected
            fraction = multiplier * (maxSelected + 1) / initialCount
        else:
            fraction = multiplier * (num + 1) / initialCount
            total = num

        samples = self.sample(withReplacement, fraction, seed).collect()
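
        # If the first pass did not return enough samples (sampling is
        # probabilistic, so the 3x oversampling above is not a guarantee),
        # keep re-sampling with a fresh seed until at least `total` elements
        # are available to choose from.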
        while len(samples) < total:
            if seed > sys.maxint - 2:
                seed = -1
            seed += 1
            samples = self.sample(withReplacement, fraction, seed).collect()

        sampler = RDDSampler(withReplacement, fraction, seed+1)
        sampler.shuffle(samples)
        return samples[0:total]

    def union(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> rdd.union(rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if self._jrdd_deserializer == other._jrdd_deserializer:
            rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
                      self._jrdd_deserializer)
            return rdd
        else:
            # The two RDDs hold data in different serialized formats, so both
            # are re-serialized to the default serializer before the Java-side
            # union.
            self_copy = self._reserialize()
            other_copy = other._reserialize()
            return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
                       self.ctx.serializer)

    def _reserialize(self):
        if self._jrdd_deserializer == self.ctx.serializer:
            return self
        else:
            return self.map(lambda x: x, preservesPartitioning=True)

    def __add__(self, other):
        """
        Return the union of this RDD and another one.

        >>> rdd = sc.parallelize([1, 1, 2, 3])
        >>> (rdd + rdd).collect()
        [1, 1, 2, 3, 1, 1, 2, 3]
        """
        if not isinstance(other, RDD):
            raise TypeError
        return self.union(other)

    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
        """
        Sorts this RDD, which is assumed to consist of (key, value) pairs.

        >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
        >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
        [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
        >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
        >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
        >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        bounds = list()
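
        # To produce globally sorted output without collecting the whole RDD,
        # first sample the keys and derive numPartitions - 1 boundary keys
        # that split the key space into roughly equal-sized ranges; each key
        # is then routed to the range it falls into and every partition is
        # sorted locally.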
        if numPartitions > 1:
            rddSize = self.count()
            maxSampleSize = numPartitions * 20.0
            fraction = min(maxSampleSize / max(rddSize, 1), 1.0)

            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
            samples = sorted(samples, reverse=(not ascending), key=keyfunc)

            for i in range(0, numPartitions - 1):
                index = (len(samples) - 1) * (i + 1) / numPartitions
                bounds.append(samples[index])

        def rangePartitionFunc(k):
            p = 0
            while p < len(bounds) and keyfunc(k) > bounds[p]:
                p += 1
            if ascending:
                return p
            else:
                return numPartitions - 1 - p

        def mapFunc(iterator):
            yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))

        return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
                    .mapPartitions(mapFunc, preservesPartitioning=True)
                    .flatMap(lambda x: x, preservesPartitioning=True))

    def glom(self):
        """
        Return an RDD created by coalescing all elements within each partition
        into a list.

        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
        >>> sorted(rdd.glom().collect())
        [[1, 2], [3, 4]]
        """
        def func(iterator): yield list(iterator)
        return self.mapPartitions(func)

    def cartesian(self, other):
        """
        Return the Cartesian product of this RDD and another one, that is, the
        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
        C{b} is in C{other}.

        >>> rdd = sc.parallelize([1, 2])
        >>> sorted(rdd.cartesian(rdd).collect())
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        """
        deserializer = CartesianDeserializer(self._jrdd_deserializer,
                                             other._jrdd_deserializer)
        return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)

    def groupBy(self, f, numPartitions=None):
        """
        Return an RDD of grouped items.

        >>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
        >>> result = rdd.groupBy(lambda x: x % 2).collect()
        >>> sorted([(x, sorted(y)) for (x, y) in result])
        [(0, [2, 8]), (1, [1, 1, 3, 5])]
        """
        return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)

    def pipe(self, command, env={}):
        """
        Return an RDD created by piping elements to a forked external process.

        >>> sc.parallelize([1, 2, 3]).pipe('cat').collect()
        ['1', '2', '3']
        """
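        # A background thread feeds the partition's elements, one per line,
        # into the child process's stdin while the generator below lazily
        # reads its stdout; keeping the writer and reader on separate threads
        # avoids them blocking each other when a pipe buffer fills up.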
        def func(iterator):
            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
            def pipe_objs(out):
                for obj in iterator:
                    out.write(str(obj).rstrip('\n') + '\n')
                out.close()
            Thread(target=pipe_objs, args=[pipe.stdin]).start()
            return (x.rstrip('\n') for x in pipe.stdout)
        return self.mapPartitions(func)

    def foreach(self, f):
        """
        Applies a function to all elements of this RDD.

        >>> def f(x): print x
        >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
        """
        def processPartition(iterator):
            for x in iterator:
                f(x)
            yield None
        self.mapPartitions(processPartition).collect()  # Force evaluation

    def collect(self):
        """
        Return a list that contains all of the elements in this RDD.
        """
        with _JavaStackTrace(self.context) as st:
            bytesInJava = self._jrdd.collect().iterator()
        return list(self._collect_iterator_through_file(bytesInJava))

    def _collect_iterator_through_file(self, iterator):
        # Transferring lots of data through Py4J can be slow because
        # socket.readline() is inefficient.  Instead, dump the data to a
        # temporary file and read it back through the deserializer.
        tempFile = NamedTemporaryFile(delete=False, dir=self.ctx._temp_dir)
        tempFile.close()
        self.ctx._writeToFile(iterator, tempFile.name)
        # Read the data lazily and clean up the file afterwards.
        with open(tempFile.name, 'rb') as tempFile:
            for item in self._jrdd_deserializer.load_stream(tempFile):
                yield item
        os.unlink(tempFile.name)

    def reduce(self, f):
        """
        Reduces the elements of this RDD using the specified commutative and
        associative binary operator.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
        15
        >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
        10
        """
        def func(iterator):
            acc = None
            for obj in iterator:
                if acc is None:
                    acc = obj
                else:
                    acc = f(obj, acc)
            if acc is not None:
                yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(f, vals)

    def fold(self, zeroValue, op):
        """
        Aggregate the elements of each partition, and then the results for all
        the partitions, using a given associative function and a neutral "zero
        value."

        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
        as its result value to avoid object allocation; however, it should not
        modify C{t2}.

        >>> from operator import add
        >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
        15
        """
        def func(iterator):
            acc = zeroValue
            for obj in iterator:
                acc = op(obj, acc)
            yield acc
        vals = self.mapPartitions(func).collect()
        return reduce(op, vals, zeroValue)

    def sum(self):
        """
        Add up the elements in this RDD.

        >>> sc.parallelize([1.0, 2.0, 3.0]).sum()
        6.0
        """
        return self.mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

    def count(self):
        """
        Return the number of elements in this RDD.

        >>> sc.parallelize([2, 3, 4]).count()
        3
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()

    def stats(self):
        """
        Return a L{StatCounter} object that captures the mean, variance
        and count of the RDD's elements in one operation.
        """
        def redFunc(left_counter, right_counter):
            return left_counter.mergeStats(right_counter)

        return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)

    def mean(self):
        """
        Compute the mean of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).mean()
        2.0
        """
        return self.stats().mean()

    def variance(self):
        """
        Compute the variance of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).variance()
        0.666...
        """
        return self.stats().variance()

    def stdev(self):
        """
        Compute the standard deviation of this RDD's elements.

        >>> sc.parallelize([1, 2, 3]).stdev()
        0.816...
        """
        return self.stats().stdev()

    def sampleStdev(self):
        """
        Compute the sample standard deviation of this RDD's elements (which
        corrects for bias in estimating the standard deviation by dividing by
        N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleStdev()
        1.0
        """
        return self.stats().sampleStdev()

    def sampleVariance(self):
        """
        Compute the sample variance of this RDD's elements (which corrects
        for bias in estimating the variance by dividing by N-1 instead of N).

        >>> sc.parallelize([1, 2, 3]).sampleVariance()
        1.0
        """
        return self.stats().sampleVariance()

    def countByValue(self):
        """
        Return the count of each unique value in this RDD as a dictionary of
        (value, count) pairs.

        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
        [(1, 2), (2, 3)]
        """
        def countPartition(iterator):
            counts = defaultdict(int)
            for obj in iterator:
                counts[obj] += 1
            yield counts
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] += v
            return m1
        return self.mapPartitions(countPartition).reduce(mergeMaps)

    def top(self, num):
        """
        Get the top N elements from a RDD.

        Note: It returns the list sorted in descending order.

        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).cache().top(2)
        [6, 5]
        """
        def topIterator(iterator):
            q = []
            for k in iterator:
                if len(q) < num:
                    heapq.heappush(q, k)
                else:
                    heapq.heappushpop(q, k)
            yield q

        def merge(a, b):
            return next(topIterator(a + b))

        return sorted(self.mapPartitions(topIterator).reduce(merge), reverse=True)
716
718 """
719 Get the N elements from a RDD ordered in ascending order or as specified
720 by the optional key function.
721
722 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
723 [1, 2, 3, 4, 5, 6]
724 >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
725 [10, 9, 7, 6, 5, 4]
726 """
727
728 def topNKeyedElems(iterator, key_=None):
729 q = MaxHeapQ(num)
730 for k in iterator:
731 if key_ != None:
732 k = (key_(k), k)
733 q.insert(k)
734 yield q.getElements()
735
736 def unKey(x, key_=None):
737 if key_ != None:
738 x = [i[1] for i in x]
739 return x
740
741 def merge(a, b):
742 return next(topNKeyedElems(a + b))
743 result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
744 return sorted(unKey(result, key), key=key)
745
746

    def take(self, num):
        """
        Take the first num elements of the RDD.

        This currently scans the partitions *one by one*, so it will be slow if
        a lot of partitions are required. In that case, use L{collect} to get
        the whole RDD instead.

        >>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
        [2, 3]
        >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
        [2, 3, 4, 5, 6]
        """
        def takeUpToNum(iterator):
            taken = 0
            while taken < num:
                yield next(iterator)
                taken += 1

        mapped = self.mapPartitions(takeUpToNum)
        items = []
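
        # Collect the partitions one at a time through collectPartitions(),
        # stopping as soon as enough elements have been gathered; each
        # partition's bytes come back through the same temp-file path used by
        # collect().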
        with _JavaStackTrace(self.context) as st:
            for partition in range(mapped._jrdd.splits().size()):
                partitionsToTake = self.ctx._gateway.new_array(self.ctx._jvm.int, 1)
                partitionsToTake[0] = partition
                iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
                items.extend(mapped._collect_iterator_through_file(iterator))
                if len(items) >= num:
                    break
        return items[:num]

    def first(self):
        """
        Return the first element in this RDD.

        >>> sc.parallelize([2, 3, 4]).first()
        2
        """
        return self.take(1)[0]

    def saveAsTextFile(self, path):
        """
        Save this RDD as a text file, using string representations of elements.

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, basestring):
                    x = unicode(x)
                yield x.encode("utf-8")
        keyed = PipelinedRDD(self, func)
        keyed._bypass_serializer = True
        keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def reduceByKey(self, func, numPartitions=None):
        """
        Merge the values for each key using an associative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be hash-partitioned with C{numPartitions} partitions, or
        the default parallelism level if C{numPartitions} is not specified.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        def reducePartition(iterator):
            m = {}
            for (k, v) in iterator:
                m[k] = v if k not in m else func(m[k], v)
            yield m
        def mergeMaps(m1, m2):
            for (k, v) in m2.iteritems():
                m1[k] = v if k not in m1 else func(m1[k], v)
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        C{self} and C{other}.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in C{self} and (k, v2) is in C{other}.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of C{self} and C{other}.

        For each element (k, w) in C{other}, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def partitionBy(self, numPartitions, partitionFunc=None):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> set(sets[0]).intersection(set(sets[1]))
        set([])
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism

        if partitionFunc is None:
            partitionFunc = lambda x: 0 if x is None else hash(x)
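
        # Transferring the individual key-value pairs through Py4J would be
        # far too slow, so the pairs are grouped into per-partition buckets
        # here in Python and only numPartitions serialized
        # (splitNumber, [items]) blobs are handed to the Java-side
        # PairwiseRDD below.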
        outputSerializer = self.ctx._unbatched_serializer

        def add_shuffle_key(split, iterator):
            buckets = defaultdict(list)
            for (k, v) in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
            for (split, items) in buckets.iteritems():
                yield pack_long(split)
                yield outputSerializer.dumps(items)

        keyed = PipelinedRDD(self, add_shuffle_key)
        keyed._bypass_serializer = True
        with _JavaStackTrace(self.context) as st:
            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
            partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                          id(partitionFunc))
            jrdd = pairRDD.partitionBy(partitioner).values()
            rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        # Keep a reference to the partition function so that id(partitionFunc)
        # remains unique for as long as this RDD is alive; the Java-side
        # PythonPartitioner is identified by that id.
        rdd._partitionFunc = partitionFunc
        return rdd

    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.  Note that V and C can be different -- for example, one might
        group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]).

        Users provide three functions:

        - C{createCombiner}, which turns a V into a C (e.g., creates
          a one-element list)
        - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
          a list)
        - C{mergeCombiners}, to combine two C's into a single one.

        In addition, users can control the partitioning of the output RDD.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> def f(x): return x
        >>> def add(a, b): return a + str(b)
        >>> sorted(x.combineByKey(str, add, add).collect())
        [('a', '11'), ('b', '1')]
        """
        if numPartitions is None:
            numPartitions = self.ctx.defaultParallelism
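
        # The aggregation happens in three steps: each input partition is
        # combined locally with createCombiner/mergeValue, the partial
        # combiners are shuffled by key with partitionBy(), and finally the
        # combiners that land in the same partition are merged with
        # mergeCombiners.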
        def combineLocally(iterator):
            combiners = {}
            for x in iterator:
                (k, v) = x
                if k not in combiners:
                    combiners[k] = createCombiner(v)
                else:
                    combiners[k] = mergeValue(combiners[k], v)
            return combiners.iteritems()
        locally_combined = self.mapPartitions(combineLocally)
        shuffled = locally_combined.partitionBy(numPartitions)
        def _mergeCombiners(iterator):
            combiners = {}
            for (k, v) in iterator:
                if k not in combiners:
                    combiners[k] = v
                else:
                    combiners[k] = mergeCombiners(combiners[k], v)
            return combiners.iteritems()
        return shuffled.mapPartitions(_mergeCombiners)

    def foldByKey(self, zeroValue, func, numPartitions=None):
        """
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" which may be added to the result an
        arbitrary number of times, and must not change the result
        (e.g., 0 for addition, or 1 for multiplication).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> rdd.foldByKey(0, add).collect()
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda v: func(zeroValue, v), func, func, numPartitions)

    def groupByKey(self, numPartitions=None):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(x.groupByKey().collect())
        [('a', [1, 1]), ('b', [1])]
        """

        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            return a + b

        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
                                 numPartitions)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.
        """
        map_values_fn = lambda (k, v): (k, f(v))
        return self.map(map_values_fn, preservesPartitioning=True)

    def groupWith(self, other):
        """
        Alias for cogroup.
        """
        return self.cogroup(other)

    def cogroup(self, other, numPartitions=None):
        """
        For each key k in C{self} or C{other}, return a resulting RDD that
        contains a tuple with the list of values for that key in C{self} as
        well as C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.cogroup(y).collect())
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup(self, other, numPartitions)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in C{self} that has no pair with matching
        key in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
        map_func = lambda (key, vals): [(key, val) for val in vals[0]]
        return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)

    def subtract(self, other, numPartitions=None):
        """
        Return each value in C{self} that is not contained in C{other}.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying C{f}.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> sorted(x.cogroup(y).collect())
        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
        """
        return self.map(lambda x: (f(x), x))

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider
        using `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        jrdd = self._jrdd.repartition(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        jrdd = self._jrdd.coalesce(numPartitions)
        return RDD(jrdd, self.ctx, self._jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs with the
        first element in each RDD paired with the first element in the other,
        the second with the second, and so on.  Assumes that the two RDDs have
        the same number of partitions and the same number of elements in each
        partition (e.g. one was made through a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

    def name(self):
        """
        Return the name of this RDD.
        """
        name_ = self._jrdd.name()
        if not name_:
            return None
        return name_.encode('utf-8')

    def setName(self, name):
        """
        Assign a name to this RDD.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.setName('RDD1')
        >>> rdd1.name()
        'RDD1'
        """
        self._jrdd.setName(name)

    def toDebugString(self):
        """
        A description of this RDD and its recursive dependencies for debugging.
        """
        debug_string = self._jrdd.toDebugString()
        if not debug_string:
            return None
        return debug_string.encode('utf-8')

    def getStorageLevel(self):
        """
        Get the RDD's current storage level.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.getStorageLevel()
        StorageLevel(False, False, False, 1)
        """
        java_storage_level = self._jrdd.getStorageLevel()
        storage_level = StorageLevel(java_storage_level.useDisk(),
                                     java_storage_level.useMemory(),
                                     java_storage_level.deserialized(),
                                     java_storage_level.replication())
        return storage_level
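

# PipelinedRDD chains narrow transformations (map, flatMap, filter, ...) into
# a single Python function, so a sequence such as rdd.map(f).filter(g) is
# evaluated in one pass through the Python worker instead of one pass per
# transformation.  Pipelining stops at a cached or checkpointed RDD, since
# those must materialize their results (see _is_pipelinable below).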

class PipelinedRDD(RDD):
    """
    Pipelined maps:
    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """
    def __init__(self, prev, func, preservesPartitioning=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func
            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False
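
    # The property below builds the Java-side RDD lazily: the pipelined
    # Python function and both serializers are pickled into a single
    # "command", and that byte string, together with the environment, Python
    # includes and broadcast variables, is handed to a JVM PythonRDD which
    # runs the command in Python worker processes.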

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            serializer = NoOpSerializer()
        else:
            serializer = self.ctx.serializer
        command = (self.func, self._prev_jrdd_deserializer, serializer)
        pickled_command = CloudPickleSerializer().dumps(command)
        broadcast_vars = ListConverter().convert(
            [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
            self.ctx._gateway._gateway_client)
        self.ctx._pickled_broadcast_vars.clear()
        class_tag = self._prev_jrdd.classTag()
        env = MapConverter().convert(self.ctx.environment,
                                     self.ctx._gateway._gateway_client)
        includes = ListConverter().convert(self.ctx._python_includes,
                                           self.ctx._gateway._gateway_client)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
            bytearray(pickled_command), env, includes, self.preservesPartitioning,
            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
            class_tag)
        self._jrdd_val = python_rdd.asJavaRDD()
        return self._jrdd_val

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(
        globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()