Hortonworks Cybersecurity Platform

Chain Parsers

For ease of explanation, the following steps use the Grok parser format example provided in Step 1c.

Many sensors contain metadata that should be ingested along with the data or contain different sensor types that need to be parsed separately. You can chain multiple parsers for a sensor to individually address the different types of information in the sensor. For example, you can parse multiple components in a Syslog log file such as timestamp, message type, and message payload, to differentiate the information contained in the log file. To chain parsers, you need an enveloping parser and sub-parsers for one or more sensor types.

  1. Before editing configurations, pull the configurations from ZooKeeper locally:
    $METRON_HOME/bin/zk_load_configs.sh --mode PULL -z $ZOOKEEPER -o $METRON_HOME/config/zookeeper/ -f
  2. Determine the format of the new data source’s log entries, so you can parse them.
  3. Create a statement that defines the pattern of the parser expression for the log type for your enveloping parser.
    For ease of explanation, we assume that we are using a Grok topology. Refer to the Grok documentation for additional details.
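    For illustration only, a minimal hypothetical enveloping Grok statement (the label $DATASOURCE_DELIMITED matches the patternLabel used in the configurations below) captures a timestamp, a routing tag, and the remaining payload in a data field:
      $DATASOURCE_DELIMITED %{GREEDYDATA:timestamp}: %{NOTSPACE:sensor_tag}: %{GREEDYDATA:data}
    The sensor_tag field name here is an assumption for this sketch; the Cisco PIX example later in this topic captures the tag into a field called pix_type instead.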
  4. Save the Grok statement and load it into Hadoop Distributed File System (HDFS) in a named location:
    1. Create a local file for the new data source:
      touch /tmp/$ENVELOPE_DATASOURCE
    2. Open $ENVELOPE_DATASOURCE and add the Grok statement defined in Step 3:
      vi /tmp/$ENVELOPE_DATASOURCE
    3. Put the $ENVELOPE_DATASOURCE file into the HDFS directory where Metron stores its Grok parsers.
      Existing Grok parsers that ship with HCP are staged under /apps/metron/patterns:
      su - hdfs 
      hadoop fs -rmr /apps/metron/patterns/$ENVELOPE_DATASOURCE 
      hdfs dfs -put /tmp/$ENVELOPE_DATASOURCE /apps/metron/patterns/
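      Optionally, you can confirm that the pattern file is in place by listing the directory:
      hdfs dfs -ls /apps/metron/patterns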
  5. Define the enveloping parser configuration.
    1. As root, log into the host with HCP installed:
      ssh $HCP_HOST
    2. Create a Kafka topic for the enveloping data source:
      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic $ENVELOPE_DATASOURCE --partitions 1 --replication-factor 1
    3. Verify your new topic by listing the Kafka topics:
      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --list
    4. Create a $ENVELOPE_DATASOURCE parser configuration file at $METRON_HOME/config/zookeeper/parsers/$ENVELOPE_DATASOURCE.json and populate it with the following:
      {
        "parserClassName": "org.apache.metron.parsers.GrokParser",
        "sensorTopic": "$ENVELOPE_DATASOURCE",
        "parserConfig": {
          "grokPath": "/apps/metron/patterns/$ENVELOPE_DATASOURCE",
          "batchSize" : 1,
          "patternLabel": "$DATASOURCE_DELIMITED",
          "timestampField": "timestamp",
          "timeFields" : [ "timestamp" ],
          "dateFormat" : "MMM dd yyyy HH:mm:ss",
          "kafka.topicField" : "logical_source_type"
        }
      }
      The important parameters to set for this parser are the following:
      parserClassName

      The name of the parser's class in the .jar file.

      sensorTopic

      The Kafka topic on which the telemetry is being streamed. If the topic is prefixed and suffixed by /, it is assumed to be a regex and will match any topic matching the pattern (for example, /bro.*/ matches bro_cust0, bro_cust1, and bro_cust2).

      parserConfig

      A JSON map defining the parser implementation configuration.

      For an envelope parser, the kafka.topicField property within parserConfig specifies that the parser sends each message to the topic named in the logical_source_type field. If the field does not exist, then the message is not sent.

    EXAMPLE for Envelope Parser
    The following is an example of an envelope parser called pix_syslog_router configured to:
    • Parse the timestamp field
    • Parse the payload into a field called data ("messageField" : "data")
    • Parse the tag into a field called pix_type ("input" : "pix_type")
    • Route the enveloped message to the appropriate Kafka topic based on the tag. In this case, it's called logical_source_type.
    The envelope parser will send output to two sub-parsers:
    • cisco-6-302 - Connection creation and teardown messages, for example, Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53
    • cisco-5-304 - URL access events, for example 192.168.0.2 Accessed URL 66.102.9.99:/
    For this parser configuration to work, you must create a file called cisco_patterns and populate it with the following Grok expressions:
    CISCO_ACTION Built|Teardown|Deny|Denied|denied|requested|permitted|denied by ACL|discarded|est-allowed|Dropping|created|deleted
    CISCO_REASON Duplicate TCP SYN|Failed to locate egress interface|Invalid transport field|No matching connection|DNS Response|DNS Query|(?:%{WORD}\s*)*
    CISCO_DIRECTION Inbound|inbound|Outbound|outbound
    CISCOFW302020_302021 %{CISCO_ACTION:action}(?:%{CISCO_DIRECTION:direction})? %{WORD:protocol} connection %{GREEDYDATA:ignore} faddr %{IP:ip_dst_addr}/%{INT:icmp_seq_num}(?:\(%{DATA:fwuser}\))? gaddr %{IP:ip_src_xlated}/%{INT:icmp_code_xlated} laddr %{IP:ip_src_addr}/%{INT:icmp_code}( \(%{DATA:user}\))?
    ACCESSED %{URIHOST:ip_src_addr} Accessed URL %{IP:ip_dst_addr}:%{URIPATHPARAM:uri_path}
    CISCO_PIX %{GREEDYDATA:timestamp}: %PIX-%{NOTSPACE:pix_type}: %{GREEDYDATA:data}
    Place the file at /tmp/cisco_patterns in HDFS by using:
    hadoop fs -put ~/cisco_patterns /tmp
    Parser Configuration
    {
       "parserClassName" : "org.apache.metron.parsers.GrokParser"
      ,"sensorTopic" : "pix_syslog_router"
      , "parserConfig": {
         "grokPath": "/tmp/cisco_patterns",
         "batchSize" : 1,
         "patternLabel": "CISCO_PIX",
         "timestampField": "timestamp",
         "timeFields" : [ "timestamp" ],
         "dateFormat" : "MMM dd yyyy HH:mm:ss",
         "kafka.topicField" : "logical_source_type"
       }
      ,"fieldTransformations" : [
        {
         "transformation" : "REGEX_SELECT"
        ,"input" :  "pix_type"
        ,"output" :  "logical_source_type"
        ,"config" : {
          "cisco-6-302" : "^6-302.*",
          "cisco-5-304" : "^5-304.*"
                    }
        }
                               ]
    }
    fieldTransformations

    An array of complex objects representing the transformations to be performed on the message generated from the parser before writing to the Kafka topic.

    For this example, this parameter includes the following options:

    • transformation - The REGEX_SELECT field transformation sets the logical_source_type field based on the value of the input field.
    • input - The field whose value is matched against the regexes (pix_type in this example).
    • output - The field that receives the result of the transformation (logical_source_type in this example).
    • config - The names of the sub-parsers and the regexes that match them.
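    Assuming the pix_syslog_router configuration above has been pushed and its topology is running, one way to smoke-test the routing is to write a message to the router's Kafka topic with the console producer. The log line below is hypothetical, but it is formed to match the CISCO_PIX pattern and the cisco-6-302 regex:
    echo "Mar 29 2004 09:54:18: %PIX-6-302005: Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53" | \
      /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKA --topic pix_syslog_router
    If the routing works, the enveloped message appears on the cisco-6-302 topic.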
  6. Define one or more sub-parser configurations.
    1. As root, log into the host with HCP installed:
      ssh $HCP_HOST
    2. Create a Kafka topic for the sub-parser data source:
      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic $SUBPARSER_DATASOURCE --partitions 1 --replication-factor 1
    3. Create a $SUBPARSER_DATASOURCE parser configuration file at $METRON_HOME/config/zookeeper/parsers/$SUBPARSER_DATASOURCE.json and populate it with the following:
      {
        "parserClassName": "org.apache.metron.parsers.GrokParser",
        "sensorTopic": "$SUBPARSER_DATASOURCE",
        "rawMessageStrategy" : "ENVELOPE",
        "rawMessageStrategyConfig" : {
          "messageField" : "data",
          "metadataPrefix" : ""
        },
        "parserConfig": {
          "grokPath": "/apps/metron/patterns/$SUBPARSER_DATASOURCE",
          "batchSize" : 1,
          "patternLabel": "$DATASOURCE_DELIMITED",
          "timestampField": "timestamp",
          "timeFields" : [ "timestamp" ],
          "dateFormat" : "MMM dd yyyy HH:mm:ss",
          "kafka.topicField" : "logical_source_type"
        }
      }
      The important parameters to set for this parser are the following:
      parserClassName

      The name of the parser's class in the .jar file.

      sensorTopic

      The Kafka topic on which the telemetry is being streamed. If the topic is prefixed and suffixed by /, it is assumed to be a regex and will match any topic matching the pattern (for example, /bro.*/ matches bro_cust0, bro_cust1, and bro_cust2).

      rawMessageStrategy
      This is a strategy that indicates how to read data and metadata. The strategies supported are:
      • DEFAULT - Data is read directly from the Kafka record value and metadata, if any, is read from the Kafka record key. This strategy defaults to not reading metadata and not merging metadata.
      • ENVELOPE - Data from the Kafka record value is presumed to be a JSON blob. One of these fields must contain the raw data to pass to the parser. All other fields should be considered metadata. The field containing the raw data is specified in rawMessageStrategyConfig. Data held in the Kafka key as well as the non-data fields in the JSON blob passed into the Kafka value are considered metadata. Note that the exception to this is that any original_string field is inherited from the envelope data so that the original string contains the envelope data. If you do not want this behavior, remove this field from the envelope data.
      rawMessageStrategyConfig
      The configuration (a map) for the rawMessageStrategy. Available configurations are strategy dependent:
      • DEFAULT - metadataPrefix defines the key prefix for metadata (default is metron.metadata).
      • ENVELOPE - metadataPrefix defines the key prefix for metadata (default is metron.metadata)

        messageField defines the field from the envelope to use as the data. All other fields are considered metadata.
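      As a hypothetical illustration of the ENVELOPE strategy using the pix_syslog_router example, the Kafka record value read by a sub-parser is a JSON blob; the data field carries the raw text to parse and the remaining fields (the values shown here are illustrative) are treated as metadata:
      {
        "timestamp": 1080554058000,
        "pix_type": "6-302005",
        "logical_source_type": "cisco-6-302",
        "original_string": "Mar 29 2004 09:54:18: %PIX-6-302005: Built UDP connection for ...",
        "data": "Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53"
      }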

      parserConfig

      A JSON map defining the parser implementation configuration.

      For a chained parser, the kafka.topicField property within parserConfig specifies that the parser sends each message to the topic named in the logical_source_type field. If the field does not exist, then the message is not sent.

      This parameter also includes batch sizing and timeout settings for writer configuration. If you do not define these properties, the system uses their default values.

      • grokPath - The path for the Grok statement.
      • batchSize - Number of records to batch together before sending to the writer. Default is 15.
      • patternLabel - The name of the Grok statement that defines the pattern of the Grok expression.
      • kafka.topicField - Specifies the topic as the value of a particular field.

        This field enables the routing capabilities necessary for handling enveloped data. If this value is unpopulated, the message is dropped.

      EXAMPLE for Sub-Parser
      The following is an example of a sub-parser called cisco-6-302 configured to append the sensor-specific fields, based on the tag type, to the fields it inherits from the pix_syslog_router envelope.
      {
         "parserClassName" : "org.apache.metron.parsers.GrokParser"
        ,"sensorTopic" : "cisco-6-302"
        ,"rawMessageStrategy" : "ENVELOPE"
        ,"rawMessageStrategyConfig" : {
            "messageField" : "data",
            "metadataPrefix" : ""
        }
        , "parserConfig": {
           "grokPath": "/tmp/cisco_patterns",
           "batchSize" : 1,
           "patternLabel": "CISCOFW302020_302021"
         }
      }
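      A matching sub-parser for the URL access events would look the same apart from the sensor topic and pattern label; the following is a sketch based on the ACCESSED pattern defined above:
      {
         "parserClassName" : "org.apache.metron.parsers.GrokParser"
        ,"sensorTopic" : "cisco-5-304"
        ,"rawMessageStrategy" : "ENVELOPE"
        ,"rawMessageStrategyConfig" : {
            "messageField" : "data",
            "metadataPrefix" : ""
        }
        , "parserConfig": {
           "grokPath": "/tmp/cisco_patterns",
           "batchSize" : 1,
           "patternLabel": "ACCESSED"
         }
      }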
  7. Use the following script to upload configurations to Apache ZooKeeper:
    $METRON_HOME/bin/zk_load_configs.sh --mode PUSH -i $METRON_HOME/config/zookeeper -z $ZOOKEEPER
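    To confirm the upload, you can optionally dump the configurations back from ZooKeeper:
    $METRON_HOME/bin/zk_load_configs.sh --mode DUMP -z $ZOOKEEPER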
  8. Deploy the new parser topologies to the cluster:
    1. As root, log in to the host that has Metron installed.
    2. Deploy a parser topology for the enveloping parser and for each sub-parser (a concrete example follows this step):
      $METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s $ENVELOPE_DATASOURCE
      $METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s $SUBPARSER_DATASOURCE
    3. Use the Apache Storm UI to verify that the new topology is listed and that it has no errors.
    This new data source processor topology ingests from the $ENVELOPE_DATASOURCE Kafka topic that you created earlier and parses the event with the HCP Grok framework using the Grok pattern defined earlier. The enveloping parser routes each parsed message to the sub-parser topic named in its logical_source_type field, where the corresponding sub-parser topology picks it up.
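    For the running example, this means starting one topology for the enveloping parser and one for each sub-parser, using the topic names defined above:
    $METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s pix_syslog_router
    $METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s cisco-6-302
    $METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s cisco-5-304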