Using Apache Storm to Move Data

Trident APIs

The following example shows construction of a Hive Trident state using the Trident APIs, followed by details about the code:

// Map tuple fields to Hive table columns; partition by system time
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
    .withColumnFields(new Fields(colNames))
    .withTimeAsPartitionField("yyyy/MM/dd");

// Configure how Hive transactions batch the streaming writes
HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
    .withTxnsPerBatch(10)
    .withBatchSize(1000)
    .withIdleTimeout(10);

// Create the Trident state that persists each batch to Hive
StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
TridentState state = stream.partitionPersist(factory, hiveFields,
    new HiveUpdater(), new Fields());
  1. Instantiate an Implementation of the HiveMapper Interface

    The storm-hive streaming bolt uses the HiveMapper interface to map the names of tuple fields to the names of Hive table columns. Storm provides two implementations: DelimitedRecordHiveMapper and JsonRecordHiveMapper. Both implementations take the same arguments.

    Table 1. HiveMapper Arguments

    Argument                   Data Type                       Description
    -------------------------  ------------------------------  --------------------------------------------------------
    withColumnFields           org.apache.storm.tuple.Fields   The names of the tuple fields that you want to map to
                                                               table column names.
    withPartitionFields        org.apache.storm.tuple.Fields   The names of the tuple fields that you want to map to
                                                               table partitions.
    withTimeAsPartitionField   String                          Requests that table partitions be created with names set
                                                               to system time. Developers can specify any Java-supported
                                                               date format, such as "yyyy/MM/dd".

    The following sample code illustrates two alternative ways to configure DelimitedRecordHiveMapper:

    ...
    // Option 1: map tuple fields to table columns and to explicit partition fields
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
     .withColumnFields(new Fields(colNames))
     .withPartitionFields(new Fields(partNames));
    
    // Option 2: map tuple fields to table columns and partition by system time
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
     .withColumnFields(new Fields(colNames))
     .withTimeAsPartitionField("yyyy/MM/dd");
    ...
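
    Because both implementations take the same arguments, JsonRecordHiveMapper is configured the same way. The following is a minimal sketch, assuming the same colNames array as above, that writes each tuple as a JSON record rather than a delimited one:

    ...
    // Serializes each tuple as a JSON record instead of delimited text
    JsonRecordHiveMapper jsonMapper = new JsonRecordHiveMapper()
     .withColumnFields(new Fields(colNames))
     .withTimeAsPartitionField("yyyy/MM/dd");
    ...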
  2. Instantiate a HiveOptions class with the HiveMapper Implementation

    Use the HiveOptions class to configure the transactions used by Hive to ingest the streaming data, as illustrated in the following code sample.

    ...
    HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
     .withTxnsPerBatch(10)
     .withBatchSize(1000)
     .withIdleTimeout(10);
    ...

    See "HiveOptions Class Configuration Properties" for a list of configuration properties for the HiveOptions class.

  3. Instantiate a HiveStateFactory with the HiveOptions class, and use it to create the Trident state:
    ...
    StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
    TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(),
    new Fields());
    ...
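
    Putting the pieces together, the following is a minimal end-to-end sketch. The spout, field names, metastore URI, and table names are illustrative placeholders, not part of the storm-hive API:

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
    import org.apache.storm.hive.common.HiveOptions;
    import org.apache.storm.hive.trident.HiveStateFactory;
    import org.apache.storm.hive.trident.HiveUpdater;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.spout.IBatchSpout;
    import org.apache.storm.trident.state.StateFactory;
    import org.apache.storm.tuple.Fields;

    public class HiveTridentExample {
        public static StormTopology buildTopology(IBatchSpout mySpout) {
            TridentTopology topology = new TridentTopology();
            // mySpout is a placeholder spout that emits tuples with "id" and "name" fields
            Stream stream = topology.newStream("hive-stream", mySpout);

            DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
                .withColumnFields(new Fields("id", "name"))
                .withTimeAsPartitionField("yyyy/MM/dd");

            // The metastore URI, database, and table names below are placeholders
            HiveOptions hiveOptions =
                new HiveOptions("thrift://metastore-host:9083", "default", "storm_events", mapper)
                    .withTxnsPerBatch(10)
                    .withBatchSize(1000)
                    .withIdleTimeout(10);

            StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
            stream.partitionPersist(factory, new Fields("id", "name"),
                new HiveUpdater(), new Fields());

            return topology.build();
        }
    }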
  4. Before building your topology code, add the following dependency to your topology pom.xml file:
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.3.3</version>
    </dependency>
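
    The examples above also assume that the storm-hive artifact itself is on the classpath. A typical entry looks like the following; the version shown is a placeholder that should match your Storm distribution:

    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-hive</artifactId>
      <version>${storm.version}</version>
    </dependency>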