Adding Functionality to Apache NiFi

Data Ingress

A Processor that ingests data into NiFi has a single Relationship named success. This Processor generates new FlowFiles via the ProcessSession create method and does not pull FlowFiles from incoming Connections. The Processor name starts with "Get" or "Listen," depending on whether it polls an external source or exposes some interface to which external sources can connect. The name ends with the protocol used for communications. Processors that follow this pattern include GetFile, GetSFTP, ListenHTTP, and GetHTTP.

This Processor may create or initialize a Connection Pool in a method that uses the @OnScheduled annotation. However, because communications problems may prevent connections from being established or cause connections to be terminated, connections themselves are not created at this point. Rather, the connections are created or leased from the pool in the onTrigger method.
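The split between creating the pool and creating connections can be sketched in plain Java. This is a minimal illustration, not NiFi code: LazyPoolSketch, Connection, leaseOrCreate, and giveBack are hypothetical names, and in a real Processor the onScheduled method would carry the @OnScheduled annotation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class LazyPoolSketch {
    /** Hypothetical stand-in for a client connection to the external source. */
    static class Connection {
    }

    private BlockingQueue<Connection> pool;
    private final AtomicInteger connectionsOpened = new AtomicInteger();

    // In a real Processor this method would carry the @OnScheduled annotation.
    void onScheduled() {
        // Only the pool is created here; it starts out empty.
        pool = new LinkedBlockingQueue<>();
    }

    // Called at the start of onTrigger: lease if possible, otherwise connect now.
    Connection leaseOrCreate() {
        Connection leased = pool.poll();   // null when no idle connection exists
        if (leased != null) {
            return leased;
        }
        connectionsOpened.incrementAndGet();
        return new Connection();
    }

    // Called at the end of a successful onTrigger.
    void giveBack(Connection connection) {
        pool.offer(connection);
    }

    int idleCount() {
        return pool.size();
    }

    int connectionsOpened() {
        return connectionsOpened.get();
    }

    public static void main(String[] args) {
        LazyPoolSketch sketch = new LazyPoolSketch();
        sketch.onScheduled();
        Connection first = sketch.leaseOrCreate();
        sketch.giveBack(first);
        System.out.println("reused: " + (sketch.leaseOrCreate() == first));
    }
}
```

Because the pool starts empty, a communications problem at scheduling time cannot prevent the Processor from starting; the first connection attempt happens inside onTrigger, where a failure can be handled per invocation.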

The onTrigger method of this Processor begins by leasing a connection from the Connection Pool, if possible, or otherwise creating a connection to the external service. When no data is available from the external source, the Processor calls the yield method of the ProcessContext and returns, so that it does not run continually and deplete resources without benefit. Otherwise, the Processor creates a FlowFile via the ProcessSession's create method and assigns an appropriate filename and path to the FlowFile (by adding the filename and path attributes), as well as any other attributes that may be appropriate. An OutputStream to the FlowFile's content is obtained via the ProcessSession's write method, passing a new OutputStreamCallback (which is usually an anonymous inner class). From within this callback, the Processor streams the content from the external resource to the FlowFile's OutputStream. If the goal is to write the entire contents of an InputStream to the FlowFile, the importFrom method of ProcessSession may be more convenient than the write method.
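The control flow described above (yield when nothing is available, otherwise create, set attributes, and stream content) can be modelled with plain Java stand-ins. Everything in this sketch is hypothetical: SketchFlowFile, RemoteItem, and the yields counter are not NiFi classes; they only mimic the roles of the FlowFile, the external source, and the yield call. The "./" path value is a placeholder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

class IngressTriggerSketch {
    /** Hypothetical stand-in for a FlowFile: attributes plus content bytes. */
    static class SketchFlowFile {
        final Map<String, String> attributes = new LinkedHashMap<>();
        final ByteArrayOutputStream content = new ByteArrayOutputStream();

        String contentAsText() {
            return new String(content.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical item waiting on the external source. */
    static class RemoteItem {
        final String name;
        final byte[] data;

        RemoteItem(String name, String text) {
            this.name = name;
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    final Queue<RemoteItem> externalSource = new ArrayDeque<>();
    int yields = 0;

    /** Returns the new FlowFile, or null after yielding when nothing is available. */
    SketchFlowFile onTrigger() {
        RemoteItem item = externalSource.poll();
        if (item == null) {
            yields++;                                     // stands in for the yield call
            return null;
        }
        SketchFlowFile flowFile = new SketchFlowFile();   // stands in for the create call
        flowFile.attributes.put("filename", item.name);
        flowFile.attributes.put("path", "./");            // placeholder value
        try (InputStream in = new ByteArrayInputStream(item.data)) {
            stream(in, flowFile.content);                 // what the write callback would do
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return flowFile;
    }

    /** Copies everything from in to out in fixed-size chunks. */
    static void stream(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
    }

    public static void main(String[] args) {
        IngressTriggerSketch processor = new IngressTriggerSketch();
        processor.externalSource.add(new RemoteItem("example.txt", "some content"));
        SketchFlowFile flowFile = processor.onTrigger();
        System.out.println(flowFile.attributes + " " + flowFile.contentAsText());
    }
}
```

The chunked copy keeps memory use bounded regardless of how large the incoming item is, which is the reason for streaming rather than buffering the whole payload.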

When this Processor expects to receive many small files, it may be advisable to create several FlowFiles from a single session before committing the session. Typically, this allows the Framework to treat the content of the newly created FlowFiles much more efficiently.
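A rough illustration of the batching idea follows. SketchSession is a hypothetical stand-in that only counts commits, and the maxBatch limit is an assumed tuning parameter rather than anything NiFi prescribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class BatchIngestSketch {
    /** Hypothetical stand-in for a session: tracks pending and committed FlowFiles. */
    static class SketchSession {
        final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();
        int commits = 0;

        void create(String name) {
            pending.add(name);
        }

        void commit() {
            committed.addAll(pending);
            pending.clear();
            commits++;
        }
    }

    /** Creates up to maxBatch FlowFiles in one session, then commits once. */
    static int ingestBatch(Iterator<String> source, SketchSession session, int maxBatch) {
        int created = 0;
        while (created < maxBatch && source.hasNext()) {
            session.create(source.next());
            created++;
        }
        if (created > 0) {
            session.commit();   // one commit covers the whole batch
        }
        return created;
    }

    public static void main(String[] args) {
        SketchSession session = new SketchSession();
        Iterator<String> files = Arrays.asList("a", "b", "c").iterator();
        int created = ingestBatch(files, session, 10);
        System.out.println(created + " FlowFiles, " + session.commits + " commit(s)");
    }
}
```

Capping the batch size keeps a single invocation from holding the session open indefinitely when the source has a large backlog.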

This Processor generates a Provenance event indicating that it has received data and specifies from where the data came. This Processor should log the creation of the FlowFile so that the FlowFile's origin can be determined by analyzing logs, if necessary.

This Processor acknowledges receipt of the data and/or removes the data from the external source in order to prevent receipt of duplicate files. This is done only after the ProcessSession by which the FlowFile was created has been committed! Failure to adhere to this principle may result in data loss, as restarting NiFi before the session has been committed will result in the temporary file being deleted. Note, however, that this approach can still receive duplicate data, because the application could be restarted after committing the session but before acknowledging or removing the data from the external source. In general, though, potential data duplication is preferred over potential data loss. Finally, the connection is returned or added to the Connection Pool, depending on whether the connection was leased from the Connection Pool to begin with or was created in the onTrigger method.
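The ordering rule (commit first, acknowledge second) is easy to get backwards, so a small sketch may help. SketchSession and SketchSource are hypothetical stand-ins, and the failOnCommit flag exists only to simulate a commit that does not complete.

```java
import java.util.ArrayList;
import java.util.List;

class CommitThenAckSketch {
    /** Hypothetical stand-in for a session whose commit can be made to fail. */
    static class SketchSession {
        boolean failOnCommit = false;
        boolean committed = false;

        void commit() {
            if (failOnCommit) {
                throw new IllegalStateException("simulated commit failure");
            }
            committed = true;
        }
    }

    /** Hypothetical external source that records which items were acknowledged. */
    static class SketchSource {
        final List<String> acknowledged = new ArrayList<>();

        void acknowledge(String id) {
            acknowledged.add(id);
        }
    }

    /** Receives one item: the source is told to forget it only after the commit. */
    static void receive(String id, SketchSession session, SketchSource source) {
        // (FlowFile creation and content transfer would happen here.)
        session.commit();          // step 1: throws if the data was not made durable
        source.acknowledge(id);    // step 2: reached only after a successful commit
    }

    public static void main(String[] args) {
        SketchSource source = new SketchSource();
        receive("item-1", new SketchSession(), source);
        System.out.println("acknowledged: " + source.acknowledged);
    }
}
```

If the commit throws, the acknowledgement never runs, so the source still holds the item and can deliver it again. That is the duplicate-over-loss trade-off described above.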

If there is a communications problem, the connection is typically terminated and not returned (or added) to the Connection Pool. Connections to remote systems are torn down and the Connection Pool is shut down in a method annotated with @OnStopped so that resources can be reclaimed.
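The cleanup rules can be sketched as follows. This is an illustrative model only: ConnectionCleanupSketch, Connection, and finish are hypothetical names, and in a real Processor the onStopped method would carry the @OnStopped annotation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ConnectionCleanupSketch {
    /** Hypothetical stand-in for a client connection to the external source. */
    static class Connection {
        private boolean open = true;

        void close() {
            open = false;
        }

        boolean isOpen() {
            return open;
        }
    }

    private final BlockingQueue<Connection> pool = new LinkedBlockingQueue<>();

    /** End of onTrigger: pool healthy connections, terminate ones that failed. */
    void finish(Connection connection, boolean communicationFailed) {
        if (communicationFailed) {
            connection.close();        // never put a suspect connection back
        } else {
            pool.offer(connection);
        }
    }

    // In a real Processor this method would carry the @OnStopped annotation.
    void onStopped() {
        Connection connection;
        while ((connection = pool.poll()) != null) {
            connection.close();        // drain the pool and release each resource
        }
    }

    int idleCount() {
        return pool.size();
    }

    public static void main(String[] args) {
        ConnectionCleanupSketch sketch = new ConnectionCleanupSketch();
        sketch.finish(new Connection(), false);
        sketch.finish(new Connection(), true);
        System.out.println("idle before stop: " + sketch.idleCount());
        sketch.onStopped();
        System.out.println("idle after stop: " + sketch.idleCount());
    }
}
```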