MapReduce & ForkJoin
Execute MapReduce and ForkJoin tasks in memory.
`IComputeTask` is the Ignite .NET abstraction for simplified in-memory MapReduce, which is also very close to the ForkJoin paradigm. Pure MapReduce was never built for performance and only works well for offline, batch-oriented processing (e.g. Hadoop MapReduce). When computing on data that resides in memory, however, low real-time latency and high throughput usually take the highest priority, and the simplicity of the API matters as well. With that in mind, Ignite introduced the `IComputeTask` API, a lightweight MapReduce (or ForkJoin) implementation.
Use `IComputeTask` only when you need fine-grained control over the job-to-node mapping or custom failover logic. In all other cases, use the simple closure executions on the cluster described in the Distributed Computations section.
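For comparison, here is a minimal sketch of the closure-based alternative for a simple case like counting characters. A hypothetical `CharCountFunc` implementing `IComputeFunc<string, int>` is applied to every word with `ICompute.Apply(...)`, and Ignite decides where each invocation runs. The sketch assumes an already-started `IIgnite` instance; the class and method names are illustrative only.

```csharp
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Compute;

// Hypothetical closure: counts the characters in a single word.
class CharCountFunc : IComputeFunc<string, int>
{
    public int Invoke(string word)
    {
        return word.Length;
    }
}

static class ClosureExample
{
    // Sends one closure invocation per word and sums the results on the caller.
    public static int CountChars(IIgnite ignite, string phrase)
    {
        ICollection<int> lengths = ignite.GetCompute().Apply(new CharCountFunc(), phrase.Split(' '));
        return lengths.Sum();
    }
}
```

No mapping or failover code is written here at all, which is why closures are the simpler default when those controls are not needed.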
IComputeTask
`IComputeTask<,,>` defines the jobs to execute on the cluster and the mapping of those jobs to nodes. It also defines how the job results are processed (reduced). All `ICompute.Execute(...)` methods execute the given task on the grid. User applications should implement the `Map(...)` and `Reduce(...)` methods of the `IComputeTask` interface.
Tasks are defined by implementing two or three methods of the `IComputeTask` interface.
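To see the three methods side by side, here is a minimal sketch of a hypothetical `WordLengthTask` that implements `IComputeTask<string, int, int>` directly, without any adapter. The job class `WordLengthJob`, the round-robin mapping, and the choice to rethrow job failures are illustrative assumptions, not part of the Ignite API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;

// Hypothetical task: sums the lengths of the words in the argument string.
class WordLengthTask : IComputeTask<string, int, int>
{
    // Map: one job per word, assigned round-robin over the nodes passed in.
    public IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, string arg)
    {
        var map = new Dictionary<IComputeJob<int>, IClusterNode>();
        string[] words = arg.Split(' ');

        for (int i = 0; i < words.Length; i++)
            map[new WordLengthJob(words[i])] = subgrid[i % subgrid.Count];

        return map;
    }

    // OnResult: fail the task if a job failed, otherwise wait for the remaining jobs.
    public ComputeJobResultPolicy OnResult(IComputeJobResult<int> res, IList<IComputeJobResult<int>> rcvd)
    {
        if (res.Exception != null)
            throw new InvalidOperationException("Compute job failed.", res.Exception);

        return ComputeJobResultPolicy.Wait;
    }

    // Reduce: combine all received job results into the task result.
    public int Reduce(IList<IComputeJobResult<int>> results)
    {
        return results.Sum(r => r.Data);
    }
}

// Hypothetical job: returns the length of one word.
class WordLengthJob : IComputeJob<int>
{
    private readonly string _word;

    public WordLengthJob(string word)
    {
        _word = word;
    }

    public int Execute()
    {
        return _word.Length;
    }

    public void Cancel()
    {
        // Nothing to clean up: Execute() is short and holds no resources.
    }
}
```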
Map Method
The `Map(...)` method instantiates the jobs and maps them to worker nodes. It receives the list of cluster nodes on which the task runs and the task argument, and it should return a map with jobs as keys and the mapped worker nodes as values. The jobs are then sent to the mapped nodes and executed there.
See `ComputeTaskSplitAdapter` for a simplified implementation of the `Map(...)` method.
OnResult Method
The `OnResult(...)` method is called each time a job completes on some cluster node. It receives the result returned by the completed job, as well as the list of all job results received so far. It should return a `ComputeJobResultPolicy` value indicating what to do next:
- `Wait`: wait for all remaining jobs to complete (if any).
- `Reduce`: immediately move to the reduce step, discarding all remaining jobs and any results not yet received.
- `Failover`: fail the job over to another node (see Fault Tolerance).
All received job results are also available in the `Reduce(...)` method.
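As a sketch of the `Reduce` policy, the hypothetical `HasLongWordTask` below overrides `OnResult(...)` on top of `ComputeTaskSplitAdapter` so that the first job reporting a word longer than six characters ends the task early, and the remaining jobs are discarded. Failed jobs are handed to the adapter's default `OnResult(...)`. The task, the job, and the six-character threshold are assumptions for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core.Compute;

// Hypothetical task: returns true if any word in the phrase is longer than 6 characters.
class HasLongWordTask : ComputeTaskSplitAdapter<string, bool, bool>
{
    protected override ICollection<IComputeJob<bool>> Split(int gridSize, string arg)
    {
        return arg.Split(' ').Select(w => new IsLongWordJob { Word = w }).ToList<IComputeJob<bool>>();
    }

    public override ComputeJobResultPolicy OnResult(IComputeJobResult<bool> res, IList<IComputeJobResult<bool>> rcvd)
    {
        // Keep the adapter's default handling of failed jobs.
        if (res.Exception != null)
            return base.OnResult(res, rcvd);

        // A positive answer is final, so stop waiting for the other jobs.
        return res.Data ? ComputeJobResultPolicy.Reduce : ComputeJobResultPolicy.Wait;
    }

    public override bool Reduce(IList<IComputeJobResult<bool>> results)
    {
        return results.Any(r => r.Data);
    }
}

// Hypothetical job: checks the length of a single word.
class IsLongWordJob : ComputeJobAdapter<bool>
{
    public string Word { get; set; }

    public override bool Execute()
    {
        return Word.Length > 6;
    }
}
```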
Reduce Method
The `Reduce(...)` method is called at the reduce step, once all jobs have completed (or once `OnResult(...)` has returned the `Reduce` policy). It receives a list of all completed job results and returns the final result of the computation.
Compute Task Adapters
It is not necessary to implement all three methods of the `IComputeTask` API each time you define a computation. Several helper classes let you describe only a particular piece of your logic and leave the rest for Ignite to handle automatically.
ComputeTaskAdapter
`ComputeTaskAdapter` provides a default implementation of the `OnResult(...)` method that returns the `Failover` policy if a job threw an exception and the `Wait` policy otherwise, so the task waits for all jobs to finish with a result.
ComputeTaskSplitAdapter
`ComputeTaskSplitAdapter` extends `ComputeTaskAdapter` and adds the capability to assign jobs to nodes automatically. It hides the `Map(...)` method and adds a new `Split(...)` method, in which you only need to provide the collection of jobs to execute; the adapter maps those jobs to nodes in a load-balanced fashion.
This adapter is especially useful in homogeneous environments where all nodes are equally suitable for executing jobs and the mapping step can be done implicitly.
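A sketch of how `Split(...)` can use its `gridSize` argument: instead of one job per item, the hypothetical `BatchSumTask` cuts an `int` array into at most `gridSize` batches, aiming for roughly one batch per node. The names are illustrative, and the adapter still performs the actual job-to-node assignment.

```csharp
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core.Compute;

// Hypothetical task: sums an int array by splitting it into at most gridSize batches.
class BatchSumTask : ComputeTaskSplitAdapter<int[], long, long>
{
    protected override ICollection<IComputeJob<long>> Split(int gridSize, int[] arg)
    {
        var jobs = new List<IComputeJob<long>>();

        // Ceiling division so every element lands in exactly one batch.
        int batchSize = (arg.Length + gridSize - 1) / gridSize;

        for (int start = 0; start < arg.Length; start += batchSize)
        {
            int[] batch = arg.Skip(start).Take(batchSize).ToArray();
            jobs.Add(new BatchSumJob { Numbers = batch });
        }

        return jobs;
    }

    public override long Reduce(IList<IComputeJobResult<long>> results)
    {
        return results.Sum(r => r.Data);
    }
}

// Hypothetical job: sums one batch, widening to long to avoid overflow.
class BatchSumJob : ComputeJobAdapter<long>
{
    public int[] Numbers { get; set; }

    public override long Execute()
    {
        return Numbers.Sum(n => (long) n);
    }
}
```

Batching this way trades the per-job overhead of many tiny jobs for coarser load balancing.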
IComputeJob
All jobs spawned by a task are implementations of the `IComputeJob` interface. The `Execute()` method of this interface defines the job logic and returns the job result. The `Cancel()` method defines what happens if the job is discarded (for example, when the task decides to reduce immediately or is cancelled).
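The sketch below shows one way to make `Cancel()` useful: the hypothetical `CountPrimesJob` sets a `volatile` flag in `Cancel()` and checks it on every loop iteration in `Execute()`, so a discarded job stops doing work promptly. The class and its prime-counting logic are illustrative assumptions.

```csharp
using Apache.Ignite.Core.Compute;

// Hypothetical long-running job that stops early when it is cancelled.
class CountPrimesJob : IComputeJob<int>
{
    private readonly int _limit;

    // Cancel() may be called from another thread, so the flag is volatile.
    private volatile bool _cancelled;

    public CountPrimesJob(int limit)
    {
        _limit = limit;
    }

    public int Execute()
    {
        int count = 0;

        // When cancelled, the loop exits early and a partial count is returned.
        for (int n = 2; n <= _limit && !_cancelled; n++)
        {
            if (IsPrime(n))
                count++;
        }

        return count;
    }

    public void Cancel()
    {
        _cancelled = true;
    }

    private static bool IsPrime(int n)
    {
        for (int d = 2; d * d <= n; d++)
        {
            if (n % d == 0)
                return false;
        }

        return true;
    }
}
```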
ComputeJobAdapter
A convenience adapter that provides a no-op implementation of the `Cancel()` method, so only `Execute()` needs to be overridden.
Example
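Here is an example of `IComputeTask` and `IComputeJob` implementations that count the characters in a phrase, written two ways: with `ComputeTaskSplitAdapter` and with `ComputeTaskAdapter`.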
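Using `ComputeTaskSplitAdapter`:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Compute;

void CountChars()
{
    using (var ignite = Ignition.Start())
    {
        // Runs the task on the cluster and blocks until Reduce(...) returns.
        // "Hello Grid Enabled World!" has 4 words: 5 + 4 + 7 + 6 = 22 characters.
        int charCount = ignite.GetCompute().Execute(new CharCountTask(), "Hello Grid Enabled World!");

        Console.WriteLine(">>> Total number of characters in the phrase is " + charCount);
    }
}

class CharCountTask : ComputeTaskSplitAdapter<string, int, int>
{
    // Sums the word lengths returned by all jobs.
    public override int Reduce(IList<IComputeJobResult<int>> results)
    {
        return results.Sum(res => res.Data);
    }

    protected override ICollection<IComputeJob<int>> Split(int gridSize, string arg)
    {
        // 1. Split the received string into words.
        // 2. Create a job for each word; the adapter maps the jobs to nodes.
        return arg.Split(' ').Select(word => new CharCountJob { Word = word }).ToList<IComputeJob<int>>();
    }
}

class CharCountJob : ComputeJobAdapter<int>
{
    public string Word { get; set; }

    // Executed on the remote node: returns the length of a single word.
    public override int Execute()
    {
        return Word.Length;
    }
}
```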
Using `ComputeTaskAdapter` with an explicit `Map(...)`:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Compute;

void CountChars()
{
    using (var ignite = Ignition.Start())
    {
        // Runs the task on the cluster and blocks until Reduce(...) returns.
        int charCount = ignite.GetCompute().Execute(new CharCountTask(), "Hello Grid Enabled World!");

        Console.WriteLine(">>> Total number of characters in the phrase is " + charCount);
    }
}

class CharCountTask : ComputeTaskAdapter<string, int, int>
{
    public override IDictionary<IComputeJob<int>, IClusterNode> Map(IList<IClusterNode> subgrid, string arg)
    {
        int nodeIdx = 0;

        // 1. Split the received string into words.
        // 2. Create a job for each word.
        // 3. Distribute the jobs between nodes round-robin.
        return arg.Split(' ')
            .Select(word => (IComputeJob<int>) new CharCountJob { Word = word })
            .ToDictionary(job => job, job => subgrid[nodeIdx++ % subgrid.Count]);
    }

    // Sums the word lengths returned by all jobs.
    public override int Reduce(IList<IComputeJobResult<int>> results)
    {
        return results.Sum(res => res.Data);
    }
}

class CharCountJob : ComputeJobAdapter<int>
{
    public string Word { get; set; }

    // Executed on the remote node: returns the length of a single word.
    public override int Execute()
    {
        return Word.Length;
    }
}
```