Apache Spark Component Guide

Accessing ORC Data in Hive Tables

Apache Spark on HDP supports the Optimized Row Columnar (ORC) file format, a self-describing, type-aware, column-based file format that is one of the primary file formats supported in Apache Hive. ORC reduces I/O overhead by accessing only the columns that are required for the current query. It requires significantly fewer seek operations because all columns within a single group of row data (known as a stripe) are stored together on disk.

The Spark ORC data source supports ACID transactions, snapshot isolation, built-in indexes, and complex data types (such as array, map, and struct), and provides read and write access to ORC files. It leverages the Spark SQL Catalyst engine for common optimizations such as column pruning, predicate push-down, and partition pruning.

This subsection has several examples of Spark ORC integration, showing how ORC optimizations are applied to user programs.

Accessing ORC Files from Spark

To start using ORC, you can define a SparkSession instance:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

The following example uses data structures to demonstrate working with complex types. The Person struct data type has a name, an age, and a sequence of contacts, which are themselves defined by names and phone numbers.

  1. Define Contact and Person data structures:

    case class Contact(name: String, phone: String)
    case class Person(name: String, age: Int, contacts: Seq[Contact])
  2. Create 100 Person records:

    val records = (1 to 100).map { i =>
      Person(s"name_$i", i, (0 to 1).map { m => Contact(s"contact_$m", s"phone_$m") })
    }

    In the physical file, these records are saved in columnar format. When accessing ORC files through the DataFrame API, you see rows.

  3. To write person records as ORC files to a directory named “people”, you can use the following command:

    records.toDF().write.format("orc").save("people")
  4. Read the objects back:

    val people = spark.read.format("orc").load("people")
  5. For reuse in future operations, register the new "people" directory as temporary table “people”:

    people.createOrReplaceTempView("people")
  6. After you register the temporary table “people”, you can query columns from the underlying table:

    spark.sql("SELECT name FROM people WHERE age < 15").count()

In this example the physical table scan loads only columns name and age at runtime, without reading the contacts column from the file system. This improves read performance.
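
The effect of column pruning can be sketched in plain Python. This is a simplified model, not the ORC on-disk layout or the Spark reader: the `columns` dictionary and the `scan` helper are hypothetical names introduced only for illustration, and the data mirrors the Person records created above.

```python
# Toy model of columnar storage: each column is stored as its own list,
# so a query can load only the columns it references.
# This is an illustration, not ORC's file layout or Spark's reader.

# Same shape as the Person records above: name_1..name_100, ages 1..100.
columns = {
    "name": [f"name_{i}" for i in range(1, 101)],
    "age": list(range(1, 101)),
    "contacts": [
        [(f"contact_{m}", f"phone_{m}") for m in range(2)]
        for _ in range(1, 101)
    ],
}


def scan(columns, required):
    """Load only the required columns and report which ones were touched."""
    loaded = {name: columns[name] for name in required}
    return loaded, sorted(loaded)


# SELECT name FROM people WHERE age < 15 needs only name and age.
loaded, columns_read = scan(columns, ["name", "age"])
matching_names = [
    name for name, age in zip(loaded["name"], loaded["age"]) if age < 15
]

print(columns_read)         # ['age', 'name']
print(len(matching_names))  # 14
```

The contacts column is never touched, which is the saving the query above gets from the columnar layout.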

You can also use Spark DataFrameReader and DataFrameWriter methods to access ORC files.

Enabling Predicate Push-Down Optimization

The columnar nature of the ORC format helps avoid reading unnecessary columns, but it is still possible to read unnecessary rows. The example in the previous subsection reads all rows, in which the age value ranges from 1 to 100, even though the query requested only rows in which the age value is less than 15 ("...WHERE age < 15"). Such full table scanning is an expensive operation.

ORC avoids this type of overhead by using predicate push-down, which relies on three levels of built-in indexes within each file (file level, stripe level, and row level):

  • File-level and stripe-level statistics are in the file footer, making it easy to determine if the rest of the file must be read.

  • Row-level indexes include column statistics for each row group, along with the position used to seek to the start of the row group.

ORC uses these indexes to move the filter operation to the data loading phase by reading only data that potentially includes required rows.

This combination of predicate push-down with columnar storage reduces disk I/O significantly, especially for larger datasets in which I/O bandwidth becomes the main bottleneck to performance.
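
The following plain-Python sketch shows how minimum and maximum statistics let a reader skip data that cannot match a filter. It is a toy model under stated assumptions, not the ORC reader: the stripe size of 10 rows, the `stripes` structure, and the `read_age_less_than` helper are all hypothetical, and real ORC stripes are far larger.

```python
# Toy model of predicate push-down using min/max statistics.
# Stripe size and data structures are hypothetical, chosen for illustration.

ages = list(range(1, 101))  # ages 1..100, as in the Person records
STRIPE_SIZE = 10            # hypothetical; real ORC stripes are far larger

# Split the column into stripes and keep min/max statistics for each one.
stripes = []
for start in range(0, len(ages), STRIPE_SIZE):
    values = ages[start:start + STRIPE_SIZE]
    stripes.append({"min": min(values), "max": max(values), "values": values})


def read_age_less_than(stripes, limit):
    """Return matching values and the number of stripes actually read."""
    matches, stripes_read = [], 0
    for stripe in stripes:
        # If the smallest value is already >= limit, no row can match,
        # so the stripe is skipped without reading its values.
        if stripe["min"] >= limit:
            continue
        stripes_read += 1
        matches.extend(v for v in stripe["values"] if v < limit)
    return matches, stripes_read


matches, stripes_read = read_age_less_than(stripes, 15)
print(len(matches), stripes_read, len(stripes))  # 14 2 10
```

In this model only 2 of the 10 stripes are read for the filter age < 15. The statistics are consulted first, and the remaining stripes are never loaded.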

By default, ORC predicate push-down is disabled in Spark SQL. To obtain performance benefits from predicate push-down, you must enable it explicitly, as follows:

spark.sql("set spark.sql.orc.filterPushdown=true")

Loading ORC Data into DataFrames by Using Predicate Push-Down

DataFrames look similar to Spark RDDs but have higher-level semantics built into their operators. This allows optimization to be pushed down to the underlying query engine.

Here is the Scala API version of the SELECT query used in the previous section, using the DataFrame API:

val spark = SparkSession.builder().getOrCreate()
spark.sql("set spark.sql.orc.filterPushdown=true")
val people = spark.read.format("orc").load("peoplePartitioned")
people.filter(people("age") < 15).select("name").show()

DataFrames are not limited to Scala. There is a Java API and, for data scientists, a Python API binding:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sql("set spark.sql.orc.filterPushdown=true")
people = spark.read.format("orc").load("peoplePartitioned")
people.filter(people.age < 15).select("name").show()

Optimizing Queries Through Partition Pruning

When predicate push-down optimization is not applicable—for example, if all stripes contain records that match the predicate condition—a query with a WHERE clause might need to read the entire data set. This becomes a bottleneck over a large table. Partition pruning is another optimization method; it exploits query semantics to avoid reading large amounts of data unnecessarily.

Partition pruning is possible when data within a table is split across multiple logical partitions. Each partition corresponds to a particular value of a partition column and is stored as a subdirectory within the table root directory on HDFS. Where applicable, only the required partitions (subdirectories) of a table are queried, thereby avoiding unnecessary I/O.

Spark supports saving data in a partitioned layout seamlessly, through the partitionBy method available during data source write operations. To partition the "people" table by the “age” column, you can use the following command:

people.write.format("orc").partitionBy("age").save("peoplePartitioned")

As a result, records are automatically partitioned by the age field and then saved into different directories: for example, peoplePartitioned/age=1/, peoplePartitioned/age=2/, and so on.
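
The directory layout can be modeled in plain Python. This is a sketch of the naming scheme only, not of how Spark writes files: the `partition_path` helper and the in-memory `layout` mapping are hypothetical, and the records are a reduced form of the Person records (name and age only).

```python
from collections import defaultdict

# Reduced form of the Person records: name_1..name_100, ages 1..100.
records = [{"name": f"name_{i}", "age": i} for i in range(1, 101)]


def partition_path(root, column, value):
    """Build the subdirectory name for one partition value."""
    return f"{root}/{column}={value}/"


# Group rows by target directory. The partition value is encoded in the
# directory name, so it is not repeated in the rows stored underneath it.
layout = defaultdict(list)
for record in records:
    path = partition_path("peoplePartitioned", "age", record["age"])
    layout[path].append({"name": record["name"]})

print(len(layout))                         # 100
print(layout["peoplePartitioned/age=1/"])  # [{'name': 'name_1'}]
```

Because every age value in the sample data is distinct, each of the 100 directories holds exactly one row in this model.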

After partitioning the data, subsequent queries can omit large amounts of I/O when the partition column is referenced in predicates. For example, the following query automatically locates and loads the file under peoplePartitioned/age=20/ and omits all others:

val peoplePartitioned = spark.read.format("orc").load("peoplePartitioned")
peoplePartitioned.createOrReplaceTempView("peoplePartitioned") 
spark.sql("SELECT * FROM peoplePartitioned WHERE age = 20")
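
Partition pruning for this query can be approximated in plain Python as a filter over directory names. This is a minimal sketch, not Spark's planner: `parse_partition` and `prune` are hypothetical helpers, and the directory list is generated in memory rather than read from HDFS.

```python
def parse_partition(path):
    """Extract (column, value) from a path such as 'peoplePartitioned/age=20/'."""
    directory = path.rstrip("/").split("/")[-1]
    column, value = directory.split("=", 1)
    return column, int(value)


def prune(paths, column, wanted):
    """Keep only the partition directories whose value matches the predicate."""
    selected = []
    for path in paths:
        col, value = parse_partition(path)
        if col == column and value == wanted:
            selected.append(path)
    return selected


# One directory per age value, matching the layout described above.
partition_dirs = [f"peoplePartitioned/age={age}/" for age in range(1, 101)]

# WHERE age = 20 is decided from directory names alone.
to_read = prune(partition_dirs, "age", 20)
print(to_read)                              # ['peoplePartitioned/age=20/']
print(len(partition_dirs) - len(to_read))   # 99
```

No data file is opened to make this decision, which is why pruning helps even when every stripe would otherwise match the predicate.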

Enabling Vectorized Query Execution

Vectorized query execution is a feature that greatly reduces the CPU usage for typical query operations such as scans, filters, aggregates, and joins. Vectorization is also implemented for the ORC format. Spark has also used whole-stage code generation and vectorization (for Parquet) since Spark 2.0 (released on July 26, 2016).

Use the following steps to enable the new ORC format and vectorization for ORC files with Spark SQL.

  1. On the Ambari Dashboard, select Spark2 > Configs. For Spark shells and applications, click Custom spark2-defaults, then add the following properties. For the Spark Thrift Server, add these properties under Custom spark2-thrift-sparkconf.

    • spark.sql.orc.enabled=true – Enables the new ORC format to read/write Spark data source tables and files.

    • spark.sql.hive.convertMetastoreOrc=true – Enables the new ORC format to read/write Hive tables.

    • spark.sql.orc.char.enabled=true – Enables the new ORC format to use CHAR types to read Hive tables. By default, STRING types are used for performance reasons. This is an optional configuration for Hive compatibility.

  2. Click Save, then restart Spark and any other components that require a restart.

Reading Hive ORC Tables

Spark can read existing Hive tables without calling createOrReplaceTempView. If the table is stored in ORC format, predicate push-down, partition pruning, and vectorized query execution are also applied according to the configuration.

spark.sql("SELECT * FROM hiveTable WHERE age = 20")