
Issue:

  • The Flink application fails with "Thread 'jobmanager-io-thread-25' produced an uncaught exception. java.lang.OutOfMemoryError: Direct buffer memory" and terminates after running for 2-3 days.

  • No matter how much the direct buffer memory is increased, it is exhausted over time; a larger limit only delays the termination. The most I have tried so far is 16GB.

  • When checkpointing is disabled, buffer memory doesn't grow and the application works well.

  • I tried the optimizations and tuning suggested in the documentation, such as incremental checkpointing, the state changelog, and increasing direct buffer memory, but none of them worked.

Stack Trace:

2025-11-09 17:06:56,442 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'jobmanager-io-thread-25' produced an uncaught exception. Stopping the process...
java.lang.OutOfMemoryError: Direct buffer memory
    at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
    at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
    at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
    at java.base/sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
    at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:71)
    at java.base/sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:280)
    at java.base/java.nio.channels.Channels.writeFullyImpl(Channels.java:74)
    at java.base/java.nio.channels.Channels.writeFully(Channels.java:97)
    at java.base/java.nio.channels.Channels$1.write(Channels.java:172)
    at org.apache.flink.core.fs.OffsetAwareOutputStream.write(OffsetAwareOutputStream.java:48)
    at org.apache.flink.core.fs.RefCountedFileWithStream.write(RefCountedFileWithStream.java:54)
    at org.apache.flink.core.fs.RefCountedBufferingFileStream.write(RefCountedBufferingFileStream.java:88)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.write(S3RecoverableFsDataOutputStream.java:112)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.write(FsCheckpointMetadataOutputStream.java:78)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeStreamStateHandle(MetadataV2V3SerializerBase.java:758)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeStreamStateHandle(MetadataV3Serializer.java:264)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serializeOperatorState(MetadataV3Serializer.java:109)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase.serializeMetadata(MetadataV2V3SerializerBase.java:165)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer.serialize(MetadataV3Serializer.java:83)
    at org.apache.flink.runtime.checkpoint.metadata.MetadataV4Serializer.serialize(MetadataV4Serializer.java:56)
    at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:101)
    at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:88)
    at org.apache.flink.runtime.checkpoint.Checkpoints.storeCheckpointMetadata(Checkpoints.java:83)
    at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:339)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1624)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1518)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1410)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$2(ExecutionGraphHandler.java:109)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$4(ExecutionGraphHandler.java:139)

Checkpointing Config: [image: Checkpoint Config]

Application Structure (High Level):

  • The application runs on Flink on AWS EMR, using Flink 1.20 on EMR 7.10.0.
  • It has 2 S3 file sources that read parquet files. Both are unbounded, with continuous monitoring and a discoveryInterval of 1min. New files are added every minute in new paths of the form yyyy/mm/dd/mi/files.
  • It processes and creates objects from both streams.
  • It joins them using a keyed co-process function, with a TTL of 30mins.
  • It performs some map and filter operations on the joined stream.
  • It sinks the result stream to S3 again as parquet files.

Help:

  • Please help me understand why checkpointing would gradually consume such large buffers. And even then, why aren't they getting released?
  • What exactly is the checkpoint coordinator storing in this buffer memory?
  • How can I handle this issue or apply tuning so that it doesn't occur?
  • What next steps can I try to resolve this?

Direct Memory vs Checkpoint Size: [image: Direct Memory vs Checkpoint Size]

Native Memory Leak - Stack Trace: [image: Native Memory Leak - Stack Trace]

  • Sorry to bother, but it's difficult to tell without the code. (Commented Nov 17 at 16:44)

1 Answer


This feels an awful lot like a memory leak, but in my experience, that's not always the case. I'll throw out a few things to try based on the configuration that you provided. I think the key culprits here may be one (or more) of: the state changelog, checkpoint retention, or possibly some other configuration.

I'll provide a few suggestions (feel free to try one or more):

Disabling State Changelog
This could explain part of the situation, since using the changelog can add memory pressure (it functions as a buffer for state changes, as mentioned in this older blog post). I'd suspect that using it in conjunction with RocksDB could impact memory utilization:

state.backend.changelog.enabled: false

Limiting Checkpoint Retention
Currently your job has checkpoint retention enabled, which is fine. However, you may want to limit the number of checkpoints that are retained (otherwise things could balloon) and clean them up so that too many don't stick around:

state.checkpoints.num-retained: 5
state.checkpoints.externalized.enable: true
state.checkpoints.externalized.delete-on-cancellation: true

Gather Metrics
One thing that I would highly suggest as you monitor the job and these changes would be to implement and monitor some of the built-in metrics that Flink provides out of the box with regards to memory/JVM. Using these with some type of visualization tool (e.g., Prometheus, Grafana, etc.) would allow you to easily monitor things like the JobManager, Changelog, Checkpointing, etc.

Definitely check out:

  • Any/all memory-related metrics (or just JobManager in general too)

  • Any/all changelog-related metrics

  • Checkpointing sizes/durations

  • RocksDB metrics (these need to be enabled separately)
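
If you want to sanity-check the direct memory numbers independently of the dashboards, here is a minimal Java sketch that reads the JVM's "direct" buffer pool through the standard BufferPoolMXBean. The class and method names (DirectBufferProbe, directMemoryUsed) are made up for illustration. As far as I know this is the same JVM pool that Flink's Status.JVM.Memory.Direct.* metrics report on, but treat that as an assumption rather than something I've verified for your version.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper, not part of Flink: polls the JVM "direct" buffer pool.
public class DirectBufferProbe {

    /** Returns the bytes currently used by the "direct" buffer pool, or -1 if the pool is not found. */
    static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println("direct pool before: " + directMemoryUsed() + " bytes");

        // Allocate 1 MiB of direct memory and keep a reference so it is not collected.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024 * 1024);

        System.out.println("direct pool after:  " + directMemoryUsed() + " bytes");
        System.out.println("buffer capacity:    " + buffer.capacity() + " bytes");
    }
}
```

Logging this value next to the checkpoint ID on the JobManager might make it easier to see whether the growth lines up with checkpoint completion.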

Questions
As far as your questions go, I'll try to give a few possible explanations to help on those fronts too:

Please help me understand why would checkpointing consume such large buffers gradually? Even then why aren't they getting released?

tl;dr: there are a lot more moving pieces to the puzzle when checkpointing is enabled, and they can impact memory pressure (even gradually) in a consistently flowing system.

So there are a lot of things at play when checkpointing is enabled (vs. why things are rainbows and butterflies when it's disabled). Checkpointing is going to bring RocksDB into the picture, which has a native memory impact with each checkpoint. In conjunction with the changelog, this could apply quite a bit more pressure, since the changelog has its own memory segments as well.

Many of these things can stick around much longer than expected if data is continually flowing at scale into the system and may require tuning. RocksDB, for example, does a decent job at cleanup during its compaction process, however if the job is busy 24/7 it may never have the opportunity to do so, especially with all of the checkpointing operations and state being interacted with.

What exactly is getting stored in this buffer memory by checkpoint coordinator?

There are several things that use direct or native memory: Netty, checkpoint buffers for your filesystem, lots of RocksDB-related structures (e.g., cache, tables, etc.), and the changelog, which has its own set of buffers.
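
One more detail that the top frames of your stack trace point at (FileChannelImpl.write -> IOUtil.write -> Util.getTemporaryDirectBuffer): when a heap buffer is written to a FileChannel, the JDK copies it into a temporary direct buffer first. The sketch below only illustrates that JDK behavior in isolation. The class name HeapWriteDirectBufferDemo and the method measure are hypothetical, and this does not show that it is the cause of your leak.

```java
import java.io.IOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo, not Flink code: a heap-buffer write that goes through a FileChannel.
public class HeapWriteDirectBufferDemo {

    /** Bytes currently used by the JVM "direct" buffer pool, or -1 if the pool is not found. */
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    /** Writes a heap buffer of sizeBytes to a temp file; returns {directUsedBefore, directUsedAfter}. */
    static long[] measure(int sizeBytes) throws IOException {
        Path tmp = Files.createTempFile("heap-write-demo", ".bin");
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            long before = directMemoryUsed();
            ByteBuffer heap = ByteBuffer.allocate(sizeBytes); // heap buffer, not direct
            while (heap.hasRemaining()) {
                channel.write(heap); // JDK stages the bytes in a temporary direct buffer
            }
            long after = directMemoryUsed();
            return new long[] {before, after};
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        long[] result = measure(4 * 1024 * 1024);
        System.out.printf("direct pool before=%d bytes, after=%d bytes%n", result[0], result[1]);
    }
}
```

On the JDKs I'm familiar with, the temporary buffer is kept in a per-thread cache after the write, so the "after" value tends to stay elevated for as long as the thread lives, and the jdk.nio.maxCachedBufferSize system property caps the size of buffers that get cached. Whether that is what's happening on the jobmanager-io threads here is something your profiler output would have to confirm.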

How can I handle this issue or apply tuning so that this wouldn't occur? What can be my next steps of action to try out and resolve this?

Combining these two as this is already way too long, but hopefully some of the configurations that I provided above can help relieve the issue. Checkpointing and OOM type errors can be really nasty to troubleshoot, even when you know the ins/outs of a given job, but I'll keep my fingers crossed for you.


3 Comments

- Whether the changelog is enabled or disabled, the scenario is the same.
- I did not find the exact configuration you provided for checkpoint retention in the documentation, but the default retention count is 1 and I need retain_on_cancellation, basically to start up from the previous checkpoint.
- Apart from the above, I've also removed all intermediate operators (joins, flatmaps, maps, filters, etc.) and kept only the filesource + a no-op sink, yet I still saw buffer memory increase, so it shouldn't be because of RocksDB.

I do have metrics being flushed to CloudWatch with statsd, so I can provide any metrics required for more information.

I have added the Direct Memory vs Checkpoint Size graph and an async-profiler native memory leak flame graph. I used the commands below to get the leak flame graph:
./asprof -e nativemem -d 600 --live -f /tmp/native_mem.jfr 1600550
./jfrconv --total --nativemem --leak /tmp/native_mem.jfr app-leak.html
