Integrating Apache Hive with Spark and BI

HiveWarehouseSession API operations

HiveWarehouseSession is the API that bridges Spark with Hive. In your Spark source code, you create an instance of HiveWarehouseSession using language-specific code, as shown in the next section.

Import statements and variables

The following string constants are defined by the API:

  • HIVE_WAREHOUSE_CONNECTOR
  • DATAFRAME_TO_STREAM
  • STREAM_TO_STREAM

For more information, see the GitHub project for the Hive Warehouse Connector.

Assuming spark references an existing SparkSession, use the following code to import the API and build the HiveWarehouseSession:

  • Scala
    import com.hortonworks.hwc.HiveWarehouseSession
    import com.hortonworks.hwc.HiveWarehouseSession._
    val hive = HiveWarehouseSession.session(spark).build()
  • Java
    import com.hortonworks.hwc.HiveWarehouseSession;
    import static com.hortonworks.hwc.HiveWarehouseSession.*;
    HiveWarehouseSession hive = HiveWarehouseSession.session(spark).build();
  • Python
    from pyspark_llap import HiveWarehouseSession
    hive = HiveWarehouseSession.session(spark).build()
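
The snippets above assume that spark already exists and that the connector is configured. If you need to construct the SparkSession yourself, the Hive Warehouse Connector JAR must be on the Spark classpath and the connection properties must be set. A minimal Scala sketch with a placeholder HiveServer2 JDBC URL; in practice these properties are usually supplied in spark-defaults.conf or on the spark-submit command line:

import org.apache.spark.sql.SparkSession
import com.hortonworks.hwc.HiveWarehouseSession

// Sketch only: the JDBC URL is a placeholder for your cluster's
// HiveServer2 Interactive endpoint.
val spark = SparkSession.builder()
  .appName("hwc-example")
  .config("spark.sql.hive.hiveserver2.jdbc.url",
    "jdbc:hive2://your-hiveserver-host:10500/") // placeholder URL
  .getOrCreate()

val hive = HiveWarehouseSession.session(spark).build()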

Catalog operations

  • Set the current database for unqualified Hive table references

    hive.setDatabase(<database>)

  • Execute a catalog operation and return a DataFrame

    hive.execute("describe extended web_sales").show(100)

  • Show databases

    hive.showDatabases().show(100)

  • Show tables for the current database

    hive.showTables().show(100)

  • Describe a table

    hive.describeTable(<table_name>).show(100)

  • Create a database

    hive.createDatabase(<databaseName>, <ifNotExists>)

  • Create an ORC table

    hive.createTable("web_sales").ifNotExists().column("sold_time_sk", "bigint").column("ws_ship_date_sk", "bigint").create()

    See the CreateTableBuilder interface section below for additional table creation options. Note: You can also create tables through standard HiveQL using hive.executeUpdate. A combined example of these catalog operations follows this list.

  • Drop a database

    hive.dropDatabase(<databaseName>, <ifExists>, <useCascade>)

  • Drop a table

    hive.dropTable(<tableName>, <ifExists>, <usePurge>)
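
Putting these catalog operations together, the following is a minimal Scala sketch; the database and table names are illustrative:

// Illustrative names; assumes `hive` was built as shown above.
hive.createDatabase("sales_demo", true)   // ifNotExists = true
hive.setDatabase("sales_demo")

hive.createTable("web_sales")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .create()

hive.showTables().show(100)
hive.describeTable("web_sales").show(100)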

Read operations

Execute a Hive SELECT query and return a DataFrame.

hive.executeQuery("select * from web_sales")
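
Because executeQuery returns a standard Spark DataFrame, ordinary Spark transformations apply to the result. A short sketch using the web_sales columns from the examples in this section:

val sales = hive.executeQuery("select * from web_sales")

// The result is an ordinary Spark DataFrame, so Spark operations apply.
sales.filter("ws_sold_time_sk > 80000")
  .groupBy("ws_ship_date_sk")
  .count()
  .show(10)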

Write operations

  • Execute a Hive update statement

    hive.executeUpdate("ALTER TABLE old_name RENAME TO new_name")

    Note: You can execute CREATE, UPDATE, DELETE, INSERT, and MERGE statements in this way. A MERGE sketch follows this list.

  • Write a DataFrame to Hive in batch (uses LOAD DATA INTO TABLE)

    Java/Scala:

    df.write.format(HIVE_WAREHOUSE_CONNECTOR).option("table", <tableName>).save()

    Python:

    df.write.format(HiveWarehouseSession().HIVE_WAREHOUSE_CONNECTOR).option("table", <tableName>).save()
  • Write a DataFrame to Hive using HiveStreaming

    Java/Scala:

    //Using dynamic partitioning
    df.write.format(DATAFRAME_TO_STREAM).option("table", <tableName>).save()

    //Or, to write to a static partition
    df.write.format(DATAFRAME_TO_STREAM).option("table", <tableName>).option("partition", <partition>).save()

    Python:

    # Using dynamic partitioning
    df.write.format(HiveWarehouseSession().DATAFRAME_TO_STREAM).option("table", <tableName>).save()

    # Or, to write to a static partition
    df.write.format(HiveWarehouseSession().DATAFRAME_TO_STREAM).option("table", <tableName>).option("partition", <partition>).save()
  • Write a Spark Stream to Hive using HiveStreaming (see the streaming sketch after this list)

    Java/Scala:

    stream.writeStream.format(STREAM_TO_STREAM).option("table", "web_sales").start()

    Python:

    stream.writeStream.format(HiveWarehouseSession().STREAM_TO_STREAM).option("table", "web_sales").start()
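
To illustrate the MERGE note above, here is a Scala sketch of merging staged rows into a target table with executeUpdate. The table and column names are illustrative, and both tables are assumed to exist, with the target transactional (ACID):

// Illustrative MERGE: upsert staged rows into web_sales.
hive.executeUpdate("""
  MERGE INTO web_sales AS t
  USING web_sales_staging AS s
  ON t.ws_sold_time_sk = s.ws_sold_time_sk
  WHEN MATCHED THEN UPDATE SET ws_ship_date_sk = s.ws_ship_date_sk
  WHEN NOT MATCHED THEN INSERT VALUES (s.ws_sold_time_sk, s.ws_ship_date_sk)
""")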
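
To make the streaming write concrete, the following Scala sketch uses Spark's built-in rate test source as stand-in input. The column mapping, target table, and checkpoint path are illustrative assumptions; the target table is assumed to exist and to be transactional, which HiveStreaming requires:

// Sketch using Spark's built-in "rate" test source as input.
val stream = spark.readStream
  .format("rate")                 // emits (timestamp, value) rows
  .option("rowsPerSecond", 10)
  .load()

stream.selectExpr("value AS ws_sold_time_sk", "value AS ws_ship_date_sk")
  .writeStream
  .format(STREAM_TO_STREAM)
  .option("table", "web_sales")
  .option("checkpointLocation", "/tmp/hwc-checkpoint") // structured streaming checkpoint
  .start()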

ETL example (Scala)

Read table data from Hive, transform it in Spark, and write to a new Hive table.

import com.hortonworks.hwc.HiveWarehouseSession
import com.hortonworks.hwc.HiveWarehouseSession._
val hive = HiveWarehouseSession.session(spark).build()
hive.setDatabase("tpcds_bin_partitioned_orc_1000")
val df = hive.executeQuery("select * from web_sales")
df.createOrReplaceTempView("web_sales")
hive.setDatabase("testDatabase")
hive.createTable("newTable")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .create()
sql("SELECT ws_sold_time_sk, ws_ship_date_sk FROM web_sales WHERE ws_sold_time_sk > 80000)
  .write.format(HIVE_WAREHOUSE_CONNECTOR)
  .option("table", "newTable")
  .save()

HiveWarehouseSession interface

package com.hortonworks.hwc;

public interface HiveWarehouseSession {

  //Execute Hive SELECT query and return DataFrame
  Dataset<Row> executeQuery(String sql);

  //Execute Hive update statement
  boolean executeUpdate(String sql);

  //Execute Hive catalog-browsing operation and return DataFrame
  Dataset<Row> execute(String sql);

  //Reference a Hive table as a DataFrame
  Dataset<Row> table(String sql);

  //Return the SparkSession attached to this HiveWarehouseSession
  SparkSession session();

  //Set the current database for unqualified Hive table references
  void setDatabase(String name);

  /**
   * Helpers: wrapper functions over execute or executeUpdate
   */

  //Helper for show databases
  Dataset<Row> showDatabases();

  //Helper for show tables
  Dataset<Row> showTables();

  //Helper for describeTable
  Dataset<Row> describeTable(String table);

  //Helper for create database
  void createDatabase(String database, boolean ifNotExists);

  //Helper for create table stored as ORC
  CreateTableBuilder createTable(String tableName);

  //Helper for drop database
  void dropDatabase(String database, boolean ifExists, boolean cascade);

  //Helper for drop table
  void dropTable(String table, boolean ifExists, boolean purge);
}
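
Of the methods above, table() is the only one not demonstrated earlier: it references a Hive table directly as a DataFrame. A short Scala sketch using the web_sales table from the earlier examples:

// Reference a Hive table as a DataFrame without writing a SELECT statement.
val sales = hive.table("web_sales")
sales.select("ws_sold_time_sk", "ws_ship_date_sk").show(10)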

CreateTableBuilder interface

package com.hortonworks.hwc;

public interface CreateTableBuilder {

  //Silently skip table creation if table name exists
  CreateTableBuilder ifNotExists();

  //Add a column with the specific name and Hive type
  //Use more than once to add multiple columns
  CreateTableBuilder column(String name, String type);

  //Specify a column as a table partition
  //Use more than once to specify multiple partitions
  CreateTableBuilder partition(String name, String type);

  //Add a table property
  //Use more than once to add multiple properties
  CreateTableBuilder prop(String key, String value);

  //Make table bucketed, with given number of buckets and bucket columns
  CreateTableBuilder clusterBy(long numBuckets, String ... columns);

  //Creates ORC table in Hive from builder instance
  void create();
}
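
Putting the builder options together, a Scala sketch of creating a partitioned, bucketed ORC table with one table property; the names and the property value are illustrative:

// Illustrative: a partitioned, bucketed ORC table with a table property.
hive.createTable("web_sales_bucketed")
  .ifNotExists()
  .column("ws_sold_time_sk", "bigint")
  .column("ws_ship_date_sk", "bigint")
  .partition("ws_sold_date", "string")
  .prop("transactional", "true")      // assumption: property chosen for illustration
  .clusterBy(4, "ws_sold_time_sk")
  .create()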