Adding Functionality to Apache NiFi

Route Streams Based on Content (One-to-Many)

The previous description of Route Based on Content (One-to-One) provides an abstraction for creating a very powerful Processor. However, it assumes that each FlowFile will be routed in its entirety to zero or more Relationships. What if the incoming data format is a "stream" of many different pieces of information, and we want to send different pieces of this stream to different Relationships? For example, imagine that we want a RouteCSV Processor that is configured with multiple Regular Expressions. If a line in the CSV file matches a Regular Expression, that line should be included in the outbound FlowFile for the associated Relationship. If a Regular Expression is associated with the Relationship "has-apples" and that Regular Expression matches 1,000 of the lines in the FlowFile, there should be one outbound FlowFile for the "has-apples" Relationship that has 1,000 lines in it. If a different Regular Expression is associated with the Relationship "has-oranges" and that Regular Expression matches 50 lines in the FlowFile, there should be one outbound FlowFile for the "has-oranges" Relationship that has 50 lines in it. That is, one FlowFile comes in and two FlowFiles come out. The two FlowFiles may contain some of the same lines of text from the original FlowFile, or they may be entirely different. This is the type of Processor that we will discuss in this section.

This Processor's name starts with "Route" and ends with the name of the data type that it routes. In our example here, we are routing CSV data, so the Processor is named RouteCSV. This Processor supports dynamic properties. Each user-defined property has a name that maps to the name of a Relationship. The value of the Property is in the format necessary for the "Match Criteria." In our example, the value of the property must be a valid Regular Expression.
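For illustration, a hypothetical RouteCSV class extending AbstractProcessor might declare its dynamic properties roughly as follows. Imports and the rest of the class skeleton are omitted in this and the following sketches, and the description text and choice of validator are assumptions, not requirements of the framework:

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        // Each user-defined property name becomes the name of a Relationship; the value
        // must be a valid Regular Expression (our Match Criteria format).
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .description("Routes lines matching this Regular Expression to the '"
                        + propertyDescriptorName + "' Relationship")
                .required(false)
                .dynamic(true)
                .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
                .build();
    }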

This Processor maintains an internal ConcurrentMap where the key is a Relationship and the value is of a type dependent on the format of the Match Criteria. In our example, we would maintain a ConcurrentMap<Relationship, Pattern>. This Processor overrides the onPropertyModified method. If the new value supplied to this method (the third argument) is null, the Relationship whose name is defined by the property name (the first argument) is removed from the ConcurrentMap. Otherwise, the new value is processed (in our example, by calling Pattern.compile(newValue)) and this value is added to the ConcurrentMap with the key again being the Relationship whose name is specified by the property name.
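A minimal sketch of that map and the onPropertyModified override, continuing the RouteCSV example (the field name patternsByRelationship is illustrative):

    private final ConcurrentMap<Relationship, Pattern> patternsByRelationship = new ConcurrentHashMap<>();

    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if (!descriptor.isDynamic()) {
            return; // only user-defined properties map to Relationships
        }

        final Relationship relationship = new Relationship.Builder()
                .name(descriptor.getName())
                .build();

        if (newValue == null) {
            // The property was removed, so remove the corresponding Relationship.
            patternsByRelationship.remove(relationship);
        } else {
            // The property was added or changed, so compile and store the new Regular Expression.
            patternsByRelationship.put(relationship, Pattern.compile(newValue));
        }
    }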

This Processor will override the customValidate method. In this method, it will retrieve all Properties from the ValidationContext and count the number of PropertyDescriptors that are dynamic (by calling isDynamic() on the PropertyDescriptor). If the number of dynamic PropertyDescriptors is 0, this indicates that the user has not added any Relationships, so the Processor returns a ValidationResult indicating that the Processor is not valid because it has no Relationships added.
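A rough sketch of such a customValidate implementation, under the same assumptions:

    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();

        // Count the dynamic (user-added) properties; each one defines a Relationship.
        final long dynamicPropertyCount = validationContext.getProperties().keySet().stream()
                .filter(PropertyDescriptor::isDynamic)
                .count();

        if (dynamicPropertyCount == 0) {
            results.add(new ValidationResult.Builder()
                    .subject("Relationships")
                    .valid(false)
                    .explanation("At least one user-defined property must be added so that "
                            + "the Processor has at least one Relationship to route to")
                    .build());
        }

        return results;
    }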

The Processor returns all of the Relationships specified by the user when its getRelationships method is called and will also return an unmatched Relationship. Because this Processor will have to read and write to the Content Repository (which can be relatively expensive), if this Processor is expected to be used for very high data volumes, it may be advantageous to add a Property that allows the user to specify whether or not they care about the data that does not match any of the Match Criteria.
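The Relationship set might then be derived from the pattern map shown earlier; the "unmatched" name used here is only an example:

    public static final Relationship REL_UNMATCHED = new Relationship.Builder()
            .name("unmatched")
            .description("Lines that do not match any of the user-supplied Regular Expressions")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        // The user-defined Relationships are the keys of the pattern map, plus the built-in 'unmatched'.
        final Set<Relationship> relationships = new HashSet<>(patternsByRelationship.keySet());
        relationships.add(REL_UNMATCHED);
        return relationships;
    }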

When the onTrigger method is called, the Processor obtains a FlowFile via ProcessSession.get. If no data is available, the Processor returns. Otherwise, the Processor creates a Map<Relationship, FlowFile>, which we will refer to as flowFileMap. The Processor reads the incoming FlowFile by calling ProcessSession.read and providing an InputStreamCallback. From within the Callback, the Processor reads one piece of data at a time from the FlowFile (in our example, one line of text). The Processor then evaluates each of the Match Criteria against this piece of data. If a particular criterion (in our example, a Regular Expression) matches, the Processor obtains from flowFileMap the FlowFile that belongs to the appropriate Relationship. If no FlowFile yet exists in the Map for this Relationship, the Processor creates a new FlowFile by calling session.create(incomingFlowFile) and then adds the new FlowFile to flowFileMap. The Processor then writes the piece of data to that FlowFile by calling session.append with an OutputStreamCallback. From within this OutputStreamCallback, we have access to the new FlowFile's OutputStream, so we are able to write the data to the new FlowFile. We then return from the OutputStreamCallback. If none of the Match Criteria match the piece of data, we perform the same routine for the unmatched Relationship (unless the user has configured us not to write out unmatched data). Because session.append returns a new version of the FlowFile, we must update flowFileMap to associate the Relationship with that new FlowFile.
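Pulling the pieces above together, a condensed sketch of this onTrigger flow might look like the following. The appendLine helper and the line-by-line treatment of "pieces of data" are assumptions for the CSV example, and the failure handling and final transfers described in the next paragraphs are sketched separately below:

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        // One outbound FlowFile per Relationship whose Match Criteria matched at least one line.
        final Map<Relationship, FlowFile> flowFileMap = new HashMap<>();

        session.read(original, new InputStreamCallback() {
            @Override
            public void process(final InputStream rawIn) throws IOException {
                final BufferedReader reader = new BufferedReader(new InputStreamReader(rawIn, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    boolean matched = false;
                    for (final Map.Entry<Relationship, Pattern> entry : patternsByRelationship.entrySet()) {
                        if (entry.getValue().matcher(line).find()) {
                            appendLine(session, original, flowFileMap, entry.getKey(), line);
                            matched = true;
                        }
                    }
                    if (!matched) {
                        // Optionally skip this if the user configured us to drop unmatched data.
                        appendLine(session, original, flowFileMap, REL_UNMATCHED, line);
                    }
                }
            }
        });

        // Failure handling and the transfer of the outbound FlowFiles are sketched below.
    }

    // Append one line to the outbound FlowFile for the given Relationship, creating the
    // FlowFile first if this is the first match for that Relationship.
    private void appendLine(final ProcessSession session, final FlowFile original,
            final Map<Relationship, FlowFile> flowFileMap, final Relationship relationship, final String line) {

        FlowFile outbound = flowFileMap.get(relationship);
        if (outbound == null) {
            outbound = session.create(original);
        }

        outbound = session.append(outbound, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        });

        // session.append returns a new version of the FlowFile, so the map must be updated.
        flowFileMap.put(relationship, outbound);
    }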

If an Exception is thrown at any point, we will need to route the incoming FlowFile to failure. We will also need to remove each of the newly created FlowFiles, since we will not be transferring them anywhere. We can accomplish this by calling session.remove(flowFileMap.values()). At that point, we log the error and return.
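Sketched as a try/catch around the read/append logic above, assuming a failure Relationship (REL_FAILURE) defined in the same way as REL_UNMATCHED:

    try {
        // ... the session.read(...) / appendLine(...) logic sketched above ...
    } catch (final Exception e) {
        // None of the partially built FlowFiles will be transferred anywhere, so remove them.
        session.remove(flowFileMap.values());
        getLogger().error("Failed to route {}; routing to failure", new Object[] {original, e});
        session.transfer(original, REL_FAILURE);
        return;
    }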

Otherwise, if all is successful, we can iterate through flowFileMap and transfer each FlowFile to its corresponding Relationship. The original FlowFile is then either removed or routed to an original relationship. For each of the newly created FlowFiles, we also emit a Provenance ROUTE event indicating which Relationship the FlowFile went to. It is also helpful to include in the details of the ROUTE event how many pieces of information were included in the FlowFile. This allows DataFlow Managers, when looking at the Provenance Lineage view, to easily see how many pieces of information went to each Relationship for a given input FlowFile.
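On success, the transfers and ROUTE events might be reported roughly as follows; REL_ORIGINAL and the per-Relationship lineCounts map (assumed to be maintained while appending) are assumptions of this sketch:

    for (final Map.Entry<Relationship, FlowFile> entry : flowFileMap.entrySet()) {
        final Relationship relationship = entry.getKey();
        final FlowFile outbound = entry.getValue();

        session.transfer(outbound, relationship);

        // Including the count in the event details lets the Provenance Lineage view show
        // how many pieces of information went to each Relationship. lineCounts is an
        // assumed Map<Relationship, Integer> updated each time a line is appended.
        session.getProvenanceReporter().route(outbound, relationship,
                lineCounts.get(relationship) + " lines routed to " + relationship.getName());
    }

    // The original FlowFile is either removed or routed to an 'original' Relationship.
    session.transfer(original, REL_ORIGINAL);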

Additionally, some Processors may need to "group" the data that is sent to each Relationship so that each FlowFile sent to a Relationship contains only pieces of data that share the same value. In our example, we may want to allow the Regular Expression to have a Capturing Group, and if two different lines in the CSV match the Regular Expression but have different values for the Capturing Group, we want them to be added to two different FlowFiles. The matching value could then be added to each FlowFile as an Attribute. This can be accomplished by modifying flowFileMap so that it is defined as Map<Relationship, Map<T, FlowFile>>, where T is the type of value produced by the Grouping Function (in our example, the Group would be a String because it is the result of evaluating a Regular Expression's Capturing Group).
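A rough sketch of this grouping variant, with illustrative names throughout (the "group.value" attribute and the flowFileForGroup helper are not part of any NiFi API):

    // flowFileMap becomes Map<Relationship, Map<String, FlowFile>>: keyed first by Relationship,
    // then by the value of the Regular Expression's Capturing Group, so each distinct group
    // value gets its own outbound FlowFile.
    private FlowFile flowFileForGroup(final ProcessSession session, final FlowFile original,
            final Map<Relationship, Map<String, FlowFile>> flowFileMap,
            final Relationship relationship, final String groupValue) {

        final Map<String, FlowFile> byGroup = flowFileMap.computeIfAbsent(relationship, r -> new HashMap<>());

        FlowFile flowFile = byGroup.get(groupValue);
        if (flowFile == null) {
            flowFile = session.create(original);
            // The matching Capturing Group value can be carried along as an Attribute.
            flowFile = session.putAttribute(flowFile, "group.value", groupValue);
            byGroup.put(groupValue, flowFile);
        }
        return flowFile;
    }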