Checkpointing


This is legacy Apache Ignite documentation.

The new documentation is hosted here: https://ignite.apache.org/docs/latest/

Overview

Checkpointing provides the ability to save intermediate job state. This is useful when long-running jobs need to store intermediate state to protect against node failures: upon restart of a failed node, a job loads the saved checkpoint and continues from where it left off. To make job state checkpointable, the state object should implement the java.io.Serializable interface.
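Because checkpoint state is written to external storage and read back later, possibly on a different node, it has to survive a serialize/deserialize round trip. The following stdlib-only sketch shows that round trip with plain Java serialization. The JobState class and its fields are hypothetical, and Ignite's own marshaller is not involved here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CheckpointStateDemo {
    // Hypothetical job state; every field must itself be serializable.
    static class JobState implements Serializable {
        private static final long serialVersionUID = 1L;

        final int lastCompletedStep;
        final long partialSum;

        JobState(int lastCompletedStep, long partialSum) {
            this.lastCompletedStep = lastCompletedStep;
            this.partialSum = partialSum;
        }
    }

    // Serializes the state to bytes, as a checkpoint store must do before persisting it.
    static byte[] toBytes(Serializable state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(state);
        }
        return bos.toByteArray();
    }

    // Restores the state from bytes, as a job does when it loads a checkpoint.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        JobState saved = new JobState(2, 1234L);
        JobState restored = (JobState) fromBytes(toBytes(saved));
        System.out.println(restored.lastCompletedStep + " " + restored.partialSum); // prints "2 1234"
    }
}
```

A state class that holds a non-serializable field (an open connection, a thread) would fail at writeObject, which is why checkpoint state should be kept to plain data.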

Checkpoints are available through the following methods on the ComputeTaskSession interface:

  • ComputeTaskSession.loadCheckpoint(String)
  • ComputeTaskSession.removeCheckpoint(String)
  • ComputeTaskSession.saveCheckpoint(String, Object)


@ComputeTaskSessionFullSupport annotation

Note that checkpointing is disabled by default for performance reasons. To enable it, attach the @ComputeTaskSessionFullSupport annotation to the task or closure class.

Master Node Failure Protection

One important but less obvious use case for checkpointing is guarding against failure of the "master" node, that is, the node that started the original execution. If the master node fails, Ignite does not know where to send the results of the job execution, so the results are discarded.

To recover from this scenario, you can store the final result of each job as a checkpoint and have your logic re-run the entire task if the "master" node fails. The re-run is then much faster, since all jobs can start from their saved checkpoints.
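The re-run idea can be illustrated with a simplified, single-process simulation. Here a plain HashMap stands in for checkpoint storage that outlives the master node, and runJob, runTask and the key format are hypothetical. One assumption worth checking against the ComputeTaskSession API reference: in Ignite, a checkpoint saved with the default session scope is tied to the task session, so for this pattern the saved result must outlive that session (saveCheckpoint has overloads that take a ComputeTaskSessionScope).

```java
import java.util.HashMap;
import java.util.Map;

public class TaskRerunDemo {
    // Stands in for checkpoint storage that outlives the master node
    // (in Ignite: a shared file system, cache, database or S3 bucket).
    static final Map<String, Integer> durableCheckpoints = new HashMap<>();

    // Counts how many jobs actually performed their computation.
    static int jobsExecuted = 0;

    // Hypothetical job: reuses its saved final result if one exists,
    // otherwise computes the result and saves it as a checkpoint.
    static int runJob(int jobId) {
        String key = "JOB-" + jobId + "-RESULT";
        Integer saved = durableCheckpoints.get(key);
        if (saved != null)
            return saved;

        jobsExecuted++;
        int result = jobId * jobId; // placeholder for real work
        durableCheckpoints.put(key, result);
        return result;
    }

    // Runs every job and sums the results; failAt simulates the master node
    // dying before job number failAt is processed (-1 means no failure).
    static int runTask(int jobCount, int failAt) {
        int sum = 0;
        for (int i = 0; i < jobCount; i++) {
            if (i == failAt)
                throw new IllegalStateException("master node failed");
            sum += runJob(i);
        }
        return sum;
    }

    public static void main(String[] args) {
        try {
            runTask(4, 3); // first attempt: partial results are lost with the master
        } catch (IllegalStateException e) {
            System.out.println("First attempt lost: " + e.getMessage());
        }

        // Re-run the whole task: jobs 0-2 return their checkpoints, only job 3 computes.
        int total = runTask(4, -1);
        System.out.println("total=" + total + ", jobs executed=" + jobsExecuted);
        // prints "total=14, jobs executed=4"
    }
}
```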

Setting Checkpoints

Every compute job can periodically checkpoint itself by calling the ComputeTaskSession.saveCheckpoint(...) method.

When a job starts executing, it should check whether a previously saved checkpoint is available and, if so, resume from the last saved checkpoint.

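import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.TaskSessionResource;

IgniteCompute compute = ignite.compute();

// The closure is an IgniteCallable, so it is executed with call(), not run().
compute.call(new CheckpointsRunnable());
  
/**
 * Note that this class is annotated with @ComputeTaskSessionFullSupport
 * annotation to enable checkpointing.
 */
@ComputeTaskSessionFullSupport
private static class CheckpointsRunnable implements IgniteCallable<Object> {
  // Task session (injected on closure instantiation).
  @TaskSessionResource
  private ComputeTaskSession ses;

  @Override 
  public Object call() throws IgniteException {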
    // Try to retrieve step1 result.
    Object res1 = ses.loadCheckpoint("STEP1");

    if (res1 == null) {
      res1 = computeStep1(); // Do some computation.

      // Save step1 result.
      ses.saveCheckpoint("STEP1", res1);
    }

    // Try to retrieve step2 result.
    Object res2 = ses.loadCheckpoint("STEP2");

    if (res2 == null) {
      res2 = computeStep2(res1); // Do some computation.

      // Save step2 result.
      ses.saveCheckpoint("STEP2", res2);
    }

    ...
  }
}
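To see the resume behavior without a cluster, the same load-or-compute-then-save pattern can be run against a hypothetical in-memory FakeSession that mimics the three checkpoint methods listed earlier. This is a sketch, not Ignite code: the first attempt fails after STEP1, and the second attempt reuses the STEP1 checkpoint instead of recomputing it. The final removeCheckpoint calls show optional cleanup once the job is done.

```java
import java.util.HashMap;
import java.util.Map;

public class StepResumeDemo {
    // Hypothetical in-memory stand-in for the checkpoint methods of ComputeTaskSession.
    static class FakeSession {
        private final Map<String, Object> store = new HashMap<>();

        Object loadCheckpoint(String key) { return store.get(key); }
        void saveCheckpoint(String key, Object state) { store.put(key, state); }
        boolean removeCheckpoint(String key) { return store.remove(key) != null; }
        int size() { return store.size(); }
    }

    static int step1Runs = 0;
    static int step2Runs = 0;

    // Same load-or-compute-then-save pattern as CheckpointsRunnable.call().
    static int runJob(FakeSession ses, boolean failAfterStep1) {
        Integer res1 = (Integer) ses.loadCheckpoint("STEP1");
        if (res1 == null) {
            step1Runs++;
            res1 = 10; // placeholder for computeStep1()
            ses.saveCheckpoint("STEP1", res1);
        }

        if (failAfterStep1)
            throw new IllegalStateException("node failed after STEP1");

        Integer res2 = (Integer) ses.loadCheckpoint("STEP2");
        if (res2 == null) {
            step2Runs++;
            res2 = res1 * 2; // placeholder for computeStep2(res1)
            ses.saveCheckpoint("STEP2", res2);
        }

        // Optional cleanup once the job has finished.
        ses.removeCheckpoint("STEP1");
        ses.removeCheckpoint("STEP2");
        return res2;
    }

    public static void main(String[] args) {
        FakeSession ses = new FakeSession();
        try {
            runJob(ses, true);
        } catch (IllegalStateException e) {
            System.out.println("First attempt: " + e.getMessage());
        }
        int result = runJob(ses, false);
        System.out.println("result=" + result + ", step1Runs=" + step1Runs
            + ", step2Runs=" + step2Runs + ", checkpoints left=" + ses.size());
    }
}
```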

CheckpointSpi

In Ignite, checkpointing functionality is provided by CheckpointSpi which has the following out-of-the-box implementations:

  • SharedFsCheckpointSpi: uses a shared file system to store checkpoints.
  • CacheCheckpointSpi: uses a cache to store checkpoints.
  • JdbcCheckpointSpi: uses a database to store checkpoints.
  • S3CheckpointSpi: uses Amazon S3 to store checkpoints.

The CheckpointSpi is set in IgniteConfiguration (via setCheckpointSpi(...)) and passed to the Ignition class at startup. By default, a no-op checkpoint SPI is used.

File System Checkpoint Configuration

The following configuration parameters can be used to configure SharedFsCheckpointSpi:

  • setDirectoryPaths(Collection): Sets the directory paths to the shared folders where checkpoints are stored. Each path can be either absolute or relative to the path specified in the IGNITE_HOME environment or system variable. Default: IGNITE_HOME/work/cp/sharedfs.

<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
  ...
  <property name="checkpointSpi">
    <bean class="org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi">
      <!-- Change to shared directory path in your environment. -->
      <property name="directoryPaths">
        <list>
          <value>/my/directory/path</value>
          <value>/other/directory/path</value>
        </list>
      </property>
    </bean>
  </property>
  ...
</bean>
import java.util.ArrayList;
import java.util.Collection;

import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.sharedfs.SharedFsCheckpointSpi;

IgniteConfiguration cfg = new IgniteConfiguration();

SharedFsCheckpointSpi checkpointSpi = new SharedFsCheckpointSpi();

// List of checkpoint directories where all files are stored.
Collection<String> dirPaths = new ArrayList<>();

dirPaths.add("/my/directory/path");
dirPaths.add("/other/directory/path");

// Override default directory path.
checkpointSpi.setDirectoryPaths(dirPaths);

// Override default checkpoint SPI.
cfg.setCheckpointSpi(checkpointSpi);

// Start Ignite node.
Ignition.start(cfg);
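To illustrate what storing checkpoints on a shared file system involves, here is a stdlib-only sketch of a file-per-key store. It is not the SharedFsCheckpointSpi implementation; the FileCheckpointStore class is hypothetical, it assumes keys are safe to use as file names, and it assumes the underlying file system supports an atomic rename (worth verifying on network file systems).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class FileCheckpointStoreDemo {
    // Hypothetical file-per-key checkpoint store over a (shared) directory.
    static final class FileCheckpointStore {
        private final Path dir;

        FileCheckpointStore(Path dir) throws IOException {
            this.dir = Files.createDirectories(dir);
        }

        // Writes to a temporary file first, then renames it over the target,
        // so readers never observe a half-written checkpoint.
        void save(String key, byte[] state) throws IOException {
            Path tmp = Files.createTempFile(dir, key, ".tmp");
            Files.write(tmp, state);
            Files.move(tmp, dir.resolve(key),
                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        }

        // Returns null when no checkpoint exists, like loadCheckpoint(String).
        byte[] load(String key) throws IOException {
            Path file = dir.resolve(key);
            return Files.exists(file) ? Files.readAllBytes(file) : null;
        }

        // Returns true if a checkpoint file was deleted.
        boolean remove(String key) throws IOException {
            return Files.deleteIfExists(dir.resolve(key));
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("cp");
        FileCheckpointStore store = new FileCheckpointStore(root.resolve("sharedfs"));

        store.save("STEP1", "partial result".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(store.load("STEP1"), StandardCharsets.UTF_8));

        store.remove("STEP1");
        System.out.println(store.load("STEP1") == null);
    }
}
```

Every node must see the same directory for a job to resume elsewhere, which is why the configured paths should point at a shared mount.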

Cache Checkpoint Configuration

CacheCheckpointSpi is a cache-based implementation for checkpoint SPI. Checkpoint data is stored in the Ignite data grid in a pre-configured cache.

The following configuration parameters can be used to configure CacheCheckpointSpi:

  • setCacheName(String): Sets the name of the cache to use for storing checkpoints. Default: checkpoints.
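Unlike the other SPIs, this section has no configuration example. The following is a sketch in the style of the other Java examples, assuming the checkpoint cache is declared on the node like any ordinary cache (the text above says the cache must be pre-configured). The cache name simply repeats the default.

```java
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.cache.CacheCheckpointSpi;

public class CacheCheckpointConfig {
    public static void main(String[] args) {
        // Name of the cache that holds checkpoint data ("checkpoints" is also the default).
        String cpCacheName = "checkpoints";

        CacheCheckpointSpi checkpointSpi = new CacheCheckpointSpi();
        checkpointSpi.setCacheName(cpCacheName);

        IgniteConfiguration cfg = new IgniteConfiguration();

        // Pre-configure the cache that the SPI will use.
        cfg.setCacheConfiguration(new CacheConfiguration<>(cpCacheName));

        // Override default checkpoint SPI.
        cfg.setCheckpointSpi(checkpointSpi);

        // Start Ignite node.
        Ignition.start(cfg);
    }
}
```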

Database Checkpoint Configuration

JdbcCheckpointSpi uses a database to store checkpoints. All checkpoints are stored in a database table and are available from all nodes in the grid, so every node must have access to the database. A job state can be saved on one node and loaded on another (for example, if a job fails over to a different node after a node failure).

The following configuration parameters can be used to configure JdbcCheckpointSpi (all are optional):

  • setDataSource(DataSource): Sets the data source to use for database access. Default: no value.
  • setCheckpointTableName(String): Sets the checkpoint table name. Default: CHECKPOINTS.
  • setKeyFieldName(String): Sets the checkpoint key field name. Default: NAME.
  • setKeyFieldType(String): Sets the checkpoint key field type. The field should have a corresponding SQL string type (VARCHAR, for example). Default: VARCHAR(256).
  • setValueFieldName(String): Sets the checkpoint value field name. Default: VALUE.
  • setValueFieldType(String): Sets the checkpoint value field type. The field should have a corresponding SQL BLOB type. The default value, BLOB, does not work with all databases; for example, with HSQL DB the type should be LONGVARBINARY. Default: BLOB.
  • setExpireDateFieldName(String): Sets the checkpoint expiration date field name. Default: EXPIRE_DATE.
  • setExpireDateFieldType(String): Sets the checkpoint expiration date field type. The field should have a corresponding SQL DATETIME type. Default: DATETIME.
  • setNumberOfRetries(int): Sets the number of retries in case of database errors. Default: 2.
  • setUser(String): Sets the checkpoint database user name. Authentication is performed only if both the user name and the password are set. Default: no value.
  • setPwd(String): Sets the checkpoint database password. Default: no value.
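Taken together, the six table and field settings describe a single table with a key column, a value column, and an expiration-date column. As a hypothetical illustration (this is not the SPI's own code, and it assumes the key column serves as the primary key), a helper that assembles a matching CREATE TABLE statement shows how the defaults fit together, for instance when a DBA pre-creates the table.

```java
public class CheckpointTableSql {
    // Hypothetical helper: builds a CREATE TABLE statement from the
    // JdbcCheckpointSpi table/field settings. The PRIMARY KEY is an assumption.
    static String createTableSql(String table,
                                 String keyName, String keyType,
                                 String valName, String valType,
                                 String expName, String expType) {
        return "CREATE TABLE " + table + " ("
            + keyName + " " + keyType + " NOT NULL PRIMARY KEY, "
            + valName + " " + valType + ", "
            + expName + " " + expType + ")";
    }

    public static void main(String[] args) {
        // Default settings from the list above.
        System.out.println(createTableSql("CHECKPOINTS",
            "NAME", "VARCHAR(256)", "VALUE", "BLOB", "EXPIRE_DATE", "DATETIME"));

        // Value type swapped to LONGVARBINARY, as suggested for HSQL DB.
        System.out.println(createTableSql("CHECKPOINTS",
            "NAME", "VARCHAR(256)", "VALUE", "LONGVARBINARY", "EXPIRE_DATE", "DATETIME"));
    }
}
```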

Apache DBCP

The Apache DBCP project provides various wrappers for data sources and connection pools. You can use these wrappers as Spring beans to configure this SPI from a Spring configuration file or from code. Refer to the Apache DBCP project for more information.

<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
  ...
  <property name="checkpointSpi">
    <bean class="org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpi">
      <property name="dataSource">
        <ref bean="anyPooledDataSourceBean"/>
      </property>
      <property name="checkpointTableName" value="CHECKPOINTS"/>
      <property name="user" value="test"/>
      <property name="pwd" value="test"/>
    </bean>
  </property>
  ...
</bean>
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpi;

JdbcCheckpointSpi checkpointSpi = new JdbcCheckpointSpi();

javax.sql.DataSource ds = ... // Set datasource.

// Set database checkpoint SPI parameters.
checkpointSpi.setDataSource(ds);
checkpointSpi.setUser("test");
checkpointSpi.setPwd("test");

IgniteConfiguration cfg = new IgniteConfiguration();

// Override default checkpoint SPI.
cfg.setCheckpointSpi(checkpointSpi);

// Start Ignite node.
Ignition.start(cfg);
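As a sketch of the DBCP approach from code, the following builds a pooled data source with Apache Commons DBCP 2's BasicDataSource and combines it with the HSQL DB value type noted in the parameter list. The driver class, URL, pool size, and credentials are placeholders. Here the credentials are assumed to live on the pool itself, so the SPI's own user and password are left unset (per setUser above, the SPI then does not perform its own authentication).

```java
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.jdbc.JdbcCheckpointSpi;

public class JdbcCheckpointWithDbcp {
    public static void main(String[] args) {
        // Pooled data source from Apache Commons DBCP 2 (placeholder connection settings).
        BasicDataSource ds = new BasicDataSource();
        ds.setDriverClassName("org.hsqldb.jdbc.JDBCDriver");
        ds.setUrl("jdbc:hsqldb:hsql://db-host/checkpoints");
        ds.setUsername("test");
        ds.setPassword("test");
        ds.setMaxTotal(10);

        JdbcCheckpointSpi checkpointSpi = new JdbcCheckpointSpi();
        checkpointSpi.setDataSource(ds);

        // Per the setValueFieldType note, HSQL DB needs LONGVARBINARY instead of BLOB.
        checkpointSpi.setValueFieldType("LONGVARBINARY");

        IgniteConfiguration cfg = new IgniteConfiguration();

        // Override default checkpoint SPI.
        cfg.setCheckpointSpi(checkpointSpi);

        // Start Ignite node.
        Ignition.start(cfg);
    }
}
```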

Amazon S3 Checkpoint Configuration

S3CheckpointSpi uses Amazon S3 storage to store checkpoints. You must have an existing S3 bucket. For more information about Amazon S3, visit http://aws.amazon.com/.

The following configuration parameters can be used to configure S3CheckpointSpi:

  • setAwsCredentials(AWSCredentials): Sets the AWS credentials to use for storing checkpoints. Default: no value (must be provided).
  • setClientConfiguration(ClientConfiguration): Sets the AWS client configuration. Default: no value.
  • setBucketNameSuffix(String): Sets the bucket name suffix. Default: default-bucket.

<bean class="org.apache.ignite.configuration.IgniteConfiguration" singleton="true">
  ...
  <property name="checkpointSpi">
    <bean class="org.apache.ignite.spi.checkpoint.s3.S3CheckpointSpi">
      <property name="awsCredentials">
        <bean class="com.amazonaws.auth.BasicAWSCredentials">
          <constructor-arg value="YOUR_ACCESS_KEY_ID" />
          <constructor-arg value="YOUR_SECRET_ACCESS_KEY" />
        </bean>
      </property>
    </bean>
  </property>
  ...
</bean>
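import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.s3.S3CheckpointSpi;

IgniteConfiguration cfg = new IgniteConfiguration();

S3CheckpointSpi spi = new S3CheckpointSpi();

// Replace with your AWS access key ID and secret access key.
AWSCredentials cred = new BasicAWSCredentials("YOUR_ACCESS_KEY_ID", "YOUR_SECRET_ACCESS_KEY");

spi.setAwsCredentials(cred);

spi.setBucketNameSuffix("checkpoints");

// Override default checkpoint SPI.
cfg.setCheckpointSpi(spi);

// Start Ignite node.
Ignition.start(cfg);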
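The setClientConfiguration parameter has no example above. A possible sketch passes an AWS SDK (v1) ClientConfiguration and reads the keys from the standard AWS environment variables instead of hardcoding them. The timeout and retry values are illustrative only, not recommendations.

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.checkpoint.s3.S3CheckpointSpi;

public class S3CheckpointConfig {
    public static void main(String[] args) {
        S3CheckpointSpi spi = new S3CheckpointSpi();

        // Keys come from the environment; BasicAWSCredentials rejects null values.
        spi.setAwsCredentials(new BasicAWSCredentials(
            System.getenv("AWS_ACCESS_KEY_ID"), System.getenv("AWS_SECRET_ACCESS_KEY")));

        // Example client tuning (illustrative values).
        ClientConfiguration clientCfg = new ClientConfiguration();
        clientCfg.setConnectionTimeout(10_000); // milliseconds
        clientCfg.setMaxErrorRetry(5);
        spi.setClientConfiguration(clientCfg);

        spi.setBucketNameSuffix("checkpoints");

        IgniteConfiguration cfg = new IgniteConfiguration();

        // Override default checkpoint SPI.
        cfg.setCheckpointSpi(spi);

        // Start Ignite node.
        Ignition.start(cfg);
    }
}
```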