The storm-hdfs connector provides the following key features:

- Supports HDFS 2.x
- Supports HA-enabled clusters
- Supports both text and sequence files
- Configurable directory and file names
- Customizable synchronization policies, rotation policies, and rotation actions
- Fails the tuple if the HDFS update fails
- Supports the Trident API
- Supports writing to kerberized Hadoop clusters
The primary classes of the storm-hdfs connector are HdfsBolt and SequenceFileBolt, both located in the org.apache.storm.hdfs.bolt package. Use the HdfsBolt class to write text data to HDFS and the SequenceFileBolt class to write binary data. Storm developers specify the following information when instantiating the bolt:
Table 2.4. HdfsBolt Methods

HdfsBolt Method | Description |
---|---|
withFsUrl | Specifies the target HDFS URL and port number for clusters running without HA. For clusters with HA enabled, this parameter specifies the nameservice ID in the following format: hdfs://nameserviceID. No port number is specified when passing a nameservice ID for an HA-enabled cluster. You can find the nameservice ID as the value assigned to the dfs.nameservices parameter in the hdfs-site.xml configuration file. |
withRecordFormat | Specifies the delimiter that indicates a boundary between data records. Storm developers can customize this by writing their own implementation of the org.apache.storm.hdfs.format.RecordFormat interface (see the sketch after this table). Use the provided org.apache.storm.hdfs.format.DelimitedRecordFormat convenience class to write delimited text data with delimiters such as tabs, commas, and pipes. The storm-hdfs bolt uses the RecordFormat implementation to convert tuples to byte arrays, so this method can be used with both text and binary data. |
withRotationPolicy | Specifies when to stop writing to a data file and begin writing to another. Storm developers can customize this by writing their own implementation of the org.apache.storm.hdfs.rotation.FileRotationPolicy interface. |
withSyncPolicy | Specifies how frequently to flush buffered data to the HDFS filesystem. This action enables other HDFS clients to read the synchronized data, even as the Storm client continues to write data. Storm developers can customize this by writing their own implementation of the org.apache.storm.hdfs.sync.SyncPolicy interface. |
withFileNameFormat | Specifies the name of the data file. Storm developers can customize this by writing their own implementation of the org.apache.storm.hdfs.format.FileNameFormat interface. The provided org.apache.storm.hdfs.format.DefaultFileNameFormat creates file names with the following naming format: {prefix}-{componentId}-{taskId}-{rotationNum}-{timestamp}-{extension}. For example, MyBolt-5-7-1390579837830.txt. |
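For instance, a custom record format might serialize selected tuple fields as one JSON-like object per line. The following is a minimal sketch, not part of the product documentation: it assumes the RecordFormat interface declares a single byte[] format(Tuple tuple) method, as in the Apache storm-hdfs source, and the JsonRecordFormat class name is hypothetical.

// Hypothetical RecordFormat that writes one JSON-like record per line.
// Assumes RecordFormat declares byte[] format(Tuple tuple); package names
// follow this document's convention.
import org.apache.storm.hdfs.format.RecordFormat;
import org.apache.storm.tuple.Tuple;

public class JsonRecordFormat implements RecordFormat {
    @Override
    public byte[] format(Tuple tuple) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) sb.append(",");
            sb.append('"').append(tuple.getFields().get(i)).append("\":\"")
              .append(tuple.getValue(i)).append('"');
        }
        sb.append("}\n");
        return sb.toString().getBytes();
    }
}

Pass an instance of such a class to withRecordFormat exactly as you would pass a DelimitedRecordFormat.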
Example: Cluster Without HA
The following example specifies an HDFS path of hdfs://localhost:54310/foo, pipe-delimited records ('|'), filesystem synchronization every 1,000 tuples, and data file rotation when files reach 5 MB. The HdfsBolt is instantiated with an HDFS URL and port number.
...
// Use pipe as record boundary
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

// Synchronize the data buffer with the filesystem every 1,000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach 5 MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
...
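SequenceFileBolt is configured in the same builder style. The following is a minimal sketch, not taken from this guide; it assumes the SequenceFormat interface and DefaultSequenceFormat class from the Apache storm-hdfs project, and the "key" and "value" field names are illustrative.

...
// Map the tuple fields "key" and "value" (illustrative names) to the
// key and value of each sequence-file record.
SequenceFormat seqFormat = new DefaultSequenceFormat("key", "value");

SequenceFileBolt seqBolt = new SequenceFileBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)   // reuses the format defined above
        .withSequenceFormat(seqFormat)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
...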
Example: HA-Enabled Cluster
The following example demonstrates how to modify the previous example to run on an HA-enabled cluster. The HdfsBolt is instantiated with a nameservice ID rather than an HDFS URL and port number.
...
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://myNameserviceID")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
...
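In either case, the configured bolt attaches to a topology like any other Storm bolt. A minimal wiring sketch, assuming an upstream spout registered under the hypothetical ID "event_spout":

...
// Component IDs and the parallelism hint (4) are illustrative.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("event_spout", new MyEventSpout(), 1);  // MyEventSpout is hypothetical
builder.setBolt("hdfs_bolt", bolt, 4)
       .shuffleGrouping("event_spout");
StormTopology topology = builder.createTopology();
...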
Trident API
The storm-hdfs connector supports the Trident API. Hortonworks recommends that Storm developers use the Trident API unless their application requires sub-second latency.
The Trident API implements a StateFactory class whose methods resemble those of the core Storm API, as shown in the following code sample:
Fields hdfsFields = new Fields("field1", "field2");

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPrefix("trident")
        .withExtension(".txt")
        .withPath("/trident");

RecordFormat recordFormat = new DelimitedRecordFormat()
        .withFields(hdfsFields);

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

HdfsState.Options options = new HdfsState.HdfsFileOptions()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310");

StateFactory factory = new HdfsStateFactory().withOptions(options);

TridentState state = stream.partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
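The sample assumes an existing Trident stream. One way such a stream might be created, sketched here with the FixedBatchSpout test spout that ships with Storm (the batch size and values are illustrative):

...
// Illustrative spout emitting the two fields the state expects.
FixedBatchSpout spout = new FixedBatchSpout(
        new Fields("field1", "field2"), 10,
        new Values("a", 1),
        new Values("b", 2));
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
...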
See the javadoc for the Trident API, included with the storm-hdfs
connector, for more information.
Limitations
Changes to directory and file names are limited to a prepackaged file name format based on a timestamp.