Skip to content

Commit a0da1f5

Browse files
committed
fix: Ack/ModAck failures in non exactly once delivery flow should not be retried.
1 parent 4a33a31 commit a0da1f5

File tree

2 files changed

+5
-35
lines changed

2 files changed

+5
-35
lines changed

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs

-3
Original file line numberDiff line numberDiff line change
@@ -423,15 +423,12 @@ public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses
423423
// For exactly once delivery, only messages that succeed or fail permanently appear here, i.e., only messages whose status is finalized.
424424
// The messages with temporary failures whose status may change in future appear here only when they succeed or fail permanently.
425425
// In non exactly once delivery, we show status of every message as Success because every message acknowledgement is treated as fire and forget.
426-
// If message failed with recoverable error, it will be re-queued and hence may appear again here with the same success status, so adding distinct items only.
427426
Responses.Locked(() => Responses.AddRange(responses.Where(item => !Responses.Any(j => j.MessageId == item.MessageId))));
428427

429-
430428
public override void HandleNackResponses(IReadOnlyList<AckNackResponse> responses) =>
431429
// For exactly once delivery, only messages that succeed or fail permanently appear here, i.e., only messages whose status is finalized.
432430
// The messages with temporary failures whose status may change in future appear here only when they succeed or fail permanently.
433431
// In non exactly once delivery, we show status of every message as Success because every message acknowledgement is treated as fire and forget.
434-
// If message failed with recoverable error, it will be re-queued and hence may appear again here with the same success status, so adding distinct items only.
435432
Responses.Locked(() => Responses.AddRange(responses.Where(item => !Responses.Any(j => j.MessageId == item.MessageId))));
436433
}
437434

‎apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs

+5-32
Original file line numberDiff line numberDiff line change
@@ -1727,39 +1727,12 @@ private void HandleAckResponse(Task writeTask, List<string> ackIds, List<string>
17271727
_pushInFlight -= (ackIds?.Count ?? 0) + (nackIds?.Count ?? 0) + (extendIds?.Count ?? 0);
17281728
if (writeTask.IsFaulted)
17291729
{
1730-
if (writeTask.Exception.As<RpcException>()?.IsRecoverable() ?? false)
1730+
// Check if it's an RpcException. If it is, then ignore it and continue. We may want to log it later.
1731+
// Other non-gRPC unrecoverable errors will continue to be thrown.
1732+
if (writeTask.Exception.As<RpcException>() is null)
17311733
{
1732-
// Recoverable write error, requeue data and continue.
1733-
// ackIds and nackIds are never both set in the same call, so no need to share a lock.
1734-
if (hasAckIds)
1735-
{
1736-
lock (_lock)
1737-
{
1738-
_ackQueue.Requeue(ackIds);
1739-
}
1740-
}
1741-
if (hasNackIds)
1742-
{
1743-
lock (_lock)
1744-
{
1745-
_nackQueue.Requeue(nackIds);
1746-
}
1747-
}
1748-
if (extendIds != null && extendIds.Count > 0)
1749-
{
1750-
_extendQueue.Requeue(extendIds);
1751-
}
1752-
// TODO: Backoff
1753-
}
1754-
else
1755-
{
1756-
// Check if it's an RpcException. If it is, then ignore it and continue. We may want to log it later.
1757-
// Other non-gRPC unrecoverable errors will continue to be thrown.
1758-
if (writeTask.Exception.As<RpcException>() is null)
1759-
{
1760-
// It is a non-gRPC unrecoverable error; throw exception.
1761-
throw writeTask.Exception.FlattenIfPossible();
1762-
}
1734+
// It is a non-gRPC unrecoverable error; throw exception.
1735+
throw writeTask.Exception.FlattenIfPossible();
17631736
}
17641737
}
17651738
// Immediately send more data if there is any to send.

0 commit comments

Comments
 (0)