forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpartitioner_spec.rb
81 lines (63 loc) · 2.96 KB
/
partitioner_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# frozen_string_literal: true
describe Kafka::Partitioner, "#call" do
let(:message) { double(:message, key: nil, partition_key: "yolo") }
describe "default partitioner" do
let(:partitioner) { Kafka::Partitioner.new }
it "deterministically returns a partition number for a partition key and partition count" do
partition = partitioner.call(3, message)
expect(partition).to eq 0
end
it "falls back to the message key if no partition key is available" do
allow(message).to receive(:partition_key) { nil }
allow(message).to receive(:key) { "hey" }
partition = partitioner.call(3, message)
expect(partition).to eq 2
end
it "randomly picks a partition if the key is nil" do
allow(message).to receive(:key) { nil }
allow(message).to receive(:partition_key) { nil }
partitions = 30.times.map { partitioner.call(3, message) }
expect(partitions.uniq).to contain_exactly(0, 1, 2)
end
end
describe "murmur2 partitioner" do
let(:partitioner) { Kafka::Partitioner.new(hash_function: :murmur2) }
let(:message) { double(:message, key: nil, partition_key: "yolo") }
it "deterministically returns a partition number for a partition key and partition count" do
partition = partitioner.call(3, message)
expect(partition).to eq 0
end
it "falls back to the message key if no partition key is available" do
allow(message).to receive(:partition_key) { nil }
allow(message).to receive(:key) { "hey" }
partition = partitioner.call(3, message)
expect(partition).to eq 1
end
it "randomly picks a partition if the key is nil" do
allow(message).to receive(:key) { nil }
allow(message).to receive(:partition_key) { nil }
partitions = 30.times.map { partitioner.call(3, message) }
expect(partitions.uniq).to contain_exactly(0, 1, 2)
end
it "picks a Java Kafka compatible partition" do
partition_count = 100
{
# librdkafka test cases taken from tests/0048-partitioner.c
"" => 0x106e08d9 % partition_count,
"this is another string with more length to it perhaps" => 0x4f7703da % partition_count,
"hejsan" => 0x5ec19395 % partition_count,
# Java Kafka test cases taken from UtilsTest.java.
# The Java tests check the result of murmur2 directly,
# so have been ANDd with 0x7fffffff to work here
"21" => (-973932308 & 0x7fffffff) % partition_count,
"foobar" => (-790332482 & 0x7fffffff) % partition_count,
"a-little-bit-long-string" => (-985981536 & 0x7fffffff) % partition_count,
"a-little-bit-longer-string" => (-1486304829 & 0x7fffffff) % partition_count,
"lkjh234lh9fiuh90y23oiuhsafujhadof229phr9h19h89h8" => (-58897971 & 0x7fffffff) % partition_count
}.each do |key, partition|
allow(message).to receive(:partition_key) { key }
expect(partitioner.call(partition_count, message)).to eq partition
end
end
end
end