Skip to content

Commit 2ef8ff4

Browse files
Keshav Sharmameta-codesync[bot]
authored andcommitted
Send request-key dedup key to Axon DL write proxy
Summary: The Axon DL write proxy (dl/write_proxy/service/WriteProxyServer.cpp) supports request dedup keyed on cluster + keyspace + DLAppendRequest.dedupeKey, but mcrouter never set dedupeKey, so the dedup window was effectively unusable from the Memcache invalidation path. This sets dedupeKey in AxonLogRoute::writeProxy to the request key (req.key()->fullKey()) for both sets and deletes. All writes to the same key within the proxy dedup window therefore collapse, which is what suppresses hot keys (the goal of this feature). Per review discussion, writes to the same key are treated as concurrent, so collapsing them to a single append is acceptable. The proxy dedup is best-effort, in-memory and per-host, and is gated off by default (append_dedup_window_sec=0). A follow-up will set a small dedup window (<=100ms) once the feature is enabled. Server-side consumption already exists in the DL write proxy; no change needed there. Reviewed By: lenar-f Differential Revision: D107702706 fbshipit-source-id: 3a77b58240a9fc36199ab4a12730b37e7648dcdf
1 parent 6775f35 commit 2ef8ff4

4 files changed

Lines changed: 99 additions & 10 deletions

File tree

‎mcrouter/McDistributionUtils.cpp‎

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include "mcrouter/McDistributionUtils.h"
99

1010
#include <optional>
11+
#include <string>
12+
#include <utility>
1113

