Integrating Hive and Kafka

Writing transformed data to Kafka

You can extract, transform, and load a Hive table to a Kafka topic for real-time streaming of a large volume of Hive data. You need some understanding of write semantics and the metadata columns required for writing data to Kafka.
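
Writing to Kafka from Hive starts with a table backed by the Kafka storage handler. The following is a minimal sketch; the table name, columns, topic, and broker address are hypothetical:

  -- Sketch: a Kafka-backed Hive table (names and addresses are hypothetical)
  CREATE EXTERNAL TABLE kafka_sensor_sink (
    sensor_id STRING,
    reading   DOUBLE
  )
  STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
  TBLPROPERTIES (
    "kafka.topic" = "sensor-readings",               -- target Kafka topic
    "kafka.bootstrap.servers" = "kafkabroker:9092"   -- broker connection string
  );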

Write semantics

The Hive-Kafka connector supports the following write semantics:

  • At least once (default)
  • Exactly once
At least once (default)
At least once is the write semantic most commonly used by streaming engines. The internal Kafka producer retries on errors. If a message is not delivered, the exception is raised at the task level, which causes a restart and thus more retries (a tuning sketch follows the list below). The at-least-once semantic leads to one of the following outcomes:
  • If the job succeeds, each record is guaranteed to be delivered at least once.
  • If the job fails, some records might already have been delivered while others were not sent. You can retry the query, which eventually leads to the delivery of each record at least once.
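
At least once is the default, so no table property is required. If you want to set it explicitly, or raise the retry count of the internal producer, a sketch follows; note that passing producer settings through a "kafka.producer." table-property prefix is an assumption here, not something this document confirms:

  -- Sketch: set the default semantic explicitly and tune producer retries.
  -- The "kafka.producer." pass-through prefix is an assumption.
  ALTER TABLE kafka_sensor_sink SET TBLPROPERTIES (
    "kafka.write.semantic" = "AT_LEAST_ONCE",
    "kafka.producer.retries" = "6"
  );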
Exactly once
Under the exactly-once semantic, the Hive job ensures that either every record is delivered exactly once or nothing is delivered at all. You can use only Kafka brokers that support the Transaction API (0.11.0.x or later). To use this semantic, set the table property "kafka.write.semantic"="EXACTLY_ONCE", as shown in the sketch below.
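
For example, on the hypothetical table created earlier:

  -- Sketch: enable exactly-once delivery (requires brokers at 0.11.0.x or later)
  ALTER TABLE kafka_sensor_sink SET TBLPROPERTIES (
    "kafka.write.semantic" = "EXACTLY_ONCE"
  );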

Metadata columns

In addition to the user row payload, the insert statement must include values for the following extra columns (a complete insert sketch follows the column descriptions):

__key
You can set the value of this metadata column to null, but a meaningful key value is recommended to avoid unbalanced partitions. Any binary value is valid.
__partition
The recommended value is null unless you want to route the record to a particular partition. Do not use a nonexistent partition value; doing so results in an error.
__offset
The value is fixed at -1. Kafka does not allow you to set this value.
__timestamp
You can set this value to a meaningful timestamp, represented as the number of milliseconds since the epoch. Alternatively, you can set this value to null or -1, in which case the Kafka broker's timestamp strategy sets the timestamp column.
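
Putting these together, the following sketch inserts transformed rows and supplies all four metadata columns; the source table and payload columns are hypothetical:

  -- Sketch: payload columns first, then the four metadata columns in order.
  INSERT INTO TABLE kafka_sensor_sink
  SELECT
    sensor_id,                                -- user payload
    reading,                                  -- user payload
    CAST(sensor_id AS BINARY) AS `__key`,     -- meaningful key balances partitions
    NULL AS `__partition`,                    -- let Kafka route the record
    -1 AS `__offset`,                         -- fixed at -1; cannot be set
    NULL AS `__timestamp`                     -- broker strategy assigns the timestamp
  FROM sensor_staging;

To supply an explicit timestamp instead, pass milliseconds since the epoch, for example unix_timestamp() * 1000.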