Getting Started with Streaming Analytics

Chapter 3. Creating a Dataflow Application

Data Producer Application Generates Events

The sensors generate a stream of raw truck events. The data producing application (a data simulator) serializes these raw events into Avro using the Schema Registry and publishes them into Kafka topics.
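
As a rough illustration of what the simulator does, the following sketch publishes one Avro event to Kafka using the Hortonworks Schema Registry serializer. The broker address, topic name, and schema fields here are placeholders, not the simulator's actual values.

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TruckEventProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "$KAFKA_BROKER:6667"); // adjust to your cluster
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Registry-aware serializer: registers/looks up the schema and embeds
            // the schema identifier in the payload header ahead of the Avro bytes.
            props.put("value.serializer",
                    "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://$REGISTRY_SERVER:7788/api/v1");

            // Hypothetical two-field subset of a truck event schema.
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"TruckEvent\",\"fields\":["
                  + "{\"name\":\"driverId\",\"type\":\"int\"},"
                  + "{\"name\":\"eventType\",\"type\":\"string\"}]}");
            GenericRecord event = new GenericData.Record(schema);
            event.put("driverId", 14);
            event.put("eventType", "Normal");

            try (KafkaProducer<String, Object> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("raw-truck_events_avro", event)); // illustrative topic name
            }
        }
    }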

NiFi: Create a Dataflow Application

To make setup easier, import the NiFi template for this flow by downloading it from this GitHub location. After importing, select the Use Case 1 process group. The instructions below refer to that flow.

NiFi Controller Services

Click the Flow Configuration Settings icon and select the Controller Services tab.

Hortonworks Schema Registry Controller Service

  1. Click the Flow Configuration Settings icon and select the Controller Services tab.

  2. You will see the HWX Schema Registry controller service. Edit its properties to configure the Schema Registry URL for your environment. You can find this value in the Streaming Analytics Manager service in Ambari, in the configuration property called registry.url. The URL looks similar to http://$REGISTRY_SERVER:7788/api/v1. (A quick way to verify the URL is sketched after these steps.)

  3. Enable this controller service.
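
If you want to sanity-check the registry URL outside NiFi, a minimal lookup with the registry's Java client might look like the following sketch. The schema name truck_events_avro is a placeholder for any schema registered in your environment.

    import java.util.Collections;
    import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
    import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;

    public class RegistryUrlCheck {
        public static void main(String[] args) {
            // Use the same value as the registry.url property from Ambari.
            SchemaRegistryClient client = new SchemaRegistryClient(
                    Collections.singletonMap("schema.registry.url",
                            "http://$REGISTRY_SERVER:7788/api/v1"));
            // "truck_events_avro" is a placeholder; use any registered schema name.
            SchemaMetadataInfo info = client.getSchemaMetadataInfo("truck_events_avro");
            System.out.println(info == null ? "schema not found" : info.getSchemaMetadata());
        }
    }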

RecordReader and RecordWriter Controller Services

The RecordReader and RecordWriter controller services are new controller services that allow you to convert events from one type (JSON, XML, CSV, Avro) to another (JSON, XML, CSV, Avro). These controller services use the Schema Registry to fetch the schema for the event in order to do this conversion. There are a number of schema access strategies you can configure on the RecordReader and RecordWriter to tell them how to look up the schema information. For example, if you are reading records serialized by the Hortonworks Schema Registry, the schema identifier required to look up the schema in the registry is embedded in the header of the payload. Hence, the RecordReader would use the schema access strategy called "HWX Content-Encoded Schema Reference" (a sketch of this header layout follows the list below). The following are the RecordReader and RecordWriter controller services used in the imported NiFi template:

  • Avro Truck Events - Reads Avro events and looks up the schema identifier via the HWX Content-Encoded Schema Reference strategy. This identifier is then used to query the schema from the Hortonworks Schema Registry.

  • CSV Truck Events - Reads CSV events and looks up the schema name from the value of the "Schema Name" attribute. This lookup strategy is called the "Use 'Schema Name' Property" access strategy. The value of this attribute is then used to query the schema from the Hortonworks Schema Registry.

  • AvroRecordSetWriter - Writes events as Avro and looks up the schema identifier using the HWX Schema Reference Attributes strategy. This controller service uses a write strategy of HWX Content-Encoded Schema Reference, meaning the schema identifier information is prepended to the header of the Avro payload.

  • AvroRecordSetWriter-Read-Schema-From-HWX-Via-Schema-Name - Writes events as Avro and looks up the schema using the Schema Name access strategy. This controller service also uses a write strategy of HWX Content-Encoded Schema Reference, meaning the schema identifier information is prepended to the header of the Avro payload.

  • CSVRecordSetWriter - Writes events as CSV and looks up the schema identifier using the HWX Schema Reference Attributes strategy. The write schema strategy is also HWX Schema Reference Attributes, meaning that when the CSV is written, the schema identifier information is stored in named attributes of the flow file.

  • CSVRecordSetWriter-Read-Schema-From-HWX-Embedded - Similar to the previous CSV writer, but the schema identifier is looked up using the HWX Content-Encoded Schema Reference strategy.
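
To make the content-encoded strategy concrete, the sketch below shows how a schema identifier could be parsed out of the bytes in front of the Avro payload. The exact header layout depends on the serdes protocol version; the 13-byte layout assumed here (1-byte protocol id, 8-byte schema metadata id, 4-byte schema version) is illustrative, not authoritative.

    import java.nio.ByteBuffer;

    public class HwxHeaderSketch {
        // Illustrative parse of an HWX content-encoded payload header.
        public static void describe(byte[] payload) {
            ByteBuffer buf = ByteBuffer.wrap(payload);
            byte protocolId = buf.get();     // serdes protocol version (assumed layout)
            long metadataId = buf.getLong(); // schema metadata id in the registry
            int version = buf.getInt();      // schema version number
            System.out.printf("protocol=%d schemaMetadataId=%d version=%d avroBytes=%d%n",
                    protocolId, metadataId, version, buf.remaining());
            // The remaining bytes are the Avro-encoded record itself.
        }
    }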

Enable all of these controller services.

NiFi Ingests the Raw Sensor Events

In the Use Case 1 process group, go into the "Acquire Events" process group. The first step in the NiFi flow is to ingest the raw serialized Avro events from the two Kafka topics. We use the new ConsumeKafkaRecord processor for this.

Both ConsumeKafkaRecord processors are configured with an Avro RecordReader controller service and the CSVRecordSetWriter-Read-Schema-From-HWX-Embedded controller service, converting the events from Avro to CSV using a schema.
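
Conceptually, this reader/writer pairing performs a conversion like the following standalone sketch, which uses the plain Avro APIs (standing in for the NiFi record framework) to decode a record against its schema and re-emit the fields as a CSV line.

    import java.io.ByteArrayInputStream;
    import java.util.stream.Collectors;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;

    public class AvroToCsvSketch {
        // Decode one Avro-encoded record and render its fields as a CSV line.
        public static String toCsv(byte[] avroBytes, Schema schema) throws Exception {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get()
                    .binaryDecoder(new ByteArrayInputStream(avroBytes), null);
            GenericRecord record = reader.read(null, decoder);
            return schema.getFields().stream()
                    .map(f -> String.valueOf(record.get(f.name())))
                    .collect(Collectors.joining(","));
        }
    }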

Note

Make sure that for both processors you change the Kafka Brokers property value to match your cluster settings.

Publish Enriched Events to Kafka for Consumption by Analytics Applications

After NiFi has done the routing, transformation, and enrichment, it publishes the enriched events into Kafka topics. Each of these topics has a schema registered in the Schema Registry; we store the identifier of that schema in the flow file attributes (via UpdateAttribute processors) and use the PublishKafkaRecord processor to push the events into Kafka.

The PublishKafkaRecord processor is configured with the 'CSV Truck Events' controller service as the Record Reader and uses the AvroRecordSetWriter to write the events as Avro. What gets published to Kafka for consumption by SAM is a serialized Avro object with the schema identifier in the header.
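
For reference, a downstream consumer such as SAM deserializes those events by reading the schema identifier from the payload header. A minimal consumer sketch using the registry-aware deserializer might look like this; the topic and group id are placeholders, and the Duration-based poll assumes a Kafka 2.0+ client.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class EnrichedEventConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "$KAFKA_BROKER:6667"); // adjust to your cluster
            props.put("group.id", "sam-preview");                 // placeholder group id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // Registry-aware deserializer: reads the schema identifier from the
            // payload header and fetches the matching schema from the registry.
            props.put("value.deserializer",
                    "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://$REGISTRY_SERVER:7788/api/v1");

            try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("truck_events_avro")); // illustrative topic
                ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(5));
                for (ConsumerRecord<String, Object> r : records) {
                    System.out.println(r.value()); // deserialized Avro record
                }
            }
        }
    }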

Note

Make sure that for the PublishKafkaRecord processor you change the Kafka Brokers property value to match your cluster settings.

Start the NiFi Flow

Start the process group called "Use Case 1".