Notes on writing and using custom NiFi controller services

Russell Bateman
November 2020
last update:

Controller services are a little more confusing to write and to use than custom processors your first time. I'm going to show setting this up (I am using NiFi 1.11.4). A good piece of documentation to look at also is Adding Controller Services for Dataflows. What I explain here is only a little different.

I have written two controllers for what I need—StandardAuthToken and CanonicalConnection. I used to have a different connection controller, but I got tire of configuring it to use the first which I also had to configure. So, I merged the functionality and now only need the one I show here.

This page tells you how to configure a controller service in NiFi even if you didn't write it.

Setting up a controller in the NiFi UI...

Remember, once you set up a controller, you can use it in support of different, consuming processors if the configuration remains the same. You don't have to create a new instance of the controller for every processor using it.

  1. In the Operate Pallette, click the Gear icon (Note: If you have anything selected, this will go to the configuration dialog for the selected (processor)—not what you want here: click out in the canvas nowhere land, then retry this step.)

  2. In the NiFi Flow Configuration dialog, click the Controller Services tab.

  3. Click + to add a new controller service to the list.

  4. Enter the name of the controller service you want, or otherwise find it, and click on it. Click Add.

  5. A new line is added to the top of your controllers list. Click on its Gear icon at the extreme right.

  6. In the Configure Controller Services dialog, you can "rename" your controller to something more documentary if you like. I'm calling mine, Middleware Connector. Otherwise, click the Properties tab.

  7. Configure the behavior of CanonicalConnection:
    • Protocol : https (requires certificate)

    • Hostname : 10.10.99.102 (a test server of mine)

    • Port : No value set

    • Disable SSL validation : false (because we'll need it)

    • Connection time-out : 20 (seconds)

    • Supply default Accept in header : No value set

    • Username : krakatoa

    • Authenticating token : No value set (generated by the service I want to connect to)

  8. Click on Apply.

  9. Close the NiFi Flow Controller dialog page.

Consuming the controller above in the NiFi UI...

Next, you're going to configure your processor's consumption of the controller service you just set up.

  1. Right-click the box representing the NiFi processor that you want to consume the connection controller above, choose Configure.

  2. Click on the Properties tab.

  3. Click in the Value cell of the Property that's going to point to this controller service (it says No value set right now).

  4. In the drop-down that appears, change from No value to the name you choose for the controller service (configured above). If you choose Create new service..., yet another controller service will be created and you will have two (and probably be confused as to why and which one you're really going to use).

  5. You should see, toward the right side, a —→ (right arrow) that you can click on. If asked, you do want to save changes before going to this controller service.

  6. Now you're in the NiFi Flow Configuration dialog page. At this point, you're ready to Enable your controller service by clicking on the lightning bolt. Click it to get the Enable Controller Service dialog.

  7. Under Scope, you have the option of enabling the Service only or, in addition to turning on (enabling) the service you've just configured, you can also start up (enable) all consuming processors, a list of which is at the right. (This is a shorthand for professional flow writers, it's probably not what you want to do.)

  8. Click the ENABLE button below.

  9. If you are lucky, you will see something spin for a bref moment, then a green . If you weren't so lucky, why? In this case, knowing my controller service, it would be because it was misconfigured, most importantly:
    • Is the hostname (IP address) correct?
    • Is the Authenticating token real (and correct)?

  10. At this point, you can dismiss the dialog by clicking CLOSE.

Implementing and deploying (consuming) a controller...

Note that, when implementing and deploying, that is, consuming a controller in a NiFi processor property, there are careful rules to follow and often these get overlooked. At all of the highlighted points below, you'll find interface IConnectionService being used.

  1. Your controller really implements an interface. This is what you create first, and this is what you consume (later) in the configuration property of the processor using your controller. Pretty simple; interfaces usually are. This is all the code there is.
    IConnectionService.java:
    package com.windofkeltia.controller;
    
    import org.apache.nifi.controller.ControllerService;
    
    public interface IConnectionService extends ControllerService
    {
      // connection-proper details...
      String  getProtocol();
      String  getHostname();
      String  getPort();
      int     getTimeout();
      boolean getDisableSsl();
      String  getDefaultAccept();
      URL     getServerUrl();
      String  getUsername();
      String  getToken();
    }
    
  2. You implement that interface as the controller you write. Often, the controller doesn't do much heavy lifting, it's just configuration. Except for the individual property definitions (that work exactly like processor properties), this is the entire code of my controller.
    CanonicalConnection.java:
    package com.windofkeltia.controller;
    
    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    
    import org.apache.nifi.annotation.documentation.CapabilityDescription;
    import org.apache.nifi.annotation.documentation.Tags;
    import org.apache.nifi.annotation.lifecycle.OnEnabled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.components.Validator;
    import org.apache.nifi.controller.AbstractControllerService;
    import org.apache.nifi.controller.ConfigurationContext;
    import org.apache.nifi.processor.util.StandardValidators;
    import org.apache.nifi.reporting.InitializationException;
    
    public class CanonicalConnection extends AbstractControllerService implements IConnectionService
    {
      private String  protocol;         @Override public String  getProtocol()      { return protocol; }
      private String  hostname;         @Override public String  getHostname()      { return hostname; }
      private String  port;             @Override public String  getPort()          { return port; }
      private int     timeout;          @Override public int     getTimeout()       { return timeout; }
      private Boolean disableSsl;       @Override public boolean getDisableSsl()    { return disableSsl; }
      private String  useDefaultAccept; @Override public String  getDefaultAccept() { return useDefaultAccept; }
      private URL     serverUrl;        @Override public URL     getServerUrl()     { return serverUrl; }
      private String  username;         @Override public String  getUsername()      { return username; }
      private String  token;            @Override public String  getToken()         { return token; }
    
      @OnEnabled
      public void onConfigured( final ConfigurationContext context ) throws InitializationException
      {
        protocol         = context.getProperty( PROTOCOL_PROPERTY ).getValue().toLowerCase();
        hostname         = context.getProperty( HOSTNAME_PROPERTY ).getValue();
        port             = context.getProperty( HOSTPORT_PROPERTY ).getValue();
        disableSsl       = context.getProperty( DISABLE_SSL_PROPERTY ).asBoolean();
        useDefaultAccept = context.getProperty( DEFAULT_ACCEPT ).getValue();
        username         = context.getProperty( USERNAME_PROPERTY ).getValue();
        token            = context.getProperty( TOKEN_PROPERTY ).getValue();
    
        try
        {
          String timeoutString  = context.getProperty( CONNECTION_TIMEOUT ).getValue();
          int    timeoutSeconds = Integer.parseInt( timeoutString );
    
          if( timeoutSeconds < 0 )
            throw new NumberFormatException( "Number is negative" );
    
          timeout = timeoutSeconds * 1000;
        }
        catch( NumberFormatException e )
        {
          throw new InitializationException( "Time-out value must be 0 or a positive number of seconds" );
        }
    
        StringBuilder url = new StringBuilder( protocol );
    
        url.append( "://" ).append( hostname );
    
        if( !StringUtilities.isEmpty( port ) )
          url.append( ':' ).append( port );
    
        try
        {
          serverUrl = new URL( url.toString() );
        }
        catch( MalformedURLException e )
        {
          throw new InitializationException(
                        String.format( "Host and port configuration created invalid URL '%s'", url ), e );
        }
      }
    
      //<editor-fold desc="Individual property definitions">
      //</editor-fold>
    
      private static final List< PropertyDescriptor > serviceProperties;
    
      static
      {
        final List< PropertyDescriptor > properties = new ArrayList<>();
        properties.add( PROTOCOL_PROPERTY );
        properties.add( HOSTNAME_PROPERTY );
        properties.add( HOSTPORT_PROPERTY );
        properties.add( DISABLE_SSL_PROPERTY );
        properties.add( CONNECTION_TIMEOUT );
        properties.add( DEFAULT_ACCEPT );
        properties.add( USERNAME_PROPERTY );
        properties.add( TOKEN_PROPERTY );
        serviceProperties = Collections.unmodifiableList( properties );
      }
    
      @Override
      final protected List< PropertyDescriptor > getSupportedPropertyDescriptors() { return serviceProperties; }
    }
    
  3. In the processor that consumes the controller service, you will use both the interface and the controller implementation.

    First, the processor's configuration property that consumes the controller indicates the controller by its interface name. (I always do this at the end because once defined, I want them out of the way).

    ExtractHl7v4.java:
      public static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder()
          .name( CONNECTION_SERVICE_NAME )
          .displayName( CONNECTION_SERVICE_DNAME )
          .description( "Controller service that connects to Middleware." )
          .required( true )
          .identifiesControllerService( IConnectionService.class )
          .build();
      .
      .
      .
      private List< PropertyDescriptor > properties;
      .
      .
      .
      @Override
      public void init( final ProcessorInitializationContext context )
      {
        List< PropertyDescriptor > properties = new ArrayList<>();
        properties.add( CONNECTION_SERVICE );
        .
        .
        .
        this.properties = Collections.unmodifiableList( properties );
        .
        .
        .
      }
    
  4. Near the top of onTrigger(), you'll likely fetch what's been configured by your user. Mostly, using your controller, it's a bunch of more configuration for your processor.
    ExtractHl7v4.java:
    package com.windofkeltia.controller;
    
    import com.windofkeltia.controller.interfaces.IConnectionService;
    
    public class ExtractHl7v4 extends AbstractProcessor
    {
      @Override
      public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
      {
        final CanonicalConnection connectionService = context.getProperty( CONNECTION_SERVICE )
                                                .asControllerService( IConnectionService.class );
        String hostname = connectionService.getHostname();
        ...etc.
      }
      .
      .
      .
    
  5. When building the JAR (that goes into the NAR) containing your controller service, just as for a custom NiFi processor, the controller service implementation, a package path, must be put into a special resource file, src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService. This is not the package path of the interface.

    Failure to do this will result in a controller server that cannot be found from the NiFi UI.

Writing (JUnit) test code consuming a controller...

Something like this. It should get you close. I don't have the time to vet this fully as an example. The code I quickly scraped and modified to give this example worked. The important bits not to forget are highlighted. Maybe I'll get back here to do a proper job.

Obvious, the four lines of properties.put() below must match what your controller need as configuation.

package com.windofkeltia.controller;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.windofkeltia.controller.CanonicalConnection;
import com.windofkeltia.controller.IConnectionService;

public class ExtractHl7v4Test
{
  @After  public void tearDown() { }
  @Before public void setUp()
  {
    connectionService = new CanonicalConnection();
    runner = TestRunners.newTestRunner( processor = new ExtractHl7v4() );
    runner.setValidateExpressionUsage( false );
  }

  private static final int    ONE          = 1;
  private static final String EXTRACT_FROM = "<Bundle xmlns="http://hl7.org/fhir">\n";
                                 + "  <type value="transaction" />\n"
                                 + "  <entry>\n"
                                 + "    <fullUrl value="urn:uuid:38826a3f-8e5c-4846-95c3-7f12a233447d" />\n"
                                 + "    <resource>\n"
                                 + "      <Patient xmlns="http://hl7.org/fhir">\n"
                                 + "        ...\n"
                                 + "      </Patient>\n"
                                 + "    </resource>\n"
                                 + "  </entry>\n"
                                 + "</Bundle>\n";

  private ExtractHl7v4       processor;
  private IConnectionService connectionService;
  private TestRunner         runner;

  @Test
  public void test() throws InitializationException
  {
    Map< String, String > properties = new HashMap<>();
    properties.put( CanonicalImatConnection.PROTOCOL_PROPERTY.getName(), "https" );
    properties.put( CanonicalImatConnection.HOSTNAME_PROPERTY.getName(), "10.10.11.192" );
    properties.put( CanonicalImatConnection.USERNAME_PROPERTY.getName(), "rbateman" );
    properties.put( CanonicalImatConnection.TOKEN_PROPERTY   .getName(), "WPES9E49...VPMAZ" );

    runner.addControllerService( "connectionService", connectionService, properties );
    runner.enableControllerService( connectionService );
    runner.setProperty( EmitSearchServer.CONNECTION_SERVICE, "connectionService" );

    runner.enqueue( new ByteArrayInputStream( EXTRACT_FROM.getBytes() ) );
    runner.run( ONE );
    runner.assertQueueEmpty();

    List< MockFlowFile > successes = runner.getFlowFilesForRelationship( ExtractHl7v4.SUCCESS );
    MockFlowFile         flowfile  = successes.get( 0 );
    ...
  }
}

Here's a little simpler version to follow:

package com.windofkeltia.controller;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.windofkeltia.controller.CanonicalConnection;
import com.windofkeltia.controller.IConnectionService;

public class ExtractHl7v4Test
{
  private static final int    ONE          = 1;
  private static final String EXTRACT_FROM = "<Bundle xmlns="http://hl7.org/fhir">\n";
                                 + "  <type value="transaction" />\n"
                                 + "  <entry>\n"
                                 + "        ...\n"
                                 + "  </entry>\n"
                                 + "</Bundle>\n";

  @Test
  public void test() throws InitializationException
  {
    TestRunner runner = TestRunners.newTestRunner( new ExtractHl7v4() );
    runner.setValidateExpressionUsage( false );

    IConnectionSerice connectionService = new CanonicalConnection();
    Map< String, String > properties = new HashMap<>();
    properties.put( CanonicalConnection.PROTOCOL_PROPERTY.getName(), "https" );
    properties.put( CanonicalConnection.HOSTNAME_PROPERTY.getName(), "10.10.11.192" );
    properties.put( CanonicalConnection.USERNAME_PROPERTY.getName(), "rbateman" );
    properties.put( CanonicalConnection.TOKEN_PROPERTY   .getName(), "WPES9E49...VPMAZ" );

    runner.addControllerService( "connectionService", connectionService, properties );
    runner.enableControllerService( connectionService );
    runner.setProperty( ExtractHl7v4.CONNECTION_SERVICE, "connectionService" );

    runner.enqueue( new ByteArrayInputStream( EXTRACT_FROM.getBytes() ) );
    runner.run( ONE );
    runner.assertQueueEmpty();

    List< MockFlowFile > successes = runner.getFlowFilesForRelationship( ExtractHl7v4.SUCCESS );
    MockFlowFile         flowfile  = successes.get( 0 );
    ...
  }
}