#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numpy as np
import warnings

from pyspark.mllib.linalg import Vectors, SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib._common import _convert_vector, _deserialize_labeled_point
from pyspark.rdd import RDD
from pyspark.serializers import NoOpSerializer


class MLUtils:

    """
    Helper methods to load, save and pre-process data used in MLlib.
    """
    @staticmethod
    def _parse_libsvm_line(line, multiclass):
        # deprecated signature; the definition of the same name below replaces
        # this one when the class body is evaluated
        warnings.warn("deprecated", DeprecationWarning)
        return MLUtils._parse_libsvm_line(line)

    @staticmethod
    def _parse_libsvm_line(line):
        """
        Parses a line in LIBSVM format into (label, indices, values).
        """
        items = line.split(None)
        label = float(items[0])
        nnz = len(items) - 1
        indices = np.zeros(nnz, dtype=np.int32)
        values = np.zeros(nnz)
        for i in xrange(nnz):
            index, value = items[1 + i].split(":")
            indices[i] = int(index) - 1
            values[i] = float(value)
        return label, indices, values

    @staticmethod
    def _convert_labeled_point_to_libsvm(p):
        """Converts a LabeledPoint to a string in LIBSVM format."""
        items = [str(p.label)]
        v = _convert_vector(p.features)
        if type(v) == np.ndarray:
            for i in xrange(len(v)):
                items.append(str(i + 1) + ":" + str(v[i]))
        elif type(v) == SparseVector:
            nnz = len(v.indices)
            for i in xrange(nnz):
                items.append(str(v.indices[i] + 1) + ":" + str(v.values[i]))
        else:
            raise TypeError("_convert_labeled_point_to_libsvm needs either ndarray or SparseVector"
                            " but got %s" % type(v))
        return " ".join(items)

    @staticmethod
    def loadLibSVMFile(sc, path, multiclass=False, numFeatures=-1, minPartitions=None):
        # deprecated signature; the definition of the same name below replaces
        # this one when the class body is evaluated
        warnings.warn("deprecated", DeprecationWarning)
        return MLUtils.loadLibSVMFile(sc, path, numFeatures, minPartitions)

    @staticmethod
    def loadLibSVMFile(sc, path, numFeatures=-1, minPartitions=None):
        """
        Loads labeled data in the LIBSVM format into an RDD of
        LabeledPoint. The LIBSVM format is a text-based format used by
        LIBSVM and LIBLINEAR. Each line represents a labeled sparse
        feature vector using the following format:

        label index1:value1 index2:value2 ...

        where the indices are one-based and in ascending order. This
        method parses each line into a LabeledPoint, where the feature
        indices are converted to zero-based.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param numFeatures: number of features, which will be determined
                            from the input data if a nonpositive value
                            is given. This is useful when the dataset is
                            already split into multiple files and you
                            want to load them separately, because some
                            features may not be present in certain files,
                            which would lead to inconsistent feature
                            dimensions.
        @param minPartitions: min number of partitions
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
        >>> tempFile.flush()
        >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
        >>> tempFile.close()
        >>> type(examples[0]) == LabeledPoint
        True
        >>> print examples[0]
        (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
        >>> type(examples[1]) == LabeledPoint
        True
        >>> print examples[1]
        (-1.0,(6,[],[]))
        >>> type(examples[2]) == LabeledPoint
        True
        >>> print examples[2]
        (-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
        """

        lines = sc.textFile(path, minPartitions)
        parsed = lines.map(lambda l: MLUtils._parse_libsvm_line(l))
        if numFeatures <= 0:
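            # the parsed data is scanned twice (here to find the largest feature
            # index and again below to build the LabeledPoints), so cache it; the
            # dimension is the largest zero-based index seen plus one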
            parsed.cache()
            numFeatures = parsed.map(lambda x: -1 if x[1].size == 0 else x[1][-1]).reduce(max) + 1
        return parsed.map(lambda x: LabeledPoint(x[0], Vectors.sparse(numFeatures, x[1], x[2])))

    @staticmethod
    def saveAsLibSVMFile(data, dir):
        """
        Save labeled data in LIBSVM format.

        @param data: an RDD of LabeledPoint to be saved
        @param dir: directory to save the data

        >>> from tempfile import NamedTemporaryFile
        >>> from fileinput import input
        >>> from glob import glob
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0.0 1:1.01 2:2.02 3:3.03\\n1.1 1:1.23 3:4.56\\n'
        """
        lines = data.map(lambda p: MLUtils._convert_labeled_point_to_libsvm(p))
        lines.saveAsTextFile(dir)

    @staticmethod
    def loadLabeledPoints(sc, path, minPartitions=None):
        """
        Load labeled points saved using RDD.saveAsTextFile.

        @param sc: Spark context
        @param path: file or directory path in any Hadoop-supported file
                     system URI
        @param minPartitions: min number of partitions
        @return: labeled data stored as an RDD of LabeledPoint

        >>> from tempfile import NamedTemporaryFile
        >>> from pyspark.mllib.util import MLUtils
        >>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), \
                        LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
        >>> loaded = MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
        >>> type(loaded[0]) == LabeledPoint
        True
        >>> print loaded[0]
        (1.1,(3,[0,2],[-1.23,4.56e-07]))
        >>> type(loaded[1]) == LabeledPoint
        True
        >>> print loaded[1]
        (0.0,[1.01,2.02,3.03])
        """
        minPartitions = minPartitions or min(sc.defaultParallelism, 2)
        jSerialized = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
        serialized = RDD(jSerialized, sc, NoOpSerializer())
        return serialized.map(lambda bytes: _deserialize_labeled_point(bytearray(bytes)))
188
191 import doctest
192 from pyspark.context import SparkContext
193 globs = globals().copy()
194
195
196 globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
197 (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
198 globs['sc'].stop()
199 if failure_count:
200 exit(-1)


if __name__ == "__main__":
    _test()