Integrating Hive and Kafka

Kafka storage handler and table properties

You use the Kafka storage handler and table properties to specify the query connection and configuration.

Kafka storage handler

You specify 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' in queries to connect to a Kafka topic and expose it as a Hive table. In the definition of an external table, the storage handler creates a view over a single Kafka topic. For example, the following table definition specifies the storage handler and the required table properties: the topic name and the broker connection string.
CREATE EXTERNAL TABLE kafka_table
   (`timestamp` timestamp, `page` string, `newPage` boolean,
    added int, deleted bigint, delta double)
   STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
   TBLPROPERTIES
   ("kafka.topic" = "test-topic", "kafka.bootstrap.servers" = "localhost:9092");
kafka.topic
The Kafka topic to connect to
kafka.bootstrap.servers
The broker connection string
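
After you create the table, you can query the Kafka topic like any other Hive table. For example, the following query is a minimal sketch against the kafka_table defined above:

-- Sample read over the table defined above
SELECT `page`, added, deleted
   FROM kafka_table
   LIMIT 10;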

Storage handler-based optimizations

The storage handler can optimize reads using filter push-down when executing a query such as the following time-based lookup, which is supported on Kafka 0.11 or later. The Kafka consumer supports seeking on the stream based on an offset, which the storage handler leverages to push down filters over metadata columns.

SELECT COUNT(*) FROM kafka_table
   WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '10' MINUTES);

The storage handler in this example performs seeks based on the Kafka record __timestamp to read only recently arrived data.

The following logical operators and predicate operators are supported in the WHERE clause:

Logical operators: OR, AND

Predicate operators: <, <=, >=, >, =

The storage handler reader also optimizes seeks by performing partition pruning, going directly to the partition offsets used in the WHERE clause.

SELECT COUNT(*)  FROM kafka_table 
  WHERE (`__offset` < 10 AND `__offset` > 3 AND `__partition` = 0) 
  OR (`__partition` = 0 AND `__offset` < 105 AND `__offset` > 99) 
  OR (`__offset` = 109);

The storage handler scans only partition 0 and reads only the records at offsets 4 through 9, 100 through 104, and 109.

Kafka metadata

In addition to the user-defined payload schema, the Kafka storage handler appends to the table the following additional columns, which you can use to query the Kafka metadata fields:

__key
Kafka record key (byte array)
__partition
Kafka record partition identifier (int 32)
__offset
Kafka record offset (int 64)
__timestamp
Kafka record timestamp (int 64)

The partition identifier, record offset, and record timestamp, plus a key-value pair, constitute a Kafka record. Because the key and value are byte arrays, you need to use SerDe classes to transform the value byte array into a set of columns.
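
For example, the following query is a minimal sketch that uses the metadata columns to count records and find the latest offset in each partition of the topic:

-- Latest offset and record count per partition, using the metadata columns
SELECT `__partition`, MAX(`__offset`) AS latest_offset, COUNT(*) AS record_count
   FROM kafka_table
   GROUP BY `__partition`;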

Table properties

You can use the following properties in the TBLPROPERTIES clause of a Hive query that specifies the Kafka storage handler:
kafka.topic
Kafka topic name to map the table to. Required; no default.
kafka.bootstrap.servers
Kafka broker connection string. Required; no default.
kafka.serde.class
Serializer and deserializer class implementation. Optional; default: org.apache.hadoop.hive.serde2.JsonSerDe.
hive.kafka.poll.timeout.ms
Kafka consumer poll timeout in milliseconds. This timeout is independent of the internal Kafka consumer timeouts. Optional; default: 5000 (5 seconds).
hive.kafka.max.retries
Number of retries for Kafka metadata fetch operations. Optional; default: 6.
hive.kafka.metadata.poll.timeout.ms
Number of milliseconds before the consumer times out when fetching Kafka metadata. Optional; default: 30000 (30 seconds).
kafka.write.semantic
Writer semantic. Allowed values: NONE, AT_LEAST_ONCE, EXACTLY_ONCE. Optional; default: AT_LEAST_ONCE.
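
For example, the following table definition is an illustrative sketch that overrides the default SerDe and the consumer poll timeout; the table name, topic name, and the Avro SerDe choice are assumptions for this example, not requirements:

-- Illustrative example: table name, topic, and SerDe choice are assumptions
CREATE EXTERNAL TABLE kafka_avro_table
   (`timestamp` timestamp, `page` string, added int)
   STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
   TBLPROPERTIES
   ("kafka.topic" = "avro-topic",
    "kafka.bootstrap.servers" = "localhost:9092",
    "kafka.serde.class" = "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
    "hive.kafka.poll.timeout.ms" = "10000");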