NiFi notes

Russell Bateman
January 2016
last update:



(See A simple NiFi processor project for minimal source code and pom.xml files.)

NiFi Javadocs

NiFi
NiFi test framework

Preface

The notes are in chronological order, as I needed and made them. I haven't yet attempted to organize them more usefully. The start happens below under Friday, 15 January 2016.

The principal NiFi developer documentation is cosmetically beautiful; however, it suffers from being written very much by someone who already thoroughly understands his topic, without the sensitivity that his audience does not. Therefore, it's a great place to go to as a reference, but rarely as a method. One can easily become acquainted with the elements of what must be done, but rarely if ever know how to write the code to access those elements, let alone the best way to do it. For method documentation, a concept in language learning, which often only appears for technology such as NiFi long after the fact in our industry, you'll need to look for the "how-to" stuff in the bulleted list below, as well as in my notes here.


NiFi 1.x UI overview and other videos


My goal in writing this page is a) to record what I've learned for later reference and b) to leave breadcrumbs for others like me.

History (and testimonial)

In my company, we were doing extract, transform and load (ETL) by way of a hugely complicated and proprietary monolith that was challenging to understand. I had proposed beginning to abandon it in favor of using a queue-messaging system, something I was very familiar with. A few months later, a director discovered NiFi and put my colleague up to looking into it. Early on, I glanced at it, but was swamped on a project and did not pursue it until after my colleague had basically adopted and promoted it as a replacement for our old ETL process. It only took us a few weeks to a) create a shim by which our existing ETL component code became fully exploitable and b) spin up a number of NiFi-native components including ports of some of the more important, legacy ones. For me, the road has been a gratifying one: I no longer need to grok the old monolith and can immediately contribute to the growing number of processors to do very interesting things. It leveled my own playing field as a newer member of staff.

Unit-testing cribs

Stuff I don't have to keep in the pile that's my brain because I just look here (rather than deep down inside these chronologically recorded notes):

  @Test
  public void test() throws Exception
  {
    TestRunner runner = TestRunners.newTestRunner( new ExcerptBase64FromHl7() );

    final String HL7 =
         "MSH|^~\\&|Direct Email|etHIN|||201609061628||ORU^R01^ORU_R01|1A160906202824456947||2.5\n"
        + "PID|||000752356634||TEST^MINNIE^||19690811|F|||1 STATE ST^^ANYWHERE^TN^21112^||\n"
        + "PV1||O\n"
        + "OBR||62281e18-b851-4218-9ed5-bbf392d52f84||AD^Advance Directive\n"
        + "OBX|1|ED|AD^Advance Directive^2.16.840.1.113883.3.1256||^multipart^^Base64^"
        + "F_ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // mock up some attributes...
    Map< String, String > attributes = new HashMap<>();
    attributes.put( "attribute-name", "attribute-value" );
    attributes.put( "another-name",   "another-value" );

    // mock up some properties...
    runner.setProperty( ExcerptBase64FromHl7.A_STATIC_PROPERTY, "static-property-value" );
    runner.setProperty( "dynamic property", "some value" );

    // call the processor...
    runner.setValidateExpressionUsage( false );
    runner.enqueue( new ByteArrayInputStream( HL7.getBytes() ), attributes );
    runner.run( 1 );
    runner.assertQueueEmpty();

    // gather the results...
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( ExcerptBase64FromHl7.SUCCESS );
    assertTrue( "1 match", results.size() == 1 );
    MockFlowFile result = results.get( 0 );

    String actual = new String( runner.getContentAsByteArray( result ) );

    assertTrue( "Missing binary content", actual.contains( "F_ABCDEFGHIJKLMNOPQRSTUVWXYZ" ) );

    // ensure the processor transmitted the attribute...
    String attribute = result.getAttribute( "attribute-name" );
    assertTrue( "Failed to transmit attribute", attribute.equals( "attribute-value" ) );
  }

Wednesday, 6 January 2016

Here is a collection of some NiFi notes...

Friday, 15 January 2016

  1. Download nifi-0.4.1-bin.tar.gz from Apache NiFi Downloads and explode locally. Sometimes the tarball doesn't work; in this case, use nifi-0.4.1-bin.zip instead. (The version changes every few months; adjust accordingly. Much of what you see Googling for help using NiFi will for some time continue to be for 0.x until 1.x examples begin to overwhelm. I personally just use the latest except that in production for my company, work must lag a bit behind.)

  2. (Don't download the source and build it unless you need/want to...)

  3. Presuming you don't have anything else running on port 8080, there are no settings to modify. If you do not wish to use this port to run NiFi, modify nifi.web.http.port=8080 in ${NIFI_ROOT}/conf/nifi.properties accordingly.

  4. Start NiFi: ${NIFI_ROOT}/bin/nifi.sh start.

  5. Launch http://localhost:8080/nifi/ in a browser. You'll see the NiFi canvas.

  6. If there is trouble, check out ${NIFI_ROOT}/logs/nifi-bootstrap.log or, less likely, ${NIFI_ROOT}/logs/nifi-app.log.

  7. At this point, check out a tutorial on creating a simple flow. Videos might be a good place. There's also a whole selection of starter videos for NiFi 0.x.

Update, February 2017: what's the real delta between 0.x and 1.x? Not too much until you get down into gritty detail, something you won't do when just acquainting yourself.

Now I'm looking at:

  1. Developing a Custom Apache Nifi Processor (JSON).

Monday, 18 January 2016

Unsuccessful last Friday, I took another approach to working on the NiFi example from IntelliJ IDEA. By no means do I think it must be done this way; however, Eclipse was handy for ensuring stuff got set up correctly. The article assumes much about setting up a project.

  1. Copy pom.xml to new subdirectory.
  2. mvn eclipse:eclipse
  3. Launch Eclipse (Mars) and create subdirectories:
    1. Create src/main/resources/META-INF/services.
    2. Create file org.apache.nifi.processor.Processor.
    3. Create subdirectory src/main/java.
    4. Create subdirectory test.
    5. Right-click java subdirectory, choose Build Path → Use as Source Folder
      (do this also for test subdirectory).
    6. Create package rocks.nifi.examples.processors.

The pom.xml. There are several ultra-critical relationships to beware of.

  1. The org.apache.nifi.processor.Processor file contents must match Java package path and processor Java classname.
  2. groupId must match Java package path.
  3. artifactId gives project its name and must match.
  4. This is handled automatically if you follow the steps above.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>rocks.nifi</groupId>
  <artifactId>examples</artifactId>   <!-- change this to change project name -->
  <version>1.0-SNAPSHOT</version>
  <packaging>nar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <nifi.version>0.4.1</nifi.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-api</artifactId>
      <version>${nifi.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-utils</artifactId>
      <version>${nifi.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-processor-utils</artifactId>
      <version>${nifi.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>com.jayway.jsonpath</groupId>
      <artifactId>json-path</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-mock</artifactId>
      <version>${nifi.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-maven-plugin</artifactId>
        <version>1.0.0-incubating</version>
        <extensions>true</extensions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.15</version>
      </plugin>
    </plugins>
  </build>
</project>
<!-- vim: set tabstop=2 shiftwidth=2 expandtab: -->

Of course, once you've also created JsonProcessor.java in the package created above, you just do as the article says. You should end up with:

~/dev/nifi/projects/json-processor $ !tree
tree
.
+-- pom.xml
`-- src
    +-- main
    |   +-- java
    |   |   `-- rocks
    |   |       `-- nifi
    |   |           `-- examples
    |   |               `-- processors
    |   |                   `-- JsonProcessor.java
    |   `-- resources
    |       `-- META-INF
    |           `-- services
    |               `-- org.apache.nifi.processor.Processor
    `-- test
        `-- java
            `-- rocks
                `-- nifi
                    `-- examples
                        `-- processors
                            `-- JsonProcessorTest.java

16 directories, 4 files

The essentials are:

  1. Create the exact, Sun Java/Maven-ish structure shown because the NiFi NAR packager cannot endure any other structure. It's brittle.
  2. Build using mvn clean install from the command line (not the IDE).

The IntelliJ IDEA project can be opened as usual in place of Eclipse. Here's success:

See NiFi Developer's Guide: Processor API for details on developing a processor, what I've just done. Importantly, this describes

  1. the file named, org.apache.nifi.processor.Processor that contains the fully qualified class name of the processor,
  2. extending org.apache.nifi.processor.AbstractProcessor to create the new processor,
  3. the highly concurrent nature of the NiFi framework and how the processor must be written to be thread-safe.
  4. the existence of several, major supporting classes, to wit:
    • FlowFile —the data or file being treated
    • ProcessSession —the session during treatment
    • ProcessContext —processor-framework bridge
    • PropertyDescriptor —defines a property
    • Validator —for user-entered values for properties
    • ValidationContext
    • PropertyValue —value of a given property
    • Relationship —route by which a FlowFile leaves a processor to go elsewhere
    • ProcessorInitializationContext
    • ProcessorLog

Here's an example from NiFi Rocks:

package rocks.nifi.examples.processors;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import com.jayway.jsonpath.JsonPath;
import org.apache.commons.io.IOUtils;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Our NiFi JSON processor from
 * http://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
 */
@SideEffectFree
@Tags( { "JSON", "NIFI ROCKS" } )                       // for finding processor in the GUI
@CapabilityDescription( "Fetch value from JSON path." ) // also displayed in the processor selection box
public class JsonProcessor extends AbstractProcessor
{
  private List< PropertyDescriptor > properties;        // one of these for each property of the processor
  private Set< Relationship >        relationships;     // one of these for each possible arc away from the processor

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >        getRelationships()                { return relationships; }

  // our lone property...
  public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder()
        .name( "Json Path" )
        .required( true )
        .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
        .build();

  // our lone relationship...
  public static final Relationship SUCCESS = new Relationship.Builder()
        .name( "SUCCESS" )
        .description( "Success relationship" )
        .build();

  /**
   * Called at the start of Apache NiFi. NiFi is a highly multi-threaded environment; be
   * careful what is done in this space. This is why both the list of properties and the
   * set of relationships are set with unmodifiable collections.
   * @param context supplies the processor with a) a ProcessorLog, b) a unique
   *                identifier and c) a ControllerServiceLookup to interact with
   *                configured ControllerServices.
   */
  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( JSON_PATH );
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  /**
   * Called whenever a flowfile is passed to the processor. The AtomicReference
   * is required because, in order to be accessible in the call-back scope, the
   * variable must be final. If it were just a final String, it could not change.
   */
  @Override
  public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    final ProcessorLog              log   = this.getLogger();
    final AtomicReference< String > value = new AtomicReference<>();

    FlowFile flowfile = session.get();

    // ------------------------------------------------------------------------
    session.read( flowfile, new InputStreamCallback()
    {
      /**
       * Uses Apache Commons to read the input stream out to a string. Uses
       * JsonPath to attempt to read the JSON and set a value to pass on.
       * It would normally be best practice, in the case of an exception, to
       * route the original flowfile to an error relationship.
       */
      @Override
      public void process( InputStream in ) throws IOException
      {
        try
        {
          String json   = IOUtils.toString( in );
          String result = JsonPath.read( json, "$.hello" );
          value.set( result );
        }
        catch( Exception e )
        {
          e.printStackTrace();
          log.error( "Failed to read json string." );
        }
      }
    } );
    // ------------------------------------------------------------------------

    String results = value.get();

    if( results != null && !results.isEmpty() )
      flowfile = session.putAttribute( flowfile, "match", results );

    // ------------------------------------------------------------------------
    flowfile = session.write( flowfile, new OutputStreamCallback()
    {
      /**
       * For both reading and writing to the same flowfile: with both the
       * output-stream call-back and the input-stream call-back, remember to
       * assign the result back to a flowfile variable. This write is not
       * really needed by the code, but the example is deliberate: it shows a
       * way of moving data out of call-backs and back in.
       */
      @Override
      public void process( OutputStream out ) throws IOException
      {
        out.write( value.get().getBytes() );
      }
    } );
    // ------------------------------------------------------------------------

    /* Every flowfile that is generated needs to be deleted or transferred.
     */
    session.transfer( flowfile, SUCCESS );
  }

  /**
   * Called every time a property is modified in case we want to react, e.g.,
   * stop and restart work in consequence.
   * @param descriptor indicates which property was changed.
   * @param oldValue value unvalidated.
   * @param newValue value unvalidated.
   */
  @Override
  public void onPropertyModified( PropertyDescriptor descriptor, String oldValue, String newValue ) { }
}

Tuesday, 19 January 2016

Lifecycle of a processor...

Before anything else, the processor's init() method is called. It looks like this happens at NiFi start-up rather than later when the processor is actually used.

The processor must expose to the NiFi framework all of the Relationships it supports. The relationships are the arcs in the illustration below.

Performing the work. This happens when onTrigger() is called (by the framework).

The onTrigger() method is passed the context and the session, and it must:

  1. get the flowfile from the session,
  2. perform a read of the flowfile stream,
  3. obtain the results of reading,
  4. put the attribute "match," on the session,
  5. do work (whatever that is/however that is done),
  6. perform a write of the modified flowfile stream,
  7. note the success or failure of writing,
  8. remove the flowfile, or
  9. hand off the flowfile to the next processor.

A processor won't in fact be triggered, even if it has one or more FlowFiles in its queue, if the framework, checking the downstream destinations (i.e., the connections to the next processors), finds that these are full. There is an annotation, @TriggerWhenAnyDestinationAvailable, which when present will allow the FlowFile to be processed as long as it can advance to at least one next processor.

@TriggerSerially makes the processor run single-threaded. However, the processor implementation must still be thread-safe, as the thread of execution may change between invocations. Both annotations are sketched below.
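
Here's a minimal sketch of a processor bearing both annotations; the class name and the empty body are mine, purely illustrative:

import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

@TriggerSerially                     // never more than one thread in onTrigger() at a time
@TriggerWhenAnyDestinationAvailable  // trigger as long as at least one destination has room
public class SchedulingAnnotationsExample extends AbstractProcessor
{
  @Override
  public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    // (work elided; the annotations above are the point of this sketch)
  }
}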

Properties

Properties are exposed to the web interface via a public method, but the properties themselves are locked using Collections.unmodifiableList() (if coded that way). If a property is changed while the processor is running, the latter can react eagerly if it implements onPropertyModified(); a sketch of such a reaction follows.
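
A minimal sketch of reacting eagerly, reusing the JSON_PATH descriptor from the example above; the cached field is hypothetical:

  private volatile String cachedJsonPath;   // hypothetical value cached from the property

  @Override
  public void onPropertyModified( PropertyDescriptor descriptor, String oldValue, String newValue )
  {
    // invalidate the cache so that the next onTrigger() picks up the new value
    if( descriptor.equals( JSON_PATH ) )
      cachedJsonPath = null;
  }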

There is no sample code for validating processor properties. Simply put, the customValidate() method is called with a validation context, and any ValidationResults that describe problems are returned. (But, is this implemented?)

  protected Collection< ValidationResult > customValidate( ValidationContext validationContext )
  {
    return Collections.emptySet();
  }
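
Were one to flesh it out, a sketch might look like this; the two descriptors, FILTER_NAME and FILTER_REPOSITORY, are hypothetical stand-ins:

  @Override
  protected Collection< ValidationResult > customValidate( ValidationContext validationContext )
  {
    List< ValidationResult > problems = new ArrayList<>();

    // hypothetical cross-property check: a filter name is useless without a repository...
    String name       = validationContext.getProperty( FILTER_NAME ).getValue();
    String repository = validationContext.getProperty( FILTER_REPOSITORY ).getValue();

    if( name != null && repository == null )
      problems.add( new ValidationResult.Builder()
                        .subject( "Filter Repository" )
                        .valid( false )
                        .explanation( "a filter name requires a filter repository" )
                        .build() );

    return problems;
  }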

It's possible to augment the initialization and enabling lifecycles through Java annotation. Some of these apply only to processor components, some to controller components, some to reporting tasks, and some to more than one; a sketch follows.
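
For example, @OnScheduled and @OnStopped, from org.apache.nifi.annotation.lifecycle, bracket a processor's scheduled life:

  @OnScheduled
  public void setUp( final ProcessContext context )
  {
    // called each time the processor is scheduled to run; a good place to
    // read property values once instead of once per flowfile
  }

  @OnStopped
  public void tearDown()
  {
    // called once the processor is no longer scheduled to run
  }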

Wednesday, 20 January 2016

There's a way to document various things that reach the user level like properties and relationships. This is done through annotation, see Documenting a Component and see the JSON processor sample code at the top of the class.

"Advanced documentation," what appears when a user right-clicks a processor in the web UI, provides a Usage menu item. What goes in there is done in additionalDetails.html. This file must be dropped into the root of the processor's JAR (NAR) file in subdirectory, docs. See Advanced Documentation.

In practice, this might be a little confusing. Here's an illustration. If I were going to endow my sample JSON processor with such a document, it would live here (see additionalDetails.html below):

~/dev/nifi/projects/json-processor $ !tree
tree
.
+-- pom.xml
`-- src
    +-- main
    |   +-- java
    |   |   `-- rocks
    |   |       `-- nifi
    |   |           `-- examples
    |   |               `-- processors
    |   |                   `-- JsonProcessor.java
    |   `-- resources
    |       +-- META-INF
    |       |   `-- services
    |       |       `-- org.apache.nifi.processor.Processor
    |       `-- docs
    |           `-- rocks.nifi.examples.processors.JsonProcessor
    |               `-- additionalDetails.html
    `-- test
        `-- java
            `-- rocks
                `-- nifi
                    `-- examples
                        `-- processors
                            `-- JsonProcessorTest.java

18 directories, 5 files

When one gets Usage from the context menu, an index.html is generated that "hosts" additionalDetails.html, but covers standard, default NiFi processor topics including auto-generated coverage of the current processor (based on what NiFi knows about properties and the like). additionalDetails.html is only necessary if that index.html documentation is inadequate.

Friday, 22 January 2016

Gathering resources (see dailies for today).

package com.etretatlogiciels.nifi.python.filter;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 *
 */
@Tags( { "example", "resources" } )
@CapabilityDescription( "This example processor loads a resource from the nar and writes it to the FlowFile content" )
public class WriteResourceToStream extends AbstractProcessor
{
  public static final Relationship REL_SUCCESS = new Relationship.Builder().name( "success" )
                                                    .description( "files that were successfully processed" )
                                                    .build();
  public static final Relationship REL_FAILURE = new Relationship.Builder().name( "failure" )
                                                    .description( "files that were not successfully processed" )
                                                    .build();

  private Set< Relationship > relationships;
  private String resourceData;

  @Override protected void init( final ProcessorInitializationContext context )
  {

    final Set< Relationship > relationships = new HashSet<>();
    relationships.add( REL_SUCCESS );
    relationships.add( REL_FAILURE );
    this.relationships = Collections.unmodifiableSet( relationships );

    final InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream( "file.txt" );

    try
    {
      this.resourceData = IOUtils.toString( resourceStream );
    }
    catch( IOException e )
    {
      throw new RuntimeException( "Unable to load resources", e );
    }
    finally
    {
      IOUtils.closeQuietly( resourceStream );
    }
  }

  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowFile = session.get();

    if( flowFile == null )
      return;

    try
    {
      flowFile = session.write( flowFile, new OutputStreamCallback()
      {

        @Override public void process( OutputStream out ) throws IOException
        {
          IOUtils.write( resourceData, out );
        }
      } );
      session.transfer( flowFile, REL_SUCCESS );
    }
    catch( ProcessException ex )
    {
      getLogger().error( "Unable to process", ex );
      session.transfer( flowFile, REL_FAILURE );
    }
  }
}

Monday, 25 January 2016

I need to figure out how to get property values. The samples I've seen so far pretty well avoid that. So, I went looking for it this morning and found it in the implementation of GetFile, a processor that comes with NiFi. Let me fill in the context here and highlight the actual statement below:

@TriggerWhenEmpty
@Tags( { "local", "files", "filesystem", "ingest", "ingress", "get", "source", "input" } )
@CapabilityDescription( "Creates FlowFiles from files in a directory. "
                      + "NiFi will ignore files it doesn't have at least read permissions for." )
public class GetFile extends AbstractProcessor
{
  public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
                       .name( "input directory" )        // internal name (used if displayName missing)*
                       .displayName( "Input Directory" ) // public name seen in UI
                       .description( "The input directory from which to pull files" )
                       .required( true )
                       .addValidator( StandardValidators.createDirectoryExistsValidator( true, false ) )
                       .expressionLanguageSupported( true )
                       .build();
  .
  .
  .
  @Override
  protected void init( final ProcessorInitializationContext context )
  {
    final List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( DIRECTORY );
    .
    .
    .
  }

  .
  .
  .

  @OnScheduled
  public void onScheduled( final ProcessContext context )
  {
    fileFilterRef.set( createFileFilter( context ) );
    fileQueue.clear();
  }

  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    final File directory = new File( context.getProperty( DIRECTORY ).evaluateAttributeExpressions().getValue() );
    final boolean keepingSourceFile = context.getProperty( KEEP_SOURCE_FILE ).asBoolean();
    final ProcessorLog logger = getLogger();
    .
    .
    .
  }
}

* See PropertyDescriptor name and displayName

Tuesday, 26 January 2016

Today I was experiencing the difficulty of writing tests for a NiFi processor, only to learn that NiFi provides help for this. I found the test for the original JSON processor example.

package rocks.nifi.examples.processors;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import org.apache.commons.io.IOUtils;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.MockFlowFile;

public class JsonProcessorTest
{
  @Test
  public void testOnTrigger() throws IOException
  {
    // Content to mock a JSON file
    InputStream content = new ByteArrayInputStream( "{\"hello\":\"nifi rocks\"}".getBytes() );

    // Generate a test runner to mock a processor in a flow
    TestRunner runner = TestRunners.newTestRunner( new JsonProcessor() );

    // Add properties
    runner.setProperty( JsonProcessor.JSON_PATH, "$.hello" );

    // Add the content to the runner
    runner.enqueue( content );

    // Run the enqueued content, it also takes an int = number of contents queued
    runner.run( 1 );

    // All results were processed without failure
    runner.assertQueueEmpty();

    // If you need to read or do additional tests on results you can access the content
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( JsonProcessor.SUCCESS );
    assertTrue( "1 match", results.size() == 1 );
    MockFlowFile result = results.get( 0 );
    String resultValue = new String( runner.getContentAsByteArray( result ) );
    System.out.println( "Match: " + IOUtils.toString( runner.getContentAsByteArray( result ) ) );

    // Test attributes and content
    result.assertAttributeEquals( JsonProcessor.MATCH_ATTR, "nifi rocks" );
    result.assertContentEquals( "nifi rocks" );
  }
}

What does it do? NiFi TestRunner mocks out the NiFi framework and creates all the underpinnings I was trying today to create. This makes it so that calls inside onTrigger() don't summarily crash. I stepped through the test (above) to watch it work. All the references to the TestRunner object are ugly and only to set up what the framework would create for the processor, but it does work.

Had to work through these problems...

java.lang.AssertionError: Processor has 2 validation failures:
'basename' validated against 'CCD_Sample' is invalid because 'basename' is not a supported property
'doctype' validated against 'XML' is invalid because 'doctype' is not a supported property

—these properties don't even exist.

Caused by: java.lang.IllegalStateException: Attempting to Evaluate Expressions but PropertyDescriptor[Path to Python Interpreter]
indicates that the Expression Language is not supported. If you realize that this is the case and do not want this error to occur,
it can be disabled by calling TestRunner.setValidateExpressionUsage(false)

—so I disabled it.

Then, I reached the session.read() bit which appeared to execute, but processing never returned. So, I killed it, set a breakpoint on the session.write(), and relaunched. It got there. I set a breakpoint on the while( ( line = reply.readLine() ) != null ) and it got there. I took a single step more and, just as before, processing never returned.

Then I break-pointed out.write() and the first call to session.putAttribute() and relaunched. Processing never hit those points.

Will continue this tomorrow.

Wednesday, 27 January 2016

I want to solve my stream problems in python-filter. Note that the source code to GetFile is not a good example of what I'm trying to do because I'm getting the data to work on from (the session.read()) stream, passing it to the Python filter, then reading it back and spewing it down (the session.write()) output stream. For the (session.read()) input stream, ...

...maybe switch from readLine() to byte-by-byte processing? But all the examples in all kinds of situations (HTTP, socket, file, etc.) are using readLine().

The old Murphism, that a defective appliance demonstrated to a competent technician will fail to fail, applies here. My code was actually working. What was making me think it wasn't? Uh, well, for one thing, all the flowfile = session.putAttribute( flowfile, ... ) calls were broken for the same reason (higher up, yesterday) that the other properties stuff wasn't working.

Where am I?

I had to append "quit" to the end of my test data because that's the only way to tell the Python script that I'm done sending data.

So, I asked Nathan, who determined using a test:

import sys
while True:
    input = sys.stdin.readline().strip()
    if input == '':
        break
    upperCase = input.upper()
    print( upperCase )
master ~/dev/python-filter $ echo "Blah!" | python poop.py
BLAH!
master ~/dev/python-filter $

...that checking the return from sys.stdin.readline() for being zero-length tells us to drop out of the read loop. But, this doesn't work in my code! Why?

I reasoned that in my command-line example, the pipe is getting closed. That's not happening in my NiFi code. So, I inserted a statement to close the pipe (see the highlighted line below) as soon as I finished writing what the input flowfile was offering:

session.read( flowfile, new InputStreamCallback()
{
  /**
   * There is probably a much speedier way than to read the flowfile line
   * by line. Check it out.
   */
  @Override
  public void process( InputStream in ) throws IOException
  {
    BufferedReader input = new BufferedReader( new InputStreamReader( in ) );
    String         line;

    while( ( line = input.readLine() ) != null )
    {
      output.write( line + "\n" );
      output.flush();
    }

    output.close();
  }
} );

NiFi validators list

StandardValidators                         Description
ATTRIBUTE_KEY_VALIDATOR                    ?
ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR      ?
INTEGER_VALIDATOR                          is an integer value
POSITIVE_INTEGER_VALIDATOR                 is an integer value above 0
NON_NEGATIVE_INTEGER_VALIDATOR             is an integer value above -1
LONG_VALIDATOR                             is a long integer value
POSITIVE_LONG_VALIDATOR                    is a long integer value above 0
PORT_VALIDATOR                             is a valid port number
NON_EMPTY_VALIDATOR                        is not an empty string
BOOLEAN_VALIDATOR                          is "true" or "false"
CHARACTER_SET_VALIDATOR                    contains only valid characters supported by the JVM
URL_VALIDATOR                              is a well-formed URL
URI_VALIDATOR                              is a well-formed URI
REGULAR_EXPRESSION_VALIDATOR               is a regular expression
ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR    is a well-formed key-value pair
TIME_PERIOD_VALIDATOR                      is of format <duration> <time unit>, where duration is a
                                           non-negative integer and time unit is a supported type,
                                           one of { nanos, millis, secs, mins, hrs, days }
DATA_SIZE_VALIDATOR                        is a non-nil data size
FILE_EXISTS_VALIDATOR                      is a path to a file that exists

I'm still struggling to run tests after adding in all the properties I wish to support. I get:

'Filter Name' validated against 'filter.py' is invalid because 'Filter Name' is not a supported property

...and I don't understand why. Clearly, there's no testing of NiFi with corresponding failure cases. No one gives an explanation of this error, how to fix it, or what's involved in creating a proper, supported property. It's trial and error. I lucked out with PYTHON_INTERPRETER and FILTER_REPOSITORY.

However, I found that they all began to pass as soon as I constructed them with validators. I had left validation off because a) as inferred above I wonder about validation and b) at least the last four are optional.

Now, the code is running again. However,

final String doctype  = flowfile.getAttribute( DOCTYPE );
final String pathname = flowfile.getAttribute( PATHNAME );
final String filename = flowfile.getAttribute( FILENAME );
final String basename = flowfile.getAttribute( BASENAME );

...gives me only FILENAME, because the NiFi TestRunner happens to set that one up. How does it know? The others return null, yet I've tried to set them. What's wrong is the correspondence between runner.setProperty(), the property, and flowfile.getAttributes().

Let's isolate the properties handling (only) from the NiFi rocks example.


// JsonProcessor.java:
private List< PropertyDescriptor > properties;

public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder()
        .name( "Json Path" )
        .required( true )
        .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
        .build();

List< PropertyDescriptor > properties = new ArrayList<>();
properties.add( JSON_PATH );
this.properties = Collections.unmodifiableList( properties );

// JsonProcessorTest.java:
runner.setProperty( JsonProcessor.JSON_PATH, "$.hello" );

Hmmmm... I think there is a complete separation between flowfile attributes and processor properties. I've reworked all of that, and my code (still/now) works though there are unanswered questions.
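
To keep the separation straight, here is the distinction in miniature (the descriptor and attribute names are hypothetical):

// Processor properties are configured per processor in the UI (runner.setProperty()
// in tests) and read from the ProcessContext:
String filterName = context.getProperty( FILTER_NAME ).getValue();

// Flowfile attributes travel with each individual flowfile (the attributes map
// passed to runner.enqueue() in tests) and are read from the FlowFile itself:
String basename = flowfile.getAttribute( "basename" );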

Thursday, 11 February 2016

NiFi logging example...

final ProcessorLog logger = getLogger();

String key   = "someKey";
String value = "someValue";
logger.warn( "This is a warning message" );
logger.info( "This is a message with some interpolation key={}, value={}", new String[]{ key, value } );

My second NiFi processor at work.

Tuesday, 16 February 2016

Documenting practical consumption of GetFile and PutFile processors (more or less "built-ins") of NiFi. These are the differences from default properties:

GetFile: Configure Processor

  1. No auto-termination (Settings → Auto terminate relationships)
  2. 30 seconds (Scheduling → Run schedule)
    This is how many seconds between GetFile pulsing and sending the (same) file again.
  3. /tmp/nifi-source (Properties → Input Directory)
  4. test-lily-munster.csv (Properties → File Filter)

PutFile: Configure Processor

  1. Check failure and success (Settings → Auto terminate relationships)
    This is because I want to stop at PutFile in my example above—processing will not be going on to anywhere else.
  2. /tmp/nifi-output (Properties → Directory)

My own AppointmentsProcessor: Configure Processor

  1. Check FAILURE and ORIGINAL (Settings → Auto terminate relationships)
    This is because I want to stop on failure and do not wish to route original document on to the next point (PutFile).
  2. Bay Health (Properties → Provider)

The UI sequence appears to be:

  1. Perform all configuration.
  2. Any processor can be started because it will sit idle until something comes in or, if it's feeding into the pipeline and has something, it will just fill the pipeline which will await the next processor down-stream to start.

Monday, 22 February 2016

Looking to change my port number from 8080, the property is

nifi.web.http.port=8080

...and this is in NIFI_HOME/conf/nifi.properties. Also of interest may be:

nifi.web.http.host
nifi.web.https.host
nifi.web.https.port

Monday, 29 February 2016

I'm testing actual NiFi set-ups for cda-breaker and cda-filter this morning. The former went just fine, but I'm going to document the setting up of a pipeline with the latter since I see I have not completely documented such a thing before.

Nifi CdaFilterProcessor in its pipeline

I'm going to set up this:

I accomplish this by clicking, dragging and dropping three new processors to the workspace. Here they are and their configurations:

GetFile configuration

Just the defaults...

The 60 seconds keep the processor from firing over and over again (on the same file) before I can stop the pipeline: I only need it to fire once.

Get CDA_Sample1.xml out of /tmp/nifi-sources for the test.

From GetFile, a connection to CdaFilterProcessor must be created for the success relationship (I only pass this file along if it exists):

CdaFilterProcessor configuration

From CdaFilterProcessor, a connection to PutFile must be created for the original and success relationships:

PutFile configuration

Running the test...

(Of course, you must ensure that the source file, rules list and rules are in place for the test to have any chance of running.)

Once the three processors and two connections are set up, you can turn the processors on in any order, but only once two adjacent ones are on at the same time, with data in the pipeline, will there be processing. Logically, GetFile is the first to light up.

  1. Click GetFile to select, then click the green start button in the button bar above.
  2. Next, select CdaFilterProcessor and start it too. It might not start, getting "No eligible components are selected. Please select the components to be started and ensure they are no longer running." If you thought you had set everything up right, dismiss this alert and hover the mouse pointer over the yellow, warning triangle of this processor. You may see something like, "'Path to rule pathnames' is invalid. Path to rule pathnames is required." Likely, you forgot to specify this path, as shown above.
  3. If nothing is running so far, did you copy CDA_Sample1.xml to /tmp/nifi-source? If not, copy it. Note: it likely will not make a difference if you don't wait for the 60 seconds you set on GetFile. You can change this setting or just wait.
  4. Now, once you see bytes in the CdaFilterProcessor, this means you've run something.
  5. Click to select PutFile, then click the green start button.
  6. Stop GetFile (before it resubmits CDA_Sample1.xml to the filter).
  7. Check /tmp/filter-output for CDA_Sample1.cxml.

Monday, 7 March 2016

Have left off busy with other stuff. I just found out that the reason additionalDetails.html doesn't work is because one's expected—and this isn't what the JSON example I used to get started does—to produce a JAR of one's processor, then only wrap that in the NAR.

Without that, your trip to right-click the processor, then choosing Usage is unrewarding.

Wednesday, 9 March 2016

Attacking NiFi attributes today. You get them off a flow file. However, to add an attribute, putAttribute() returns a new or wrapped flow file. The attribute won't be there if you look at the old flow file. In this snippet, we just forget the old flow file and use the new one under the same variable.

FlowFile flowfile       = session.get();
String   attributeValue = flowfile.getAttribute( "attribute-name" );

flowfile = session.putAttribute( flowfile, "attribute-name", attributeValue );

Wednesday, 23 March 2016

I'm working on remote debugging of NiFi processors using IntelliJ IDEA.

Steps to debugging a NiFi processor you've written. See this article to learn how remote debugging works in IntelliJ. The example uses IntelliJ to debug an application running in Tomcat, but it's a similar thing no matter if you're using Eclipse (or IntelliJ) to debug NiFi.

  1. Edit conf/bootstrap.conf and simply uncomment the line that starts java.arg.debug. If you like, change the port number (address) to what you want to use in steps 2e or 2f below.
  2. Create a Debug configuration.
    In IntelliJ IDEA, ...
    1. Do Run → Edit Configurations.
    2. Click the green +.
    3. Choose Remote (because it's a remote debugging session you want to create).
    4. Give a Name:, something like "Local NiFi".
    5. Under Settings, change the Port: to 8000 or to whatever you established in conf/bootstrap.conf. (Note: this number has nothing to do with launching your browser with http://localhost:8080/nifi)
    6. If you're only debugging a single processor in a project with multiple modules, set the drop-down Search sources using module's classpath: to the module in which it lives.
    In Eclipse, do
    1. Do Run → Debug Configurations....
    2. Choose Remote Java Application.
    3. Click on the New Launch Configuration icon (a tiny sheet of paper with a yellow plus sign in upper left of dialog).
    4. Give it a name like "Local NiFi".
    5. In Project:, type the name (or browse for it) of the project containing your processor code.
    6. Set the Port: to 8000 or whatever you established in conf/bootstrap.conf. (Note: this number has nothing to do with launching your browser with http://localhost:8080/nifi)
    7. Click Apply or, if you're ready, Debug.
  3. Launch NiFi (or bounce it).
  4. Set one or more breakpoints in your processor code.
  5. Launch the debugger.
    In IntelliJ IDEA, do Run → Debug and choose "Local NiFi" (or whatever you called it) from the list presented. This brings up the debugger and displays, "Connected to the target VM, address 'localhost:8000', transport: 'socket'".
    In Eclipse, do Run → Debug Configurations..., scroll down and choose "Local NiFi" or whatever you called it. What you see should give you warm fuzzies, something akin to what I'm reporting seeing in IntelliJ IDEA.
  6. Prepare data flows into your processor.
  7. Start your processor; the IDE debugger should pop up stopped at the first breakpoint.
  8. Debug away.

Someone else had this experience setting up to debug on an AWS instance.

I stumbled upon something peculiar in NiFi. I had been attaching an attribute to a flowfile in the session.read() call-back. The NiFi unit-testing framework tolerated this, but when I ran my processor for real, it blew chunks (IllegalStateException). I solved the problem by saving the new attribute and attaching it outside the call-back just before calling session.transfer().

The only reason I'm pointing this out is because I somewhat rely on unit testing and hope to catch this level of error/gotcha earlier than button-pushing testing.
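
In sketch form, the shape of the fix; computeValue() stands in for whatever the call-back really figures out:

final AtomicReference< String > newValue = new AtomicReference<>();

session.read( flowfile, new InputStreamCallback()
{
  @Override
  public void process( InputStream in ) throws IOException
  {
    // compute the value here, but do not call session.putAttribute() in this scope...
    newValue.set( computeValue( in ) );
  }
} );

// ...attach the attribute out here instead, just before transferring:
flowfile = session.putAttribute( flowfile, "my-attribute", newValue.get() );
session.transfer( flowfile, SUCCESS );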

Thursday, 14 April 2016

Got "DocGenerator Unable to document: class xyz / NullPointerException" reported in nifi-app.log

This occurs (at least) when your processor's relationships field (and the return from getRelationships()) is null.

2016-04-14 14:23:26,529 WARN [main] o.apache.nifi.documentation.DocGenerator Unable to document: class com.etretatlogiciels.nifi.processor.HeaderFooterIdentifier
java.lang.NullPointerException: null
    at org.apache.nifi.documentation.html.HtmlProcessorDocumentationWriter.writeRelationships(HtmlProcessorDocumentationWriter.java:200) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.html.HtmlProcessorDocumentationWriter.writeAdditionalBodyInfo(HtmlProcessorDocumentationWriter.java:47) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.html.HtmlDocumentationWriter.writeBody(HtmlDocumentationWriter.java:129) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.html.HtmlDocumentationWriter.write(HtmlDocumentationWriter.java:67) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.DocGenerator.document(DocGenerator.java:115) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.DocGenerator.generate(DocGenerator.java:76) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.NiFi.<init>(NiFi.java:123) [nifi-runtime-0.4.1.jar:0.4.1]
    at org.apache.nifi.NiFi.main(NiFi.java:227) [nifi-runtime-0.4.1.jar:0.4.1]
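
The cure is simply to guarantee that the field, and therefore getRelationships(), is never null:

  private Set< Relationship > relationships = Collections.emptySet();   // never null, even before init()

  @Override
  protected void init( final ProcessorInitializationContext context )
  {
    final Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  @Override
  public Set< Relationship > getRelationships() { return relationships; }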

Monday, 18 April 2016

Here's how to document multiple dynamic properties. For only one, just use @DynamicProperty alone.

@SideEffectFree
@Tags( { "SomethingProcessor" } )
@DynamicProperties( {
  @DynamicProperty(
    name = "section-N.name",
    value = "Name",
    supportsExpressionLanguage = false,
    description = "Add the name to identify a section."
        + " N is a required integer to unify the three properties especially..."
        + " Example: \"section-1.name\" : \"illness\"."
  ),
  @DynamicProperty(
    name = "section-N.pattern",
    value = "Pattern",
    supportsExpressionLanguage = false,
    description = "Add the pattern to identify a section."
        + " N is a required integer to unify the three properties especially..."
        + " Example: \"section-1.pattern\" : \"History of Illness\"."
  ),
  @DynamicProperty(
    name = "section-N.NLP",
    value = "\"on\" or \"off\"",
    supportsExpressionLanguage = false,
    description = "Add whether to turn NLP on or off."
        + " N is a required integer to unify the three properties especially..."
        + " Example: \"section-1.nlp\" : \"on\"."
  )
} )
public class SomethingProcessor extends AbstractProcessor
{
  ...

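Not shown above is how the processor actually accepts dynamic properties at runtime. The usual way, sketched here, is to override the hook AbstractProcessor inherits from AbstractConfigurableComponent:

  @Override
  protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( final String propertyDescriptorName )
  {
    // build a descriptor on the fly for any property name the user adds in the UI
    return new PropertyDescriptor.Builder()
               .name( propertyDescriptorName )
               .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
               .dynamic( true )
               .build();
  }
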
Friday and Monday, 22 & 25 April 2016

Need to set up NiFi to work from home. First, download the tarball containing binaries from Apache nifi Downloads and explode it.

~/Downloads $ tar -zxf nifi-0.6.1-bin.tar.gz 
~/Downloads $ mv nifi-0.6.1 ~/dev
~/Downloads $ cd ~/dev
~/dev $ ln -s ./nifi-0.6.1/ ./nifi

Edit nifi-0.6.1/conf/bootstrap.conf and add my username.

# Username to use when running NiFi. This value will be ignored on Windows.
run.as=russ

Launch NiFi.

~/dev/nifi/bin $ ./nifi.sh start

Launch browser.

http://localhost:8080/nifi/

And there we are!

Tuesday, 26 April 2016

How to inject flow-file attributes:

TestRunner runner = TestRunners.newTestRunner( new VelocityTemplating() );

Map< String, String > flowFileAttributes = new HashMap<>();
flowFileAttributes.put( "name", "Velocity-templating Unit Test" );       // used in merging "Hello $name!"
.
.
.
runner.enqueue( DOCUMENT, flowFileAttributes );
.
.
.

If you install NiFi as a service, then wish to remove it, there are some files (these might not be the exact names) you'll need to remove:

Friday, 29 April 2016

Some useful comments on examining data during flow and debugging NiFi processors.

Andy LoPresto:

NiFi is intentionally designed to allow you to make changes on a very tight cycle while interacting with live data. This separates it from a number of tools that require lengthy deployment processes to push them to sandbox/production environments and test with real data. As one of the engineers says frequently, "NiFi is like digging irrigation ditches as the water flows, rather than building out a sprinkler system in advance."

Each processor component and queue will show statistics about the data they are processing and you can see further information and history by going into the Stats dialog of the component (available through the right-click context menu on each element).

To monitor actual data (such as the response of an HTTP request or the result of a JSON split), I'd recommend using the LogAttribute processor. You can use this processor to print the value of specific attributes, all attributes, flowfile payload, expression language queries, etc. to a log file and monitor the content in real-time. I'm not sure if we have a specific tutorial for this process, but many of the tutorials and videos for other flows include this functionality to demonstrate exactly what you are looking for. If you decide that the flow needs modification, you can also replay flowfiles through the new flow very easily from the provenance view.

One other way to explore the processors, is of course to write a unit test and evaluate the result compared to your expected values. This is more advanced and requires downloading the source code and does not use the UI, but for some more complicated usages or for people familiar with the development process, NiFi provides extensive flow mocking capabilities to allow developers to do this.

Matt Burgess:

As of at least 0.6.0, you can view the items in a queued connection. So for your example, you can have a GetHttp into a SplitJson, but don't start the SplitJson, just the GetHttp. You will see any flowfiles generated by GetHttp queued up in the success (or response?) connection (whichever you have wired to SplitJson). Then you can right-click on the connection (the line between the processors) and choose List Queue. In that dialog you can choose an element by clicking on the Info icon ('i' in a circle) and see the information about it, including a View button for the content.

The best part is that you don't have to do a "preview" run, then a "real" run. The data is in the connection's queue, so you can make alterations to your SplitJson, then start it to see if it works. If it doesn't, stop it and start the GetHttp again (if stopped) to put more data in the queue. For fine-grained debugging, you can temporarily set the Run schedule for the SplitJson to something like 10 seconds, then when you start it, it will likely only bring in one flow file, so you can react to how it works, then stop it before it empties the queue.

Joe Percivall:

Provenance is useful for replaying events but I also find it very useful for debugging processors/flows as well. Data Provenance is a core feature of NiFi and it allows you to see exactly what the FlowFile looked like (attributes and content) before and after a processor acted on it as well as the ability to see a map of the journey that FlowFile underwent through your flow. The easiest way to see the provenance of a processor is to right click on it and then click "Data provenance".

Thursday, 9 June 2016

Before explaining how to work with templates and process groups, it's necessary to define certain visual elements of the NiFi workspace. I'm just getting around to this because my focus until now has been to write and debug custom processors for others' use.

NiFi templates

Here's how to make a template in NiFi. Templates are used to reproduce work flows without having to rebuild them, from all their components and configuration, by hand. I don't know how yet to "carry" templates around between NiFi installations.

Here's a video on creating templates by the way. And here are the steps:

  1. SHIFT + MOUSE + drag to select a work flow.
  2. Click on the Create Template icon in the actions toolbar.
  3. Name and describe the template.
  4. Click the Create button, then click OK.

Here's how to consume an existing template in your NiFi workspace:

  1. Click on the Template icon in the Components Toolbar.
  2. Drag the template to the workspace.
  3. If there are no templates, Instantiate Template will tell you.
  4. Select the desired template from the modal's drop-down; hovering over one you'll see the description appear.

Process groups

You can turn a template into a process group in order to push the detail of how it's built down to make your work flow simpler at the top level.

Here's a video on process groups.

  1. Drag the process group from the Process Group icon on the (Components) toolbar over the NiFi workspace.
  2. Give the process group a name.
  3. Go into the process group (which will be blank at first) by double-clicking it.
  4. (Notice the bread crumbs in the workspace.)

Sometimes you already have a flow you want to transform into a process group.

  1. Select process group, just as for creating a template, and click the Group button from the actions toolbar.
  2. Give the process group a name and click Add.

You can zoom in (or out) on the process group. It looks like a processor because it performs the same function as a processor, only it does a lot more since it subsumes multiple processors.

However, a process group must have explicit input- and output ports in order to treat it as a processor:

  1. Double-click the process group.
  2. Click and drag an Input Port from the Components Toolbar.
  3. Name it ("to" plus the process-group name?) and click Add.
  4. Click the new input port and drag to the "entering" processor.
  5. Click Add.
  6. Similarly, for the output port...
  7. Click and drag an Output Port from the Components Toolbar.
  8. Name it ("from" plus the process-group name?) and click Add.
  9. Drag a connection from the exiting processor (or a funnel) to the output port and click Add.
  10. Go back to the next level up and add the connection from the process group to the consuming processor.
  11. Add the connection from the supplying processor to the process group.

Exporting templates for use in other NiFi installations

...and the current installation too. Here's a video on exporting templates.

  1. Assuming you have one or more templates, click the Templates button in the Management Toolbar. This brings up the Template Management dialog.
  2. Click the Download button to download the template as an XML file.

  3. Save the XML file to ~/Downloads or wherever you want it.
  4. Send it to a friend.

Note: if your template uses processors or other components that don't exist in the target NiFi installation (that your template is taken to), then it can be imported, but when attempting to use it in that installation, NiFi will issue an error (in a modeless alert).

Importing a template

This uses the Template Management dialog too.

  1. Click the Templates button in the Management Toolbar.
  2. Click Browse to go find a template XML file you've copied into your filesystem somewhere.
  3. Navigate to that file and click Open.
  4. The new template should show up in your list.
  5. Exit the modal dialog and consume the template as noted.

Monday, 13 June 2016

Some NiFi notes here on logging...

Before NiFi 1.0, each processor logged at the INFO level to say what it was doing for each flowfile. However, this became extremely abusive in terms of logging disk space, so, beginning in 1.0, the minimum level for processors was instead set to WARN. In conf/logback.xml this can be changed back by setting the log level of org.apache.nifi.processors.

Prior to 1.0, however, one saw this:

.
.
.
<logger name="org.apache.nifi" level="INFO" />
.
.
.

Friday, 8 July 2016

Today I'm studying the NiFi ReST (NiFi API) which provides programmatic access to command and control of a NiFi instance (in real time). Specifically, I'm interested in monitoring, indeed, in obtaining an answer to questions like:

  1. How many times a processor processed something?
  2. How many flowfiles were processed?
  3. How many flowfiles were produced?
  4. Can I get this information per processor?
  5. How can I tell different instances of the same processor apart?
    • ...in what's reported?
    • ...in asking about a specific processor instance?

Also, I'm interested in:

  1. Getting a list of items on the NiFi canvas like processors and relationships.
  2. How many times processors or a processor failed?
  3. What is the profile of resource usage?
    • memory consumed
    • ?
  4. What is the profile of processor latency?
    • flowfiles backed up awaiting processing
    • flowfiles in other states (?)

But first, let's break this ReST control down by service. I'm not interested in all of these today, but since I've got the information in a list, here it is:

Service                  Function(s)
Controller               Get controller configuration, search the flow, manage templates, system diagnostics
Process Groups           Get the flow, instantiate a template, manage sub groups, monitor component status
Processors               Create a processor, set properties, schedule
Connections              Create a connection, set queue priority, update connection destination
Input Ports              Create an input port, set remote port access control
Output Ports             Create an output port, set remote port access control
Remote Process Groups    Create a remote group, enable transmission
Labels                   Create a label, set label style
Funnels                  Manage funnels
Controller Services      Manage controller services, update controller service references
Reporting Tasks          Manage reporting tasks
Cluster                  View node status, disconnect nodes, aggregate component status
Provenance               Query provenance, search event lineage, download content, replay
History                  View flow history, purge flow history
Users                    Update user access, revoke accounts, get account details, group users

Let me make a quick list of end-points that pique my interest and I'll try them out, make further notes, etc.

Service      Verb   URI                    Notes
Controller   GET    /controller/config     retrieve configuration
Controller   GET    /controller/counter    current counters

Here's dynamic access to all of the service documentation: https://nifi.apache.org/docs/nifi-docs/rest-api/

How to use the ReST API

If I'm accessing my NiFi canvas (flow) using http://localhost:8080/nifi, I use this URI to reach the ReST end-points:

http://localhost:8080/nifi-api/<service>/<end-point>

For example, http://localhost:8080/nifi-api/controller/config returns this (I'm running it against a NiFi instance whose canvas has two flows, of 3 and 2 processors with relationships between them, that has sat unused for a couple of weeks):

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<controllerConfigurationEntity>
    <revision>
        <clientId>87718998-e57b-43ac-9e65-15f2b8e0feaa</clientId>
    </revision>
    <config>
        <autoRefreshIntervalSeconds>30</autoRefreshIntervalSeconds>
        <comments></comments>
        <contentViewerUrl>/nifi-content-viewer/</contentViewerUrl>
        <currentTime>13:23:01 MDT</currentTime>
        <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
        <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
        <name>NiFi Flow</name>
        <siteToSiteSecure>true</siteToSiteSecure>
        <timeOffset>-21600000</timeOffset>
        <uri>http://localhost:8080/nifi-api/controller</uri>
    </config>
</controllerConfigurationEntity>

I'm using the Firefox </> RESTED client. What about JSON? Yes, RESTED allows me to add Accept: application/json:

{
  "revision":
  {
    "clientId": "f841561c-60c6-4d9b-a3ce-d2202789eef7"
  },
  "config":
  {
    "name": "NiFi Flow",
    "comments": "",
    "maxTimerDrivenThreadCount": 10,
    "maxEventDrivenThreadCount": 5,
    "autoRefreshIntervalSeconds": 30,
    "siteToSiteSecure": true,
    "currentTime": "14:06:18 MDT",
    "timeOffset": -21600000,
    "contentViewerUrl": "/nifi-content-viewer/",
    "uri": "http://localhost:8080/nifi-api/controller"
  }
}
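
For the record, the same request is easy from the command line with curl; the -H flag adds the Accept header:

curl -H 'Accept: application/json' http://localhost:8080/nifi-api/controller/config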

I wanted to see the available processors in a list. I did this via: http://localhost:8080/nifi-api/controller/processor-types

However, I could not figure out what to supply as {processorId} to controller/history/processors/{processorId}. I decided to use a processor name: http://localhost:8080/nifi-api/controller/history/processors/VelocityTemplating
This did the trick:

{
  "revision":
  {
    "clientId": "86082d22-9fb2-4aea-86bc-efe062861d51"
  },
  "componentHistory":
  {
    "componentId": "VelocityTemplating",
    "propertyHistory": {}
  }
}

I notice that I can't see the name I give the processor. For example, I have a GetFile that I named Get PDF files from test fodder. I can't find any API that returns that.

Now, about instances of processors. The forum told me that, even if I'm not using any process groups, there is one process group covering everything I am using, namely root; this URI is shown below:

http://localhost:8080/nifi-api/controller/process-groups/root/status
{
  "revision":
  {
    "clientId": "98d70ac0-476c-4c6b-bcbc-3ec91193d994"
  },
  "processGroupStatus":
  {
    "bulletins": [],
    "id": "50e3886b-984d-4e90-a889-d46dc53c4afc",
    "name": "NiFi Flow",
    "connectionStatus":
    [
      {
        "id": "f3a032d6-6bd1-4cc5-ab5e-8923e131c222",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "success",
        "input": "0 / 0 bytes",
        "queuedCount": "0",
        "queuedSize": "0 bytes",
        "queued": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "sourceId": "9929acf6-aac7-46d9-a0dd-f239d66e1594",
        "sourceName": "Get PDFs from inbox",
        "destinationId": "c396af51-2221-4bfd-ba31-5f4b5913b910",
        "destinationName": "Generate XML from advance directives template"
      },
      {
        "id": "e5ba7858-8301-4fd0-885e-6fb63b336e53",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "success",
        "input": "0 / 0 bytes",
        "queuedCount": "0",
        "queuedSize": "0 bytes",
        "queued": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "sourceId": "c396af51-2221-4bfd-ba31-5f4b5913b910",
        "sourceName": "Generate XML from advance directives template",
        "destinationId": "1af4a618-ca9b-4697-81b5-3ba1bbfaba6b",
        "destinationName": "Put generated XMLs to output folder"
      },
      {
        "id": "53e4a0e9-ce20-4e4d-84f3-d12504edeccc",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "success",
        "input": "0 / 0 bytes",
        "queuedCount": "3",
        "queuedSize": "25.27 KB",
        "queued": "3 / 25.27 KB",
        "output": "0 / 0 bytes",
        "sourceId": "4435c701-0719-4231-8867-671db08dd813",
        "sourceName": "Get PDF files from test fodder",
        "destinationId": "5f28baeb-b24c-4c3f-b6d0-c3284553d6ac",
        "destinationName": "Put PDFs as in-box items"
      }
    ],
    "processorStatus":
    [
      {
        "id": "c396af51-2221-4bfd-ba31-5f4b5913b910",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Generate XML from advance directives template",
        "type": "VelocityTemplating",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "9929acf6-aac7-46d9-a0dd-f239d66e1594",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Get PDFs from inbox",
        "type": "GetInbox",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "5f28baeb-b24c-4c3f-b6d0-c3284553d6ac",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Put PDFs as in-box items",
        "type": "PutInbox",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "1af4a618-ca9b-4697-81b5-3ba1bbfaba6b",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Put generated XMLs to output folder",
        "type": "PutFile",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "4435c701-0719-4231-8867-671db08dd813",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Get PDF files from test fodder",
        "type": "GetFile",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      }
    ],
    "processGroupStatus": [],
    "remoteProcessGroupStatus": [],
    "inputPortStatus": [],
    "outputPortStatus": [],
    "input": "0 / 0 bytes",
    "queuedCount": "3",
    "queuedSize": "25.27 KB",
    "queued": "3 / 25.27 KB",
    "read": "0 bytes",
    "written": "0 bytes",
    "output": "0 / 0 bytes",
    "transferred": "0 / 0 bytes",
    "received": "0 / 0 bytes",
    "sent": "0 / 0 bytes",
    "activeThreadCount": 0,
    "statsLastRefreshed": "09:53:12 MDT"
  }
}

This sort of hits the jackpot. What we see above are:

  1. status of connections and processors
  2. processor name (type): "type" : "VelocityTemplating"
  3. processor name (name): "name" : "Generate XML from advance directives template"
  4. "runStatus"
  5. "read" : 0 bytes"
  6. "written" : "0 bytes"
  7. "input" : "0 / 0 bytes"
  8. "output" : "0 / 0 bytes"
  9. "tasksDuration" : "00:00:00.000"
  10. all-processor totals
  11. other stuff

So, let's see if we can't dig into this stuff above using Python.

My Python's not as good as it was a year ago when I last coded in this language, but Python is extraordinarily useful compared to anything else for writing these scripts (yeah, okay, I'm not too keen on Ruby as compared to Python, and I'm certainly down on Perl). To write this in Java, you'd end up with fine software, but it's more work (juggling the JSON-to-POJO transformation) and, of course, a JAR isn't as brainlessly usable from the command line as a script in a proper scripting language.

This illustrates a cautious traversal down to the processor names (and types). It shows the way; there are countless additional tidbits to display, like process group-wide status. Remember, this is a weeks-old NiFi installation that's just been sitting there; I've made no attempt to shake it around so that the status numbers get more interesting. Here's my first script:

import sys
import json
import globals
import httpclient

def main( argv ):
    DEBUG = False
    if len( argv ) > 1 and ( argv[ 1 ] == '-d' or argv[ 1 ] == '--debug' ):
        DEBUG = True
    URI = globals.NIFI_API_URI( "controller" ) + 'process-groups/root/status'

    print( 'URI = %s' % URI )

    response = httpclient.get( URI )
    jsonDict = json.loads( response )

    print
    print 'Keys back from request:'
    print list( jsonDict.keys() )

    processGroupStatus = jsonDict[ 'processGroupStatus' ]
    print
    print 'processGroupStatus keys:'
    print list( processGroupStatus.keys() )

    connectionStatus = processGroupStatus[ 'connectionStatus' ]
    print
    print 'connectionStatus keys:'
    print connectionStatus

    processorStatus  = processGroupStatus[ 'processorStatus' ]
    print
    print 'processorStatus:'
    print '  There are %d processors listed.' % len( processorStatus )
    for processor in processorStatus:
        print( '  (%s): %s' % ( processor[ 'type' ], processor[ 'name' ] ) )


if __name__ == "__main__":
    if len( sys.argv ) <= 1:
        args = [ 'first.py' ]
        main( args )
        sys.exit( 0 )
    else:
        sys.exit( main( sys.argv ) )

Output:

URI = http://localhost:8080/nifi-api/controller/process-groups/root/status
Keys back from request:
[u'processGroupStatus', u'revision']
processGroupStatus keys:
[u'outputPortStatus', u'received', u'queuedCount', u'bulletins', u'activeThreadCount', u'read', u'queuedSize', ...]
connectionStatus keys:
[{u'queuedCount': u'0', u'name': u'success', u'sourceId': u'9929acf6-aac7-46d9-a0dd-f239d66e1594', u'queuedSize': u'0 bytes' ...]
processorStatus:
There are 5 processors listed.
(VelocityTemplating): Generate XML from advance directives template
(GetInbox): Get PDFs from inbox
(PutInbox): Put PDFs as in-box items
(PutFile): Put generated XMLs to output folder
(GetFile): Get PDF files from test fodder

Wednesday, 13 July 2016

I'm convinced that Reporting Tasks, which are written very similarly to Processors but not deployed like them, are the best approach to monitoring: using a reporting task isn't so much a matter of tediously clicking and dragging boxes as of enabling task reporting, including custom task reporting, for the whole flow or process group. See also Re: Workflow monitoring/notifications.

(The second of the three NiFi architectural components is the Controller Service.)

The NiFi ReST end-points are very easy to use, but there is some difficulty in their semantics, which still appear a bit random to my mind. The NiFi canvas or web UI is built atop these end-points (of course) and it is possible to follow what's happening using the developer tools in your browser (Firefox: Developer Tools → Network, then click the desired method to see the URI, etc.). I found that a little challenging because I've pretty much been a back-end guy who's never used the browser developer tools intensively.

I found it very easy, despite not being a rockstar Python guy, to write simple scripts to hit NiFi. And I did. However, realize that the asynchrony of this approach presents a challenge given that a workflow can be very dynamic. Examining something in a NiFi flow from Python is like trying to inspect a flight to New York to see if your girlfriend is sitting next to your rival as it passes over Colorado, Nebraska, Iowa, etc. It's just not the best way. The IDs of the individual things you're looking for are not always easily ascertained and can change as the plane transits from Nebraska to Iowa to Illinois. Enabling a reporting task, or even writing a custom reporting task and enabling that, strikes me as the better solution, though it forces us to design or conceive of what we want to know up front. This isn't necessarily bad in the long run.

Gain access to reporting tasks by clicking the Controller Settings in the Management Toolbar.


Here's a coded example, ControllerStatusReportingTask. The important method to implement appears to be onTrigger( ReportingContext context ), which is given access to the reporting context (I still need to investigate everything it carries and how that is helpful).
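
In the meantime, here's what I gather a minimal custom reporting task looks like. This is only a sketch from my reading; the class name and log message are mine:

package com.etretatlogiciels.nifi.reporting;

import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;

public class RootGroupStatusReportingTask extends AbstractReportingTask
{
  @Override
  public void onTrigger( final ReportingContext context )
  {
    // the reporting context exposes controller-wide status...
    ProcessGroupStatus status = context.getEventAccess().getControllerStatus();

    getLogger().info( "Root group \"{}\" has {} flowfile(s) queued",
                      new Object[] { status.getName(), status.getQueuedCount() } );
  }
}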

But, ControllerStatusReportingTask!

Yes, it already exists, right? So use it? ControllerStatusReportingTask logs processor and connection statuses, which may well be sufficient for most of our needs.

By default, the output from this reporting task goes to the NiFi log, but it can be redirected elsewhere by modifying conf/logback.xml. (This would constitute a new logger, something like the following:)

<appender name="STATUS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>logs/nifi-status.log</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>./logs/nifi-status_%d.log</fileNamePattern>
    <maxHistory>30</maxHistory>
  </rollingPolicy>
  <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
    <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
  </encoder>
</appender>
.
.
.
<logger name="org.apache.nifi.controller.ControllerStatusReportingTask" level="INFO" additivity="false">
  <appender-ref ref="STATUS_FILE" />
</logger>

So, I think what we want is already rolled for us by NiFi and all we need do is:

  1. Modify conf/logback.xml to separate its output from the rest of logging,
  2. Hit that log file to get the data into HEKA.

Thursday, 14 July 2016

Today, I followed these steps to reporting task happiness—as an experiment. I removed all log files in order to minimize the confusion and see differences in less space to look through.

    Establish status quo...
  1. Removed all NiFi logs and bounced NiFi, running NiFi 0.6.1.
  2. Clicked on Controller Settings icon in the Management Toolbar.
  3. Clicked Reporting Tasks tab.
  4. Configured a ControllerStatusReportingTask, leaving defaults (in particular, Run schedule 5 minutes).
  5. Clicked Start (green arrow) icon.
  6. Begin first observation...
  7. Removed all NiFi logs and bounced NiFi.
  8. I note that ControllerStatusReportingTask is running after bounce.

    fgrep reporting *.log yields:

    nifi-app.log: org.apache.nifi.reporting.ganglia.StandardGangliaReporter || org.apache.nifi.nar.NarClassLoader[./work/nar/extensions/nifi-standard-nar-0.6.1.nar-unpacked]
    nifi-app.log: org.apache.nifi.reporting.ambari.AmbariReportingTask || org.apache.nifi.nar.NarClassLoader[./work/nar/extensions/nifi-ambari-nar-0.6.1.nar-unpacked]
    nifi-user.log:2016-07-14 09:49:09,509 INFO [NiFi Web Server-25] org.apache.nifi.web.filter.RequestLogger Attempting request for (anonymous) GET http://localhost:8080/nifi-api/controller/reporting-task-types (source ip: 127.0.0.1)
    nifi-user.log:2016-07-14 09:49:09,750 INFO [NiFi Web Server-70] org.apache.nifi.web.filter.RequestLogger Attempting request (ibid)
    nifi-user.log:2016-07-14 09:49:26,596 INFO [NiFi Web Server-24] org.apache.nifi.web.filter.RequestLogger Attempting request (ibid)
    
  9. I launched GetFile (Get PDF files from test fodder) and waited a few seconds for it to run and produce flowfiles.
  10. I launched PutInbox (Put PDFs as in-box items) and waited a few seconds for it to run and produce flowfiles.

    Even after waiting minutes, I see no difference when running the grep. However, I launch vim on nifi-app.log and see, here and there, stuff like:

    2016-07-14 09:48:50,481 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Processors Processor Statuses:
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Processor Name | Processor ID | Processor Type | Run Status | Flow Files In | Flow Files Out | Bytes Read | Bytes Written | Tasks | Proc Time |
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Generate XML from advance dire | c396af51-2221-4bfd-ba31-5f4b5913b910 | VelocityTemplating | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Get PDFs from inbox | 9929acf6-aac7-46d9-a0dd-f239d66e1594 | GetInbox | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Put PDFs as in-box items | 5f28baeb-b24c-4c3f-b6d0-c3284553d6ac | PutInbox | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Put generated XMLs to output f | 1af4a618-ca9b-4697-81b5-3ba1bbfaba6b | PutFile | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Get PDF files from test fodder | 4435c701-0719-4231-8867-671db08dd813 | GetFile | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    2016-07-14 09:48:50,483 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Connections Connection Statuses:
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Connection ID | Source | Connection Name | Destination | Flow Files In | Flow Files Out | FlowFiles Queued |
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | 53e4a0e9-ce20-4e4d-84f3-d12504edeccc | Get PDF files from test fodder | success | Put PDFs as in-box items | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 9 / 75.82 KB (+9/+75.82 KB) |
    | f3a032d6-6bd1-4cc5-ab5e-8923e131c222 | Get PDFs from inbox | success | Generate XML from advance dire | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) |
    | e5ba7858-8301-4fd0-885e-6fb63b336e53 | Generate XML from advance dire | success | Put generated XMLs to output f | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) |
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    

    (I would figure out a little later that this is the start-up view before anything's happened.)

  11. Begin attempting to redirect reporting task observation...
  12. I inserted this near the bottom of conf/logback.xml:
    <!-- Logger to redirect reporting task output to a different file. -->
    <logger name="org.apache.nifi.controller.ControllerStatusReportingTask" level="INFO" additivity="false">
    <appender-ref ref="STATUS_FILE" />
    </logger>
    
  13. ...and I inserted this near the top of conf/logback.xml, just after the definition of BOOTSTRAP_FILE rolling appender:
    <!-- rolling appender for controller-status reporting task log. -->
    <appender name="STATUS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>logs/nifi-status.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>./logs/nifi-status_%d.log</fileNamePattern>
    <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
    <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
    </encoder>
    </appender>
    
  14. Removed all NiFi logs and bounced NiFi.
  15. Looked at logs:
    ~/dev/nifi/logs $ ll
    total 76
    drwxr-xr-x 2 russ russ 4096 Jul 14 10:28 .
    drwxr-xr-x 13 russ russ 4096 Jul 14 09:37 ..
    -rw-r--r-- 1 russ russ 65016 Jul 14 10:29 nifi-app.log
    -rw-r--r-- 1 russ russ 3069 Jul 14 10:28 nifi-bootstrap.log
    -rw-r--r-- 1 russ russ 0 Jul 14 10:28 nifi-status.log
    -rw-r--r-- 1 russ russ 0 Jul 14 10:28 nifi-user.log
    
  16. I launched GetFile (Get PDF files from test fodder) and waited a few seconds for it to run and produce flowfiles.

  17. I launched PutInbox (Put PDFs as in-box items) and waited a few seconds for it to run and produce flowfiles.

  18. I never see nifi-status.log grow in size. I launch vim on nifi-app.log and see, just as before (but this time the scraped output actually shows work accomplished):
    2016-07-14 10:34:06,643 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Processors Processor Statuses:
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Processor Name | Processor ID | Processor Type | Run Status | Flow Files In | Flow Files Out | Bytes Read | Bytes Written | Tasks | Proc Time |
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Get PDF files from test fodder | 4435c701-0719-4231-8867-671db08dd813 | GetFile | Stopped | 0 / 0 bytes (+0/+0 bytes) | 6 / 50.54 KB (+6/+50.54 KB) | 50.54 KB (+50.54 KB) | 50.54 KB (+50.54 KB) | 2 (+2) | 00:00:00.056 (+00:00:00.056) |
    | Put PDFs as in-box items | 5f28baeb-b24c-4c3f-b6d0-c3284553d6ac | PutInbox | Stopped | 6 / 50.54 KB (+6/+50.54 KB) | 0 / 0 bytes (+0/+0 bytes) | 50.54 KB (+50.54 KB) | 0 bytes (+0 bytes) | 1 (+1) | 00:00:00.014 (+00:00:00.014) |
    | Generate XML from advance dire | c396af51-2221-4bfd-ba31-5f4b5913b910 | VelocityTemplating | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Get PDFs from inbox | 9929acf6-aac7-46d9-a0dd-f239d66e1594 | GetInbox | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Put generated XMLs to output f | 1af4a618-ca9b-4697-81b5-3ba1bbfaba6b | PutFile | Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    2016-07-14 10:34:06,644 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Connections Connection Statuses:
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Connection ID | Source | Connection Name | Destination | Flow Files In | Flow Files Out | FlowFiles Queued |
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | f3a032d6-6bd1-4cc5-ab5e-8923e131c222 | Get PDFs from inbox | success | Generate XML from advance dire | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) |
    | e5ba7858-8301-4fd0-885e-6fb63b336e53 | Generate XML from advance dire | success | Put generated XMLs to output f | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) |
    | 53e4a0e9-ce20-4e4d-84f3-d12504edeccc | Get PDF files from test fodder | success | Put PDFs as in-box items | 6 / 50.54 KB (+6/+50.54 KB) | 6 / 50.54 KB (+6/+50.54 KB) | 0 / 0 bytes (+0/+0 bytes) |
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    
  19. So, this was a partial failure...

What failed is getting the reported status into nifi-status.log. What succeeded is, just as was working before, getting controller status.

I got a response out of [email protected] that this was a reported bug that's been fixed in 0.7.0 and 1.0.0. Also note that %logger{40} is a logback incantation that abbreviates the package name, such that a name like org.nifi.controller.ControllerStatusReportingTask becomes o.n.c.ControllerStatusReportingTask. (See "Pattern Layouts" at Chapter 6: Layouts.)

I upgraded my NiFi version to 0.7.0 at home and at the office.

Tuesday, 23 August 2016

Apache NiFi expression language notes

I need to take time out to grok this.

  1. Flow file
  2. Attributes on flow file
    • filename
    • path
  3. NiFi expression language syntax

Hmmmm... yeah, there's precious little written on this stuff that really corrals it. So, I wrote this (albeit non-functional) example using the information above plus what I already know about flowfiles, properties and attributes:

package com.etretatlogiciels;

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;

public class FunWithNiFiProperties
{
  private static final PropertyDescriptor EL_HOSTNAME = new PropertyDescriptor
                        .Builder()
                        .name( "Hostname (supporting EL)" )
                        .description( "Hostname property that supports NiFi Expression Language syntax"
                            + "This is the name of the flowfile attribute to expect to hold the hostname" )
                        .required( false )
                        .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
                        .expressionLanguageSupported( true )                // !
                        .build();
  private static final PropertyDescriptor HOSTNAME    = new PropertyDescriptor
                        .Builder()
                        .name( "Hostname" )
                        .description( "Hostname property that doesn't support NiFi Expression Language syntax."
                            + "This is the name of the flowfile attribute to expect to hold the hostname" )
                        .required( false )
                        .addValidator( Validator.VALID )
                        .build();

  // values will end up with a map of relevant data, names and values...
  private Map< String, String > values = new HashMap<>();

  /**
   * Example of getting a hold of a property value via what the property value evaluates to
   * including what not evaluating it as an expression yields.
   */
  private void expressionLanguageProperty( final ProcessContext context, final ProcessSession session )
  {
    FlowFile      flowFile = session.get();
    PropertyValue property = context.getProperty( EL_HOSTNAME );
    String        value    = property.getValue();       // (may contain expression-language syntax)

    // what's our property and its value?
    values.put( EL_HOSTNAME.getName(), value );

    // is there a flowfile attribute of this precise name (without evaluating)?
    // probably won't be a valid flowfile attribute if contains expression-language syntax...
    values.put( "hostname", flowFile.getAttribute( value ) );

    //         -- or --

    // get NiFi to evaluate the possible expression-language of the property as a flowfile attribute name...
    values.put( "el-hostname", property.evaluateAttributeExpressions( flowFile ).getValue() );
  }

  /**
   * Example of getting a hold of a property value the normal way, that is, not involving
   * any expression syntax.
   */
  private void normalProperty( final ProcessContext context, final ProcessSession session )
  {
    FlowFile      flowFile = session.get();
    PropertyValue property = context.getProperty( HOSTNAME );
    String        value    = property.getValue(); // (what's in HOSTNAME)

    // what's our property and its value?
    values.put( HOSTNAME.getName(), value );

    // is there a flowfile attribute by this precise name?
    values.put( "hostname-attribute-on-flowfile", flowFile.getAttribute( value ) );
  }
}

This is all I think I know so far:

  1. Expression-language syntax can only be used as the value of a processor property.
  2. (Not exactly true: it can be transmitted not only via a processor property value, but by other input means as well including a flowfile attribute value.)
  3. Expression-language syntax can only be evaluated by the evaluateAttributeExpressions() method, so it can only be used for the purpose of finding a specially named flowfile attribute (and getting its value); see the sketch just below.
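
To make point 3 concrete, here's a fragment reusing EL_HOSTNAME from the example above (the attribute name and value are invented):

// Suppose EL_HOSTNAME is configured in the UI as ${hostname} and the incoming
// flowfile carries the attribute hostname=www.example.com.
PropertyValue property = context.getProperty( EL_HOSTNAME );

property.getValue();                                          // "${hostname}", raw and unevaluated
property.evaluateAttributeExpressions( flowFile ).getValue(); // "www.example.com"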

Friday, 26 August 2016

Yesterday, I finally put together a tiny example of a full, custom NiFi project, minus additionalDetails.html (for that, look in the notes on the present page). See A simple NiFi processor project.

Thursday, 8 September 2016

Here's one for your gee-whiz collection: if you do

public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  .
  .
  .
  someMethod( session );

...NiFi has removed the flowfile from the session's processorQueue and a subsequent call:

void someMethod( final ProcessSession session )
{
  FlowFile flowfile = session.get();

...will produce a null flowfile. This is an interesting side-effect.
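
The obvious workaround, if a helper genuinely needs the flowfile, is to pass along the flowfile that onTrigger() already got, rather than letting the helper fish in the session for it; a minimal sketch:

void someMethod( final ProcessSession session, final FlowFile flowfile )
{
  // operate on the flowfile that onTrigger() already dequeued; calling
  // session.get() again here would just return null...
}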

Wednesday, 14 September 2016

Missing flowfiles...

In what situation would onTrigger() get called if there is no flowfile?

This can happen for a few reasons. Because the processor has...

  1. ...no incoming connections (it's a source processor),
  2. ...@TriggerWhenEmpty annotation,
  3. ...more than one concurrent task,
  4. (others?)

The main reason is the third. If you have multiple concurrent tasks, Thread 1 can determine that there is 1 flowfile queued. Thread 2 then determines that there is one flowfile queued. Both threads call onTrigger(), thread 1 gets the flowfile, and thread 2 gets nothing (null).
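
Hence the defensive idiom at the top of most onTrigger() implementations (the comment is mine):

@Override
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();

  // another concurrent task may already have claimed the queued flowfile...
  if( flowfile == null )
    return;

  // ...normal processing from here
}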

Backing up NiFi flows, using templates, etc....

Is there a best practice for keeping a backup of data flows? Assuming the use of flow.xml.gz and configuration files, is it possible to restore NiFi in case of any failure? Is it possible simply to "drop" this file into a new NiFi installation, or should templates be used in development and then imported into production?

It's a good idea to back up flow.xml.gz and configuration files. But distinguish between using these backups to recreate an equivalent new NiFi installation and attempting to reset the state of the existing one. The difference is the live data in the flow, in the provenance repository, in state variables, etc.: Restoring a flow definition that doesn't match (or no longer matches) your content and provenance data may have unexpected results for you and for systems connecting with the NiFi installation. NiFi tries hard to handle these changes smoothly, but it isn't a magic time machine.

Deploying flow.xml.gz can work, especially when deployed with configuration files that reference IDs in the flow (like authorizations.xml) or the nifi.sensitive.props.key setting, and others. But if you overwrite a running flow, you will still have data migration problems.

Templates are the current recommended best practice for deployment. They provide:

  1. concise packaging for deployment,
  2. separation between site-specific configuration like authorizations from the flow logic,
  3. workflow that allows, encourages and forces the administrator to address migration from the existing flow to incorporate the new template.

Is there a way to predeploy templates to the template directory and have NiFi recognize them?

Templates have been polished quite a bit to be more deterministic and source-control friendly, so they are an ideal artifact to be versioned and kept in a source-control repository.

When NiFi starts up, it looks in conf/templates (by default; this directory can be changed in nifi.properties) for any file that has a suffix of .template or .xml, so be sure the files are named properly. If starting a cluster, ensure all nodes in the cluster have these same templates or they will not show up.
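
If I have it right, the relevant line in nifi.properties is something like the following (the property name is my best recollection, so verify against your version's stock file):

nifi.templates.directory=./conf/templates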

Friday, 23 September 2016

Here's a small, complete example of a processor that translates attribute names or removes attributes. I found UpdateAttribute to be overly complicated and hard to use.

NiFiStandardAttributes.java:
package com.etretatlogiciels.nifi.attributes;

import java.util.ArrayList;
import java.util.List;

public class NiFiStandardAttributes
{
  public static final List< String > FLOWFILE_ATTRIBUTES;

  static
  {
    FLOWFILE_ATTRIBUTES = new ArrayList<>( 5 );
    FLOWFILE_ATTRIBUTES.add( "filename" );
    FLOWFILE_ATTRIBUTES.add( "uuid" );
    FLOWFILE_ATTRIBUTES.add( "path" );
  }

  public static final int FLOWFILE_ATTRIBUTE_COUNT = FLOWFILE_ATTRIBUTES.size();
}
TranslateAttributeNamesTest.java:
package com.etretatlogiciels.nifi.processor;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.etretatlogiciels.nifi.attributes.NiFiStandardAttributes;
import com.etretatlogiciels.testing.TestUtilities;

/**
 * @author Russell Bateman
 * @since September 2016
 */
public class TranslateAttributeNamesTest
{
  // @formatter:off
  @Rule   public TestName name = new TestName();
  @Before public void setUp() throws Exception { TestUtilities.setUp( name ); }
  @After  public void tearDown() throws Exception { }

  private static final boolean VERBOSE = true;//TestUtilities.VERBOSE;

  private static final String TRANSLATE_JSON ="{"
                                            + "    \"inner-MIME\" : \"mime-type\","
                                            + "    \"lose-this\" : \"\","
                                            + "    \"lose-this-too\" : null"
                                            + "}";

  @Test
  public void testBasic()
  {
    TestRunner runner = TestRunners.newTestRunner( new TranslateAttributeNames() );

    final int         ONE      = 1;
    final String      CONTENTS = "This is a test whose flowfile contents are insignificant!";
    final InputStream MESSAGE  = new ByteArrayInputStream( CONTENTS.getBytes() );

    runner.setProperty( TranslateAttributeNames.TRANSLATIONS, TRANSLATE_JSON );

    Map< String, String > flowFileAttributes = new HashMap<>();
    flowFileAttributes.put( "testname",   "X12 Message Router Unit Test" );
    flowFileAttributes.put( "inner-MIME", "application/pdf" );
    flowFileAttributes.put( "lose-this", "" );
    flowFileAttributes.put( "lose-this-too", null );
    flowFileAttributes.put( "dont-touch-this-one", "Don't touch this attribute!" );

    if( VERBOSE )
    {
      System.out.println( "Attributes on the original flowfile:" );
      Map< String, String > sortedAttributes = new TreeMap<>( flowFileAttributes );
      for( Map.Entry< String, String > attribute : sortedAttributes.entrySet() )
        System.out.println( "  " + attribute.getKey() + " = " + attribute.getValue() );
    }

    runner.setValidateExpressionUsage( false );
    runner.enqueue( MESSAGE, flowFileAttributes );

    runner.run( ONE );
    runner.assertQueueEmpty();
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( TranslateAttributeNames.SUCCESS );
    MockFlowFile         result  = results.get( 0 );
    String               actual  = new String( runner.getContentAsByteArray( result ) );

    if( VERBOSE )
    {
      System.out.println( "Surviving flowfile contents:" );
      System.out.println( "  " + actual );
      System.out.println( "Attributes surviving on the flowfile:" );
      Map< String, String > sortedAttributes = new TreeMap<>( result.getAttributes() );
      for( Map.Entry< String, String > attribute : sortedAttributes.entrySet() )
      {
        String key = attribute.getKey();

        if( !NiFiStandardAttributes.FLOWFILE_ATTRIBUTES.contains( key ) )
          System.out.println( "  " + key + " = " + attribute.getValue() );
      }
    }

    assertTrue( "Flowfile contents changed", actual.equals( CONTENTS ) );

    String mimeType = result.getAttribute( "mime-type" );
    assertNotNull( "Attribute \"inner-MIME\" was not renamed", mimeType );
    assertTrue( "Attribute \"inner-MIME\" was not renamed", mimeType.equals( "application/pdf" ) );

    String dontTouchThisOne = result.getAttribute( "dont-touch-this-one" );
    assertNotNull( "Attribute \"dont-touch-this-one\" did not survive", result.getAttribute( "dont-touch-this-one" ) );
    assertTrue( "Attribute \"dont-touch-this-one\" did not survive untouched", dontTouchThisOne.equals( "Don't touch this attribute!" ) );

    assertNull( "Attribute \"lose-this\" was not dropped", result.getAttribute( "lose-this" ) );
    assertNull( "Attribute \"lose-this-too\" was not dropped", result.getAttribute( "lose-this-too" ) );
  }
}
TranslateAttributeNames.java:
package com.etretatlogiciels.nifi.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import com.etretatlogiciels.nifi.rel.StandardRelationships;
import com.etretatlogiciels.utilities.StringUtilities;

/**
 * Translate attribute names or even drop attributes altogether.
 *
 * {
 *   "attachment-filename"  : "attachment",
 *   "disposition-encoding" : "encoding",
 *   "disposition-type"     : null,
 *   "inner-MIME"           : "mime_type",
 *   "inner-boundary"       : "Donald Duck
 * }
 *
 * @author Russell Bateman
 * @since September 2016
 */
@SideEffectFree
@Tags( { "translateattributenames" } )
@CapabilityDescription( "Translate the names of attributes to new ones. The flowfile itself remains untouched." )
public class TranslateAttributeNames extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    // guard against the no-flowfile case (concurrent tasks, etc.)...
    if( flowfile == null )
      return;

    Map< String, String > translations = new Gson().fromJson( context.getProperty( TRANSLATIONS ).getValue(),
                                  new TypeToken< Map< String, String > >(){ }.getType() );

    // determine which attributes we'll be pushing onto the new flowfile...
    Map< String, String > candidates = flowfile.getAttributes();
    Map< String, String > attributes = new HashMap<>();

    for( Map.Entry< String, String > attribute : candidates.entrySet() )
    {
      String key = attribute.getKey();

      // do nothing because we're leaving the attribute as-is...
      if( !translations.containsKey( key ) )
        continue;

      // remove this flowfile attribute because we're changing its name or dropping it...
      flowfile = session.removeAttribute( flowfile, key );

      String newKey = translations.get( key );

      // we're dropping this attribute altogether...
      if( StringUtilities.isEmpty( newKey ) || newKey.equals( "null" ) )
        continue;

      // here's the attribute under its new name...
      attributes.put( translations.get( key ), attribute.getValue() );
    }

    flowfile = session.putAllAttributes( flowfile, attributes );
    session.transfer( flowfile, SUCCESS );
  }

  private List< PropertyDescriptor > properties    = new ArrayList<>();
  private Set< Relationship >        relationships = new HashSet<>();

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    properties.add( TRANSLATIONS );
    properties = Collections.unmodifiableList( properties );
    relationships.add( SUCCESS );
    relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public List< PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >       getRelationships()                { return relationships; }

  //<editor-fold desc="Processor properties and relationships">
  public static final PropertyDescriptor TRANSLATIONS = new PropertyDescriptor.Builder()
      .name( "Translations" )
      .description( "JSON detailing the name changes as key-value pairs where the value is the new name. Any value "
          + "that's null or <"<" will result in the attribute being dropped (not transmitted on the flowfile)." )
      .required( false )
      .addValidator( Validator.VALID )
      .build();

  public static final Relationship SUCCESS = StandardRelationships.SUCCESS;
  //</editor-fold>
}

Wednesday, 28 September 2016

I had a question about cloning the input stream:

  // under some condition, look into the flowfile contents to see if something's there...
  session.read( flowfile, new InputStreamCallback()
  {
    @Override
    public void process( InputStream in ) throws IOException
    {
      // read from in...
    }
  } );


  // then, later (and always) consume it (so, sometimes a second time):

  session.read( flowfile, new InputStreamCallback()
  {
    @Override
    public void process( InputStream in ) throws IOException
    {
    // read from in (maybe again)...
    }
  } );

Wouldn't the second time be problematic since the stream contents have already been read?

I learned from Mark Payne that each time session.read() is called, I'm getting a new InputStream that starts at the beginning of the flowfile. How convenient!

Thursday, 29 September 2016

Some observations on using GetFile:

  1. Configure Scheduling → Run schedule to 10000 sec. This makes it so that this processor will only run once, then wait 10,000 seconds before attempting to run again.
  2. Running "once" means that if the File filter template is adequate, it will likely induct all or many of the files on the Input directory no matter how quickly you can reach for the stop button.
  3. So, if you only want one file at a time, only put one file in the Input subdirectory that matches the File filter template.

Friday, 30 September 2016

I don't know why, after all these months, I'm still making mistakes like this. In two of my processors, which work when unit-tested, I keep getting:

Could not read from StandardFlowFileRecord

I tried a new way, because likely the InputStream I'm caching is getting closed before I can use it again (and this doesn't show up in the unit-test environment):

    FlowFile flowfile = session.get();

    flowfile = session.write( flowfile, new StreamCallback()
    {
      @Override
      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        Base64Unroller unroller = new Base64Unroller( inputStream );
        unroller.unroll();

        InputStream in = unroller.getBase64Data();
        int         c;

        for( ; ; )
        {
          c = in.read();
          if( c == EOS || ( c == '\n' || c == '\r' ) )   // EOS is -1, end of stream
            break;
          outputStream.write( c );
        }
      }
    } );

    session.transfer( flowfile, SUCCESS );

Flush with this better understanding of input and output streams in NiFi, I'm going to see if this doesn't also answer the problem I have with Base64Decode.

It does solve my problem there: this is the answer to cases where I need both to read the input stream and write to an output stream.

Monday, 10 October 2016

Got a link to a great document I've never seen, Apache NiFi in Depth. Here are points of note.

  1. The flowfile is at the heart of NiFi and its design. The flowfile is a data record. Flowfile repository. However, it is not a file in the filesystem.
  2. The content of the flowfile is the data or payload. Content repository.
  3. Provenance is a record of what's happened to the flowfile. Provenance repository.
  4. Each of the above has its own repository for storage.
  5. Attributes are the flowfile's metadata.
  6. The repositories are immutable; when a change occurs, new entities, such as attributes, are created in memory, then persisted on disk. When content is changed, the original content is read, streamed through the transform and then written to a new stream. At that point, the flowfile's content pointer is updated to the new filesystem location. This makes the flowfile content storage an "immutable, versioned content store." This results in:
    • reduced space required for typically complex graphs of processing and natural replay capability,
    • takes advantage of OS caching,
    • reduces random read/write performance hits.
  7. The flowfile repository is, in effect, a write-ahead log for what's going on.
  8. When NiFi goes down, the write claim for a content change is orphaned, then cleaned up by garbage collection. In effect, this rolls back the flowfile to the last known, stable state, ready to continue.
  9. Flowfiles live in a connection queue, in memory, until their number exceeds a value in nifi.queue.swap.threshold in conf/nifi.properties. Then, the lowest-priority flowfiles are serialized to disk in a swap file of 10K flowfiles and marked as such in internal NiFi data structures so they can be fetched when needed. This allows millions of flowfiles to exist in the flow without depleting system memory.
  10. After a flowfile's content is known no longer to be useful, then it is deleted. Depending on configuration, it can also be archived.

The provenance repository is where the history of each flowfile is stored. This involves the chain of custody for each piece of data. Each time a flowfile event occurs (create, fork, clone, modify, etc.), a new provenance event is created. This event is a snapshot of the flowfile as it appeared and fit into the flow as it existed at that point in time. These events are held in the repository for a period of time after completion, controllable by configuration.

Because all of the flowfile attributes and pointers to content are kept in the provenance repository, not only is the lineage or processing history of the data visible, but the data itself is, too, and it can be replayed from any point in the flow. When a downstream processor appears not to have received data, data lineage can show exactly when it was delivered or, indeed, that it was never sent.

Flowing between processors is done using pass-by-reference. Flowfile content isn't actually streamed between processors; only a reference to the flowfile is passed. When a flowfile is to be passed to two processors, it's cloned (content and attributes) and the references are passed on.
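
Inside a processor, the same notion is exposed explicitly through ProcessSession.clone(); a two-line sketch (ANOTHER_RELATIONSHIP is hypothetical):

FlowFile clone = session.clone( flowfile );      // new flowfile record: attributes copied, content shared by reference
session.transfer( clone, ANOTHER_RELATIONSHIP );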

Tuesday, 11 October 2016

I asked a question in the [email protected] forum.

"If I create flows for integration testing of multiple components (say, processors), where can I find the underlying files I can pick up, store and deploy to a fresh server with a fresh NiFi, then launch them on data (that I also deploy)?"

"I assumed that once I 'paint' my canvas with processors and relationships, it's harvestable in some form, in some .xml file somewhere, and easily transported to, then dropped into, a new NiFi installation on some server or host for running it. I assumed my testing would be thus configured, then kicked off somehow."

Andy LoPresto answered, "The 'flow' (the processors on the canvas and the relationships between them) is stored on disk as ${nifi_home}/conf/flow.xml.gz [1]. You can also export selections from the flow as templates [2], along with a name and description, which can then be saved as XML files and loaded into a new instance of NiFi with (non-sensitive) configuration values and properties maintained. You may also be interested in the ongoing work to support the 'Software Development Lifecycle (SDLC)' or 'Development to Production Pipeline (D2P)' efforts to allow users to develop flows in testbed environments and 'promote' them through code control/repositories to QE/QA and then Production environments [3].

"If you are attempting to perform automated testing of flows or flow segments, you can do this with the test runner [4]. I would encourage you to store the flow in a template or full flow file (flow.xml.gz, not a 'flowfile') and load that during the test rather than programmatically instantiating and configuring each component and relationship in the test code, but you are able to do both. Also, be aware that the TestRunner is just that, and approximates/mocks some elements of the NiFi environment, so you should understand what decisions are being made behind the scenes to ensure that what is covered by the test is exactly what you expect."

[1] https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#terminology
[2] https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates
[3] https://cwiki.apache.org/confluence/display/NIFI/Configuration+Management+of+Flows
[4] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#testing

I'm waiting on what [4] says about "automated testing of flows or flow segments." I did not understand from reading it that it said anything about that, let alone offered a sample of how to inject flow.xml.

Googling, ... I did find a Hortonworks document on NiFi testing. However, it didn't talk about this either.

It did note, and I have often wondered about this because I had sort of reached a contrary conclusion about it, that running the processor from the test framework (run() method):

  1. will invoke any method in the processor code that carries an @OnScheduled annotation,
  2. call onTrigger() once,
  3. then run the @OnUnscheduled methods, and finally,
  4. the @OnStopped methods.

It pointed out that it only does all of the above as long as you do not pass an argument (like 1) to the run() method. This I had not known and I had reached the conclusion that it didn't run these other kinds of methods besides onTrigger().

The options for run() appear to be:

  1. run( int contentsQueued ) —how I run it
  2. run( int contentsQueued, boolean stopOnFinish ) —pass false to avoid @OnUnscheduled and @OnStopped methods
  3. run( int contentsQueued, boolean stopOnFinish, boolean initialize ) —pass false, false to avoid @OnScheduled events as well
  4. run( int contentsQueued, boolean stopOnFinish, boolean initialize, long runWait ) —to specify milliseconds the framework will wait for a processor to stop running before calling the @OnUnscheduled methods
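
A quick sketch of the third form in a test (MyProcessor is a stand-in name):

TestRunner runner = TestRunners.newTestRunner( new MyProcessor() );
runner.enqueue( "some content".getBytes() );
runner.run( 1, false, true );   // one onTrigger(); skip @OnUnscheduled/@OnStopped; still run @OnScheduled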

I got a later answer about someone else wanting to tackle all of this, evoking the need to mock all sorts of things like databases and wire answers for different processors. A sensation of hope headed toward the distant horizon filled me. It's a huge and thankless undertaking.

Wednesday, 12 October 2016

It's possible to annotate the NiFi canvas with labels: click and drag a label to the canvas and configure it. These labels land behind anything you constructively design on the canvas (processors, relationships, funnels, etc.).

I have a hard time remembering how to view flowfile content in a queue; here are the steps:

  1. Right-click the queue,
  2. Choose List queue,
  3. At the extreme left, click the (i) button, which is "View Details," of the flowfile that interests you,
  4. In the Details tab, click the View button,
  5. You have the option to view it as a) original (text) format, b) formatted (see content type at upper right: whether this choice makes a difference or not depends on MIME type) or c) hex dump (very useful for binary or even "confusing" text—looking for carriage returns vs. newlines, control characters, etc.).

Tuesday, 1 November 2016

On dynamic properties in NiFi...

The following appear as observations to me. I've used dynamic properties before, but I've pretty much ignored doing them the way they're supposed to be done, I think. It's worth paying attention to everything in this list.

  1. By default, AbstractProcessor doesn't allow for dynamically created properties. It's sometimes desirable, however, to allow a NiFi user (the one who drags and drops processors in a flow) to configure additional properties. This is accomplished by overriding
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( final String propertyDescriptorName )
    {
      .
      .
      .
    }
    
  2. Dynamic properties are described in annotations for the processor:
    @Tags( { "processor", "nifi" } )
    @CapabilityDescription( "This is a processor" )
    @DynamicProperty( name = "name",
                value = "value",
                description = "description" )
    public class Processor extends AbstractProcessor
    {
      private List< PropertyDescriptor > properties;
      .
      .
      .
    
  3. Static and dynamic properties are generally kept separate in processor code.
    public class Processor extends AbstractProcessor
    {
      private          List< PropertyDescriptor > properties;
      private volatile Set< String >              dynamicPropertyNames = new HashSet<>();
    
      @Override
      protected void init( final ProcessorInitializationContext context )
      {
        final List< PropertyDescriptor > properties = new ArrayList<>();
        properties.add( PROPERTY );
        this.properties = Collections.unmodifiableList(properties);
      }
      .
      .
      .
    
  4. This method is generally included in the processor code; it's called by the framework rather than by your own code (there are no examples of it being called, but we explained higher up what it's for). It's crucial for any property returned to be marked as dynamic:
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( final String propertyDescriptorName )
    {
      return new PropertyDescriptor.Builder()
                          .required( false )
                          .name( propertyDescriptorName )
                          .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
                          .dynamic( true )
                          .build();
    }
    
  5. Often, the names of active, dynamic properties are managed in a set maintained by the method onPropertyModified(). In that method, newValue is null for properties disappearing and non-null for new ones or ones to be kept:
    /**
     * Called only when a NiFi user actually updates (changes) this property's value.
     * @param descriptor
     * @param oldValue --null if property had no previous value.
     * @param newValue --null if property has been removed.
     */
    @Override
    public void onPropertyModified( final PropertyDescriptor descriptor, final String oldValue, final String newValue )
    {
      final Set< String > newDynamicPropertyNames = new HashSet<>( this.dynamicPropertyNames );
    
      if( newValue == null )
        newDynamicPropertyNames.remove( descriptor.getName() );
      else if( oldValue == null && descriptor.isDynamic() )
        newDynamicPropertyNames.add( descriptor.getName() );
    
      this.dynamicPropertyNames = Collections.unmodifiableSet( newDynamicPropertyNames );
    }
    
  6. Often, method onScheduled() is tasked with updating dynamic properties in the map:
    @OnScheduled
    public void onScheduled( final ProcessContext context )
    {
      for( final PropertyDescriptor descriptor : context.getProperties().keySet() )
      {
        if( descriptor.isDynamic())
        {
          getLogger().debug( "Dynamic property: {}", new Object[] { descriptor } );
          .
          .
          .
          (do stuff here)
        }
      }
    }
    
  7. But the actual consumption of dynamic properties would most often take place in the onTrigger() method.

    For example, here every dynamic property is assumed to be a new attribute to be added to the resulting flowfile.

    Notice that context.getProperties() contains both static and dynamic properties. This is so because all properties, both static and dynamic, set in the configuration are put by NiFi in that container. What distinguishes between them is isDynamic().

    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session )
        throws ProcessException
    {
      FlowFile flowfile = session.get();
      .
      .
      .
      for( Map.Entry< PropertyDescriptor, String > entry : context.getProperties().entrySet() )
      {
        if( entry.getKey().isDynamic() )
          flowfile = session.putAttribute( flowfile, entry.getKey().getName(), entry.getValue() );
      }
    }
    
  8. So, why isn't there a simpler interface for getting the value of a dynamic property? This should be obvious: there's no way, necessarily, to know its name. Therefore, it can only be obtained by looping through all the properties (as just above), ignoring the non-dynamic ones.

  9. And why must the properties be maintained using @OnScheduled? This also should be obvious: static properties are known at initialization, but dynamic ones are certainly not.

Wednesday, 2 November 2016

Every once in a while, I'm puzzled, scratch my head, and rediscover that I've left something out.

If you get this error in your log or debugger...

java.lang.AssertionError: Processor has 1 validation failures:
property-name validated against value is invalid because property-name is not a supported property

..., you've likely left these methods out of your processor's implementation:

@Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
@Override public Set< Relationship >        getRelationships()                { return relationships; }

Here's how to get NiFi back up once you've removed a custom processor (yes, you lose your entire canvas):

~/dev/nifi/nifi-0.7.1/conf $ ll
total 72
drwxr-xr-x. 3 russ russ 4096 Nov 2 14:45 .
drwxr-xr-x. 14 russ russ 4096 Oct 21 09:07 ..
-rw-r--r--. 1 russ russ 2129 Oct 13 17:54 authority-providers.xml
-rw-r--r--. 1 russ russ 2530 Oct 13 17:54 authorized-users.xml
-rw-r--r--. 1 russ russ 3241 Oct 13 17:54 bootstrap.conf
-rw-r--r--. 1 russ russ 2119 Oct 13 17:15 bootstrap-notification-services.xml
-rw-r--r--. 1 russ russ 1317 Nov 2 14:45 flow.xml.gz
-rw-r--r--. 1 russ russ 7454 Oct 13 17:54 logback.xml
-rw-r--r--. 1 russ russ 6089 Oct 13 17:54 login-identity-providers.xml
-rw-r--r--. 1 russ russ 8571 Oct 13 17:54 nifi.properties
-rw-r--r--. 1 russ russ 4603 Oct 13 17:54 state-management.xml
drwxr-xr-x. 2 russ russ 4096 Oct 21 09:07 templates
-rw-r--r--. 1 russ russ 1437 Oct 13 17:15 zookeeper.properties
~/dev/nifi/nifi-0.7.1/conf $ rm flow.xml.gz

Friday, 4 November 2016

(I can be embarrassingly stupid sometimes...)

You've configured nifi.properties to use port 9091.

nifi.web.http.port=9091

...but can't get NiFi to launch. The error from logs/nifi-bootstrap.log is:

~/dev/nifi/nifi-1.0.0/logs $ cat nifi-bootstrap.log
2016-11-04 11:32:32,217 INFO [main] o.a.n.b.NotificationServiceManager Successfully loaded the following 0 services: []
2016-11-04 11:32:32,221 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STARTED
2016-11-04 11:32:32,221 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STOPPED
2016-11-04 11:32:32,221 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_DIED
2016-11-04 11:32:32,222 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is not currently running
2016-11-04 11:32:35,684 INFO [main] o.a.n.b.NotificationServiceManager Successfully loaded the following 0 services: []
2016-11-04 11:32:35,687 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STARTED
2016-11-04 11:32:35,688 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STOPPED
2016-11-04 11:32:35,688 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_DIED
2016-11-04 11:32:35,695 INFO [main] org.apache.nifi.bootstrap.Command Starting Apache NiFi...
2016-11-04 11:32:35,695 INFO [main] org.apache.nifi.bootstrap.Command Working Directory: /home/russ/dev/nifi/nifi-1.0.0
2016-11-04 11:32:35,696 INFO [main] org.apache.nifi.bootstrap.Command Command: /home/russ/dev/jdk1.8.0_25/bin/java -classpath /home/russ/dev/nifi/nifi-1.0.0/./conf:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-properties-loader-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/jcl-over-slf4j-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-api-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/logback-core-1.1.3.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/logback-classic-1.1.3.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-framework-api-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/jul-to-slf4j-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-documentation-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-runtime-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/slf4j-api-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-properties-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/bcprov-jdk15on-1.54.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/log4j-over-slf4j-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/commons-lang3-3.4.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-nar-utils-1.0.0.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/home/russ/dev/nifi/nifi-1.0.0/./conf/nifi.properties -Dnifi.bootstrap.listen.port=33110 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/home/russ/dev/nifi/nifi-1.0.0/logs org.apache.nifi.NiFi
2016-11-04 11:32:35,747 ERROR [NiFi logging handler] org.apache.nifi.StdErr ERROR: transport error 202: bind failed: Address already in use
2016-11-04 11:32:35,747 ERROR [NiFi logging handler] org.apache.nifi.StdErr ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
2016-11-04 11:32:35,747 ERROR [NiFi logging handler] org.apache.nifi.StdErr JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized [debugInit.c:750]
2016-11-04 11:32:35,747 INFO [NiFi logging handler] org.apache.nifi.StdOut FATAL ERROR in native method: JDWP No transports initialized, jvmtiError=AGENT_ERROR_TRANSPORT_INIT(197)
2016-11-04 11:32:36,739 INFO [main] org.apache.nifi.bootstrap.RunNiFi NiFi never started. Will not restart NiFi

And yet,

# lsof -i | grep 9091

...produces nothing for port 9091 (nor does netstat -lptu). So, port 9091 is not in use and yet the error leads one to believe that this is the problem.

It looks like remote debugging has been configured, which specifies a port that's obviously in use (8000). That happens in bootstrap.conf:

# Enable Remote Debugging
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000

Thursday, 17 November 2016

Still working on identifying how to create and store away in version control a process group.

Also looking into how we can hot-swap new or updated processors. Later, I'll get into an Agile write-up everyone agreed I'd do in the Core Team meeting yesterday afternoon.

We could set up a cluster for NiFi because rolling restarts are possible with individual instances in a cluster. That is, if I understand correctly, it's possible to take down one instance and update it (i.e., restart it) without affecting the others. I still wonder whether there's a way to throttle or somehow pause an instance so that it stops handling anything, so that bouncing it has no ramifications like reprocessing jobs, etc.

Note, too, that the new/updated software isn't available until all nodes of the cluster have been updated and restarted.

When NiFi first starts up, the following directories (and files) are created:

flow.xml.gz

Here's a flow to look at:

Unzipped (from flow.xml.gz), this flow looks like this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<flowController>
  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
  <rootGroup>
    <id>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</id>
    <name>NiFi Flow</name>
    <position x="0.0" y="0.0"/>
    <comment/>
    <processor>
      <id>0c689901-1e21-4a89-af21-04ec13f1ca03</id>
      <name>CreateFlowFile</name>
      <position x="-27.0" y="-12.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.CreateFlowFile</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
      <property>
        <name>Content</name>
        <value>This is a test of the Emergency Broadcast System.</value>
      </property>
    </processor>
    <processor>
      <id>c3daf696-11e7-4684-9e84-9f1aaedf15a6</id>
      <name>RetryOperation</name>
      <position x="281.0" y="86.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.RetryOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
      <property>
        <name>Stop retrying operation when</name>
      </property>
      <property>
        <name>Flowfile attribute name</name>
      </property>
      <property>
        <name>Initial counter value</name>
        <value>3</value>
      </property>
      <property>
        <name>Retry cache-table TTL</name>
        <value>10</value>
      </property>
    </processor>
    <processor>
      <id>e1d7c27f-e22f-480d-9d8d-591accbb7180</id>
      <name>NoOperation</name>
      <position x="639.0" y="272.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.NoOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
    </processor>
    <processor>
      <id>3bbe0931-dc54-490e-96cf-23f503f8915a</id>
      <name>NoOperation</name>
      <position x="278.0" y="272.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.NoOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
    </processor>
    <connection>
      <id>15af94b3-3db2-4bee-a7db-68869ad9bb86</id>
      <name/>
      <bendPoints>
        <bendPoint x="234.5" y="229.0"/>
      </bendPoints>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>3bbe0931-dc54-490e-96cf-23f503f8915a</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>success</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>c180a307-c0f5-48b5-8816-9abc54962b5b</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>e1d7c27f-e22f-480d-9d8d-591accbb7180</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>Stop retrying</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>ea3c8896-7775-4fce-83fe-8e95467c7811</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>0c689901-1e21-4a89-af21-04ec13f1ca03</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>success</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>f5739172-bf92-4ef8-a41b-b6e4dd363468</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>3bbe0931-dc54-490e-96cf-23f503f8915a</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>Retry operation</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
  </rootGroup>
  <controllerServices/>
  <reportingTasks/>
</flowController>

This file stores the flow, which represents the processors on the canvas and the relationships between them. The record of each processor includes its properties, even sensitive ones (like passwords, though none appear here; see the <property> elements above), and its graphical position on the canvas (the <position> elements).

Also visible above are the names of relationships (the <relationship> elements) that correspond to connections. Here's a second flow, this time with a process group:


Inside "MyProcessGroup"...

Unzipped (from flow.xml.gz), this flow looks like this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<flowController encoding-version="1.0">
  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
  <rootGroup>
    <id>746c58aa-0158-1000-08a8-9984ee3a72bc</id>
    <name>NiFi Flow</name>
    <position x="0.0" y="0.0"/>
    <comment/>
    <processor>
      <id>74a80bc0-0158-1000-8699-37fc9ff670e4</id>
      <name>CreateFlowFile</name>
      <position x="252.0" y="4.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.CreateFlowFile</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
      <property>
        <name>Content</name>
        <value>All good me must come to the aid of their country.</value>
      </property>
    </processor>
    <processor>
      <id>74a8e138-0158-1000-92f3-c6ec8781ba64</id>
      <name>NoOperation</name>
      <position x="249.0" y="601.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.NoOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
    </processor>
    <processGroup>
      <id>74a6c4bd-0158-1000-3de4-adbe359fb278</id>
      <name>MyProcessGroup</name>
      <position x="232.0" y="289.0"/>
      <comment/>
      <processor>
        <id>74a70fd2-0158-1000-3d8e-645008570026</id>
        <name>NoOperation</name>
        <position x="285.0" y="184.0"/>
        <styles/>
        <comment/>
        <class>com.imatsolutions.nifi.processor.NoOperation</class>
        <maxConcurrentTasks>1</maxConcurrentTasks>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <penalizationPeriod>30 sec</penalizationPeriod>
        <yieldPeriod>1 sec</yieldPeriod>
        <bulletinLevel>WARN</bulletinLevel>
        <lossTolerant>false</lossTolerant>
        <scheduledState>STOPPED</scheduledState>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <runDurationNanos>0</runDurationNanos>
      </processor>
      <inputPort>
        <id>74a74ed5-0158-1000-76fc-eeb723a5f47a</id>
        <name>to MyProcessGroup</name>
        <position x="228.0" y="74.0"/>
        <comments/>
        <scheduledState>STOPPED</scheduledState>
      </inputPort>
      <outputPort>
        <id>74a772d8-0158-1000-4916-01c5a8b496e9</id>
        <name>from MyProcessGroup</name>
        <position x="432.0" y="402.0"/>
        <comments/>
        <scheduledState>STOPPED</scheduledState>
      </outputPort>
      <connection>
        <id>74a79d27-0158-1000-ecf5-2c3ae0360366</id>
        <name/>
        <bendPoints/>
        <labelIndex>1</labelIndex>
        <zIndex>0</zIndex>
        <sourceId>74a70fd2-0158-1000-3d8e-645008570026</sourceId>
        <sourceGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</sourceGroupId>
        <sourceType>PROCESSOR</sourceType>
        <destinationId>74a772d8-0158-1000-4916-01c5a8b496e9</destinationId>
        <destinationGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</destinationGroupId>
        <destinationType>OUTPUT_PORT</destinationType>
        <relationship>success</relationship>
        <maxWorkQueueSize>10000</maxWorkQueueSize>
        <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
        <flowFileExpiration>0 sec</flowFileExpiration>
      </connection>
      <connection>
        <id>74a78f0c-0158-1000-a1f3-a3c2c8730ff2</id>
        <name/>
        <bendPoints/>
        <labelIndex>1</labelIndex>
        <zIndex>0</zIndex>
        <sourceId>74a74ed5-0158-1000-76fc-eeb723a5f47a</sourceId>
        <sourceGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</sourceGroupId>
        <sourceType>INPUT_PORT</sourceType>
        <destinationId>74a70fd2-0158-1000-3d8e-645008570026</destinationId>
        <destinationGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</destinationGroupId>
        <destinationType>PROCESSOR</destinationType>
        <relationship/>
        <maxWorkQueueSize>10000</maxWorkQueueSize>
        <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
        <flowFileExpiration>0 sec</flowFileExpiration>
      </connection>
    </processGroup>
    <connection>
      <id>74b5493a-0158-1000-617d-40ba69f7ad05</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>74a772d8-0158-1000-4916-01c5a8b496e9</sourceId>
      <sourceGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</sourceGroupId>
      <sourceType>OUTPUT_PORT</sourceType>
      <destinationId>74a8e138-0158-1000-92f3-c6ec8781ba64</destinationId>
      <destinationGroupId>746c58aa-0158-1000-08a8-9984ee3a72bc</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship/>
      <maxWorkQueueSize>10000</maxWorkQueueSize>
      <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>74a8a54b-0158-1000-7902-ac61947fd255</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>74a80bc0-0158-1000-8699-37fc9ff670e4</sourceId>
      <sourceGroupId>746c58aa-0158-1000-08a8-9984ee3a72bc</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>74a74ed5-0158-1000-76fc-eeb723a5f47a</destinationId>
      <destinationGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</destinationGroupId>
      <destinationType>INPUT_PORT</destinationType>
      <relationship>success</relationship>
      <maxWorkQueueSize>10000</maxWorkQueueSize>
      <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
  </rootGroup>
  <controllerServices/>
  <reportingTasks/>
</flowController>

Templates and process groups

The two artifacts that are crucial to our ability to deploy are

  1. process groups
  2. templates

Process groups are found inside flow.xml.gz, as shown above.

Tuesday, 22 November 2016

"NiFi's orientation is around continuous streams of data rather than starting and completing jobs.

"When you look at processors via the UI and you see the green start arrow or the red stopped square it is best to think about [those as meaning] 'this component is (or is not) scheduled to execute' rather than 'this is running or not running.' It is ok that they're scheduled to run. They're still not going to actually get triggered unless they need to be. If the [up-stream processor has no] data to send then [the next processor] won't actually be triggered to execute."

—Joe Witt

On a cluster node going down...

If a NiFi node dies and the disks on which its data are found are inaccessible by other NiFi instances (whether on the same or different hosts), then that data is not available until the situation is rectified. The way to achieve continuous availability is to set up the key repositories (content, flowfile and provenance) such that they can be remounted by another host. This is non-trivial. There has been a discussion of integrating management of this into NiFi itself.

Wednesday, 30 November 2016

Imagine a conf/flow.xml.gz that runs as soon as NiFi is started. However, imagine it hanging with the UI unavailable for stopping the processors, looking around or otherwise hunting for the problem. What to do?

In conf/nifi.properties, there is a property, nifi.flowcontroller.autoResumeState, that can be set to false (the default is true) before restarting NiFi. Look for errors in logs/nifi-app.log.
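
That is (a sketch; the rest of nifi.properties stays unchanged):

# bring the flow up with all components in a stopped state...
nifi.flowcontroller.autoResumeState=false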

Remember to set it back to the default when finished. If, while debugging the problem, any changes are made to the flow, a new conf/flow.xml.gz will be generated with processors in a stopped state (or with other changes made while debugging), which may not be what is wanted.

Thursday, 1 December 2016

To log what flowfile is getting passed out of each processor, to understand what the processor has done to that flowfile, the LogAttribute processor has a property, Log Payload, that can be set to true; you could use this at different points in your flow.

Another solution that will work without modifying a flow is to use NiFi's provenance feature from the UI. Right-click on a processor and select Data provenance, then pick an event and view the lineage graph. (Note that provenance in NiFi is typically configured to evaporate in order to save space, so there may be nothing to look at if you wait too long). From there you can look at the content of each event in the graph to see how it changed as it moved through the flow.

Rather than examining the lineage, you could click View Details (the "info" or question-mark icon to the left of the table record). Then switch to the Content tab to see the incoming and outgoing content at each step of provenance. Clicking Download will bring the raw output to your filesystem, while View will open a content viewer in another window displaying the flowfile in various text formats.

UI notifications; processors faulting

One way to draw a user's attention is to use a standard log statement with proper level. This will issue a bulletin both on the processor (and visually indicate a note) as well as add it to a global list of bulletins. By default processors ignore anything lower than WARN, but this is adjustable in processor settings (right-click the processor in the UI, choose Configure, click the Settings tab and see Bulletin Level).
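
For example, from inside a processor (a sketch; the message text is made up):

// WARN and above surface as a bulletin on the processor by default...
getLogger().warn( "Rejecting flowfile {}: missing required attribute", new Object[]{ flowfile } );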

Friday, 9 December 2016

I saw a thread in the developers' forum discussing a counter and I wondered about using ProcessSession's counter in place of writing a special processor for Mat and Steve that counts documents:

void adjustCounter( String name, long delta, boolean immediate )

...adjusts counter data for the given counter name and takes care of registering the counter if not already present, i.e.:

session.adjustCounter( "IMAT Counter", 1, false );
session.transfer( flowFile, SUCCESS );

I need to figure out where to see this counter. I discovered it in the NiFi Counters summary, which is in a different place in the UI depending on the NiFi version (showing both at right). It lists the particulars, including the individual passage of flowfiles through a processor that adjusts the count, and also totals for all flowfiles through a processor that calls adjustCounter(). It's confusing if the processor is used over and over again in a flow because each instance shows up in the list.

Here's a listing in which I explicitly made this call, as coded just above, in Timer and ran through the flow illustrated above (yesterday).

(The long ids in the list above are processor instance ids.)

However, if I remove this explicit call, nothing shows up in this list after running a flowfile completely through the four instances of Timer. So, some processor must explicitly make this call.

So, to interpret what's in the list above, note that:

  1. The total number of processors called that bump "IMAT Timer" is 4.
  2. Each of
    • Timer 1 (start)
    • Timer 1 (stop)
    • Timer 2 (start)
    • Timer 2 (stop)
    (which are all just different instances of Timer) is called one time.

To name the counter, a property could be used, or the name of the counter could come from the processor's special name (as set in the Configure → Settings → Name field).
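
Here's a sketch of the property-based approach; COUNTER_NAME is a hypothetical PropertyDescriptor, not anything NiFi supplies:

// the flow designer chooses the counter's name via a (hypothetical) processor property...
final String counterName = context.getProperty( COUNTER_NAME ).getValue();
session.adjustCounter( counterName, 1, false );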

It remains to be seen which of our processors we wish to put a counter into and how we want to know when to make the call.

Tuesday, 13 December 2016

NiFi site-to-site experiment

  1. Erect NiFi on two hosts. Refer to them as "local" and "remote" in these steps. I'm using NiFi 1.1.0 for this experiment.

  2. Ensure conf/nifi.properties configuration of
    • both hosts, which must have a non-conflicting HTTP port (it can be the same on each host, but must not conflict with anything else using that port, and cannot conflict with the remote port that will be set in the succeeding step):
      # web properties
      nifi.web.http.port=9091
      
    • remote host to send to:
      # Site to Site properties
      nifi.remote.input.secure=false
      nifi.remote.input.socket.port=9098
      
  3. Start both local and remote instances of NiFi.

  4. Launch browser tabs to both. In my case, this is:

    local host: localhost:9091/nifi
    remote host: 10.10.8.44:9091/nifi (not the site-to-site port!)

  5. On the local NiFi instance (using the browser), ...
    1. Choose a processor that will create some content. I'm using our own CreateFlowFile with content, "This is a test of the Emergency Broadcast System. It is only a test.":

    2. Drag a remote processor group to the canvas:

      If transmission is disabled, as shown by the circle icon with a line through it on this processor at upper left, then right-click and enable it. This will not work if the remote instance of NiFi has not yet been a) properly configured and b) launched and working.

    3. Connect the content-creating processor, CreateFlowFile, to the remote processor group. You should get the following error alert:

      Skip to the next step to correct this.

  6. On the remote NiFi instance (whose canvas is likely blank), ...
    1. Create an input port. Give it any name you wish. I called mine, "From local NiFi instance":

      Note, at this point, that even with this input port, you still cannot connect the local NiFi instance's CreateFlowFile (to the remote processor group).

    2. Create a place for the content to be delivered to by this new input port. I'm choosing to create a file with it using processor, PutFile. I configured PutFile to create (any file it wants) in Directory /tmp (configuration not illustrated):

    3. Finally, connect the input port to PutFile so the content will flow there.

  7. Return to the local instance of NiFi and try again to connect the content-creating processor, CreateFlowFile, to the remote processor group. This should work, as illustrated here. Note that the connection takes on the name you gave the input port on the remote NiFi host. The remote processor group added here represents the remote host.

  8. Now let's try to flow content over from our local instance of NiFi, where we're creating an arbitrary bit of text in a flowfile, "This is a test of the Emergency Broadcast System. This is only a test.", to pass on to the remote NiFi instance, which will put that content in a file:
    1. Select CreateFlowFile, start it, then stop it (in order to avoid many instances of the content from being produced). Notice the 1 flowfile in the queue:

    2. Last, ensure that the remote input port named, "From local NiFi instance," is turned on if it isn't:
      1. Right-click the remote process group in the local NiFi instance canvas:

      2. See the on/off button is off (white):

      3. Click it to turn it on (make it blue):

  9. The flow should run all the way to a) the remote NiFi instance and b) into a file in /tmp:
    russ@remote /tmp $ ls -lart
    .
    .
    .
    -rw-r--r-- 1 russ russ 70 Dec 13 11:06 1703060523504973
    drwxrwxrwt 24 root root 4096 Dec 13 12:17 .
    russ@remote /tmp $ cat 1703060523504973
    This is a test of the Emergency Broadcast System. This is only a test.russ@remote /tmp $
    

Wednesday, 15 December 2016

I was trying to subject NiFi to the Java Flight Recorder (JFR). However, adding the needed arguments to conf/bootstrap.conf proved difficult. NiFi wasn't having them (as reported naively in the developers' forum).

There is no problem in NiFi per se; it was not really necessary to dig down into the code because there's no problem there. It's just how java.arg.n is consumed that creates trouble: one must play around with the value of n when argument ordering is crucial. In the present case, I was able to turn on JFR using the following lines in conf/bootstrap.conf:

#Set headless mode by default
java.arg.14=-Djava.awt.headless=true
# Light up the Java Flight Recorder...
java.arg.32=-XX:+UnlockCommercialFeatures
java.arg.31=-XX:+FlightRecorder
java.arg.42=-XX:StartFlightRecording=duration=120m,filename=recording.jfr
# Master key in hexadecimal format for encrypted sensitive configuration values
nifi.bootstrap.sensitive.key=

The point was that -XX:+UnlockCommercialFeatures must absolutely precede the other two options. Because these arguments pass through a HashMap, some values of n will fail to sort an argument before others, and the addition of other arguments might also come along and disturb this order. What I did here was by trial and error. I guess NiFi code could manhandle n in every case such that the order be respected, but I'm guessing that mine is the only, or one of the very rare, cases where options are order-dependent. I wouldn't personally up-vote a JIRA to fix this.

My trial and error went pretty fast using this command line between reassignments of n for these arguments; it took me maybe 6 tries. (Script bounce-nifi.sh does little more than shut down, then start up, NiFi, something I do a hundred times per day across at least two versions.)

~/dev/nifi/nifi-1.1.0/logs $ rm *.log ; bounce-nifi.sh 1.1.0 ; tail -f nifi-bootstrap.log

Thursday, 22 December 2016

http://stackoverflow.com/questions/151238/has-anyone-ever-got-a-remote-jmx-jconsole-to-work

I'm trying to use JMC to run the Flight Recorder (JFR) to profile NiFi on a remote server that doesn't offer a graphical environment on which to run JMC.

Here is what I'm supplying to the JVM (conf/bootstrap.conf) when I launch NiFi:

java.arg.90=-Dcom.sun.management.jmxremote=true
java.arg.91=-Dcom.sun.management.jmxremote.port=9098
java.arg.92=-Dcom.sun.management.jmxremote.rmi.port=9098
java.arg.93=-Dcom.sun.management.jmxremote.authenticate=false
java.arg.94=-Dcom.sun.management.jmxremote.ssl=false
java.arg.95=-Dcom.sun.management.jmxremote.local.only=false
java.arg.96=-Djava.rmi.server.hostname=10.10.10.92 (the IP address of my server running NiFi)

I did put this in /etc/hosts, though I doubt it's needed:

10.10.10.92 localhost

Then, upon launching JMC, I create a remote connection with these properties:

Host: 10.10.10.92
Port: 9098
User: (nothing)
Password: (ibid)

Incidentally, if I click the Custom JMX service URL, I see:

service:jmx:rmi:///jndi/rmi://10.10.10.92:9098/jmxrmi

At some point, weeks later, this stopped working and I scratched my head. It turned out that I had run an Ansible script that activated iptables, and I had to open port 9098 through the firewall it created:

# iptables -A INPUT -p tcp --dport 9098 -j ACCEPT

Tuesday, 3 January 2017

Watching NiFi come up. In conf/nifi.properties, nifi.web.http.port=9091...

~/dev/nifi $ ps -ef | grep [n]ifi
russ 27483 1 0 14:37 pts/4 00:00:00 /bin/sh ./nifi.sh start
russ 27485 27483 2 14:37 pts/4 00:00:00 /home/russ/dev/jdk1.8.0_112/bin/java \
-cp /home/russ/dev/nifi/nifi-1.1.1/conf:/home/russ/dev/nifi/nifi-1.1.1/lib/bootstrap/* \
-Xms12m -Xmx24m \
-Dorg.apache.nifi.bootstrap.config.log.dir=/home/russ/dev/nifi/nifi-1.1.1/logs \
-Dorg.apache.nifi.bootstrap.config.pid.dir=/home/russ/dev/nifi/nifi-1.1.1/run \
-Dorg.apache.nifi.bootstrap.config.file=/home/russ/dev/nifi/nifi-1.1.1/conf/bootstrap.conf \
org.apache.nifi.bootstrap.RunNiFi start
russ 27508 27485 99 14:37 pts/4 00:00:21 /home/russ/dev/jdk1.8.0_112/bin/java \
-classpath /home/russ/dev/nifi/nifi-1.1.1/./conf
:/home/russ/dev/nifi/nifi-1.1.1/./lib/slf4j-api-1.7.12.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-documentation-1.1.1.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-nar-utils-1.1.1.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/log4j-over-slf4j-1.7.12.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/logback-core-1.1.3.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/jul-to-slf4j-1.7.12.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/jcl-over-slf4j-1.7.12.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-framework-api-1.1.1.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-properties-1.1.1.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/logback-classic-1.1.3.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-api-1.1.1.jar
:/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-runtime-1.1.1.jar
-Dorg.apache.jasper.compiler.disablejsr199=true \
-Xmx512m \
-Xms512m \
-Dsun.net.http.allowRestrictedHeaders=true \
-Djava.net.preferIPv4Stack=true \
-Djava.awt.headless=true \
-XX:+UseG1GC \
-Djava.protocol.handler.pkgs=sun.net.www.protocol \
-Dnifi.properties.file.path=/home/russ/dev/nifi/nifi-1.1.1/./conf/nifi.properties \
-Dnifi.bootstrap.listen.port=38751 \
-Dapp=NiFi \
-Dorg.apache.nifi.bootstrap.config.log.dir=/home/russ/dev/nifi/nifi-1.1.1/logs \
org.apache.nifi.NiFi
~ # netstat -lptu
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 localhost:63342 *:* LISTEN 23923/java
tcp 0 0 localhost:47060 *:* LISTEN 1876/GoogleTalkPlug
tcp 0 0 gondolin:domain *:* LISTEN 1238/dnsmasq
tcp 0 0 *:ssh *:* LISTEN 1124/sshd
tcp 0 0 *:35353 *:* LISTEN 23923/java
tcp 0 0 localhost:6942 *:* LISTEN 23923/java
tcp 0 0 *:9091 *:* LISTEN 27508/java
tcp 0 0 localhost:45608 *:* LISTEN 27508/java
tcp 0 0 localhost:58634 *:* LISTEN 1876/GoogleTalkPlug
tcp6 0 0 [::]:64628 [::]:* LISTEN 24068/java
tcp6 0 0 [::]:ssh [::]:* LISTEN 1124/sshd
tcp6 0 0 [::]:44155 [::]:* LISTEN 24068/java
tcp6 0 0 localhost:38751 [::]:* LISTEN 27485/java
udp 0 0 *:mdns *:* 919/avahi-daemon: r
udp 0 0 gondolin:domain *:* 1238/dnsmasq
udp 0 0 gondolin:ntp *:* 28138/ntpd
udp 0 0 localhost:ntp *:* 28138/ntpd
udp 0 0 *:ntp *:* 28138/ntpd
udp 0 0 *:49427 *:* 919/avahi-daemon: r
udp 0 0 *:ipp *:* 17819/cups-browsed
udp6 0 0 [::]:mdns [::]:* 919/avahi-daemon: r
udp6 0 0 [::]:56356 [::]:* 919/avahi-daemon: r
udp6 0 0 fe80::cc63:b04:481e:ntp [::]:* 28138/ntpd
udp6 0 0 ip6-localhost:ntp [::]:* 28138/ntpd
udp6 0 0 [::]:ntp [::]:* 28138/ntpd
~ # lsof -i | grep 9091
firefox 3842 russ 79u IPv4 103681669 0t0 TCP localhost:57558->localhost:9091 (ESTABLISHED)
firefox 3842 russ 81u IPv4 103686787 0t0 TCP localhost:57562->localhost:9091 (ESTABLISHED)
firefox 3842 russ 86u IPv4 103686788 0t0 TCP localhost:57564->localhost:9091 (ESTABLISHED)
firefox 3842 russ 90u IPv4 103686789 0t0 TCP localhost:57566->localhost:9091 (ESTABLISHED)
java 27508 russ 1699u IPv4 103667313 0t0 TCP *:9091 (LISTEN)
java 27508 russ 1730u IPv4 103687885 0t0 TCP localhost:9091->localhost:57562 (ESTABLISHED)
java 27508 russ 1731u IPv4 103687886 0t0 TCP localhost:9091->localhost:57564 (ESTABLISHED)
java 27508 russ 1732u IPv4 103680156 0t0 TCP localhost:9091->localhost:57558 (ESTABLISHED)
java 27508 russ 1734u IPv4 103687887 0t0 TCP localhost:9091->localhost:57566 (ESTABLISHED)

Friday, 6 January 2017

Just a note from Expression Language Guide that if attempting to evaluate an attribute, via evaluateAttributeExpression(), NiFi looks first for a flowfile attribute by that name, then, if missing, for a system environment variable, and that failing, for a JVM system property.
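
For example, in onTrigger(), a property supporting the Expression Language is typically evaluated like this (DIRECTORY is a hypothetical property descriptor):

// ${...} expressions are resolved against the flowfile's attributes first,
// then system environment variables, then JVM system properties...
final String path = context.getProperty( DIRECTORY )
                           .evaluateAttributeExpressions( flowfile )
                           .getValue();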

Tuesday, 10 January 2017

Working on a huge flow, examining performance with JMC/JFR, I have literally hundreds of millions of files that stack up at different queue points, but especially at the end. Ridding myself of all these files from the queues takes something like forever. I posted to ask if anyone had a quick way, but there is none. I tried a) creating a no-op processor drain, but that doesn't go much, if at all, faster than b) right-clicking the queue and choosing Empty queue.

I even tried a brute-force approach using a script I wrote sitting in my NiFi root:

#!/bin/sh
read -p "Smoke NiFi repositories (y/n)? " yesno
case "$yesno" in
  y|Y)
    ( cd flowfile_repository   ; rm -rf * )
    ( cd provenance_repository ; rm -rf * )
    ( cd content_repository    ; rm -rf * )
    ;;
  *)
    ;;
esac

...because removing everything from the content repository takes forever—hours for a couple of hundred million flowfiles.

This was predictable, but I wasn't even thinking about it before the accumulation became a problem. After letting the files delete overnight, I came in and added a counting processor so I could just let them drain out as they arrived (instead of accumulating them in a queue for counting).

Note too that if the total number of flowfiles in any one connection exceeds the value nifi.queue.swap.threshold in nifi.properties, usually 20,000, swapping will occur and performance can be affected. So, by allowing files to accumulate, I was shooting myself in the foot because I was altering the very performance I was trying to monitor/get a feel for.
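
The threshold lives in conf/nifi.properties; a sketch of the line with its usual default:

nifi.queue.swap.threshold=20000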

Wednesday, 11 January 2017

I'd never done this before.

How to make "kinkies" in a relationship/connection? Just double-click the line and it will put a yellow handle in it that you can drag to get the connection around any obstruction and make the connection clearer. To remove a handle that's no longer needed, simply double-click it. See Bending connections to add a bend or elbow-point.

Someone in the forum wanted a flowfile-counter processor. I'll put it here, but, just in case, I'll say "no guarantees" too. NiFi's counter cookie jar consists of the session.adjustCounter() calls below.

PassThruCounterTest.java:
package com.etretatlogiciels.nifi.processor;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class PassThruCounterTest
{
  private static final String TEST_VALUE = "test123";

  @Test
  public void testFlowfileCount() throws InitializationException
  {
    TestRunner runner = TestRunners.newTestRunner( PassThruCounter.class );
    runner.setProperty( PassThruCounter.NAME, TEST_VALUE );
    runner.setProperty( PassThruCounter.TYPE, "count" );
    runner.enqueue( TEST_VALUE );
    runner.run();
    final Long actual   = runner.getCounterValue( TEST_VALUE );
    final Long expected = Long.valueOf( "1" );
    assertEquals( actual, expected );
  }

  @Test
  public void testFlowfileSize() throws InitializationException
  {
    TestRunner runner = TestRunners.newTestRunner( PassThruCounter.class );
    runner.setProperty( PassThruCounter.NAME, TEST_VALUE );
    runner.setProperty( PassThruCounter.TYPE, "size" );
    String value = TEST_VALUE;
    runner.enqueue( value );
    runner.run();
    final Long actual   = runner.getCounterValue( TEST_VALUE );
    final Long expected = ( long ) value.length();
    assertEquals( actual, expected );
  }
}
PassThruCounter.java:
package com.etretatlogiciels.nifi.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@SupportsBatching
@Tags( { "debugging" } )
@CapabilityDescription( "Counts flowfiles passing or their cumulative sizes." )
public class PassThruCounter extends AbstractProcessor
{
  @Override public void onTrigger( final ProcessContext context, final ProcessSession session )
        throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }

    final String type = context.getProperty( TYPE ).getValue();
    final String name = context.getProperty( NAME ).getValue();

    switch( type )
    {
      case "count" :
        session.adjustCounter( name, 1, true );
        break;
      case "size" :
        session.adjustCounter( name, flowfile.getSize(), true );
        break;
    }

    session.transfer( flowfile, SUCCESS );
  }

  //<editor-fold desc="Properties and relationships">
  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( TYPE );
    properties.add( NAME );
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
      .name( "Type" )
      .description( "Counter type, one of \"count\" or \"size\" of flowfiles." )
      .required( true )
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .defaultValue( "count" )
      .build();

  public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder()
      .name( "Name" )
      .description( "Name of registered counter." )
      .required( true )
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .build();

  public static final Relationship SUCCESS = new Relationship.Builder()
      .name( "success" )
      .description( "Continue on to next processor" )
      .build();

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >        getRelationships()                { return relationships; }
  //</editor-fold>
}

Thursday, 12 January 2017

Some notes on tuning NiFi for high-performance.

    nifi.properties
  1. nifi.bored.yield.duration=10 millis —prevents processors that use the timer-driven scheduling strategy from consuming excessive CPU when there is no work to do.
  2. nifi.ui.autorefresh.interval=30 sec —can be used to make the browser UI refresh more often, but increases burden on the network. It doesn't affect the performance of NiFi itself.
  3. There are two databases used by NiFi (under database_repository). One is a user database keeping track of user logins (in the case of running secured NiFi). The other is the history database that tracks all changes made to the graph (or workflow). Neither of these is very big (I'm seeing just over 2 MB). It's useful to consider relocating this repository elsewhere (that is, outside /opt/nifi) to simplify upgrading, by allowing you to retain the user and component history after upgrading.
  4. The most likely problem in corrupting repositories is running out of disk space. For this reason, relocating the flowfile-, content- and provenance repositories outside of /opt/nifi is not a bad idea. The location is specified in nifi.properties via nifi.flowfile.repository.directory, etc.
  5. As noted a couple of days ago, it's the content repository's disk that experiences the highest I/O impact on systems with high data volumes; NiFi queues stacking up can be a problem. nifi.content.repository.directory.name=path can be used (in fact, a whole list of them with different names can be) in nifi.properties to spread out this potentially mammoth repository; see the sketch following this list.
  6. In NiFi clusters, it's a good idea to use different names (just as above) for the repositories of the different NiFi nodes. Obviously, installing these nodes will not lead automatically to different names and keeping the same, default names works fine, but monitoring disk utilization within the NiFi UI will be easier if the names are different when lumped together in the system diagnostics window.
    bootstrap.conf
  7. The JVM memory settings that control NiFi's heap should be greatly expanded from the shipping defaults (for example, from -Xms256m to -Xms8g).

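Here's a sketch of spreading the content repository over several disks, per item 5 above (the mount paths are made up):

nifi.content.repository.directory.disk1=/mnt/disk1/content_repository
nifi.content.repository.directory.disk2=/mnt/disk2/content_repository
nifi.content.repository.directory.disk3=/mnt/disk3/content_repository
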
A good link for NiFi installation best practice: HDF/NIFI Best practices for setting up a high performance NiFi installation.

Friday, 13 January 2017

I still haven't had time to set up a NiFi cluster which I want to do experimentally (before I ever have to do it because forced).

I came across a random, passing observation that Zookeeper, which is integral to erecting a NiFi cluster, generally recommends an odd number of nodes. I'm sure that, just as for MongoDB, this is about elections.

Tuesday, 24 January 2017

The perils of moving a processor between packages.

When you change the class (or package) name of a processor, you have effectively removed the processor from your build and added a new one. At that point, NiFi will create a "ghost" processor (in conf/flow.xml.gz) to take its place because the processor class no longer exists.

However, the properties are still retained, in case the processor is added back (reappears). But since NiFi cannot find it, it doesn't know whether the properties are sensitive or not. As a result, it marks all properties as sensitive because it was determined that it is better to share too little information than to share too much.

It's possible to edit conf/flow.xml.gz by hand, if care is taken, I think.

Thursday, 2 February 2017

If disk space is exhausted, how to recover unprocessed data since NiFi cannot proceed?

Archived content can be deleted from content_repository:

# find content_repository/ -type f -amin +600 | grep archive | xargs rm -f

(The -amin +600 option means the file was accessed 600 or more minutes ago.)

This deletes anything from that archive that is more than 10 hours old. Once this has happened, assuming enough diskspace has been freed up, NiFi can be restarted.

Of course, what partition both the content- and flowfile repositories are on is a consideration. If not the same one, then freeing up old content won't do the trick.

Note that it's theoretically safe to perform these deletions with NiFi running, but there are no guarantees. It's better to halt NiFi first, then restart it.

Saturday, 11 February 2017

Time out for NiFi performance...

I need to review some NiFi advice to see where we are on implementing NiFi the correct way. Here are some elements:

  1. Each NiFi repository should be on its own, physically separate partition. The three repositories in question are found directly off ${NIFI_ROOT} (usually /opt/nifi):
    • content_repository
    • flowfile_repository
    • provenance_repository
    • database_repository (small and not part of any performance concern)

    The location of these repositories is managed by changing these defaults in conf/nifi.properties:

    nifi.content.repository.directory.default=./content_repository
    nifi.flowfile.repository.directory.default=./flowfile_repository
    nifi.provenance.repository.directory.default=./provenance_repository
    
  2. The first, easiest step is to make the provenance_repository volatile. This is done by settings for:
    • buffer size
    • maximum storage time —how long to keep provenance around for examination
    • maximum storage size —how much disk space to dedicate to keeping provenance data around
    • roll-over time —the time to wait before rolling over the latest provenance information to be available from the UI
    • index shard size —larger values impact the Java heap when searching through provenance data (from the UI)

    ...in conf/nifi.properties, here are the settings discussed above and their defaults:

    nifi.provenance.repository.buffer.size=100000
    nifi.provenance.repository.max.storage.time=24 hours
    nifi.provenance.repository.max.storage.size=1 GB
    nifi.provenance.repository.rollover.time=30 secs
    nifi.provenance.repository.rollover.size=100 MB
    nifi.provenance.repository.index.shard.size=500 MB
    
  3. Heap size for NiFi should be as great as legitimately possible. This is managed by establishing the JVM command-line option, -Xmx, in conf/bootstrap.conf. It looks like this:
    # JVM memory settings
    java.arg.2=-Xms512m
    java.arg.3=-Xmx12288m
    

  4. Processors that load their content fully into memory harm performance greatly. A very naïve example of this is the sample onTrigger() implementation below. onTrigger() is where a NiFi processor performs its main business, that of manipulating the flowfile. The better option is to process the flowfile contents progressively, inching through input and output streams such that only the buffered contents sit in memory at any one time (and not the entire input or output contents); see the streaming sketch after this example:
    import org.apache.commons.io.IOUtils;
    .
    .
    .
    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      FlowFile flowfile = session.get();
    
      final AtomicReference< String > contents = new AtomicReference<>();
    
      session.read( flowfile, new InputStreamCallback()
      {
        @Override
        public void process( InputStream in ) throws IOException
        {
          StringWriter writer = new StringWriter();
          IOUtils.copy( in, writer );
          contents.set( writer.toString() );
        }
      });
    
      // now the entire flowfile contents are sitting in memory...
    }
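
    By contrast, here's a minimal sketch of the streaming approach, using session.write() with a StreamCallback, which hands the processor the input and output streams together (SUCCESS stands in for the processor's own success relationship; the per-buffer transformation is left as a comment):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.nifi.processor.io.StreamCallback;
    .
    .
    .
    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      FlowFile flowfile = session.get();

      if( flowfile == null )
        return;

      flowfile = session.write( flowfile, new StreamCallback()
      {
        @Override
        public void process( InputStream in, OutputStream out ) throws IOException
        {
          byte[] buffer = new byte[ 8192 ];
          int    count;

          // only one buffer's worth of content sits in memory at any time...
          while( ( count = in.read( buffer ) ) != -1 )
          {
            // (hypothetically, transform the buffer contents in place here)
            out.write( buffer, 0, count );
          }
        }
      });

      session.transfer( flowfile, SUCCESS );
    }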
    
  5. Consider a run-duration of 25 milliseconds for processors that support it. This allows the framework to batch many operations into fewer transactions, which can dramatically increase throughput. The run-duration setting, when supported, is in the Scheduling tab of the processor's configuration dialog.

    Note that this setting may be confusing. Let me attempt to explain:

    The lower the latency, the more cycles are given to other processors, because the batch size is kept smaller. The higher the throughput (which is throughput per invocation), the more that processor hogs cycles for itself.

    Depending on what's going on elsewhere in the flow, a lower latency means that the processor simply runs more times to process the same number of flowfiles that would have been processed in one invocation had the setting favored throughput. However, this also means:

    • better response time at the NiFi UI (canvas),
    • better response time for other processors and elements of the flow.

    The run-duration slider appears in a processor's configuration only when the processor bears the @SupportsBatching annotation. Once you use this annotation, you don't need to do anything in onTrigger() like:

    private static final int MAX_FLOWFILES_IN_BATCH = 50;
    
    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      final List< FlowFile > flowfiles = session.get( MAX_FLOWFILES_IN_BATCH );
    
      for( FlowFile flowfile : flowfiles )
      {
        ...
      }
    }

    ..., but just respond with:

    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      final FlowFile flowfile = session.get();
    
      if( flowfile == null )
      {
        context.yield();
        return;
      }

      // ...process the single flowfile as usual; the framework handles the batching...
    }

    The slider (the NiFi framework) does the rest: you won't need to worry about the existence of a batch. It just is.

    Now, on the other hand, if you need to manage a pool of resources that are expensive to allocate, for example, you'd also want to add:

    @OnScheduled
    public void allocateMyResources( final ProcessContext context ) throws ProcessException
    {
      try
      {
        // make a connection, allocate a pool, etc.
      }
      catch( Exception e )
      {
        throw new ProcessException( "Couldn't allocate my resources: ", e );
      }
    }
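
    A matching tear-down hook releases those resources when the processor is stopped (a sketch; the pool field is hypothetical):

    import org.apache.nifi.annotation.lifecycle.OnStopped;
    .
    .
    .
    @OnStopped
    public void releaseMyResources()
    {
      // close the connection, drain the pool, etc.
      if( pool != null )
      {
        pool.close();
        pool = null;
      }
    }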
    

Saturday, 14 February 2017

When you get the error that a flowfile is not the most recent version, it means that you have modified the flowfile in some way via ProcessSession and then attempted to use a flowfile reference that precedes the change. Commonly this happens when you have code that looks something like this:

FlowFile flowfile = session.get();

if( flowfile == null )
  return;

session.putAttribute( flowfile, "hello", "hello" );
session.transfer( flowfile, SUCCESS );

The issue here is that the ProcessSession.putAttribute() method called above returns a new FlowFile object, and we then pass the old version of the flowfile to session.transfer(). This code would need to be updated to look like this:

FlowFile flowfile = session.get();

if( flowfile == null )
  return;

flowfile = session.putAttribute( flowfile, "hello", "hello" );
session.transfer( flowfile, SUCCESS );

Note that the flowfile variable is re-assigned to the new FlowFile object when session.putAttribute() is called, so that we pass the most up-to-date version of the flowfile to ProcessSession.transfer().

Friday, 17 February 2017

On sensitive-value properties...

This is a phenomenon common in controller services, i.e. properties that hold a password or passphrase or other sensitive sequence that must be hidden from subsequent view. You type it in, confirm it, and never see it again. You can only type it in again.

Here's a long comment I got from Andy LoPresto:

Each sensitive property value is encrypted using the specified algorithm noted in conf/nifi.properties. By default, and therefore on any unmodified system, this is PBEWITHMD5AND256BITAES-CBC-OPENSSL, which means that the actual cipher applied is AES in CBC mode using a 256-bit key derived from the provided nifi.sensitive.props.key value (really a password and not a true key) by the OpenSSL EVP_BytesToKey key-derivation function (KDF). That KDF is a modified version of PKCS #5 v1.5's MD5-based PBKDF1: identical when the combined length of the key and IV is less than or equal to an MD5 digest, and non-standard if greater.

A random 16-byte salt is generated during the encrypt process (the salt length is actually set to be the block length of the cipher in question—AES is always 16 bytes, but if you choose to use DES for example, this value would be 8 bytes), and this salt is provided, along with an iteration count (1000 by default) to the KDF. Internally to the application, this process occurs within the Jasypt library (slated for removal in NIFI-3116), but you can see a compatible Java native implementation of this process in the ConfigEncryptionTool.

If the nifi.sensitive.props.key value is empty, a hard-coded password is used. This is why all documentation should recommend setting that value to a unique and strong passphrase prior to deployment.

Once the key is derived, the sensitive value is encrypted and the salt and cipher text are concatenated and then encoded in hexadecimal. This output is wrapped in "enc{" and "}" tokens to denote the value is encrypted, and then stored in flow.xml.gz. As pointed out elsewhere, the sensitive value (in plain-text or encrypted form) is never sent over the ReST API to any client, including the UI. When editing a processor property that is sensitive, the UI displays a static placeholder.

If two values are the same (i.e. the same password in an EncryptContent processor encrypting and an EncryptContent processor decrypting later in the same flow), the two encrypted values will be completely different in the flow.xml.gz even though they were encrypted with the same key. This is because of the random salt value mentioned above.

The encrypt-config.sh script in nifi-toolkit exposes the functionality to "migrate" the flow.xml.gz encrypted values to be encrypted with a new key. For example, if you read the above and feel uncomfortable with your values encrypted with the default hard-coded password rather than a unique passphrase, you can run this tool and provide a new passphrase, and it will update conf/nifi.properties with the new passphrase (and encrypt it if you have encrypted configuration enabled) and decrypt all values in the flow.xml.gz and re-encrypt them with the new key and repopulate them.

A small note on the migration tool—you may notice that after running it, identical values will have identical encrypted values. This design decision was made because the performance tradeoff of not re-performing the KDF with a unique salt and cipher initialization on each value is immense, and the threat model is weak because the key is already present on the same system. This decision was further impacted by the process by which Jasypt derives the keys. After NIFI-3116 is completed, I feel confident we can improve the security of the cipher texts by using unique IVs without incurring the substantial penalty currently levied by the library structure.

Throughout this explanation, you may have had concerns about some of the decisions made (algorithms selected, default values, etc.). There are ongoing efforts to improve these points, as security is an evolving process and attacks and computational processing availability to attackers is always increasing. More of this is available in relevant JIRAs and the Security Features Roadmap on the wiki, but in general I am looking to harden the system by restricting the algorithms used by default and available in general and increasing the cost of all key derivations (both in time and memory hardness via PBKDF2, bcrypt, and scrypt). Obviously the obstacles in this effort are backwards compatibility and legacy support. It is a balance, but with the support of the community, I see us continuing to move forward with regards to security while making the user experience even easier.
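
To make the mechanics concrete, here's a rough sketch of the kind of PBE encryption Andy describes, done directly with Jasypt (this is not NiFi's actual code; it assumes the Jasypt and Bouncy Castle libraries on the classpath, and the password below stands in for nifi.sensitive.props.key):

import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.jasypt.encryption.pbe.StandardPBEStringEncryptor;

public class SensitivePropertyDemo
{
  public static void main( String[] args )
  {
    StandardPBEStringEncryptor encryptor = new StandardPBEStringEncryptor();

    encryptor.setProvider( new BouncyCastleProvider() );            // the OpenSSL-compatible PBE algorithm needs Bouncy Castle
    encryptor.setAlgorithm( "PBEWITHMD5AND256BITAES-CBC-OPENSSL" ); // the default noted in conf/nifi.properties
    encryptor.setStringOutputType( "hexadecimal" );                 // NiFi stores hexadecimal, not Jasypt's default Base64
    encryptor.setPassword( "a unique and strong passphrase" );      // stands in for nifi.sensitive.props.key

    // a random salt is generated on each call, so encrypting the same
    // value twice yields completely different cipher texts...
    String cipherText = encryptor.encrypt( "my-database-password" );
    System.out.println( "enc{" + cipherText + "}" );                // roughly the form stored in flow.xml.gz
  }
}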

Wednesday, 1 March 2017

This morning, NiFi queue prioritization...

To impose prioritization on flowfiles, you configure a (stopped) queue by right-clicking it and choosing Configure. Look for the Settings tab. Then grab one of the available prioritizers, in stock NiFi one of:

• FirstInFirstOutPrioritizer
• NewestFlowFileFirstPrioritizer
• OldestFlowFileFirstPrioritizer
• PriorityAttributePrioritizer

...and drag it down into Selected Prioritizers.

This morning's note is about PriorityAttributePrioritizer.

The PriorityAttributePrioritizer prioritizes flowfiles by looking for an attribute named priority, then sorting the flowfiles using the lexicographical value of the priority marked therein.

Bryan Bende pointed out that, coming out of various feeds (let's say some process groups), you can set the priority attribute using UpdateAttribute to weight the various feeds according to a sense of priority. Before continuing the flow, you can converge all the flows into one using a funnel.

Tuesday, 7 March 2017

How the provenance repository works

The provenance repository writes out the data "inline" as flowfiles traverse the system. It rolls over periodically (by default, every 30 seconds or after writing 100 MB of provenance data). When it rolls over, it begins writing data to a new (repository) file and starts to index the events of the old file in Lucene. The events are not available in the UI until they have been indexed into Lucene. This could take a few minutes depending on the system and how many provenance events are being generated; see Persistent Provenance Repository Design.

This said, the existing implementation (1.1.x) is not as fast as it could be. In the next release of NiFi, there is an alternate implementation that can be used. See Provide a newly refactored provenance repository.

Wednesday, 15 March 2017

If you want to see when/who stopped a processor in NiFi (and other configuration changes) you can use the Flow configuration history (the clock icon on the right side of the menu bar in 0.7.x) and filter by processor id.

Thursday, 16 March 2017

Just a note to remember that AbstractProcessor extends AbstractSessionFactoryProcessor and that the real delta between them is the second argument to onTrigger(), where the subclass offers ProcessSession in place of ProcessSessionFactory. The latter gives the ability to create a new session (with sessionFactory.createSession()).
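
For reference, here are the two onTrigger() declarations side by side (as I read them in the framework sources):

// what AbstractSessionFactoryProcessor leaves you to implement (straight from the Processor interface):
void onTrigger( ProcessContext context, ProcessSessionFactory sessionFactory ) throws ProcessException;

// what AbstractProcessor asks of you instead; it creates the session, calls this, then commits for you:
public abstract void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException;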

Friday, 17 March 2017

Here are some excellent details on NiFi-processor semantics by Andy LoPresto and Mark Payne. First Andy...

In the Developer Guide, there are discussions of various processor patterns, including Split Content, and a section on Cohesion and Usability, which states:

"In order to avoid these issues, and make processors more reusable, a processor should always stick to the principal of 'do one thing and do it well.' Such a processor should be broken into two separate processors: one to convert the data from format X to format Y, and another processor to send data to the remote resource."

I call this the "Unix model," i.e.: it is better to join several small, specific tools together to accomplish a task than re-invent larger tools every time a small modification is required. In general, that would lead me to develop processors that operate on the smallest unit of data necessary—a single line, element, or record makes sense—unless more context is needed for completeness, or the performance is so grossly different that it is inefficient to operate on such small quantities.

Finally, with regards to your question about which provenance events to use in various scenarios, I agree the documentation is lacking. Luckily Drew Lim did some great work improving this documentation. While it has not been released in an official version, both the Developer Guide and User Guide have received substantial enhancements, describing the complete list of provenance event types and their usage or meaning. This work is available on master and will be released in 1.2.0.

Different processors have different philosophies. Sometimes that is the result of different authors, sometimes it is a legitimate result of the wide variety of scenarios that these processors interact with. Improving the user experience and documentation is always important, and getting started with and maximizing the usefulness of these processors is one of several top priorities.

Next, Mark...

On Provenance Events, while the documentation does appear to be lacking there at the moment, the framework does its best to help here. Here is some additional guidance:

Regarding processors handling all of the data vs. only a single "piece" of the data:

Processors should handle all of the content of a flowfile, whenever it makes sense to do so. For example, if you have a processor that is designed to operate on the header of a CSV file, then it does not make sense to read and process the rest of the data. However, if you have a processor that modifies one of the cells in a row of a CSV file, it should certainly operate on all rows. This gives the user the flexibility to send in single-row CSV files if they need to split them for some other reason, or to send in a many-gigabyte CSV file without the need to pay the cost of splitting it up and then re-merging it.

Where you may see the existing processors deviate from this is something like ExtractText, which says that it will only buffer up to the configured amount of data. This is done because, in order to run a regular expression over the data, the data has to be buffered in the Java heap. If we don't set limits on this, then a user will inevitably send in a 10 GB CSV file and get out-of-memory errors. In this case, we would encourage users to use a small buffer size such as 1 MB and then split the content upstream.

Friday, 18 March 2017

I asked the question,

The ability to create a new session is conferred by extending AbstractSessionFactoryProcessor (in lieu of AbstractProcessor, which further restricts Processor, among other things removing access to the session factory), giving one the option of calling sessionFactory.createSession(). This begs the question of why or when I would wish to create a new session rather than operate within the one I'm given (by AbstractProcessor)?

Mark Payne and Matt Burgess responded, Mark wrote:

MergeContent is a good example of when this would be done.

It creates a ProcessSession for each "bin" of flowfiles that it holds onto. This allows it to build up several flowfiles in one bin and then process that entire bin, committing the ProcessSession that is associated with it. Doing this does not affect any of the flowfiles in another bin, though, as they belong to a different ProcessSession. If they all belonged to the same ProcessSession, then we would not be able to commit the session whenever one bin is full. Instead, we would have to wait until all bins are full, because in order to commit a ProcessSession, all of its flowfiles must be accounted for (transferred to a relationship or removed).

So, in a more general sense, you would want this any time you have multiple flowfiles that need to belong to different sessions so that you can commit or roll back a session without accounting for all the flowfiles you've seen.

The AbstractSessionFactoryProcessor is also sometimes used when we want to avoid committing the session when returning from onTrigger(). For example, if there is some asynchronous process going on.

Matt:

There are a couple of use cases I can think of where you'd want AbstractSessionFactoryProcessor over AbstractProcessor:

  1. You only want to update the StateManager after a successful commit has taken place. Because AbstractProcessor.onTrigger() calls commit for you, you can't add code after that to update the state map.
  2. I am working on a processor that needs to keep the same session across multiple calls to onTrigger(); basically it's a transactional thing and I don't know when it will end. If something bad happens in the meantime, I need to roll back that original session.
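
Here's a minimal sketch of the shape this takes (loosely patterned after Mark's bin description above, not MergeContent's actual code; SUCCESS stands in for the processor's own relationship):

@Override
public void onTrigger( final ProcessContext context, final ProcessSessionFactory sessionFactory ) throws ProcessException
{
  // each unit of work gets its own session, committable independently of any other...
  final ProcessSession session  = sessionFactory.createSession();
  final FlowFile       flowfile = session.get();

  if( flowfile == null )
  {
    context.yield();
    return;
  }

  try
  {
    // ...do the work (e.g.: add the flowfile to a bin; when the bin is full,
    // transfer the bin's flowfiles and commit just this session)...
    session.transfer( flowfile, SUCCESS );
    session.commit();
  }
  catch( final Throwable t )
  {
    session.rollback();
    throw new ProcessException( t );
  }
}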

Tuesday, 21 March 2017

@SupportsBatching leads to that "slider" seen in a processor's configuration. It's a hint only and has no material effect upon how you code the processor (in terms of special things to do).

A delay greater than 0 (on the slider) causes the flowfiles seen over that time period to be batched: the work is done as usual, but nothing is committed until the end of the delay.
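
Annotating the processor class is all there is to it; a sketch (the processor name is hypothetical; the annotation lives in org.apache.nifi.annotation.behavior):

import org.apache.nifi.annotation.behavior.SupportsBatching;
.
.
.
@SupportsBatching
public class MyBatchFriendlyProcessor extends AbstractProcessor
{
  ...
}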

Friday, 24 March 2017

Custom registry properties in the NiFi Expression Language

As of at least NiFi 1.1.1, the following became possible:

In ${NIFI_ROOT}/conf/nifi.properties, add this line:

nifi.variable.registry.properties=./conf/registry.properties

You can list more than one file, separated by commas (e.g. ./conf/registry.properties,./conf/more.properties). Then, create this file (registry.properties) locally with contents:

zzyzzx="This is a test of the Emergency Broadcast System. This is only a test."

Next, create a flow using CreateFlowFile (and start it) connected to UpdateAttribute, which you configure thus:

Property    Value
ZZYZZX      ${zzyzzx}

...(starting this processor too) then on to NoOperation (which you don't start) so that there's a flowfile in the queue you can look at. Bounce NiFi first, because these properties are static (read only at start-up). Once there's a flowfile in the queue in front of NoOperation, right-click and choose List Queue, then click View Details and, finally, the Attributes tab. You'll see attribute ZZYZZX with the property value assigned by registry.properties in our example.