@EventDriven @SupportsBatching @Tags(value={"map","cache","wait","hold","distributed","signal","release"}) @InputRequirement(value=INPUT_REQUIRED) @CapabilityDescription(value="Routes incoming FlowFiles to the \'wait\' relationship until a matching release signal is stored in the distributed cache from a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the \'success\' relationship, with attributes copied from the FlowFile that produced the release signal from the Notify processor. The release signal entry is then removed from the cache. Waiting FlowFiles will be routed to \'expired\' if they exceed the Expiration Duration. If you need to wait for more than one signal, specify the desired number of signals via the \'Target Signal Count\' property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. In order to wait for all fragments to be processed, connect the \'original\' relationship to a Wait processor, and the \'splits\' relationship to a corresponding Notify processor. Configure the Notify and Wait processors to use the \'${fragment.identifier}\' as the value of \'Release Signal Identifier\', and specify \'${fragment.count}\' as the value of \'Target Signal Count\' in the Wait processor.") @WritesAttribute(attribute="wait.start.timestamp",description="All FlowFiles will have an attribute \'wait.start.timestamp\', which sets the initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile.") @WritesAttribute(attribute="wait.counter.<counterName>",description="If a signal exists when the processor runs, each count value in the signal is copied.") @SeeAlso(classNames={"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService","org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer","org.apache.nifi.processors.standard.Notify"}) public class Wait extends AbstractProcessor
Modifier and Type | Field and Description |
---|---|
static AllowableValue |
ATTRIBUTE_COPY_KEEP_ORIGINAL |
static PropertyDescriptor |
ATTRIBUTE_COPY_MODE |
static AllowableValue |
ATTRIBUTE_COPY_REPLACE |
static PropertyDescriptor |
DISTRIBUTED_CACHE_SERVICE |
static PropertyDescriptor |
EXPIRATION_DURATION |
static Relationship |
REL_EXPIRED |
static Relationship |
REL_FAILURE |
static Relationship |
REL_SUCCESS |
static Relationship |
REL_WAIT |
static PropertyDescriptor |
RELEASABLE_FLOWFILE_COUNT |
static PropertyDescriptor |
RELEASE_SIGNAL_IDENTIFIER |
static PropertyDescriptor |
SIGNAL_COUNTER_NAME |
static PropertyDescriptor |
TARGET_SIGNAL_COUNT |
static PropertyDescriptor |
WAIT_BUFFER_COUNT |
static PropertyDescriptor |
WAIT_MODE |
static AllowableValue |
WAIT_MODE_KEEP_IN_UPSTREAM |
static AllowableValue |
WAIT_MODE_TRANSFER_TO_WAIT |
static java.lang.String |
WAIT_START_TIMESTAMP |
Constructor and Description |
---|
Wait() |
Modifier and Type | Method and Description |
---|---|
java.util.Set<Relationship> |
getRelationships() |
protected java.util.List<PropertyDescriptor> |
getSupportedPropertyDescriptors()
Allows subclasses to register which property descriptor objects are
supported.
|
void |
onTrigger(ProcessContext context,
ProcessSession session) |
onTrigger
getControllerServiceLookup, getIdentifier, getLogger, getNodeTypeProvider, init, initialize, isConfigurationRestored, isScheduled, toString, updateConfiguredRestoredTrue, updateScheduledFalse, updateScheduledTrue
customValidate, equals, getPropertyDescriptor, getPropertyDescriptors, getSupportedDynamicPropertyDescriptor, hashCode, onPropertyModified, validate
clone, finalize, getClass, notify, notifyAll, wait, wait, wait
getPropertyDescriptor, getPropertyDescriptors, onPropertyModified, validate
public static final java.lang.String WAIT_START_TIMESTAMP
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE
public static final PropertyDescriptor RELEASE_SIGNAL_IDENTIFIER
public static final PropertyDescriptor TARGET_SIGNAL_COUNT
public static final PropertyDescriptor SIGNAL_COUNTER_NAME
public static final PropertyDescriptor WAIT_BUFFER_COUNT
public static final PropertyDescriptor RELEASABLE_FLOWFILE_COUNT
public static final PropertyDescriptor EXPIRATION_DURATION
public static final AllowableValue ATTRIBUTE_COPY_REPLACE
public static final AllowableValue ATTRIBUTE_COPY_KEEP_ORIGINAL
public static final PropertyDescriptor ATTRIBUTE_COPY_MODE
public static final AllowableValue WAIT_MODE_TRANSFER_TO_WAIT
public static final AllowableValue WAIT_MODE_KEEP_IN_UPSTREAM
public static final PropertyDescriptor WAIT_MODE
public static final Relationship REL_SUCCESS
public static final Relationship REL_FAILURE
public static final Relationship REL_WAIT
public static final Relationship REL_EXPIRED
protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors()
AbstractConfigurableComponent
getSupportedPropertyDescriptors
in class AbstractConfigurableComponent
public java.util.Set<Relationship> getRelationships()
getRelationships
in interface Processor
getRelationships
in class AbstractSessionFactoryProcessor
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException
onTrigger
in class AbstractProcessor
ProcessException