An Apache Kafka and Apache NiFi Experience

Russell Bateman
January 2023

This is an initial pump-priming exercise in using Apache NiFi to interact with Kafka. I'm using Apache NiFi 1.19.1 and Apache Kafka 3.3.2 (the kafka_2.13-3.3.2 download, i.e., the build for Scala 2.13).

  1. Create a place to play.
    russ@tirion ~/dev/ $ mkdir kafka
    russ@tirion ~/dev/ $ cd kafka
    
  2. We're following the instructions in "How to Install Apache Kafka on Linux?".
  3. Download the Apache Kafka tarball (kafka_2.13-3.3.2.tgz) into this directory.
  4. Explode Kafka and verify the Java version, because ZooKeeper cares about that.
    russ@tirion ~/dev/kafka $ tar -zxf kafka_2.13-3.3.2.tgz
    russ@tirion ~/dev/kafka $ ll
    russ@tirion ~/dev/kafka $ rm *.tgz
    russ@tirion ~/dev/kafka $ java --version
    openjdk 11.0.17 2022-10-18
    OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu220.04)
    OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu220.04, mixed mode, sharing)
    
  5. Create a link to Kafka's distro subdirectory because this simplifies the monotonous typing.
    russ@tirion ~/dev/kafka $ ln -s kafka_2.13-3.3.2 kafka
    russ@tirion ~/dev/kafka $ ll
    total 12
    drwxrwxr-x  3 russ russ 4096 Jan 27 14:35 .
    drwxrwxr-x 59 russ russ 4096 Jan 27 14:32 ..
    lrwxrwxrwx  1 russ russ   16 Jan 27 14:35 kafka -> kafka_2.13-3.3.2
    drwxr-xr-x  8 russ russ 4096 Jan 27 14:36 kafka_2.13-3.3.2
    
  6. Now we launch ZooKeeper (in the foreground, so the next step needs its own terminal)...
    russ@tirion ~/dev/kafka $ ./kafka/bin/zookeeper-server-start.sh ./kafka/config/zookeeper.properties
    [2023-01-27 14:52:18,116] INFO Reading configuration from: ./kafka/config/zookeeper.properties \
                                   (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    [2023-01-27 14:52:18,121] INFO clientPortAddress is 0.0.0.0:2181 (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...
    
  7. ...and then Kafka!
    russ@tirion ~/dev/kafka $ ./kafka/bin/kafka-server-start.sh ./kafka/config/server.properties
    [2023-01-27 14:54:02,744] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2023-01-27 14:54:02,970] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation \
                                   (org.apache.zookeeper.common.X509Util)
    [2023-01-27 14:54:03,031] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
    [2023-01-27 14:54:03,032] INFO starting (kafka.server.KafkaServer)
    [2023-01-27 14:54:03,032] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
    [2023-01-27 14:54:03,041] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
    [2023-01-27 14:54:03,045] INFO Client environment:zookeeper.version=3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, \
                                   built on 04/08/2021 16:35 GMT (org.apache.zookeeper.ZooKeeper)
    [2023-01-27 14:54:03,045] INFO Client environment:host.name=tirion (org.apache.zookeeper.ZooKeeper)
    [2023-01-27 14:54:03,045] INFO Client environment:java.version=11.0.10 (org.apache.zookeeper.ZooKeeper)
    [2023-01-27 14:54:03,045] INFO Client environment:java.vendor=AdoptOpenJDK (org.apache.zookeeper.ZooKeeper)
    [2023-01-27 14:54:03,045] INFO Client environment:java.home=/home/russ/dev/jdk-11.0.10+9 (org.apache.zookeeper.ZooKeeper)
    ...
    [2023-01-27 14:54:03,968] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, \
                                   from now on will use node tirion:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
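
Before moving on to NiFi, it can be worth confirming that both servers are actually listening. The following is a minimal Python sketch, not part of the original session. It assumes the ports that appear in the logs above (2181 for ZooKeeper, 9092 for the Kafka broker) and that both run on localhost; the helper name port_open is made up for illustration.

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    # Ports taken from the log output above; adjust if your configuration differs.
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "listening" if port_open("localhost", port) else "NOT reachable"
        print(f"{name} on localhost:{port}: {state}")
```

An open port only shows that something is listening there, not that the broker is healthy, so treat this as a quick sanity check.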
    

Apache NiFi flow "Write to Kafka"

  1. Set up flow with GenerateFlowFile creating a FHIR bundle as if from Mirth.
  2. Add PublishKafka_2_6, configured with the topic FHIR.
  3. Connect the two with the success relationship from GenerateFlowFile. The only property I set on PublishKafka_2_6 was Topic Name (to FHIR). The rest remain at their defaults.
  4. Set up two NoOp processors after PublishKafka_2_6; connect the Kafka processor's success relationship to the first NoOp and its failure relationship to the second.
  5. Should see (nominally, i.e., the successful path):
     +------------------+          +------------------+          +------+
     | GenerateFlowFile |  -----→  | PublishKafka_2_6 |  -----→  | NoOp |
     +------------------+          +------------------+          +------+
    
  6. Start then immediately stop GenerateFlowFile. The Mirth/FHIR record is put into the queue between GenerateFlowFile and PublishKafka_2_6.
  7. Start then stop PublishKafka_2_6.

Run from command line to see if the message arrived in Kafka

russ@tirion ~/dev/kafka/kafka $ ./bin/kafka-console-consumer.sh --topic FHIR --from-beginning --bootstrap-server localhost:9092
<record>
<HL7>
<Bundle xmlns="http://hl7.org/fhir">
   <type value="transaction"></type>
   <entry>
      <fullUrl value="urn:uuid:5cbc121b-cd71-4428-b8b7-31e53eba8184"></fullUrl>
      <resource>
         <Patient xmlns="http://hl7.org/fhir">
            <text>
               <status value="generated"></status>
               .
               .
               .
               </amount>
            </payment>
         </ExplanationOfBenefit>
      </resource>
      <request>
         <method value="POST"></method>
         <url value="ExplanationOfBenefit"></url>
      </request>
   </entry>
</Bundle>
</HL7>
<MessageReceived>2021-01-13 21:44:41.0</MessageReceived>
<MessageDeliveredtoMariaDB>2021-01-13 21:44:33.0</MessageDeliveredtoMariaDB>
<MessageTransactionId>13fb854b-b6da-4bf9-9738-ba06ce41735a~8</MessageTransactionId>
<MessageId>1</MessageId>
<MessageInterfaceId>IB_Stage_Bundle</MessageInterfaceId>
<MessageStatus>PR</MessageStatus>
<MessageOrganizationId>HeC</MessageOrganizationId>
<MessageMimeType>XML</MessageMimeType>
<MessageStandardVersion>R4</MessageStandardVersion>
</record>

I think that, as soon as this command-line utility is run, a flowfile appears in the success queue between PublishKafka_2_6 and the NoOp processor, but maybe it was already there as soon as I started PublishKafka_2_6.
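
The console consumer above keeps running until it is interrupted. For a scripted check, the same command can be given the tool's --timeout-ms option so that it exits once no message has arrived for a while. The Python sketch below only wraps that command line; the function names and the 5-second default are my own choices, and it assumes that the tool writes messages to stdout and its diagnostics to stderr.

```python
import subprocess
from pathlib import Path
from typing import List


def consumer_command(kafka_home: str, topic: str,
                     bootstrap: str = "localhost:9092",
                     timeout_ms: int = 5000) -> List[str]:
    """Build the console-consumer command used above, plus --timeout-ms so the
    tool exits once no message has arrived for timeout_ms milliseconds."""
    return [
        str(Path(kafka_home) / "bin" / "kafka-console-consumer.sh"),
        "--topic", topic,
        "--from-beginning",
        "--bootstrap-server", bootstrap,
        "--timeout-ms", str(timeout_ms),
    ]


def topic_has_messages(kafka_home: str, topic: str) -> bool:
    """Run the consumer and report whether it printed anything on stdout.

    Assumption: messages go to stdout, log and summary lines go to stderr.
    """
    result = subprocess.run(consumer_command(kafka_home, topic),
                            capture_output=True, text=True)
    return bool(result.stdout.strip())
```

From ~/dev/kafka, calling topic_has_messages("./kafka", "FHIR") would run the same check as the session above without having to press Ctrl-C.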

Apache NiFi flow "Read from Kafka"

  1. Set up ConsumeKafka_2_6, configured with the topic FHIR and the Group ID MIRTH. These are the only properties that do not remain at their defaults.
  2. Set up a NoOp processor after ConsumeKafka_2_6 and connect the success relationship to NoOp.
  3. Should see (nominally):
     +------------------+          +------+
     | ConsumeKafka_2_6 |  -----→  | NoOp |
     +------------------+          +------+
    
  4. Start and stop the Kafka processor. No flowfile is passed to the queue (the Conclusions below record what turned out to be missing).

Let the learning commence!

Let's check out "How to Read and Write to Apache Kafka with Apache NiFi" to see if it can tell us what we did wrong.

Conclusions

  1. It appears the Kafka consumer processor works fine. I just had to start it, then send another flowfile through the publisher.
  2. Also, I played around with the Offset Reset property, switching between latest (the default) and earliest. Because my flow is so simple and I'm only firing one message at a time from GenerateFlowFile, this setting seems to make no difference, but I think I know what would happen if I added some latency or swamped the consumer with messages (and it picked the latest).
  3. It seems that Group ID can be a string rather than an integer. Not that this is a big deal, but for debugging, I'm less wearied by strings than numbers.
  4. I read somewhere that there are topic-naming conventions. I'm not stressed about my non-conformance given the infantile undertaking here.
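
Conclusions 1 and 2 fit how Kafka consumer groups track their position: the offset-reset setting is consulted only when a group has no committed offset yet, and after that the group resumes from its committed offset whatever the setting says. The toy model below is plain Python, not the Kafka client API. The names ToyTopic and poll and the second group ID OTHER are invented for illustration. It models a single partition and commits on every poll, which is a simplification.

```python
from typing import Dict, List


class ToyTopic:
    """Single-partition, in-memory stand-in for a Kafka topic (illustrative only)."""

    def __init__(self) -> None:
        self.log: List[str] = []             # appended messages; list index = offset
        self.committed: Dict[str, int] = {}  # group id (a string) -> next offset to read

    def publish(self, message: str) -> None:
        self.log.append(message)

    def poll(self, group_id: str, offset_reset: str = "latest") -> List[str]:
        """Return the unread messages for group_id and commit the new position."""
        if group_id in self.committed:
            start = self.committed[group_id]  # offset_reset is ignored here
        elif offset_reset == "earliest":
            start = 0                         # new group: begin at the oldest message
        elif offset_reset == "latest":
            start = len(self.log)             # new group: skip everything already there
        else:
            raise ValueError(f"unsupported offset_reset: {offset_reset!r}")
        messages = self.log[start:]
        self.committed[group_id] = len(self.log)
        return messages


topic = ToyTopic()
topic.publish("bundle-1")                          # published before any consumer ran

first_latest = topic.poll("MIRTH", "latest")       # new group, latest
first_earliest = topic.poll("OTHER", "earliest")   # new group, earliest

topic.publish("bundle-2")                          # published after both groups started
second_latest = topic.poll("MIRTH", "earliest")    # setting no longer matters
second_earliest = topic.poll("OTHER", "latest")

print(first_latest, first_earliest)    # [] ['bundle-1']
print(second_latest, second_earliest)  # ['bundle-2'] ['bundle-2']
```

In this model, the MIRTH group with latest sees nothing until a message is published after it has started, which matches having to send another flowfile through the publisher.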