-
Notifications
You must be signed in to change notification settings - Fork 217
/
Copy pathtest_utils.h
70 lines (57 loc) · 2.28 KB
/
test_utils.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#ifndef CPPKAFKA_TEST_UTILS_H
#define CPPKAFKA_TEST_UTILS_H
#include <string>
#include <thread>
#include <vector>
#include "cppkafka/consumer.h"
#include "cppkafka/utils/roundrobin_poll_strategy.h"
#include "cppkafka/utils/consumer_dispatcher.h"
extern const std::vector<std::string> KAFKA_TOPICS;
using namespace cppkafka;
//==================================================================================
// BasicConsumerRunner
//==================================================================================
template <typename ConsumerType>
class BasicConsumerRunner {
public:
BasicConsumerRunner(ConsumerType& consumer,
size_t expected,
size_t partitions);
BasicConsumerRunner(const BasicConsumerRunner&) = delete;
BasicConsumerRunner& operator=(const BasicConsumerRunner&) = delete;
~BasicConsumerRunner();
const std::vector<cppkafka::Message>& get_messages() const;
void try_join();
private:
ConsumerType& consumer_;
std::thread thread_;
std::vector<cppkafka::Message> messages_;
};
//==================================================================================
// PollStrategyAdapter
//==================================================================================
/**
* \brief Specific implementation which can be used with other
* util classes such as BasicConsumerDispatcher.
*/
class PollStrategyAdapter : public Consumer {
public:
PollStrategyAdapter(Configuration config);
void add_polling_strategy(std::unique_ptr<PollInterface> poll_strategy);
void delete_polling_strategy();
Message poll();
Message poll(std::chrono::milliseconds timeout);
std::vector<Message> poll_batch(size_t max_batch_size);
std::vector<Message> poll_batch(size_t max_batch_size,
std::chrono::milliseconds timeout);
void set_timeout(std::chrono::milliseconds timeout);
std::chrono::milliseconds get_timeout();
private:
std::unique_ptr<PollInterface> strategy_;
};
// Misc
std::string make_consumer_group_id();
using PollConsumerRunner = BasicConsumerRunner<PollStrategyAdapter>;
using ConsumerRunner = BasicConsumerRunner<Consumer>;
#include "test_utils_impl.h"
#endif // CPPKAFKA_TEST_UTILS_H