Issue:
My Flink application terminates after running for 2-3 days with an uncaught exception on the JobManager: Thread 'jobmanager-io-thread-25' produced an uncaught exception. java.lang.OutOfMemoryError: Direct buffer memory.
No matter how much direct buffer memory I give it, the memory gets exhausted over time (the job just runs longer before terminating); I have tried up to 16 GB so far.
When checkpointing is disabled, direct buffer memory does not grow and the application runs fine.
I have tried the tuning options suggested in the documentation, such as incremental checkpointing, the changelog state backend, and increasing the direct buffer size, but none of them helped; the sketch below shows roughly how they were configured.
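A minimal sketch of how those options are wired up in my job (the bucket name, checkpoint interval, and exact sizes are placeholders for my real values):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // RocksDB state backend with incremental checkpoints and the changelog state backend
        conf.setString("state.backend.type", "rocksdb");
        conf.setString("state.backend.incremental", "true");
        conf.setString("state.backend.changelog.enabled", "true");

        // Checkpoints are written to S3 (placeholder bucket)
        conf.setString("state.checkpoints.dir", "s3://my-bucket/checkpoints");

        // JobManager off-heap (direct) memory, raised in steps up to 16 GB
        conf.setString("jobmanager.memory.off-heap.size", "16g");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000); // placeholder interval; the real job's interval differs
        // ... pipeline definition omitted here ...
        env.execute("checkpoint-tuning-sketch");
    }
}
```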
Stack Trace:
2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Direct buffer memory
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71)
at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280)
at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74)
at java.base/java.nio.channels.Channels.writeFully(Channels.java:97)
at java.base/java.nio.channels.Channels$1.write(Channels.java:172)
at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48)
at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54)
at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88)
at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112)
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83)
at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)
Application Structure (High Level):
- The application runs on Flink on AWS EMR: Flink 1.20 on EMR 7.10.0.
- Two S3 FileSources read Parquet files. Both are unbounded and use continuous monitoring with a discovery interval of 1 minute; new files arrive every minute under new paths of the form yyyy/mm/dd/mi/files.
- Both streams are processed into domain objects.
- The two streams are joined with a keyed CoProcessFunction, using keyed state with a TTL of 30 minutes.
- Some map and filter operations are applied to the joined stream.
- The result stream is written back to S3 as Parquet files. (A simplified pipeline sketch follows this list.)
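Stripped-down sketch of the pipeline shape (schemas, paths, the "id" key field, and the join output are placeholders, not the real business logic; the actual job also writes Parquet back to S3 with a FileSink):

```java
import java.time.Duration;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class PipelineSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // placeholder interval

        Schema schemaA = new Schema.Parser().parse(args[0]); // placeholder: real schemas are fixed
        Schema schemaB = new Schema.Parser().parse(args[1]);

        // Unbounded Parquet sources; new files appear every minute under yyyy/mm/dd/mi/ paths.
        FileSource<GenericRecord> sourceA = FileSource
                .forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schemaA),
                        new Path("s3://my-bucket/input-a/"))          // placeholder path
                .monitorContinuously(Duration.ofMinutes(1))           // 1 min discovery interval
                .build();
        FileSource<GenericRecord> sourceB = FileSource
                .forRecordStreamFormat(AvroParquetReaders.forGenericRecord(schemaB),
                        new Path("s3://my-bucket/input-b/"))
                .monitorContinuously(Duration.ofMinutes(1))
                .build();

        DataStream<GenericRecord> a = env.fromSource(sourceA, WatermarkStrategy.noWatermarks(), "source-a");
        DataStream<GenericRecord> b = env.fromSource(sourceB, WatermarkStrategy.noWatermarks(), "source-b");

        DataStream<String> joined = a
                .keyBy(r -> r.get("id").toString())                   // placeholder key field
                .connect(b.keyBy(r -> r.get("id").toString()))
                .process(new JoinFn());

        // The real job applies further map/filter steps and sinks Parquet back to S3.
        joined.filter(s -> !s.isEmpty()).print();

        env.execute("pipeline-sketch");
    }

    // Join with keyed state that expires after 30 minutes.
    static class JoinFn extends KeyedCoProcessFunction<String, GenericRecord, GenericRecord, String> {
        private transient ValueState<GenericRecord> left;

        @Override
        public void open(Configuration parameters) {
            StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.minutes(30)).build();
            ValueStateDescriptor<GenericRecord> desc =
                    new ValueStateDescriptor<>("left", GenericRecord.class);
            desc.enableTimeToLive(ttl);
            left = getRuntimeContext().getState(desc);
        }

        @Override
        public void processElement1(GenericRecord value, Context ctx, Collector<String> out) throws Exception {
            left.update(value);
        }

        @Override
        public void processElement2(GenericRecord value, Context ctx, Collector<String> out) throws Exception {
            GenericRecord l = left.value();
            if (l != null) {
                out.collect(l + "|" + value);                         // placeholder join output
            }
        }
    }
}
```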
Help:
- Please help me understand why checkpointing would gradually consume such large direct buffers, and why they aren't being released.
- What exactly is the checkpoint coordinator storing in this direct buffer memory?
- How can I handle this issue, or what tuning can I apply so that it doesn't occur?
- What should my next steps be to investigate and resolve this? (One thing I plan to try is the direct-memory probe sketched below.)
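For the next-steps part, this is the kind of plain-JDK probe I plan to run in the JobManager JVM to watch how fast the direct buffer pool grows between checkpoints (minimal sketch, nothing Flink-specific):

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

public class DirectMemoryProbe {
    public static void main(String[] args) throws InterruptedException {
        // Print direct/mapped buffer pool usage every 60s; the same calls can also be made
        // from a small background thread inside the job to correlate growth with checkpoints.
        while (true) {
            for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
                System.out.printf("%s buffers: count=%d, used=%d bytes, capacity=%d bytes%n",
                        pool.getName(), pool.getCount(), pool.getMemoryUsed(), pool.getTotalCapacity());
            }
            Thread.sleep(60_000);
        }
    }
}
```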