1214
namespace facebook {
1315
namespace memcache {
@@ -35,6 +37,8 @@ FOLLY_NOINLINE folly::exception_wrapper distributeWriteRequest(
3537
pool.emplace(axonCtx->poolFilter);
3638
}
3739

40+
// Dedupe by request key to suppress hot keys at the write proxy.
41+
auto dedupeKey = req.key()->fullKey().str();
3842
// Run serialization off fiber
3943
auto kvPairs = folly::fibers::runInMainContext([&req,
4044
&targetRegion,
@@ -57,7 +61,8 @@ FOLLY_NOINLINE folly::exception_wrapper distributeWriteRequest(
5761
invalidation::DistributionOperation::Write,
5862
sourceRegion);
5963
});
60-
return axonCtx->writeProxyFn(bucketId, std::move(kvPairs), secureWrites);
64+
return axonCtx->writeProxyFn(
65+
bucketId, std::move(kvPairs), secureWrites, std::move(dedupeKey));
6166
}
6267

6368
FOLLY_NOINLINE folly::exception_wrapper distributeDeleteRequest(
@@ -79,6 +84,8 @@ FOLLY_NOINLINE folly::exception_wrapper distributeDeleteRequest(
7984
if (!axonCtx->poolFilter.empty()) {
8085
pool.emplace(axonCtx->poolFilter);
8186
}
87+
// Dedupe by request key to suppress hot keys at the write proxy.
88+
auto dedupeKey = req.key()->fullKey().str();
8289
// Run off fiber to save fiber stack for serialization
8390
auto kvPairs = folly::fibers::runInMainContext(
8491
[&req, &region, &pool, &message, &sourceRegion, type]() {
@@ -110,7 +117,10 @@ FOLLY_NOINLINE folly::exception_wrapper distributeDeleteRequest(
110117
sourceRegion);
111118
});
112119
return axonCtx->writeProxyFn(
113-
bucketId, std::move(kvPairs), /*secureWrites*/ false);
120+
bucketId,
121+
std::move(kvPairs),
122+
/*secureWrites*/ false,
123+
std::move(dedupeKey));
114124
}
115125

116126
FOLLY_NOINLINE bool spoolAsynclog(

‎mcrouter/McrouterFiberContext.h‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <bitset>
1111
#include <map>
1212
#include <memory>
13+
#include <optional>
14+
#include <string>
1315
#include <utility>
1416

1517
#include <folly/Range.h>
@@ -60,7 +62,8 @@ using ExtraDataCallbackT = std::function<ExtraDataMap()>;
6062
using AxonProxyWriteFn = std::function<folly::exception_wrapper(
6163
uint64_t,
6264
folly::F14FastMap<std::string, std::string>&&,
63-
bool)>;
65+
bool,
66+
std::optional<std::string> /* dedupeKey */)>;
6467

6568
struct AxonContext {
6669
bool fallbackAsynclog{false};

‎mcrouter/routes/test/DistributionRouteTest.cpp‎

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,10 @@ struct AxonMockResult {
3434
namespace {
3535
AxonMockResult setupAxonFn(std::shared_ptr<AxonContext>& ctx) {
3636
AxonMockResult res;
37-
ctx->writeProxyFn =
38-
[&](auto bucketId, auto&& payload, bool) -> folly::exception_wrapper {
37+
ctx->writeProxyFn = [&](auto bucketId,
38+
auto&& payload,
39+
bool,
40+
auto&& /* dedupeKey */) -> folly::exception_wrapper {
3941
res.bucketId = bucketId;
4042
if (payload.find(invalidation::kRegion) != payload.end()) {
4143
res.region = payload.at(invalidation::kRegion);

‎mcrouter/test/cpp_unit_tests/McDistributionUtilsTest.cpp‎

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ TEST(McDistributionUtilsTest, distributeSetTest) {
2626
facebook::memcache::mcrouter::AxonProxyWriteFn writeProxyFn =
2727
[&](auto bucketId,
2828
auto&& kvPairs,
29-
bool isSecureWrites) -> folly::exception_wrapper {
29+
bool isSecureWrites,
30+
const std::optional<std::string>&) -> folly::exception_wrapper {
3031
resultBucketId = bucketId;
3132
resultKvPairs = kvPairs;
3233
secureWrites = isSecureWrites;
@@ -96,7 +97,8 @@ TEST(McDistributionUtilsTest, distributeSetWithSecurityTest) {
9697
facebook::memcache::mcrouter::AxonProxyWriteFn writeProxyFn =
9798
[&](auto bucketId,
9899
auto&& kvPairs,
99-
bool isSecureWrites) -> folly::exception_wrapper {
100+
bool isSecureWrites,
101+
const std::optional<std::string>&) -> folly::exception_wrapper {
100102
resultBucketId = bucketId;
101103
resultKvPairs = kvPairs;
102104
secureWrites = isSecureWrites;
@@ -169,7 +171,10 @@ TEST(McDistributionUtilsTest, distributeDeleteDirectedTest) {
169171
folly::F14FastMap<std::string, std::string> resultKvPairs;
170172

171173
facebook::memcache::mcrouter::AxonProxyWriteFn writeProxyFn =
172-
[&](auto bucketId, auto&& kvPairs, bool) -> folly::exception_wrapper {
174+
[&](auto bucketId,
175+
auto&& kvPairs,
176+
bool,
177+
const std::optional<std::string>&) -> folly::exception_wrapper {
173178
resultBucketId = bucketId;
174179
resultKvPairs = kvPairs;
175180
return folly::exception_wrapper();
@@ -242,7 +247,10 @@ TEST(McDistributionUtilsTest, distributeDeleteBroadcastTest) {
242247
folly::F14FastMap<std::string, std::string> resultKvPairs;
243248

244249
facebook::memcache::mcrouter::AxonProxyWriteFn writeProxyFn =
245-
[&](auto bucketId, auto&& kvPairs, bool) -> folly::exception_wrapper {
250+
[&](auto bucketId,
251+
auto&& kvPairs,
252+
bool,
253+
const std::optional<std::string>&) -> folly::exception_wrapper {
246254
resultBucketId = bucketId;
247255
resultKvPairs = kvPairs;
248256
return folly::exception_wrapper();
@@ -314,7 +322,10 @@ TEST(McDistributionUtilsTest, spoolDeleteTest) {
314322
folly::F14FastMap<std::string, std::string> resultKvPairs;
315323

316324
facebook::memcache::mcrouter::AxonProxyWriteFn writeProxyFn =
317-
[&](auto bucketId, auto&& kvPairs, bool) -> folly::exception_wrapper {
325+
[&](auto bucketId,
326+
auto&& kvPairs,
327+
bool,
328+
const std::optional<std::string>&) -> folly::exception_wrapper {
318329
resultBucketId = bucketId;
319330
resultKvPairs = kvPairs;
320331
return folly::exception_wrapper();
@@ -376,4 +387,67 @@ TEST(McDistributionUtilsTest, spoolDeleteTest) {
376387
DistributionOperation::Delete);
377388
EXPECT_EQ(resultKvPairs.find(kSourceRegion)->second, sourceRegion);
378389
}
390+
391+
// The dedup key passed to the write proxy is the request key, so all writes to
392+
// the same key collapse within the proxy's dedup window (hot-key suppression)
393+
// regardless of value, while writes to different keys never collapse.
394+
TEST(McDistributionUtilsTest, distributeSetKeyBasedDedupeKeyTest) {
395+
std::string targetRegion = "texas";
396+
std::string sourceRegion = "altoona";
397+
std::string pool = "main";
398+
uint64_t bucketId = 234532;
399+
std::optional<std::string> resultDedupeKey;
400+
401+
facebook::memcache::mcrouter::AxonProxyWriteFn writeProxyFn =
402+
[&](auto,
403+
auto&&,
404+
bool,
405+
std::optional<std::string> dedupeKey) -> folly::exception_wrapper {
406+
resultDedupeKey = std::move(dedupeKey);
407+
return folly::exception_wrapper();
408+
};
409+
410+
auto axonCtx = mcrouter::AxonContext{
411+
.fallbackAsynclog = false,
412+
.allDelete = false,
413+
.writeProxyFn = writeProxyFn,
414+
.defaultRegionFilter = sourceRegion,
415+
.poolFilter = pool};
416+
auto axonCtxPtr =
417+
std::make_shared<facebook::memcache::mcrouter::AxonContext>(axonCtx);
418+
419+
auto makeReq = [](folly::StringPiece key, folly::StringPiece value) {
420+
auto req = McSetRequest(key);
421+
req.value() = folly::IOBuf(folly::IOBuf::COPY_BUFFER, value);
422+
return req;
423+
};
424+
425+
// Dedup key is the request key itself.
426+
auto req = makeReq("key1", "value");
427+
EXPECT_FALSE(distributeWriteRequest(
428+
req, axonCtxPtr, bucketId, targetRegion, sourceRegion)
429+
.has_exception_ptr());
430+
ASSERT_TRUE(resultDedupeKey.has_value());
431+
EXPECT_EQ(*resultDedupeKey, "key1");
432+
const auto firstKey = *resultDedupeKey;
433+
434+
// Same key, different value -> same dedup key (collapses the hot key).
435+
resultDedupeKey.reset();
436+
auto sameKeyReq = makeReq("key1", "value2");
437+
EXPECT_FALSE(distributeWriteRequest(
438+
sameKeyReq, axonCtxPtr, bucketId, targetRegion, sourceRegion)
439+
.has_exception_ptr());
440+
ASSERT_TRUE(resultDedupeKey.has_value());
441+
EXPECT_EQ(*resultDedupeKey, firstKey);
442+
443+
// Different key -> different dedup key (must NOT collapse).
444+
resultDedupeKey.reset();
445+
auto otherKeyReq = makeReq("key2", "value");
446+
EXPECT_FALSE(
447+
distributeWriteRequest(
448+
otherKeyReq, axonCtxPtr, bucketId, targetRegion, sourceRegion)
449+
.has_exception_ptr());
450+
ASSERT_TRUE(resultDedupeKey.has_value());
451+
EXPECT_NE(*resultDedupeKey, firstKey);
452+
}
379453
} // namespace facebook::memcache::invalidation::test

0 commit comments

Comments
 (0)