3.1. Writing Data with the Storm-Hdfs Connector

The storm-hdfs connector provides the following key features:

  • Supports HDFS 2.x

  • Supports HA-enabled clusters

  • Supports both text and sequence files

  • Configurable directory and file names

  • Customizable synchronization, rotation policies, and rotation actions

  • Tuple fails if HDFS update fails

  • Supports the Trident API

  • Supports writing to kerberized Hadoop clusters

The primary classes of the storm-hdfs connector are HdfsBolt and SequenceFileBolt, both located in the org.apache.storm.hdfs.bolt package. Use the HdfsBolt class to write text data to HDFS and the SequenceFileBolt class to write binary data; a SequenceFileBolt example follows the HA-enabled cluster example below. Storm developers specify the following information when instantiating the bolt:


Table 2.4. HdfsBolt Methods

withFsUrl
  Specifies the target HDFS URL and port number for clusters running without HA. For clusters with HA enabled, this parameter specifies the nameservice ID in the following format: hdfs://nameserviceID. No port number is specified when passing a nameservice ID for an HA-enabled cluster. You can find the nameservice ID as the value assigned to the dfs.nameservices parameter in the hdfs-site.xml configuration file.
withRecordFormat
  Specifies the delimiter that indicates a boundary between data records. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.format.RecordFormat interface (a sketch follows this table). Use the provided org.apache.storm.hdfs.format.DelimitedRecordFormat convenience class to write delimited text data with delimiters such as tabs, comma-separated values, and pipes. The storm-hdfs bolt uses the RecordFormat implementation to convert tuples to byte arrays, so this method can be used with both text and binary data.

withRotationPolicy
  Specifies when to stop writing to a data file and begin writing to another. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.rotation.FileRotationPolicy interface.

withSyncPolicy
  Specifies how frequently to flush buffered data to the HDFS filesystem. This action enables other HDFS clients to read the synchronized data, even as the Storm client continues to write data. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.sync.SyncPolicy interface.

withFileNameFormat
  Specifies the name of the data file. Storm developers can customize this behavior by writing their own implementation of the org.apache.storm.hdfs.format.FileNameFormat interface. The provided org.apache.storm.hdfs.format.DefaultFileNameFormat creates file names with the following naming format: {prefix}-{componentId}-{taskId}-{rotationNum}-{timestamp}-{extension}. For example: MyBolt-5-7-1390579837830.txt.
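
The following sketch, provided for illustration only, shows one shape a custom record format might take. It assumes that the RecordFormat interface exposes a single format(Tuple) method returning a byte array, as the provided DelimitedRecordFormat does; the class name CsvRecordFormat, the comma delimiter, and the import paths are assumptions, and older Storm releases use the backtype.storm packages for core classes such as Tuple.

import org.apache.storm.hdfs.format.RecordFormat;
import org.apache.storm.tuple.Tuple;

// Hypothetical custom format that renders each tuple as one comma-separated line.
public class CsvRecordFormat implements RecordFormat {
    @Override
    public byte[] format(Tuple tuple) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) {
                sb.append(',');               // field delimiter
            }
            sb.append(tuple.getValue(i));     // field value
        }
        sb.append('\n');                      // one record per line
        return sb.toString().getBytes();
    }
}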


Example: Cluster Without HA

The following example specifies an HDFS path of hdfs://localhost:54310/foo, pipe-delimited records ('|'), filesystem synchronization every 1,000 tuples, and data file rotation when files reach five MB. The HdfsBolt is instantiated with an HDFS URL and port number.

...
// Use pipe as record boundary
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

//Synchronize data buffer with the filesystem every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach five MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
         .withFsURL("hdfs://localhost:54310")
         .withFileNameFormat(fileNameFormat)
         .withRecordFormat(format)
         .withRotationPolicy(rotationPolicy)
         .withSyncPolicy(syncPolicy);
...
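
After it is configured, the HdfsBolt is attached to a topology like any other bolt. The following fragment is a sketch under assumptions: MySpout, the component names "mySpout" and "hdfsBolt", the parallelism values, and the topology name are placeholders, and import paths may differ by Storm version (older releases use the backtype.storm packages).

...
// Hypothetical wiring: MySpout stands in for the spout that emits the tuples
// the HdfsBolt writes to HDFS.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mySpout", new MySpout(), 1);

// Subscribe the HdfsBolt configured above to the spout's output stream.
builder.setBolt("hdfsBolt", bolt, 2).shuffleGrouping("mySpout");

// Submit the topology to the cluster.
StormSubmitter.submitTopology("hdfs-topology", new Config(), builder.createTopology());
...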

Example: HA-Enabled Cluster

The following example demonstrates how to modify the previous example to run on an HA-enabled cluster. The HdfsBolt is instantiated with a nameservice ID rather than an HDFS URL and port number.

...
HdfsBolt bolt = new HdfsBolt()
           .withFsURL("hdfs://myNameserviceID")
           .withFileNameFormat(fileNameformat)
           .withRecordFormat(format)
           .withRotationPolicy(rotationPolicy)
           .withSynPolicy(syncPolicy);
...
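
To write binary data, use the SequenceFileBolt in place of the HdfsBolt. The following fragment is a sketch rather than a definitive recipe: it assumes the SequenceFormat and DefaultSequenceFormat helpers from the storm-hdfs source, and the tuple field names "key" and "value", the /seq path, and the .seq extension are placeholders.

...
// Map two tuple fields to the sequence file key and value; "key" and "value"
// are hypothetical field names.
SequenceFormat seqFormat = new DefaultSequenceFormat("key", "value");

FileNameFormat seqFileNameFormat = new DefaultFileNameFormat()
         .withPath("/seq")
         .withExtension(".seq");

// Instantiate the SequenceFileBolt, reusing the rotation and sync policies
// from the earlier example.
SequenceFileBolt seqBolt = new SequenceFileBolt()
         .withFsUrl("hdfs://localhost:54310")
         .withFileNameFormat(seqFileNameFormat)
         .withSequenceFormat(seqFormat)
         .withRotationPolicy(rotationPolicy)
         .withSyncPolicy(syncPolicy);
...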

Trident API

The storm-hdfs connector supports the Trident API. Hortonworks recommends that Storm developers use the Trident API unless their application requires sub-second latency.

For the Trident API, the connector provides a StateFactory implementation whose options resemble the methods of the storm-core API, as shown in the following code sample:

Fields hdfsFields = new Fields("field1", "field2");

 FileNameFormat fileNameFormat = new DefaultFileNameFormat()
              .withPrefix("trident")
              .withExtension(".txt")
              .withPath("/trident");

 RecordFormat recordFormat = new DelimitedRecordFormat()
              .withFields(hdfsFields);

 FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

 HdfsState.Options options = new HdfsState.HdfsFileOptions()
             .withFileNameFormat(fileNameFormat)
             .withRecordFormat(recordFormat)
             .withRotationPolicy(rotationPolicy)
             .withFsUrl("hdfs://localhost:54310");

 StateFactory factory = new HdfsStateFactory().withOptions(options);

 TridentState state = stream
             .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
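
The stream variable in the sample above is assumed to already exist. As an illustration only, it might be created from a Trident topology along the following lines; "spout1" and spout are placeholders for a Trident spout that emits the fields named in hdfsFields.

 // Hypothetical origin of the stream used above.
 TridentTopology topology = new TridentTopology();
 Stream stream = topology.newStream("spout1", spout);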

See the javadoc for the Trident API, included with the storm-hdfs connector, for more information.

Limitations

Directory and file name changes are limited to a prepackaged file name format based on a timestamp.