Using Apache Storm to Move Data
Also available as:
PDF

Core-storm APIs

The following example constructs a Kafka bolt using core Storm APIs:

DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
 .withColumnFields(new Fields(colNames));
HiveOptions hiveOptions = new
HiveOptions(metaStoreURI,dbName,tblName,mapper);
HiveBolt hiveBolt = new HiveBolt(hiveOptions);
  1. Instantiate an Implementation of HiveMapper Interface.

    The storm-hive streaming bolt uses the HiveMapper interface to map the names of tuple fields to the names of Hive table columns. Storm provides two implementations: DelimitedRecordHiveMapper and JsonRecordHiveMapper. Both implementations take the same arguments.

    Table 1. HiveMapper Arguments

    Argument

    Data Type

    Description

    withColumnFields

    org.apache.storm.tuple.Fields

    The name of the tuple fields that you want to map to table column names.

    withPartitionFields

    org.apache.storm.tuple.Fields

    The name of the tuple fields that you want to map to table partitions.

    withTimeAsPartitionField

    String

    Requests that table partitions be created with names set to system time. Developers can specify any Java-supported date format, such as "YYYY/MM/DD".

    The following sample code illustrates how to use DelimitedRecordHiveMapper:

    ...
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
     .withColumnFields(new Fields(colNames))
     .withPartitionFields(new Fields(partNames));
    
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
     .withColumnFields(new Fields(colNames))
     .withTimeAsPartitionField("YYYY/MM/DD");
    ...
  2. Instantiate a HiveOptions Class with the HiveMapper Implementation. The HiveOptions class configures transactions used by Hive to ingest the streaming data:
    ...
    HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
     .withTxnsPerBatch(10)
     .withBatchSize(1000)
     .withIdleTimeout(10);
    ...

    The following table describes all configuration properties for the HiveOptions class.

    Table 2. HiveOptions Class Configuration Properties

    HiveOptions Configuration Property

    Data Type

    Description

    metaStoreURI

    String

    Hive Metastore URI. Storm developers can find this value in hive-site.xml.

    dbName

    String

    Database name

    tblName

    String

    Table name

    mapper

    Mapper

    Two properties that start with

    "org.apache.storm.hive.bolt.":

    mapper.DelimitedRecordHiveMapper

    mapperJsonRecordHiveMapper

    withTxnsPerBatch

    Integer

    Configures the number of desired transactions per transaction batch. Data from all transactions in a single batch form a single compaction file. Storm developers use this property in conjunction with the withBatchSize property to control the size of compaction files. The default value is 100.

    Hive stores data in base files that cannot be updated by HDFS. Instead, Hive creates a set of delta files for each transaction that alters a table or partition and stores them in a separate delta directory. Occasionally, Hive compacts, or merges, the base and delta files. Hive performs all compactions in the background without affecting concurrent reads and writes of other Hive clients.

    withMaxOpenConnections

    Integer

    Specifies the maximum number of open connections. Each connection is to a single Hive table partition. The default value is 500. When Hive reaches this threshold, an idle connection is terminated for each new connection request. A connection is considered idle if no data is written to the table partition to which the connection is made.

    withBatchSize

    Integer

    Specifies the maximum number of Storm tuples written to Hive in a single Hive transaction. The default value is 15000 tuples.

    withCallTimeout

    Integer

    Specifies the interval in seconds between consecutive heartbeats sent to Hive. Hive uses heartbeats to prevent expiration of unused transactions. Set this value to 0 to disable heartbeats. The default value is 240.

    withAutoCreatePartitions

    Boolean

    Indicates whether HiveBolt should automatically create the necessary Hive partitions needed to store streaming data. The default value is true.

    withKerberosPrinicipal

    String

    Kerberos user principal for accessing a secured Hive installation.

    withKerberosKeytab

    String

    Kerberos keytab for accessing a secured Hive installation.

  3. Instantiate the HiveBolt with the HiveOptions class:
    ...
    HiveBolt hiveBolt = new HiveBolt(hiveOptions);
    ...
  4. Before building your topology code, add the following dependency to your topology pom.xml file:
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.3</version>
    </dependency>