import numpy

from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
    _dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
    _serialize_double_matrix, _deserialize_double_matrix, \
    _serialize_double_vector, _deserialize_double_vector, \
    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
    _linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log


class LogisticRegressionModel(LinearModel):

    """A linear binary classification model derived from logistic regression.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
    >>> lrm.predict(array([1.0])) > 0
    True
    >>> lrm.predict(array([0.0])) <= 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
    >>> lrm.predict(array([0.0, 1.0])) > 0
    True
    >>> lrm.predict(array([0.0, 0.0])) <= 0
    True
    >>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
    True
    """

    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
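        # Evaluate the logistic (sigmoid) function in a numerically stable way:
        # for a positive margin use 1 / (1 + exp(-margin)); otherwise use the
        # algebraically equivalent exp(margin) / (1 + exp(margin)), so exp() is
        # never called on a large positive argument.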
        if margin > 0:
            prob = 1 / (1 + exp(-margin))
        else:
            exp_margin = exp(margin)
            prob = exp_margin / (1 + exp_margin)
        return 1 if prob > 0.5 else 0


class LogisticRegressionWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
              initialWeights=None, regParam=1.0, regType=None, intercept=False):
        """
        Train a logistic regression model on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration (default: 1.0).
        @param initialWeights:    The initial weights (default: None).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param regType:           The type of regularizer used for training
                                  our model. Allowed values: "l1" for using
                                  L1Updater, "l2" for using SquaredL2Updater,
                                  and "none" for no regularizer
                                  (default: "none").
        @param intercept:         Boolean parameter which indicates whether to
                                  use an augmented representation of the
                                  training data (i.e. whether bias features
                                  are activated or not).
        """
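        # A usage sketch (parameter values here are purely illustrative, not
        # taken from the original source):
        #
        #   model = LogisticRegressionWithSGD.train(
        #       sc.parallelize(points), iterations=200, step=0.5,
        #       regParam=0.01, regType="l2", intercept=True)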
        sc = data.context
        if regType is None:
            regType = "none"
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
            d._jrdd, iterations, step, miniBatchFraction, i, regParam, regType, intercept)
        return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
                                         initialWeights)


class SVMModel(LinearModel):

    """A support vector machine.

    >>> data = [
    ...     LabeledPoint(0.0, [0.0]),
    ...     LabeledPoint(1.0, [1.0]),
    ...     LabeledPoint(1.0, [2.0]),
    ...     LabeledPoint(1.0, [3.0])
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(data))
    >>> svm.predict(array([1.0])) > 0
    True
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    ... ]
    >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
    >>> svm.predict(SparseVector(2, {1: 1.0})) > 0
    True
    >>> svm.predict(SparseVector(2, {0: -1.0})) <= 0
    True
    """

    def predict(self, x):
        _linear_predictor_typecheck(x, self._coeff)
        margin = _dot(x, self._coeff) + self._intercept
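        # SVM decision rule: predict class 1 when the raw margin
        # w . x + intercept is non-negative, and class 0 otherwise.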
        return 1 if margin >= 0 else 0


class SVMWithSGD(object):

    @classmethod
    def train(cls, data, iterations=100, step=1.0, regParam=1.0,
              miniBatchFraction=1.0, initialWeights=None, regType=None, intercept=False):
        """
        Train a support vector machine on the given data.

        @param data:              The training data.
        @param iterations:        The number of iterations (default: 100).
        @param step:              The step parameter used in SGD (default: 1.0).
        @param regParam:          The regularizer parameter (default: 1.0).
        @param miniBatchFraction: Fraction of data to be used for each SGD
                                  iteration (default: 1.0).
        @param initialWeights:    The initial weights (default: None).
        @param regType:           The type of regularizer used for training
                                  our model. Allowed values: "l1" for using
                                  L1Updater, "l2" for using SquaredL2Updater,
                                  and "none" for no regularizer
                                  (default: "none").
        @param intercept:         Boolean parameter which indicates whether to
                                  use an augmented representation of the
                                  training data (i.e. whether bias features
                                  are activated or not).
        """
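        # A usage sketch (parameter values here are purely illustrative, not
        # taken from the original source):
        #
        #   model = SVMWithSGD.train(
        #       sc.parallelize(points), iterations=200, step=1.0,
        #       regParam=0.1, regType="l1", intercept=False)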
        sc = data.context
        if regType is None:
            regType = "none"
        train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
            d._jrdd, iterations, step, regParam, miniBatchFraction, i, regType, intercept)
        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)


class NaiveBayesModel(object):

    """
    Model for Naive Bayes classifiers.

    Contains two parameters:
    - pi: vector of logs of class priors (dimension C)
    - theta: matrix of logs of class conditional probabilities (CxD)

    >>> data = [
    ...     LabeledPoint(0.0, [0.0, 0.0]),
    ...     LabeledPoint(0.0, [0.0, 1.0]),
    ...     LabeledPoint(1.0, [1.0, 0.0]),
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(data))
    >>> model.predict(array([0.0, 1.0]))
    0.0
    >>> model.predict(array([1.0, 0.0]))
    1.0
    >>> sparse_data = [
    ...     LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
    ...     LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
    ...     LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
    ... ]
    >>> model = NaiveBayes.train(sc.parallelize(sparse_data))
    >>> model.predict(SparseVector(2, {1: 1.0}))
    0.0
    >>> model.predict(SparseVector(2, {0: 1.0}))
    1.0
    """
211
213 self.labels = labels
214 self.pi = pi
215 self.theta = theta
216
218 """Return the most likely class for a data vector x"""
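        # Score each class by its log prior (pi) plus the dot product of the
        # feature vector with that class's log conditional probabilities (a row
        # of theta), then return the label of the highest-scoring class.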
        return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]


class NaiveBayes(object):

    @classmethod
    def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features) vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can
        handle all kinds of discrete data. For example, by converting
        documents into TF-IDF vectors, it can be used for document
        classification. By making every vector a 0-1 vector, it can also be
        used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).

        @param data:    An RDD of LabeledPoint, where each point's feature
                        vector is a vector of discrete values (e.g. a count
                        vector of term frequencies).
        @param lambda_: The smoothing parameter (default: 1.0).
        """
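        # A usage sketch (the lambda_ value here is purely illustrative):
        #
        #   model = NaiveBayes.train(sc.parallelize(points), lambda_=1.0)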
        sc = data.context
        dataBytes = _get_unmangled_labeled_point_rdd(data)
        ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
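        # The JVM call returns three serialized arrays: the class labels, the
        # log class priors (pi), and the log class conditional probabilities
        # (theta); deserialize each and wrap them in a NaiveBayesModel.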
        return NaiveBayesModel(
            _deserialize_double_vector(ans[0]),
            _deserialize_double_vector(ans[1]),
            _deserialize_double_matrix(ans[2]))


def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)


if __name__ == "__main__":
    _test()