
Class SQLContext


Main entry point for Spark SQL functionality.

A SQLContext can be used to create SchemaRDDs, register SchemaRDDs as tables, execute SQL over tables, cache tables, and read Parquet files.
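
For example, a typical end-to-end workflow looks like the following (a minimal sketch, assuming a running SparkContext named sc and an RDD of dicts named rdd, as in the method examples below):

>>> from pyspark.sql import SQLContext
>>> sqlCtx = SQLContext(sc)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1, field2 FROM table1")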

Instance Methods

__init__(self, sparkContext, sqlContext=None)
    Create a new SQLContext.

inferSchema(self, rdd)
    Infer and apply a schema to an RDD of dicts.

registerRDDAsTable(self, rdd, tableName)
    Registers the given RDD as a temporary table in the catalog.

parquetFile(self, path)
    Loads a Parquet file, returning the result as a SchemaRDD.

jsonFile(self, path)
    Loads a text file storing one JSON object per line, returning the result as a SchemaRDD.

jsonRDD(self, rdd)
    Loads an RDD storing one JSON object per string, returning the result as a SchemaRDD.

sql(self, sqlQuery)
    Return a SchemaRDD representing the result of the given query.

table(self, tableName)
    Returns the specified table as a SchemaRDD.

cacheTable(self, tableName)
    Caches the specified table in-memory.

uncacheTable(self, tableName)
    Removes the specified table from the in-memory cache.
Method Details

__init__(self, sparkContext, sqlContext=None)
(Constructor)


Create a new SQLContext.

Parameters:
  • sparkContext - The SparkContext to wrap.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.inferSchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> bad_rdd = sc.parallelize([1,2,3])
>>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
    ...
ValueError:...
>>> allTypes = sc.parallelize([{"int" : 1, "string" : "string", "double" : 1.0, "long": 1L,
... "boolean" : True}])
>>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long,
... x.boolean))
>>> srdd.collect()[0]
(1, u'string', 1.0, 1, True)

inferSchema(self, rdd)


Infer and apply a schema to an RDD of dicts.

We peek at the first row of the RDD to determine the field names and types, and then use that schema to extract all the dictionaries. Nested collections are supported, including array, dict, list, set, and tuple values.
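
The rdd, nestedRdd1, and nestedRdd2 globals used in these examples are not defined on this page; judging from the expected output, rdd is assumed to be a plain RDD of dicts, e.g.:

>>> rdd = sc.parallelize([{"field1": 1, "field2": "row1"},
...                       {"field1": 2, "field2": "row2"},
...                       {"field1": 3, "field2": "row3"}])

nestedRdd1 and nestedRdd2 would be built analogously from the nested values shown below.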

>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
...                    {"field1" : 3, "field2": "row3"}]
True
>>> from array import array
>>> srdd = sqlCtx.inferSchema(nestedRdd1)
>>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
...                    {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
True
>>> srdd = sqlCtx.inferSchema(nestedRdd2)
>>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
...                    {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
True

registerRDDAsTable(self, rdd, tableName)


Registers the given RDD as a temporary table in the catalog.

Temporary tables exist only during the lifetime of this instance of SQLContext.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
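
Once registered, the table can be queried with sql() or fetched back with table(), as shown in those methods' examples below; for instance:

>>> srdd2 = sqlCtx.sql("SELECT * FROM table1")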

parquetFile(self, path)


Loads a Parquet file, returning the result as a SchemaRDD.

>>> import tempfile, shutil
>>> parquetFile = tempfile.mkdtemp()
>>> shutil.rmtree(parquetFile)
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.saveAsParquetFile(parquetFile)
>>> srdd2 = sqlCtx.parquetFile(parquetFile)
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True

jsonFile(self, path)

Loads a text file storing one JSON object per line, returning the result as a SchemaRDD.

It goes through the entire dataset once to determine the schema.
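
The jsonStrings list used below is not defined on this page; based on the expected query results, it is assumed to hold one JSON document per string, along the lines of:

>>> jsonStrings = [
...   '{"field1": 1, "field2": "row1", "field3": {"field4": 11}}',
...   '{"field1": 2, "field3": {"field4": 22, "field5": [10, 11]}, "field6": [{"field7": "row2"}]}',
...   '{"field2": "row3", "field3": {"field4": 33, "field5": []}}']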

>>> import tempfile, shutil
>>> jsonFile = tempfile.mkdtemp()
>>> shutil.rmtree(jsonFile)
>>> ofn = open(jsonFile, 'w')
>>> for json in jsonStrings:
...   print>>ofn, json
>>> ofn.close()
>>> srdd = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql(
...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
>>> srdd2.collect() == [
... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
True

jsonRDD(self, rdd)

Loads an RDD storing one JSON object per string, returning the result as a SchemaRDD.

It goes through the entire dataset once to determine the schema.
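
As with jsonFile, the json RDD in this example is assumed to be an RDD of JSON strings, e.g. built from the jsonStrings sketched above:

>>> json = sc.parallelize(jsonStrings)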

>>> srdd = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql(
...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
>>> srdd2.collect() == [
... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
True

sql(self, sqlQuery)


Return a SchemaRDD representing the result of the given query.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> srdd2.collect() == [{"f1" : 1, "f2" : "row1"}, {"f1" : 2, "f2": "row2"},
...                     {"f1" : 3, "f2": "row3"}]
True

table(self, tableName)


Returns the specified table as a SchemaRDD.

>>> srdd = sqlCtx.inferSchema(rdd)
>>> sqlCtx.registerRDDAsTable(srdd, "table1")
>>> srdd2 = sqlCtx.table("table1")
>>> sorted(srdd.collect()) == sorted(srdd2.collect())
True
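
The cacheTable and uncacheTable methods have no detailed examples above; a minimal sketch of their use, assuming "table1" was registered as in the preceding examples:

>>> sqlCtx.cacheTable("table1")
>>> sqlCtx.uncacheTable("table1")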