Home | Trees | Indices | Help |
|
---|
|
Main entry point for SparkSQL functionality.
A SQLContext can be used create SchemaRDDs, register SchemaRDDs as tables, execute SQL over tables, cache tables, and read parquet files.
Instance Methods | |||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
Method Details |
Create a new SQLContext.
|
Registers a lambda function as a UDF so it can be used in SQL statements. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically be done. For any other return type, the produced object must match the specified type. >>> sqlCtx.registerFunction("stringLengthString", lambda x: len(x)) >>> sqlCtx.sql("SELECT stringLengthString('test')").collect() [Row(c0=u'4')] >>> sqlCtx.registerFunction("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlCtx.sql("SELECT stringLengthInt('test')").collect() [Row(c0=4)] >>> sqlCtx.registerFunction("twoArgs", lambda x, y: len(x) + y, IntegerType()) >>> sqlCtx.sql("SELECT twoArgs('test', 1)").collect() [Row(c0=5)] |
Infer and apply a schema to an RDD of Rows. We peek at the first row of the RDD to determine the fields' names and types. Nested collections are supported, which include array, dict, list, Row, tuple, namedtuple, or object. All the rows in `rdd` should have the same type with the first one, or it will cause runtime exceptions. Each row could be pyspark.sql.Row object or namedtuple or objects, using dict is deprecated. >>> rdd = sc.parallelize( ... [Row(field1=1, field2="row1"), ... Row(field1=2, field2="row2"), ... Row(field1=3, field2="row3")]) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.collect()[0] Row(field1=1, field2=u'row1') >>> NestedRow = Row("f1", "f2") >>> nestedRdd1 = sc.parallelize([ ... NestedRow(array('i', [1, 2]), {"row1": 1.0}), ... NestedRow(array('i', [2, 3]), {"row2": 2.0})]) >>> srdd = sqlCtx.inferSchema(nestedRdd1) >>> srdd.collect() [Row(f1=[1, 2], f2={u'row1': 1.0}), ..., f2={u'row2': 2.0})] >>> nestedRdd2 = sc.parallelize([ ... NestedRow([[1, 2], [2, 3]], [1, 2]), ... NestedRow([[2, 3], [3, 4]], [2, 3])]) >>> srdd = sqlCtx.inferSchema(nestedRdd2) >>> srdd.collect() [Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), ..., f2=[2, 3])] |
Applies the given schema to the given RDD of These tuples or lists can contain complex nested structures like lists, maps or nested rows. The schema should be a StructType. It is important that the schema matches the types of the objects in each row or exceptions could be thrown at runtime. >>> rdd2 = sc.parallelize([(1, "row1"), (2, "row2"), (3, "row3")]) >>> schema = StructType([StructField("field1", IntegerType(), False), ... StructField("field2", StringType(), False)]) >>> srdd = sqlCtx.applySchema(rdd2, schema) >>> sqlCtx.registerRDDAsTable(srdd, "table1") >>> srdd2 = sqlCtx.sql("SELECT * from table1") >>> srdd2.collect() [Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')] >>> from datetime import datetime >>> rdd = sc.parallelize([(127, -128L, -32768, 32767, 2147483647L, 1.0, ... datetime(2010, 1, 1, 1, 1, 1), ... {"a": 1}, (2,), [1, 2, 3], None)]) >>> schema = StructType([ ... StructField("byte1", ByteType(), False), ... StructField("byte2", ByteType(), False), ... StructField("short1", ShortType(), False), ... StructField("short2", ShortType(), False), ... StructField("int", IntegerType(), False), ... StructField("float", FloatType(), False), ... StructField("time", TimestampType(), False), ... StructField("map", ... MapType(StringType(), IntegerType(), False), False), ... StructField("struct", ... StructType([StructField("b", ShortType(), False)]), False), ... StructField("list", ArrayType(ByteType(), False), False), ... StructField("null", DoubleType(), True)]) >>> srdd = sqlCtx.applySchema(rdd, schema) >>> results = srdd.map( ... lambda x: (x.byte1, x.byte2, x.short1, x.short2, x.int, x.float, x.time, ... x.map["a"], x.struct.b, x.list, x.null)) >>> results.collect()[0] (127, -128, -32768, 32767, 2147483647, 1.0, ...(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None) >>> srdd.registerTempTable("table2") >>> sqlCtx.sql( ... "SELECT byte1 - 1 AS byte1, byte2 + 1 AS byte2, " + ... "short1 + 1 AS short1, short2 - 1 AS short2, int - 1 AS int, " + ... "float + 1.5 as float FROM table2").collect() [Row(byte1=126, byte2=-127, short1=-32767, short2=32766, int=2147483646, float=2.5)] >>> rdd = sc.parallelize([(127, -32768, 1.0, ... datetime(2010, 1, 1, 1, 1, 1), ... {"a": 1}, (2,), [1, 2, 3])]) >>> abstract = "byte short float time map{} struct(b) list[]" >>> schema = _parse_schema_abstract(abstract) >>> typedSchema = _infer_schema_type(rdd.first(), schema) >>> srdd = sqlCtx.applySchema(rdd, typedSchema) >>> srdd.collect() [Row(byte=127, short=-32768, float=1.0, time=..., list=[1, 2, 3])] |
Registers the given RDD as a temporary table in the catalog. Temporary tables exist only during the lifetime of this instance of SQLContext. >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") |
Loads a Parquet file, returning the result as a SchemaRDD. >>> import tempfile, shutil >>> parquetFile = tempfile.mkdtemp() >>> shutil.rmtree(parquetFile) >>> srdd = sqlCtx.inferSchema(rdd) >>> srdd.saveAsParquetFile(parquetFile) >>> srdd2 = sqlCtx.parquetFile(parquetFile) >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True |
Loads a text file storing one JSON object per line as a SchemaRDD. If the schema is provided, applies the given schema to this JSON dataset. Otherwise, it goes through the entire dataset once to determine the schema. >>> import tempfile, shutil >>> jsonFile = tempfile.mkdtemp() >>> shutil.rmtree(jsonFile) >>> ofn = open(jsonFile, 'w') >>> for json in jsonStrings: ... print>>ofn, json >>> ofn.close() >>> srdd1 = sqlCtx.jsonFile(jsonFile) >>> sqlCtx.registerRDDAsTable(srdd1, "table1") >>> srdd2 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table1") >>> for r in srdd2.collect(): ... print r Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')]) Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) >>> srdd3 = sqlCtx.jsonFile(jsonFile, srdd1.schema()) >>> sqlCtx.registerRDDAsTable(srdd3, "table2") >>> srdd4 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table2") >>> for r in srdd4.collect(): ... print r Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) Row(f1=2, f2=None, f3=Row(field4=22,..., f4=[Row(field7=u'row2')]) Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) >>> schema = StructType([ ... StructField("field2", StringType(), True), ... StructField("field3", ... StructType([ ... StructField("field5", ... ArrayType(IntegerType(), False), True)]), False)]) >>> srdd5 = sqlCtx.jsonFile(jsonFile, schema) >>> sqlCtx.registerRDDAsTable(srdd5, "table3") >>> srdd6 = sqlCtx.sql( ... "SELECT field2 AS f1, field3.field5 as f2, " ... "field3.field5[0] as f3 from table3") >>> srdd6.collect() [Row(f1=u'row1', f2=None, f3=None)...Row(f1=u'row3', f2=[], f3=None)] |
Loads an RDD storing one JSON object per string as a SchemaRDD. If the schema is provided, applies the given schema to this JSON dataset. Otherwise, it goes through the entire dataset once to determine the schema. >>> srdd1 = sqlCtx.jsonRDD(json) >>> sqlCtx.registerRDDAsTable(srdd1, "table1") >>> srdd2 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table1") >>> for r in srdd2.collect(): ... print r Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')]) Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) >>> srdd3 = sqlCtx.jsonRDD(json, srdd1.schema()) >>> sqlCtx.registerRDDAsTable(srdd3, "table2") >>> srdd4 = sqlCtx.sql( ... "SELECT field1 AS f1, field2 as f2, field3 as f3, " ... "field6 as f4 from table2") >>> for r in srdd4.collect(): ... print r Row(f1=1, f2=u'row1', f3=Row(field4=11, field5=None), f4=None) Row(f1=2, f2=None, f3=Row(field4=22..., f4=[Row(field7=u'row2')]) Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None) >>> schema = StructType([ ... StructField("field2", StringType(), True), ... StructField("field3", ... StructType([ ... StructField("field5", ... ArrayType(IntegerType(), False), True)]), False)]) >>> srdd5 = sqlCtx.jsonRDD(json, schema) >>> sqlCtx.registerRDDAsTable(srdd5, "table3") >>> srdd6 = sqlCtx.sql( ... "SELECT field2 AS f1, field3.field5 as f2, " ... "field3.field5[0] as f3 from table3") >>> srdd6.collect() [Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)] >>> sqlCtx.jsonRDD(sc.parallelize(['{}', ... '{"key0": {"key1": "value1"}}'])).collect() [Row(key0=None), Row(key0=Row(key1=u'value1'))] >>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', ... '{"key0": {"key1": "value1"}}'])).collect() [Row(key0=None), Row(key0=Row(key1=u'value1'))] |
Return a SchemaRDD representing the result of the given query. >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") >>> srdd2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> srdd2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')] |
Returns the specified table as a SchemaRDD. >>> srdd = sqlCtx.inferSchema(rdd) >>> sqlCtx.registerRDDAsTable(srdd, "table1") >>> srdd2 = sqlCtx.table("table1") >>> sorted(srdd.collect()) == sorted(srdd2.collect()) True |
Home | Trees | Indices | Help |
|
---|
Generated by Epydoc 3.0.1 on Thu Sep 11 01:19:41 2014 | http://epydoc.sourceforge.net |