Classification model trained using Multinomial/Binary Logistic Regression.
Parameters: |
|
---|
>>> data = [
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
0
>>> lrm.predict(sc.parallelize([[1.0, 0.0], [0.0, 1.0]])).collect()
[1, 0]
>>> lrm.clearThreshold()
>>> lrm.predict([0.0, 1.0])
0.279...
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> lrm.predict(array([0.0, 1.0]))
1
>>> lrm.predict(array([1.0, 0.0]))
0
>>> lrm.predict(SparseVector(2, {1: 1.0}))
1
>>> lrm.predict(SparseVector(2, {0: 1.0}))
0
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> lrm.save(sc, path)
>>> sameModel = LogisticRegressionModel.load(sc, path)
>>> sameModel.predict(array([0.0, 1.0]))
1
>>> sameModel.predict(SparseVector(2, {0: 1.0}))
0
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except:
... pass
>>> multi_class_data = [
... LabeledPoint(0.0, [0.0, 1.0, 0.0]),
... LabeledPoint(1.0, [1.0, 0.0, 0.0]),
... LabeledPoint(2.0, [0.0, 0.0, 1.0])
... ]
>>> data = sc.parallelize(multi_class_data)
>>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
>>> mcm.predict([0.0, 0.5, 0.0])
0
>>> mcm.predict([0.8, 0.0, 0.0])
1
>>> mcm.predict([0.0, 0.0, 0.3])
2
Note
Experimental
Clears the threshold so that predict will output raw prediction scores. It is used for binary classification only.
Predict values for a single data point or an RDD of points using the model trained.
Note
Experimental
Sets the threshold that separates positive predictions from negative predictions. An example with prediction score greater than or equal to this threshold is identified as an positive, and negative otherwise. It is used for binary classification only.
Note
Experimental
Returns the threshold (if any) used for converting raw prediction scores into 0/1 predictions. It is used for binary classification only.
Train a logistic regression model on the given data.
Parameters: |
|
---|
Train a logistic regression model on the given data.
Parameters: |
|
---|
>>> data = [
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10)
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
0
Model for Support Vector Machines (SVMs).
Parameters: |
|
---|
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10)
>>> svm.predict([1.0])
1
>>> svm.predict(sc.parallelize([[1.0]])).collect()
[1]
>>> svm.clearThreshold()
>>> svm.predict(array([1.0]))
1.44...
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> svm.predict(SparseVector(2, {1: 1.0}))
1
>>> svm.predict(SparseVector(2, {0: -1.0}))
0
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> svm.save(sc, path)
>>> sameModel = SVMModel.load(sc, path)
>>> sameModel.predict(SparseVector(2, {1: 1.0}))
1
>>> sameModel.predict(SparseVector(2, {0: -1.0}))
0
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except:
... pass
Note
Experimental
Clears the threshold so that predict will output raw prediction scores. It is used for binary classification only.
Predict values for a single data point or an RDD of points using the model trained.
Note
Experimental
Sets the threshold that separates positive predictions from negative predictions. An example with prediction score greater than or equal to this threshold is identified as an positive, and negative otherwise. It is used for binary classification only.
Note
Experimental
Returns the threshold (if any) used for converting raw prediction scores into 0/1 predictions. It is used for binary classification only.
Train a support vector machine on the given data.
Parameters: |
|
---|
Model for Naive Bayes classifiers.
Parameters: |
|
---|
>>> data = [
... LabeledPoint(0.0, [0.0, 0.0]),
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
>>> model.predict(sc.parallelize([[1.0, 0.0]])).collect()
[1.0]
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
... ]
>>> model = NaiveBayes.train(sc.parallelize(sparse_data))
>>> model.predict(SparseVector(2, {1: 1.0}))
0.0
>>> model.predict(SparseVector(2, {0: 1.0}))
1.0
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = NaiveBayesModel.load(sc, path)
>>> sameModel.predict(SparseVector(2, {0: 1.0})) == model.predict(SparseVector(2, {0: 1.0}))
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|
Train a Naive Bayes model given an RDD of (label, features) vectors.
This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which can handle all kinds of discrete data. For example, by converting documents into TF-IDF vectors, it can be used for document classification. By making every vector a 0-1 vector, it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). The input feature values must be nonnegative.
Parameters: |
|
---|
Run LogisticRegression with SGD on a batch of data.
The weights obtained at the end of training a stream are used as initial weights for the next batch.
Parameters: |
|
---|
Returns the latest model.
Make predictions on a dstream.
Returns: | Transformed dstream object. |
---|
Make predictions on a keyed dstream.
Returns: | Transformed dstream object. |
---|
A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random",
... seed=50, initializationSteps=5, epsilon=1e-4)
>>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
True
>>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
True
>>> model.k
2
>>> model.computeCost(sc.parallelize(data))
2.0000000000000004
>>> model = KMeans.train(sc.parallelize(data), 2)
>>> sparse_data = [
... SparseVector(3, {1: 1.0}),
... SparseVector(3, {1: 1.1}),
... SparseVector(3, {2: 1.0}),
... SparseVector(3, {2: 1.1})
... ]
>>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||",
... seed=50, initializationSteps=5, epsilon=1e-4)
>>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
True
>>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
True
>>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
True
>>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
>>> isinstance(model.clusterCenters, list)
True
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = KMeansModel.load(sc, path)
>>> sameModel.predict(sparse_data[0]) == model.predict(sparse_data[0])
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
Return the K-means cost (sum of squared distances of points to their nearest center) for this model on the given data.
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|
Note
Experimental
A clustering model derived from the Gaussian Mixture Model method.
>>> from pyspark.mllib.linalg import Vectors, DenseMatrix
>>> from numpy.testing import assert_equal
>>> from shutil import rmtree
>>> import os, tempfile
>>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
... 0.9,0.8,0.75,0.935,
... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2))
>>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001,
... maxIterations=50, seed=10)
>>> labels = model.predict(clusterdata_1).collect()
>>> labels[0]==labels[1]
False
>>> labels[1]==labels[2]
True
>>> labels[4]==labels[5]
True
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = GaussianMixtureModel.load(sc, path)
>>> assert_equal(model.weights, sameModel.weights)
>>> mus, sigmas = list(
... zip(*[(g.mu, g.sigma) for g in model.gaussians]))
>>> sameMus, sameSigmas = list(
... zip(*[(g.mu, g.sigma) for g in sameModel.gaussians]))
>>> mus == sameMus
True
>>> sigmas == sameSigmas
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
>>> data = array([-5.1971, -2.5359, -3.8220,
... -5.2211, -5.0602, 4.7118,
... 6.8989, 3.4592, 4.6322,
... 5.7048, 4.6567, 5.5026,
... 4.5605, 5.2043, 6.2734])
>>> clusterdata_2 = sc.parallelize(data.reshape(5,3))
>>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
... maxIterations=150, seed=10)
>>> labels = model.predict(clusterdata_2).collect()
>>> labels[0]==labels[1]==labels[2]
True
>>> labels[3]==labels[4]
True
Array of MultivariateGaussian where gaussians[i] represents the Multivariate Gaussian (Normal) Distribution for Gaussian i.
Load the GaussianMixtureModel from disk.
Parameters: |
|
---|
Find the cluster to which the points in ‘x’ has maximum membership in this model.
Parameters: | x – RDD of data points. |
---|---|
Returns: | cluster_labels. RDD of cluster labels. |
Note
Experimental
Learning algorithm for Gaussian Mixtures using the expectation-maximization algorithm.
Parameters: |
|
---|
Note
Experimental
Model produced by [[PowerIterationClustering]].
>>> data = [(0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0), (1, 3, 1.0),
... (2, 3, 1.0), (3, 4, 0.1), (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0),
... (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0), (10, 11, 1.0),
... (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0)]
>>> rdd = sc.parallelize(data, 2)
>>> model = PowerIterationClustering.train(rdd, 2, 100)
>>> model.k
2
>>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
>>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
True
>>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
True
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = PowerIterationClusteringModel.load(sc, path)
>>> sameModel.k
2
>>> result = sorted(model.assignments().collect(), key=lambda x: x.id)
>>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
True
>>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
Note
Experimental
Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]]. From the abstract: PIC finds a very low-dimensional embedding of a dataset using truncated power iteration on a normalized pair-wise similarity matrix of the data.
Parameters: |
|
---|
Note
Experimental
Provides methods to set k, decayFactor, timeUnit to configure the KMeans algorithm for fitting and predicting on incoming dstreams. More details on how the centroids are updated are provided under the docs of StreamingKMeansModel.
Parameters: |
|
---|
Make predictions on a keyed dstream. Returns a transformed dstream object.
Set number of batches after which the centroids of that particular batch has half the weightage.
Set initial centers. Should be set before calling trainOn.
Note
Experimental
Clustering model which can perform an online update of the centroids.
The update formula for each centroid is given by
where
c_t: Centroid at the n_th iteration.
at the n_th iteration.
x_t: Centroid of the new data closest to c_t.
m_t: Number of samples (or) weights of the new data closest to c_t
c_t+1: New centroid.
n_t+1: New number of weights.
a: Decay Factor, which gives the forgetfulness.
Note that if a is set to 1, it is the weighted mean of the previous and new data. If it set to zero, the old centroids are completely forgotten.
Parameters: |
|
---|
>>> initCenters = [[0.0, 0.0], [1.0, 1.0]]
>>> initWeights = [1.0, 1.0]
>>> stkm = StreamingKMeansModel(initCenters, initWeights)
>>> data = sc.parallelize([[-0.1, -0.1], [0.1, 0.1],
... [0.9, 0.9], [1.1, 1.1]])
>>> stkm = stkm.update(data, 1.0, u"batches")
>>> stkm.centers
array([[ 0., 0.],
[ 1., 1.]])
>>> stkm.predict([-0.1, -0.1])
0
>>> stkm.predict([0.9, 0.9])
1
>>> stkm.clusterWeights
[3.0, 3.0]
>>> decayFactor = 0.0
>>> data = sc.parallelize([DenseVector([1.5, 1.5]), DenseVector([0.2, 0.2])])
>>> stkm = stkm.update(data, 0.0, u"batches")
>>> stkm.centers
array([[ 0.2, 0.2],
[ 1.5, 1.5]])
>>> stkm.clusterWeights
[1.0, 1.0]
>>> stkm.predict([0.2, 0.2])
0
>>> stkm.predict([1.5, 1.5])
1
Update the centroids, according to data
Parameters: |
|
---|
Train a LDA model.
Parameters: |
|
---|
A clustering model derived from the LDA method.
Latent Dirichlet Allocation (LDA), a topic model designed for text documents. Terminology - “word” = “term”: an element of the vocabulary - “token”: instance of a term appearing in a document - “topic”: multinomial distribution over words representing some concept References: - Original LDA paper (journal version): Blei, Ng, and Jordan. “Latent Dirichlet Allocation.” JMLR, 2003.
>>> from pyspark.mllib.linalg import Vectors
>>> from numpy.testing import assert_almost_equal, assert_equal
>>> data = [
... [1, Vectors.dense([0.0, 1.0])],
... [2, SparseVector(2, {0: 1.0})],
... ]
>>> rdd = sc.parallelize(data)
>>> model = LDA.train(rdd, k=2)
>>> model.vocabSize()
2
>>> topics = model.topicsMatrix()
>>> topics_expect = array([[0.5, 0.5], [0.5, 0.5]])
>>> assert_almost_equal(topics, topics_expect, 1)
>>> import os, tempfile
>>> from shutil import rmtree
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = LDAModel.load(sc, path)
>>> assert_equal(sameModel.topicsMatrix(), model.topicsMatrix())
>>> sameModel.vocabSize() == model.vocabSize()
True
>>> try:
... rmtree(path)
... except OSError:
... pass
Load the LDAModel from disk.
Parameters: |
|
---|
Save the LDAModel on to disk.
Parameters: |
|
---|
Evaluator for binary classification.
Parameters: | scoreAndLabels – an RDD of (score, label) pairs |
---|
>>> scoreAndLabels = sc.parallelize([
... (0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0), (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)], 2)
>>> metrics = BinaryClassificationMetrics(scoreAndLabels)
>>> metrics.areaUnderROC
0.70...
>>> metrics.areaUnderPR
0.83...
>>> metrics.unpersist()
Evaluator for regression.
Parameters: | predictionAndObservations – an RDD of (prediction, observation) pairs. |
---|
>>> predictionAndObservations = sc.parallelize([
... (2.5, 3.0), (0.0, -0.5), (2.0, 2.0), (8.0, 7.0)])
>>> metrics = RegressionMetrics(predictionAndObservations)
>>> metrics.explainedVariance
8.859...
>>> metrics.meanAbsoluteError
0.5...
>>> metrics.meanSquaredError
0.37...
>>> metrics.rootMeanSquaredError
0.61...
>>> metrics.r2
0.94...
Returns the explained variance regression score. explainedVariance = 1 - variance(y - hat{y}) / variance(y)
Returns the mean absolute error, which is a risk function corresponding to the expected value of the absolute error loss or l1-norm loss.
Evaluator for multiclass classification.
Parameters: | predictionAndLabels – an RDD of (prediction, label) pairs. |
---|
>>> predictionAndLabels = sc.parallelize([(0.0, 0.0), (0.0, 1.0), (0.0, 0.0),
... (1.0, 0.0), (1.0, 1.0), (1.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)])
>>> metrics = MulticlassMetrics(predictionAndLabels)
>>> metrics.confusionMatrix().toArray()
array([[ 2., 1., 1.],
[ 1., 3., 0.],
[ 0., 0., 1.]])
>>> metrics.falsePositiveRate(0.0)
0.2...
>>> metrics.precision(1.0)
0.75...
>>> metrics.recall(2.0)
1.0...
>>> metrics.fMeasure(0.0, 2.0)
0.52...
>>> metrics.precision()
0.66...
>>> metrics.recall()
0.66...
>>> metrics.weightedFalsePositiveRate
0.19...
>>> metrics.weightedPrecision
0.68...
>>> metrics.weightedRecall
0.66...
>>> metrics.weightedFMeasure()
0.66...
>>> metrics.weightedFMeasure(2.0)
0.65...
Returns confusion matrix: predicted classes are in columns, they are ordered by class label ascending, as in “labels”.
Returns f-measure or f-measure for a given label (category) if specified.
Returns precision or precision for a given label (category) if specified.
Evaluator for ranking algorithms.
Parameters: | predictionAndLabels – an RDD of (predicted ranking, ground truth set) pairs. |
---|
>>> predictionAndLabels = sc.parallelize([
... ([1, 6, 2, 7, 8, 3, 9, 10, 4, 5], [1, 2, 3, 4, 5]),
... ([4, 1, 5, 6, 2, 7, 3, 8, 9, 10], [1, 2, 3]),
... ([1, 2, 3, 4, 5], [])])
>>> metrics = RankingMetrics(predictionAndLabels)
>>> metrics.precisionAt(1)
0.33...
>>> metrics.precisionAt(5)
0.26...
>>> metrics.precisionAt(15)
0.17...
>>> metrics.meanAveragePrecision
0.35...
>>> metrics.ndcgAt(3)
0.33...
>>> metrics.ndcgAt(10)
0.48...
Returns the mean average precision (MAP) of all the queries. If a query has an empty ground truth set, the average precision will be zero and a log warining is generated.
Compute the average NDCG value of all the queries, truncated at ranking position k. The discounted cumulative gain at position k is computed as: sum,,i=1,,^k^ (2^{relevance of ‘’i’‘th item}^ - 1) / log(i + 1), and the NDCG is obtained by dividing the DCG value on the ground truth set. In the current implementation, the relevance value is binary. If a query has an empty ground truth set, zero will be used as NDCG together with a log warning.
Compute the average precision of all the queries, truncated at ranking position k.
If for a query, the ranking algorithm returns n (n < k) results, the precision value will be computed as #(relevant items retrieved) / k. This formula also applies when the size of the ground truth set is less than k.
If a query has an empty ground truth set, zero will be used as precision together with a log warning.
Python package for feature in MLlib.
Bases: pyspark.mllib.feature.VectorTransformer
Note
Experimental
Normalizes samples individually to unit Lp norm
For any 1 <= p < float(‘inf’), normalizes samples using sum(abs(vector) p) (1/p) as norm.
For p = float(‘inf’), max(abs(vector)) will be used as norm for normalization.
Parameters: | p – Normalization in L^p^ space, p = 2 by default. |
---|
>>> v = Vectors.dense(range(3))
>>> nor = Normalizer(1)
>>> nor.transform(v)
DenseVector([0.0, 0.3333, 0.6667])
>>> rdd = sc.parallelize([v])
>>> nor.transform(rdd).collect()
[DenseVector([0.0, 0.3333, 0.6667])]
>>> nor2 = Normalizer(float("inf"))
>>> nor2.transform(v)
DenseVector([0.0, 0.5, 1.0])
Bases: pyspark.mllib.feature.JavaVectorTransformer
Note
Experimental
Represents a StandardScaler model that can transform vectors.
Applies standardization transformation on a vector.
Parameters: | vector – Vector or RDD of Vector to be standardized. |
---|---|
Returns: | Standardized vector. If the variance of a column is zero, it will return default 0.0 for the column with zero variance. |
Bases: object
Note
Experimental
Standardizes features by removing the mean and scaling to unit variance using column summary statistics on the samples in the training set.
Parameters: |
|
---|
>>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
>>> dataset = sc.parallelize(vs)
>>> standardizer = StandardScaler(True, True)
>>> model = standardizer.fit(dataset)
>>> result = model.transform(dataset)
>>> for r in result.collect(): r
DenseVector([-0.7071, 0.7071, -0.7071])
DenseVector([0.7071, -0.7071, 0.7071])
Bases: object
Note
Experimental
Maps a sequence of terms to their term frequencies using the hashing trick.
Note: the terms must be hashable (can not be dict/set/list...).
Parameters: | numFeatures – number of features (default: 2^20) |
---|
>>> htf = HashingTF(100)
>>> doc = "a a b b c d".split(" ")
>>> htf.transform(doc)
SparseVector(100, {...})
Bases: pyspark.mllib.feature.JavaVectorTransformer
Represents an IDF model that can transform term frequency vectors.
Transforms term frequency (TF) vectors to TF-IDF vectors.
If minDocFreq was set for the IDF calculation, the terms which occur in fewer than minDocFreq documents will have an entry of 0.
Parameters: | x – an RDD of term frequency vectors or a term frequency vector |
---|---|
Returns: | an RDD of TF-IDF vectors or a TF-IDF vector |
Bases: object
Note
Experimental
Inverse document frequency (IDF).
The standard formulation is used: idf = log((m + 1) / (d(t) + 1)), where m is the total number of documents and d(t) is the number of documents that contain term t.
This implementation supports filtering out terms which do not appear in a minimum number of documents (controlled by the variable minDocFreq). For terms that are not in at least minDocFreq documents, the IDF is found as 0, resulting in TF-IDFs of 0.
Parameters: | minDocFreq – minimum of documents in which a term should appear for filtering |
---|
>>> n = 4
>>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
... Vectors.dense([0.0, 1.0, 2.0, 3.0]),
... Vectors.sparse(n, [1], [1.0])]
>>> data = sc.parallelize(freqs)
>>> idf = IDF()
>>> model = idf.fit(data)
>>> tfidf = model.transform(data)
>>> for r in tfidf.collect(): r
SparseVector(4, {1: 0.0, 3: 0.5754})
DenseVector([0.0, 0.0, 1.3863, 0.863])
SparseVector(4, {1: 0.0})
>>> model.transform(Vectors.dense([0.0, 1.0, 2.0, 3.0]))
DenseVector([0.0, 0.0, 1.3863, 0.863])
>>> model.transform([0.0, 1.0, 2.0, 3.0])
DenseVector([0.0, 0.0, 1.3863, 0.863])
>>> model.transform(Vectors.sparse(n, (1, 3), (1.0, 2.0)))
SparseVector(4, {1: 0.0, 3: 0.5754})
Bases: object
Word2Vec creates vector representation of words in a text corpus. The algorithm first constructs a vocabulary from the corpus and then learns vector representation of words in the vocabulary. The vector representation can be used as features in natural language processing and machine learning algorithms.
We used skip-gram model in our implementation and hierarchical softmax method to train the model. The variable names in the implementation matches the original C implementation.
For original C implementation, see https://code.google.com/p/word2vec/ For research papers, see Efficient Estimation of Word Representations in Vector Space and Distributed Representations of Words and Phrases and their Compositionality.
>>> sentence = "a b " * 100 + "a c " * 10
>>> localDoc = [sentence, sentence]
>>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
>>> model = Word2Vec().setVectorSize(10).setSeed(42).fit(doc)
>>> syms = model.findSynonyms("a", 2)
>>> [s[0] for s in syms]
[u'b', u'c']
>>> vec = model.transform("a")
>>> syms = model.findSynonyms(vec, 2)
>>> [s[0] for s in syms]
[u'b', u'c']
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = Word2VecModel.load(sc, path)
>>> model.transform("a") == sameModel.transform("a")
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
Computes the vector representation of each word in vocabulary.
Parameters: | data – training data. RDD of list of string |
---|---|
Returns: | Word2VecModel instance |
Sets minCount, the minimum number of times a token must appear to be included in the word2vec model’s vocabulary (default: 5).
Sets number of iterations (default: 1), which should be smaller than or equal to number of partitions.
Bases: pyspark.mllib.feature.JavaVectorTransformer, pyspark.mllib.util.JavaSaveable, pyspark.mllib.util.JavaLoader
class for Word2Vec model
Bases: object
Note
Experimental
Creates a ChiSquared feature selector.
Parameters: | numTopFeatures – number of features that selector will select. |
---|
>>> data = [
... LabeledPoint(0.0, SparseVector(3, {0: 8.0, 1: 7.0})),
... LabeledPoint(1.0, SparseVector(3, {1: 9.0, 2: 6.0})),
... LabeledPoint(1.0, [0.0, 9.0, 8.0]),
... LabeledPoint(2.0, [8.0, 9.0, 5.0])
... ]
>>> model = ChiSqSelector(1).fit(sc.parallelize(data))
>>> model.transform(SparseVector(3, {1: 9.0, 2: 6.0}))
SparseVector(1, {0: 6.0})
>>> model.transform(DenseVector([8.0, 9.0, 5.0]))
DenseVector([5.0])
Bases: pyspark.mllib.feature.JavaVectorTransformer
Note
Experimental
Represents a Chi Squared selector model.
Bases: pyspark.mllib.feature.VectorTransformer
Note
Experimental
Scales each column of the vector, with the supplied weight vector. i.e the elementwise product.
>>> weight = Vectors.dense([1.0, 2.0, 3.0])
>>> eprod = ElementwiseProduct(weight)
>>> a = Vectors.dense([2.0, 1.0, 3.0])
>>> eprod.transform(a)
DenseVector([2.0, 2.0, 9.0])
>>> b = Vectors.dense([9.0, 3.0, 4.0])
>>> rdd = sc.parallelize([a, b])
>>> eprod.transform(rdd).collect()
[DenseVector([2.0, 2.0, 9.0]), DenseVector([9.0, 6.0, 12.0])]
Note
Experimental
A Parallel FP-growth algorithm to mine frequent itemsets.
Computes an FP-Growth model that contains frequent itemsets.
Parameters: |
|
---|
Note
Experimental
A FP-Growth model for mining frequent itemsets using the Parallel FP-Growth algorithm.
>>> data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
>>> rdd = sc.parallelize(data, 2)
>>> model = FPGrowth.train(rdd, 0.6, 2)
>>> sorted(model.freqItemsets().collect())
[FreqItemset(items=[u'a'], freq=4), FreqItemset(items=[u'c'], freq=3), ...
MLlib utilities for linear algebra. For dense vectors, MLlib uses the NumPy array type, so you can simply pass NumPy arrays around. For sparse vectors, users can construct a SparseVector object from MLlib or pass SciPy scipy.sparse column vectors if SciPy is available in their environment.
Bases: pyspark.mllib.linalg.Vector
A dense vector represented by a value array. We use numpy array for storage and arithmetics will be delegated to the underlying numpy array.
>>> v = Vectors.dense([1.0, 2.0])
>>> u = Vectors.dense([3.0, 4.0])
>>> v + u
DenseVector([4.0, 6.0])
>>> 2 - v
DenseVector([1.0, 0.0])
>>> v / 2
DenseVector([0.5, 1.0])
>>> v * u
DenseVector([3.0, 8.0])
>>> u / v
DenseVector([3.0, 2.0])
>>> u % 2
DenseVector([1.0, 0.0])
Compute the dot product of two Vectors. We support (Numpy array, list, SparseVector, or SciPy sparse) and a target NumPy array that is either 1- or 2-dimensional. Equivalent to calling numpy.dot of the two vectors.
>>> dense = DenseVector(array.array('d', [1., 2.]))
>>> dense.dot(dense)
5.0
>>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
4.0
>>> dense.dot(range(1, 3))
5.0
>>> dense.dot(np.array(range(1, 3)))
5.0
>>> dense.dot([1.,])
Traceback (most recent call last):
...
AssertionError: dimension mismatch
>>> dense.dot(np.reshape([1., 2., 3., 4.], (2, 2), order='F'))
array([ 5., 11.])
>>> dense.dot(np.reshape([1., 2., 3.], (3, 1), order='F'))
Traceback (most recent call last):
...
AssertionError: dimension mismatch
Calculte the norm of a DenseVector.
>>> a = DenseVector([0, -1, 2, -3])
>>> a.norm(2)
3.7...
>>> a.norm(1)
6.0
Parse string representation back into the DenseVector.
>>> DenseVector.parse(' [ 0.0,1.0,2.0, 3.0]')
DenseVector([0.0, 1.0, 2.0, 3.0])
Squared distance of two Vectors.
>>> dense1 = DenseVector(array.array('d', [1., 2.]))
>>> dense1.squared_distance(dense1)
0.0
>>> dense2 = np.array([2., 1.])
>>> dense1.squared_distance(dense2)
2.0
>>> dense3 = [2., 1.]
>>> dense1.squared_distance(dense3)
2.0
>>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
>>> dense1.squared_distance(sparse1)
2.0
>>> dense1.squared_distance([1.,])
Traceback (most recent call last):
...
AssertionError: dimension mismatch
>>> dense1.squared_distance(SparseVector(1, [0,], [1.,]))
Traceback (most recent call last):
...
AssertionError: dimension mismatch
Bases: pyspark.mllib.linalg.Vector
A simple sparse vector class for passing data to MLlib. Users may alternatively pass SciPy’s {scipy.sparse} data types.
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
>>> a.dot(array.array('d', [1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2], [1.0])
>>> a.dot(b)
0.0
>>> a.dot(np.array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
>>> a.dot([1., 2., 3.])
Traceback (most recent call last):
...
AssertionError: dimension mismatch
>>> a.dot(np.array([1., 2.]))
Traceback (most recent call last):
...
AssertionError: dimension mismatch
>>> a.dot(DenseVector([1., 2.]))
Traceback (most recent call last):
...
AssertionError: dimension mismatch
>>> a.dot(np.zeros((3, 2)))
Traceback (most recent call last):
...
AssertionError: dimension mismatch
A list of indices corresponding to active entries.
Calculte the norm of a SparseVector.
>>> a = SparseVector(4, [0, 1], [3., -4.])
>>> a.norm(1)
7.0
>>> a.norm(2)
5.0
Parse string representation back into the DenseVector.
>>> SparseVector.parse(' (4, [0,1 ],[ 4.0,5.0] )')
SparseVector(4, {0: 4.0, 1: 5.0})
Size of the vector.
Squared distance from a SparseVector or 1-dimensional NumPy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
>>> a.squared_distance(array.array('d', [1., 2., 3., 4.]))
11.0
>>> a.squared_distance(np.array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2], [1.0])
>>> a.squared_distance(b)
26.0
>>> b.squared_distance(a)
26.0
>>> b.squared_distance([1., 2.])
Traceback (most recent call last):
...
AssertionError: dimension mismatch
>>> b.squared_distance(SparseVector(3, [1,], [1.0,]))
Traceback (most recent call last):
...
AssertionError: dimension mismatch
A list of values corresponding to active entries.
Bases: object
Factory methods for working with vectors. Note that dense vectors are simply represented as NumPy array objects, so there is no need to covert them for use in MLlib. For sparse vectors, the factory methods in this class create an MLlib-compatible type, or users can pass in SciPy’s scipy.sparse column vectors.
Create a dense vector of 64-bit floats from a Python list or numbers.
>>> Vectors.dense([1, 2, 3])
DenseVector([1.0, 2.0, 3.0])
>>> Vectors.dense(1.0, 2.0)
DenseVector([1.0, 2.0])
Parse a string representation back into the Vector.
>>> Vectors.parse('[2,1,2 ]')
DenseVector([2.0, 1.0, 2.0])
>>> Vectors.parse(' ( 100, [0], [2])')
SparseVector(100, {0: 2.0})
Create a sparse vector, using either a dictionary, a list of (index, value) pairs, or two separate arrays of indices and values (sorted by index).
Parameters: |
|
---|
>>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
SparseVector(4, {1: 1.0, 3: 5.5})
>>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
SparseVector(4, {1: 1.0, 3: 5.5})
Squared distance between two vectors. a and b can be of type SparseVector, DenseVector, np.ndarray or array.array.
>>> a = Vectors.sparse(4, [(0, 1), (3, 4)])
>>> b = Vectors.dense([2, 5, 4, 1])
>>> a.squared_distance(b)
51.0
Bases: pyspark.mllib.linalg.Matrix
Column-major dense matrix.
Bases: pyspark.mllib.linalg.Matrix
Sparse Matrix stored in CSC format.
Package for distributed linear algebra.
Bases: object
Note
Experimental
Represents a distributively stored matrix backed by one or more RDDs.
Bases: pyspark.mllib.linalg.distributed.DistributedMatrix
Note
Experimental
Represents a row-oriented distributed Matrix with no meaningful row indices.
Parameters: |
|
---|
Get or compute the number of cols.
>>> rows = sc.parallelize([[1, 2, 3], [4, 5, 6],
... [7, 8, 9], [10, 11, 12]])
>>> mat = RowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = RowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
Bases: object
Note
Experimental
Represents a row of an IndexedRowMatrix.
Just a wrapper over a (long, vector) tuple.
Parameters: |
|
---|
Bases: pyspark.mllib.linalg.distributed.DistributedMatrix
Note
Experimental
Represents a row-oriented distributed Matrix with indexed rows.
Parameters: |
|
---|
Get or compute the number of cols.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6]),
... IndexedRow(2, [7, 8, 9]),
... IndexedRow(3, [10, 11, 12])])
>>> mat = IndexedRowMatrix(rows)
>>> print(mat.numCols())
3
>>> mat = IndexedRowMatrix(rows, 7, 6)
>>> print(mat.numCols())
6
Get or compute the number of rows.
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6]),
... IndexedRow(2, [7, 8, 9]),
... IndexedRow(3, [10, 11, 12])])
>>> mat = IndexedRowMatrix(rows)
>>> print(mat.numRows())
4
>>> mat = IndexedRowMatrix(rows, 7, 6)
>>> print(mat.numRows())
7
Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.
>>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(1, [4, 5, 6])]))
>>> rows = mat.rows
>>> rows.first()
IndexedRow(0, [1.0,2.0,3.0])
Convert this matrix to a BlockMatrix.
Parameters: |
|
---|
>>> rows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
... IndexedRow(6, [4, 5, 6])])
>>> mat = IndexedRowMatrix(rows).toBlockMatrix()
>>> # This IndexedRowMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # BlockMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> print(mat.numCols())
3
Convert this matrix to a CoordinateMatrix.
>>> rows = sc.parallelize([IndexedRow(0, [1, 0]),
... IndexedRow(6, [0, 5])])
>>> mat = IndexedRowMatrix(rows).toCoordinateMatrix()
>>> mat.entries.take(3)
[MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 0.0), MatrixEntry(6, 0, 0.0)]
Bases: object
Note
Experimental
Represents an entry of a CoordinateMatrix.
Just a wrapper over a (long, long, float) tuple.
Parameters: |
|
---|
Bases: pyspark.mllib.linalg.distributed.DistributedMatrix
Note
Experimental
Represents a matrix in coordinate format.
Parameters: |
|
---|
Entries of the CoordinateMatrix stored as an RDD of MatrixEntries.
>>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)]))
>>> entries = mat.entries
>>> entries.first()
MatrixEntry(0, 0, 1.2)
Get or compute the number of cols.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> print(mat.numCols())
2
>>> mat = CoordinateMatrix(entries, 7, 6)
>>> print(mat.numCols())
6
Get or compute the number of rows.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(1, 0, 2),
... MatrixEntry(2, 1, 3.7)])
>>> mat = CoordinateMatrix(entries)
>>> print(mat.numRows())
3
>>> mat = CoordinateMatrix(entries, 7, 6)
>>> print(mat.numRows())
7
Convert this matrix to a BlockMatrix.
Parameters: |
|
---|
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toBlockMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # BlockMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # BlockMatrix will have 5 columns as well.
>>> print(mat.numCols())
5
Convert this matrix to an IndexedRowMatrix.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toIndexedRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, and the ensuing
>>> # IndexedRowMatrix will have 7 rows as well.
>>> print(mat.numRows())
7
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing
>>> # IndexedRowMatrix will have 5 columns as well.
>>> print(mat.numCols())
5
Convert this matrix to a RowMatrix.
>>> entries = sc.parallelize([MatrixEntry(0, 0, 1.2),
... MatrixEntry(6, 4, 2.1)])
>>> mat = CoordinateMatrix(entries).toRowMatrix()
>>> # This CoordinateMatrix will have 7 effective rows, due to
>>> # the highest row index being 6, but the ensuing RowMatrix
>>> # will only have 2 rows since there are only entries on 2
>>> # unique rows.
>>> print(mat.numRows())
2
>>> # This CoordinateMatrix will have 5 columns, due to the
>>> # highest column index being 4, and the ensuing RowMatrix
>>> # will have 5 columns as well.
>>> print(mat.numCols())
5
Bases: pyspark.mllib.linalg.distributed.DistributedMatrix
Note
Experimental
Represents a distributed matrix in blocks of local matrices.
Parameters: |
|
---|
The RDD of sub-matrix blocks ((blockRowIndex, blockColIndex), sub-matrix) that form this distributed matrix.
>>> mat = BlockMatrix(
... sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
>>> blocks = mat.blocks
>>> blocks.first()
((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
Number of columns that make up each block.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.colsPerBlock
2
Number of columns of blocks in the BlockMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.numColBlocks
1
Get or compute the number of cols.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> print(mat.numCols())
2
>>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
>>> print(mat.numCols())
6
Number of rows of blocks in the BlockMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.numRowBlocks
2
Get or compute the number of rows.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> print(mat.numRows())
6
>>> mat = BlockMatrix(blocks, 3, 2, 7, 6)
>>> print(mat.numRows())
7
Number of rows that make up each block.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2)
>>> mat.rowsPerBlock
3
Convert this matrix to a CoordinateMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(1, 2, [1, 2])),
... ((1, 0), Matrices.dense(1, 2, [7, 8]))])
>>> mat = BlockMatrix(blocks, 1, 2).toCoordinateMatrix()
>>> mat.entries.take(3)
[MatrixEntry(0, 0, 1.0), MatrixEntry(0, 1, 2.0), MatrixEntry(1, 0, 7.0)]
Convert this matrix to an IndexedRowMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2).toIndexedRowMatrix()
>>> # This BlockMatrix will have 6 effective rows, due to
>>> # having two sub-matrix blocks stacked, each with 3 rows.
>>> # The ensuing IndexedRowMatrix will also have 6 rows.
>>> print(mat.numRows())
6
>>> # This BlockMatrix will have 2 effective columns, due to
>>> # having two sub-matrix blocks stacked, each with 2 columns.
>>> # The ensuing IndexedRowMatrix will also have 2 columns.
>>> print(mat.numCols())
2
Collect the distributed matrix on the driver as a DenseMatrix.
>>> blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
... ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])
>>> mat = BlockMatrix(blocks, 3, 2).toLocalMatrix()
>>> # This BlockMatrix will have 6 effective rows, due to
>>> # having two sub-matrix blocks stacked, each with 3 rows.
>>> # The ensuing DenseMatrix will also have 6 rows.
>>> print(mat.numRows)
6
>>> # This BlockMatrix will have 2 effective columns, due to
>>> # having two sub-matrix blocks stacked, each with 2
>>> # columns. The ensuing DenseMatrix will also have 2 columns.
>>> print(mat.numCols)
2
Python package for random data generation.
Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
Generates an RDD comprised of i.i.d. samples from the Exponential distribution with the input mean.
Parameters: |
|
---|---|
Returns: | RDD of float comprised of i.i.d. samples ~ Exp(mean). |
>>> mean = 2.0
>>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
Generates an RDD comprised of vectors containing i.i.d. samples drawn from the Exponential distribution with the input mean.
Parameters: |
|
---|---|
Returns: | RDD of Vector with vectors containing i.i.d. samples ~ Exp(mean). |
>>> import numpy as np
>>> mean = 0.5
>>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - mean) < 0.5
True
>>> from math import sqrt
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
Generates an RDD comprised of i.i.d. samples from the Gamma distribution with the input shape and scale.
Parameters: |
|
---|---|
Returns: | RDD of float comprised of i.i.d. samples ~ Gamma(shape, scale). |
>>> from math import sqrt
>>> shape = 1.0
>>> scale = 2.0
>>> expMean = shape * scale
>>> expStd = sqrt(shape * scale * scale)
>>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000
>>> abs(stats.mean() - expMean) < 0.5
True
>>> abs(stats.stdev() - expStd) < 0.5
True
Generates an RDD comprised of vectors containing i.i.d. samples drawn from the Gamma distribution.
Parameters: |
|
---|---|
Returns: | RDD of Vector with vectors containing i.i.d. samples ~ Gamma(shape, scale). |
>>> import numpy as np
>>> from math import sqrt
>>> shape = 1.0
>>> scale = 2.0
>>> expMean = shape * scale
>>> expStd = sqrt(shape * scale * scale)
>>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - expMean) < 0.1
True
>>> abs(mat.std() - expStd) < 0.1
True
Generates an RDD comprised of i.i.d. samples from the log normal distribution with the input mean and standard distribution.
Parameters: |
|
---|---|
Returns: | RDD of float comprised of i.i.d. samples ~ log N(mean, std). |
>>> from math import sqrt, exp
>>> mean = 0.0
>>> std = 1.0
>>> expMean = exp(mean + 0.5 * std * std)
>>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
>>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000
>>> abs(stats.mean() - expMean) < 0.5
True
>>> from math import sqrt
>>> abs(stats.stdev() - expStd) < 0.5
True
Generates an RDD comprised of vectors containing i.i.d. samples drawn from the log normal distribution.
Parameters: |
|
---|---|
Returns: | RDD of Vector with vectors containing i.i.d. samples ~ log N(mean, std). |
>>> import numpy as np
>>> from math import sqrt, exp
>>> mean = 0.0
>>> std = 1.0
>>> expMean = exp(mean + 0.5 * std * std)
>>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
>>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
>>> mat = np.matrix(m)
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - expMean) < 0.1
True
>>> abs(mat.std() - expStd) < 0.1
True
Generates an RDD comprised of i.i.d. samples from the standard normal distribution.
To transform the distribution in the generated RDD from standard normal to some other normal N(mean, sigma^2), use RandomRDDs.normal(sc, n, p, seed) .map(lambda v: mean + sigma * v)
Parameters: |
|
---|---|
Returns: | RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0). |
>>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
>>> stats = x.stats()
>>> stats.count()
1000
>>> abs(stats.mean() - 0.0) < 0.1
True
>>> abs(stats.stdev() - 1.0) < 0.1
True
Generates an RDD comprised of vectors containing i.i.d. samples drawn from the standard normal distribution.
Parameters: |
|
---|---|
Returns: | RDD of Vector with vectors containing i.i.d. samples ~ N(0.0, 1.0). |
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - 0.0) < 0.1
True
>>> abs(mat.std() - 1.0) < 0.1
True
Generates an RDD comprised of i.i.d. samples from the Poisson distribution with the input mean.
Parameters: |
|
---|---|
Returns: | RDD of float comprised of i.i.d. samples ~ Pois(mean). |
>>> mean = 100.0
>>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
1000
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
>>> abs(stats.stdev() - sqrt(mean)) < 0.5
True
Generates an RDD comprised of vectors containing i.i.d. samples drawn from the Poisson distribution with the input mean.
Parameters: |
|
---|---|
Returns: | RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean). |
>>> import numpy as np
>>> mean = 100.0
>>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - mean) < 0.5
True
>>> from math import sqrt
>>> abs(mat.std() - sqrt(mean)) < 0.5
True
Generates an RDD comprised of i.i.d. samples from the uniform distribution U(0.0, 1.0).
To transform the distribution in the generated RDD from U(0.0, 1.0) to U(a, b), use RandomRDDs.uniformRDD(sc, n, p, seed) .map(lambda v: a + (b - a) * v)
Parameters: |
|
---|---|
Returns: | RDD of float comprised of i.i.d. samples ~ U(0.0, 1.0). |
>>> x = RandomRDDs.uniformRDD(sc, 100).collect()
>>> len(x)
100
>>> max(x) <= 1.0 and min(x) >= 0.0
True
>>> RandomRDDs.uniformRDD(sc, 100, 4).getNumPartitions()
4
>>> parts = RandomRDDs.uniformRDD(sc, 100, seed=4).getNumPartitions()
>>> parts == sc.defaultParallelism
True
Generates an RDD comprised of vectors containing i.i.d. samples drawn from the uniform distribution U(0.0, 1.0).
Parameters: |
|
---|---|
Returns: | RDD of Vector with vectors containing i.i.d samples ~ U(0.0, 1.0). |
>>> import numpy as np
>>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
>>> mat.shape
(10, 10)
>>> mat.max() <= 1.0 and mat.min() >= 0.0
True
>>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
4
A matrix factorisation model trained by regularized alternating least-squares.
>>> r1 = (1, 1, 1.0)
>>> r2 = (1, 2, 2.0)
>>> r3 = (2, 1, 2.0)
>>> ratings = sc.parallelize([r1, r2, r3])
>>> model = ALS.trainImplicit(ratings, 1, seed=10)
>>> model.predict(2, 2)
0.4...
>>> testset = sc.parallelize([(1, 2), (1, 1)])
>>> model = ALS.train(ratings, 2, seed=0)
>>> model.predictAll(testset).collect()
[Rating(user=1, product=1, rating=1.0...), Rating(user=1, product=2, rating=1.9...)]
>>> model = ALS.train(ratings, 4, seed=10)
>>> model.userFeatures().collect()
[(1, array('d', [...])), (2, array('d', [...]))]
>>> model.recommendUsers(1, 2)
[Rating(user=2, product=1, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
>>> model.recommendProducts(1, 2)
[Rating(user=1, product=2, rating=1.9...), Rating(user=1, product=1, rating=1.0...)]
>>> model.rank
4
>>> first_user = model.userFeatures().take(1)[0]
>>> latents = first_user[1]
>>> len(latents) == 4
True
>>> model.productFeatures().collect()
[(1, array('d', [...])), (2, array('d', [...]))]
>>> first_product = model.productFeatures().take(1)[0]
>>> latents = first_product[1]
>>> len(latents) == 4
True
>>> model = ALS.train(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2, 2)
3.8...
>>> df = sqlContext.createDataFrame([Rating(1, 1, 1.0), Rating(1, 2, 2.0), Rating(2, 1, 2.0)])
>>> model = ALS.train(df, 1, nonnegative=True, seed=10)
>>> model.predict(2, 2)
3.8...
>>> model = ALS.trainImplicit(ratings, 1, nonnegative=True, seed=10)
>>> model.predict(2, 2)
0.4...
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = MatrixFactorizationModel.load(sc, path)
>>> sameModel.predict(2, 2)
0.4...
>>> sameModel.predictAll(testset).collect()
[Rating(...
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
Returns a list of predicted ratings for input user and product pairs.
Returns a paired RDD, where the first element is the product and the second is an array of features corresponding to that product.
Recommends the top “num” number of products for a given user and returns a list of Rating objects sorted by the predicted rating in descending order.
Class that represents the features and labels of a data point.
Parameters: |
|
---|
Note: ‘label’ and ‘features’ are accessible as class attributes.
A linear model that has a vector of coefficients and an intercept.
Parameters: |
|
---|
A linear regression model derived from a least-squares fit.
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
True
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> lrm.save(sc, path)
>>> sameModel = LinearRegressionModel.load(sc, path)
>>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except:
... pass
>>> data = [
... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... miniBatchFraction=1.0, initialWeights=array([1.0]), regParam=0.1, regType="l2",
... intercept=True, validateData=True)
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
Predict the value of the dependent variable given a vector or an RDD of vectors containing values for the independent variables.
Train a linear regression model using Stochastic Gradient Descent (SGD). This solves the least squares regression formulation
f(weights) = 1/(2n) ||A weights - y||^2,
which is the mean squared error. Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with its corresponding right hand side label y. See also the documentation for the precise formulation.
Parameters: |
|
---|
A linear regression model derived from a least-squares fit with an l_2 penalty term.
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
True
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> lrm.save(sc, path)
>>> sameModel = RidgeRegressionModel.load(sc, path)
>>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except:
... pass
>>> data = [
... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
Predict the value of the dependent variable given a vector or an RDD of vectors containing values for the independent variables.
Train a regression model with L2-regularization using Stochastic Gradient Descent. This solves the l2-regularized least squares regression formulation
f(weights) = 1/(2n) ||A weights - y||^2 + regParam/2 ||weights||^2.
Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with its corresponding right hand side label y. See also the documentation for the precise formulation.
Parameters: |
|
---|
A linear regression model derived from a least-squares fit with an l_1 penalty term.
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> abs(lrm.predict(sc.parallelize([[1.0]])).collect()[0] - 1) < 0.5
True
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> lrm.save(sc, path)
>>> sameModel = LassoModel.load(sc, path)
>>> abs(sameModel.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(sameModel.predict(np.array([1.0])) - 1) < 0.5
True
>>> abs(sameModel.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except:
... pass
>>> data = [
... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
Predict the value of the dependent variable given a vector or an RDD of vectors containing values for the independent variables.
Train a regression model with L1-regularization using Stochastic Gradient Descent. This solves the l1-regularized least squares regression formulation
f(weights) = 1/(2n) ||A weights - y||^2 + regParam ||weights||_1.
Here the data matrix has n rows, and the input RDD holds the set of rows of A, each with its corresponding right hand side label y. See also the documentation for the precise formulation.
Parameters: |
|
---|
Regression model for isotonic regression.
Parameters: |
|
---|
>>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
>>> irm = IsotonicRegression.train(sc.parallelize(data))
>>> irm.predict(3)
2.0
>>> irm.predict(5)
16.5
>>> irm.predict(sc.parallelize([3, 5])).collect()
[2.0, 16.5]
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> irm.save(sc, path)
>>> sameModel = IsotonicRegressionModel.load(sc, path)
>>> sameModel.predict(3)
2.0
>>> sameModel.predict(5)
16.5
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
Predict labels for provided features. Using a piecewise linear function. 1) If x exactly matches a boundary then associated prediction is returned. In case there are multiple predictions with the same boundary then one of them is returned. Which one is undefined (same as java.util.Arrays.binarySearch). 2) If x is lower or higher than all boundaries then first or last prediction is returned respectively. In case there are multiple predictions with the same boundary then the lowest or highest is returned respectively. 3) If x falls between two values in boundary array then prediction is treated as piecewise linear function and interpolated value is returned. In case there are multiple values with the same boundary then the same rules as in 2) are used.
Parameters: | x – Feature or RDD of Features to be labeled. |
---|
Python package for statistical functions in MLlib.
Note
Experimental
If observed is Vector, conduct Pearson’s chi-squared goodness of fit test of the observed data against the expected distribution, or againt the uniform distribution (by default), with each category having an expected frequency of 1 / len(observed). (Note: observed cannot contain negative values)
If observed is matrix, conduct Pearson’s independence test on the input contingency matrix, which cannot contain negative entries or columns or rows that sum up to 0.
If observed is an RDD of LabeledPoint, conduct Pearson’s independence test for every feature against the label across the input RDD. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the chi-squared statistic is computed. All label and feature values must be categorical.
Parameters: |
|
---|---|
Returns: | ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, the method used, and the null hypothesis. |
>>> from pyspark.mllib.linalg import Vectors, Matrices
>>> observed = Vectors.dense([4, 6, 5])
>>> pearson = Statistics.chiSqTest(observed)
>>> print(pearson.statistic)
0.4
>>> pearson.degreesOfFreedom
2
>>> print(round(pearson.pValue, 4))
0.8187
>>> pearson.method
u'pearson'
>>> pearson.nullHypothesis
u'observed follows the same distribution as expected.'
>>> observed = Vectors.dense([21, 38, 43, 80])
>>> expected = Vectors.dense([3, 5, 7, 20])
>>> pearson = Statistics.chiSqTest(observed, expected)
>>> print(round(pearson.pValue, 4))
0.0027
>>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
>>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
>>> print(round(chi.statistic, 4))
21.9958
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
... LabeledPoint(0.0, Vectors.dense([1.5, 20.0])),
... LabeledPoint(1.0, Vectors.dense([1.5, 30.0])),
... LabeledPoint(0.0, Vectors.dense([3.5, 30.0])),
... LabeledPoint(0.0, Vectors.dense([3.5, 40.0])),
... LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
>>> rdd = sc.parallelize(data, 4)
>>> chi = Statistics.chiSqTest(rdd)
>>> print(chi[0].statistic)
0.75
>>> print(chi[1].statistic)
1.5
Computes column-wise summary statistics for the input RDD[Vector].
Parameters: | rdd – an RDD[Vector] for which column-wise summary statistics are to be computed. |
---|---|
Returns: | MultivariateStatisticalSummary object containing column-wise summary statistics. |
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
... Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8])])
>>> cStats = Statistics.colStats(rdd)
>>> cStats.mean()
array([ 4., 4., 0., 3.])
>>> cStats.variance()
array([ 4., 13., 0., 25.])
>>> cStats.count()
3
>>> cStats.numNonzeros()
array([ 3., 2., 0., 3.])
>>> cStats.max()
array([ 6., 7., 0., 8.])
>>> cStats.min()
array([ 2., 0., 0., -2.])
Compute the correlation (matrix) for the input RDD(s) using the specified method. Methods currently supported: pearson (default), spearman.
If a single RDD of Vectors is passed in, a correlation matrix comparing the columns in the input RDD is returned. Use method= to specify the method to be used for single RDD inout. If two RDDs of floats are passed in, a single float is returned.
Parameters: |
|
---|---|
Returns: | Correlation matrix comparing columns in x. |
>>> x = sc.parallelize([1.0, 0.0, -2.0], 2)
>>> y = sc.parallelize([4.0, 5.0, 3.0], 2)
>>> zeros = sc.parallelize([0.0, 0.0, 0.0], 2)
>>> abs(Statistics.corr(x, y) - 0.6546537) < 1e-7
True
>>> Statistics.corr(x, y) == Statistics.corr(x, y, "pearson")
True
>>> Statistics.corr(x, y, "spearman")
0.5
>>> from math import isnan
>>> isnan(Statistics.corr(x, zeros))
True
>>> from pyspark.mllib.linalg import Vectors
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
>>> pearsonCorr = Statistics.corr(rdd)
>>> print(str(pearsonCorr).replace('nan', 'NaN'))
[[ 1. 0.05564149 NaN 0.40047142]
[ 0.05564149 1. NaN 0.91359586]
[ NaN NaN 1. NaN]
[ 0.40047142 0.91359586 NaN 1. ]]
>>> spearmanCorr = Statistics.corr(rdd, method="spearman")
>>> print(str(spearmanCorr).replace('nan', 'NaN'))
[[ 1. 0.10540926 NaN 0.4 ]
[ 0.10540926 1. NaN 0.9486833 ]
[ NaN NaN 1. NaN]
[ 0.4 0.9486833 NaN 1. ]]
>>> try:
... Statistics.corr(rdd, "spearman")
... print("Method name as second argument without 'method=' shouldn't be allowed.")
... except TypeError:
... pass
Note
Experimental
Performs the Kolmogorov-Smirnov (KS) test for data sampled from a continuous distribution. It tests the null hypothesis that the data is generated from a particular distribution.
The given data is sorted and the Empirical Cumulative Distribution Function (ECDF) is calculated which for a given point is the number of points having a CDF value lesser than it divided by the total number of points.
Since the data is sorted, this is a step function that rises by (1 / length of data) for every ordered point.
The KS statistic gives us the maximum distance between the ECDF and the CDF. Intuitively if this statistic is large, the probabilty that the null hypothesis is true becomes small. For specific details of the implementation, please have a look at the Scala documentation.
Parameters: |
|
---|---|
Returns: | KolmogorovSmirnovTestResult object containing the test statistic, degrees of freedom, p-value, the method used, and the null hypothesis. |
>>> kstest = Statistics.kolmogorovSmirnovTest
>>> data = sc.parallelize([-1.0, 0.0, 1.0])
>>> ksmodel = kstest(data, "norm")
>>> print(round(ksmodel.pValue, 3))
1.0
>>> print(round(ksmodel.statistic, 3))
0.175
>>> ksmodel.nullHypothesis
u'Sample follows theoretical distribution'
>>> data = sc.parallelize([2.0, 3.0, 4.0])
>>> ksmodel = kstest(data, "norm", 3.0, 1.0)
>>> print(round(ksmodel.pValue, 3))
1.0
>>> print(round(ksmodel.statistic, 3))
0.175
Trait for multivariate statistical summary of a data matrix.
Contains test results for the chi-squared hypothesis test.
Name of the test method
Represents a (mu, sigma) tuple
>>> m = MultivariateGaussian(Vectors.dense([11,12]),DenseMatrix(2, 2, (1.0, 3.0, 5.0, 2.0)))
>>> (m.mu, m.sigma.toArray())
(DenseVector([11.0, 12.0]), array([[ 1., 5.],[ 3., 2.]]))
>>> (m[0], m[1])
(DenseVector([11.0, 12.0]), array([[ 1., 5.],[ 3., 2.]]))
Note
Experimental
Estimate probability density at required points given a RDD of samples from the population.
>>> kd = KernelDensity()
>>> sample = sc.parallelize([0.0, 1.0])
>>> kd.setSample(sample)
>>> kd.estimate([0.0, 1.0])
array([ 0.12938758, 0.12938758])
Estimate the probability density at points
Set bandwidth of each sample. Defaults to 1.0
Set sample points from the population. Should be a RDD
Note
Experimental
A decision tree model for classification or regression.
Call method of java_model
Predict the label of one or more examples.
Parameters: | x – Data point (feature vector), or an RDD of data points (feature vectors). |
---|
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|
Note
Experimental
Learning algorithm for a decision tree model for classification or regression.
Train a DecisionTreeModel for classification.
Parameters: |
|
---|---|
Returns: | DecisionTreeModel |
Example usage:
>>> from numpy import array
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import DecisionTree
>>>
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
>>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
>>> print(model)
DecisionTreeModel classifier of depth 1 with 3 nodes
>>> print(model.toDebugString())
DecisionTreeModel classifier of depth 1 with 3 nodes
If (feature 0 <= 0.0)
Predict: 0.0
Else (feature 0 > 0.0)
Predict: 1.0
>>> model.predict(array([1.0]))
1.0
>>> model.predict(array([0.0]))
0.0
>>> rdd = sc.parallelize([[1.0], [0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
Train a DecisionTreeModel for regression.
Parameters: |
|
---|---|
Returns: | DecisionTreeModel |
Example usage:
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import DecisionTree
>>> from pyspark.mllib.linalg import SparseVector
>>>
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
>>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data), {})
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {1: 0.0}))
0.0
>>> rdd = sc.parallelize([[0.0, 1.0], [0.0, 0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
Note
Experimental
Represents a random forest model.
Call method of java_model
Get number of trees in ensemble.
Predict values for a single data point or an RDD of points using the model trained.
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|
Full model
Get total number of nodes, summed over all trees in the ensemble.
Note
Experimental
Learning algorithm for a random forest model for classification or regression.
Method to train a decision tree model for binary or multiclass classification.
Parameters: |
|
---|---|
Returns: | RandomForestModel that can be used for prediction |
Example usage:
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import RandomForest
>>>
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(0.0, [1.0]),
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
>>> model = RandomForest.trainClassifier(sc.parallelize(data), 2, {}, 3, seed=42)
>>> model.numTrees()
3
>>> model.totalNumNodes()
7
>>> print(model)
TreeEnsembleModel classifier with 3 trees
>>> print(model.toDebugString())
TreeEnsembleModel classifier with 3 trees
Tree 0:
Predict: 1.0
Tree 1:
If (feature 0 <= 1.0)
Predict: 0.0
Else (feature 0 > 1.0)
Predict: 1.0
Tree 2:
If (feature 0 <= 1.0)
Predict: 0.0
Else (feature 0 > 1.0)
Predict: 1.0
>>> model.predict([2.0])
1.0
>>> model.predict([0.0])
0.0
>>> rdd = sc.parallelize([[3.0], [1.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
Method to train a decision tree model for regression.
Parameters: |
|
---|---|
Returns: | RandomForestModel that can be used for prediction |
Example usage:
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import RandomForest
>>> from pyspark.mllib.linalg import SparseVector
>>>
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
>>> model = RandomForest.trainRegressor(sc.parallelize(sparse_data), {}, 2, seed=42)
>>> model.numTrees()
2
>>> model.totalNumNodes()
4
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {0: 1.0}))
0.5
>>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.5]
Note
Experimental
Represents a gradient-boosted tree model.
Call method of java_model
Get number of trees in ensemble.
Predict values for a single data point or an RDD of points using the model trained.
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|
Full model
Get total number of nodes, summed over all trees in the ensemble.
Note
Experimental
Learning algorithm for a gradient boosted trees model for classification or regression.
Method to train a gradient-boosted trees model for classification.
Parameters: |
|
---|---|
Returns: | GradientBoostedTreesModel that can be used for prediction |
Example usage:
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import GradientBoostedTrees
>>>
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(0.0, [1.0]),
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
>>>
>>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10)
>>> model.numTrees()
10
>>> model.totalNumNodes()
30
>>> print(model) # it already has newline
TreeEnsembleModel classifier with 10 trees
>>> model.predict([2.0])
1.0
>>> model.predict([0.0])
0.0
>>> rdd = sc.parallelize([[2.0], [0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
Method to train a gradient-boosted trees model for regression.
Parameters: |
|
---|---|
Returns: | GradientBoostedTreesModel that can be used for prediction |
Example usage:
>>> from pyspark.mllib.regression import LabeledPoint
>>> from pyspark.mllib.tree import GradientBoostedTrees
>>> from pyspark.mllib.linalg import SparseVector
>>>
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
>>> data = sc.parallelize(sparse_data)
>>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10)
>>> model.numTrees()
10
>>> model.totalNumNodes()
12
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {0: 1.0}))
0.0
>>> rdd = sc.parallelize([[0.0, 1.0], [1.0, 0.0]])
>>> model.predict(rdd).collect()
[1.0, 0.0]
Mixin for classes which can load saved models using its Scala implementation.
Mixin for models that provide save() through their Scala implementation.
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|
Utils for generating linear data
Param: | intercept bias factor, the term c in X’w + c |
---|---|
Param: | weights feature vector, the term w in X’w + c |
Param: | xMean Point around which the data X is centered. |
Param: | xVariance Variance of the given data |
Param: | nPoints Number of points to be generated |
Param: | seed Random Seed |
Param: | eps Used to scale the noise. If eps is set high, the amount of gaussian noise added is more. |
Returns a list of LabeledPoints of length nPoints
Helper methods to load, save and pre-process data used in MLlib.
Returns a new vector with 1.0 (bias) appended to the end of the input vector.
Load labeled points saved using RDD.saveAsTextFile.
Parameters: |
|
---|---|
Returns: | labeled data stored as an RDD of LabeledPoint |
>>> from tempfile import NamedTemporaryFile
>>> from pyspark.mllib.util import MLUtils
>>> from pyspark.mllib.regression import LabeledPoint
>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, -1.23), (2, 4.56e-7)])), LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> sc.parallelize(examples, 1).saveAsTextFile(tempFile.name)
>>> MLUtils.loadLabeledPoints(sc, tempFile.name).collect()
[LabeledPoint(1.1, (3,[0,2],[-1.23,4.56e-07])), LabeledPoint(0.0, [1.01,2.02,3.03])]
Loads labeled data in the LIBSVM format into an RDD of LabeledPoint. The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR. Each line represents a labeled sparse feature vector using the following format:
label index1:value1 index2:value2 ...
where the indices are one-based and in ascending order. This method parses each line into a LabeledPoint, where the feature indices are converted to zero-based.
Parameters: |
|
---|---|
Returns: | labeled data stored as an RDD of LabeledPoint |
>>> from tempfile import NamedTemporaryFile
>>> from pyspark.mllib.util import MLUtils
>>> from pyspark.mllib.regression import LabeledPoint
>>> tempFile = NamedTemporaryFile(delete=True)
>>> _ = tempFile.write(b"+1 1:1.0 3:2.0 5:3.0\n-1\n-1 2:4.0 4:5.0 6:6.0")
>>> tempFile.flush()
>>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
>>> tempFile.close()
>>> examples[0]
LabeledPoint(1.0, (6,[0,2,4],[1.0,2.0,3.0]))
>>> examples[1]
LabeledPoint(-1.0, (6,[],[]))
>>> examples[2]
LabeledPoint(-1.0, (6,[1,3,5],[4.0,5.0,6.0]))
Loads vectors saved using RDD[Vector].saveAsTextFile with the default number of partitions.
Save labeled data in LIBSVM format.
Parameters: |
|
---|
>>> from tempfile import NamedTemporaryFile
>>> from fileinput import input
>>> from pyspark.mllib.regression import LabeledPoint
>>> from glob import glob
>>> from pyspark.mllib.util import MLUtils
>>> examples = [LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))]
>>> tempFile = NamedTemporaryFile(delete=True)
>>> tempFile.close()
>>> MLUtils.saveAsLibSVMFile(sc.parallelize(examples), tempFile.name)
>>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0.0 1:1.01 2:2.02 3:3.03\n1.1 1:1.23 3:4.56\n'
Mixin for models and transformers which may be saved as files.
Save this model to the given path.
The model may be loaded using py:meth:Loader.load.
Parameters: |
|
---|