Continuous Queries

Continuously obtain real-time query results.

Continuous queries are good for cases where you want to execute a query and then continue to get notified about the data changes that match your query filter.

Continuous queries are supported via the ContinuousQuery class and the method ICache.QueryContinuous, which support the following:

Initial Query

When executing a continuous query, you have the option to execute an initial query before starting to listen to updates. The initial query can be set via the initialQry parameter and can be of any Ignite query type, Scan, SQL, or TEXT.

Remote Filter

This filter is executed on the primary node for a given key and evaluates whether the event should be propagated to the listener. If the filter returns true, then the listener will be notified, otherwise the event will be skipped. Filtering events on the node on which they have occurred minimizes unnecessary network traffic for listener notifications. The remote filter can be set via the ContinuousQuery.Filter property.

Local Listener

Whenever events match the remote filter, they will be sent to the client to notify the local listener there. The local listener is set via the ContinuousQuery.Listener property.

var cache = ignite.GetOrCreateCache<int, string>("mycache");

// Callback that is called locally when update notifications are received.
var localListener = new LocalListener();

// Create new continuous query.
var qry = new ContinuousQuery<int, string>(localListener)
{
    // This filter will be evaluated remotely on all nodes.
    // Entry that pass this filter will be sent to the caller.
    Filter = new RemoteFilter()
};

// Optional initial query to select all keys greater than 10.
var initialQry = new ScanQuery<int, string>(new InitialFilter());

using (var queryHandle = cache.QueryContinuous(qry, initialQry))
{
    // Iterate through existing data stored in cache.
    foreach (var entry in queryHandle.GetInitialQueryCursor())
        Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);

    // Add a few more keys and watch a few more query notifications.
    for (int i = 5; i < 15; i++)
        cache.Put(i, i.ToString());
}