Continuous Queries

Continuously obtain real-time query results.

❗️

This is a legacy Apache Ignite documentation page

The new documentation is hosted here: https://ignite.apache.org/docs/latest/

Continuous queries enable you to monitor data modifications occurring on Ignite caches. Once a continuous query is started, you are notified of every data change that passes your query's filter.

Continuous query functionality is available via the ContinuousQuery class, described below.

❗️

Continuous Queries and MVCC

Continuous queries have a number of functional limitations when used with MVCC-enabled caches.

Initial Query

When preparing a continuous query for execution, you can optionally specify an initial query that is executed before the continuous query is registered in the cluster and before you start receiving updates.

The initial query can be set via the ContinuousQuery.setInitialQuery(Query) method and can be of any query type: Scan, SQL, or Text.
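For example, an SQL query can be used as the initial query. The sketch below is illustrative only: the "persons" cache, the Person class, and its indexed age field are assumptions, not part of the API.

IgniteCache<Integer, Person> personCache = ignite.cache("persons");

ContinuousQuery<Integer, Person> qry = new ContinuousQuery<>();

// The initial query returns the entries that already satisfy the condition
// before update notifications start arriving.
qry.setInitialQuery(new SqlQuery<Integer, Person>(Person.class, "age > ?").setArgs(30));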

Remote Filter

This filter is executed on primary and backup nodes for a given key and evaluates whether an update should be propagated as an event to the query's local listener.

If the filter returns true, the local listener is notified; otherwise, the notification is skipped. Filtering updates on the primary and backup nodes where they occur reduces unnecessary network traffic between those nodes and the local listeners running on the application side.

A remote filter can be set via the ContinuousQuery.setRemoteFilter(CacheEntryEventFilter<K, V>) method.
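Keep in mind that the remote filter is serialized and executed on the server nodes, so its class must be available there (deployed on the nodes' classpath or via peer class loading). A minimal sketch using a standalone filter class; KeyRangeFilter is a hypothetical name, and qry is a ContinuousQuery<Integer, String> as in the full examples below.

// Hypothetical standalone filter class; must be deployable on the server nodes.
public class KeyRangeFilter implements CacheEntryEventFilter<Integer, String>, Serializable {
  @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
    // Propagate only updates for keys greater than 10 to the local listener.
    return e.getKey() > 10;
  }
}

// Registering the filter on the continuous query.
qry.setRemoteFilter(new KeyRangeFilter());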

Local Listener

When a cache gets modified (an entry is inserted, updated, or deleted), an event is sent to the continuous query's local listener so that your application can react accordingly.

Whenever an event passes the remote filter, it is sent to the client to notify the local listener there.
The local listener is set via the ContinuousQuery.setLocalListener(CacheEntryUpdatedListener<K, V>) method.

IgniteCache<Integer, String> cache = ignite.cache("mycache");

// Creating a continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

// Setting an optional initial query. 
// The query will return entries for the keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>((k, v) -> k > 10));

// Local listener that is called locally when an update notification is received.
qry.setLocalListener((evts) -> 
	evts.forEach(e -> System.out.println("key=" + e.getKey() + ", val=" + e.getValue())));

// This filter will be evaluated remotely on all nodes.
// Entries that pass this filter will be sent to the local listener.
qry.setRemoteFilter(e -> e.getKey() > 10);

// Executing the query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterating over existing data stored in cache.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Adding a few more cache entries.
  // As a result, the local listener above will be called.
  for (int i = 5; i < 15; i++)
    cache.put(i, Integer.toString(i));
}

The same query expressed with anonymous inner classes instead of lambdas (pre-Java 8 style):

IgniteCache<Integer, String> cache = ignite.cache("mycache");

// Creating a continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();

// Setting an optional initial query. 
// The query will return entries for the keys greater than 10.
qry.setInitialQuery(new ScanQuery<Integer, String>(
  new IgniteBiPredicate<Integer, String>() {
  @Override public boolean apply(Integer key, String val) {
    return key > 10;
  }
}));

// Local listener that is called locally when an update notification is received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
  @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
    for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
      System.out.println("key=" + e.getKey() + ", val=" + e.getValue());
  }
});

// This filter will be evaluated remotely on all nodes.
// Entries that pass this filter will be sent to the local listener.
qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() {
  @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
    return e.getKey() > 10;
  }
});

// Executing the query.
try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) {
  // Iterating over existing data stored in cache.
  for (Cache.Entry<Integer, String> e : cur)
    System.out.println("key=" + e.getKey() + ", val=" + e.getValue());

  // Adding a few more cache entries.
  // As a result, the local listener above will be called.
  for (int i = 5; i < 15; i++)
    cache.put(i, Integer.toString(i));
}

Remote Transformer

By default, continuous queries send the whole updated object to the listeners running on the application side. This can lead to excessive network usage, especially if the transferred objects are large. Moreover, applications often need only a subset of the fields of the updated object rather than the whole thing.

To address these cases, you can use ContinuousQueryWithTransformer to set a transformer factory whose transformer is executed on remote nodes for every updated object. Only the results of the transformation are sent to the local listener.

// Create a new continuous query with the transformer.
ContinuousQueryWithTransformer qry = new ContinuousQueryWithTransformer();

// Factory to create transformers.
Factory factory = FactoryBuilder.factoryOf(
    // Return one field of a complex object.
    // Only this field will be sent over the network to the local listener.
    (IgniteClosure<CacheEntryEvent, String>)
        event -> ((Organization)event.getValue()).name());

qry.setRemoteTransformerFactory(factory);

// Listener that will receive transformed data.
qry.setLocalListener(names -> {
    for (Object name : names)
        System.out.println("New organization name: " + name);
});
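
As with a regular continuous query, the transformer and listener take effect only after the query is executed against a cache. A minimal sketch, assuming a cache named "organizations" that stores Organization values (both names are assumptions made for illustration):

IgniteCache<Integer, Organization> orgCache = ignite.cache("organizations");

// Executing the query starts delivery of the transformed values
// (organization names) to the local listener registered above.
try (QueryCursor<?> cur = orgCache.query(qry)) {
    // Keep the cursor open for as long as update notifications are needed.
}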

Examples of usage of ContinuousQueryWithTransformer can be found on GitHub.

Events Delivery Guarantees

The continuous query implementation guarantees exactly-once delivery of an event to the client's local listener.

This is possible because, in addition to the primary node, every backup node maintains an update queue. If the primary node crashes or the topology changes for any other reason, every backup node flushes the contents of its internal queue to the client, ensuring that no event is lost on the way to the client's local listener.

To avoid duplicate notifications when backup nodes flush their queues to the client, Ignite maintains a per-partition update counter. Once an entry in a partition is updated, the counter for that partition is incremented on both the primary and backup nodes. The counter value is sent along with the event notification to the client, which maintains its own copy of this mapping. If the client receives an update whose counter is lower than the one in its local map, the update is treated as a duplicate and discarded.

Once the client confirms that an event is received, the primary and backup nodes remove the record for this event from their backup queues.
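
The client-side deduplication can be pictured with the following simplified sketch. It only illustrates the counter-based idea described above and is not Ignite's actual implementation:

import java.util.HashMap;
import java.util.Map;

// Simplified illustration of per-partition, counter-based deduplication (not Ignite's actual code).
class PerPartitionDeduplicator {
  // Last update counter seen for each partition.
  private final Map<Integer, Long> lastSeen = new HashMap<>();

  // Returns true if the event should be passed to the local listener.
  boolean accept(int partition, long updateCounter) {
    long prev = lastSeen.getOrDefault(partition, 0L);

    // A counter that is not greater than the last one seen means the event
    // was already delivered (for example, flushed by both the primary and a backup).
    if (updateCounter <= prev)
      return false;

    lastSeen.put(partition, updateCounter);
    return true;
  }
}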

Example

A complete example demonstrating the usage of continuous queries, named CacheContinuousQueryExample, is delivered as part of every Apache Ignite distribution. The example is also available on GitHub.