Creating a Streaming Threat Intel Feed Source
Streaming intelligence feeds are incorporated slightly differently than data from a flat CSV file. This section describes how to define a streaming source.
Because we are defining a streaming source, we need a parser topology to handle the streaming data. The topology is defined in the file $METRON_HOME/config/zookeeper/parsers/user.json.
Create the parser topology definition file:
touch $METRON_HOME/config/zookeeper/parsers/user.json
Populate the file with the parser topology definition. For example:
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
  "writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
  "sensorTopic" : "user",
  "parserConfig" : {
    "shew.table" : "threatintel",
    "shew.cf" : "t",
    "shew.keyColumns" : "ip",
    "shew.enrichmentType" : "user",
    "columns" : {
      "user" : 0,
      "ip" : 1
    }
  }
}
where
- parserClassName
The parser name.
- writerClassName
The writer destination. For streaming parsers, the destination is SimpleHbaseEnrichmentWriter.
- sensorTopic
Name of the sensor topic.
- shew.table
The simple HBase enrichment writer (shew) table to which we want to write.
- shew.cf
The simple HBase enrichment writer (shew) column family.
- shew.keyColumns
The column whose value the simple HBase enrichment writer (shew) uses as the key. In our example, this is the ip column.
- shew.enrichmentType
The simple HBase enrichment writer (shew) enrichment type.
- columns
The CSV parser column mapping, from field name to zero-based column position. In our example, the user name is column 0 and the IP address is column 1.
This file fully defines the input structure and how that data can be used in enrichment.
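To make the relationship between these settings concrete, the following is a minimal Python sketch of how one CSV line lines up with the example configuration. It is an illustration only, not Metron's implementation: the helper name describe_record and the sample record are hypothetical, the sketch assumes a single key column as in the example, and it does not model how the writer actually lays out rows in HBase.

```python
import csv
import io

# Values copied from the "parserConfig" section of the example above.
PARSER_CONFIG = {
    "shew.table": "threatintel",
    "shew.cf": "t",
    "shew.keyColumns": "ip",
    "shew.enrichmentType": "user",
    "columns": {"user": 0, "ip": 1},
}


def describe_record(line, parser_config):
    """Hypothetical helper: show how one CSV line lines up with the config."""
    row = next(csv.reader(io.StringIO(line)))
    # "columns" maps each field name to its zero-based position in the line.
    fields = {
        name: row[index].strip()
        for name, index in parser_config["columns"].items()
    }
    # Assumption: a single key column, as in the example configuration.
    key_column = parser_config["shew.keyColumns"]
    return {
        "table": parser_config["shew.table"],
        "column_family": parser_config["shew.cf"],
        "enrichment_type": parser_config["shew.enrichmentType"],
        "key": fields[key_column],
        "fields": fields,
    }


if __name__ == "__main__":
    # "jdoe,10.0.2.3" is a made-up record in user,ip order.
    print(describe_record("jdoe,10.0.2.3", PARSER_CONFIG))
```

For the made-up record, the key is the IP address and the enrichment type is user, which is what the enrichment configuration later refers to.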
Create a Kafka topic for the new sensor, then push the configuration file to ZooKeeper:
Create a Kafka topic:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic user
When you create the Kafka topic, consider how much data will be flowing into this topic.
Push the configuration file to ZooKeeper.
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/config/zookeeper
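A hand-edited JSON file is easy to break, so it may be worth checking that every file under the configuration directory parses before running the push command above. The following Python sketch is one possible pre-push check; it is not part of Metron, and the function name find_invalid_json is hypothetical.

```python
import json
from pathlib import Path


def find_invalid_json(config_root):
    """Return (path, error message) pairs for .json files that fail to parse."""
    problems = []
    # Walk the directory tree so parsers/, enrichments/, and so on are covered.
    for path in sorted(Path(config_root).rglob("*.json")):
        try:
            json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError as err:
            problems.append((str(path), str(err)))
    return problems
```

Calling find_invalid_json with the same directory that is passed to the -i option returns an empty list when every file parses. This only checks JSON syntax, not whether the settings are meaningful to Metron.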
Edit the new data source enrichment configuration at $METRON_HOME/config/zookeeper/enrichments/$DATASOURCE to associate the ip_src_addr field with the user enrichment. For example:
{
  "enrichment" : {
    "fieldMap" : {
      "hbaseEnrichment" : [ "ip_src_addr" ]
    },
    "fieldToTypeMap" : {
      "ip_src_addr" : [ "user" ]
    },
    "config" : { }
  },
  "threatIntel" : {
    "fieldMap" : { },
    "fieldToTypeMap" : { },
    "config" : { },
    "triageConfig" : {
      "riskLevelRules" : { },
      "aggregator" : "MAX",
      "aggregationConfig" : { }
    }
  },
  "configuration" : { }
}
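In this configuration, fieldMap lists the fields to enrich through HBase and fieldToTypeMap says which enrichment types apply to each of those fields. The Python sketch below is a hypothetical consistency check, not a Metron tool: it reports any field listed under hbaseEnrichment that has no enrichment type mapped to it. The function name fields_without_types is an assumption made for illustration.

```python
import json

# The "enrichment" section of the example configuration above.
ENRICHMENT_CONFIG = json.loads("""
{
  "enrichment": {
    "fieldMap": {"hbaseEnrichment": ["ip_src_addr"]},
    "fieldToTypeMap": {"ip_src_addr": ["user"]},
    "config": {}
  }
}
""")


def fields_without_types(config, section="enrichment", adapter="hbaseEnrichment"):
    """Hypothetical check: list fields that have no enrichment type mapped."""
    block = config.get(section, {})
    fields = block.get("fieldMap", {}).get(adapter, [])
    type_map = block.get("fieldToTypeMap", {})
    # A field is flagged when it is missing from the map or maps to no types.
    return [field for field in fields if not type_map.get(field)]


if __name__ == "__main__":
    print(fields_without_types(ENRICHMENT_CONFIG))
```

For the example configuration the function returns an empty list, because ip_src_addr is mapped to the user enrichment type defined by shew.enrichmentType in the parser configuration.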
Push this configuration to ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/config/zookeeper