-
Notifications
You must be signed in to change notification settings - Fork 402
/
Copy pathkafka-consumer.h
135 lines (104 loc) · 3.58 KB
/
kafka-consumer.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
#ifndef SRC_KAFKA_CONSUMER_H_
#define SRC_KAFKA_CONSUMER_H_
#include <nan.h>
#include <uv.h>
#include <iostream>
#include <string>
#include <vector>
#include "rdkafkacpp.h"
#include "src/common.h"
#include "src/connection.h"
#include "src/callbacks.h"
namespace NodeKafka {
/**
* @brief KafkaConsumer v8 wrapped object.
*
* Specializes the connection to wrap a consumer object through compositional
* inheritance. Establishes its prototype in node through `Init`.
*
* @sa RdKafka::Handle
* @sa NodeKafka::Client
*/
class KafkaConsumer : public Connection {
  friend class Producer;

 public:
  // Module registration / factory entry points exposed to the addon.
  // `Init` receives the exports object; `NewInstance` builds a wrapped
  // instance from a single JS value (presumably the config — confirm in .cc).
  static void Init(v8::Local<v8::Object>);
  static v8::Local<v8::Object> NewInstance(v8::Local<v8::Value>);

  // Connection lifecycle. Each call reports its outcome through a Baton.
  Baton Connect();
  Baton Disconnect();

  // Subscription state.
  Baton Subscription();
  Baton Unsubscribe();
  bool IsSubscribed();

  // Flow control for the given partitions. The vector is taken by non-const
  // reference, so the callee is able to modify the passed list.
  Baton Pause(std::vector<RdKafka::TopicPartition*> &);
  Baton Resume(std::vector<RdKafka::TopicPartition*> &);

  // Asynchronous commit events
  Baton Commit(std::vector<RdKafka::TopicPartition*>);
  Baton Commit(RdKafka::TopicPartition*);
  Baton Commit();

  Baton OffsetsStore(std::vector<RdKafka::TopicPartition*> &);

  // Arguments are (topic, partition, out, out); the two int64_t pointers are
  // presumably the low and high watermark outputs — TODO confirm the order
  // against the definition.
  Baton GetWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*);

  // Synchronous commit events
  Baton CommitSync(std::vector<RdKafka::TopicPartition*>);
  Baton CommitSync(RdKafka::TopicPartition*);
  Baton CommitSync();

  // Offset queries for the given partitions.
  Baton Committed(std::vector<RdKafka::TopicPartition*> &, int timeout_ms);
  Baton Position(std::vector<RdKafka::TopicPartition*> &);

  // Partition assignment management.
  Baton RefreshAssignments();
  bool HasAssignedPartitions();
  int AssignedPartitionCount();

  Baton Assign(std::vector<RdKafka::TopicPartition*>);
  Baton Unassign();
  Baton IncrementalAssign(std::vector<RdKafka::TopicPartition*>);
  Baton IncrementalUnassign(std::vector<RdKafka::TopicPartition*>);

  Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms);

  // Descriptive accessors.
  std::string Name();
  std::string RebalanceProtocol();

  // Subscribes to the given list of topic names.
  Baton Subscribe(std::vector<std::string>);

  // Consumes with the given timeout in milliseconds.
  Baton Consume(int timeout_ms);

  void ActivateDispatchers();
  void DeactivateDispatchers();

 protected:
  // JS constructor function handle and the native `new` handler behind it.
  static Nan::Persistent<v8::Function> constructor;
  static void New(const Nan::FunctionCallbackInfo<v8::Value>& info);

  // Construction is restricted to the class itself, subclasses and friends;
  // the two Conf pointers are presumably the global and topic configuration
  // (verify against the definition).
  KafkaConsumer(Conf *, Conf *);
  ~KafkaConsumer();

 private:
  static void part_list_print(const std::vector<RdKafka::TopicPartition*>&);

  std::vector<RdKafka::TopicPartition*> m_partitions;
  // Explicitly zeroed so the counter is never read with an indeterminate
  // value before its first assignment; this matches the in-class defaults
  // of the members declared below.
  int m_partition_cnt = 0;
  bool m_is_subscribed = false;

  // Opaque handle; typed as void* here, so its concrete type is only known
  // to the implementation file.
  void* m_consume_loop = nullptr;

  // Node methods
  static NAN_METHOD(NodeConnect);
  static NAN_METHOD(NodeSubscribe);
  static NAN_METHOD(NodeDisconnect);
  static NAN_METHOD(NodeAssign);
  static NAN_METHOD(NodeUnassign);
  static NAN_METHOD(NodeIncrementalAssign);
  static NAN_METHOD(NodeIncrementalUnassign);
  static NAN_METHOD(NodeRebalanceProtocol);
  static NAN_METHOD(NodeAssignments);
  static NAN_METHOD(NodeUnsubscribe);
  static NAN_METHOD(NodeCommit);
  static NAN_METHOD(NodeCommitSync);
  static NAN_METHOD(NodeOffsetsStore);
  static NAN_METHOD(NodeCommitted);
  static NAN_METHOD(NodePosition);
  static NAN_METHOD(NodeSubscription);
  static NAN_METHOD(NodeSeek);
  static NAN_METHOD(NodeGetWatermarkOffsets);
  static NAN_METHOD(NodeConsumeLoop);
  static NAN_METHOD(NodeConsume);
  static NAN_METHOD(NodePause);
  static NAN_METHOD(NodeResume);
};
} // namespace NodeKafka
#endif // SRC_KAFKA_CONSUMER_H_