Collocate Compute and Data

Collocate your computations with the data.

❗️

This is legacy Apache Ignite documentation

The new documentation is hosted here: https://ignite.apache.org/docs/latest/

Overview

Collocation of computations with data minimizes data serialization over the network and can significantly improve the performance and scalability of your application. Whenever possible, collocate your computations with the cluster nodes that store the data to be processed.

Affinity Call and Run Methods

The affinityCall(...) and affinityRun(...) methods collocate jobs with the nodes on which the data is cached. In other words, given a cache name and an affinity key, these methods find the node that is primary for that key and execute the job there.

📘

Consistency Guarantee

Starting with Apache Ignite 1.8, it is guaranteed that the partition to which the affinity key belongs will not be evicted from a node while a job triggered by affinityCall(...) or affinityRun(...) is executing there. Partition rebalancing usually happens on a topology change event, when a new node joins the cluster or an existing one leaves it.

This guarantee makes it feasible to execute complex logic where it is crucial that the data stay on the same node for the whole time the job is executing there. For instance, it allows running local SQL queries as part of a job triggered by affinityCall(...) or affinityRun(...) without worrying that the local query might return a partial result set due to data rebalancing.
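
For illustration, here is a minimal sketch of such a job. The cache name personCache, the Person table, and the companyId affinity key are hypothetical; the essential part is setLocal(true), which is safe here because the job's partition cannot be rebalanced away mid-query:

int companyId = 1;

// 'personCache' is a hypothetical cache whose affinity key collocates all
// persons of one company in the same partition.
ignite.compute().affinityRun("personCache", companyId, () -> {
    IgniteCache<Object, Object> cache = Ignition.localIgnite().cache("personCache");

    SqlFieldsQuery qry = new SqlFieldsQuery("select name from Person where companyId = ?");

    qry.setArgs(companyId);

    // A local query scans only the data stored on this node; the partition
    // lock guarantees the result set is complete for the given company.
    qry.setLocal(true);

    for (List<?> row : cache.query(qry).getAll())
        System.out.println("Person: " + row.get(0));
});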

Java 8:

IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

IgniteCompute compute = ignite.compute();

for (int i = 0; i < KEY_CNT; i++) {
    // The lambda below can only capture an effectively final variable.
    final int key = i;

    // This closure will execute on the remote node where
    // data with the 'key' is located.
    compute.affinityRun(CACHE_NAME, key, () -> {
        // Peek is a local memory lookup.
        System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) + ']');
    });
}

Async Java 8:

IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

IgniteCompute asyncCompute = ignite.compute().withAsync();

List<IgniteFuture<?>> futs = new ArrayList<>();

for (int i = 0; i < KEY_CNT; i++) {
    final int key = i;

    // This closure will execute on the remote node where
    // data with the 'key' is located.
    asyncCompute.affinityRun(CACHE_NAME, key, () -> {
        // Peek is a local memory lookup.
        System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) + ']');
    });

    futs.add(asyncCompute.future());
}

// Wait for all futures to complete.
futs.forEach(IgniteFuture::get);

Java 7:

final IgniteCache<Integer, String> cache = ignite.cache(CACHE_NAME);

IgniteCompute compute = ignite.compute();

for (int i = 0; i < KEY_CNT; i++) {
    final int key = i;

    // This closure will execute on the remote node where
    // data with the 'key' is located.
    compute.affinityRun(CACHE_NAME, key, new IgniteRunnable() {
        @Override public void run() {
            // Peek is a local memory lookup.
            System.out.println("Co-located [key= " + key + ", value= " + cache.localPeek(key) + ']');
        }
    });
}
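
The snippets above use affinityRun(...); affinityCall(...) works the same way but returns a result from the remote job to the caller. A minimal sketch, reusing CACHE_NAME from the examples above:

IgniteCompute compute = ignite.compute();

final int key = 1;

// Executes on the primary node for 'key' and returns the result to the caller.
String value = compute.affinityCall(CACHE_NAME, key, () -> {
    IgniteCache<Integer, String> cache = Ignition.localIgnite().cache(CACHE_NAME);

    // Local lookup; the returned value is serialized back to the calling node.
    return cache.localPeek(key);
});

System.out.println("Co-located value: " + value);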

📘

Both affinityCall(...) and affinityRun(...) have overloaded versions that lock the partition the affinity key belongs to in several caches at once, preventing its eviction from any of them while the job executes. To use them, pass the names of all the caches involved into the method, as in the sketch below.
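
For instance, a sketch of the multi-cache overload, assuming two hypothetical caches "orders" and "customers" whose affinity functions map the same key to the same partition:

IgniteCompute compute = ignite.compute();

final int customerId = 1;

// The partition that 'customerId' maps to is locked in both caches for the
// duration of the job, so neither cache can rebalance that data away.
compute.affinityRun(Arrays.asList("orders", "customers"), customerId, () -> {
    Ignite local = Ignition.localIgnite();

    // Both lookups are guaranteed to be served from this node's memory.
    Object customer = local.cache("customers").localPeek(customerId);
    Object order = local.cache("orders").localPeek(customerId);

    System.out.println("Co-located [customer= " + customer + ", order= " + order + ']');
});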