Notes on writing a simple NiFi custom processor

Russell Bateman
July 2020
last update:

Table of Contents

Writing a custom NiFi processor
Flowfile I/O options
session I/O callbacks
Transferring the flowfile
Flowfile attributes
Custom processor code
NiFi's JUnit test runner
Output from test
Dealing with logging messages
A second custom processor example

This is the coded content of a custom processor. It's consciously a beginner's tutorial on writing a custom processor, writing tests for it and debugging it. Please see Notes on a simple NiFi custom processor project for building it.

Writing a custom NiFi processor

The heavy lifting in a NiFi processor is done by the onTrigger() method. There are several different constructs for writing NiFi processors. We will show them below. However, first, here's how most of them start out:

package com.windofkeltia.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@Tags( { "instruction" } )
@CapabilityDescription( "Themed processor to illustrate beginning to write custom processors." )
public class Monopoly extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }
 

The above obtains the in-coming flowfile that NiFi has handed to this session.

Flowfile I/O options

Calling session.transfer( flowfile, SUCCESS ) at this point would transfer, unchanged, whatever the flowfile is at the time of the call. Of course, the point of writing a NiFi processor is usually to change the flowfile: either create a new one, remove the old one or a combination of those options.

  1. The first option is to read the flowfile obtained early on from session.get(). As it sits, this one does not presume you wish to do anything in particular with the in-coming flowfile.
     
      session.read( flowfile, new InputStreamCallback()
      {
        @Override
        public void process( InputStream inputStream ) throws IOException
        {
          // read the in-coming flowfile and understand it...
        }
      } );
    
  2. At some point, you may wish to create a flowfile that will leave your processor with new or different content inside.
     
    FlowFile newFlowfile = session.write( flowfile, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream ) throws IOException
      {
        // write out a new flowfile...
      }
    } );
    
  3. Much of the time, your processor will want to create a new flowfile based on what you learn by reading the in-coming one. Anything you write to the output stream is put into a new flowfile, the one returned from the call to session.write() here. A concrete sketch follows below.
     
    FlowFile resultingFlowfile = session.write( flowfile, new StreamCallback()
    {
      @Override public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        // read the in-coming flowfile, understand it, then write out a resulting new flowfile--all simultaneously...
      }
    } );
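
For example, here is what the third option can look like in practice: a StreamCallback that copies the in-coming content into the new flowfile while upper-casing it. The transformation is purely illustrative (and naive about multi-byte characters); it assumes the usual java.io and org.apache.nifi.processor.io imports.

  FlowFile resultingFlowfile = session.write( flowfile, new StreamCallback()
  {
    @Override public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
    {
      byte[] buffer = new byte[ 8192 ];
      int    count;

      // copy the in-coming content a buffer at a time, upper-casing it on the way out...
      while( ( count = inputStream.read( buffer ) ) != -1 )
        outputStream.write( new String( buffer, 0, count ).toUpperCase().getBytes() );
    }
  } );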
    

Transferring the flowfile

Whatever is done to the flowfile, such as adding attributes or having created a whole new one, the end result is to transfer the flowfile to a relationship. Relationships are defined by your processor. For example, you might define one for where to transfer a flowfile upon success and another to transfer it in case of failure. In the UI, you draw to create the queues resulting from these relationships and feed the queued flowfiles into other processors.

 
  session.transfer( newFlowfile, SUCCESS );
}

If, however, you've finished with the original, in-coming flowfile and you do not wish to transfer it to a relationship, you must remove it or NiFi will fail and complain:

 
  session.transfer( newFlowfile, SUCCESS );
  session.remove( flowfile );
}

Or, you can create different relationships, like OLD and NEW, and transfer the original, in-coming flowfile one way and the resulting flowfile you just created another.

 
  session.transfer( newFlowfile, NEW );
  session.transfer( flowfile, OLD );
}
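
A minimal sketch of how such relationships might be declared (the names here are illustrative, not NiFi conventions); like SUCCESS and FAILURE in the full example below, they must also be returned from getRelationships():

  public static final Relationship NEW = new Relationship.Builder()
      .name( "new" )
      .description( "The newly created flowfile routes here." )
      .build();
  public static final Relationship OLD = new Relationship.Builder()
      .name( "old" )
      .description( "The original, in-coming flowfile routes here." )
      .build();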

Flowfile attributes

Flowfiles have attributes. By default, NiFi ensures that every flowfile has at least the following attributes: filename, path and uuid.

You can add your own flowfile attributes. Do not choose any of the above names.

 
  flowfile = session.putAttribute( flowfile, "attribute-name", "value" );
 

Much of what your session does for you works a little like the Builder pattern: successive calls are cumulative, and to make each change stick to the flowfile, you pass in the flowfile and receive back a reference to a flowfile that includes whatever you just did.

If you do this:

 
  flowfile = session.putAttribute( flowfile, "attribute-first", "value" );
  session.putAttribute( flowfile, "attribute-second", "value" );
  .
  .
  .
  session.transfer( flowfile, SUCCESS );
}

flowfile will not have attribute-second on it when you view it in the queue, because the reference returned by the second call was discarded, but it will have attribute-first.
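
To keep both attributes, capture the returned reference on every call:

  flowfile = session.putAttribute( flowfile, "attribute-first",  "value" );
  flowfile = session.putAttribute( flowfile, "attribute-second", "value" );
  .
  .
  .
  session.transfer( flowfile, SUCCESS );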

Custom processor code

There are comments on many lines that explain what's happening.

monopoly/src/main/com/windofkeltia/processor/Monopoly.java:
package com.windofkeltia.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Sample Monopoly-themed custom processor paired with simple JUnit test.
 * This processor is consciously for beginning to learn how to write and
 * test a custom processor.
 * @author Russell Bateman
 * @since July 2020
 */
@SideEffectFree
@Tags( { "instruction" } )
@CapabilityDescription( "Illustrates beginning to write custom processors." )
public class Monopoly extends AbstractProcessor
{
  @Override public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }

    /* ------------------------------------------------------------------------------------
     * Here's where the real work of a custom processor goes, usually containing a call to:
     * a) session.read( flowfile, new InputStreamCallback()...
     * b) session.write( flowfile, new OutputStreamCallback()... or
     * c) session.write( flowfile, new StreamCallback()...
     *
     * The original flowfile is read, maybe a new one is written to output.
     * ------------------------------------------------------------------------------------
     */

    // how we reach nifi-app.log...
    getLogger().info( "Just passing through on our way to Go..." );

    // how to get the value of a property...
    String favorite = context.getProperty( MONOPOLY_PROPERTY.getName() ).getValue();

    if( !favorite.equals( "Bordwalk" ) )
    {
      session.transfer( flowfile, FAILURE );
      return;
    }

    // how to create and set a new flowfile attribute...
    flowfile = session.putAttribute( flowfile, "property", "Bordwalk" );
    flowfile = session.putAttribute( flowfile, "content", "flowfile unchanged" );
    session.transfer( flowfile, SUCCESS );
  }

  public static final PropertyDescriptor MONOPOLY_PROPERTY = new PropertyDescriptor.Builder()
      .name( "Favorite property name" )           // must always remain the same or flows using this processor will break!
      .displayName( "Nom de propriété préférée" ) // this name can change without breaking flow.xml.gz
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .defaultValue( "Bordwalk" )
      .description( "Favorite property name on the Monopoly board." )
      .build();

  public static final Relationship SUCCESS = new Relationship.Builder()
      .name( "Success" )
      .description( "The flowfile passes go." )
      .build();
  public static final Relationship FAILURE = new Relationship.Builder()
      .name( "Failure" )
      .description( "The flowfile doesn't pass go." )
      .build();

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( MONOPOLY_PROPERTY );
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    relationships.add( FAILURE );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >        getRelationships()                { return relationships; }
}
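
Incidentally, onTrigger() above fetches the property value by its name; it is equivalent to fetch it by the descriptor itself:

  String favorite = context.getProperty( MONOPOLY_PROPERTY ).getValue();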

NiFi's JUnit test runner

There are comments on many lines that explain what's happening.

monopoly/src/test/com/windofkeltia/processor/MonopolyTest.java:
package com.windofkeltia.processor;

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.windofkeltia.utilities.StringUtilities;

/**
 * Set breakpoints in Monopoly.onTrigger() and step through these tests
 * in the debugger.
 * @author Russell Bateman
 * @since July 2020
 */
public class MonopolyTest
{
  @After public void tearDown() { }
  @Before public void setUp() { runner = TestRunners.newTestRunner( processor ); }

  private final Monopoly   processor = new Monopoly();
  private       TestRunner runner;

  private static final boolean VERBOSE = true;

  @Test
  public void testSuccess()
  {
    System.out.println( "\n--- testSuccess() -----------------------------------------------------------------------" );

    // how to set a property value...
    runner.setProperty( Monopoly.MONOPOLY_PROPERTY, "Bordwalk" );
    // how to create flowfile content...
    runner.enqueue( new ByteArrayInputStream( "Take a ride on the Reading Railroad!".getBytes() ) );
    // this runs the processor; your first breakpoint opportunity is likely onTrigger()...
    runner.run( 1 );
    runner.assertQueueEmpty();

    // get all the flowfiles that were transferred to SUCCESS...
    List< MockFlowFile > flowfiles = runner.getFlowFilesForRelationship( Monopoly.SUCCESS );
    assertEquals( 1, flowfiles.size() );

    // we know there's only one flowfile, so get it for our test...
    MockFlowFile          flowfile   = flowfiles.get( 0 );  assertNotNull( flowfile );
    String                content    = new String( runner.getContentAsByteArray( flowfile ) );
    Map< String, String > attributes = flowfile.getAttributes();

    if( VERBOSE )
    {
      displayAttributes( attributes );
      System.out.println( "Content: ---------------------------------------------------------------------------------");
      System.out.println( "  " + content );
    }
  }

  @Test
  public void testFailure()
  {
    System.out.println( "\n--- testFailure() -----------------------------------------------------------------------" );
    runner.setProperty( Monopoly.MONOPOLY_PROPERTY, "Park Place" );
    runner.enqueue( new ByteArrayInputStream( "Go to jail; go directly to jail!".getBytes() ) );
    runner.run( 1 );
    runner.assertQueueEmpty();

    // get all the flowfiles that were transferred to FAILURE...
    List< MockFlowFile > flowfiles = runner.getFlowFilesForRelationship( Monopoly.FAILURE );
    assertEquals( 1, flowfiles.size() );

    MockFlowFile          flowfile   = flowfiles.get( 0 );  assertNotNull( flowfile );
    String                content    = new String( runner.getContentAsByteArray( flowfile ) );
    Map< String, String > attributes = flowfile.getAttributes();

    if( VERBOSE )
    {
      displayAttributes( attributes );
      System.out.println( "Content: ---------------------------------------------------------------------------------");
      System.out.println( "  " + content );
    }
  }

  private void displayAttributes( Map< String, String > attributes )
  {
    System.out.println( "Attributes: --------------------------------------------------------------------------------");

    for( Map.Entry< String, String > attribute : attributes.entrySet() )
      System.out.println( "  " + StringUtilities.padStringLeft( attribute.getKey(), 9 )
          + ": " + attribute.getValue() );
  }
}
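
The test calls StringUtilities.padStringLeft(), a utility of the author's not reproduced here. A minimal stand-in, assuming it merely right-justifies a string in a field of the given width, might be:

package com.windofkeltia.utilities;

public class StringUtilities
{
  /** Hypothetical stand-in: right-justify s in a field width characters wide. */
  public static String padStringLeft( String s, int width )
  {
    return ( s.length() >= width ) ? s : String.format( "%" + width + "s", s );
  }
}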

Output from test

--- testFailure() -----------------------------------------------------------------------
415  [pool-1-thread-1] INFO  c.w.p.Monopoly.info:236 - Monopoly[id=942cadb8-e1a6-4d87-8c23-92c9c5108078] Just passing through on our way to Go...
Attributes: --------------------------------------------------------------------------------
       path: target
   filename: 2789895456519550.mockFlowFile
       uuid: 5be96b6b-77e0-4ed9-9604-55ff14d20767
Content: ---------------------------------------------------------------------------------
  Go to jail; go directly to jail!

--- testSuccess() -----------------------------------------------------------------------
429  [pool-2-thread-1] INFO  c.w.p.Monopoly.info:236 - Monopoly[id=76832e61-dd8c-4813-9f2e-c241c3a50114] Just passing through on our way to Go...
Attributes: --------------------------------------------------------------------------------
       path: target
   filename: 2789895534284706.mockFlowFile
   property: Bordwalk
       uuid: fe6d4014-916e-4110-877c-85008e0772fc
    content: flowfile unchanged
Content: ---------------------------------------------------------------------------------
  Take a ride on the Reading Railroad!

Process finished with exit code 0

Dealing with logging messages

Calls to getLogger() in the processor reach the console during JUnit tests by way of whatever logging configuration sits on the test classpath; this logback.xml controls what's printed and at what level:

monopoly/src/test/resources/logback.xml:
<configuration>
  <!-- This makes the NiFi component logger spit stuff out to the console in running tests.
       We set the level to DEBUG (or TRACE if we use that) to see the maximum.
    -->
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
      <!-- <pattern>%-4r [%t] %-5p %c.%m%n:%L</pattern>-->
      <pattern>%-4relative [%thread] %-5level %logger{5}.%method:%line - %message%n</pattern>
      <!-- %relative outputs the number of milliseconds elapsed since beginning of execution
           %logger{n} where {n} is the abbreviation length (a number) or refers to a "scheme", see
           http://logback.qos.ch/manual/layouts.html
        -->
    </encoder>
  </appender>

  <logger name="org.apache.nifi" level="INFO">
    <appender-ref ref="CONSOLE" />
  </logger>

  <logger name="com.windofkeltia.processor" level="DEBUG" additivity="false">
    <appender-ref ref="CONSOLE" />
  </logger>

  <root level="INFO">
    <appender-ref ref="CONSOLE" />
  </root>
</configuration>
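
Because the com.windofkeltia.processor logger above is set to DEBUG, debug-level calls in the processor also reach the test console, for example:

  getLogger().debug( "Just visiting..." );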

A second custom processor example: two flowfiles split from one

This processor takes a flowfile full of XML, excerpts a (in practice, really huge) portion of it and puts it out as a separate flowfile, then analyzes the remainder and encodes the analysis as a second flowfile. Upon error, it sends the original flowfile out the failure relationship. This code has been somewhat reduced to leave the minimum that shows one way to get from an incoming flowfile to two new flowfiles.

There is no concern in cloning even a huge incoming flowfile because it's not the content of the flowfile that's duplicated, but just the FlowFile object. Also, the uuid of each clone is new (and distinct).

If you're keen on the class, KMPStreamMatch, or its companion that operates on a String, e-mail me and ask. If you're curious about parsing XML, check out these notes on SAX.

Here's a sample "big" XML:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<bigxml>
  <excerpt type="text" xml:space="preserve" xmlns:csmk="http://software.windofkeltia.com/sdk/csmk-v1">
    This is a test of the Emergency Broadcast System. This is only a test. If this had been a real
    emergency, you would have been told where to go. The quick brown fox jumped over the lazy dog's
    back and made a clean get-away. Just what does the fox say anyway?
  </excerpt>

  <!-- XML body that, when parsed for analysis, results in a list of useful objects. -->

</bigxml>
Constants.java:
package com.windofkeltia.processor;

public class Constants
{
  protected static final String EXCERPT_DOCUMENT  = "excerpt-document"; // attribute and relationship name used by ExtractBigXml
  protected static final String BIGXML_POJOS      = "bigxml-pojos";
}
ExtractBigXml.java:
package com.windofkeltia.processor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;

import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.StaxDriver;

import com.windofkeltia.pojos.Pojo;
import com.windofkeltia.utilities.KMPStreamMatch;

import static com.windofkeltia.processor.Constants.EXCERPT_DOCUMENT;

/**
 * @author Russell Bateman
 * @since August 2020
 */
@TriggerSerially
@CapabilityDescription( "Extracts data from an XML producing serialized POJOs. Copy the XML's excerpt document to excerpt as a"
                + " separate output flowfile. The two are \"bound\" together by the original 'uuid' of the incoming flowfile"
                + " retained on the serialized POJO flowfile created and the annotation of attribute '" + EXCERPT-DOCUMENT
                + "' on the excerpt flowfile." )
@WritesAttributes(
    { @WritesAttribute( attribute   = EXCERPT_DOCUMENT,
                   description = "Both flowfiles on success will have this discrete value in their '" + EXCERPT_DOCUMENT
                            + "' attribute." )
    } )
public class ExtractBigXml extends AbstractProcessor
{
  private static final String EXCERPT_START_ELEMENT = "<excerpt ";
  private static final String EXCERPT_END_ELEMENT   = "</excerpt>";

  @Override public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }

    FlowFile excerpt = session.clone( flowfile );
    FlowFile pojos   = session.clone( excerpt );

    try
    {
      final String UUID = flowfile.getAttribute( "uuid" );

      // write the original excerpt to a new flowfile (capturing the returned reference)...
      excerpt = session.write( excerpt, new OutputStreamCallback()
      {
        @Override public void process( OutputStream outputStream )
        {
          // read from the original flowfile copying to the output flowfile...
          session.read( flowfile, new InputStreamCallback()
          {
            @Override public void process( InputStream inputStream ) throws IOException
            {
              OutputStream   ignore = new ByteArrayOutputStream(); // (place to write the before stuff we'll promptly ignore)
              KMPStreamMatch match  = new KMPStreamMatch();
              long           result;

              /* 1. Find the beginning of the original excerpt; this will consume "<excerpt " from inputStream.
               * 2. Since we need the potential attribute list anyway, add "<excerpt " to outputStream.
               * 3. Allow the attribute list (still in inputStream) and the original excerpt to flow into outputStream.
               * 4. Since we put "<excerpt ... >" into outputStream, we may as well cap it off by letting "</excerpt>" in.
               * 5. KMP does this without asking (bizarre: leaving it as an exercise to us if we didn't want it).
               * 6. Add a newline so, upon later reassembly, the first POJO won't start just after the original excerpt element.
               */
              result = match.indexOf( inputStream, ignore, EXCERPT_START_ELEMENT );
              if( result == -1 )
              {
                final String NO_EXCERPT = "There is no opening excerpt element in this big XML";
                getLogger().error( NO_EXCERPT );
                throw new IOException( NO_EXCERPT );
              }
              outputStream.write( "  <excerpt ".getBytes() );
              result = match.indexOf( inputStream, outputStream, EXCERPT_END_ELEMENT );
              if( result == -1 )
              {
                final String NO_EXCERPT = "There is no closing excerpt element in this big XML";
                getLogger().error( NO_EXCERPT );
                throw new IOException( NO_EXCERPT );
              }
              outputStream.write( '\n' );
            }
          } );
        }
      } );

      AtomicReference< List< Pojo > > pojoListHolder = new AtomicReference<>();

      // parse the cloned flowfile into POJOs...
      session.read( pojos, new InputStreamCallback()
      {
        @Override public void process( InputStream inputStream ) throws IOException
        {
          try
          {
            SAXParserFactory factory   = SAXParserFactory.newInstance();
            BigXmlSaxHandler handler   = new BigXmlSaxHandler();
            SAXParser        parser    = factory.newSAXParser();
            XMLReader        xmlReader = parser.getXMLReader();

            parser.parse( inputStream, handler );
            pojoListHolder.set( handler.getPojos() );
          }
          catch( ParserConfigurationException e )
          {
            throw new IOException( "ParserConfigurationException", e );
          }
          catch( SAXException e )
          {
            throw new IOException( "SAXException", e );
          }
        }
      } );

      // write out the POJOs serialized to a new flowfile (again capturing the returned reference)...
      pojos = session.write( pojos, new OutputStreamCallback()
      {
        @Override public void process( OutputStream outputStream )
        {
          XStream      xstream  = new XStream( new StaxDriver() );
          List< Pojo > pojoList = pojoListHolder.get();
          xstream.toXML( pojoList, outputStream );
        }
      } );

      excerpt = session.putAttribute( excerpt, EXCERPT_DOCUMENT, UUID );
      pojos   = session.putAttribute( pojos,   EXCERPT_DOCUMENT, UUID );
      session.transfer( excerpt, EXCERPT );
      session.transfer( pojos, POJOS );
      session.remove( flowfile );
    }
    catch( Exception e )
    {
      session.remove( excerpt );
      session.remove( pojos );
      session.transfer( flowfile, FAILURE );
    }
  }

  public static final Relationship FAILURE  = new Relationship.Builder()
      .name( "failure" )
      .description( "Original flowfile routes here upon failure." )
      .build();

  public static final Relationship EXCERPT  = new Relationship.Builder()
      .name( Constants.EXCERPT_DOCUMENT )
      .description( "A new flowfile containing the excerpt that was embedded in the big XML routes here upon success." )
      .build();
  public static final Relationship POJOS    = new Relationship.Builder()
      .name( Constants.BIGXML_POJOS )
      .description( "A new flowfile containing just the serialized POJOs from the analysis routes here upon success." )
      .build();

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( FAILURE );
    relationships.add( EXCERPT );
    relationships.add( POJOS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public Set< Relationship >        getRelationships()                { return relationships; }
  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
}
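
A test for ExtractBigXml can follow the same pattern as MonopolyTest. Below is a minimal sketch; it assumes BigXmlSaxHandler and Pojo are on the test classpath and that the enqueued XML is agreeable to that handler:

package com.windofkeltia.processor;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ExtractBigXmlTest
{
  private TestRunner runner;

  @Before public void setUp() { runner = TestRunners.newTestRunner( new ExtractBigXml() ); }

  @Test
  public void testExcerptAndPojos()
  {
    final String BIG_XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
                         + "<bigxml>\n"
                         + "  <excerpt type=\"text\">This is only a test.</excerpt>\n"
                         + "  <!-- body elements the SAX handler turns into POJOs... -->\n"
                         + "</bigxml>";

    runner.enqueue( BIG_XML.getBytes( StandardCharsets.UTF_8 ) );
    runner.run( 1 );
    runner.assertQueueEmpty();

    // one flowfile should route to each of the two success relationships...
    List< MockFlowFile > excerpts = runner.getFlowFilesForRelationship( ExtractBigXml.EXCERPT );
    List< MockFlowFile > pojos    = runner.getFlowFilesForRelationship( ExtractBigXml.POJOS );
    assertEquals( 1, excerpts.size() );
    assertEquals( 1, pojos.size() );

    // both carry the original flowfile's uuid in the same attribute...
    assertEquals( excerpts.get( 0 ).getAttribute( Constants.EXCERPT_DOCUMENT ),
                  pojos.get( 0 ).getAttribute( Constants.EXCERPT_DOCUMENT ) );
  }
}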