Notes on writing a simple NiFi custom processor

Russell Bateman
July 2020
last update:

Table of Contents

Writing a custom NiFi processor
Flowfile I/O options
        session I/O callbacks
        Atomic references
        session.read( flowfile, InputStreamCallback )     [ read from input stream ]
        session.write( flowfile, OutputStreamCallback )   [ write to output stream ]
        session.write( flowfile, StreamCallback )         [ read from input stream, write to output stream ]
        session.write( cloned, OutputStreamCallback )     [ clone input stream, write to output stream ]
        Example 1
        Example 2
Transferring the flowfile
Flowfile attributes
Custom processor code
NiFi's JUnit test runner
Output from test
Dealing with logging messages
A second custom processor example: two flowfiles split from one
A third custom processor example: a processor that handles no flowfiles
Appendix: support for the above
        slurpIntoOutputStream

This page presents the code of a custom processor. It is deliberately a beginner's tutorial on writing a custom processor, writing tests for it and debugging it. To build it, please see Notes on a simple NiFi custom processor project.

Writing a custom NiFi processor

The heavy lifting in a NiFi processor is done by the onTrigger() method. There are several different constructs for writing NiFi processors. We will show them below. However, first, here's how most of them start out:

package com.windofkeltia.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@Tags( { "instruction" } )
@CapabilityDescription( "Themed processor to illustrate beginning to write custom processors." )
public class Monopoly extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }
 

The above obtains the in-coming flowfile NiFi has in this session.

Flowfile I/O options

Calling session.transfer() at this point would pass the flowfile along unchanged, exactly as it is at the time of the call. Of course, the point of writing a NiFi processor is usually to change the flowfile: create a new one, remove the old one or some combination of those options.

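Important note: data you wish to inject into, or pass back out of, the process( InputStream and/or OutputStream, etc. ) methods below must obey Java's rule for anonymous inner classes: any local variable of the enclosing method that they use must be final or effectively final.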
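
One common way to get a result back out of such a callback is a holder object such as AtomicReference, which is what Example 1 further down does. The sketch below shows the idiom in isolation. So that it runs without NiFi on the classpath, ReadCallback and readWith() are hypothetical stand-ins for InputStreamCallback and session.read(); they are not NiFi's API, only the shape of it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for NiFi's InputStreamCallback so that this sketch runs alone. */
interface ReadCallback
{
  void process( InputStream inputStream ) throws IOException;
}

class CallbackResultDemo
{
  /** Hypothetical stand-in for session.read(): hands the content to the callback. */
  static void readWith( byte[] content, ReadCallback callback )
  {
    try( InputStream inputStream = new ByteArrayInputStream( content ) )
    {
      callback.process( inputStream );
    }
    catch( IOException e )
    {
      // (assumption for this sketch: rethrow unchecked so callers need no throws clause)
      throw new UncheckedIOException( e );
    }
  }

  /** Returns the content as a String, passed out of the callback through a holder. */
  static String contentOf( byte[] content )
  {
    // the holder reference never changes, so the inner class may capture it...
    final AtomicReference< String > holder = new AtomicReference<>();

    readWith( content, new ReadCallback()
    {
      @Override public void process( InputStream inputStream ) throws IOException
      {
        ByteArrayOutputStream bytes  = new ByteArrayOutputStream();
        byte[]                buffer = new byte[ 1024 ];
        int                   count;

        while( ( count = inputStream.read( buffer ) ) != -1 )
          bytes.write( buffer, 0, count );

        // ...but what it holds can be set from inside the callback
        holder.set( new String( bytes.toByteArray(), StandardCharsets.UTF_8 ) );
      }
    } );

    return holder.get();
  }

  public static void main( String[] args )
  {
    System.out.println( contentOf( "Do not pass Go".getBytes( StandardCharsets.UTF_8 ) ) );
  }
}
```

Assigning to a plain String local from inside process() would not compile; the holder sidesteps that because only its contents change, never the reference.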

Here are different I/O block situations:

  1. [ read from input stream ]

    The first option is to read the flowfile obtained earlier from session.get(). As written, it presumes nothing about what you wish to do with the in-coming flowfile beyond reading it for data.
     
    session.read( flowfile, new InputStreamCallback()
    {
      @Override
      public void process( InputStream inputStream ) throws IOException
      {
        // read the in-coming flowfile and understand it...
      }
    } );
    
  2. [ write to output stream ]

    At some point, you may wish to create a flowfile that will leave your processor with new or different content inside.
     
    FlowFile newFlowfile = session.write( flowfile, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream ) throws IOException
      {
        // write out a new flowfile...
      }
    } );
    
  3. [ read from input stream, write to output stream ]

    Much of the time, your processor will want to create a new flowfile tightly based on what you learn by reading the in-coming one.
    Anything you write to the output stream will be put into a new flowfile, the one returned from the call to session.write() here.
     
    FlowFile resultingFlowfile = session.write( flowfile, new StreamCallback()
    {
      @Override public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        // read the in-coming flowfile, understand it, then write out a resulting new flowfile--all simultaneously...
        StreamUtilities.slurpIntoOutputStream( inputStream, outputStream, 8192 );
      }
    } );
    
  4. [ clone input stream, write to output stream ]

    Under some circumstances, you'll want to clone an existing flowfile as a starting point (beyond which you make other changes).
     
    FlowFile cloned = session.clone( flowfile );
    
    cloned = session.write( cloned, new OutputStreamCallback()
    {
      @Override public void process( OutputStream outputStream ) throws IOException
      {
        // the cloned flowfile consists of (an exact copy of) the original...
        // ... plus you can modify it without damaging the original!
      }
    } );
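
The StreamUtilities.slurpIntoOutputStream() call in option 3 belongs to the author's own utilities (see the appendix). As a rough idea of what such a helper might do, here is a minimal sketch. It assumes the third argument is a buffer size and that the method simply copies everything from input to output; the class name StreamCopySketch, the returned byte count and the roundTrip() helper are inventions for illustration, not the author's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamCopySketch
{
  /**
   * Copies all of inputStream into outputStream through a buffer of bufferSize bytes.
   * @return the number of bytes copied (an addition made for this sketch).
   */
  static long slurpIntoOutputStream( InputStream inputStream, OutputStream outputStream, int bufferSize )
      throws IOException
  {
    if( bufferSize < 1 )
      throw new IllegalArgumentException( "bufferSize must be at least 1" );

    byte[] buffer = new byte[ bufferSize ];
    long   total  = 0;
    int    count;

    // read() returns -1 once the end of the stream is reached...
    while( ( count = inputStream.read( buffer ) ) != -1 )
    {
      outputStream.write( buffer, 0, count );
      total += count;
    }

    outputStream.flush();
    return total;
  }

  /** Hypothetical helper: pushes text through the copy and returns what came out. */
  static String roundTrip( String text, int bufferSize )
  {
    try
    {
      InputStream           in  = new ByteArrayInputStream( text.getBytes( StandardCharsets.UTF_8 ) );
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      slurpIntoOutputStream( in, out, bufferSize );
      return new String( out.toByteArray(), StandardCharsets.UTF_8 );
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }
  }

  public static void main( String[] args )
  {
    System.out.println( roundTrip( "Take a ride on the Reading Railroad!", 8 ) );
  }
}
```

Inside a StreamCallback, a copy like this passes the in-coming content through untouched; real processing would transform the bytes between the read and the write.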
    

Example 1

This snippet is a processor that breaks HL7v4 (FHIR) bundles coming in as flowfiles into their constituent FHIR resources, one per new output flowfile.
public class SplitHl7v4Resources extends AbstractProcessor
{
  private FhirContext fhirContext;
  private IParser     fhirParser;

  @Override
  public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    FlowFile original = session.get();

    if( isNull( original ) )
      return;

    AtomicReference< Bundle > flowfileBundle = new AtomicReference<>( null );

    // load up the FHIR bundle...
    session.read( original, new InputStreamCallback()
    {
      @Override
      public void process( InputStream inputStream )
      {
        flowfileBundle.set( ( Bundle ) fhirParser.parseResource( inputStream ) );
      }
    } );

    StringBuilder resourceNames = new StringBuilder();

    // stop at each discrete FHIR resource as it shows up in the bundle...
    for( Bundle.BundleEntryComponent component : flowfileBundle.get().getEntry() )
    {
      Resource resource = component.getResource();
      FlowFile flowfile = session.clone( original );

      flowfile = session.write( flowfile, new OutputStreamCallback()
      {
        @Override
        public void process( OutputStream outputStream ) throws IOException
        {
          /* Wrap the isolated resource in its own bundle and write it out to its
           * new flowfile. TODO: we can do a lot more to the wrapper bundle here.
           */
          Bundle.BundleEntryComponent singleComponent = new Bundle.BundleEntryComponent();
          singleComponent.setFullUrl( component.getFullUrl() );
          singleComponent.setResource( resource );

          BundleBuilder builder = new BundleBuilder( fhirContext );
          IBase         entry   = builder.addEntry();
          Bundle        bundle  = ( Bundle ) builder.getBundle();

          builder.addToEntry( entry, "resource", resource );
          outputStream.write( fhirParser.encodeResourceToString( bundle ).getBytes() );
          resourceNames.append( "  " ).append( resource.fhirType() ).append( '\n' );
        }
      } );

      session.putAttribute( flowfile, "fhir.resource.type", resource.fhirType() );
      session.transfer( flowfile, SUCCESS );
    }

    if( getLogger().isInfoEnabled() )
    {
      int    count   = flowfileBundle.get().getEntry().size();
      String message = String.format( "Splitting FHIR bundle into %d resources, one per flowfile:\n", count );
      if( resourceNames.length() > 0 )                       // (an empty bundle leaves nothing to trim)
        resourceNames.setLength( resourceNames.length()-1 ); // drop the trailing newline
      getLogger().info( message + resourceNames );
    }

    session.remove( original );
  }
  .
  .
  .
}

Example 2

This snippet is a processor that extracts the HL7v4 (FHIR) Patient resource from bundles coming in as flowfiles. It also passes on the original flowfile. To do this, we clone the original.
public class ExtractHl7v4Patient extends AbstractProcessor
{
  @Override
  public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    FlowFile original = session.get();

    if( isNull( original ) )
      return;

    FlowFile flowfile = session.clone( original );

    final IParser parser = fhirXmlParser;

    AtomicReference< String > error = new AtomicReference<>();

    flowfile = session.write( flowfile, new StreamCallback()
    {
      @Override
      public void process( InputStream inputStream, OutputStream outputStream )
      {
        try
        {
          Patient       patient;
          Bundle        bundle;
          IBaseResource resource;

          resource = parser.parseResource( inputStream );

          if( !( resource instanceof Bundle ) )
            throw new Exception( "No FHIR resource could be parsed." );

          bundle  = ( Bundle ) resource;
          patient = FhirPatient.getPatient( bundle );

          if( isNull( patient ) )
            throw new Exception( "Bundle contained no Patient." );

          // write the Patient out to the new flowfile created when we were called...
          outputStream.write( parser.encodeResourceToString( patient ).getBytes() );
        }
        catch( Exception e )
        {
          // pass the failure out of the callback through the holder...
          error.set( e.getMessage() );
        }
      }
    } );

    if( !StringUtilities.isEmpty( error.get() ) )
    {
      Map< String, String > errorAttributes = new HashMap<>();
      errorAttributes.put( Constants.ERROR_REASON, error.get() );
      flowfile = session.putAllAttributes( flowfile, errorAttributes );
      session.transfer( flowfile, FAILURE );
      // the original must be transferred or removed too, or NiFi complains...
      session.remove( original );
      return;
    }

    // send the original flowfile onward...
    session.transfer( original, ORIGINAL );

    // send the excerpted Patient onward...
    session.transfer( flowfile, SUCCESS );
  }
  .
  .
  .
}

Transferring the flowfile

Whatever is done to the flowfile, whether adding attributes or creating a whole new one, the end result is to transfer the flowfile to a relationship. Relationships are defined by your processor. For example, you might define one to which to transfer a flowfile upon success and another for failure. In the UI, you draw connections between processors; each connection is a queue fed by one of these relationships, and the queued flowfiles flow into the next processor.

 
  session.transfer( newFlowfile, SUCCESS );
}

If, however, you've finished with the original, in-coming flowfile and you do not wish to transfer it to a relationship, you must remove it or NiFi will fail and then complain:

 
  session.transfer( newFlowfile, SUCCESS );
  session.remove( flowfile );
}

Or, you can create different relationships, like OLD and NEW, and transfer the original, in-coming flowfile one way and the resulting flowfile you just created another.

 
  session.transfer( newFlowfile, NEW );
  session.transfer( flowfile, OLD );
}

Flowfile attributes

Flowfiles have attributes. By default, NiFi ensures that every flowfile has at least these attributes: filename, path and uuid (all three show up in the test output further down).

You can add your own flowfile attributes. Do not choose any of the above.

 
  flowfile = session.putAttribute( flowfile, "attribute-name", "value" );
 

Much of what your session does for you resembles the Builder pattern. Successive calls are cumulative, but each takes effect only on the reference it returns: you pass in the flowfile and you receive back a reference to that flowfile that includes whatever you did. Keep that returned reference.

If you do this:

 
  flowfile = session.putAttribute( flowfile, "attribute-first", "value" );
  session.putAttribute( flowfile, "attribute-second", "value" );
  .
  .
  .
  session.transfer( flowfile, SUCCESS );
}

flowfile will not have attribute-second on it when you view it in the queue, but it will have attribute-first.
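
Why the second attribute goes missing is easier to see in a toy model. The sketch below is not NiFi code: ToyFlowFile and its putAttribute() are hypothetical, and the only idea carried over is the assumption that a flowfile reference behaves as an immutable snapshot, so each change yields a new object that the old reference never sees.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable snapshot standing in for a flowfile reference. */
final class ToyFlowFile
{
  private final Map< String, String > attributes;

  ToyFlowFile()
  {
    this( new HashMap< String, String >() );
  }

  private ToyFlowFile( Map< String, String > attributes )
  {
    this.attributes = Collections.unmodifiableMap( attributes );
  }

  /** Returns a new snapshot carrying the extra attribute; this one is left untouched. */
  ToyFlowFile putAttribute( String name, String value )
  {
    Map< String, String > copy = new HashMap<>( attributes );
    copy.put( name, value );
    return new ToyFlowFile( copy );
  }

  boolean hasAttribute( String name )
  {
    return attributes.containsKey( name );
  }
}

class ToyFlowFileDemo
{
  /** Repeats the two calls from the text above against the toy model. */
  static ToyFlowFile run()
  {
    ToyFlowFile flowfile = new ToyFlowFile();

    flowfile = flowfile.putAttribute( "attribute-first", "value" );  // returned reference kept
    flowfile.putAttribute( "attribute-second", "value" );            // returned reference dropped

    return flowfile;
  }

  public static void main( String[] args )
  {
    ToyFlowFile flowfile = run();
    System.out.println( flowfile.hasAttribute( "attribute-first" ) );   // prints true
    System.out.println( flowfile.hasAttribute( "attribute-second" ) );  // prints false
  }
}
```

The habit to take away is to write flowfile = session.putAttribute( flowfile, ... ) every time, never the bare call.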

Custom processor code

There are comments on many lines that explain what's happening.

monopoly/src/main/com/windofkeltia/processor/Monopoly.java:
package com.windofkeltia.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Sample Monopoly-themed custom processor paired with simple JUnit test.
 * This processor is consciously for beginning to learn how to write and
 * test a custom processor.
 * @author Russell Bateman
 * @since July 2020
 */
@SideEffectFree
@Tags( { "instruction" } )
@CapabilityDescription( "Illustrates beginning to write custom processors." )
public class Monopoly extends AbstractProcessor
{
  @Override public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }

    /* ------------------------------------------------------------------------------------
     * Here's where the real work of a custom processor goes, usually containing a call to:
     * a) session.read( flowfile, new InputStreamCallback()...
     * b) session.write( flowfile, new OutputStreamCallback()... or
     * c) session.write( flowfile, new StreamCallback()...
     *
     * The original flowfile is read, maybe a new one is written to output.
     * ------------------------------------------------------------------------------------
     */

    // how we reach nifi-app.log...
    getLogger().info( "Just passing through on our way to Go..." );

    // how to get the value of a property...
    String favorite = context.getProperty( MONOPOLY_PROPERTY.getName() ).getValue();

    if( !favorite.equals( "Bordwalk" ) )
    {
      session.transfer( flowfile, FAILURE );
      return;
    }

    // how to create and set a new flowfile attribute...
    flowfile = session.putAttribute( flowfile, "property", "Bordwalk" );
    flowfile = session.putAttribute( flowfile, "content", "flowfile unchanged" );
    session.transfer( flowfile, SUCCESS );
  }

  public static final PropertyDescriptor MONOPOLY_PROPERTY = new PropertyDescriptor.Builder()
      .name( "Favorite property name" )           // must always remain the same or flows using this processor will break!
      .displayName( "Nom de propriété préférée" ) // this name can change without breaking flow.xml.gz
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .defaultValue( "Bordwalk" )
      .description( "Favorite property name on the Monopoly board." )
      .build();

  public static final Relationship SUCCESS = new Relationship.Builder()
      .name( "Success" )
      .description( "The flowfile passes go." )
      .build();
  public static final Relationship FAILURE = new Relationship.Builder()
      .name( "Failure" )
      .description( "The flowfile doesn't pass go." )
      .build();

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( MONOPOLY_PROPERTY );
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    relationships.add( FAILURE );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >        getRelationships()                { return relationships; }
}

NiFi's JUnit test runner

There are comments on many lines that explain what's happening.

monopoly/src/test/com/windofkeltia/processor/MonopolyTest.java:
package com.windofkeltia.processor;

import java.io.ByteArrayInputStream;
import java.util.List;
import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.windofkeltia.utilities.StringUtilities;

/**
 * Set breakpoints in Monopoly.onTrigger() and step through these tests
 * in the debugger.
 * @author Russell Bateman
 * @since July 2020
 */
public class MonopolyTest
{
  @After public void tearDown() { }
  @Before public void setUp() { runner = TestRunners.newTestRunner( processor ); }

  private final Monopoly   processor = new Monopoly();
  private       TestRunner runner;

  private static final boolean VERBOSE = true;

  @Test
  public void testSuccess()
  {
    System.out.println( "\n--- testSuccess() -----------------------------------------------------------------------" );

    // how to set a property value...
    runner.setProperty( Monopoly.MONOPOLY_PROPERTY, "Bordwalk" );
    // how to create flowfile content...
    runner.enqueue( new ByteArrayInputStream( "Take a ride on the Reading Railroad!".getBytes() ) );
    // this runs the processor; your first breakpoint opportunity is likely onTrigger()...
    runner.run( 1 );
    runner.assertQueueEmpty();

    // get all the flowfiles that were transferred to SUCCESS...
    List< MockFlowFile > flowfiles = runner.getFlowFilesForRelationship( Monopoly.SUCCESS );
    assertEquals(1, flowfiles.size() );

    // we know there's only one flowfile, so get it for our test...
    MockFlowFile          flowfile   = flowfiles.get( 0 );  assertNotNull( flowfile );
    String                content    = new String( runner.getContentAsByteArray( flowfile ) );
    Map< String, String > attributes = flowfile.getAttributes();

    if( VERBOSE )
    {
      displayAttributes( attributes );
      System.out.println( "Content: ---------------------------------------------------------------------------------");
      System.out.println( "  " + content );
    }
  }

  @Test
  public void testFailure()
  {
    System.out.println( "\n--- testFailure() -----------------------------------------------------------------------" );
    runner.setProperty( Monopoly.MONOPOLY_PROPERTY, "Park Place" );
    runner.enqueue( new ByteArrayInputStream( "Go to jail; go directly to jail!".getBytes() ) );
    runner.run( 1 );
    runner.assertQueueEmpty();

    // get all the flowfiles that were transferred to FAILURE...
    List< MockFlowFile > flowfiles = runner.getFlowFilesForRelationship( Monopoly.FAILURE );
    assertEquals(1, flowfiles.size() );

    MockFlowFile          flowfile   = flowfiles.get( 0 );  assertNotNull( flowfile );
    String                content    = new String( runner.getContentAsByteArray( flowfile ) );
    Map< String, String > attributes = flowfile.getAttributes();

    if( VERBOSE )
    {
      displayAttributes( attributes );
      System.out.println( "Content: ---------------------------------------------------------------------------------");
      System.out.println( "  " + content );
    }
  }

  private void displayAttributes( Map< String, String > attributes )
  {
    System.out.println( "Attributes: --------------------------------------------------------------------------------");
    int maxWidth = 0;
    for( Map.Entry< String, String > attribute : attributes.entrySet() )
      maxWidth = Math.max( maxWidth, attribute.getKey().length() );

    for( Map.Entry< String, String > attribute : attributes.entrySet() )
      System.out.println( "  " + StringUtilities.padStringLeft( attribute.getKey(), 9 )
          + ": " + attribute.getValue() );
  }
}
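
The test leans on StringUtilities.padStringLeft(), one of the author's utilities that is not listed here. For anyone wanting to run the test without it, a stand-in might look like the sketch below. It assumes the method right-justifies the string in a field of the given width, which is what the output in the next section suggests; the class name PadSketch is made up.

```java
class PadSketch
{
  /**
   * Pads string on the left with spaces up to width characters.
   * Strings already at least that long are returned unchanged.
   */
  static String padStringLeft( String string, int width )
  {
    if( width <= 0 )
      return string;

    // "%9s" right-justifies its argument in a field 9 characters wide
    return String.format( "%" + width + "s", string );
  }

  public static void main( String[] args )
  {
    System.out.println( "[" + padStringLeft( "path", 9 ) + "]" );
    System.out.println( "[" + padStringLeft( "filename", 9 ) + "]" );
  }
}
```

Note that displayAttributes() computes maxWidth but then pads to a fixed 9; passing maxWidth instead would align the keys whatever their length.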

Output from test

--- testFailure() -----------------------------------------------------------------------
415  [pool-1-thread-1] INFO  c.w.p.Monopoly.info:236 - Monopoly[id=942cadb8-e1a6-4d87-8c23-92c9c5108078] Just passing through on our way to Go...
Attributes: --------------------------------------------------------------------------------
       path: target
   filename: 2789895456519550.mockFlowFile
       uuid: 5be96b6b-77e0-4ed9-9604-55ff14d20767
Content: ---------------------------------------------------------------------------------
  Go to jail; go directly to jail!

--- testSuccess() -----------------------------------------------------------------------
429  [pool-2-thread-1] INFO  c.w.p.Monopoly.info:236 - Monopoly[id=76832e61-dd8c-4813-9f2e-c241c3a50114] Just passing through on our way to Go...
Attributes: --------------------------------------------------------------------------------
       path: target
   filename: 2789895534284706.mockFlowFile
   property: Bordwalk
       uuid: fe6d4014-916e-4110-877c-85008e0772fc
    content: flowfile unchanged
Content: ---------------------------------------------------------------------------------
  Take a ride on the Reading Railroad!

Process finished with exit code 0

Dealing with logging messages...

monopoly/src/test/resources/logback.xml:
<configuration>
  <!-- This makes the NiFi component logger spit stuff out to the console in running tests.
       We set the level to DEBUG (or TRACE if we use that) to see the maximum.
    -->
  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
      <!-- <pattern>%-4r [%t] %-5p %c.%m%n:%L</pattern>-->
      <pattern>%-4relative [%thread] %-5level %logger{5}.%method:%line - %message%n</pattern>
      <!-- %relative outputs the number of milliseconds elapsed since beginning of execution
           %logger{n} abbreviates the logger name to roughly n characters, see
           http://logback.qos.ch/manual/layouts.html
        -->
    </encoder>
  </appender>

  <logger name="org.apache.nifi" level="INFO">
    <appender-ref ref="CONSOLE" />
  </logger>

  <logger name="com.windofkeltia.processor" level="DEBUG" additivity="false">
    <appender-ref ref="CONSOLE" />
  </logger>

  <root level="INFO">
    <appender-ref ref="CONSOLE" />
  </root>
</configuration>

A second custom processor example: two flowfiles split from one

This processor takes a flowfile full of XML, excerpts a (in practice really huge) portion of it and puts it out as a separate flowfile, analyzes and encodes the analysis of the remainder as a second flowfile. Upon error, it sends the original flowfile out the failure relationship. This code has been somewhat reduced to leave the minimum that shows one way to get from an incoming flowfile to two new flowfiles.

There is no concern in cloning even a huge incoming flowfile because it's not the content of the flowfile that's duplicated, but just the FlowFile object. Also, the uuid of each clone is new (and discrete).

If you're keen on the class, KMPStreamMatch, or its companion that operates on a String, e-mail me and ask. If you're curious about parsing XML, check out these notes on SAX.

Here's a sample "big" XML:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<bigxml>
  <excerpt type="text" xml:space="preserve" xmlns:csmk="http://software.windofkeltia.com/sdk/csmk-v1">
    This is a test of the Emergency Broadcast System. This is only a test. If this had been a real
    emergency, you would have been told where to go. The quick brown fox jumped over the lazy dog's
    back and made a clean get-away. Just what does the fox say anyway?
  </excerpt>

  <!-- XML body that, when parsed for analysis, results in a list of useful objects. -->

</bigxml>
Constants.java:
package com.windofkeltia.processor;

public class Constants
{
  protected static final String EXCERPT_DOCUMENT = "original-document";  // (name used by ExtractBigXml below)
  protected static final String BIGXML_POJOS     = "bigxml-pojos";
}
ExtractBigXml.java:
package com.windofkeltia.processor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;

import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.StaxDriver;

import com.windofkeltia.pojos.Pojo;
import com.windofkeltia.utilities.KMPStreamMatch;

import static com.windofkeltia.processor.Constants.EXCERPT_DOCUMENT;

/**
 * @author Russell Bateman
 * @since August 2020
 */
@TriggerSerially
@CapabilityDescription( "Extracts data from an XML producing serialized POJOs. Copy the XML's excerpt document to excerpt as a"
                + " separate output flowfile. The two are \"bound\" together by the original 'uuid' of the incoming flowfile"
                + " retained on the serialized POJO flowfile created and the annotation of attribute '" + EXCERPT_DOCUMENT
                + "' on the excerpt flowfile." )
@WritesAttributes(
    { @WritesAttribute( attribute   = EXCERPT_DOCUMENT,
                   description = "Both flowfiles on success will have this discrete value in their '" + EXCERPT_DOCUMENT
                            + "' attribute." )
    } )
public class ExtractBigXml extends AbstractProcessor
{
  private static final String EXCERPT_START_ELEMENT = "<excerpt ";
  private static final String EXCERPT_END_ELEMENT   = "</excerpt>";

  @Override public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
      return;

    FlowFile excerpt  = session.clone( flowfile );
    FlowFile pojos    = session.clone( excerpt );

    try
    {
      final String UUID = flowfile.getAttribute( "uuid" );

      // write the original excerpt to a new flowfile...
      excerpt = session.write( excerpt, new OutputStreamCallback()
      {
        @Override public void process( OutputStream outputStream )
        {
          // read from the original flowfile copying to the output flowfile...
          session.read( flowfile, new InputStreamCallback()
          {
            @Override public void process( InputStream inputStream ) throws IOException
            {
              OutputStream   ignore = new ByteArrayOutputStream(); // (place to write the before stuff we'll promptly ignore)
              KMPStreamMatch match  = new KMPStreamMatch();
              long           result;

              /* 1. Find the beginning of the original excerpt; this will consume "<excerpt " from inputStream.
               * 2. Since we need the potential attribute list anyway, add "<excerpt " to outputStream.
               * 3. Allow the attribute list (still in inputStream) and the original excerpt to flow into outputStream.
               * 4. Since we put "<excerpt ... >" into outputStream, we may as well cap it off by letting "</excerpt>" in.
               * 5. KMP does this without asking (bizarre: leaving it as an exercise to us if we didn't want it).
               * 6. Add a newline so, upon later reassembly, the first POJO won't start just after the original excerpt element.
               */
              result = match.indexOf( inputStream, ignore, EXCERPT_START_ELEMENT );
              if( result == -1 )
              {
                final String NO_EXCERPT = "There is no opening excerpt element in this big XML";
                getLogger().error( NO_EXCERPT );
                throw new IOException( NO_EXCERPT );
              }
              outputStream.write( "  <excerpt ".getBytes() );
              result = match.indexOf( inputStream, outputStream, EXCERPT_END_ELEMENT );
              if( result == -1 )
              {
                final String NO_EXCERPT = "There is no closing excerpt element in this big XML";
                getLogger().error( NO_EXCERPT );
                throw new IOException( NO_EXCERPT );
              }
              outputStream.write( '\n' );
            }
          } );
        }
      } );

      AtomicReference< List< Pojo > > pojoListHolder = new AtomicReference<>();

      // parse the pojos into POJOs...
      session.read( pojos, new InputStreamCallback()
      {
        @Override public void process( InputStream inputStream ) throws IOException
        {
          try
          {
            SAXParserFactory factory = SAXParserFactory.newInstance();
            BigXmlSaxHandler handler = new BigXmlSaxHandler();
            SAXParser        parser  = factory.newSAXParser();

            parser.parse( inputStream, handler );
            // hand the parsed list back out through the holder...
            pojoListHolder.set( handler.getPojos() );
          }
          catch( ParserConfigurationException e )
          {
            throw new IOException( "ParserConfigurationException", e );
          }
          catch( SAXException e )
          {
            throw new IOException( "SAXException", e );
          }
        }
      } );

      // write out the POJOs serialized to a new flowfile...
      pojos = session.write( pojos, new OutputStreamCallback()
      {
        @Override public void process( OutputStream outputStream )
        {
          XStream      xstream  = new XStream( new StaxDriver() );
          List< Pojo > pojos    = pojoListHolder.get();
          xstream.toXML( pojos, outputStream );
        }
      } );

      excerpt = session.putAttribute( excerpt, EXCERPT_DOCUMENT, UUID );
      pojos   = session.putAttribute( pojos,   EXCERPT_DOCUMENT, UUID );
      session.transfer( excerpt, EXCERPT );
      session.transfer( pojos, POJOS );
      session.remove( flowfile );
    }
    catch( Exception e )
    {
      session.remove( excerpt );
      session.remove( pojos );
      session.transfer( flowfile, FAILURE );
    }
  }

  public static final Relationship FAILURE  = new Relationship.Builder()
      .name( "failure" )
      .description( "Original flowfile routes here upon failure." )
      .build();

  public static final Relationship EXCERPT  = new Relationship.Builder()
      .name( Constants.EXCERPT_DOCUMENT )
      .description( "A new flowfile containing the excerpt that was embedded in the big XML routes here upon success." )
      .build();
  public static final Relationship POJOS    = new Relationship.Builder()
      .name( Constants.BIGXML_POJOS )
      .description( "A new flowfile containing just the serialized POJOs from the analysis routes here upon success." )
      .build();

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( FAILURE );
    relationships.add( EXCERPT );
    relationships.add( POJOS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public Set< Relationship >        getRelationships()                { return relationships; }
  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
}
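
The listing above carries the parsed list out of the read callback and into the write callback through an AtomicReference, because an anonymous class can only capture locals that are effectively final. Here is a minimal, NiFi-free sketch of that hand-off. The ReadCallback interface and the runRead() helper are hypothetical stand-ins for InputStreamCallback and session.read(); they are assumptions made for illustration, not NiFi API.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class HolderSketch
{
  /** Hypothetical stand-in for NiFi's InputStreamCallback. */
  interface ReadCallback
  {
    void process( InputStream inputStream ) throws IOException;
  }

  /** Hypothetical stand-in for session.read(): opens the content and hands it to the callback. */
  static void runRead( final String content, final ReadCallback callback ) throws IOException
  {
    try( InputStream inputStream = new ByteArrayInputStream( content.getBytes( StandardCharsets.UTF_8 ) ) )
    {
      callback.process( inputStream );
    }
  }

  /** Collects the lines inside the callback and carries them out through the holder. */
  static List< String > readLines( final String content )
  {
    // the reference itself never changes; only what it holds does...
    final AtomicReference< List< String > > holder = new AtomicReference<>();

    try
    {
      runRead( content, new ReadCallback()
      {
        @Override public void process( InputStream inputStream ) throws IOException
        {
          BufferedReader reader = new BufferedReader( new InputStreamReader( inputStream, StandardCharsets.UTF_8 ) );
          List< String > lines  = new ArrayList<>();
          String         line;

          while( ( line = reader.readLine() ) != null )
            lines.add( line );

          holder.set( lines );
        }
      } );
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }

    // null here would mean the callback never ran or never called set()...
    return holder.get();
  }
}
```

Calling HolderSketch.readLines( "a\nb\nc" ) returns a three-element list. Note that the holder is only filled once the callback has run, so calling get() before then yields null.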

A third custom processor example: a processor that handles no flowfiles

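Here's a processor that isn't expected to handle flowfiles and defines no relationships. Apart from one static property, it's configured entirely through dynamic properties. The @TriggerWhenEmpty annotation tells NiFi to call onTrigger() even when no flowfile is queued.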

ConfigureProcessor.java:
package com.windofkeltia.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Objects.isNull;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@TriggerWhenEmpty
@SideEffectFree
@CapabilityDescription( "Dynamic properties can be created to specify (or add to) static configuration"
                + " of key-value substitution pairs.")
public class ConfigureProcessor extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    ComponentLog logger = getLogger();

    for( Map.Entry< PropertyDescriptor, String > entry : context.getProperties().entrySet() )
    {
      PropertyDescriptor property = entry.getKey();

      if( !property.isDynamic() )
        continue;

      // do work here...
      final String PROPERTY_NAME  = property.getName();
      final String PROPERTY_VALUE = entry.getValue();

      logger.trace( "Processing configurable mappings titled \"" + PROPERTY_NAME + "\"" );

      try
      {
        harvestDynamicPropertyMappings( PROPERTY_VALUE );
      }
      catch( Exception e )
      {
        logger.debug( e.getMessage() );
      }
    }
  }

  protected static void harvestDynamicPropertyMappings( final String PROPERTY_VALUE ) throws Exception
  {
    final String[] LINES = PROPERTY_VALUE.split( "\n" );

    if( LINES.length < 1 )
      return;

    final String WHICH_LIST = LINES[ 0 ];

    for( int which = 1; which < LINES.length; which++ )
    {
      final String VALUE_LINE = LINES[ which ];

      if( VALUE_LINE.isEmpty() )  // skip blank lines: charAt( 0 ) would throw on them
        continue;

      char         delimiter  = VALUE_LINE.charAt( 0 );
      int          position   = VALUE_LINE.indexOf( delimiter, 1 );
      String key, value;

      key   = ( position < 0 ) ? VALUE_LINE.substring( 1 )                   : VALUE_LINE.substring( 1, position ).trim();
      value = ( position > 0 ) ? VALUE_LINE.substring( position + 1 ).trim() : "";

      switch( WHICH_LIST )
      {
        case "human-readables" :
          HumanReadables.add( key, value );  // (I don't show this stuff...)
          break;
        case "code-system urs" :
          CodeSystemUri.add( key, value );   // (...left up to the imagination)
          break;
        default :
          throw new Exception( "Configurable-mapping " + WHICH_LIST + " is not a supported list" );
      }
    }
  }

  /**
   * This method is overridden from AbstractConfigurableComponent.
   *
   * In allowing a user to define a dynamic property, which is to say
   * one that is not known statically (as are most processor properties), we
   * override this method to create that property allowing us also to specify
   * a validator. Without doing this, we'll always get something like:
   *
   * java.lang.AssertionError: Processor has 1 validation failures:
   * (dynamic-property-name) validated against (its value) is invalid because\
   *   (dynamic-property-name) is not a supported property or has no Validator associated with it
   *
   * @param propertyDescriptorName the name of the property's descriptor.
   * @return the new descriptor.
   */
  @Override
  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( final String propertyDescriptorName )
  {
    return new PropertyDescriptor.Builder()
                          .required( false )
                          .name( propertyDescriptorName )
                          .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
                          // or .addValidator( Validator.VALID ) if you do not wish it validated!
                          .dynamic( true )
                          .build();
  }

  /**
   * Used with onPropertyModified(), it collects dynamic properties
   * that change. This is not common.
   * See {@link #onPropertyModified( PropertyDescriptor, String, String )}
   */
  private volatile Set< String > dynamicPropertyNames = new HashSet<>();

  /**
   * This method is overridden from AbstractConfigurableComponent. It's
   * called in reaction to a configuration change (in the UI).
   *
   * Why might you want to do this? Because you are using dynamic properties to
   * influence the set of relationships that a processor may have, adding to or
   * subtracting from them. For example, if we were called with "dynamic-property"
   * added, then this method would add a new relationship (to SUCCESS)
   * called "dynamic-property" (--for whatever good that might do).
   *
   * This is not a common thing to do.
   *
   * @param descriptor of the modified property.
   * @param oldValue non-null: previous property value.
   * @param newValue new property value or null when property removed (trash can in UI).
   */
  @Override
  public void onPropertyModified( final PropertyDescriptor descriptor, final String oldValue, final String newValue )
  {
    final Set< String > newDynamicPropertyNames = new HashSet<>( dynamicPropertyNames );

    if( isNull( newValue ) )
      newDynamicPropertyNames.remove( descriptor.getName() );
    else if( isNull( oldValue ) && descriptor.isDynamic() )
      newDynamicPropertyNames.add( descriptor.getName() );

    dynamicPropertyNames = Collections.unmodifiableSet( newDynamicPropertyNames );

    final Set< String > allDynamicProperties = dynamicPropertyNames;
  }

  /**
   * When the processor is scheduled, you can access its properties including
   * any dynamic ones.
   * @param context processor context.
   */
  @OnScheduled public void processProperties( final ProcessContext context )
  {
    for( Map.Entry< PropertyDescriptor, String > entry : context.getProperties().entrySet() )
    {
      PropertyDescriptor descriptor = entry.getKey();

      if( descriptor.isDynamic() )
        getLogger().debug( "Dynamic property named:\n    " + descriptor.getName()
                     + ", value: " + entry.getValue().replaceAll( "\n", " + " ) );
    }
  }

  public static final PropertyDescriptor ERASE_HUMANREADABLES = new PropertyDescriptor.Builder()
      .name( "Erase all human-readables" )
      .displayName( "Erase all human-readables" )
      .description( "Erase all human-readables and replace with the dynamic ones defined by this processor."
               + " (Medical code-system substitutions, a different list, cannot be erased.)" )
      .required( false )
      .addValidator( BooleanProperty.BOOLEAN_VALIDATOR )
      .allowableValues( BooleanProperty.BOOLEAN_VALUES )
      .defaultValue( BooleanProperty.FALSE )
      .build();

  private List< PropertyDescriptor > properties;

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    // create static properties and relationships...
    @SuppressWarnings( "MismatchedQueryAndUpdateOfCollection" )
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( ERASE_HUMANREADABLES );
    this.properties = Collections.unmodifiableList( properties );
  }

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
}
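
The onPropertyModified() override above never mutates the set that readers see: it copies the current names, changes the copy and republishes it as an unmodifiable set through a volatile field. Here is a minimal sketch of that copy-and-republish idea without any NiFi types. The class DynamicNameTracker and its method names are hypothetical, and the check on descriptor.isDynamic() is left out for brevity.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class DynamicNameTracker
{
  // readers always see a complete, unmodifiable snapshot...
  private volatile Set< String > names = Collections.emptySet();

  /**
   * Follows the null conventions of onPropertyModified(): a null newValue
   * means the property was removed, a null oldValue means it was just added.
   */
  void modified( final String name, final String oldValue, final String newValue )
  {
    final Set< String > copy = new HashSet<>( names );

    if( newValue == null )
      copy.remove( name );
    else if( oldValue == null )
      copy.add( name );

    // publish the new snapshot in a single volatile write...
    names = Collections.unmodifiableSet( copy );
  }

  Set< String > snapshot() { return names; }
}
```

This keeps readers safe without locking, but two threads calling modified() at the same moment could still lose one update. The sketch assumes, as the listing appears to, that property changes arrive one at a time.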
ConfigureProcessorTest.java:
package com.windofkeltia.processor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.junit.Assert.assertTrue;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.windofkeltia.constants.ConfigurableMappings;

/**
 * @author Russell Bateman
 * @since December 2020
 */
public class ConfigureProcessorTest
{
  // @formatter:off
  @Rule   public TestName name = new TestName();
  @After  public void tearDown() { }
  @Before public void setUp()    { }

  private static final boolean VERBOSE  = true;
  private static final int     ONE      = 1;

  private ConfigureProcessor processor = new ConfigureProcessor();
  private TestRunner         runner    = TestRunners.newTestRunner( processor );

  private static final String PROPERTY_VALUE = "human-readables\n"
                                             + "|this|that\n"
                                             + "|and| everything\n"
                                             + "|nothing\n"
                                             + "^J'accuse^de rien !"
                                             + "\n";
  @Test
  public void test()
  {
    runner.setProperty( "my human readables", PROPERTY_VALUE );
    runner.setValidateExpressionUsage( false );
    runner.run( ONE );
    runner.assertQueueEmpty();

    int line = 0;

    List< String >        output  = new ArrayList<>();
    Map< String, String > mappings = ConfigurableMappings.HumanReadables.getMap();
    for( Map.Entry< String, String > map : mappings.entrySet() )
    {
      String keyAndValue = map.getKey() + " --> " + map.getValue();
      output.add( keyAndValue );
      System.out.println( "  " + ++line + ": " + keyAndValue );
    }

    assertTrue( output.contains( "http://hospital.smarthealthit.org --> Smart Health IT" ) );
    assertTrue( output.contains( "http://diameterhealth.com --> Diameter Health" ) );
    assertTrue( output.contains( "this --> that" ) );
    assertTrue( output.contains( "and --> everything" ) );
    assertTrue( output.contains( "nothing --> " ) );
    assertTrue( output.contains( "J'accuse --> de rien !" ) );
  }

  @Test
  public void testHarvestDynamicPropertyMappings()
  {
    try
    {
      int lineNumber = 0;

      ConfigureProcessor.harvestDynamicPropertyMappings( PROPERTY_VALUE );

      Map< String, String > mappings = ConfigurableMappings.HumanReadables.getMap();
      for( Map.Entry< String, String > map : mappings.entrySet() )
      {
        String key   = map.getKey();
        String value = map.getValue();

        if( VERBOSE )
          System.out.println( "  " + ++lineNumber + ": key = " + key + ", value = " + value );
      }
    }
    catch( Exception e )
    {
      // fail the test rather than swallow the exception...
      throw new AssertionError( "harvestDynamicPropertyMappings() failed: " + e.getMessage(), e );
    }
  }
}
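
The property value used in the test follows a small line format: the first line names the list, and every later line begins with its own delimiter character, which then separates key from value. Here is a self-contained sketch of that parsing which collects into a plain map instead of the HumanReadables list (not shown in these notes). The class name MappingLineSketch is hypothetical. Skipping blank lines and trimming a key that has no value are assumptions of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MappingLineSketch
{
  /**
   * Parses lines of the form (delimiter)key(delimiter)value, where the
   * first character of each line is that line's delimiter. The first line
   * of the property value names the list and is skipped here.
   */
  static Map< String, String > parse( final String propertyValue )
  {
    final Map< String, String > mappings = new LinkedHashMap<>();
    final String[]              lines    = propertyValue.split( "\n" );

    for( int which = 1; which < lines.length; which++ )
    {
      final String line = lines[ which ];

      if( line.isEmpty() )  // (assumption: blank lines are skipped, not rejected)
        continue;

      final char   delimiter = line.charAt( 0 );
      final int    position  = line.indexOf( delimiter, 1 );
      final String key       = ( position < 0 ) ? line.substring( 1 ).trim()
                                                : line.substring( 1, position ).trim();
      final String value     = ( position < 0 ) ? ""
                                                : line.substring( position + 1 ).trim();
      mappings.put( key, value );
    }

    return mappings;
  }
}
```

Because each line carries its own delimiter, a key or value may contain "|" as long as the line starts with some other character, as the "^J'accuse^de rien !" line in the test does.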

Appendix: support for the above

StreamUtilities.java:
/**
 * Slurp content from input stream and write it to the output stream.
 */
public static void slurpIntoOutputStream( final InputStream  inputStream,
                                          final OutputStream outputStream,
                                          int                bufferSize )
    throws IOException
{
  if( isNull( inputStream ) )
    throw new IOException( "Missing input stream" );
  else if( isNull( outputStream ) )
    throw new IOException( "Missing output stream" );

  byte[] buffer = new byte[ bufferSize ];
  int    length;

  // read() returns -1 at end of stream; a short read alone doesn't mean the stream is exhausted...
  while( ( length = inputStream.read( buffer, 0, bufferSize ) ) != -1 )
    outputStream.write( buffer, 0, length );
}
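
A copy loop of this kind has to stop when read() returns -1, not when it returns fewer bytes than the buffer holds: sockets and some wrapped streams routinely return short reads in the middle of the content. The sketch below shows this with a hypothetical TrickleInputStream that never returns more than three bytes per call. The class and method names are made up for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class CopySketch
{
  /** Hypothetical stream that returns at most three bytes per read, like a slow socket. */
  static class TrickleInputStream extends ByteArrayInputStream
  {
    TrickleInputStream( byte[] bytes ) { super( bytes ); }

    @Override public synchronized int read( byte[] buffer, int offset, int length )
    {
      return super.read( buffer, offset, Math.min( length, 3 ) );
    }
  }

  /** Copies until read() reports end of stream (-1); a short read does not end the loop. */
  static void copy( InputStream inputStream, OutputStream outputStream, int bufferSize ) throws IOException
  {
    byte[] buffer = new byte[ bufferSize ];
    int    length;

    while( ( length = inputStream.read( buffer, 0, bufferSize ) ) != -1 )
      outputStream.write( buffer, 0, length );
  }

  /** Sends text through the trickling stream with an 8-byte buffer and returns what arrived. */
  static String roundTrip( String text )
  {
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

    try
    {
      copy( new TrickleInputStream( text.getBytes( StandardCharsets.UTF_8 ) ), outputStream, 8 );
    }
    catch( IOException e )
    {
      throw new UncheckedIOException( e );
    }

    return new String( outputStream.toByteArray(), StandardCharsets.UTF_8 );
  }
}
```

With an 8-byte buffer and three-byte reads, every read here is "short", yet CopySketch.roundTrip() still returns the whole text. A loop that broke on the first short read would have stopped after three bytes.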