Creating a Streaming Threat Intel Feed Source
Streaming intelligence feeds are incorporated slightly differently than data from a flat CSV file. This section describes how to define a streaming source.
Because we are defining a streaming source, we need to define a parser topology to
handle the streaming data. In order to do that, we will need to create the file
$METRON_HOME/zookeeper/parsers/user.json.
Define a parser topology to handle the streaming data:
touch $METRON_HOME/zookeeper/parsers/user.json
Populate the file with the parser topology definition.
The following example assumes CSV data where the first field is a user and the second field is an IP address (for example, my_username,127.0.0.1).
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
  "writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
  "sensorTopic" : "user",
  "parserConfig" : {
    "shew.table" : "threatintel",
    "shew.cf" : "t",
    "shew.keyColumns" : "ip",
    "shew.enrichmentType" : "user",
    "columns" : {
      "user" : 0,
      "ip" : 1
    }
  }
}
where
- parserClassName
The parser name.
- writerClassName
The writer destination. For streaming parsers, the destination is SimpleHbaseEnrichmentWriter.
- sensorTopic
Name of the sensor topic.
- shew.table
The simple HBase enrichment writer (shew) table to which we want to write.
- shew.cf
The simple HBase enrichment writer (shew) column family.
- shew.keyColumns
The simple HBase enrichment writer (shew) key: the column whose value is used as the lookup key (ip in our example).
- shew.enrichmentType
The simple HBase enrichment writer (shew) enrichment type.
- columns
The CSV parser information: each column name mapped to its zero-based position in the record. For our example, this information is the user name (position 0) and IP address (position 1).
This file fully defines the input structure and how that data can be used in enrichment.
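To make the column mapping concrete, the following shell sketch splits the sample record by hand in the way the columns and shew.keyColumns settings describe: field 0 becomes user, field 1 becomes ip, and the ip value serves as the key. It only illustrates the mapping and is not how the CSV parser or the writer is implemented; map_record is a made-up helper name.

```shell
# Illustration only: mimic "columns" : { "user" : 0, "ip" : 1 } and
# "shew.keyColumns" : "ip" on a single CSV record.
# map_record is a hypothetical helper, not part of Metron.
map_record() {
  # awk fields are 1-based, so $1 is column 0 (user) and $2 is column 1 (ip).
  printf '%s\n' "$1" | awk -F',' '{ printf "key=%s type=user user=%s ip=%s\n", $2, $1, $2 }'
}

map_record "my_username,127.0.0.1"
```

The record from the example above is keyed on 127.0.0.1 and carries my_username as its value, which is the association the later enrichment step looks up.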
Create a Kafka topic:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic user
When you create the Kafka topic, consider how much data will be flowing into this topic; the expected volume should inform the number of partitions you choose.
Push the configuration file to ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/zookeeper
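Once the topic and configuration are in place, records can be streamed into the user topic. The following is a minimal sketch, assuming the parser topology for the user sensor is already running, that BROKER_HOST names a Kafka broker, and that the broker listens on port 6667; none of these values come from this section, so adjust them for your cluster. The feed file is a temporary file created only for illustration.

```shell
# Assumptions: BROKER_HOST is set to a Kafka broker host and the broker
# listens on port 6667. Both are placeholders for your environment.
FEED_FILE="$(mktemp)"

# One record in the format the parser expects: user,ip
printf '%s\n' 'my_username,127.0.0.1' > "$FEED_FILE"

PRODUCER=/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh
if [ -x "$PRODUCER" ]; then
  # Each line of the file becomes one message on the "user" topic.
  "$PRODUCER" --broker-list "${BROKER_HOST}:6667" --topic user < "$FEED_FILE"
else
  echo "kafka-console-producer.sh not found; skipping send" >&2
fi
```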
Now that you are ingesting data into HBase, you need to use that enrichment data to enrich a separate datasource.
Set up an enrichment for $METRON_HOME/config/zookeeper/enrichments/$DATASOURCE to associate the ip_src_addr with the user enrichment that you streamed in above. For example:
{
  "enrichment" : {
    "fieldMap" : {
      "hbaseEnrichment" : [ "ip_src_addr" ]
    },
    "fieldToTypeMap" : {
      "ip_src_addr" : [ "user" ]
    },
    "config" : { }
  },
  "threatIntel" : {
    "fieldMap" : { },
    "fieldToTypeMap" : { },
    "config" : { },
    "triageConfig" : {
      "riskLevelRules" : { },
      "aggregator" : "MAX",
      "aggregationConfig" : { }
    }
  },
  "configuration" : { }
}
Push this configuration to ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/zookeeper
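Because both pushes load hand-edited JSON, it can help to check that the files parse before running the push. The sketch below is one way to do that, assuming python3 is installed on the host; check_json and CONFIG_DIR are names introduced for this example only, and CONFIG_DIR defaults to the directory passed to zk_load_configs.sh above.

```shell
# check_json is a hypothetical helper; it relies on python3 being installed.
# It returns 0 when the file parses as JSON and non-zero otherwise.
check_json() {
  python3 -m json.tool "$1" > /dev/null 2>&1
}

# Directory that will be pushed; override CONFIG_DIR if yours differs.
CONFIG_DIR="${CONFIG_DIR:-${METRON_HOME:-}/zookeeper}"

if [ -d "$CONFIG_DIR" ]; then
  # Report every JSON file under the directory that fails to parse.
  find "$CONFIG_DIR" -name '*.json' | while read -r f; do
    check_json "$f" || echo "invalid JSON: $f" >&2
  done
fi
```

This only catches syntax errors such as a missing brace or comma; it does not verify that the field names or values are meaningful to Metron.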