public class NiFiReceiver extends org.apache.spark.streaming.receiver.Receiver<NiFiDataPacket>
The NiFiReceiver is a Reliable Receiver that provides a way to
pull data from Apache NiFi so that it can be processed by Spark Streaming.
The NiFiReceiver connects to the NiFi instance specified in the config and
requests data from the named OutputPort. In NiFi, when an OutputPort
is added to the root process group, it acts as a queue of data for remote
clients. This receiver is then able to pull that data from NiFi reliably.
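To make "pull reliably" concrete, here is a minimal plain-Java sketch of the general idea: data handed out by a queue is only dropped once the client confirms that it has stored it, and a failed pull puts the data back. The `PortQueue` class and the `pull`, `confirm`, `rollback`, and `receiveOnce` names are hypothetical stand-ins for illustration only; they are not the NiFi site-to-site API or the NiFiReceiver implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ReliablePullSketch {

    /** Hypothetical stand-in for an OutputPort: a queue that hands out data in transactions. */
    static class PortQueue {
        private final Deque<String> queued = new ArrayDeque<>();
        private final List<String> inFlight = new ArrayList<>();

        void enqueue(String item) {
            queued.addLast(item);
        }

        /** Hands out up to max items; they stay "in flight" until confirmed or rolled back. */
        List<String> pull(int max) {
            while (inFlight.size() < max && !queued.isEmpty()) {
                inFlight.add(queued.pollFirst());
            }
            return new ArrayList<>(inFlight);
        }

        /** The client stored the data, so the queue may drop it. */
        void confirm() {
            inFlight.clear();
        }

        /** The client failed, so the data goes back to the front of the queue in order. */
        void rollback() {
            for (int i = inFlight.size() - 1; i >= 0; i--) {
                queued.addFirst(inFlight.get(i));
            }
            inFlight.clear();
        }

        int size() {
            return queued.size();
        }
    }

    /** Pulls one batch of up to two items and acknowledges it only after the store step succeeds. */
    static boolean receiveOnce(PortQueue port, List<String> store, boolean failStore) {
        List<String> batch = port.pull(2);
        if (failStore) {          // simulate a failure before the data is stored
            port.rollback();
            return false;
        }
        store.addAll(batch);
        port.confirm();
        return true;
    }

    public static void main(String[] args) {
        PortQueue port = new PortQueue();
        port.enqueue("a");
        port.enqueue("b");
        port.enqueue("c");
        List<String> store = new ArrayList<>();

        receiveOnce(port, store, true);   // failed attempt: nothing is lost
        receiveOnce(port, store, false);  // successful attempt: first batch is stored
        System.out.println(store + " remaining=" + port.size()); // prints [a, b] remaining=1
    }
}
```

The point of the sketch is the ordering: the acknowledgement (`confirm`) happens after the store step, so a failure in between leaves the data available for the next pull.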
When pulling data from a NiFi cluster, use the URL of the NiFi Cluster Manager. The receiver automatically determines the nodes in that cluster and pulls from those nodes as appropriate.
In order to use the NiFiReceiver, you will need to first build a
SiteToSiteClientConfig to provide to the constructor. This can be
achieved by using the SiteToSiteClient.Builder. Below is an example
snippet of driver code to pull data from NiFi that is running on
localhost:8080. This example assumes that NiFi exposes an OutputPort on the
root group named "Data For Spark". Additionally, it assumes that the data
that it will receive from this OutputPort is text data, as it will map the
byte array received from NiFi to a UTF-8 encoded string.
Pattern SPACE = Pattern.compile(" ");
// Build a Site-to-site client config
SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
    .url("http://localhost:8080/nifi")
    .portName("Data For Spark")
    .buildConfig();
SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
// Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
// the specified port
JavaReceiverInputDStream<NiFiDataPacket> packetStream =
    ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));
// Map the data from NiFi to text, ignoring the attributes
JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
    public String call(final NiFiDataPacket dataPacket) throws Exception {
        return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
    }
});
// Split the words by spaces
JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(final String text) throws Exception {
        return Arrays.asList(SPACE.split(text));
    }
});
// Map each word to the number 1, then aggregate by key
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
        }
    });
// Print the results
wordCounts.print();
ssc.start();
ssc.awaitTermination();
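The per-batch logic of the snippet above (decode bytes as UTF-8, split on spaces, count each word) can be tried without Spark or NiFi. The following is a minimal sketch under that assumption: `WordCountSketch` and `countWords` are hypothetical names, and a plain list of byte arrays stands in for the content of the received NiFiDataPacket objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class WordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Decodes each payload as UTF-8, splits it on spaces, and adds 1 per occurrence of a word. */
    static Map<String, Integer> countWords(List<byte[]> payloads) {
        // TreeMap keeps the words sorted so the printed result is deterministic
        Map<String, Integer> counts = new TreeMap<>();
        for (byte[] content : payloads) {
            String text = new String(content, StandardCharsets.UTF_8);
            for (String word : SPACE.split(text)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two payloads standing in for the content of two data packets (illustrative data)
        List<byte[]> batch = Arrays.asList(
                "to be or".getBytes(StandardCharsets.UTF_8),
                "not to be".getBytes(StandardCharsets.UTF_8));
        System.out.println(countWords(batch)); // prints {be=2, not=1, or=1, to=2}
    }
}
```

In the streaming version, the same three steps correspond to the `map`, `flatMap`, and `mapToPair`/`reduceByKey` calls, applied to each batch interval rather than to a fixed list.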
Constructor and Description |
---|
NiFiReceiver(SiteToSiteClientConfig clientConfig, org.apache.spark.storage.StorageLevel storageLevel) |
Modifier and Type | Method and Description |
---|---|
void | onStart() |
void | onStop() |
public NiFiReceiver(SiteToSiteClientConfig clientConfig, org.apache.spark.storage.StorageLevel storageLevel)
public void onStart()
Specified by: onStart in class org.apache.spark.streaming.receiver.Receiver<NiFiDataPacket>
public void onStop()
Specified by: onStop in class org.apache.spark.streaming.receiver.Receiver<NiFiDataPacket>
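As a rough illustration of the onStart()/onStop() lifecycle, the sketch below shows a pattern commonly used for custom receivers: onStart() launches a background thread and returns immediately, and onStop() signals that thread to finish. This is an assumption-laden, plain-Java sketch; `LifecycleSketch` and its helper methods are hypothetical, it does not extend Spark's Receiver class, and it is not how NiFiReceiver is necessarily implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger pulls = new AtomicInteger();
    private final CountDownLatch firstPull = new CountDownLatch(1);
    private Thread worker;

    /** Starts a background loop and returns immediately instead of blocking the caller. */
    public void onStart() {
        running.set(true);
        worker = new Thread(() -> {
            while (running.get()) {
                pulls.incrementAndGet();   // placeholder for "pull one batch and store it"
                firstPull.countDown();
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;                // interrupted by onStop()
                }
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Signals the loop to finish and waits up to one second for the thread to exit. */
    public void onStop() {
        running.set(false);
        worker.interrupt();
        try {
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public boolean isAlive() {
        return worker != null && worker.isAlive();
    }

    public int pullCount() {
        return pulls.get();
    }

    /** Waits up to one second for the background loop to complete its first iteration. */
    public boolean awaitFirstPull() {
        try {
            return firstPull.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LifecycleSketch receiver = new LifecycleSketch();
        receiver.onStart();
        receiver.awaitFirstPull();
        receiver.onStop();
        System.out.println("stopped=" + !receiver.isAlive());
    }
}
```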