NiFi notes

Russell Bateman
January 2016
last update:



(See A simple NiFi processor project for minimal source code and pom.xml files.)

Managing templates in NiFi 1.x.

NiFi Javadocs

NiFi
NiFi test framework

Preface

The notes are in chronological order, as I needed and made them. I haven't yet attempted to organize them more usefully. The start happens below under Friday, 15 January 2016.

According to Wikipedia, NiFi originated at the NSA where it originally bore the name, Niagarafiles (from Niagara Falls). It was made open source, then picked up for continued development by Onyara, Inc., a start-up company later acquired by Hortonworks in August, 2015. NiFi has been described as a big-data technology, as "data in motion," in opposition to Hadoop, which is "data at rest."

Apache NiFi is the perfect tool for performing extract, transform and load (ETL) of data records.

The principal NiFi developer documentation is cosmetically beautiful; however, it suffers from being written by someone who already thoroughly understands his topic, without sensitivity to the fact that his audience does not. It's therefore a great place to go as a reference, but rarely as a method. One can easily become acquainted with the elements of what must be done, but rarely if ever learn how to write the code to access those elements, let alone the best way to do it. For method documentation, a concept in language learning that in our industry often appears for technology such as NiFi only long after the fact, you'll need to look for the "how-to" stuff in the bulleted list below, as well as in my notes here.

 

NiFi 1.x UI overview and other videos


My goal in writing this page is a) to record what I've learned for later reference and b) to leave breadcrumbs for others like me.

History (and testimonial)

In my company, we were doing extract, transform and load (ETL) by way of a hugely complicated and proprietary monolith that was challenging to understand. I had proposed beginning to abandon it in favor of using a queue-messaging system, something I was very familiar with. A few months later, a director discovered NiFi and put my colleague up to looking into it. Earlier on, I glanced at it, but was swamped on a project and did not pursue it until after my colleague had basically adopted and promoted it as a replacement for our old ETL process. It only took us a few weeks to a) create a shim by which our existing ETL component code became fully exploitable and b) spin up a number of NiFi-native components including ports of some of the more important, legacy ones. For me, the road has been a gratifying one: I no longer need to grok the old monolith and can immediately contribute to the growing number of processors to do very interesting things. It leveled my own playing field as a newer member of staff.

Unit-testing cribs

Stuff I don't have to keep in the pile that's my brain because I just look here (rather than deep down inside these chronologically recorded notes):

  @Test
  public void test() throws Exception
  {
    TestRunner runner = TestRunners.newTestRunner( new ExcerptBase64FromHl7() );

    final String HL7 =
         "MSH|^~\\&|Direct Email|etHIN|||201609061628||ORU^R01^ORU_R01|1A160906202824456947||2.5\n"
        + "PID|||000752356634||TEST^MINNIE^||19690811|F|||1 STATE ST^^ANYWHERE^TN^21112^||\n"
        + "PV1||O\n"
        + "OBR||62281e18-b851-4218-9ed5-bbf392d52f84||AD^Advance Directive"
        + "OBX|1|ED|AD^Advance Directive^2.16.840.1.113883.3.1256||^multipart^^Base64^"
        + "F_ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    // mock up some attributes...
    Map< String, String > attributes = new HashMap<>();
    attributes.put( "attribute-name", "attribute-value" );
    attributes.put( "another-name",   "another-value" );

    // mock up some properties...
    runner.setProperty( ExcerptBase64FromHl7.A_STATIC_PROPERTY, "static-property-value" );
    runner.setProperty( "dynamic property", "some value" );

    // call the processor...
    runner.setValidateExpressionUsage( false );
    runner.enqueue( new ByteArrayInputStream( HL7.getBytes() ), attributes );
    runner.run( 1 );
    runner.assertQueueEmpty();

    // gather the results...
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( ExcerptBase64FromHl7.SUCCESS );
    assertTrue( "1 match", results.size() == 1 );
    MockFlowFile result = results.get( 0 );

    String actual = new String( runner.getContentAsByteArray( result ) );

    assertTrue( "Missing binary content", actual.contains( "1_ABCDEFGHIJKLMNOPQRSTUVWXYZ" ) );

    // ensure the processor transmitted the attribute...
    String attribute = result.getAttribute( "attribute-name" );
    assertTrue( "Failed to transmit attribute", attribute.equals( "attribute-value" ) );
  }

Wednesday, 6 January 2016

Here is a collection of some NiFi notes...

Friday, 15 January 2016

  1. Download nifi-0.4.1-bin.tar.gz from Apache NiFi Downloads and explode locally. Sometimes the tarball doesn't work; in this case, use nifi-0.4.1-bin.zip instead. (The version changes every few months; adjust accordingly. Much of what you see Googling for help using NiFi will for some time continue to be for 0.x until 1.x examples begin to overwhelm. I personally just use the latest except that in production for my company, work must lag a bit behind.)

  2. (Don't download the source and build it unless you need/want to...)

  3. Presuming you don't have anything else running on port 8080, there are no settings to modify. If you do not wish to use this port to run NiFi, modify nifi.web.http.port=8080 in ${NIFI_ROOT}/conf/nifi.properties accordingly.

  4. Start NiFi: ${NIFI_ROOT}/bin/nifi.sh start.

  5. Launch http://localhost:8080/nifi/ in a browser. You'll see the NiFi canvas (workspace) come up.

  6. If there is trouble, check out ${NIFI_ROOT}/logs/nifi-bootstrap.log or, less likely, ${NIFI_ROOT}/logs/nifi-app.log.

  7. At this point, check out a tutorial on creating a simple flow. Videos might be a good place. There's also a whole selection of starter videos for NiFi 0.x.

Update, February 2017: what's the real delta between 0.x and 1.x? Not too much until you get down into gritty detail, something you won't do when just acquainting yourself.

Now I'm looking at:

  1. Developing a Custom Apache Nifi Processor (JSON).

Monday, 18 January 2016

Unsuccessful last Friday, I took another approach to working on the NiFi example from IntelliJ IDEA. By no means do I think it must be done this way; however, Eclipse was handy for ensuring stuff got set up correctly. The article assumes much about setting up a project.

  1. Copy pom.xml to new subdirectory.
  2. mvn eclipse:eclipse
  3. Launch Eclipse (Mars) and create subdirectories:
    1. Create src/main/resources/META-INF/services.
    2. Create file org.apache.nifi.processor.Processor.
    3. Create subdirectory src/main/java.
    4. Create subdirectory test.
    5. Right-click java subdirectory, choose Build Path → Use as Source Folder
      (do this also for test subdirectory).
    6. Create package rocks.nifi.examples.processors.

The pom.xml. There are several ultra-critical relationships to beware of.

  1. The org.apache.nifi.processor.Processor file contents must match Java package path and processor Java classname.
  2. groupId must match Java package path.
  3. artifactId gives project its name and must match.
  4. This is handled automatically if you follow the steps above.
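For example, for the JsonProcessor built below, src/main/resources/META-INF/services/org.apache.nifi.processor.Processor consists of this single line, naming the fully qualified class:

rocks.nifi.examples.processors.JsonProcessor

Now the pom itself: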
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>rocks.nifi</groupId>
  <artifactId>examples</artifactId>   <!-- change this to change project name -->
  <version>1.0-SNAPSHOT</version>
  <packaging>nar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <nifi.version>0.4.1</nifi.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-api</artifactId>
      <version>${nifi.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-utils</artifactId>
      <version>${nifi.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-processor-utils</artifactId>
      <version>${nifi.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-io</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>com.jayway.jsonpath</groupId>
      <artifactId>json-path</artifactId>
      <version>1.2.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-mock</artifactId>
      <version>${nifi.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-maven-plugin</artifactId>
        <version>1.0.0-incubating</version>
        <extensions>true</extensions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.15</version>
      </plugin>
    </plugins>
  </build>
</project>
<!-- vim: set tabstop=2 shiftwidth=2 expandtab: -->

Of course, once you've also created JsonProcessor.java in the package you created, you just do as the article says. You should end up with:

~/dev/nifi/projects/json-processor $ !tree
tree
.
+-- pom.xml
`-- src
    +-- main
    |   +-- java
    |   |   `-- rocks
    |   |       `-- nifi
    |   |           `-- examples
    |   |               `-- processors
    |   |                   `-- JsonProcessor.java
    |   `-- resources
    |       `-- META-INF
    |           `-- services
    |               `-- org.apache.nifi.processor.Processor
    `-- test
        `-- java
            `-- rocks
                `-- nifi
                    `-- examples
                        `-- processors
                            `-- JsonProcessorTest.java

16 directories, 4 files

The essentials are:

  1. Create the exact, Sun Java/Maven-ish structure shown because the NiFi NAR packager cannot endure any other structure. It's brittle.
  2. Build using mvn clean install from the command line (not the IDE).

The IntelliJ IDEA project can be opened as usual in place of Eclipse. Here's success:

See NiFi Developer's Guide: Processor API for details on developing a processor, what I've just done. Importantly, this describes

  1. the file named, org.apache.nifi.processor.Processor that contains the fully qualified class name of the processor,
  2. extending org.apache.nifi.processor.AbstractProcessor to create the new processor,
  3. the highly concurrent nature of the NiFi framework and how the processor must be written to be thread-safe.
  4. the existence of several, major supporting classes, to wit:
    • FlowFile —the data or file being treated
    • ProcessSession —the session during treatment
    • ProcessContext —processor-framework bridge
    • PropertyDescriptor —defines a property
    • Validator —for user-entered values for properties
    • ValidationContext
    • PropertyValue —value of a given property
    • Relationship —route by which a FlowFile leaves a processor to go elsewhere
    • ProcessorInitializationContext
    • ProcessorLog

Here's an example from NiFi Rocks:

package rocks.nifi.examples.processors;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import com.jayway.jsonpath.JsonPath;
import org.apache.commons.io.IOUtils;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * Our NiFi JSON processor from
 * http://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
 */
@SideEffectFree
@Tags( { "JSON", "NIFI ROCKS" } )                       // for finding processor in the GUI
@CapabilityDescription( "Fetch value from JSON path." ) // also displayed in the processor selection box
public class JsonProcessor extends AbstractProcessor
{
  private List< PropertyDescriptor > properties;        // one of these for each property of the processor
  private Set< Relationship >        relationships;     // one of these for each possible arc away from the processor

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >        getRelationships()                { return relationships; }

  // our lone property...
  public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder()
        .name( "Json Path" )
        .required( true )
        .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
        .build();

  // our lone relationship...
  public static final Relationship SUCCESS = new Relationship.Builder()
        .name( "SUCCESS" )
        .description( "Succes relationship" )
        .build();

  /**
   * Called at the start of Apache NiFi. NiFi is a highly multi-threaded environment; be
   * careful what is done in this space. This is why both the list of properties and the
   * set of relationships are set with unmodifiable collections.
   * @param context supplies the processor with a) a ProcessorLog, b) a unique
   *                identifier and c) a ControllerServiceLookup to interact with
   *                configured ControllerServices.
   */
  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( JSON_PATH );
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  /**
   * Called whenever a flowfile is passed to the processor. The AtomicReference
   * is required because, in order to have access to the variable in the call-back
   * scope, it needs to be final. If it were just a final String, it could not change.
   */
  @Override
  public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    final ProcessorLog              log   = this.getLogger();
    final AtomicReference< String > value = new AtomicReference<>();

    FlowFile flowfile = session.get();

    // ------------------------------------------------------------------------
    session.read( flowfile, new InputStreamCallback()
    {
      /**
       * Uses Apache Commons to read the input stream out to a string. Uses
       * JsonPath to attempt to read the JSON and set a value to pass on.
       * It would normally be best practice, in the case of an exception, to
       * pass the original flowfile to an error relationship.
       */
      @Override
      public void process( InputStream in ) throws IOException
      {
        try
        {
          String json   = IOUtils.toString( in );
          String result = JsonPath.read( json, "$.hello" );
          value.set( result );
        }
        catch( Exception e )
        {
          e.printStackTrace();
          log.error( "Failed to read json string." );
        }
      }
    } );
    // ------------------------------------------------------------------------

    String results = value.get();

    if( results != null && !results.isEmpty() )
      flowfile = session.putAttribute( flowfile, "match", results );

    // ------------------------------------------------------------------------
    flowfile = session.write( flowfile, new OutputStreamCallback()
    {
      /**
       * For both reading and writing to the same flowfile. With both the
       * output-stream call-back and the stream call-back, remember to assign
       * the result back to a flowfile. This write isn't strictly needed by
       * this processor, but the example deliberately shows a way of moving
       * data out of call-backs and back in.
       */
      @Override
      public void process( OutputStream out ) throws IOException
      {
        out.write( value.get().getBytes() );
      }
    } );
    // ------------------------------------------------------------------------

    /* Every flowfile that is generated needs to be deleted or transferred.
     */
    session.transfer( flowfile, SUCCESS );
  }

  /**
   * Called every time a property is modified in case we want to do something
   * like stop and start a Flowfile over in consequence.
   * @param descriptor indicates which property was changed.
   * @param oldValue value unvalidated.
   * @param newValue value unvalidated.
   */
  @Override
  public void onPropertyModified( PropertyDescriptor descriptor, String oldValue, String newValue ) { }
}

(See this note for interacting with a flowfile in both input- and output modes at once.)

Tuesday, 19 January 2016

Lifecycle of a processor...

Before anything else, the processor's init() method is called. It looks like this happens at NiFi start-up rather than later when the processor is actually used.

The processor must expose to the NiFi framework all of the Relationships it supports. The relationships are the arcs in the illustration below.

Performing the work. This happens when onTrigger() is called (by the framework).

The onTrigger() method is passed the context and the session, and it must:

  1. get the flowfile from the session,
  2. perform a read of the flowfile stream,
  3. obtain the results of reading,
  4. put the attribute "match," on the session,
  5. do work (whatever that is/however that is done),
  6. perform a write of the modified flowfile stream,
  7. note the success or failure of writing,
  8. remove the flowfile, or
  9. hand off the flowfile to the next processor.
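Distilled into a skeleton, it looks something like this (a sketch only, in the style of the JsonProcessor example above; SUCCESS and the "match" attribute are illustrative, and the imports are the same as in that example):

  @Override
  public void onTrigger( ProcessContext context, ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();                                   // 1. get the flowfile

    if( flowfile == null )
      return;

    final AtomicReference< String > content = new AtomicReference<>();

    session.read( flowfile, new InputStreamCallback()                    // 2. read the flowfile stream...
    {
      @Override public void process( InputStream in ) throws IOException
      {
        content.set( IOUtils.toString( in ) );                           // 3. ...and keep the results
      }
    } );

    flowfile = session.putAttribute( flowfile, "match", content.get() ); // 4. put an attribute

    // 5. do work on the content here...

    flowfile = session.write( flowfile, new OutputStreamCallback()       // 6. write the modified stream
    {
      @Override public void process( OutputStream out ) throws IOException
      {
        out.write( content.get().getBytes() );
      }
    } );

    session.transfer( flowfile, SUCCESS );                               // 9. hand off (or session.remove( flowfile ))
  }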

A processor won't in fact be triggered, even if it has one or more FlowFiles in its queue, if the framework finds, checking the downstream destinations (i.e., the connections leading to the next processors), that these are full. There is an annotation, @TriggerWhenAnyDestinationAvailable, which when present will allow the FlowFile to be processed as long as it can advance to at least one next processor.

@TriggerSerially will make the processor run single-threaded. However, the processor implementation must still be thread-safe as the thread of execution may change from invocation to invocation.
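For illustration, both annotations sit on the processor class (MyProcessor here is hypothetical):

import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;

@TriggerSerially
@TriggerWhenAnyDestinationAvailable
public class MyProcessor extends AbstractProcessor
{
  ...
}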

Properties

Properties are exposed to the web interface via a public method, but the properties themselves are locked using Collections.unmodifiableList() (if coded that way). If a property is changed as a processor is running, the latter can react eagerly if it implements onPropertyModified().

(There is no sample code demonstrating validation of processor properties. Simply put, the customValidate() method is called with a validation context, and any ValidationResults that describe problems are returned. But is this implemented?)

  protected Collection< ValidationResult > customValidate( ValidationContext validationContext )
  {
    return Collections.emptySet();
  }
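Here is a sketch of what a real implementation might look like. MIN_VALUE and MAX_VALUE are hypothetical properties, assumed to have been built with INTEGER_VALIDATOR so the parses below are safe; imports are the usual java.util and org.apache.nifi.components ones:

  @Override
  protected Collection< ValidationResult > customValidate( ValidationContext validationContext )
  {
    List< ValidationResult > results = new ArrayList<>();

    final String min = validationContext.getProperty( MIN_VALUE ).getValue();
    final String max = validationContext.getProperty( MAX_VALUE ).getValue();

    // flag the (hypothetical) case of a minimum set higher than the maximum...
    if( min != null && max != null && Integer.parseInt( min ) > Integer.parseInt( max ) )
    {
      results.add( new ValidationResult.Builder()
            .subject( "Min/Max Value" )
            .input( min )
            .valid( false )
            .explanation( "minimum must not exceed maximum" )
            .build() );
    }

    return results;   // an empty collection means "no problems found"
  }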

It's possible to augment the initialization and enabling lifecycles through Java annotation. Some of these annotations apply only to processor components, some to controller components, some to reporting tasks, and some to more than one.
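For example (a sketch; these lifecycle annotations live in org.apache.nifi.annotation.lifecycle):

  @OnScheduled
  public void setUp( final ProcessContext context )
  {
    // called each time the processor is scheduled to run (compare GetFile's onScheduled() below)
  }

  @OnStopped
  public void tearDown()
  {
    // called when the processor is no longer scheduled and its last thread has returned
  }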

Wednesday, 20 January 2016

There's a way to document various things that reach the user level like properties and relationships. This is done through annotation, see Documenting a Component and see the JSON processor sample code at the top of the class.

"Advanced documentation," what appears when a user right-clicks a processor in the web UI, provides a Usage menu item. What goes in there is done in additionalDetails.html. This file must be dropped into the root of the processor's JAR (NAR) file in subdirectory, docs. See Advanced Documentation.

In practice, this might be a little confusing. Here's an illustration. If I were going to endow my sample JSON processor with such a document, it would live here (see additionalDetails.html below):

~/dev/nifi/projects/json-processor $ !tree
tree
.
+-- pom.xml
`-- src
    +-- main
    |   +-- java
    |   |   `-- rocks
    |   |       `-- nifi
    |   |           `-- examples
    |   |               `-- processors
    |   |                   `-- JsonProcessor.java
    |   `-- resources
    |       +-- META-INF
    |       |   `-- services
    |       |       `-- org.apache.nifi.processor.Processor
    |       `-- docs
    |           `-- rocks.nifi.examples.processors.JsonProcessor
    |               `-- additionalDetails.html
    `-- test
        `-- java
            `-- rocks
                `-- nifi
                    `-- examples
                        `-- processors
                            `-- JsonProcessorTest.java

18 directories, 5 files

When one gets Usage from the context menu, an index.html is generated that "hosts" additionalDetails.html, but covers standard, default NiFi processor topics including auto-generated coverage of the current processor (based on what NiFi knows about properties and the like). additionalDetails.html is only necessary if that index.html documentation is inadequate.
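A minimal additionalDetails.html need be no more than this (a sketch; the generated index.html links to it):

<!DOCTYPE html>
<html>
  <head>
    <title>JsonProcessor</title>
  </head>
  <body>
    <h2>JsonProcessor: additional details</h2>
    <p>Longer discussion, worked examples, caveats, etc. go here.</p>
  </body>
</html>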

Friday, 22 January 2016

Gathering resources (see dailies for today).

package com.etretatlogiciels.nifi.python.filter;

import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 *
 */
@Tags( { "example", "resources" } )
@CapabilityDescription( "This example processor loads a resource from the nar and writes it to the FlowFile content" )
public class WriteResourceToStream extends AbstractProcessor
{
  public static final Relationship REL_SUCCESS = new Relationship.Builder().name( "success" )
                                                    .description( "files that were successfully processed" )
                                                    .build();
  public static final Relationship REL_FAILURE = new Relationship.Builder().name( "failure" )
                                                    .description( "files that were not successfully processed" )
                                                    .build();

  private Set< Relationship > relationships;
  private String resourceData;

  @Override protected void init( final ProcessorInitializationContext context )
  {

    final Set< Relationship > relationships = new HashSet<>();
    relationships.add( REL_SUCCESS );
    relationships.add( REL_FAILURE );
    this.relationships = Collections.unmodifiableSet( relationships );

    final InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream( "file.txt" );

    try
    {
      this.resourceData = IOUtils.toString( resourceStream );
    }
    catch( IOException e )
    {
      throw new RuntimeException( "Unable to load resources", e );
    }
    finally
    {
      IOUtils.closeQuietly( resourceStream );
    }
  }

  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowFile = session.get();

    if( flowFile == null )
      return;

    try
    {
      flowFile = session.write( flowFile, new OutputStreamCallback()
      {

        @Override public void process( OutputStream out ) throws IOException
        {
          IOUtils.write( resourceData, out );
        }
      } );
      session.transfer( flowFile, REL_SUCCESS );
    }
    catch( ProcessException ex )
    {
      getLogger().error( "Unable to process", ex );
      session.transfer( flowFile, REL_FAILURE );
    }
  }
}

Monday, 25 January 2016

I need to figure out how to get property values. The samples I've seen so far pretty well avoid that. So, I went looking for it this morning and found it in the implementation of GetFile, a processor that comes with NiFi. Let me fill in the context here; the statements that actually fetch the property values are at the top of onTrigger() below:

@TriggerWhenEmpty
@Tags( { "local", "files", "filesystem", "ingest", "ingress", "get", "source", "input" } )
@CapabilityDescription( "Creates FlowFiles from files in a directory. "
                      + "NiFi will ignore files it doesn't have at least read permissions for." )
public class GetFile extends AbstractProcessor
{
  public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
                       .name( "input directory" )        // internal name (used if displayName missing)*
                       .displayName( "Input Directory" ) // public name seen in UI
                       .description( "The input directory from which to pull files" )
                       .required( true )
                       .addValidator( StandardValidators.createDirectoryExistsValidator( true, false ) )
                       .expressionLanguageSupported( true )
                       .build();
  .
  .
  .
  @Override
  protected void init( final ProcessorInitializationContext context )
  {
    final List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( DIRECTORY );
    .
    .
    .
  }

  .
  .
  .

  @OnScheduled
  public void onScheduled( final ProcessContext context )
  {
    fileFilterRef.set( createFileFilter( context ) );
    fileQueue.clear();
  }

  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    final File directory = new File( context.getProperty( DIRECTORY ).evaluateAttributeExpressions().getValue() );
    final boolean keepingSourceFile = context.getProperty( KEEP_SOURCE_FILE ).asBoolean();
    final ProcessorLog logger = getLogger();
    .
    .
    .
  }
}

* See PropertyDescriptor name and displayName

Tuesday, 26 January 2016

Today I was experiencing the difficulty of writing tests for a NiFi processor, only to learn that NiFi itself provides help for this. I found the test for the original JSON-processor example.

package rocks.nifi.examples.processors;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import org.apache.commons.io.IOUtils;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.MockFlowFile;

public class JsonProcessorTest
{
  @Test
  public void testOnTrigger() throws IOException
  {
    // Content to mock a JSON file
    InputStream content = new ByteArrayInputStream( "{\"hello\":\"nifi rocks\"}".getBytes() );

    // Generate a test runner to mock a processor in a flow
    TestRunner runner = TestRunners.newTestRunner( new JsonProcessor() );

    // Add properties
    runner.setProperty( JsonProcessor.JSON_PATH, "$.hello" );

    // Add the content to the runner
    runner.enqueue( content );

    // Run the enqueued content, it also takes an int = number of contents queued
    runner.run( 1 );

    // All results were processed without failure
    runner.assertQueueEmpty();

    // If you need to read or do additional tests on results, you can access the content
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( JsonProcessor.SUCCESS );
    assertTrue( "1 match", results.size() == 1 );
    MockFlowFile result = results.get( 0 );
    String resultValue = new String( runner.getContentAsByteArray( result ) );
    System.out.println( "Match: " + IOUtils.toString( runner.getContentAsByteArray( result ) ) );

    // Test attributes and content
    result.assertAttributeEquals( "match", "nifi rocks" );
    result.assertContentEquals( "nifi rocks" );
  }
}

What does it do? NiFi TestRunner mocks out the NiFi framework and creates all the underpinnings I was trying to create today. This makes it so that calls inside onTrigger() don't summarily crash. I stepped through the test (above) to watch it work. All the references to the TestRunner object are ugly and exist only to set up what the framework would create for the processor, but it does work.

Had to work through these problems...

java.lang.AssertionError: Processor has 2 validation failures:
'basename' validated against 'CCD_Sample' is invalid because 'basename' is not a supported property
'doctype' validated against 'XML' is invalid because 'doctype' is not a supported property

—these properties don't even exist.

Caused by: java.lang.IllegalStateException: Attempting to Evaluate Expressions but PropertyDescriptor[Path to Python Interpreter]
indicates that the Expression Language is not supported. If you realize that this is the case and do not want this error to occur,
it can be disabled by calling TestRunner.setValidateExpressionUsage(false)

—so I disabled it.

Then, I reached the session.read() bit which appeared to execute, but processing never returned. So, I killed it, set a breakpoint on the session.write(), and relaunched. It got there. I set a breakpoint on the while( ( line = reply.readLine() ) != null ) and it got there. I took a single step more and, just as before, processing never returned.

Then I break-pointed out.write() and the first call to session.putAttribute() and relaunched. Processing never hit those points.

Will continue this tomorrow.

Wednesday, 27 January 2016

I want to solve my stream problems in python-filter. Note that the source code to GetFile is not a good example of what I'm trying to do because I'm getting the data to work on from (the session.read()) stream, passing it to the Python filter, then reading it back and spewing it down (the session.write()) output stream. For the (session.read()) input stream, ...

...maybe switch from readLine() to a byte-by-byte processing? But, all the examples in all kinds of situations (HTTP, socket, file, etc.) are using readLine().

The old Murphism, that a defective appliance demonstrated to a competent technician will fail to fail, applies here. My code was actually working. What was making me think it wasn't? Uh, well, for one thing, all the flowfile = session.putAttribute( flowfile, ... ) calls were broken for the same reason (higher up, yesterday) that the other properties stuff was.

Where am I?

I had to append "quit" to the end of my test data because that's the only way to tell the Python script that I'm done sending data.

So, I asked Nathan, who determined using a test:

import sys
while True:
    input = sys.stdin.readline().strip()
    if input == '':
        break
    upperCase = input.upper()
    print( upperCase )

master ~/dev/python-filter $ echo "Blah!" | python poop.py
BLAH!
master ~/dev/python-filter $

...that checking the return from sys.stdin.readline() for being zero-length tells us to drop out of the read loop. But, this doesn't work in my code! Why?

I reasoned that in my command-line example, the pipe is getting closed. That's not happening in my NiFi code. So, I inserted a statement to close the pipe (the output.close() line below) as soon as I finished writing what the input flowfile was offering:

session.read( flowfile, new InputStreamCallback()
{
  /**
   * There is probably a much speedier way than to read the flowfile line
   * by line. Check it out.
   */
  @Override
  public void process( InputStream in ) throws IOException
  {
    BufferedReader input = new BufferedReader( new InputStreamReader( in ) );
    String         line;

    while( ( line = input.readLine() ) != null )
    {
      output.write( line + "\n" );
      output.flush();
    }

    output.close();
  }
} );

NiFi validators list

StandardValidators Description
ATTRIBUTE_KEY_VALIDATOR ?
ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR ?
INTEGER_VALIDATOR is an integer value
POSITIVE_INTEGER_VALIDATOR is an integer value above 0
NON_NEGATIVE_INTEGER_VALIDATOR is an integer value above -1
LONG_VALIDATOR is a long integer value
POSITIVE_LONG_VALIDATOR is a long integer value above 0
PORT_VALIDATOR is a valid port number
NON_EMPTY_VALIDATOR is not an empty string
BOOLEAN_VALIDATOR is "true" or "false"
CHARACTER_SET_VALIDATOR contains only valid characters supported by JVM
URL_VALIDATOR is a well formed URL
URI_VALIDATOR is a well formed URI
REGULAR_EXPRESSION_VALIDATOR is a regular expression
ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR is a well formed key-value pair
TIME_PERIOD_VALIDATOR is of format duration time unit where duration is a non-negative integer and time unit is a supported type, one of { nanos, millis, secs, mins, hrs, days }.
DATA_SIZE_VALIDATOR is a data size (e.g., "10 MB")
FILE_EXISTS_VALIDATOR is a path to a file that exists

I'm still struggling to run tests after adding in all the properties I wish to support. I get:

'Filter Name' validated against 'filter.py' is invalid because 'Filter Name' is not a supported property

...and I don't understand why. Clearly, these failure cases were never tested with the user in mind: no one explains this error, how to fix it, or what's involved in creating a proper, supported property. It's trial and error. I lucked out with PYTHON_INTERPRETER and FILTER_REPOSITORY.

However, I found that they all began to pass as soon as I constructed them with validators. I had left validation off because a) as inferred above I wonder about validation and b) at least the last four are optional.

Now, the code is running again. However,

final String doctype  = flowfile.getAttribute( DOCTYPE );
final String pathname = flowfile.getAttribute( PATHNAME );
final String filename = flowfile.getAttribute( FILENAME );
final String basename = flowfile.getAttribute( BASENAME );

...gives me only FILENAME, because the NiFi TestRunner happens to set that one up. How does it know? The others return null, yet I've tried to set them. What's wrong is my assumed correspondence between runner.setProperty(), the property, and flowfile.getAttribute().

Let's isolate the properties handling (only) from the NiFi rocks example.


// JsonProcessor.java:
private List< PropertyDescriptor > properties;

public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder()
        .name( "Json Path" )
        .required( true )
        .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
        .build();

List< PropertyDescriptor > properties = new ArrayList<>();
properties.add( JSON_PATH );
this.properties = Collections.unmodifiableList( properties );

// JsonProcessorTest.java:
runner.setProperty( JsonProcessor.JSON_PATH, "$.hello" );

Hmmmm... I think there is a complete separation between flowfile attributes and processor properties. I've reworked all of that, and my code (still/now) works though there are unanswered questions.

Thursday, 28 January 2016

runner.assertQueueEmpty() will fail (and stop testing) if there are any flowfiles left untransferred in the queue. Did you forget to transfer a flowfile? Did you lose your handle to a flowfile because you used the variable to point at another flowfile before you had transferred it? Etc.
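Related assertion helpers exist on the runner too; for instance (MyProcessor here is hypothetical):

runner.assertAllFlowFilesTransferred( MyProcessor.SUCCESS, 1 );  // everything went down SUCCESS...
runner.assertTransferCount( MyProcessor.FAILURE, 0 );            // ...and nothing down FAILURE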

Thursday, 11 February 2016

NiFi logging example...

final ProcessorLog logger = getLogger();

String key   = "someKey";
String value = "someValue";
logger.warn( "This is a warning message" );
logger.info( "This is a message with some interpolation key={}, value={}", new String[]{ key, value } );

My second NiFi processor at work.

Tuesday, 16 February 2016

Documenting practical consumption of GetFile and PutFile processors (more or less "built-ins") of NiFi. These are the differences from default properties:

GetFile: Configure Processor

  1. No auto-termination (Settings → Auto terminate relationships)
  2. 30 seconds (Scheduling → Run schedule)
    This is how many seconds pass between GetFile firing and sending the (same) file again.
  3. /tmp/nifi-source (Properties → Input Directory)
  4. test-lily-munster.csv (Properties → File Filter)

PutFile: Configure Processor

  1. Check failure and success (Settings → Auto terminate relationships)
    This is because I want to stop at PutFile in my example above—processing will not be going on to anywhere else.
  2. /tmp/nifi-output (Properties → Directory)

My own AppointmentsProcessor: Configure Processor

  1. Check FAILURE and ORIGINAL (Settings → Auto terminate relationships)
    This is because I want to stop on failure and do not wish to route original document on to the next point (PutFile).
  2. Bay Health (Properties → Provider)

The UI sequence appears to be:

  1. Perform all configuration.
  2. Any processor can be started first: it will sit idle until something comes in or, if it's feeding the pipeline and has something, it will just fill the pipeline, which will then wait for the next processor downstream to start.

Monday, 22 February 2016

Looking to change my port number from 8080, the property is

nifi.web.http.port=8080

...and this is in NIFI_HOME/conf/nifi.properties. Also of interest may be:

nifi.web.http.host
nifi.web.https.host
nifi.web.https.port

Monday, 29 February 2016

I'm testing actual NiFi set-ups for cda-breaker and cda-filter this morning. The former went just fine, but I'm going to document the setting up of a pipeline with the latter since I see I have not completely documented such a thing before.

NiFi CdaFilterProcessor in its pipeline

I'm going to set up this:

I accomplish this by clicking, dragging and dropping three new processors to the workspace. Here they are and their configurations:

GetFile configuration

Just the defaults...

The 60 seconds keep the processor from firing over and over again (on the same file) before I can stop the pipeline: I only need it to fire once.

Get CDA_Sample1.xml out of /tmp/nifi-sources for the test.

From GetFile, a connection to CdaFilterProcessor must be created for the success relationship (I only pass this file along if it exists):

CdaFilterProcessor configuration

From CdaFilterProcessor, a connection to PutFile must be created for the original and success relationships:

PutFile configuration

Running the test...

(Of course, you must ensure that the source file, rules list and rules are in place for the test to have any chance of running.)

Once the three processors and two connections are set up, you can turn the processors on in any order, but only once two are on at the same time, with data in the pipeline, will there be processing. Logically, GetFile is the first to light up.

  1. Click GetFile to select, then click the green start button in the button bar above.
  2. Next, select CdaFilterProcessor and start it too. It might not start, getting "No eligible components are selected. Please select the components to be started and ensure they are no longer running." If you thought you had set everything up right, dismiss this alert and hover the mouse pointer over the yellow, warning triangle of this processor. You may see something like, "'Path to rule pathnames' is invalid. Path to rule pathnames is required." Likely, you forgot to specify this path, as shown above.
  3. If nothing is running so far, did you copy CDA_Sample1.xml to /tmp/nifi-source? If not, copy it. Note: it likely will not make a difference until the 60-second run schedule you set on GetFile has elapsed. You can change this setting or just wait.
  4. Now, once you see bytes in the CdaFilterProcessor, this means you've run something.
  5. Click to select PutFile, then click the green start button.
  6. Stop GetFile (before it resubmits CDA_Sample1.xml to the filter).
  7. Check /tmp/filter-output for CDA_Sample1.cxml.

Monday, 7 March 2016

Have left off, busy with other stuff. I just found out that the reason additionalDetails.html doesn't work is that one is expected to produce a JAR of one's processor and then wrap that in the NAR (this isn't what the JSON example I used to get started does).

Without that, your trip to right-click the processor, then choosing Usage is unrewarding.

Wednesday, 9 March 2016

Attacking NiFi attributes today. You get them off a flowfile. However, to add an attribute, putAttribute() returns a new or wrapped flowfile. The attribute won't be there if you look at the old flowfile. In this snippet, we just forget the old flowfile and use the new one under the same variable.

FlowFile flowfile       = session.get();
String   attributeValue = flowfile.getAttribute( "attribute-name" );

flowfile = session.putAttribute( flowfile, "attribute-name", attributeValue );

Wednesday, 23 March 2016

I'm working on remote debugging of NiFi processors using IntelliJ IDEA.

Steps to debugging a NiFi processor you've written. See this article to learn how remote debugging works in IntelliJ. The example uses IntelliJ to debug an application running in Tomcat, but it's a similar thing no matter if you're using Eclipse (or IntelliJ) to debug NiFi.

  1. Edit conf/bootstrap.conf and simply uncomment the line that starts java.arg.debug (shown following these steps). If you like, change the port number (address) to what you want to use in steps 2e or 2f below.
  2. Create a Debug configuration.
    In IntelliJ IDEA, ...
    1. Do Run → Edit Configurations.
    2. Click the green +.
    3. Choose Remote (because it's a remote debugging session you want to create).
    4. Give a Name:, something like "Local NiFi".
    5. Under Settings, change the Port: to 8000 or to whatever you established in conf/bootstrap.conf. (Note: this number has nothing to do with launching your browser with http://localhost:8080/nifi)
    6. If you're only debugging a single processor in a project with multiple modules, set the drop-down Search sources using module's classpath: to the module in which it lives.
    In Eclipse, do
    1. Do Run → Debug Configurations....
    2. Choose Remote Java Application.
    3. Click on the New Launch Configuration icon (a tiny sheet of paper with a yellow plus sign in upper left of dialog).
    4. Give it a name like "Local NiFi".
    5. In Project:, type the name (or browse for it) of the project containing your processor code.
    6. Set the Port: to 8000 or whatever you established in conf/bootstrap.conf. (Note: this number has nothing to do with launching your browser with http://localhost:8080/nifi)
    7. Click Apply or, if you're ready, Debug.
  3. Note that you likely need to punch port 8000 (or whatever you used) through the remote host's firewall if there is one in place (and there likely is). You can do this a number of ways, but here's how to do it using iptables:

    # iptables -A INPUT -p tcp --dport 8000 -j ACCEPT
    

    If this is a secure server, you might not want to open such a port, but set up a tunnel instead. See the note on using a tunnel at the end of these steps.

  4. Launch NiFi (or bounce it).
  5. Set one or more breakpoints in your processor code.
  6. Launch the debugger.
    In IntelliJ IDEA, do Run → Debug and choose "Local NiFi" (or whatever you called it) from the list presented. This brings up the debugger and displays, "Connected to the target VM, address 'localhost:8000', transport: 'socket'
    In Eclipse, do Run → Debug Configurations..., scroll down and choose "Local NiFi" or whatever you called it. What you see should give you warm fuzzies, something akin to what I'm reporting seeing in IntelliJ IDEA.
  7. Prepare data flows to your processor.
  8. Start your processor; the IDE debugger should pop up stopped at the first breakpoint.
  9. Debug away.
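For reference, the conf/bootstrap.conf line from step 1 looks like this once uncommented (8000 is the default; change address= to move the port):

java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000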

Someone else had this experience setting up to debug on an AWS instance.

I stumbled upon something peculiar in NiFi. I had been attaching an attribute to a flowfile in the session.read() call-back. The NiFi unit-testing framework tolerated this, but when I ran my processor for real, it blew chunks (IllegalStateException). I solved the problem by saving the new attribute and attaching it outside the call-back just before calling session.transfer().

The only reason I'm pointing this out is because I somewhat rely on unit testing and hope to catch this level of error/gotcha earlier than button-pushing testing.

Addendum posted 27 July 2017:

How to do the above using a tunnel...

Let's posit that you must debug your NiFi processor running on a remote server that's secure enough that you can't punch your debugging port through its firewall.

In this case, configure just as described above, but you'll be talking to your local host on the port you decide. Before editing your debug configuration, erect your IP tunnel as below. Let's pretend that the IP address of the remote host running NiFi is 10.10.9.158 and that my username there is russ:

# ssh -f -L 8000:10.10.9.158:8000 russ@10.10.9.158 -N

What this means is "set up my ssh (port 22) connection such that it handles all port 8000 traffic. At the other end, IP address 10.10.9.158, put that traffic back out to 8000. The 'tunnel' here is port 22 that carries the traffic meant for port 8000 carefully to and from the remote server without having to open port 8000 on that remote server (and punch a hole through its firewall, etc.) NiFi, configured in conf/bootstrap.conf to use port 8000 for debugging traffic, will answer back and the tunnel will handle the communication."

Wednesday, 13 April 2016

An exception that is not handled by the NiFi processor is handled by the framework, which does a session roll-back and an administrative yield. The flowfile is put back into the queue whence it was taken before the exception. It will continue to be submitted to the processor, though the processor will slow down (because it is yielding). One solution is to fix the processor to catch the exception, then gate the failure down a failure relationship.

Thursday, 14 April 2016

Got "DocGenerator Unable to document: class xyz / NullPointerException" reported in nifi-app.log

This occurs (at least) when your processor's relationships field (and the return from getRelationships()) is null.

2016-04-14 14:23:26,529 WARN [main] o.apache.nifi.documentation.DocGenerator Unable to document: class com.etretatlogiciels.nifi.processor.HeaderFooterIdentifier
java.lang.NullPointerException: null
    at org.apache.nifi.documentation.html.HtmlProcessorDocumentationWriter.writeRelationships(HtmlProcessorDocumentationWriter.java:200) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.html.HtmlProcessorDocumentationWriter.writeAdditionalBodyInfo(HtmlProcessorDocumentationWriter.java:47) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.html.HtmlDocumentationWriter.writeBody(HtmlDocumentationWriter.java:129) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.html.HtmlDocumentationWriter.write(HtmlDocumentationWriter.java:67) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.DocGenerator.document(DocGenerator.java:115) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.documentation.DocGenerator.generate(DocGenerator.java:76) ~[nifi-documentation-0.4.1.jar:0.4.1]
    at org.apache.nifi.NiFi.<init>(NiFi.java:123) [nifi-runtime-0.4.1.jar:0.4.1]
    at org.apache.nifi.NiFi.main(NiFi.java:227) [nifi-runtime-0.4.1.jar:0.4.1]

Monday, 18 April 2016

Here's how to document multiple dynamic properties. For only one, just use @DynamicProperty alone.

@SideEffectFree
@Tags( { "SomethingProcessor" } )
@DynamicProperties( {
  @DynamicProperty(
    name = "section-N.name",
    value = "Name",
    supportsExpressionLanguage = false,
    description = "Add the name to identify a section."
        + " N is a required integer to unify the three properties especially..."
        + " Example: \"section-1.name\" : \"illness\"."
  ),
  @DynamicProperty(
    name = "section-N.pattern",
    value = "Pattern",
    supportsExpressionLanguage = false,
    description = "Add the pattern to identify a section."
        + " N is a required integer to unify the three properties especially..."
        + " Example: \"section-1.pattern\" : \"History of Illness\"."
  ),
  @DynamicProperty(
    name = "section-N.NLP",
    value = "\"on\" or \"off\"",
    supportsExpressionLanguage = false,
    description = "Add whether to turn NLP on or off."
        + " N is a required integer to unify the three properties especially..."
        + " Example: \"section-1.nlp\" : \"on\"."
  )
} )
public class SomethingProcessor extends AbstractProcessor
{
  ...

Friday and Monday, 22 & 25 April 2016

Need to set up NiFi to work from home. First, download the tarball containing binaries from Apache NiFi Downloads and explode it.

~/Downloads $ tar -zxf nifi-0.6.1-bin.tar.gz 
~/Downloads $ mv nifi-0.6.1 ~/dev
~/Downloads $ cd ~/dev
~/dev $ ln -s ./nifi-0.6.1/ ./nifi

Edit nifi-0.6.1/conf/bootstrap.conf and add my username.

# Username to use when running NiFi. This value will be ignored on Windows.
run.as=russ

Launch NiFi.

~/dev/nifi/bin $ ./nifi.sh start

Launch browser.

http://localhost:8080/nifi/

And there we are!

Tuesday, 26 April 2016

How to inject flow-file attributes:

TestRunner runner = TestRunners.newTestRunner( new VelocityTemplating() );

Map< String, String > flowFileAttributes = new HashMap<>();
flowFileAttributes.put( "name", "Velocity-templating Unit Test" );       // used in merging "Hello $name!"
.
.
.
runner.enqueue( DOCUMENT, flowFileAttributes );
.
.
.

If you install NiFi as a service, then wish to remove it, there are some files (these might not be the exact names) you'll need to remove:

Friday, 29 April 2016

Some useful comments on examining data during flow and debugging NiFi processors.

Andy LoPresto:

NiFi is intentionally designed to allow you to make changes on a very tight cycle while interacting with live data. This separates it from a number of tools that require lengthy deployment processes to push them to sandbox/production environments and test with real data. As one of the engineers says frequently, "NiFi is like digging irrigation ditches as the water flows, rather than building out a sprinkler system in advance."

Each processor component and queue will show statistics about the data they are processing and you can see further information and history by going into the Stats dialog of the component (available through the right-click context menu on each element).

To monitor actual data (such as the response of an HTTP request or the result of a JSON split), I'd recommend using the LogAttribute processor. You can use this processor to print the value of specific attributes, all attributes, flowfile payload, expression language queries, etc. to a log file and monitor the content in real-time. I'm not sure if we have a specific tutorial for this process, but many of the tutorials and videos for other flows include this functionality to demonstrate exactly what you are looking for. If you decide that the flow needs modification, you can also replay flowfiles through the new flow very easily from the provenance view.

One other way to explore the processors, is of course to write a unit test and evaluate the result compared to your expected values. This is more advanced and requires downloading the source code and does not use the UI, but for some more complicated usages or for people familiar with the development process, NiFi provides extensive flow mocking capabilities to allow developers to do this.

Matt Burgess:

As of at least 0.6.0, you can view the items in a queued connection. So for your example, you can have a GetHttp into a SplitJson, but don't start the SplitJson, just the GetHttp. You will see any flowfiles generated by GetHttp queued up in the success (or response?) connection (whichever you have wired to SplitJson). Then you can right-click on the connection (the line between the processors) and choose List Queue. In that dialog you can choose an element by clicking on the Info icon ('i' in a circle) and see the information about it, including a View button for the content.

The best part is that you don't have to do a "preview" run, then a "real" run. The data is in the connection's queue, so you can make alterations to your SplitJson, then start it to see if it works. If it doesn't, stop it and start the GetHttp again (if stopped) to put more data in the queue. For fine-grained debugging, you can temporarily set the Run schedule for the SplitJson to something like 10 seconds, then when you start it, it will likely only bring in one flow file, so you can react to how it works, then stop it before it empties the queue.

Joe Percivall:

Provenance is useful for replaying events but I also find it very useful for debugging processors/flows as well. Data Provenance is a core feature of NiFi and it allows you to see exactly what the FlowFile looked like (attributes and content) before and after a processor acted on it as well as the ability to see a map of the journey that FlowFile underwent through your flow. The easiest way to see the provenance of a processor is to right click on it and then click "Data provenance".

Thursday, 9 June 2016

Before explaining how to work with templates and process groups, it's necessary to define certain visual elements of the NiFi workspace. I'm just getting around to this because my focus until now has been to write and debug custom processors for others' use.

NiFi templates

Here's how to make a template in NiFi. Templates are used to reproduce work flows without having to rebuild them, from all their components and configuration, by hand. I don't know how yet to "carry" templates around between NiFi installations.

Here's a video on creating templates by the way. And here are the steps:

  1. SHIFT + MOUSE + drag to select a work flow.
  2. Click on the Create Template icon in the actions toolbar.
  3. Name and describe the template.
  4. Click the Create button, then click OK.

Here's how to consume an existing template in your NiFi workspace:

  1. Click on the Template icon in the Components Toolbar.
  2. Drag the template to the workspace.
  3. If there are no templates, Instantiate Template will tell you.
  4. Select the desired template from the modal's drop-down; hovering over one you'll see the description appear.

Process groups

You can turn a template into a process group in order to push the detail of how it's built down to make your work flow simpler at the top level.

Here's a video on process groups.

  1. Drag the process group from the Process Group icon on the (Components) toolbar over the NiFi workspace.
  2. Give the process group a name.
  3. Go into the process group (which will be blank at first) by double-clicking it.
  4. (Notice the bread crumbs in the workspace.)

Sometimes you already have a flow you want to transform into a process group.

  1. Select the flow, just as for creating a template, and click the Group button in the actions toolbar.
  2. Give the process group a name and click Add.

You can zoom in (or out) on the process group. It looks like a processor because it performs the same function as a processor, only it does a lot more since it subsumes multiple processors.

However, a process group must have explicit input- and output ports in order to treat it as a processor:

  1. Double-click the process group.
  2. Click and drag an Input Port from the Components Toolbar.
  3. Name it ("to" plus the process-group name?) and click Add.
  4. Click the new input port and drag to the "entering" processor.
  5. Click Add.
  6. Similarly, for the output port...
  7. Click and drag an Output Port from the Components Toolbar.
  8. Name it ("from" plus the process-group name?) and click Add.
  9. Drag a connection from the exiting processor (or a funnel) to the output port and click Add.
  10. Go back up to the parent level and add the connection from the process group to the consuming processor.
  11. Add the connection from the supplying processor to the process group.

Exporting templates for use in other NiFi installations

...and the current installation too. Here's a video on exporting templates.

  1. Assuming you have one or more templates, click the Templates button in the Management Toolbar. This brings up the Template Management dialog.
  2. Click the Download button to download the template as an XML file.
  3. Save the XML file to ~/Downloads or wherever you want it.
  4. Send it to a friend.

Note: if your template uses processors or other components that don't exist in the target NiFi installation, it can still be imported, but when you attempt to use it there, NiFi will issue an error (in a modeless alert).

Importing a template

This uses the Template Management dialog too.

  1. Click the Templates button in the Management Toolbar.
  2. Click Browse to go find a template XML file you've copied into your filesystem somewhere.
  3. Navigate to that file and click Open.
  4. The new template should show up in your list.
  5. Exit the modal dialog and consume the template as noted.

Monday, 13 June 2016

Some NiFi notes here on logging...

Prior to NiFi 1.0, each processor logged at the INFO level what it was doing for each flowfile. This turned out to be extremely abusive in terms of logging diskspace, so, beginning in 1.0, the minimum level for processors was set to WARN instead. In conf/logback.xml this can be changed back by setting the log level of org.apache.nifi.processors.

Prior to 1.0, however, one saw this:

.
.
.
<logger name="org.apache.nifi" level="INFO" />
.
.
.
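
To restore the old chattiness in 1.0 and later, a stanza like this in conf/logback.xml should do it (my own guess, patterned on the logger line above):

<logger name="org.apache.nifi.processors" level="INFO" />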

Friday, 8 July 2016

Today I'm studying the NiFi ReST (NiFi API) which provides programmatic access to command and control of a NiFi instance (in real time). Specifically, I'm interested in monitoring, indeed, in obtaining an answer to questions like:

  1. How many times a processor processed something?
  2. How many flowfiles were processed?
  3. How many flowfiles were produced?
  4. Can I get this information per processor?
  5. How can I tell different instances of the same processor apart?
    • ...in what's reported?
    • ...in asking about a specific processor instance?

Also, I'm interested in:

  1. Getting a list of items on the NiFi canvas like processors and relationships.
  2. How many times processors or a processor failed?
  3. What is the profile of resource usage?
    • memory consumed
    • ?
  4. What is the profile of processor latency?
    • flowfiles backed up awaiting processing
    • flowfiles in other states (?)

But first, let's break this ReST control down by service. I'm not interested in all of these today, but since I've got the information in a list, here it is:

Service                  Function(s)
Controller               Get controller configuration, search the flow, manage templates, system diagnostics
Process Groups           Get the flow, instantiate a template, manage sub groups, monitor component status
Processors               Create a processor, set properties, schedule
Connections              Create a connection, set queue priority, update connection destination
Input Ports              Create an input port, set remote port access control
Output Ports             Create an output port, set remote port access control
Remote Process Groups    Create a remote group, enable transmission
Labels                   Create a label, set label style
Funnels                  Manage funnels
Controller Services      Manage controller services, update controller service references
Reporting Tasks          Manage reporting tasks
Cluster                  View node status, disconnect nodes, aggregate component status
Provenance               Query provenance, search event lineage, download content, replay
History                  View flow history, purge flow history
Users                    Update user access, revoke accounts, get account details, group users

Let me make a quick list of end-points that pique my interest and I'll try them out, make further notes, etc.

Service      Verb   URI                   Notes
Controller   GET    /controller/config    retrieve configuration
             GET    /controller/counter   current counters

Here's dynamic access to all of the service documentation: https://nifi.apache.org/docs/nifi-docs/rest-api/

How to use the ReST API

If I'm accessing my NiFi canvas (flow) using http://localhost:8080/nifi, I use this URI to reach the ReST end-points:

http://localhost:8080/nifi-api/{service}/{end-point}

For example, http://localhost:8080/nifi-api/controller/config returns this (I'm running it against a NiFi canvas that's got two flows of 3 and 2 processors, with relationships between them, that have sat unused for a couple of weeks):

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<controllerConfigurationEntity>
    <revision>
        <clientId>87718998-e57b-43ac-9e65-15f2b8e0feaa</clientId>
    </revision>
    <config>
        <autoRefreshIntervalSeconds>30</autoRefreshIntervalSeconds>
        <comments></comments>
        <contentViewerUrl>/nifi-content-viewer/</contentViewerUrl>
        <currentTime>13:23:01 MDT</currentTime>
        <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
        <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
        <name>NiFi Flow</name>
        <siteToSiteSecure>true</siteToSiteSecure>
        <timeOffset>-21600000</timeOffset>
        <uri>http://localhost:8080/nifi-api/controller</uri>
    </config>
</controllerConfigurationEntity>

I'm using the Firefox </> RESTED client. What about JSON? Yes, RESTED allows me to add Accept: application/json:

{
  "revision":
  {
    "clientId": "f841561c-60c6-4d9b-a3ce-d2202789eef7"
  },
  "config":
  {
    "name": "NiFi Flow",
    "comments": "",
    "maxTimerDrivenThreadCount": 10,
    "maxEventDrivenThreadCount": 5,
    "autoRefreshIntervalSeconds": 30,
    "siteToSiteSecure": true,
    "currentTime": "14:06:18 MDT",
    "timeOffset": -21600000,
    "contentViewerUrl": "/nifi-content-viewer/",
    "uri": "http://localhost:8080/nifi-api/controller"
  }
}
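
For the record, the same request is easy to script from Java, too; here's a sketch using nothing but the JDK (no NiFi classes involved):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class NiFiApiGet
{
  public static void main( String[] args ) throws Exception
  {
    URL               url        = new URL( "http://localhost:8080/nifi-api/controller/config" );
    HttpURLConnection connection = ( HttpURLConnection ) url.openConnection();

    // ask for JSON instead of the default XML...
    connection.setRequestProperty( "Accept", "application/json" );

    try( BufferedReader reader = new BufferedReader( new InputStreamReader( connection.getInputStream() ) ) )
    {
      String line;

      while( ( line = reader.readLine() ) != null )
        System.out.println( line );
    }
  }
}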

I wanted to see the available processors in a list. I did this via: http://localhost:8080/nifi-api/controller/processor-types

However, I could not figure out what to supply as {processorId} to controller/history/processors/{processorId}. I decided to use a processor name: http://localhost:8080/nifi-api/controller/history/processors/VelocityTemplating
This did the trick:

{
  "revision":
  {
    "clientId": "86082d22-9fb2-4aea-86bc-efe062861d51"
  },
  "componentHistory":
  {
    "componentId": "VelocityTemplating",
    "propertyHistory": {}
  }
}

I notice that I can't see the name I give the processor. For example, I have a GetFile that I named Get PDF files from test fodder. I can't find any API that returns that.

Now, about instances of processors. The forum told me that, even if I'm not using any process groups, there is one process group encompassing what I am using, namely root; the URI appears just below:

http://localhost:8080/nifi-api/controller/process-groups/root/status
{
  "revision":
  {
    "clientId": "98d70ac0-476c-4c6b-bcbc-3ec91193d994"
  },
  "processGroupStatus":
  {
    "bulletins": [],
    "id": "50e3886b-984d-4e90-a889-d46dc53c4afc",
    "name": "NiFi Flow",
    "connectionStatus":
    [
      {
        "id": "f3a032d6-6bd1-4cc5-ab5e-8923e131c222",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "success",
        "input": "0 / 0 bytes",
        "queuedCount": "0",
        "queuedSize": "0 bytes",
        "queued": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "sourceId": "9929acf6-aac7-46d9-a0dd-f239d66e1594",
        "sourceName": "Get PDFs from inbox",
        "destinationId": "c396af51-2221-4bfd-ba31-5f4b5913b910",
        "destinationName": "Generate XML from advance directives template"
      },
      {
        "id": "e5ba7858-8301-4fd0-885e-6fb63b336e53",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "success",
        "input": "0 / 0 bytes",
        "queuedCount": "0",
        "queuedSize": "0 bytes",
        "queued": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "sourceId": "c396af51-2221-4bfd-ba31-5f4b5913b910",
        "sourceName": "Generate XML from advance directives template",
        "destinationId": "1af4a618-ca9b-4697-81b5-3ba1bbfaba6b",
        "destinationName": "Put generated XMLs to output folder"
      },
      {
        "id": "53e4a0e9-ce20-4e4d-84f3-d12504edeccc",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "success",
        "input": "0 / 0 bytes",
        "queuedCount": "3",
        "queuedSize": "25.27 KB",
        "queued": "3 / 25.27 KB",
        "output": "0 / 0 bytes",
        "sourceId": "4435c701-0719-4231-8867-671db08dd813",
        "sourceName": "Get PDF files from test fodder",
        "destinationId": "5f28baeb-b24c-4c3f-b6d0-c3284553d6ac",
        "destinationName": "Put PDFs as in-box items"
      }
    ],
    "processorStatus":
    [
      {
        "id": "c396af51-2221-4bfd-ba31-5f4b5913b910",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Generate XML from advance directives template",
        "type": "VelocityTemplating",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "9929acf6-aac7-46d9-a0dd-f239d66e1594",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Get PDFs from inbox",
        "type": "GetInbox",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "5f28baeb-b24c-4c3f-b6d0-c3284553d6ac",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Put PDFs as in-box items",
        "type": "PutInbox",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "1af4a618-ca9b-4697-81b5-3ba1bbfaba6b",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Put generated XMLs to output folder",
        "type": "PutFile",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      },
      {
        "id": "4435c701-0719-4231-8867-671db08dd813",
        "groupId": "50e3886b-984d-4e90-a889-d46dc53c4afc",
        "name": "Get PDF files from test fodder",
        "type": "GetFile",
        "runStatus": "Stopped",
        "read": "0 bytes",
        "written": "0 bytes",
        "input": "0 / 0 bytes",
        "output": "0 / 0 bytes",
        "tasks": "0",
        "tasksDuration": "00:00:00.000",
        "activeThreadCount": 0
      }
    ],
    "processGroupStatus": [],
    "remoteProcessGroupStatus": [],
    "inputPortStatus": [],
    "outputPortStatus": [],
    "input": "0 / 0 bytes",
    "queuedCount": "3",
    "queuedSize": "25.27 KB",
    "queued": "3 / 25.27 KB",
    "read": "0 bytes",
    "written": "0 bytes",
    "output": "0 / 0 bytes",
    "transferred": "0 / 0 bytes",
    "received": "0 / 0 bytes",
    "sent": "0 / 0 bytes",
    "activeThreadCount": 0,
    "statsLastRefreshed": "09:53:12 MDT"
  }
}

This sort of hits the jackpot. What we see above are:

  1. status of connections and processors
  2. processor type: "type" : "VelocityTemplating"
  3. processor name: "name" : "Generate XML from advance directives template"
  4. "runStatus"
  5. "read" : "0 bytes"
  6. "written" : "0 bytes"
  7. "input" : "0 / 0 bytes"
  8. "output" : "0 / 0 bytes"
  9. "tasksDuration" : "00:00:00.000"
  10. all-processor totals
  11. other stuff

So, let's see if we can't dig into this stuff above using Python.

My Python's not as good as it was a year ago when I last coded in this language, but Python is extraordinarily useful compared to anything else for writing these scripts (yeah, okay, I'm not too keen on Ruby as compared to Python and I'm certainly down on Perl). To write this in Java, you'd end up with fine software, but it's more work (juggling the JSON-to-POJO transformation) and, of course, a JAR isn't as brainlessly usable from the command line as a script in a proper scripting language.

The script below illustrates a cautious traversal down to processor names (and types); it shows the way, since there are countless additional tidbits to dig out, like process-group-wide status. Remember, this is a weeks-old NiFi installation that's just been sitting there; I've made no attempt to shake it around so that the status numbers get more interesting. Here's my first script:

import sys
import json
import globals
import httpclient

def main( argv ):
    if len( argv ) > 1 and argv[ 1 ] in ( '-d', '--debug' ):
        DEBUG = True
    URI = globals.NIFI_API_URI( "controller" ) + 'process-groups/root/status'

    print( 'URI = %s' % URI )

    response = httpclient.get( URI )
    jsonDict = json.loads( response )

    print
    print 'Keys back from request:'
    print list( jsonDict.keys() )

    processGroupStatus = jsonDict[ 'processGroupStatus' ]
    print
    print 'processGroupStatus keys:'
    print list( processGroupStatus.keys() )

    connectionStatus = processGroupStatus[ 'connectionStatus' ]
    print
    print 'connectionStatus keys:'
    print connectionStatus

    processorStatus  = processGroupStatus[ 'processorStatus' ]
    print
    print 'processorStatus:'
    print '  There are %d processors listed.' % len( processorStatus )
    for processor in processorStatus:
        print( '  (%s): %s' % ( processor[ 'type' ], processor[ 'name' ] ) )


if __name__ == "__main__":
    if len( sys.argv ) <= 1:
        args = [ 'first.py' ]
        main( args )
        sys.exit( 0 )
    else:
        sys.exit( main( sys.argv ) )

Output:

URI = http://localhost:8080/nifi-api/controller/process-groups/root/status

Keys back from request:
[u'processGroupStatus', u'revision']

processGroupStatus keys:
[u'outputPortStatus', u'received', u'queuedCount', u'bulletins', u'activeThreadCount', u'read', u'queuedSize', ...]

connectionStatus keys:
[{u'queuedCount': u'0', u'name': u'success', u'sourceId': u'9929acf6-aac7-46d9-a0dd-f239d66e1594', u'queuedSize': u'0 bytes' ...]

processorStatus:
  There are 5 processors listed.
  (VelocityTemplating): Generate XML from advance directives template
  (GetInbox): Get PDFs from inbox
  (PutInbox): Put PDFs as in-box items
  (PutFile): Put generated XMLs to output folder
  (GetFile): Get PDF files from test fodder

Wednesday, 13 July 2016

I'm convinced that Reporting Tasks are the best approach to monitoring. They're written very similarly to Processors, but not deployed like them: using a reporting task isn't a matter of tediously clicking and dragging boxes, but of enabling task reporting (including custom task reporting) for the whole flow or process group. See also Re: Workflow monitoring/notifications.

(The second of the three NiFi architectural components is the Controller Service.)

The NiFi ReST end-points are very easy to use, but there is some difficulty in their semantics, which still appear a bit random to my mind. The NiFi canvas or web UI is built atop these end-points (of course) and it is possible to follow what's happening using the developer tools in your browser (Firefox: Developer Tools → Network, then click the desired method to see the URI, etc.). I found that a little challenging because I've been pretty much a back-end guy who's never used the browser developer tools intensively.

I found it very easy, despite not being a rockstar Python guy, to write simple scripts to hit NiFi. And I did. However, realize that the asynchrony of this approach presents a challenge, given that a work flow can be very dynamic. Examining something in a NiFi flow from Python is like trying to inspect a flight to New York to see if your girlfriend is sitting next to your rival as it passes over Colorado, Nebraska, Iowa, etc. It's just not the best way. The ids of the individual things you're looking for are not always easily ascertained and can change as the plane transits from Nebraska to Iowa to Illinois. Enabling a reporting task, or even writing a custom reporting task and enabling that, strikes me as the better solution, though it forces us to design or conceive of what we want to know up front. This isn't necessarily bad in the long run.

Gain access to reporting tasks by clicking the Controller Settings in the Management Toolbar.


Here's a coded example, ControllerStatusReportingTask. The important method to implement appears to be onTrigger( ReportingContext context ), which is given access to the reporting context; that carries, among other things, the controller's status tree and the bulletin repository (I still need to investigate just how helpful each of these is).
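
As a placeholder for my understanding so far, here's a minimal sketch of what a custom reporting task might look like. The class and package names are mine, and I'm assuming the status accessors correspond to the JSON the ReST end-point returned above, so don't take the details as gospel:

package com.etretatlogiciels.nifi.reporting;

import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;

public class ProcessorCountReportingTask extends AbstractReportingTask
{
  @Override
  public void onTrigger( final ReportingContext context )
  {
    // the root process group's status: the same tree the ReST end-point returned above...
    ProcessGroupStatus root = context.getEventAccess().getControllerStatus();

    for( ProcessorStatus processor : root.getProcessorStatus() )
      getLogger().info( "({}): {} in={} out={}",
          new Object[] { processor.getType(), processor.getName(),
                         processor.getInputCount(), processor.getOutputCount() } );
  }
}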

But wait: ControllerStatusReportingTask already exists, right? So why not just use it? It yields the following, which may be sufficient for most of our needs.

Information logged: processor statuses and connection statuses (see the log excerpts under 14 July, below).

By default, the output from this reporting task goes to the NiFi log, but it can be redirected elsewhere by modifying conf/logback.xml. (This would constitute a new logger, something like this:)

<appender name="STATUS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  <file>logs/nifi-status.log</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>./logs/nifi-status_%d.log</fileNamePattern>
    <maxHistory>30</maxHistory>
  </rollingPolicy>
  <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
    <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
  </encoder>
</appender>
.
.
.
<logger name="org.apache.nifi.controller.ControllerStatusReportingTask" level="INFO" additivity="false">
  <appender-ref ref="STATUS_FILE" />
</logger>

So, I think what we want is already rolled for us by NiFi and all we need do is:

  1. Modify conf/logback.xml to separate its output from the rest of logging,
  2. Hit that log file to get the data into HEKA.

Thursday, 14 July 2016

Today, I followed these steps to reporting-task happiness, as an experiment. I removed all log files in order to minimize the confusion and reduce the space to look through for differences.

    Establish status quo...
  1. Removed all NiFi logs and bounced NiFi, running NiFi 0.6.1.
  2. Clicked on Controller Settings icon in the Management Toolbar.
  3. Clicked Reporting Tasks tab.
  4. Configured a (ControllerStatus) reporting task, leaving the defaults (in particular, Run schedule at 5 minutes).
  5. Clicked Start (green arrow) icon.
  6. Begin first observation...
  7. Removed all NiFi logs and bounced NiFi.
  8. I note that ControllerStatusReportingTask is running after bounce.

    fgrep reporting *.log yields:

    nifi-app.log:	org.apache.nifi.reporting.ganglia.StandardGangliaReporter || org.apache.nifi.nar.NarClassLoader[./work/nar/extensions/nifi-standard-nar-0.6.1.nar-unpacked]
    nifi-app.log:	org.apache.nifi.reporting.ambari.AmbariReportingTask || org.apache.nifi.nar.NarClassLoader[./work/nar/extensions/nifi-ambari-nar-0.6.1.nar-unpacked]
    nifi-user.log:2016-07-14 09:49:09,509 INFO [NiFi Web Server-25] org.apache.nifi.web.filter.RequestLogger Attempting request for (anonymous) GET http://localhost:8080/nifi-api/controller/reporting-task-types (source ip: 127.0.0.1)
    nifi-user.log:2016-07-14 09:49:09,750 INFO [NiFi Web Server-70] org.apache.nifi.web.filter.RequestLogger Attempting request (ibid)
    nifi-user.log:2016-07-14 09:49:26,596 INFO [NiFi Web Server-24] org.apache.nifi.web.filter.RequestLogger Attempting request (ibid)
    
  9. I launched GetFile (Get PDF files from test fodder) and waited a few seconds for it to run and produce flowfiles.
  10. I launched PutInbox (Put PDFs as in-box items) and waited a few seconds for it to run and produce flowfiles.

    Even after waiting minutes, I see no difference when running the grep. However, I launch vim on nifi-app.log and see, here and there, stuff like:

    2016-07-14 09:48:50,481 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Processors Processor Statuses:
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Processor Name                 | Processor ID                         | Processor Type     | Run Status |             Flow Files In |            Flow Files Out |         Bytes Read |      Bytes Written |   Tasks |                    Proc Time |
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Generate XML from advance dire | c396af51-2221-4bfd-ba31-5f4b5913b910 | VelocityTemplating |    Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) |  0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Get PDFs from inbox            | 9929acf6-aac7-46d9-a0dd-f239d66e1594 | GetInbox           |    Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) |  0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Put PDFs as in-box items       | 5f28baeb-b24c-4c3f-b6d0-c3284553d6ac | PutInbox           |    Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) |  0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Put generated XMLs to output f | 1af4a618-ca9b-4697-81b5-3ba1bbfaba6b | PutFile            |    Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) |  0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Get PDF files from test fodder | 4435c701-0719-4231-8867-671db08dd813 | GetFile            |    Stopped | 0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) | 0 bytes (+0 bytes) | 0 bytes (+0 bytes) |  0 (+0) | 00:00:00.000 (+00:00:00.000) |
    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    2016-07-14 09:48:50,483 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Connections Connection Statuses:
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Connection ID                        | Source                         | Connection Name | Destination                    |              Flow Files In |             Flow Files Out |             FlowFiles Queued |
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | 53e4a0e9-ce20-4e4d-84f3-d12504edeccc | Get PDF files from test fodder | success         | Put PDFs as in-box items       |  0 / 0 bytes (+0/+0 bytes) |  0 / 0 bytes (+0/+0 bytes) |  9 / 75.82 KB (+9/+75.82 KB) |
    | f3a032d6-6bd1-4cc5-ab5e-8923e131c222 | Get PDFs from inbox            | success         | Generate XML from advance dire |  0 / 0 bytes (+0/+0 bytes) |  0 / 0 bytes (+0/+0 bytes) |    0 / 0 bytes (+0/+0 bytes) |
    | e5ba7858-8301-4fd0-885e-6fb63b336e53 | Generate XML from advance dire | success         | Put generated XMLs to output f |  0 / 0 bytes (+0/+0 bytes) |  0 / 0 bytes (+0/+0 bytes) |    0 / 0 bytes (+0/+0 bytes) |
    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    

    (I would figure out a little later that this is the start-up view, before anything's happened.)

  11. Begin attempting to redirect reporting task observation...
  12. I inserted this near the bottom of conf/logback.xml:
    <!-- Logger to redirect reporting task output to a different file. -->
    <logger name="org.apache.nifi.controller.ControllerStatusReportingTask" level="INFO" additivity="false">
      <appender-ref ref="STATUS_FILE" />
    </logger>
    
  13. ...and I inserted this near the top of conf/logback.xml, just after the definition of BOOTSTRAP_FILE rolling appender:
    <!-- rolling appender for controller-status reporting task log. -->
    <appender name="STATUS_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
      <file>logs/nifi-status.log</file>
      <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <fileNamePattern>./logs/nifi-status_%d.log</fileNamePattern>
        <maxHistory>30</maxHistory>
      </rollingPolicy>
      <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
        <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
      </encoder>
    </appender>
    
  14. Removed all NiFi logs and bounced NiFi.
  15. Looked at logs:
    ~/dev/nifi/logs $ ll
    total 76
    drwxr-xr-x  2 russ russ  4096 Jul 14 10:28 .
    drwxr-xr-x 13 russ russ  4096 Jul 14 09:37 ..
    -rw-r--r--  1 russ russ 65016 Jul 14 10:29 nifi-app.log
    -rw-r--r--  1 russ russ  3069 Jul 14 10:28 nifi-bootstrap.log
    -rw-r--r--  1 russ russ     0 Jul 14 10:28 nifi-status.log
    -rw-r--r--  1 russ russ     0 Jul 14 10:28 nifi-user.log
    
  16. I launched GetFile (Get PDF files from test fodder) and waited a few seconds for it to run and produce flowfiles.

  17. I launched PutInbox (Put PDFs as in-box items) and waited a few seconds for it to run and produce flowfiles.

  18. I never see nifi-status.log grow in size. I launch vim on nifi-app.log and see, just as before (but this time there's stuff showing work accomplished):
    
    2016-07-14 10:34:06,643 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Processors Processor Statuses:
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Processor Name                 | Processor ID                         | Processor Type     | Run Status |               Flow Files In |              Flow Files Out |           Bytes Read |        Bytes Written |  Tasks |                    Proc Time |
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Get PDF files from test fodder | 4435c701-0719-4231-8867-671db08dd813 | GetFile            |    Stopped |   0 / 0 bytes (+0/+0 bytes) | 6 / 50.54 KB (+6/+50.54 KB) | 50.54 KB (+50.54 KB) | 50.54 KB (+50.54 KB) | 2 (+2) | 00:00:00.056 (+00:00:00.056) |
    | Put PDFs as in-box items       | 5f28baeb-b24c-4c3f-b6d0-c3284553d6ac | PutInbox           |    Stopped | 6 / 50.54 KB (+6/+50.54 KB) |   0 / 0 bytes (+0/+0 bytes) | 50.54 KB (+50.54 KB) |   0 bytes (+0 bytes) | 1 (+1) | 00:00:00.014 (+00:00:00.014) |
    | Generate XML from advance dire | c396af51-2221-4bfd-ba31-5f4b5913b910 | VelocityTemplating |    Stopped |   0 / 0 bytes (+0/+0 bytes) |   0 / 0 bytes (+0/+0 bytes) |   0 bytes (+0 bytes) |   0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Get PDFs from inbox            | 9929acf6-aac7-46d9-a0dd-f239d66e1594 | GetInbox           |    Stopped |   0 / 0 bytes (+0/+0 bytes) |   0 / 0 bytes (+0/+0 bytes) |   0 bytes (+0 bytes) |   0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    | Put generated XMLs to output f | 1af4a618-ca9b-4697-81b5-3ba1bbfaba6b | PutFile            |    Stopped |   0 / 0 bytes (+0/+0 bytes) |   0 / 0 bytes (+0/+0 bytes) |   0 bytes (+0 bytes) |   0 bytes (+0 bytes) | 0 (+0) | 00:00:00.000 (+00:00:00.000) |
    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    2016-07-14 10:34:06,644 INFO [Timer-Driven Process Thread-1] c.a.n.c.C.Connections Connection Statuses:
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | Connection ID                        | Source                         | Connection Name | Destination                    |               Flow Files In |              Flow Files Out |          FlowFiles Queued |
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    | f3a032d6-6bd1-4cc5-ab5e-8923e131c222 | Get PDFs from inbox            | success         | Generate XML from advance dire |   0 / 0 bytes (+0/+0 bytes) |   0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) |
    | e5ba7858-8301-4fd0-885e-6fb63b336e53 | Generate XML from advance dire | success         | Put generated XMLs to output f |   0 / 0 bytes (+0/+0 bytes) |   0 / 0 bytes (+0/+0 bytes) | 0 / 0 bytes (+0/+0 bytes) |
    | 53e4a0e9-ce20-4e4d-84f3-d12504edeccc | Get PDF files from test fodder | success         | Put PDFs as in-box items       | 6 / 50.54 KB (+6/+50.54 KB) | 6 / 50.54 KB (+6/+50.54 KB) | 0 / 0 bytes (+0/+0 bytes) |
    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
    
  19. So, this was a partial failure...

What failed is getting the reported status into nifi-status.log. What succeeded is, just as was working before, getting controller status.

I got a response out of [email protected] that this was a reported bug and that it's been fixed in 0.7.0 and 1.0.0. Also note that %logger{40} is some logback incantation to affect the package name such that a name like org.nifi.controller.ControllerStatusReportingTask becomes o.n.c.ControllerStatusReportingTask. (See "Pattern Layouts" at Chapter 6: Layouts.)

I upgraded my NiFi version to 0.7.0 at home and at the office.

Tuesday, 23 August 2016

Apache NiFi expression language notes

I need to take time out to grok this.

  1. Flow file
  2. Attributes on flow file
    • filename
    • path
  3. NiFi expression language syntax

Hmmmm... yeah, there's precious little written on this stuff that really corrals it. So, I wrote this (albeit non-functional) example using the information above plus what I already know about flowfiles, properties and attributes:

package com.etretatlogiciels;

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;

public class FunWithNiFiProperties
{
  private static final PropertyDescriptor EL_HOSTNAME = new PropertyDescriptor
                        .Builder()
                        .name( "Hostname (supporting EL)" )
                        .description( "Hostname property that supports NiFi Expression Language syntax"
                            + "This is the name of the flowfile attribute to expect to hold the hostname" )
                        .required( false )
                        .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
                        .expressionLanguageSupported( true )                // !
                        .build();
  private static final PropertyDescriptor HOSTNAME    = new PropertyDescriptor
                        .Builder()
                        .name( "Hostname" )
                        .description( "Hostname property that doesn't support NiFi Expression Language syntax."
                            + "This is the name of the flowfile attribute to expect to hold the hostname" )
                        .required( false )
                        .addValidator( Validator.VALID )
                        .build();

  // values will end up with a map of relevant data, names and values...
  private Map< String, String > values = new HashMap<>();

  /**
   * Example of getting a hold of a property value via what the property value evaluates to
   * including what not evaluating it as an expression yields.
   */
  private void expressionLanguageProperty( final ProcessContext context, final ProcessSession session )
  {
    FlowFile      flowFile = session.get();
    PropertyValue property = context.getProperty( EL_HOSTNAME );
    String        value    = property.getValue();       // (may contain expression-language syntax)

    // what's our property and its value?
    values.put( EL_HOSTNAME.getName(), value );

    // is there a flowfile attribute of this precise name (without evaluating)?
    // probably won't be a valid flowfile attribute if contains expression-language syntax...
    values.put( "hostname", flowFile.getAttribute( value ) );

    //         -- or --

    // get NiFi to evaluate the possible expression-language of the property as a flowfile attribute name...
    values.put( "el-hostname", property.evaluateAttributeExpressions( flowFile ).getValue() );
  }

  /**
   * Example of getting a hold of a property value the normal way, that is, not involving
   * any expression syntax.
   */
  private void normalProperty( final ProcessContext context, final ProcessSession session )
  {
    FlowFile      flowFile = session.get();
    PropertyValue property = context.getProperty( HOSTNAME );
    String        value    = property.getValue(); // (what's in HOSTNAME)

    // what's our property and its value?
    values.put( HOSTNAME.getName(), value );

    // is there a flowfile attribute by this precise name?
    values.put( "hostname-attribute-on-flowfile", flowFile.getAttribute( value ) );
  }
}

This is all I think I know so far:

  1. Expression-language syntax can only be used as the value of a processor property.
  2. (Not exactly true: it can be transmitted not only via a processor property value, but by other input means as well including a flowfile attribute value.)
  3. Expression-language syntax can only be evaluated by the evaluateAttributeExpressions() method, so it can only be used for the purpose of finding a specially named flowfile attribute (and getting its value).

Friday, 26 August 2016

Yesterday, I finally put together a tiny example of a full, custom NiFi project minus additionalDetails.html (for that, look in the notes on the present page). See A simple NiFi processor project.

Thursday, 8 September 2016

Here's one for your gee-whiz collection: if you do

public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  .
  .
  .
  someMethod( session );

...NiFi has removed the flowfile from the session's processorQueue and a subsequent call:

void someMethod( final ProcessSession session )
{
  FlowFile flowfile = session.get();

...will produce a null flowfile. This is an interesting side-effect.
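
The workaround I use (my own practice, nothing official) is to pass down the flowfile already taken from the session rather than asking the session for it again:

void someMethod( final ProcessSession session, final FlowFile flowfile )
{
  // operate on the flowfile handed down from onTrigger(); session.get() here would return null
}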

Wednesday, 14 September 2016

Missing flowfiles...

In what situation would onTrigger() get called if there is no flowfile?

This can happen for a few reasons. Because the processor has...

  1. ...no incoming connections (it's a source processor),
  2. ...the @TriggerWhenEmpty annotation,
  3. ...more than one concurrent task,
  4. (others?)

The main reason is the third. If you have multiple concurrent tasks, Thread 1 can determine that there is 1 flowfile queued. Thread 2 then determines that there is one flowfile queued. Both threads call onTrigger(), thread 1 gets the flowfile, and thread 2 gets nothing (null).
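
For that reason, a defensive null check at the top of onTrigger() is the usual idiom; a sketch:

public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();

  // another thread may have beaten us to the queue...
  if( flowfile == null )
  {
    context.yield();
    return;
  }

  // ...the real work follows
}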

Backing up NiFi flows, using templates, etc....

Is there a best practice for keeping a backup of data flows? Assuming flow.xml.gz and the configuration files are backed up, is it possible to restore NiFi in case of any failure? Is it possible simply to "drop" this file into a new NiFi installation, or should templates be used in development and then imported into production?

It's a good idea to back up flow.xml.gz and configuration files. But distinguish between using these backups to recreate an equivalent new NiFi installation and attempting to reset the state of the existing one. The difference is the live data in the flow, in the provenance repository, in state variables, etc.: Restoring a flow definition that doesn't match (or no longer matches) your content and provenance data may have unexpected results for you and for systems connecting with the NiFi installation. NiFi tries hard to handle these changes smoothly, but it isn't a magic time machine.

Deploying flow.xml.gz can work, especially when deployed with configuration files that reference IDs in the flow (like authorizations.xml) or the nifi.sensitive.props.key setting, and others. But if you overwrite a running flow, you will still have data migration problems.

Templates are the current recommended best practice for deployment. They provide:

  1. concise packaging for deployment,
  2. separation between site-specific configuration like authorizations from the flow logic,
  3. workflow that allows, encourages and forces the administrator to address migration from the existing flow to incorporate the new template.

Is there a way to predeploy templates to the template directory and have NiFi recognize them?

Templates have been polished quite a bit to be more deterministic and source-control friendly, so they are an ideal artifact to be versioned and kept in a source-control repository.

When NiFi starts up, it looks in conf/templates (by default; this directory can be changed in nifi.properties) for any file that has a suffix of .template or .xml, so be sure the files are named properly. If starting a cluster, ensure all nodes in the cluster have these same templates or they will not show up.

Friday, 23 September 2016

Here's a small, complete example of a processor that translates attribute names or removes attributes. I found UpdateAttribute to be overly complicated and hard to use.

NiFiStandardAttributes.java:
package com.etretatlogiciels.nifi.attributes;

import java.util.ArrayList;
import java.util.List;

public class NiFiStandardAttributes
{
  public static final List< String > FLOWFILE_ATTRIBUTES;

  static
  {
    FLOWFILE_ATTRIBUTES = new ArrayList<>( 5 );
    FLOWFILE_ATTRIBUTES.add( "filename" );
    FLOWFILE_ATTRIBUTES.add( "uuid" );
    FLOWFILE_ATTRIBUTES.add( "path" );
  }

  public static final int FLOWFILE_ATTRIBUTE_COUNT = FLOWFILE_ATTRIBUTES.size();
}
TranslateAttributeNamesTest.java:
package com.etretatlogiciels.nifi.processor;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import com.etretatlogiciels.nifi.attributes.NiFiStandardAttributes;
import com.etretatlogiciels.testing.TestUtilities;

/**
 * @author Russell Bateman
 * @since September 2016
 */
public class TranslateAttributeNamesTest
{
  // @formatter:off
  @Rule   public TestName name = new TestName();
  @Before public void setUp() throws Exception { TestUtilities.setUp( name ); }
  @After  public void tearDown() throws Exception { }

  private static final boolean VERBOSE = true;//TestUtilities.VERBOSE;

  private static final String TRANSLATE_JSON = "{"
                                            + "    \"inner-MIME\" : \"mime-type\","
                                            + "    \"lose-this\" : \"\","
                                            + "    \"lose-this-too\" : null"
                                            + "}";

  @Test
  public void testBasic()
  {
    TestRunner runner = TestRunners.newTestRunner( new TranslateAttributeNames() );

    final int         ONE      = 1;
    final String      CONTENTS = "This is a test whose flowfile contents are insignificant!";
    final InputStream MESSAGE  = new ByteArrayInputStream( CONTENTS.getBytes() );

    runner.setProperty( TranslateAttributeNames.TRANSLATIONS, TRANSLATE_JSON );

    Map< String, String > flowFileAttributes = new HashMap<>();
    flowFileAttributes.put( "testname",   "X12 Message Router Unit Test" );
    flowFileAttributes.put( "inner-MIME", "application/pdf" );
    flowFileAttributes.put( "lose-this", "" );
    flowFileAttributes.put( "lose-this-too", null );
    flowFileAttributes.put( "dont-touch-this-one", "Don't touch this attribute!" );

    if( VERBOSE )
    {
      System.out.println( "Attributes on the original flowfile:" );
      Map< String, String > sortedAttributes = new TreeMap<>( flowFileAttributes );
      for( Map.Entry< String, String > attribute : sortedAttributes.entrySet() )
        System.out.println( "  " + attribute.getKey() + " = " + attribute.getValue() );
    }

    runner.setValidateExpressionUsage( false );
    runner.enqueue( MESSAGE, flowFileAttributes );

    runner.run( ONE );
    runner.assertQueueEmpty();
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( TranslateAttributeNames.SUCCESS );
    MockFlowFile         result  = results.get( 0 );
    String               actual  = new String( runner.getContentAsByteArray( result ) );

    if( VERBOSE )
    {
      System.out.println( "Surviving flowfile contents:" );
      System.out.println( "  " + actual );
      System.out.println( "Attributes surviving on the flowfile:" );
      Map< String, String > sortedAttributes = new TreeMap<>( result.getAttributes() );
      for( Map.Entry< String, String > attribute : sortedAttributes.entrySet() )
      {
        String key = attribute.getKey();

        if( !NiFiStandardAttributes.FLOWFILE_ATTRIBUTES.contains( key ) )
          System.out.println( "  " + key + " = " + attribute.getValue() );
      }
    }

    assertTrue( "Flowfile contents changed", actual.equals( CONTENTS ) );

    String mimeType = result.getAttribute( "mime-type" );
    assertNotNull( "Attribute \"inner-MIME\" was not renamed", mimeType );
    assertTrue( "Attribute \"inner-MIME\" was not renamed", mimeType.equals( "application/pdf" ) );

    String dontTouchThisOne = result.getAttribute( "dont-touch-this-one" );
    assertNotNull( "Attribute \"dont-touch-this-one\" did not survive", result.getAttribute( "dont-touch-this-one" ) );
    assertTrue( "Attribute \"dont-touch-this-one\" did not survive untouched", dontTouchThisOne.equals( "Don't touch this attribute!" ) );

    assertNull( "Attribute \"lose-this\" was not dropped", result.getAttribute( "lose-this" ) );
    assertNull( "Attribute \"lose-this-too\" was not dropped", result.getAttribute( "lose-this-too" ) );
  }
}
TranslateAttributeNames.java:
package com.etretatlogiciels.nifi.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

import com.etretatlogiciels.nifi.rel.StandardRelationships;
import com.etretatlogiciels.utilities.StringUtilities;

/**
 * Translate attribute names or even drop attributes altogether.
 *
 * {
 *   "attachment-filename"  : "attachment",
 *   "disposition-encoding" : "encoding",
 *   "disposition-type"     : null,
 *   "inner-MIME"           : "mime_type",
 *   "inner-boundary"       : "Donald Duck
 * }
 *
 * @author Russell Bateman
 * @since September 2016
 */
@SideEffectFree
@Tags( { "translateattributenames" } )
@CapabilityDescription( "Translate the names of attributes to new ones. The flowfile itself remains untouched." )
public class TranslateAttributeNames extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    FlowFile flowfile = session.get();

    // guard against the null-flowfile case noted under 14 September, above...
    if( flowfile == null )
      return;

    Map< String, String > translations = new Gson().fromJson( context.getProperty( TRANSLATIONS ).getValue(),
                                  new TypeToken< Map< String, String > >(){ }.getType() );

    // determine which attributes we'll be pushing onto the new flowfile...
    Map< String, String > candidates = flowfile.getAttributes();
    Map< String, String > attributes = new HashMap<>();

    for( Map.Entry< String, String > attribute : candidates.entrySet() )
    {
      String key = attribute.getKey();

      // do nothing because we're leaving the attribute as-is...
      if( !translations.containsKey( key ) )
        continue;

      // remove this flowfile attribute because we're changing its name or dropping it...
      flowfile = session.removeAttribute( flowfile, key );

      String newKey = translations.get( key );

      // we're dropping this attribute altogether...
      if( StringUtilities.isEmpty( newKey ) || newKey.equals( "null" ) )
        continue;

      // here's the attribute under its new name...
      attributes.put( translations.get( key ), attribute.getValue() );
    }

    flowfile = session.putAllAttributes( flowfile, attributes );
    session.transfer( flowfile, SUCCESS );
  }

  private List< PropertyDescriptor > properties    = new ArrayList<>();
  private Set< Relationship >        relationships = new HashSet<>();

  @Override
  public void init( final ProcessorInitializationContext context )
  {
    properties.add( TRANSLATIONS );
    properties = Collections.unmodifiableList( properties );
    relationships.add( SUCCESS );
    relationships = Collections.unmodifiableSet( relationships );
  }

  @Override public List< PropertyDescriptor> getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >       getRelationships()                { return relationships; }

  //<editor-fold desc="Processor properties and relationships">
  public static final PropertyDescriptor TRANSLATIONS = new PropertyDescriptor.Builder()
      .name( "Translations" )
      .description( "JSON detailing the name changes as key-value pairs where the value is the new name. Any value "
          + "that's null or <"<" will result in the attribute being dropped (not transmitted on the flowfile)." )
      .required( false )
      .addValidator( Validator.VALID )
      .build();

  public static final Relationship SUCCESS = StandardRelationships.SUCCESS;
  //</editor-fold>
}

Wednesday, 28 September 2016

I had a question about cloning the input stream:

  // under some condition, look into the flowfile contents to see if something's there...
  session.read( flowfile, new InputStreamCallback()
  {
    @Override
    public void process( InputStream in ) throws IOException
    {
      // read from in...
    }
  } );


  // then, later (and always) consume it (so, sometimes a second time):

  session.read( flowfile, new InputStreamCallback()
  {
    @Override
    public void process( InputStream in ) throws IOException
    {
      // read from in (maybe again)...
    }
  } );

Wouldn't the second time be problematic since the stream contents have already been read?

I learned from Mark Payne that each time session.read() is called, I'm getting a new InputStream that starts at the beginning of the flowfile. How convenient!

Thursday, 29 September 2016

Some observations on using GetFile:

  1. Configure Scheduling → Run schedule to 10000 sec. This makes it so that this processor will run once, then wait 10,000 seconds before attempting to run again.
  2. Running "once" means that if the File filter template is adequate, it will likely induct all or many of the files on the Input directory no matter how quickly you can reach for the stop button.
  3. So, if you only want one file at a time, only put one file in the Input subdirectory that matches the File filter template.

Friday, 30 September 2016

I don't know why, after all these months, I'm still making mistakes like this. In two of my processors, which work when unit-tested, I keep getting:

Could not read from StandardFlowFileRecord

I tried a new way because, likely, the InputStream I'm caching is getting closed before I can use it again, and this doesn't show up in the unit-test environment:

    FlowFile flowfile = session.get();

    flowfile = session.write( flowfile, new StreamCallback()
    {
      @Override
      public void process( InputStream inputStream, OutputStream outputStream ) throws IOException
      {
        Base64Unroller unroller = new Base64Unroller( inputStream );
        unroller.unroll();

        InputStream in = unroller.getBase64Data();
        int         c;

        for( ; ; )
        {
          c = in.read();
          if( c == EOS || ( c == '\n' || c == '\r' ) )
            break;
          outputStream.write( c );
        }
      }
    } );

    session.transfer( flowfile, SUCCESS );

Flush with this better understanding of input- and output streams in NiFi, I'm going to see if this doesn't also answer the problem I have with Base64Decode.

It does solve my problem there: this is the answer to cases where I need both to read the input stream and write to an output stream.

Monday, 10 October 2016

Got a link to great document I've never seen, Apache NiFi in Depth. Here are points of note.

  1. The flowfile is at the heart of NiFi and its design. The flowfile is a data record. Flowfile repository. However, it is not a file in the filesystem.
  2. The content of the flowfile is the data or payload. Content repository.
  3. Provenance is a record of what's happened to the flowfile. Provenance repository.
  4. Each of the above has its own repository for storage.
  5. Attributes are the flowfile's metadata.
  6. The repositories are immutable; when a change occurs, new entities, such as attributes, are created in memory, then persisted on disk. When content is changed, the original content is read, streamed through the transform and then written to a new stream. At that point, the flowfile's content pointer is updated to the new filesystem location. This makes the flowfile content storage an "immutable, versioned content store." This:
    • reduces the space required for typical, complex graphs of processing and gives natural replay capability,
    • takes advantage of OS caching,
    • avoids random read/write performance hits.
  7. The flowfile repository is, in effect, a write-ahead log for what's going on.
  8. When NiFi goes down, the write claim for a content change is orphaned, then cleaned up by garbage collection. In effect, this rolls back the flowfile to the last known, stable state, ready to continue.
  9. Flowfiles live in a connection queue, in memory, until their number exceeds the value of nifi.queue.swap.threshold in conf/nifi.properties. Then, the lowest-priority flowfiles are serialized to disk in a swap file of 10K flowfiles and marked so in internal NiFi data structures so they can be fetched when needed. This allows millions of flowfiles to exist in the flow without depleting system memory.
  10. Once a flowfile's content is known to be no longer useful, it is deleted. Depending on configuration, it can instead be archived.

The provenance repository is where the history of each flowfile is stored. This involves the chain of custody for each piece of data. Each time a flowfile event occurs (create, fork, clone, modify, etc.), a new provenance event is created. This event is a snapshot of the flowfile as it appeared and fit into the flow at that point in time. These events are held in the repository for a period of time after completion, controllable by configuration.

Because all of the flowfile attributes and pointers to content are kept in the provenance repository, not only is the lineage or processing history of the data visible, but the data itself is, too, and it can be replayed from any point in the flow. When a down-stream processor appears not to have received data, data lineage can show exactly when it was delivered, or indeed that it was never sent.

Flowfiles move between processors by reference: flowfile content isn't actually streamed between processors; only a reference to the flowfile is passed. When a flowfile is to be passed to two processors, it's cloned (content and attributes) and the references are passed on.

Tuesday, 11 October 2016

I asked a question in the [email protected] forum.

"If I create flows for integration testing of multiple components (say, processors), where can I find the underlying files I can pick up, store and deploy to a fresh server with a fresh NiFi, then launch them on data (that I also deploy)?"

"I assumed that once I 'paint' my canvas with processors and relationships, it's harvestable in some form, in some .xml file somewhere, and easily transported to, then dropped into, a new NiFi installation on some server or host for running it. I assumed my testing would be thus configured, then kicked off somehow."

Andy LoPresto answered, "The 'flow' (the processors on the canvas and the relationships between them) is stored on disk as ${nifi_home}/conf/flow.xml.gz [1]. You can also export selections from the flow as templates [2], along with a name and description, which can then be saved as XML files and loaded into a new instance of NiFi with (non-sensitive) configuration values and properties maintained. You may also be interested in the ongoing work to support the 'Software Development Lifecycle (SDLC)' or 'Development to Production Pipeline (D2P)' efforts to allow users to develop flows in testbed environments and 'promote' them through code control/repositories to QE/QA and then Production environments [3].

"If you are attempting to perform automated testing of flows or flow segments, you can do this with the test runner [4]. I would encourage you to store the flow in a template or full flow file (flow.xml.gz, not a 'flowfile') and load that during the test rather than programmatically instantiating and configuring each component and relationship in the test code, but you are able to do both. Also, be aware that the TestRunner is just that, and approximates/mocks some elements of the NiFi environment, so you should understand what decisions are being made behind the scenes to ensure that what is covered by the test is exactly what you expect."

[1] https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#terminology
[2] https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#templates
[3] https://cwiki.apache.org/confluence/display/NIFI/Configuration+Management+of+Flows
[4] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#testing

I'm still waiting to learn what [4] says about "automated testing of flows or flow segments." I did not understand, from reading it, that it said anything about that, let alone offered a sample of how to inject flow.xml.

Googling, ... I did find a Hortonworks document on NiFi testing. However, it didn't talk about this either.

It did note (and I have often wondered about this because I had sort of reached a contrary conclusion) that running the processor from the test framework (the run() method):

  1. will invoke any method in the processor code that carries an @OnScheduled annotation,
  2. call onTrigger() once,
  3. then run the @OnUnscheduled methods, and finally,
  4. the @OnStopped methods.

It pointed out that it only does all of the above as long as you do not pass the extra boolean arguments (described below) to the run() method. This I had not known; I had reached the conclusion that it didn't run these other kinds of methods besides onTrigger().

The options for run() appear to be (see the sketch after this list):

  1. run( int contentsQueued ) —how I run it
  2. run( int contentsQueued, boolean stopOnFinish ) —pass false to avoid @OnUnscheduled and @OnStopped methods
  3. run( int contentsQueued, boolean stopOnFinish, boolean initialize ) —pass false, false to avoid @OnScheduled events as well
  4. run( int contentsQueued, boolean stopOnFinish, boolean initialize, long runWait ) —to specify milliseconds the framework will wait for a processor to stop running before calling the @OnUnscheduled methods
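
As a concrete sketch of these overloads (MyProcessor and the content strings are hypothetical; everything else is the stock nifi-mock API):

  @Test
  public void testRunOverloads()
  {
    TestRunner runner = TestRunners.newTestRunner( new MyProcessor() );

    runner.enqueue( "some content" );

    // fires @OnScheduled, calls onTrigger() once, then @OnUnscheduled and @OnStopped:
    runner.run( 1 );

    runner.enqueue( "more content" );

    // calls onTrigger() once, but skips @OnUnscheduled/@OnStopped so processor
    // state can be inspected between runs:
    runner.run( 1, false );

    // skips @OnScheduled as well; the processor is assumed already initialized:
    runner.run( 1, false, false );
  }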

I got a later answer, about someone else wanting to tackle all of this, evoking the need to mock all sorts of things like databases and wire answers for different processors. Any hope I had headed for the distant horizon. It's a huge and thankless undertaking.

Wednesday, 12 October 2016

It's possible to annotate the NiFi canvas with labels: click and drag a label to the canvas and configure it. These labels land behind anything you constructively design on the canvas (processors, relationships, funnels, etc.).

I have a hard time remembering how to view flowfile content in a queue; here are the steps:

  1. Right-click the queue,
  2. Choose List queue,
  3. At the extreme left, click the (i) button, which is "View Details," of the flowfile that interests you,
  4. In the Details tab, click the View button,
  5. You have the option to view it as a) original (text) format, b) formatted (see content type at upper right: whether this choice makes a difference or not depends on MIME type) or c) hex dump (very useful for binary or even "confusing" text—looking for carriage returns vs. newlines, control characters, etc.).

Tuesday, 1 November 2016

On dynamic properties in NiFi...

The following appear as observations to me. I've used dynamic properties before, but I've pretty much ignored doing them the way they're supposed to be done, I think. It's worth paying attention to everything in this list.

  1. By default, AbstractProcessor doesn't allow for dynamically created properties. It's sometimes desirable, however, to allow a NiFi user (the one who drags and drops processors in a flow) to configure additional properties. This is accomplished by overriding
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( final String propertyDescriptorName )
    {
      .
      .
      .
    }
    
  2. Dynamic properties are described in annotations for the processor:
    @Tags( { "processor", "nifi" } )
    @CapabilityDescription( "This is a processor" )
    @DynamicProperty( name = "name",
                value = "value",
                description = "description" )
    public class Processor extends AbstractProcessor
    {
      private List< PropertyDescriptor > properties;
      .
      .
      .
    
  3. Static and dynamic properties are generally kept separate in processor code.
    public class Processor extends AbstractProcessor
    {
      private          List< PropertyDescriptor > properties;
      private volatile Set< String >              dynamicPropertyNames = new HashSet<>();
    
      @Override
      protected void init( final ProcessorInitializationContext context )
      {
        final List< PropertyDescriptor > properties = new ArrayList<>();
        properties.add( PROPERTY );
        this.properties = Collections.unmodifiableList(properties);
      }
      .
      .
      .
    
  4. This method is generally always included in the processor code, for calling by the framework (this isn't spelled out, since there are no examples of it being called explicitly, but we explained higher up what it's for). It's crucial for any property returned to be marked as dynamic:
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( final String propertyDescriptorName )
    {
      return new PropertyDescriptor.Builder()
                          .required( false )
                          .name( propertyDescriptorName )
                          .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
                          .dynamic( true )
                          .build();
    }
    
  5. Often, the names of active, dynamic properties are managed in a set maintained by the method onPropertyModified(). In that method, newValue is null for properties being removed and non-null for new ones or ones to be kept:
    /**
     * Called only when a NiFi user actually updates (changes) this property's value.
     * @param descriptor
     * @param oldValue --null if property had no previous value.
     * @param newValue --null if property has been removed.
     */
    @Override
    public void onPropertyModified( final PropertyDescriptor descriptor, final String oldValue, final String newValue )
    {
      final Set< String > newDynamicPropertyNames = new HashSet<>( this.dynamicPropertyNames );
    
      if( newValue == null )
        newDynamicPropertyNames.remove( descriptor.getName() );
      else if( oldValue == null && descriptor.isDynamic() )
        newDynamicPropertyNames.add( descriptor.getName() );
    
      this.dynamicPropertyNames = Collections.unmodifiableSet( newDynamicPropertyNames );
    }
    
  6. Often, method onScheduled() is tasked with updating dynamic properties in the map:
    @OnScheduled
    public void onScheduled( final ProcessContext context )
    {
      for( final PropertyDescriptor descriptor : context.getProperties().keySet() )
      {
        if( descriptor.isDynamic())
        {
          getLogger().debug( "Dynamic property: {}", new Object[] { descriptor } );
          .
          .
          .
          (do stuff here)
        }
      }
    }
    
  7. But the actual consumption of dynamic properties would most often take place in the onTrigger() method.

    For example, here every dynamic property is assumed to be a new attribute to be added to the resulting flowfile.

    Notice that context.getProperties() contains both static and dynamic properties. This is so because all properties, both static and dynamic, set in the configuration are put by NiFi in that container. What distinguishes between them is isDynamic().

    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session )
        throws ProcessException
    {
      FlowFile flowfile = session.get();
      .
      .
      .
      for( Map.Entry< PropertyDescriptor, String > entry : context.getProperties().entrySet() )
      {
        if( entry.getKey().isDynamic() )
          flowfile = session.putAttribute( flowfile, entry.getKey().getName(), entry.getValue() );
      }

      // (and, as always, the flowfile must be transferred somewhere)
      session.transfer( flowfile, SUCCESS );
    }
    
  8. So, why isn't there a simpler interface for getting the value of a dynamic property? This should be obvious: there's no way, necessarily, to know its name in advance. Therefore, it can only be obtained by looping through all the properties (as just above), ignoring the non-dynamic ones.

  9. And why must the properties be maintained using @OnScheduled? This also should be obvious: static properties are known at initialization, but dynamic ones certainly are not.

Wednesday, 2 November 2016

Every once in a while I'm puzzled, then rediscover that I've left stuff out, which makes me scratch my head.

If you get this error in your log or debugger...

java.lang.AssertionError: Processor has 1 validation failures:
property-name validated against value is invalid because property-name is not a supported property

..., you've likely left these methods out of your processor's implementation:

@Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
@Override public Set< Relationship >        getRelationships()                { return relationships; }

Here's how to get NiFi back up once you've removed a custom processor (yes, you lose your entire canvas):

~/dev/nifi/nifi-0.7.1/conf $ ll
total 72
drwxr-xr-x.  3 russ russ 4096 Nov  2 14:45 .
drwxr-xr-x. 14 russ russ 4096 Oct 21 09:07 ..
-rw-r--r--.  1 russ russ 2129 Oct 13 17:54 authority-providers.xml
-rw-r--r--.  1 russ russ 2530 Oct 13 17:54 authorized-users.xml
-rw-r--r--.  1 russ russ 3241 Oct 13 17:54 bootstrap.conf
-rw-r--r--.  1 russ russ 2119 Oct 13 17:15 bootstrap-notification-services.xml
-rw-r--r--.  1 russ russ 1317 Nov  2 14:45  flow.xml.gz
-rw-r--r--.  1 russ russ 7454 Oct 13 17:54 logback.xml
-rw-r--r--.  1 russ russ 6089 Oct 13 17:54 login-identity-providers.xml
-rw-r--r--.  1 russ russ 8571 Oct 13 17:54 nifi.properties
-rw-r--r--.  1 russ russ 4603 Oct 13 17:54 state-management.xml
drwxr-xr-x.  2 russ russ 4096 Oct 21 09:07 templates
-rw-r--r--.  1 russ russ 1437 Oct 13 17:15 zookeeper.properties
~/dev/nifi/nifi-0.7.1/conf $ rm flow.xml.gz

Friday, 4 November 2016

(I can be embarrassingly stupid sometimes...)

You've configured nifi.properties to use port 9091.

nifi.web.http.port=9091

...but can't get NiFi to launch. The error from logs/nifi-bootstrap.log is:

~/dev/nifi/nifi-1.0.0/logs $ cat nifi-bootstrap.log
2016-11-04 11:32:32,217 INFO [main] o.a.n.b.NotificationServiceManager Successfully loaded the following 0 services: []
2016-11-04 11:32:32,221 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STARTED
2016-11-04 11:32:32,221 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STOPPED
2016-11-04 11:32:32,221 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_DIED
2016-11-04 11:32:32,222 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is not currently running
2016-11-04 11:32:35,684 INFO [main] o.a.n.b.NotificationServiceManager Successfully loaded the following 0 services: []
2016-11-04 11:32:35,687 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STARTED
2016-11-04 11:32:35,688 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_STOPPED
2016-11-04 11:32:35,688 INFO [main] org.apache.nifi.bootstrap.RunNiFi Registered no Notification Services for Notification Type NIFI_DIED
2016-11-04 11:32:35,695 INFO [main] org.apache.nifi.bootstrap.Command Starting Apache NiFi...
2016-11-04 11:32:35,695 INFO [main] org.apache.nifi.bootstrap.Command Working Directory: /home/russ/dev/nifi/nifi-1.0.0
2016-11-04 11:32:35,696 INFO [main] org.apache.nifi.bootstrap.Command Command: /home/russ/dev/jdk1.8.0_25/bin/java -classpath /home/russ/dev/nifi/nifi-1.0.0/./conf:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-properties-loader-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/jcl-over-slf4j-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-api-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/logback-core-1.1.3.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/logback-classic-1.1.3.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-framework-api-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/jul-to-slf4j-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-documentation-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-runtime-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/slf4j-api-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-properties-1.0.0.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/bcprov-jdk15on-1.54.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/log4j-over-slf4j-1.7.12.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/commons-lang3-3.4.jar:/home/russ/dev/nifi/nifi-1.0.0/./lib/nifi-nar-utils-1.0.0.jar -Dorg.apache.jasper.compiler.disablejsr199=true -Xmx512m -Xms512m -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000 -Dsun.net.http.allowRestrictedHeaders=true -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -XX:+UseG1GC -Djava.protocol.handler.pkgs=sun.net.www.protocol -Dnifi.properties.file.path=/home/russ/dev/nifi/nifi-1.0.0/./conf/nifi.properties -Dnifi.bootstrap.listen.port=33110 -Dapp=NiFi -Dorg.apache.nifi.bootstrap.config.log.dir=/home/russ/dev/nifi/nifi-1.0.0/logs org.apache.nifi.NiFi
2016-11-04 11:32:35,747 ERROR [NiFi logging handler] org.apache.nifi.StdErr ERROR: transport error 202: bind failed: Address already in use
2016-11-04 11:32:35,747 ERROR [NiFi logging handler] org.apache.nifi.StdErr ERROR: JDWP Transport dt_socket failed to initialize, TRANSPORT_INIT(510)
2016-11-04 11:32:35,747 ERROR [NiFi logging handler] org.apache.nifi.StdErr JDWP exit error AGENT_ERROR_TRANSPORT_INIT(197): No transports initialized [debugInit.c:750]
2016-11-04 11:32:35,747 INFO [NiFi logging handler] org.apache.nifi.StdOut FATAL ERROR in native method: JDWP No transports initialized, jvmtiError=AGENT_ERROR_TRANSPORT_INIT(197)
2016-11-04 11:32:36,739 INFO [main] org.apache.nifi.bootstrap.RunNiFi NiFi never started. Will not restart NiFi

And yet,

# lsof -i | grep 9091

...produces nothing for port 9091 (nor does netstat -lptu). So, port 9091 is not in use and yet the error leads one to believe that this is the problem.

It looks like remote debugging has been configured, specifying a port that's already in use (8000). That happens in bootstrap.conf:

# Enable Remote Debugging
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8000
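
The fix is either to comment out the java.arg.debug line or to move the debugger to a port that really is free; for example (assuming 8001 is free on the host):

# Enable Remote Debugging
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8001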

Thursday, 17 November 2016

Still working on identifying how to create and store away in version control a process group.

Also looking into how we can hot-swap new or updated processors. Later, I'll get into an Agile write-up everyone agreed I'd do in the Core Team meeting yesterday afternoon.

We could set up a cluster for NiFi because rolling restarts are possible with individual instances in a cluster. That is, if I understand correctly, it's possible to take down one instance and update it (i.e., restart it) without affecting the others. I still wonder, though, whether there's a way to throttle or somehow pause an instance so that it stops handling anything, so that bouncing it has no ramifications like reprocessing jobs, etc.

Note, too, that the new/updated software isn't available until all nodes of the cluster have been updated and restarted.

When NiFi first starts up, the following directories (and files) are created:

flow.xml.gz

Here's a flow to look at:

Unzipped (from flow.xml.gz), this flow looks like this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<flowController>
  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
  <rootGroup>
    <id>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</id>
    <name>NiFi Flow</name>
    <position x="0.0" y="0.0"/>
    <comment/>
    <processor>
      <id>0c689901-1e21-4a89-af21-04ec13f1ca03</id>
      <name>CreateFlowFile</name>
      <position x="-27.0" y="-12.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.CreateFlowFile</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
      <property>
        <name>Content</name>
        <value>This is a test of the Emergency Broadcast System.</value>
      </property>
    </processor>
    <processor>
      <id>c3daf696-11e7-4684-9e84-9f1aaedf15a6</id>
      <name>RetryOperation</name>
      <position x="281.0" y="86.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.RetryOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
      <property>
        <name>Stop retrying operation when</name>
      </property>
      <property>
        <name>Flowfile attribute name</name>
      </property>
      <property>
        <name>Initial counter value</name>
        <value>3</value>
      </property>
      <property>
        <name>Retry cache-table TTL</name>
        <value>10</value>
      </property>
    </processor>
    <processor>
      <id>e1d7c27f-e22f-480d-9d8d-591accbb7180</id>
      <name>NoOperation</name>
      <position x="639.0" y="272.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.NoOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
    </processor>
    <processor>
      <id>3bbe0931-dc54-490e-96cf-23f503f8915a</id>
      <name>NoOperation</name>
      <position x="278.0" y="272.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.NoOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
    </processor>
    <connection>
      <id>15af94b3-3db2-4bee-a7db-68869ad9bb86</id>
      <name/>
      <bendPoints>
        <bendPoint x="234.5" y="229.0"/>
      </bendPoints>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>3bbe0931-dc54-490e-96cf-23f503f8915a</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>success</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>c180a307-c0f5-48b5-8816-9abc54962b5b</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>e1d7c27f-e22f-480d-9d8d-591accbb7180</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>Stop retrying</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>ea3c8896-7775-4fce-83fe-8e95467c7811</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>0c689901-1e21-4a89-af21-04ec13f1ca03</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>success</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>f5739172-bf92-4ef8-a41b-b6e4dd363468</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>c3daf696-11e7-4684-9e84-9f1aaedf15a6</sourceId>
      <sourceGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>3bbe0931-dc54-490e-96cf-23f503f8915a</destinationId>
      <destinationGroupId>9a3b3f31-d3fa-4e18-853f-f49510f48c2a</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship>Retry operation</relationship>
      <maxWorkQueueSize>0</maxWorkQueueSize>
      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
  </rootGroup>
  <controllerServices/>
  <reportingTasks/>
</flowController>

This file stores the flow, which represents the processors on the canvas and the relationships between them. The record of each processor (the <processor> elements above) includes its properties, even sensitive ones (like passwords, though none appear here; see the RetryOperation properties above), and its graphical position on the canvas (the <position> elements).

Note, too, the <relationship> element in each connection, which names the relationship that the connection corresponds to. Here's a second flow, this time with a process group:


Inside "MyProcessGroup"...

Unzipped (from flow.xml.gz), this flow looks like this:

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<flowController encoding-version="1.0">
  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
  <rootGroup>
    <id>746c58aa-0158-1000-08a8-9984ee3a72bc</id>
    <name>NiFi Flow</name>
    <position x="0.0" y="0.0"/>
    <comment/>
    <processor>
      <id>74a80bc0-0158-1000-8699-37fc9ff670e4</id>
      <name>CreateFlowFile</name>
      <position x="252.0" y="4.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.CreateFlowFile</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
      <property>
        <name>Content</name>
        <value>All good me must come to the aid of their country.</value>
      </property>
    </processor>
    <processor>
      <id>74a8e138-0158-1000-92f3-c6ec8781ba64</id>
      <name>NoOperation</name>
      <position x="249.0" y="601.0"/>
      <styles/>
      <comment/>
      <class>com.imatsolutions.nifi.processor.NoOperation</class>
      <maxConcurrentTasks>1</maxConcurrentTasks>
      <schedulingPeriod>0 sec</schedulingPeriod>
      <penalizationPeriod>30 sec</penalizationPeriod>
      <yieldPeriod>1 sec</yieldPeriod>
      <bulletinLevel>WARN</bulletinLevel>
      <lossTolerant>false</lossTolerant>
      <scheduledState>STOPPED</scheduledState>
      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
      <runDurationNanos>0</runDurationNanos>
    </processor>
    <processGroup>
      <id>74a6c4bd-0158-1000-3de4-adbe359fb278</id>
      <name>MyProcessGroup</name>
      <position x="232.0" y="289.0"/>
      <comment/>
      <processor>
        <id>74a70fd2-0158-1000-3d8e-645008570026</id>
        <name>NoOperation</name>
        <position x="285.0" y="184.0"/>
        <styles/>
        <comment/>
        <class>com.imatsolutions.nifi.processor.NoOperation</class>
        <maxConcurrentTasks>1</maxConcurrentTasks>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <penalizationPeriod>30 sec</penalizationPeriod>
        <yieldPeriod>1 sec</yieldPeriod>
        <bulletinLevel>WARN</bulletinLevel>
        <lossTolerant>false</lossTolerant>
        <scheduledState>STOPPED</scheduledState>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <runDurationNanos>0</runDurationNanos>
      </processor>
      <inputPort>
        <id>74a74ed5-0158-1000-76fc-eeb723a5f47a</id>
        <name>to MyProcessGroup</name>
        <position x="228.0" y="74.0"/>
        <comments/>
        <scheduledState>STOPPED</scheduledState>
      </inputPort>
      <outputPort>
        <id>74a772d8-0158-1000-4916-01c5a8b496e9</id>
        <name>from MyProcessGroup</name>
        <position x="432.0" y="402.0"/>
        <comments/>
        <scheduledState>STOPPED</scheduledState>
      </outputPort>
      <connection>
        <id>74a79d27-0158-1000-ecf5-2c3ae0360366</id>
        <name/>
        <bendPoints/>
        <labelIndex>1</labelIndex>
        <zIndex>0</zIndex>
        <sourceId>74a70fd2-0158-1000-3d8e-645008570026</sourceId>
        <sourceGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</sourceGroupId>
        <sourceType>PROCESSOR</sourceType>
        <destinationId>74a772d8-0158-1000-4916-01c5a8b496e9</destinationId>
        <destinationGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</destinationGroupId>
        <destinationType>OUTPUT_PORT</destinationType>
        <relationship>success</relationship>
        <maxWorkQueueSize>10000</maxWorkQueueSize>
        <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
        <flowFileExpiration>0 sec</flowFileExpiration>
      </connection>
      <connection>
        <id>74a78f0c-0158-1000-a1f3-a3c2c8730ff2</id>
        <name/>
        <bendPoints/>
        <labelIndex>1</labelIndex>
        <zIndex>0</zIndex>
        <sourceId>74a74ed5-0158-1000-76fc-eeb723a5f47a</sourceId>
        <sourceGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</sourceGroupId>
        <sourceType>INPUT_PORT</sourceType>
        <destinationId>74a70fd2-0158-1000-3d8e-645008570026</destinationId>
        <destinationGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</destinationGroupId>
        <destinationType>PROCESSOR</destinationType>
        <relationship/>
        <maxWorkQueueSize>10000</maxWorkQueueSize>
        <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
        <flowFileExpiration>0 sec</flowFileExpiration>
      </connection>
    </processGroup>
    <connection>
      <id>74b5493a-0158-1000-617d-40ba69f7ad05</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>74a772d8-0158-1000-4916-01c5a8b496e9</sourceId>
      <sourceGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</sourceGroupId>
      <sourceType>OUTPUT_PORT</sourceType>
      <destinationId>74a8e138-0158-1000-92f3-c6ec8781ba64</destinationId>
      <destinationGroupId>746c58aa-0158-1000-08a8-9984ee3a72bc</destinationGroupId>
      <destinationType>PROCESSOR</destinationType>
      <relationship/>
      <maxWorkQueueSize>10000</maxWorkQueueSize>
      <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
    <connection>
      <id>74a8a54b-0158-1000-7902-ac61947fd255</id>
      <name/>
      <bendPoints/>
      <labelIndex>1</labelIndex>
      <zIndex>0</zIndex>
      <sourceId>74a80bc0-0158-1000-8699-37fc9ff670e4</sourceId>
      <sourceGroupId>746c58aa-0158-1000-08a8-9984ee3a72bc</sourceGroupId>
      <sourceType>PROCESSOR</sourceType>
      <destinationId>74a74ed5-0158-1000-76fc-eeb723a5f47a</destinationId>
      <destinationGroupId>74a6c4bd-0158-1000-3de4-adbe359fb278</destinationGroupId>
      <destinationType>INPUT_PORT</destinationType>
      <relationship>success</relationship>
      <maxWorkQueueSize>10000</maxWorkQueueSize>
      <maxWorkQueueDataSize>1 GB</maxWorkQueueDataSize>
      <flowFileExpiration>0 sec</flowFileExpiration>
    </connection>
  </rootGroup>
  <controllerServices/>
  <reportingTasks/>
</flowController>

Templates and process groups

The two artifacts that are crucial to our ability to deploy are

  1. process groups
  2. templates

Process groups are found inside flow.xml.gz, as shown above.

Tuesday, 22 November 2016

"NiFi's orientation is around continuous streams of data rather than starting and completing jobs.

"When you look at processors via the UI and you see the green start arrow or the red stopped square it is best to think about [those as meaning] 'this component is (or is not) scheduled to execute' rather than 'this is running or not running.' It is ok that they're scheduled to run. They're still not going to actually get triggered unless they need to be. If the [up-stream processor has no] data to send then [the next processor] won't actually be triggered to execute."

—Joe Witt

On a cluster node going down...

If a NiFi node dies and the disks on which its data are found are inaccessible by other NiFi instances (whether on the same or different hosts), then that data is not available until the situation is rectified. The way to achieve continuous availability is to set up the key repositories (content, flowfile and provenance) such that they can be remounted by another host. This is non-trivial. There has been a discussion of integrating management of this into NiFi itself.

Here's a somewhat random note on clustering by Andy LoPresto:

NiFi does not have a defined maximum cluster size. For the best performance, we usually recommend fewer than 10 nodes per cluster. If you have high performance needs, we have generally seen better results with multiple smaller clusters than with one large one. In this way, you can have hundreds of nodes processing the data in parallel, but the cluster-administration overhead does not tax a single cluster coordinator to death.

Wednesday, 30 November 2016

Imagine a conf/flow.xml.gz that runs as soon as NiFi is started. Now imagine it hangs, and the UI is unavailable to stop the processors, look around or otherwise investigate the problem. What to do?

In conf/nifi.properties, there is nifi.flowcontroller.autoResumeState that can be set to false (the default is true) before restarting NiFi. Look for errors in logs/nifi-app.log.

Remember to set it back to the default when finished. If any changes are made to the flow while debugging the problem, a new conf/flow.xml.gz will be generated with processors in a stopped state (or with other changes made while debugging), which may not be what is wanted.

Thursday, 1 December 2016

To log what flowfile is getting passed out of each processor, in order to understand what the processor has done to that flowfile, the LogAttribute processor has a property, Log Payload, that can be set to true; you can use this processor at different points in your flow.

Another solution that will work without modifying a flow is to use NiFi's provenance feature from the UI. Right-click on a processor and select Data provenance, then pick an event and view the lineage graph. (Note that provenance in NiFi is typically configured to evaporate in order to save space, so there may be nothing to look at if you wait too long). From there you can look at the content of each event in the graph to see how it changed as it moved through the flow.

Rather than examining the lineage, you could click View Details (the "info" or question-mark icon to the left of the table record). Then switch to the Content tab to see the incoming and outgoing content at each step of provenance. Clicking Download will bring the raw output to your filesystem, while View will open a content viewer in another window displaying the flowfile in various text formats.

UI notifications; processors faulting

One way to draw a user's attention is to use a standard log statement with proper level. This will issue a bulletin both on the processor (and visually indicate a note) as well as add it to a global list of bulletins. By default processors ignore anything lower than WARN, but this is adjustable in processor settings (right-click the processor in the UI, choose Configure, click the Settings tab and see Bulletin Level).
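
As a minimal sketch (the message and variable names are invented), a single statement like this inside onTrigger() is enough to raise a bulletin on the processor:

getLogger().warn( "could not parse flowfile {}; routing to failure",
                  new Object[] { flowfile } );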

Friday, 9 December 2016

I saw a thread in the developers' forum discussing a counter and I wondered about using ProcessSession's counter in place of writing a special processor for Mat and Steve that counts documents:

void adjustCounter( String name, long delta, boolean immediate ) adjusts counter data for the given counter name and takes care of registering the counter if not already present, e.g.:

session.adjustCounter( "IMAT Counter", 1, false );
session.transfer( flowFile, SUCCESS );

I need to figure out where to see this counter. I discovered it in the NiFi Counters summary, which is in a different place in the UI depending on the NiFi version. It lists the particulars, including the individual passage of flowfiles through each processor that adjusts the count, and also totals for all flowfiles through any processor that calls adjustCounter(). It's confusing if the processor is used over and over again in a flow, because each instance shows up in the list.

Here's a listing in which I explicitly made this call, as coded just above, in Timer and ran through the flow illustrated above (yesterday).

(The long ids in the list above are processor instance ids.)

However, if I remove this explicit call, nothing shows up in this list after running a flowfile completely through the four instances of Timer. So, some processor must explicitly make this call.

So, to interpret what's in the list above, note that:

  1. The total number of processors called that bump "IMAT Timer" is 4.
  2. Each of
    • Timer 1 (start)
    • Timer 1 (stop)
    • Timer 2 (start)
    • Timer 2 (stop)
    (which are all just different instances of Timer) is called one time.

To name the counter, a property could be used, or the name of the counter could come from the processor's special name (as set in the Configure → Settings → Name field).

It remains to be seen which of our processors we wish to put a counter into and how we want to know to make the call.

Tuesday, 13 December 2016

NiFi site-to-site experiment

  1. Erect NiFi on two hosts. Refer to them as "local" and "remote" in these steps. I'm using NiFi 1.1.0 for this experiment.

  2. Ensure the conf/nifi.properties configuration of
    • both hosts: each must have a non-conflicting HTTP port (it can be the same on each host, but must not conflict with anything else using that port, and cannot conflict with the remote port that will be set in the succeeding step):
      # web properties
      nifi.web.http.port=9091
      
    • remote host to send to:
      # Site to Site properties
      nifi.remote.input.secure=false
      nifi.remote.input.socket.port=9098
      
  3. Start both local and remote instances of NiFi.

  4. Launch browser tabs to both. In my case, this is:

    local host: localhost:9091/nifi
    remote host: 10.10.8.44:9091/nifi (note: not the site-to-site port!)

  5. On the local NiFi instance (using the browser), ...
    1. Choose a processor that will create some content. I'm using our own CreateFlowFile with content, "This is a test of the Emergency Broadcast System. It is only a test.":

    2. Drag a remote processor group to the canvas:

      If transmission is disabled, as shown by the circle icon with a line through it on this processor at upper left, then right-click and enable it. This will not work if the remote instance of NiFi has not yet been a) properly configured and b) launched and working.

    3. Connect the content-creating processor, CreateFlowFile, to the remote processor group. You should get the following error alert:

      Skip to the next step to correct this.

  6. On the remote NiFi instance (whose canvas is likely blank), ...
    1. Create an input port. Give it any name you wish. I called mine, "From local NiFi instance":

      Note, at this point, that even with this input port, you still cannot connect the local NiFi instance's CreateFlowFile (to the remote processor group).

    2. Create a place for the content to be delivered to by this new input port. I'm choosing to create a file with it using processor, PutFile. I configured PutFile to create (any file it wants) in Directory /tmp (configuration not illustrated):

    3. Finally, connect the input port to PutFile so the content will flow there.

  7. Return to the local instance of NiFi and try again to connect the content-creating processor, CreateFlowFile, to the remote processor group. This should work as illustrated here. Note that the connection takes on the name you gave the input port on the remote NiFi host. The remote processor group added here represents the remote host.

  8. Now let's try to flow content over from our local instance of NiFi, where we're creating an arbitrary bit of text in a flowfile, "This is a test of the Emergency Broadcast System. This is only a test.", to the remote NiFi instance, which will put that content in a file:
    1. Select CreateFlowFile, start it, then stop it (in order to avoid many instances of the content being produced). Notice the 1 flowfile in the queue:

    2. Last, ensure that the remote input port named, "From local NiFi instance," is turned on if it isn't:
      1. Right-click the remote process group in the local NiFi instance canvas:

      2. See the on/off button is off (white):

      3. Click it to turn it on (make it blue):

  9. The flow should run all the way to a) the remote NiFi instance and b) into a file in /tmp:
    russ@remote /tmp $ ls -lart
    .
    .
    .
    -rw-r--r--  1 russ russ   70 Dec 13 11:06 1703060523504973
    drwxrwxrwt 24 root root 4096 Dec 13 12:17 .
    russ@remote /tmp $ cat 1703060523504973
    This is a test of the Emergency Broadcast System. This is only a test.russ@remote /tmp $
    

Note: site-to-site is a feature that's only available to input and output ports in the root process group, not child process groups. Conceptually, if you want to push (remote) data into a child process group, you must create an input port at the root level, then connect that input port to the child process group's input port.

Wednesday, 15 December 2016

I was trying to subject NiFi to the Java Flight Recorder (JFR). However, adding the needed arguments to conf/bootstrap.conf proved difficult. NiFi wasn't having them (as reported naively in the developers' forum).

There is no problem in NiFi per se, and it wasn't really necessary to dig down into the code. It's just how java.arg.n is consumed that creates trouble: one must play around with the value of n when argument ordering is crucial. In the present case, I was able to turn on JFR using the following three -XX arguments in conf/bootstrap.conf:

#Set headless mode by default
java.arg.14=-Djava.awt.headless=true

# Light up the Java Flight Recorder...
java.arg.32=-XX:+UnlockCommercialFeatures
java.arg.31=-XX:+FlightRecorder
java.arg.42=-XX:StartFlightRecording=duration=120m,filename=recording.jfr

# Master key in hexadecimal format for encrypted sensitive configuration values
nifi.bootstrap.sensitive.key=

The point was that -XX:+UnlockCommercialFeatures must absolutely precede the other two options. Because these arguments pass through a HashMap, some values of n will fail to sort an argument before others, and the addition of other arguments might also come along and disturb this order. What I did here was by trial and error. I suppose the NiFi code could manhandle n in every case such that the order be respected, but I'm guessing that mine is the only, or one of the very rare, cases in which options are order-dependent. I wouldn't personally up-vote a JIRA to fix this.

My trial and error went pretty fast using this command line between reassignments of n for these arguments. It took me maybe six tries. (Script bounce-nifi.sh, sketched below, does little more than shut down, then start up, NiFi, something I do a hundred times per day across at least two versions.)

~/dev/nifi/nifi-1.1.0/logs $ rm *.log ; bounce-nifi.sh 1.1.0 ; tail -f nifi-bootstrap.log
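
For reference, a minimal sketch of what bounce-nifi.sh amounts to (the real script wasn't recorded here; taking the version number as its argument is an assumption):

#!/bin/sh
# bounce-nifi.sh <version>, e.g., bounce-nifi.sh 1.1.0
NIFI_HOME=~/dev/nifi/nifi-$1
$NIFI_HOME/bin/nifi.sh stop
$NIFI_HOME/bin/nifi.sh start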

Thursday, 22 December 2016

http://stackoverflow.com/questions/151238/has-anyone-ever-got-a-remote-jmx-jconsole-to-work

I'm trying to use JMC to run the Flight Recorder (JFR) to profile NiFi on a remote server that doesn't offer a graphical environment on which to run JMC.

Here is what I'm supplying to the JVM (conf/bootstrap.conf) when I launch NiFi:

java.arg.90=-Dcom.sun.management.jmxremote=true
java.arg.91=-Dcom.sun.management.jmxremote.port=9098
java.arg.92=-Dcom.sun.management.jmxremote.rmi.port=9098
java.arg.93=-Dcom.sun.management.jmxremote.authenticate=false
java.arg.94=-Dcom.sun.management.jmxremote.ssl=false
java.arg.95=-Dcom.sun.management.jmxremote.local.only=false
java.arg.96=-Djava.rmi.server.hostname=10.10.10.92  (the IP address of my server running NiFi)

I did put this in /etc/hosts, though I doubt it's needed:

10.10.10.92   localhost

Then, upon launching JMC, I create a remote connection with these properties:

Host: 10.10.10.92
Port: 9098
User: (nothing)
Password: (ibid)

Incidentally, if I click the Custom JMX service URL, I see:

service:jmx:rmi:///jndi/rmi://10.10.10.92:9098/jmxrmi

At some point, weeks later, this stopped working and I scratched my head. It turns out that I had run an Ansible script that activated iptables, and I had to open port 9098 through the firewall it created:

# iptables -A INPUT -p tcp --dport 9098 -j ACCEPT

Tuesday, 3 January 2017

Watching NiFi come up. In conf/nifi.properties, nifi.web.http.port=9091...

~/dev/nifi $ ps -ef | grep [n]ifi
russ     27483     1  0 14:37 pts/4    00:00:00 /bin/sh ./nifi.sh start
russ     27485 27483  2 14:37 pts/4    00:00:00 /home/russ/dev/jdk1.8.0_112/bin/java \
          -cp /home/russ/dev/nifi/nifi-1.1.1/conf:/home/russ/dev/nifi/nifi-1.1.1/lib/bootstrap/* \
          -Xms12m -Xmx24m \
          -Dorg.apache.nifi.bootstrap.config.log.dir=/home/russ/dev/nifi/nifi-1.1.1/logs \
          -Dorg.apache.nifi.bootstrap.config.pid.dir=/home/russ/dev/nifi/nifi-1.1.1/run \
          -Dorg.apache.nifi.bootstrap.config.file=/home/russ/dev/nifi/nifi-1.1.1/conf/bootstrap.conf \
          org.apache.nifi.bootstrap.RunNiFi start
russ     27508 27485 99 14:37 pts/4    00:00:21 /home/russ/dev/jdk1.8.0_112/bin/java \
          -classpath /home/russ/dev/nifi/nifi-1.1.1/./conf
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/slf4j-api-1.7.12.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-documentation-1.1.1.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-nar-utils-1.1.1.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/log4j-over-slf4j-1.7.12.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/logback-core-1.1.3.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/jul-to-slf4j-1.7.12.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/jcl-over-slf4j-1.7.12.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-framework-api-1.1.1.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-properties-1.1.1.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/logback-classic-1.1.3.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-api-1.1.1.jar
                    :/home/russ/dev/nifi/nifi-1.1.1/./lib/nifi-runtime-1.1.1.jar
          -Dorg.apache.jasper.compiler.disablejsr199=true \
          -Xmx512m \
          -Xms512m \
          -Dsun.net.http.allowRestrictedHeaders=true \
          -Djava.net.preferIPv4Stack=true \
          -Djava.awt.headless=true \
          -XX:+UseG1GC \
          -Djava.protocol.handler.pkgs=sun.net.www.protocol \
          -Dnifi.properties.file.path=/home/russ/dev/nifi/nifi-1.1.1/./conf/nifi.properties \
          -Dnifi.bootstrap.listen.port=38751 \
          -Dapp=NiFi \
          -Dorg.apache.nifi.bootstrap.config.log.dir=/home/russ/dev/nifi/nifi-1.1.1/logs \
          org.apache.nifi.NiFi
~ # netstat -lptu
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
tcp        0      0 localhost:63342         *:*                     LISTEN      23923/java
tcp        0      0 localhost:47060         *:*                     LISTEN      1876/GoogleTalkPlug
tcp        0      0 gondolin:domain         *:*                     LISTEN      1238/dnsmasq
tcp        0      0 *:ssh                   *:*                     LISTEN      1124/sshd
tcp        0      0 *:35353                 *:*                     LISTEN      23923/java
tcp        0      0 localhost:6942          *:*                     LISTEN      23923/java
tcp        0      0 *:9091                  *:*                     LISTEN      27508/java
tcp        0      0 localhost:45608         *:*                     LISTEN      27508/java
tcp        0      0 localhost:58634         *:*                     LISTEN      1876/GoogleTalkPlug
tcp6       0      0 [::]:64628              [::]:*                  LISTEN      24068/java
tcp6       0      0 [::]:ssh                [::]:*                  LISTEN      1124/sshd
tcp6       0      0 [::]:44155              [::]:*                  LISTEN      24068/java
tcp6       0      0 localhost:38751         [::]:*                  LISTEN      27485/java
udp        0      0 *:mdns                  *:*                                 919/avahi-daemon: r
udp        0      0 gondolin:domain         *:*                                 1238/dnsmasq
udp        0      0 gondolin:ntp            *:*                                 28138/ntpd
udp        0      0 localhost:ntp           *:*                                 28138/ntpd
udp        0      0 *:ntp                   *:*                                 28138/ntpd
udp        0      0 *:49427                 *:*                                 919/avahi-daemon: r
udp        0      0 *:ipp                   *:*                                 17819/cups-browsed
udp6       0      0 [::]:mdns               [::]:*                              919/avahi-daemon: r
udp6       0      0 [::]:56356              [::]:*                              919/avahi-daemon: r
udp6       0      0 fe80::cc63:b04:481e:ntp [::]:*                              28138/ntpd
udp6       0      0 ip6-localhost:ntp       [::]:*                              28138/ntpd
udp6       0      0 [::]:ntp                [::]:*                              28138/ntpd
~ # lsof -i | grep 9091
firefox    3842   russ   79u  IPv4 103681669      0t0  TCP localhost:57558->localhost:9091 (ESTABLISHED)
firefox    3842   russ   81u  IPv4 103686787      0t0  TCP localhost:57562->localhost:9091 (ESTABLISHED)
firefox    3842   russ   86u  IPv4 103686788      0t0  TCP localhost:57564->localhost:9091 (ESTABLISHED)
firefox    3842   russ   90u  IPv4 103686789      0t0  TCP localhost:57566->localhost:9091 (ESTABLISHED)
java      27508   russ 1699u  IPv4 103667313      0t0  TCP *:9091 (LISTEN)
java      27508   russ 1730u  IPv4 103687885      0t0  TCP localhost:9091->localhost:57562 (ESTABLISHED)
java      27508   russ 1731u  IPv4 103687886      0t0  TCP localhost:9091->localhost:57564 (ESTABLISHED)
java      27508   russ 1732u  IPv4 103680156      0t0  TCP localhost:9091->localhost:57558 (ESTABLISHED)
java      27508   russ 1734u  IPv4 103687887      0t0  TCP localhost:9091->localhost:57566 (ESTABLISHED)

Friday, 6 January 2017

Just a note from the Expression Language Guide: when evaluating a property against a flowfile, via evaluateAttributeExpressions(), NiFi looks first for a flowfile attribute by that name then, if missing, for a system environment variable and, that failing, for a JVM system property.
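
As a minimal sketch (MY_PROPERTY is a hypothetical descriptor built with .expressionLanguageSupported( true ) and configured with a value containing ${greeting}):

FlowFile flowfile = session.get();

// per the guide's search order: flowfile attribute "greeting" first, then a
// system environment variable, then a JVM system property
String value = context.getProperty( MY_PROPERTY )
                      .evaluateAttributeExpressions( flowfile )
                      .getValue();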

Tuesday, 10 January 2017

Working on a huge flow, examining performance with JMC/JFR, I have literally hundreds of millions of flowfiles that stack up at different queue points, especially at the end. Ridding myself of all these files from the queues takes something like forever. I posted to ask if anyone had a quick way, but there is none. I tried a) creating a no-op processor drain, but that doesn't go much, if at all, faster than b) right-clicking the queue and choosing Empty queue.

I even tried a brute-force approach using a script I wrote sitting in my NiFi root:

#!/bin/sh
read -p "Smoke NiFi repositories (y/n)? " yesno
case "$yesno" in
  y|Y)
    ( cd flowfile_repository   ; rm -rf * )
    ( cd provenance_repository ; rm -rf * )
    ( cd content_repository    ; rm -rf * )
    ;;
  *)
    ;;
esac

...because removing everything from the content repository takes forever—hours for a couple of hundred million flowfiles.

This was predictable, but I wasn't even thinking about it before the accumulation became a problem. After letting the files delete overnight, I came in and added a counting processor so I could just let them drain out as they arrived (instead of accumulating them in a queue for counting).

Note too that if the total number of flowfiles in any one connection exceeds the value of nifi.queue.swap.threshold in nifi.properties (see below), swapping will occur and performance can be affected. So, by allowing files to accumulate, I was shooting myself in the foot: I was altering the very performance I was trying to monitor and get a feel for.
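
The relevant entry (20000 is, I believe, the shipped default):

nifi.queue.swap.threshold=20000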

Wednesday, 11 January 2017

I'd never done this before.

How to make "kinkies" in a relationship/connection? Just double-click the line and it will put a yellow handle in it that you can drag to route the connection around any obstruction, making the flow clearer. To remove a handle that's no longer needed, simply double-click it. See Bending connections to add a bend or elbow-point.

Someone in the forum wanted a flowfile-counter processor. I'll put it here, but just in case, I'll say "no guarantees" too. NiFi's counter cookie jar is reached through the session.adjustCounter() calls in the code below.

PassThruCounterTest.java:
package com.etretatlogiciels.nifi.processor;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class PassThruCounterTest
{
  private static final String TEST_VALUE = "test123";

  @Test
  public void testFlowfileCount() throws InitializationException
  {
    TestRunner runner = TestRunners.newTestRunner( PassThruCounter.class );
    runner.setProperty( PassThruCounter.NAME, TEST_VALUE );
    runner.setProperty( PassThruCounter.TYPE, "count" );
    runner.enqueue( TEST_VALUE );
    runner.run();
    final Long actual   = runner.getCounterValue( TEST_VALUE );
    final Long expected = Long.valueOf( "1" );
    assertEquals( expected, actual );
  }

  @Test
  public void testFlowfileSize() throws InitializationException
  {
    TestRunner runner = TestRunners.newTestRunner( PassThruCounter.class );
    runner.setProperty( PassThruCounter.NAME, TEST_VALUE );
    runner.setProperty( PassThruCounter.TYPE, "size" );
    String value = TEST_VALUE;
    runner.enqueue( value );
    runner.run();
    final Long actual   = runner.getCounterValue( TEST_VALUE );
    final Long expected = ( long ) value.length();
    assertEquals( expected, actual );
  }
}
PassThruCounter.java:
package com.etretatlogiciels.nifi.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@SupportsBatching
@Tags( { "debugging" } )
@CapabilityDescription( "Counts flowfiles passing or their cumulative sizes." )
public class PassThruCounter extends AbstractProcessor
{
  @Override public void onTrigger( final ProcessContext context, final ProcessSession session )
        throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }

    final String type = context.getProperty( TYPE ).getValue();
    final String name = context.getProperty( NAME ).getValue();

    switch( type )
    {
      case "count" :
        session.adjustCounter( name, 1, true );
        break;
      case "size" :
        session.adjustCounter( name, flowfile.getSize(), true );
        break;
    }

    session.transfer( flowfile, SUCCESS );
  }

  //<editor-fold desc="Properties and relationships">
  @Override
  public void init( final ProcessorInitializationContext context )
  {
    List< PropertyDescriptor > properties = new ArrayList<>();
    properties.add( TYPE );
    properties.add( NAME );
    this.properties = Collections.unmodifiableList( properties );

    Set< Relationship > relationships = new HashSet<>();
    relationships.add( SUCCESS );
    this.relationships = Collections.unmodifiableSet( relationships );
  }

  private List< PropertyDescriptor > properties;
  private Set< Relationship >        relationships;

  public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
      .name( "Type" )
      .description( "Counter type, one of \"count\" or \"size\" of flowfiles." )
      .required( true )
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .defaultValue( "count" )
      .build();

  public static final PropertyDescriptor NAME = new PropertyDescriptor.Builder()
      .name( "Name" )
      .description( "Name of registered counter." )
      .required( true )
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .build();

  public static final Relationship SUCCESS = new Relationship.Builder()
      .name( "success" )
      .description( "Continue on to next processor" )
      .build();

  @Override public List< PropertyDescriptor > getSupportedPropertyDescriptors() { return properties; }
  @Override public Set< Relationship >        getRelationships()                { return relationships; }
  //</editor-fold>
}

Thursday, 12 January 2017

Some notes on tuning NiFi for high performance.

    nifi.properties
  1. nifi.bored.yield.duration=10 millis —prevents processors that are using the timer-driven scheduling strategy from using excessive CPU when there is no work to do.
  2. nifi.ui.autorefresh.interval=30 sec —can be used to make the browser UI refresh more often, but increases burden on the network. It doesn't affect the performance of NiFi itself.
  3. There are two databases used by NiFi (under database_repository). One is a user database keeping track of user logins (in the case of running secured NiFi). The other is the history database that tracks all changes made to the graph (or workflow). Neither of these is very big (I'm seeing just over 2 MB). It's useful to consider relocating this repository elsewhere (that is, outside /opt/nifi) to simplify upgrading, by allowing you to retain the user and component history after upgrading.
  4. The most likely problem in corrupting repositories is running out of disk space. For this reason, relocating the flowfile-, content- and provenance repositories outside of /opt/nifi is not a bad idea. The location is specified in nifi.properties via nifi.flowfile.repository.directory, etc.
  5. As noted a couple of days ago, it's the content repository's disk that experiences the highest I/O impact on systems with high data volumes, and NiFi queues stacking up can be a problem. A property of the form nifi.content.repository.directory.name=path can be used in nifi.properties (in fact, a whole list of them with different names) to spread out this potentially mammoth repository (see the sketch after this list).
  6. In NiFi clusters, it's a good idea to use different names (just as above) for the repositories of the different NiFi nodes. Obviously, installing these nodes will not lead automatically to different names and keeping the same, default names works fine, but monitoring disk utilization within the NiFi UI will be easier if the names are different when lumped together in the system diagnostics window.
    bootstrap.conf
  7. The JVM memory settings controlling NiFi's heap should be greatly expanded from the shipping defaults (for example, from -Xms256m to -Xms8g).
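
For illustration, a hedged sketch of item 5's advice: spreading the content repository over three hypothetical mount points in nifi.properties (the disk1, disk2 and disk3 names and paths are assumptions):

nifi.content.repository.directory.disk1=/disk1/content_repository
nifi.content.repository.directory.disk2=/disk2/content_repository
nifi.content.repository.directory.disk3=/disk3/content_repository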

A good link for NiFi installation best practice: HDF/NIFI Best practices for setting up a high performance NiFi installation.

Friday, 13 January 2017

I still haven't had time to set up a NiFi cluster, which I want to do experimentally (before I'm ever forced to do it).

I came across a random, passing observation that Zookeeper, which is integral to erecting a NiFi cluster, generally calls for an odd number of nodes. I'm sure that, just as for MongoDB, this is about leader elections: an odd number of voters avoids ties.

Tuesday, 24 January 2017

The perils of moving a processor between packages.

When you change the class (or package) name of a processor, you have effectively removed the processor from your build and added a new one. At that point, NiFi will create a "ghost" processor (in conf/flow.xml.gz) to take its place because the processor class no longer exists.

However, the properties are still retained, in case the processor is added back (reappears). But since NiFi cannot find it, it doesn't know whether the properties are sensitive or not. As a result, it marks all properties as sensitive because it was determined that it is better to share too little information than to share too much.

It's possible to edit conf/flow.xml.gz by hand, if care is taken, I think.

Thursday, 2 February 2017

If disk space is exhausted, how to recover unprocessed data since NiFi cannot proceed?

Archived content can be deleted from content_repository:

# find content_repository/ -type f -amin +600 | grep archive | xargs rm -f

(The -amin +600 option means file was accessed 600 or more minutes ago.)

This deletes anything from that archive that is more than 10 hours old. Once this has happened, assuming enough diskspace has been freed up, NiFi can be restarted.

Of course, which partitions the content- and flowfile repositories are on is a consideration: if the disk that filled up isn't the one holding the content repository, then freeing up old content won't do the trick.

Note that it's theoretically safe to perform these deletions with NiFi running, but there are no guarantees. It's better to halt NiFi first, then restart it.

Saturday, 11 February 2017

Time out for NiFi performance...

I need to review some NiFi advice to see where we are on implementing NiFi the correct way. Here are some elements:

  1. Each NiFi repository should be on its own, physically separate partition. The repositories in question are found directly off ${NIFI_ROOT} (usually /opt/nifi):
    • content_repository
    • flowfile_repository
    • provenance_repository
    • database_repository (small and not part of any performance concern)

    The location of these repositories is managed by changing these defaults in conf/nifi.properties:

    nifi.content.repository.directory.default=./content_repository
    nifi.flowfile.repository.directory.default=./flowfile_repository
    nifi.provenance.repository.directory.default=./provenance_repository
    
  2. The first, easiest step is to make the provenance_repository volatile. This is done by settings for:
    • buffer size
    • maximum storage time —how long to keep provenance around for examination
    • maximum storage size —how much disk space to dedicate to keeping provenance data around
    • roll-over time —the time to wait before rolling over the latest provenance information to be available from the UI
    • index shard size —larger values impact the Java heap when searching through provenance data (from the UI)

    ...in conf/nifi.properties, here are the settings discussed above and their defaults:

    nifi.provenance.repository.buffer.size=100000
    nifi.provenance.repository.max.storage.time=24 hours
    nifi.provenance.repository.max.storage.size=1 GB
    nifi.provenance.repository.rollover.time=30 secs
    nifi.provenance.repository.rollover.size=100 MB
    nifi.provenance.repository.index.shard.size=500 MB
    
  3. Heap size for NiFi should be as great as legitimately possible. This is managed by establishing the JVM command-line option, -Xmx, in conf/bootstrap.conf. It looks like this:
    # JVM memory settings
    java.arg.2=-Xms512m
    java.arg.3=-Xmx12288m
    

  4. Processors that load flowfile content fully into memory harm performance greatly. A very naïve example of this is the sample onTrigger() implementation below; onTrigger() is where the NiFi processor performs its main business, that of manipulating the flowfile. The alternative is to process the flowfile contents progressively, inching through input- and output streams so that only a buffer's worth of content sits in memory at any one time rather than the entire input or output (see the streaming sketch following this list):
    import org.apache.commons.io.IOUtils;
    .
    .
    .
    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      FlowFile flowfile = session.get();
    
      final AtomicReference< String > contents = new AtomicReference<>();
    
      session.read( flowfile, new InputStreamCallback()
      {
        @Override
        public void process( InputStream in ) throws IOException
        {
          StringWriter writer = new StringWriter();
          IOUtils.copy( in, writer );
          contents.set( writer.toString() );
        }
      });
    
      // now the entire flowfile contents are sitting in memory...
    }
    
  5. Consider a run-duration of 25 milliseconds for processors that support it. This allows the framework to batch many operations into fewer transactions which can dramatically increase throughput. The run-duration setting, when supported, is in the Scheduling tab in the processor configuration:

    Note that this setting may be confusing. Let me attempt to explain:

    The lower the latency, the more cycles are given to other processors, because the batch size is kept smaller. The higher the throughput (that is, throughput per invocation), the more that processor hogs cycles for itself.

    Depending on what's going on elsewhere in the flow, a lower latency means the processor simply runs more times to process the same number of flowfiles that would have been processed in one invocation had the setting favored throughput. However, this also means:

    • better response time at the NiFi UI (canvas),
    • better response time for other processors and elements of the flow.

    The run-duration slider control is included in configuration based on the use of the @SupportsBatching annotation. Once you use this annotation, you don't need to do anything in onTrigger() like:

    private static final int MAX_FLOWFILES_IN_BATCH = 50;
    
    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      final List< FlowFile > flowfiles = session.get( MAX_FLOWFILES_IN_BATCH );
    
      for( FlowFile flowfile : flowfiles )
      {
        ...
      }
    

    ..., but just respond with:

    @Override
    public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
    {
      final FlowFile flowfile = session.get();
    
      if( flowfile == null )
      {
        context.yield();
        return;
      }
    

    The slider (the NiFi framework) does the rest: you won't need to worry about the existence of a batch. It just is.

    Now, on the other hand, if you need to manage a pool of resources that are expensive to allocate, for example, you'd also want to add:

    @OnScheduled
    public void allocateMyResources( final ProcessContext context ) throws ProcessException
    {
      try
      {
        // make a connection, allocate a pool, etc.
      }
      catch( Exception e )
      {
        throw new ProcessException( "Couldn't allocate my resources", e );
      }
    }
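
As promised under item 4, here's a minimal sketch of the streaming alternative: a StreamCallback that inches through the content line by line, so that only one buffered line sits in memory at a time. The upper-casing transformation is a made-up stand-in, and SUCCESS is assumed to be defined as elsewhere on this page:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import org.apache.nifi.processor.io.StreamCallback;
.
.
.
flowfile = session.write( flowfile, new StreamCallback()
{
  @Override
  public void process( InputStream in, OutputStream out ) throws IOException
  {
    BufferedReader reader = new BufferedReader( new InputStreamReader( in ) );
    BufferedWriter writer = new BufferedWriter( new OutputStreamWriter( out ) );

    String line;

    while( ( line = reader.readLine() ) != null )
    {
      writer.write( line.toUpperCase() );   // stand-in, line-at-a-time transformation
      writer.newLine();
    }

    writer.flush();   // don't close: the framework owns the underlying streams
  }
} );

session.transfer( flowfile, SUCCESS );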
    

Tuesday, 14 February 2017

When you get the error that a flowfile is not the most recent version, it means that you have modified the flowfile in some way via ProcessSession and then attempted to use a reference to the flowfile from before the change. Commonly this happens when you have code that looks something like this:

FlowFile flowfile = session.get();

if( flowfile == null )
  return;

session.putAttribute( flowfile, "hello", "hello" );
session.transfer( flowfile, SUCCESS );

The issue here is that the ProcessSession.putAttribute() method called above returns a new FlowFile object, and we then pass the old version of the flowfile to session.transfer(). This code would need to be updated to look like this:

FlowFile flowfile = session.get();

if( flowfile == null )
  return;

flowfile = session.putAttribute( flowfile, "hello", "hello" );
session.transfer( flowfile, SUCCESS );

Note that the flowfile variable is re-assigned to the new FlowFile object when session.putAttribute() is called, so that we pass the most up-to-date version of the flowfile to ProcessSession.transfer().

Friday, 17 February 2017

On sensitive-value properties...

This is a phenomenon common in controller services, i.e.: properties that hold a password, passphrase or other sensitive sequence that must be hidden from subsequent view. You type it in, confirm it, and never see it again; you can only type it in again.

Here's a long comment I got from Andy LoPresto:

Each sensitive property value is encrypted using the specified algorithm noted in conf/nifi.properties. By default, and therefore on any unmodified system, this is PBEWITHMD5AND256BITAES-CBC-OPENSSL, which means that the actual cipher applied is AES with CBC mode of operation using a 256-bit key derived from the provided nifi.sensitive.props.key value (really a password and not a true key) using the OpenSSL EVP_BytesToKey key-derivation function (KDF), a modified version of PKCS #5 v1.5's MD5-based PBKDF1, identical when the combined length of the key and IV is less than or equal to an MD5 digest and non-standard if greater.

A random 16-byte salt is generated during the encrypt process (the salt length is actually set to be the block length of the cipher in question—AES is always 16 bytes, but if you choose to use DES for example, this value would be 8 bytes), and this salt is provided, along with an iteration count (1000 by default) to the KDF. Internally to the application, this process occurs within the Jasypt library (slated for removal in NIFI-3116), but you can see a compatible Java native implementation of this process in the ConfigEncryptionTool.

If the nifi.sensitive.props.key value is empty, a hard-coded password is used. This is why all documentation should recommend setting that value to a unique and strong passphrase prior to deployment.

Once the key is derived, the sensitive value is encrypted and the salt and cipher text are concatenated and then encoded in hexadecimal. This output is wrapped in "enc{" and "}" tokens to denote the value is encrypted, and then stored in flow.xml.gz. As pointed out elsewhere, the sensitive value (in plain-text or encrypted form) is never sent over the ReST API to any client, including the UI. When editing a processor property that is sensitive, the UI displays a static placeholder.

If two values are the same (i.e. the same password in an EncryptContent processor encrypting and an EncryptContent processor decrypting later in the same flow), the two encrypted values will be completely different in the flow.xml.gz even though they were encrypted with the same key. This is because of the random salt value mentioned above.

The encrypt-config.sh script in nifi-toolkit exposes the functionality to "migrate" the flow.xml.gz encrypted values to be encrypted with a new key. For example, if you read the above and feel uncomfortable with your values encrypted with the default hard-coded password rather than a unique passphrase, you can run this tool and provide a new passphrase, and it will update conf/nifi.properties with the new passphrase (and encrypt it if you have encrypted configuration enabled) and decrypt all values in the flow.xml.gz and re-encrypt them with the new key and repopulate them.

A small note on the migration tool—you may notice that after running it, identical values will have identical encrypted values. This design decision was made because the performance tradeoff of not re-performing the KDF with a unique salt and cipher initialization on each value is immense, and the threat model is weak because the key is already present on the same system. This decision was further impacted by the process by which Jasypt derives the keys. After NIFI-3116 is completed, I feel confident we can improve the security of the cipher texts by using unique IVs without incurring the substantial penalty currently levied by the library structure.

Throughout this explanation, you may have had concerns about some of the decisions made (algorithms selected, default values, etc.). There are ongoing efforts to improve these points, as security is an evolving process and attacks and computational processing availability to attackers is always increasing. More of this is available in relevant JIRAs and the Security Features Roadmap on the wiki, but in general I am looking to harden the system by restricting the algorithms used by default and available in general and increasing the cost of all key derivations (both in time and memory hardness via PBKDF2, bcrypt, and scrypt). Obviously the obstacles in this effort are backwards compatibility and legacy support. It is a balance, but with the support of the community, I see us continuing to move forward with regards to security while making the user experience even easier.

Wednesday, 1 March 2017

This morning, NiFi queue prioritization...

To impose prioritization on flowfiles, you configure a (stopped) queue by right-clicking it and choosing Configure. Look for the Settings tab. Then grab one of the available prioritizers:

  • FirstInFirstOutPrioritizer
  • NewestFlowFileFirstPrioritizer
  • OldestFlowFileFirstPrioritizer
  • PriorityAttributePrioritizer

...and drag it down into Selected Prioritizers.

This morning's note is about PriorityAttributePrioritizer.

The PriorityAttributePrioritizer prioritizes flowfiles by looking for an attribute named priority, then sorting the flowfiles using the lexicographical value of the priority marked therein.

Bryan Bende pointed out that, coming out of various feeds (let's say some process groups), you can set the priority attribute using UpdateAttribute to weight the various feeds according to a sense of priority. Before continuing the flow, you can converge all the flows into one using a funnel.

Tuesday, 7 March 2017

How the provenance repository works

The provenance repository writes out the data "inline" as flowfiles traverse the system. It rolls over periodically (by default, every 30 seconds or after writing 100 MB of provenance data). When it rolls over, it begins writing data to a new (repository) file and starts to index the events for the old file in Lucene. The events are not available in the UI until they have been indexed into Lucene; this could take a few minutes depending on the system and how many provenance events are being generated. See Persistent Provenance Repository Design.

This said, the existing implementation (1.1.x) is not as fast as it could be. In the next release of NiFi, there is an alternate implementation that can be used. See Provide a newly refactored provenance repository.

Wednesday, 15 March 2017

If you want to see when/who stopped a processor in NiFi (and other configuration changes) you can use the Flow configuration history (the clock icon on the right side of the menu bar in 0.7.x) and filter by processor id.

Thursday, 16 March 2017

Just a note to remember that AbstractProcessor extends AbstractSessionFactoryProcessor and that the real delta between them is the second argument to onTrigger(), where the subclass offers ProcessSession in place of ProcessSessionFactory. The latter confers the ability to create a new session (with sessionFactory.createSession()).
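
Here's a minimal sketch of the factory-flavored signature (the work itself is elided; the error handling shows why you might want the control):

@Override
public void onTrigger( final ProcessContext context, final ProcessSessionFactory sessionFactory )
      throws ProcessException
{
  final ProcessSession session = sessionFactory.createSession();

  try
  {
    // ...the work an AbstractProcessor subclass would do in its onTrigger()...
    session.commit();     // you decide when the session commits...
  }
  catch( final Exception e )
  {
    session.rollback();   // ...and when it rolls back
    throw new ProcessException( e );
  }
}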

Friday, 17 March 2017

Here are some excellent details on NiFi-processor semantics by Andy LoPresto and Mark Payne. First Andy...

In the Developer Guide, there are discussions of various processor patterns, including Split Content, and a section on Cohesion and Usability, which states:

"In order to avoid these issues, and make processors more reusable, a processor should always stick to the principal of 'do one thing and do it well.' Such a processor should be broken into two separate processors: one to convert the data from format X to format Y, and another processor to send data to the remote resource."

I call this the "Unix model," i.e.: it is better to join several small, specific tools together to accomplish a task than re-invent larger tools every time a small modification is required. In general, that would lead me to develop processors that operate on the smallest unit of data necessary—a single line, element, or record makes sense—unless more context is needed for completeness, or the performance is so grossly different that it is inefficient to operate on such small quantities.

Finally, with regards to your question about which provenance events to use in various scenarios, I agree the documentation is lacking. Luckily Drew Lim did some great work improving this documentation. While it has not been released in an official version, both the Developer Guide and User Guide have received substantial enhancements, describing the complete list of provenance event types and their usage or meaning. This work is available on master and will be released in 1.2.0.

Different processors have different philosophies. Sometimes that is the result of different authors, sometimes it is a legitimate result of the wide variety of scenarios that these processors interact with. Improving the user experience and documentation is always important, and getting started with and maximizing the usefulness of these processors is one of several top priorities.

Next, Mark...

On Provenance Events, while the documentation does appear to be lacking there at the moment, the framework does its best to help here. Here is some additional guidance:

Regarding processors handling all of the data vs. only a single "piece" of the data:

Processors should handle all of the content of a flowfile, whenever it makes sense to do so. For example, if you have a processor that is designed to operate on the header of a CSV file then it does not make sense to read and process the rest of the data. However, if you have a processor that modifies one of the cells in a row of a CSV file it should certainly operate on all rows. This gives the user the flexibility to send in single-row CSV files if they need to split it for some other reason, or to send in a many-gigabyte CSV file without the need to pay the cost of splitting it up and then re-merging it.

Where you may see the existing processors deviate from this is something like ExtractText, which says that it will only buffer up to the configured amount of data. This is done because, in order to run a regular expression over the data, the data has to be buffered in the Java heap. If we don't set limits to this, then a user will inevitably send in a 10 GB CSV file and get out-of-memory errors. In this case, we would encourage users to use a small buffer size such as 1 MB and then split the content upstream.

Saturday, 18 March 2017

I asked the question,

The ability to create a new session is conferred by extending AbstractSessionFactoryProcessor (in lieu of AbstractProcessor, which further restricts Processor, among other things hiding the session factory), giving one the option of calling sessionFactory.createSession(). This begs the question of why or when I would wish to create a new session rather than operate within the one I'm given (by AbstractProcessor).

Mark Payne and Matt Burgess responded, Mark wrote:

MergeContent is a good example of when this would be done.

It creates a ProcessSession for each "bin" of flowfiles that it holds onto. This allows it to build up several flowfiles in one bin and then process that entire bin, committing the ProcessSession that is associated with it. Doing this does not affect any of the flowfiles in another bin, though, as they belong to a different ProcessSession. If they all belonged to the same ProcessSession, then we would not be able to commit the session whenever one bin is full. Instead, we would have to wait until all bins are full, because in order to commit a ProcessSession, all of its flowfiles must be accounted for (transferred to a relationship or removed).

So, in a more general sense, you would want this any time you have multiple flowfiles that need to belong to different sessions, so that you can commit or roll back a session without accounting for all flowfiles that you've seen.

The AbstractSessionFactoryProcessor is also sometimes used when we want to avoid committing the session when returning from onTrigger(). For example, if there is some asynchronous process going on.

Matt:

There are a couple of use cases I can think of where you'd want AbstractSessionFactoryProcessor over AbstractProcessor:

  1. You only want to update the StateManager after a successful commit has taken place. Because AbstractProcessor.onTrigger() calls commit for you, you can't add code after that to update the state map.
  2. I am working on a processor that needs to keep the same session across multiple calls to onTrigger(); basically it's a transactional thing and I don't know when it will end. If something bad happens in the meantime, I need to roll back that original session.

Tuesday, 21 March 2017

@SupportsBatching leads to that "slider" seen in a processor's configuration. It's a hint only and has no material effect upon how you code the processor (in terms of special things to do).

A delay greater than 0 (set via the slider) causes the flowfiles arriving over that period to be batched: the work is done as usual, but nothing is committed until the end of the delay.
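
For reference, a minimal sketch of where the annotation goes (the class name is hypothetical, other imports are elided, and SUCCESS is assumed defined as elsewhere on this page):

import org.apache.nifi.annotation.behavior.SupportsBatching;
.
.
.
@SupportsBatching
@CapabilityDescription( "Demonstrates the run-duration slider." )
public class MyBatchFriendlyProcessor extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session )
        throws ProcessException
  {
    FlowFile flowfile = session.get();

    if( flowfile == null )
    {
      context.yield();
      return;
    }

    // nothing batch-specific here: the framework defers session commits
    // according to the run-duration slider
    session.transfer( flowfile, SUCCESS );
  }
}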

Friday, 24 March 2017

Custom registry properties in the NiFi Expression Language

At least by NiFi 1.1.1, the following became a possibility:

In ${NIFI_ROOT}/conf/nifi.properties, add this line:

nifi.variable.registry.properties=./conf/registry.properties

You can list more than one file, separated by commas. Then, create this file (registry.properties) locally with contents:

zzyzzx="This is a test of the Emergency Broadcast System. This is only a test."

Next, create a flow using CreateFlowFile (and start it) connected to UpdateAttribute, which you configure thus:

Property  Value
ZZYZZX         ${zzyzzx}

...(starting this processor too), then on to NoOperation (which you don't start) so there's a flowfile in the queue you can look at. Bounce NiFi, because these properties are static (read only at start-up). Once there's a flowfile in the queue in front of NoOperation, right-click it and choose List Queue, then click View Details and, finally, the Attributes tab. You'll see attribute ZZYZZX with the value assigned by registry.properties in our example.

Monday, 27 March 2017

Failing to restart NiFi...

Only the flowfile_repository need be removed (as long as the data is not important): removing that repository means that the contents of the content_repository will be removed by NiFi, since no flowfiles point to its contents anymore.

Alternatively (or additionally), set nifi.flowcontroller.autoResumeState in conf/nifi.properties to false, then piecewise alleviate the large queues or the processing of files that could be causing the failure to restart.
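
In conf/nifi.properties, that's:

nifi.flowcontroller.autoResumeState=false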

Tuesday, 28 March 2017

You create a new processor, everything's perfect, then you move it out of your larger project to its own home ('cause that's needful), but then it cannot be found. The NAR builds, NiFi's happy to load it, but when you go to do Add Processor in the UI, the processor's not there. You pull your hair out, you cry on the shoulder of sympathetic souls in the community forum, then someone points out that you spelled META-INF with an underscore instead of a hyphen.

What's left to do, but look for a sword to fall on?

Here's a list of things to check that I will add to as more ideas occur to me.

  1. Processor code builds and tests.
  2. Processor contains a @Tags annotation; not having tags won't stop the processor from being found in the Add Processor list, but it does mean scanning the whole list for it instead of getting nearly to it using a tag.
  3. src/main/resources/META-INF/services/org.apache.nifi.processor.Processor accurately records the package path and processor name (see the example after this list).
  4. Check all package paths and names to ensure no conflicts with other processors.
  5. Ensure a proper NAR is built:
    1. Your code is JAR'd up correctly: the JAR contains
      • the expected package path down to and including all the .class files of your project code.
      • a META-INF subdirectory containing the services subdirectory plus the file org.apache.nifi.processor.Processor with the definition of your processor and its accurate package name.
      • the file, MANIFEST.MF.
      • a maven subdirectory underneath which sits the pom.xml of your project code. (This is not the nar subdirectory's pom.xml nor the top-level one, but the one building the significant Java code.)
    2. Your JAR is NAR'd up: the NAR contains a META-INF subdirectory underneath which are:
      • your JAR, alongside all the myriad supporting JARs under the bundled-dependencies subdirectory.
      • the file, MANIFEST.MF.
      • a maven subdirectory underneath which sits the pom.xml for the nar subdirectory, that is, the instructions on how to build the NAR and what to include in it, its dependency(ies).
  6. Copy the NAR to the lib subdirectory of NiFi.
  7. Bounce NiFi; observe logs/nifi-app.log for trouble with the new processor.
  8. Ensure you can find and add the new processor using the UI.
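
For reference, the services file named in item 3 is nothing more than a list of fully qualified processor class names, one per line. A hypothetical example (made-up package and class):

com.etretatlogiciels.nifi.processor.MyProcessor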

Monday, 3 April 2017

Bryan Bende makes some useful points in a post. I am going to be dealing with moving our authentication strategy from 0.7.1 over to 1.1.2 very soon, so I'm copying this post here for safe-keeping and quick access.

  1. The toolkit defaults to using OU=NIFI when generating the certificates for the hosts/servers, and you are correct that I probably should have passed in CN=bbende, OU=NIFI for the client certificate to be consistent. In reality the OU does not have any meaning in this case, as long as all the certificates (servers + client) are signed by the same certificate authority, which they are.

  2. The toolkit generates a certificate authority (CA) which is stored in nifi-cert.pem and nifi-key.key. The CA is used to sign the other certificates for the servers and any client certificates. The truststore then has the public key of the CA which is saying to trust anything that was signed by this CA.

    So to add a new node, you would run the tls command again with the new hostname, but it has to be done from the same location where you ran it before, because you want it to use the existing nifi-cert.pem and nifi-key.key, so that the certificate of the new node is signed by the same CA as the other nodes.

    $ ./bin/tls-toolkit.sh standalone -n new-hostname
    

    You won't have to update anything on the existing nodes because they already trust the CA.

  3. The initial admin can be an LDAP user. Most people start out with certificates to make sure they have the initial setup working and then add LDAP after that, but you can go with LDAP right away if you like.

  4. Site-to-site happens on behalf of the system, not the users of the NiFi web UI that configured it. So in order for site 1 to send data to site 2 in a secure setup, all of the nodes from site 1 need to be represented as users in site 2. That is done by creating users at site 2 with the usernames being the DNs from the certificates at site 1. This is shown in this post when creating the user for localhost.

    As far as users of the web UI... If there are different LDAPs then these are just two different accounts and the user has one account to log in with at site 1 and another account to log in with at site 2. If you want all users to be the same across site 1 and site 2 then they should be pointing at the same central LDAP.

  5. NiFi's authorizer is a pluggable component; NiFi provides its own file-based authorizer and an Apache Ranger implementation. For cases where you don't have Apache Ranger, you would use NiFi's default file-based authorizer. Take a look at the Admin Guide section for configuring the file-based authorizer.

    https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#multi-tenant-authorization

I found an interesting comment, written by Andy LoPresto to someone today about MiNiFi, that I want to keep.

NiFi can run on small hardware, such as a Raspberry Pi. You may also be interested in MiNiFi, a sub-project of NiFi. MiNiFi is a "headless agent" tool which is designed to run on lightweight or shared systems and extend the reach and capabilities of NiFi to the "edge" of data collection. MiNiFi offers two versions—a Java version which has a high degree of compatibility with NiFi (many of the native processors are available), and a C++ version which is extremely compact but has limited processor definition at this time. MiNiFi may also be a better fit for a "non-UI workflow," as the flow can be defined using the GUI of NiFi and then exported as YAML to the MiNiFi agent, or written directly as YAML if desired.

Wednesday, 5 April 2017

It looks like using NiFi 1.1.2 incurs a problem (until 1.2.0) in that Jetty uses /dev/random. The first time this generator is used on a machine, it can spend up to 10 minutes accumulating data from noise in order to be completely random. This would happen on every launch of a virtual machine (since every launch is brand-new "hardware"), so a JVM option can be used to change this. To override the use of /dev/random, set java.security.egd to use /dev/urandom. However, at least in the Oracle JVM, that string is noticed and reinterpreted to be /dev/random, thwarting the override. Instead, use this incantation to get around it:

java.arg.N=-Djava.security.egd=file:/dev/./urandom

...in ${NIFI_ROOT}/conf/bootstrap.conf.

See How to solve performance problem with Java SecureRandom? and First deployment of NiFi can hang on VMs without sufficient entropy if using /dev/random.

Friday, 7 April 2017

Some notes on latency and through-put...

Some processors have, under the Scheduling tab of the Configure Processor dialog, a slider, Run Duration, that permits a balance to be set between latency and through-put.

This slider appears on processors making use of the @SupportsBatching annotation.

The way to think about it is to see the slider as representing how long a processor will be able to hold a thread to work on flowfiles (from its in-bound queue), that is, allowing onTrigger() to run more times to generate more out-bound flowfiles. Moving the slider further to the right makes the processor do more work per thread acquisition, but the processor also hogs the thread for a longer period before giving it up to another processor. Overall latency can rise because flowfiles sit in other queues for longer periods before being serviced by their processors (that is, the processors waiting for the thread the first one is hogging).

For processors willing to delegate to the framework, in a transactional sense, the responsibility for deciding when to commit finished work, the framework can use that latitude to combine several units of work into a single transaction. This trades a little latency for arguably higher throughput: the framework can do a single write to the flowfile repository in place of many writes, reducing the burden on various locks, filesystem interrupts, etc.

If a flow handles data from numerous, perhaps competing concerns, teams, organizations, etc., this must be tunable.

Whether or not to annotate with @SupportsBatching, enabling the existence of this slider, comes down to whether the processor can, or is willing to, let the framework control when to commit the session's work. A processor is not a candidate when its work isn't side-effect free (cf. the @SideEffectFree annotation), that is, when it alters state in ways that a rollback could not recover from. For example, PutSFTP sends data via SFTP. Once a file is sent, it cannot be "unsent," and the process cannot be repeated without side effect. This processor would not be a candidate for allowing the framework to decide.

Tuesday, 18 April 2017

I had the opportunity of witnessing what happens when a NAR is inadvertently restructured, resulting in processors ending up on different package paths from where they were before. Of course, they no longer work, as they are no longer there in support of processors in existing flows. NiFi makes "ghosts" when that happens and, of course, those ghosts don't work. I was impressed, however, by what happened. I half-expected some cataclysm with damage to flow.xml.gz. The only thing that happened was an exception in logs/nifi-app.log, a comment about the ghost created, then, in the flow, the processors weren't operable, but they were still there and still connected.

When I corrected the situation, everything worked again.

Wednesday, 19 April 2017

NiFi processor properties

Today I'm going to corral the topic of responding to changes in configuration. Typically, a processor's properties remain fixed once established, but it doesn't have to be that way. I'm especially interested in optimizing the gathering of properties, since some properties might be expensive to gather if they need further treatment afterward.

It's desirable, therefore, for a processor to react when its properties have changed. The method onPropertyModified(), inherited via AbstractProcessor, is called when these have changed, once for each modified property:

/**
 * Called only when a NiFi user actually updates (changes) this property's value.
 * @param descriptor the property.
 * @param oldValue null if property had no previous value.
 * @param newValue null if property has been removed.
 */
@Override
public void onPropertyModified( final PropertyDescriptor descriptor, final String oldValue,
                                                                     final String newValue )

NiFi component lifecycle

This brings up also NiFi's component lifecycle, which is governed by means of annotation. Methods are annotated to be called when the lifecycle event occurs.

So, what does this mean for property management?

It means you define static properties (and relationships) in your init() method. However, dynamic properties are not there at the time init() is called since they're dynamic and you can't predict them.

If you expect multiple instances of your processor (and you should), you'll want to use instance variables to hang on to your properties. You'll also want instance variables for expensive, derivative work of the sort that prompted me to think about this today in the first place (I've not previously had this challenge of expensive operations resulting from property values that could change), so that each instance of your processor has and deals with this expense separately. (Of course, you shouldn't use static variables for stuff like this.)

The most efficient way to react to expensive decisions is to watch @OnScheduled. That's the moment your processor is about to run: lock down all the meaningful data then, rather than re-deriving it over and over again each time onTrigger() is called.
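
Here's a minimal sketch of that idea (the RULE property and the regular-expression derivation are hypothetical):

import java.util.regex.Pattern;

import org.apache.nifi.annotation.lifecycle.OnScheduled;
.
.
.
// per-instance cache of the expensive derivation; deliberately not static
private volatile Pattern compiledRule;

@OnScheduled
public void cacheExpensiveProperties( final ProcessContext context )
{
  // derive once per start; onTrigger() merely reads compiledRule thereafter
  compiledRule = Pattern.compile( context.getProperty( RULE ).getValue() );
}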

If you've got any deconstruction concerns, which would be rare, you'd want to handle those in an @OnStopped method. Don't do this in an @OnUnscheduled method, because the processor threads haven't halted by then and you'd be pulling the rug out from under their feet.

Last, if you want precision, reacting only to those properties reported as modified, you can finesse it by overriding the method I evoked at the beginning, onPropertyModified().

For the present work, some of this has proven useful and I'm content.

Important note on @OnScheduled methods: it's important that unvalidated properties (in this case, only the rules from dynamic properties) don't lead, when missing or bad, to exceptions or errors that aren't carefully handled (with a clear warning). Otherwise, no sooner is the processor started on the UI canvas than it gets a potentially confusing error/warning notation on it.

Thursday, 20 April 2017

Clustering notes...

...from comments by Mark Payne.

NiFi relies upon Apache Zookeeper to coordinate which node acts as coordinator and which as primary. An embedded Zookeeper is an option, but for production use, an external one running on different hosts is recommended.

For getting started, the nifi.cluster.node.connection.timeout and the nifi.cluster.node.read.timeout properties are set to 5 seconds by default, but garbage collection for large numbers of flowfiles could cause pauses longer than that, so it's recommended to increase these values to 10 or even 15 seconds. Similarly, change nifi.zookeeper.connect.timeout and nifi.zookeeper.session.timeout to 5 or 10 seconds.
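
In conf/nifi.properties, that advice would look like this (the values are the suggestions above, not the defaults):

nifi.cluster.node.connection.timeout=10 secs
nifi.cluster.node.read.timeout=10 secs
nifi.zookeeper.connect.timeout=10 secs
nifi.zookeeper.session.timeout=10 secs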

Monday, 24 April 2017

Counter-intuitively, here's how to export, then import a template. I say this because I don't find this interface all that compelling and could come up with a more logical one myself. Nevertheless, we're certainly happy that this is possible at all.

The first sequence is exporting the template while browsing one NiFi flow. The second sequence shows importing the template while browsing another (or the same) NiFi flow. Finally, the last sequence shows incorporating the contents of the template as a new NiFi flow.


    Save elements of a flow to a new template...
  1. Select the elements of your flow that are to be the template.
  2. From the Operate menu, click the Create Template button/icon (above).
  3. Name and Describe (if desired) the template.
  4. The template is now in conf/flow.xml.gz (in NiFi 0.x it was kept in a separate file under conf/templates). To export it as .xml, pull down the General menu (upper right-hand corner of NiFi's content window in the browser) and choose Templates.
  5. Click to select your template.
  6. Click the Download icon (right), click the Save radio button, then OK, then navigate to where you wish to drop it and click Save.
  7. Note: it's ironically easier for the next steps to keep this template on the host from which you run the browser hosting the NiFi UI.
  8. Import the new template to the target flow...
  9. From the Operate menu again, click the Upload Template button/icon (above).
  10. Click the Browse (magnifier) button and navigate to the template you saved above, then click Open. You'll see the template name in the Upload Template dialog.
  11. Click the UPLOAD button. You'll see a confirmation alert. Dismiss it by clicking OK. At this point, you can look via General → Templates to see your new template listed. If you can't see it there, then you will not be able to import it in this next sequence of steps.
  12. Place the template's contents on the canvas...
  13. Wherever you want to insert this template, pull a template from the Components toolbar (below) and drop it on the canvas where you want it.
  14. Before it drops, you'll have the opportunity to select your template using the Choose Template drop-down in the Add Template dialog. Select it and choose Add. If any processors are missing (i.e.: the instance of NiFi you're modifying isn't the same version or, especially, doesn't have the same NAR files in lib), you won't be able to drop the new template.

Tuesday, 25 April 2017

Yesterday, I learned (or reinforced having learned it before) that our searchadmin user cannot touch the NiFi canvas/flows—only examine it (them). This is because, sometime in the history of NiFi 1.x, it became the case that the initial user can create other users and endow them with the ability to edit flows, but cannot (by default) do so itself.

The symptom of this was that, because I was logged in as a user with rights to assign rights but without rights to edit the flow, much of the UI was greyed out.

So, one thing I had to do yesterday was to create a new user, russ, on the test server (above) that can create, modify and delete NiFi flows. Here are some notes on this using a new user, jack. A new user is created in LDAP whence it's consumed by NiFi. Then, manage NiFi this way:

  1. In NiFi, log in as searchadmin. (This user of ours is not able to do more than view the NiFi canvas, but it's the user that can manage other users and groups too.)
  2. In the General menu, select Users then:
    1. Find an existing user and copy his fully qualified distinguished name (FQDN) to your clipboard. This would be something like "uid=russ,ou=People,dc=searchappliance,dc=com". The first time, however, if there isn't an example to use, you'd have to find out how FQDNs are already set up in your LDAP for this.
    2. Click on the Add User button/icon at the extreme right.
    3. Paste (and correct to have the right user name) the FQDN you want to add, something like "uid=jack,ou=People,dc=searchappliance,dc=com" into the Identity field. Don't assign ROLE_ADMIN.
    4. Click OK.
  3. (Note: I'm not dealing with the concept of a user belonging to groups here.)
  4. Copy the new FQDN you created to the clipboard and dismiss the NiFi Users dialog.
  5. Open the Operate tool palette and click the Access Policies dialog box via the key icon or button. (However, see also my note in the last step of this post.)
  6. Add the new user to each access policy he'll need. This is pretty tedious. Here's how:
    1. Click the Add users/groups to this policy button/icon (the first policy shown is view the component).
    2. To add the new user in the Add Users/Groups dialog, click in the User Identity field.
    3. Paste what you previously copied to the clipboard, the FQDN of your new user, then select it (as it's reproduced just below that field) so that it shows up in the Selected Users list. (I found this step very unwieldy and confusing; you'll need to experiment with this odd UI control until you get it to show up in the list.)
    4. Click Add. What this should accomplish is that the new user is added to the list of users able to view the component.
    5. Click the drop-down where you see view the component and select a different policy, like modify the component.
    6. Click the Add users/groups to this policy button/icon.
    7. Click in the User Identity field, a list should appear; select your new user. (You may need to paste again from the clipboard—as I say, I find this very unwieldy.) The point is to add the new user to the Selected Users list.
    8. Do this for every policy such that the new user has rights to accomplish all that he's expected to do in the NiFi UI in terms of creating and managing flows. Here are the policies that I set up for jack:
      • view the user interface
      • access the controller: view, modify
      • access restricted components
      • retrieve site-to-site details
      • view system diagnostics
      • proxy user requests
      • access counters: view, modify
      What don't I have access to?
      • query provenance (no way to set this up?)
      • access all policies (would make jack a super user)
      • access users/user groups (ibid)
    9. If you're being careful, this new user you're adding probably doesn't need the ability to view- or modify the policies.
    10. Dismiss the Access Policies dialog.
  7. Log out from searchadmin and log in using the new user. If you get:
    Access Denied
    
    Unable to perform the desired action due to insufficient permissions. Contact the system administrator.
    
    you've neglected something. You'll need to log out, log back in as searchadmin, and fix it. Likely, the user doesn't have adequate privileges to view the user interface. It's always better to manage the policies via General → Policies than via the Operate → Access Policies method. For some reason, you don't see all the policies via the second method, which is the one I suggested in these steps. Sorry.

Note that, while this is overly complicated and nasty, there is a way to work around it. To do this, create a new group that can read and write the UI canvas, then just add users into that group. Note that it need not be an LDAP group, only a group for convenience in NiFi.

Thursday, 27 April 2017

The behavior of creating a property as a dynamic one (e.g.: building it with .dynamic( true )) does lead to a nice, dynamic-looking property in the configuration dialog, replete with a trashcan icon/control, but deleting the property doesn't in fact delete it: the next time you open the configuration dialog, it's back and, in any case, it remains in the list of properties to consider when you iterate through them, i.e.:

for( Map.Entry< PropertyDescriptor, String > property : context.getProperties().entrySet() )
{
  PropertyDescriptor descriptor = property.getKey();

  if( descriptor.isDynamic() )
    ...
}

Separately, in JUnit testing a processor, you add dynamic properties in the same way as static ones for the purpose of testing, so this doesn't inform you much in the behavior noted in my first paragraph.

Joe Witt said that the dynamic-property mechanism wasn't really set up to work for what I want to do (preconfigure some very likely defaults to speed up what my users have to do). The answer is to pre-add the properties, each paired with a boolean enabling property, and let users turn them off instead (see the sketch below). For my precise need, this works fine, though I'm pretty sure the original idea has some merit.
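
A minimal sketch of that advice (the property itself is hypothetical):

// a pre-added feature paired with a boolean "enabling" property the user can turn off
public static final PropertyDescriptor APPLY_DEFAULT_RULE = new PropertyDescriptor.Builder()
    .name( "Apply default rule" )
    .description( "Whether the pre-added default rule is applied to flowfiles." )
    .required( true )
    .allowableValues( "true", "false" )
    .defaultValue( "true" )
    .addValidator( StandardValidators.BOOLEAN_VALIDATOR )
    .build();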

Friday, 28 April 2017

Backing up a NiFi instance...

My IT guy is interested in back-ups of NiFi, so I thought I'd do a little research on it. Here are some notes of my own and some taken from the research I'm doing this morning. I haven't been overly interested in this aspect of NiFi in the year-plus I've been writing processors to solve problems using this framework.

Mark Payne offered this advice:

Given that the content and flowfile repositories are intended to be rather short-lived, I'm not sure that you'd really have need to backup the repository (if you were to pause the flow in order to back it up, for instance, you could likely process the data just as fast as you could back it up—and at that point you'd be finished with it). That being said, for a production-use case, I would certainly recommend running on a RAID configuration that provides redundancy so that if a disk were to go bad you'd still be able to access the data.

For the provenance repository, there actually exists a reporting task that can send the data via site-to-site so that it can be exfilled however you see fit in your flow.

Monday, 1 May 2017

Flowfile content

As long as the content of a flowfile doesn't change, but only attributes, only a reference is maintained to it—not separate copies.

Tuesday, 2 May 2017

To turn on logging to get help with a standard NiFi processor, let's pick InvokeHTTP, paste this line toward the bottom of ${NIFI_ROOT}/conf/logback.xml:

<logger name="org.apache.nifi.processors.standard.InvokeHTTP" level="DEBUG"/>

Monday, 8 May 2017

If your unit test needs to check for cross-contamination by simulating two consecutive runs of your processor, you instantiate a new TestRunner only once, for sure, but after checking the results from the first call to runner.run( 1 ), you need to call runner.clearTransferState() to put the "processor instance" back in the (fresh) state it would be in for the next call.
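
A minimal sketch (MyProcessor and its SUCCESS relationship are hypothetical):

TestRunner runner = TestRunners.newTestRunner( new MyProcessor() );

runner.enqueue( "first".getBytes() );
runner.run( 1 );
runner.assertAllFlowFilesTransferred( MyProcessor.SUCCESS, 1 );

runner.clearTransferState();   // forget run 1's transfers, but keep the same "instance"

runner.enqueue( "second".getBytes() );
runner.run( 1 );
runner.assertAllFlowFilesTransferred( MyProcessor.SUCCESS, 1 );   // fails if run 1 leaked through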

Friday, 12 May 2017

More on slf4j trouble...

I found, by accident, more specifically what's been troubling me in my case.

Of course, the real problem is having older slf4j JARs linked into your binary because of ancient code. That's the real problem and the rest of this thread was taken up with getting to the real problem. However, ...

...now I know why not all of my test code encounters this problem. It's because it only happens when I use NiFi ComponentLog (or the test framework uses it). As long as execution never wanders down into that, I don't have the problem. It's the call to getLogger().{info|warn|etc.} that does it.

So, if you don't make any mistakes in your test code calling into the NiFi test framework, you're a lot less likely to encounter this. You also must not put a call to getLogger().{info|warn|etc.} into your custom-processor code.

Now, oddly, that NiFi call, or my custom processor calling getLogger().info(), etc., does not exhibit this problem in production, only when JUnit tests are running.

Makes an old C hack like me wish for a preprocessor. I wrote a class, NiFiLogger, to funnel logging through, and I disable it in favor of System.out.println() when running tests. Down the road I'll be able to take that out.

Stuck threads

You have a processor that appears to be stopped, but also claims to be running two threads.

This is a stuck thread and there's no way, using the UI, to unstick the threads. You can bounce NiFi.

First, however, you can do

$ bin/nifi.sh dump

...to cause NiFi to do a dump to logs/nifi-bootstrap.log to find out why the threads are stuck.

Tuesday, 16 May 2017

NiFi state management

There's a simple key-value pair store available to processors, controller services and reporting tasks that allows for cluster-wide or local storage of state.

For example, if you want to remember the last record you got from some query, and you're the author of the processor doing it, you can add state to do that. The API consists of:

  1. setState(),
  2. replace(),
  3. clear() and
  4. getState().

(The implications of cluster state differ considerably from local state, so read up on this.)

Two processors cannot share state; it's per-processor only. To share state between processors, resort to a controller service: bind both processors to the same controller service, and the service thus exposes its (otherwise private) state, effectively shared between the two consuming processors.

There is unit-test framework support for this.
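
A minimal sketch from inside onTrigger() (the last.record.id key and the lastRecordId value are made up; getState() and setState() throw IOException):

import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
.
.
.
final StateManager stateManager = context.getStateManager();
final StateMap     oldState     = stateManager.getState( Scope.LOCAL );

final Map< String, String > newState = new HashMap<>( oldState.toMap() );
newState.put( "last.record.id", lastRecordId );

stateManager.setState( newState, Scope.LOCAL );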

Configuring controller services in NiFi 1.x...

It turns out that General → Controller Settings... produces nothing useful. Unless you configure a controller service from Operate → (gear icon) or from a processor's configuration directly, you haven't got useful configuration. This is because controller-service configuration is now process group-specific:

Understanding Controller Service Availability in Apache NiFi, paragraph 2. This represents a pretty big change since NiFi 0.x.

Incidentally, I think it's super important, when configuring controller services, to give them a name beyond their coded name—a name that describes what they're supposed to do for any consuming processor. That way, you don't have to sort through myriad instances of the same controller looking for the one you want.

Wednesday, 17 May 2017

I worked on the "NiFi rocks!" example of a controller service to excerpt (or "highlight") more or less the crucial bits, because I wrote a controller service this week and had some trouble getting it to work just right (especially creating unit tests for it).

Here is the interface. Oddly, this is the very thing by which the controller service is always referenced (instead of StandardPropertiesFileService). In short, this interface must be able to say it all: anything it doesn't say won't be easily accessible. Here, it says that the implementing controller service must make a method, String getProperty( String key ), available. Anything else the service does is immaterial; this method is the sole article in the contract. The bits crucial for this to work are called out in the commentary below.

PropertiesFileService.java:
package com.etretatlogiciels.nifi.controller.interfaces;

import org.apache.nifi.controller.ControllerService;

public interface PropertiesFileService extends ControllerService
{
  String getProperty( String key );
}

The controller service that implements PropertiesFileService does just that: it implements what it does completely behind the contractual String getProperty( String key ). Sure, onConfigured() does all the heavy lifting when called, but it just sets up what String getProperty( String key ) will return when called by the consuming NiFi processor configured to use the controller service.

StandardPropertiesFileService.java:
package com.etretatlogiciels.nifi.controller;

import com.etretatlogiciels.nifi.controller.interfaces.PropertiesFileService;

@CapabilityDescription( "Provides a controller service to manage property files." )
public class StandardPropertiesFileService extends AbstractControllerService implements PropertiesFileService
{
  private String     configUri;
  private Properties properties = new Properties();

  @OnEnabled
  public void onConfigured( final ConfigurationContext context ) throws InitializationException
  {
    configUri = context.getProperty( CONFIG_URI ).getValue();
    ...
  }

  @Override public String getProperty( String key ) { return properties.getProperty( key ); }

  public static final PropertyDescriptor CONFIG_URI = new PropertyDescriptor.Builder()
      .name( "Configuration Directory" )
      .description( "Configuration directory for properties files." )
      .defaultValue( null )
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .build();
}

This is the consuming processor. Note that it never references the controller service directly, neither when it identifies the service nor when it gets the product of the service, but always and only the interface upon which the controller service is built:

Processor.java:
package com.etretatlogiciels.nifi.processor;

import com.etretatlogiciels.nifi.controller.interfaces.PropertiesFileService;

@CapabilityDescription( "Fetch value from properties service." )
public class Processor extends AbstractProcessor
{
  @Override
  public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
  {
    final String                propertyName      = context.getProperty( PROPERTY_NAME ).getValue();
    final PropertiesFileService propertiesService = context.getProperty( PROPERTIES_SERVICE )
                                      .asControllerService( PropertiesFileService.class );
    final String                property          = propertiesService.getProperty( propertyName );
    ...
    session.transfer( flowfile, SUCCESS );
  }

  public static final PropertyDescriptor PROPERTY_NAME = new PropertyDescriptor.Builder()
      .name( "Property Name" )
      .required( true )
      .addValidator( StandardValidators.NON_EMPTY_VALIDATOR )
      .build();
  public static final PropertyDescriptor PROPERTIES_SERVICE = new PropertyDescriptor.Builder()
      .name( "Properties Service" )
      .description( "System properties loader" )
      .required( false )
      .identifiesControllerService( PropertiesFileService.class )
      .build();

  ...
}

Last, but far from least, the JUnit test...

A challenge equal to coding a controller service is writing a test for one. The lines germane to testing the controller service, as opposed to other testing issues, are called out below.

ProcessorTest.java:

Notice that, while a new instance of the controller service itself, StandardPropertiesFileService, is created, it's again the interface that's referenced and not the service class itself (line 13). So, when it's time to add the controller service to the test framework (runner) and to enable it, it's the interface reference that's used (line 26).

Note how the controller-service property is loaded ahead of time into a private hash map (line 16) referencing the static PropertyDescriptor in the service (there is no PropertyDescriptor in the interface) directly prior to the association of the properties map with the interface reference when the service is added to the test framework (line 26 as already noted).

Let's detail the three calls of crucial importance: addControllerService(), enableControllerService() and setProperty(). If you don't get these right, some rather cryptic errors come out of the test framework.

  1. addControllerService(): the first argument is free; you can pass any string you like (but you should make it the same string you later pass to setProperty()). The second argument is the interface reference to the instantiated controller service. The third argument is the properties map you constructed earlier. This is the only opportunity you have to establish the service and its properties and their values.
    Important note: if you have optional properties, make sure not to pass nulls. Simply do not include those properties in the map.
  2. enableControllerService(): this takes just the interface reference to the instantiated controller service.
  3. setProperty(): finally, you must inject the property (using its PropertyDescriptor) that identifies the controller service in the processor under test. Its value is the made-up string passed as the first argument to addControllerService().

The other property explicitly set up in the framework using setProperty(), Processor.PROPERTY_NAME, belongs to the processor being tested (and not to the controller service).

package com.imatsolutions.nifi.processor;

import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import com.imatsolutions.nifi.controller.StandardPropertiesFileService;
import com.imatsolutions.nifi.controller.interfaces.PropertiesFileService;

public class ProcessorTest
{

  @Test
  public void testOnTrigger() throws IOException, InitializationException
  {
    TestRunner            runner                      = TestRunners.newTestRunner( new Processor() );
    PropertiesFileService propertiesService           = new StandardPropertiesFileService();
    Map< String, String > propertiesServiceProperties = new HashMap<>();

    propertiesServiceProperties.put( StandardPropertiesFileService.RELOAD_INTERVAL.getName(), "30 sec" );

    URL url = this.getClass().getClassLoader().getResource( "test.properties" );
    assertNotNull( "Should not be null", url );

    String propFile = url.getFile();

    propertiesServiceProperties.put( StandardPropertiesFileService.CONFIG_URI.getName(), propFile );

    // Add controller service
    runner.addControllerService( "propertiesServiceTest", propertiesService, propertiesServiceProperties );
    runner.enableControllerService( propertiesService );
    runner.setProperty( Processor.PROPERTIES_SERVICE, "propertiesServiceTest" );

    // Add properties
    runner.setProperty( Processor.PROPERTY_NAME, "hello" );

    // Add the content to the runner
    runner.enqueue( "TEST".getBytes() );

    // Run the enqueued content, it also takes an int = number of contents queued
    runner.run( 1 );

    // All results were processed without failure
    runner.assertQueueEmpty();

    // If you need to read or do additional tests on results you can access the content
    List< MockFlowFile > results = runner.getFlowFilesForRelationship( Processor.SUCCESS );
    assertTrue( "1 match", results.size() == 1 );
    MockFlowFile result = results.get( 0 );

    // Test attributes and content
    result.assertAttributeEquals( "property", "nifi.rocks.prop" );
  }
}

Thursday, 18 May 2017

Some notes on ExecuteScript, performance and Jython...

Using ExecuteScript to run a Python script that processes only a single flowfile per invocation, despite there being multiple flowfiles in the queue, is very inefficient: the slowness is due to the Jython initialization time paid on every invocation. Instead, grab a batch of flowfiles each time the script runs.

At the top of your Python script, put:

flowFiles = session.get( 10 )
for flowFile in flowFiles:
    if flowFile is None:
        continue
    # do stuff here...

Here are two sample scripts.

flowfiles = session.get( 10 )
for flowfile in flowfiles:
    if flowfile is None:
        continue
    s3_bucket = flowfile.getAttribute( 'job.s3_bucket' )
    s3_path   = flowfile.getAttribute( 'job.s3_path' )
    # More stuff here....
    errors = []
    # More stuff here...
    if len( errors ) > 0:
        flowfile = session.putAttribute( flowfile, 'job.error', ';'.join( errors ) )
        session.transfer( flowfile, REL_FAILURE )
    else:
        flowfile = session.putAttribute( flowfile, 'job.number_csv_files', str( len( matches ) ) )
        flowfile = session.putAttribute( flowfile, 'job.total_file_size', str( total_size ) )
        session.transfer( flowfile, REL_SUCCESS )

import sys
import traceback
from java.nio.charset import StandardCharsets
from org.apache.commons.io import IOUtils
from org.apache.nifi.processor.io import StreamCallback
from org.python.core.util import StringUtil


flowfiles = session.get( 10 )
for flowfile in flowfiles:
    if flowfile is None:
        continue
    start     = int( flowfile.getAttribute( 'range.start' ) )
    stop      = int( flowfile.getAttribute( 'range.stop' ) )
    increment = int( flowfile.getAttribute( 'range.increment' ) )
    for x in range( start, stop + 1, increment ):
        newFlowfile = session.clone( flowfile )
        newFlowfile = session.putAttribute( newFlowfile, 'current', str( x ) )
        session.transfer( newFlowfile, REL_SUCCESS )
    # remove the original only once, after all the clones have been transferred
    session.remove( flowfile )

Friday, 19 May 2017

NiFi cluster node identifier...

When a node joins a cluster it writes its node identifier into its local state. This is in a state directory under the NiFi installation, which is state/local, I think.

If that subdirectory is removed, the node will get a new identifier. If the subdirectory is left in place, the node gets that same identifier when the NiFi node is started again.

How to process files sequentially...

You have many files in some subdirectory. Let's say they're named file1.csv, file2.csv, etc. How to ensure they're processed sequentially?

There is a processor, EnforceOrder, available beginning in NiFi 1.2.0. Koji Kawamura offers a (nice, if complex) flow that solves this (and a number of other problems that might arise); the flow itself isn't reproduced in these notes.

Note, however, that EnforceOrder suffers from needing the object of its counting mechanism to increment not only in order, but also without skipping any values, not so much as even one.

Wednesday, 24 May 2017

Test framework details...

Matt Burgess answered a question from me in the forum and gave some useful detail. I was struggling to do some Mockito work on a class instantiated in init().

A processor's init() method should be getting called at creation time, that is, upon:

TestRunner runner = TestRunners.newTestRunner( new MyProcessor() );

It calls Processor.initialize(), and AbstractSessionFactoryProcessor's implementation calls its own protected init() method (which is usually what MyProcessor overrides). It also calls any methods annotated with @OnAdded and @OnConfigurationRestored. These could be split out if necessary, or organized just as TestRunner's methods are. TestRunner has some overloaded run() methods; the base one is:

public void run( final int iterations, final boolean stopOnFinish, final boolean initialize, final long runWait );

If you call run() with no parameters, you'll get one iteration that initializes, then stops on finish (with a five-second run-wait). However, specifying initialize as true will only invoke methods bearing an @OnScheduled annotation.

If you keep a reference to your instance of MyProcessor, you can call your @OnScheduled method explicitly (rather than via TestRunner), then perform your assertions, etc. Then if you want to run onTrigger() but you don't want to reinitialize, you can do:

runner.run( 1, true, false );

...which says to run once, stop on finish, and don't initialize.
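
Put together, that pattern might look like this sketch (setup() stands in for whatever method MyProcessor annotates with @OnScheduled):

MyProcessor processor = new MyProcessor();
TestRunner  runner    = TestRunners.newTestRunner( processor );

// call the @OnScheduled method explicitly rather than letting TestRunner do it...
processor.setup( runner.getProcessContext() );

// ...perform assertions on processor state here, then run without reinitializing:
runner.enqueue( "TEST".getBytes() );
runner.run( 1, true, false );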

There are a couple examples of manipulating the run() methods in CaptureChangeMySQLTest.groovy.

Thursday, 1 June 2017

Here's how to interact with a flowfile in both input- and output modes:

flowfile = session.write( flowfile, new StreamCallback()
{
  @Override
  public void process( InputStream in, OutputStream out ) throws IOException
  {
    try
    {
      // use InputStream in and OutputStream out at the same time...
    }
    finally
    {
      in.close();
      out.close();
    }
  }
});
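
For instance, a concrete callback that upper-cases the flowfile's content might look like this (a sketch; it assumes Apache commons-io's IOUtils on the classpath and UTF-8 text content):

// uses org.apache.commons.io.IOUtils and java.nio.charset.StandardCharsets
flowfile = session.write( flowfile, new StreamCallback()
{
  @Override
  public void process( InputStream in, OutputStream out ) throws IOException
  {
    // read the entire content, transform it, write it back out
    String content = IOUtils.toString( in, StandardCharsets.UTF_8 );
    out.write( content.toUpperCase().getBytes( StandardCharsets.UTF_8 ) );
  }
});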

Friday, 2 June 2017

Steps to keep the original flowfile (for passing it along) while also creating a new flowfile with new content.

  1. FlowFile original = flowfile;
  2. AtomicReference< String > newContentHolder = new AtomicReference<>();
  3. Fill new content with:
    session.read( flowfile, new InputStreamCallback()
    {
      @Override
      public void process( InputStream existing ) throws IOException
      {
        newContentHolder.set( ... );
      }
    });
    
  4. flowfile = session.create( flowfile ); // (keeps attributes)
  5. Empty new content into new flowfile with:
    flowfile = session.write( flowfile, new OutputStreamCallback()
    {
      @Override
      public void process( OutputStream out ) throws IOException
      {
        out.write( newContentHolder.get().getBytes() );
      }
    });
    
  6. session.transfer( flowfile, SUCCESS );
  7. session.transfer( original, SUCCESS );
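
Assembled, the whole pattern might look like this inside onTrigger() (a sketch; computeNewContent() is a hypothetical helper that derives the new content from the existing stream):

// uses java.util.concurrent.atomic.AtomicReference and the org.apache.nifi.processor.io callbacks
FlowFile original = flowfile;
final AtomicReference< String > newContentHolder = new AtomicReference<>();

session.read( flowfile, new InputStreamCallback()
{
  @Override
  public void process( InputStream existing ) throws IOException
  {
    newContentHolder.set( computeNewContent( existing ) );  // hypothetical helper
  }
});

flowfile = session.create( flowfile );  // child flowfile; keeps the attributes

flowfile = session.write( flowfile, new OutputStreamCallback()
{
  @Override
  public void process( OutputStream out ) throws IOException
  {
    out.write( newContentHolder.get().getBytes() );
  }
});

session.transfer( flowfile, SUCCESS );
session.transfer( original, SUCCESS );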

Unknown relationship at runtime...

You get this exception saying that your relationship is not known:

java.lang.IllegalArgumentException: this relationship relationship-name is not known

Did you forget one of these elements in your processor code?

  1. Define relationships (don't use any from some other processor's code; RELATIONSHIP_NAME stands in for yours):
    public static final Relationship RELATIONSHIP_NAME = new Relationship.Builder()
        ...
        .build();
    
  2. Collect relationships into a Set (relationships is a private member of the processor):
    private Set< Relationship > relationships;

    @Override
    protected void init( final ProcessorInitializationContext context )
    {
      Set< Relationship > relationships = new HashSet<>();
      relationships.add( RELATIONSHIP_NAME );
      this.relationships = Collections.unmodifiableSet( relationships );
    }
    
  3. Make available the required method, getRelationships():
    @Override public Set< Relationship > getRelationships() { return relationships; }
    

Wednesday, 7 June 2017

Towards tighter control over viewing flowfile content...

Is there a way to know when a flowfile is looked at by someone in the UI?

We have flowfiles containing protected health information (PHI) which no one is supposed to see but, in cases where it's unavoidably crucial to take a look, for debugging or otherwise observing the functioning of a flow, we must know the extent of the exposure for legal reasons.

Joe Witt answered,

"This is precisely why there is a DOWNLOAD event type in provenance. I recommend using that mechanism to track this. You can also register an authorizer which based on tags of the data and which user/entity is trying to access a given resource—whether they are allowed."

Thursday, 8 June 2017

NiFi's StateManager...

I need a very light amount of state management, only one variable. However, the basic functionality hands you a StateMap, essentially a Map< String, String > of all the values at once. To get one value, it's:

private String getValueFromStateManager( final ProcessContext context, final String key )
    throws IOException
{
  StateMap map = context.getStateManager().getState( Scope.LOCAL );
  return ( map != null ) ? map.get( key ) : null;
}

and, to manage the value, you have to juggle the entire map. Here, I replace it wholesale since I've only got one value I care about.

private void putLastTransactionNumber( final ProcessContext context, final String key, final String value )
    throws IOException
{
  Map< String, String > map = new HashMap<>( 1 );
  map.put( key, value );
  context.getStateManager().setState( map, Scope.LOCAL );
}
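
If there were other values to preserve, the juggling would be a read-modify-write of the whole map, something like this sketch:

private void putValuePreservingOthers( final ProcessContext context, final String key, final String value )
    throws IOException
{
  final StateManager stateManager = context.getStateManager();

  // copy the existing state into a mutable map, change the one entry, write it all back
  Map< String, String > map = new HashMap<>( stateManager.getState( Scope.LOCAL ).toMap() );
  map.put( key, value );
  stateManager.setState( map, Scope.LOCAL );
}

StateManager also offers replace(), an optimistic compare-and-set, if concurrent updates to state are a concern.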

Friday, 16 June 2017

It seems useful to note, so that it turns up in a search, how and why a transfer Relationship might come not to be specified. I've got to where this isn't hard for me to find, but it's worth noting nevertheless for posterity.

Quite often, when a "transfer relationship not specified" error occurs, it's the result of a bug in error handling. For example, the flowfile is to be routed to failure for some reason, but the part of the code that does that itself has a problem and fails.

Wednesday, 21 June 2017

Load-balancing and remote-process groups...

Remote-process groups (RPGs) handle balancing and failover for you. This comes from a comment by Bryan Bende.

There's no need to test the RPG; you shouldn't need to use, for example, the DistributeLoad processor. You should be able to have a GenerateFlowFile → RPG (with URL of any node) and then an input port going to some processor. The RPG figures out all of the nodes in the cluster and sends data to all of them.

If you go into the Remote Ports menu of the RPG, each port has some settings that control how many flowfiles get sent per transaction. Generally, you will get a more even distribution with a smaller batch size, but much better performance with a larger batch size. The RPG also factors in the number of flowfiles on each node when determining where to send them, so if node 1 is the primary node and has more flowfiles queued, then the RPG will likely send more flowfiles to node 2.

Thursday, 29 June 2017

Clustering and Zookeeper...

One reason to run an external Zookeeper in production is to prevent the trouble that arises when Zookeeper cannot establish a "quorum;" lack of one will keep a two-node cluster from starting up, for instance.

If you keep the embedded Zookeeper nevertheless, then to start the cluster, remove the server.2 line from zookeeper.properties on node 1 and, on node 2, set the nifi.state.management.embedded.zookeeper.start property (in nifi.properties) to false. Thereafter, as long as node 1 is started first, node 2 will start also.
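
Concretely, assuming hostnames node1 and node2, the relevant lines end up as follows (a sketch; 2888:3888 are the conventional Zookeeper peer and leader-election ports):

# node 1, conf/zookeeper.properties (the server.2 line removed):
server.1=node1:2888:3888

# node 2, conf/nifi.properties:
nifi.state.management.embedded.zookeeper.start=false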

The scenario above is obviated by an external Zookeeper or by adding a third node to the cluster.

NiFi processors blocking subsequent flowfiles after a failure...

Note that, unless coded to do so, a processor doesn't just stop working because a flowfile running through it happened to produce a failure. Others in the queue behind it will flow when they get their turn.

The processor will retry a failed flowfile (whose original is left behind in the incoming queue) after a penalty period, 30 seconds by default. This value is configurable in the processor's Settings → Penalty Duration.

Meanwhile, other, "innocent" files continue to make their way through the processor. The processor isn't summarily halted unless it's coded to halt at first failure.
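
The penalizing itself is done explicitly by the processor's author in the failure path, typically something like this sketch (assuming the usual SUCCESS and FAILURE relationships):

@Override
public void onTrigger( final ProcessContext context, final ProcessSession session ) throws ProcessException
{
  FlowFile flowfile = session.get();
  if( flowfile == null )
    return;

  try
  {
    // ...do the real work here...
    session.transfer( flowfile, SUCCESS );
  }
  catch( Exception e )
  {
    getLogger().error( "Processing failed", e );
    flowfile = session.penalize( flowfile );  // delay governed by Settings → Penalty Duration
    session.transfer( flowfile, FAILURE );
  }
}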

Tuesday, 4 July 2017

The new NiFi registry...

Reminder...

In conf/nifi.properties, there is a variable, nifi.variable.registry.properties, that supports a comma-delimited list of file locations. See notes, Friday, 24 March 2017.
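
For example (hypothetical file names):

nifi.variable.registry.properties=./conf/custom.properties, ./conf/site.properties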

Wednesday, 5 July 2017

Just a note...

NiFi's conf subdirectory is not on any processor's classpath.

Friday, 7 July 2017

NiFi CLASSPATH...

The conf subdirectory is on the class path of the system class loader when NiFi is launched. The class hierarchy is something like this:

  1. System class loader (conf subdirectory and JAR files in the lib subdirectory)
  2. Jetty NAR class loader (parent is system class loader)
  3. (for instance:) Geomesa NAR class loader (parent is Jetty class loader)

The classpath of the system class loader should be what you see in logs/nifi-bootstrap.log for the command that launched NiFi.

Wednesday, 19 July 2017

NiFi behind a reverse proxy (webserver)

How to set up httpd/conf.d/nifi.conf (or /etc/apache2/sites-available/nifi.conf) to run NiFi behind a reverse-proxy server:

ProxyPass        / http://<hostname or address>:8080/
ProxyPassReverse / http://<hostname or address>:8080/

RequestHeader add X-ProxyScheme      "http"
RequestHeader add X-ProxyHost        "<hostname or address>"
RequestHeader add X-ProxyPort        "80"
RequestHeader add X-ProxyContextPath "/"

Thursday, 20 July 2017

NiFi support for large data sets...

From Joe Witt...

NiFi can handle very small and very large datasets very well today. It has always been the case with NiFi that it reads data from its content repository using input streams and writes content to its content repository using output streams. Because of this, developers building processors can design components which pull in 1-byte objects just as easily as 1 GB objects (or much more). The content never has to be held in memory in whole, except in those cases where a given component is not coded to take advantage of this.

Consider a process which pulls data via SFTP or HTTP or some related protocol. NiFi, pulling data from such an endpoint, streams the data over the network and writes it directly to disk. We never hold that whole object in some byte[] in memory. Similarly, when large objects move from processor to processor, we're just moving pointers.

In fact, NiFi can even handle cases like merging content together very well for the same reason. We can accumulate a bunch of source content/flowfiles into a single massive output and do so having never held it all in memory at once.

Now, where can memory trouble happen? It can happen when a large number of flowfile objects (the metadata about them, not the content) are held in memory at once. These can accumulate substantially in certain cases, like splitting data. Consider an input CSV file with 1,000,000 lines. One might want individual lines in order to run specific per-event processes on each. This can be accomplished in a couple of ways.

  1. They can use SplitText to split into single lines. We'll almost surely run out of heap, since we're making 1,000,000 flowfile objects. Avoid this option.
  2. They can use SplitText first splitting into, say, 1,000 lines at a time. This produces 1,000 output flowfiles, each with 1,000 lines in it. They can then do another SplitText which splits to single lines. This way, no single session will ever have more than 1,000 splits in it and things will work great. This, combined with backpressure and provenance, works extremely well.
  3. They can use the new record reader/writer processors, which support really powerful and common patterns in a format- and schema-aware manner and which make setup really easy and reusable. In fact, this approach in many cases means they don't even need to split up the data, since the record-oriented processors know how to demarcate events "as they exist in their source form." Huge performance gains and usability improvement here.

Monday, 24 July 2017

Kafka

I've been looking at Kafka of late. Other than lacking a nice user interface like NiFi's, what is the real distinction? The one (Kafka) is a message-queueing system and the other (NiFi) a flow-control framework.

If NiFi provides all that's needed, then Kafka isn't called for. However, Kafka offers an important capability in a broad, enterprise-requirement sense: the provision of a highly durable and replayable buffer of insertion-ordered data for efficient access.

Wednesday, 26 July 2017

Jetty failure to start: org.jasypt.exceptions.EncryptionOperationNotPossibleException

We saw this once:

2017-07-25 23:23:31,148 WARN [main] org.apache.nifi.web.server.JettyServer Failed to start web server... shutting down.
org.apache.nifi.encrypt.EncryptionException: org.jasypt.exceptions.EncryptionOperationNotPossibleException
    at org.apache.nifi.encrypt.StringEncryptor.decrypt(StringEncryptor.java:149) ~[nifi-framework-core-1.1.2.jar:1.1.2]
    at org.apache.nifi.controller.serialization.FlowFromDOMFactory.decrypt(FlowFromDOMFactory.java:474) ~[nifi-framework-core-1.1.2.jar:1.1.2]
    .
    .
    .

The solution was found thanks to a quickly answered suggestion by Mark Payne:

Our Ansible instructions upgraded NiFi and generated a new nifi.sensitive.props.key. In nifi.properties, this property, if present, is used to encrypt sensitive properties in flow.xml.gz. Thus, upon relaunching NiFi, the wrong key was used to decrypt, resulting in the reported failure to start: flow.xml.gz was no longer usable.

How did we solve it?

We looked in nifi.properties.rpmsave (what RPM makes of a file it has changed), copied the old key from this property and pasted it in over the newly generated key in nifi.properties. Upon relaunch, NiFi worked with no problem. The full solution, in our case, is to insist that Ansible not generate a new key and replace nifi.sensitive.props.key with it.