Apache Kafka
This package provides a client for Apache Kafka versions 0.11 and up. It is a work in progress, so expect breaking changes.
1 Client
(require kafka) | package: kafka-lib |
Clients transparently pool connections to brokers within a cluster. Connections are leased from the pool in order of least in-progress requests. Reconnections are handled transparently, and connection errors bubble up to the caller. Although clients are thread-safe, they may not be shared between consumers.
procedure
(make-client [#:id id
              #:bootstrap-host host
              #:bootstrap-port port
              #:sasl-mechanism&ctx sasl-ctx
              #:ssl-ctx ssl-ctx
              #:proxy proxy]) → client?
id : non-empty-string? = "racket-kafka"
host : string? = "127.0.0.1"
port : (integer-in 0 65535) = 9092
sasl-ctx : (or/c #f
                 (list/c 'plain string?)
                 (list/c symbol? sasl-ctx-proc/c)) = #f
ssl-ctx : (or/c #f ssl-client-context?) = #f
proxy : (or/c #f proxy?) = #f
When a sasl-ctx is provided, it is used to authenticate the connection to the bootstrap host as well as any subsequent connections made to other nodes in the cluster.
When an ssl-ctx is provided, it is used to encrypt all connections.
procedure
(disconnect-all c) → void?
c : client?
value
sasl-ctx-proc/c : (-> string? (integer-in 0 65535) sasl-ctx?)
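As a sketch of basic usage, assuming a broker is listening on the default bootstrap address, a client can be created and later shut down like so (the client id is an arbitrary example value):

```racket
#lang racket/base
(require kafka)

;; Assumes a broker is reachable at the default 127.0.0.1:9092.
(define c (make-client #:id "example-client"))

;; ... issue requests against the cluster here ...

;; Close all pooled connections when done.
(disconnect-all c)
```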
1.1 Proxies
Proxies act as intermediaries between clients and brokers.
procedure
(make-http-proxy host port) → proxy?
host : string?
port : (integer-in 1 65535)
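To route client traffic through a proxy, pass the proxy to make-client. A minimal sketch, where the proxy host and port are placeholder values:

```racket
#lang racket/base
(require kafka)

;; "proxy.internal" and 1080 are placeholder values for an HTTP
;; proxy that sits between this client and the brokers.
(define proxy (make-http-proxy "proxy.internal" 1080))
(define c (make-client #:proxy proxy))
```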
1.2 Errors
procedure
(exn:fail:kafka? v) → boolean?
v : any/c
procedure
(exn:fail:kafka:client? v) → boolean?
v : any/c
procedure
(exn:fail:kafka:server? v) → boolean?
v : any/c
1.3 Topic Management
procedure
(create-topics c t ...+) → CreatedTopics?
c : client?
t : CreateTopic?
When given a set of topics, some of them may succeed and some may fail. It's up to the caller to inspect the error codes on the returned CreatedTopics.
procedure
(delete-topics c t ...+) → DeletedTopics?
c : client?
t : string?
struct
(struct CreateTopic (name partitions))
name : string?
partitions : exact-positive-integer?
procedure
(make-CreateTopic #:name name
                  #:partitions partitions
                  [#:replication-factor factor
                   #:assignments assignments
                   #:configs configs]) → CreateTopic?
name : string?
partitions : exact-positive-integer?
factor : (or/c -1 exact-positive-integer?) = -1
assignments : (hash/c exact-nonnegative-integer?
                      (listof exact-nonnegative-integer?)) = (hasheqv)
configs : (hash/c string? string?) = (hash)
struct
(struct CreatedTopics (topics))
topics : (listof CreatedTopic?)
struct
(struct CreatedTopic (error-code error-message name))
error-code : exact-nonnegative-integer?
error-message : (or/c #f string?)
name : string?
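Since individual topics in a request can fail while others succeed, a caller might inspect the results like this (a sketch; the topic name and partition count are example values, and a zero error code is taken to mean success):

```racket
#lang racket/base
(require kafka)

;; Assumes a broker is reachable at the default bootstrap address.
(define c (make-client))

(define res
  (create-topics
   c
   (make-CreateTopic #:name "example-topic" #:partitions 2)))

;; Check the error code on every returned CreatedTopic.
(for ([t (in-list (CreatedTopics-topics res))])
  (unless (zero? (CreatedTopic-error-code t))
    (eprintf "couldn't create ~a: ~a~n"
             (CreatedTopic-name t)
             (CreatedTopic-error-message t))))

(disconnect-all c)
```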
struct
(struct DeletedTopics (throttle-time-ms topics tags))
throttle-time-ms : (or/c exact-nonnegative-integer?)
topics : (listof DeletedTopic?)
tags : (or/c #f tags/c)
struct
(struct DeletedTopic (error-code error-message name uuid tags))
error-code : error-code/c
error-message : (or/c #f string?)
name : string?
uuid : (or/c #f bytes?)
tags : (or/c #f tags/c)
1.4 Record Results
Record results represent the results of publishing individual records.
struct
(struct RecordResult (topic partition))
topic : string?
partition : ProduceResponsePartition?
struct
(struct ProduceResponsePartition (id error-code offset))
id : exact-nonnegative-integer?
error-code : exact-nonnegative-integer?
offset : exact-nonnegative-integer?
1.5 Contracts
2 Consumer
(require kafka/consumer) | package: kafka-lib |
Consumers form consumer groups to subscribe to topics and retrieve records. As the name implies, consumer groups group consumers together so that topic partitions may be spread out across the members of the group.
Consumers are not thread-safe.
procedure
(make-consumer client
               group-id
               topic ...+
               [#:reset-strategy strategy
                #:session-timeout-ms session-timeout-ms]) → consumer?
client : client?
group-id : string?
topic : string?
strategy : (or/c 'earliest 'latest) = 'earliest
session-timeout-ms : exact-nonnegative-integer? = 30000
The #:reset-strategy argument controls the consumer's initial offsets for newly-assigned partitions. When this value is 'earliest, the consumer will receive records starting from the beginning of each partition. When this value is 'latest, it will receive records starting from the time that it subscribes to each topic.
procedure
(consume-evt c [timeout-ms])
 → (evt/c
    (or/c (values 'rebalance (hash/c string? (hash/c integer? integer?)))
          (values 'records (vectorof record?))))
c : consumer?
timeout-ms : exact-nonnegative-integer? = 1000
When a consumer leaves or joins the consumer group, the event will synchronize to a 'rebalance result. In that case, the consumer will automatically re-join the group and discard any un-committed offsets. The associated data is a hash from topic names to hashes of partition ids to offsets. When a rebalance happens, you must take care not to commit any old offsets (i.e. you must issue a new consume-evt before making any calls to consumer-commit).
When either the timeout passes or new records become available on the broker, the event will synchronize to a 'records result whose associated data will be a vector of records.
More result types may be added in the future.
The timeout-ms argument controls how long the broker may wait before returning a response. If no records arrive between the time this function is called and the time the timeout passes, the event synchronizes to an empty vector of records. The other end may not necessarily respect the timeout value and may return immediately even when there are no records.
procedure
(consumer-commit c) → void?
c : consumer?
Call this function after you have successfully processed a batch of records received from consume-evt. If you forget to call this function, or if the consumer crashes in between calling consume-evt and calling this function, another consumer in the group will eventually receive that same batch again.
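Putting the pieces together, a typical consume loop synchronizes on consume-evt, handles both result types, and commits only after processing a batch. A sketch, assuming a broker at the default address and an existing topic named "example-topic" (both assumptions), that stops after three batches:

```racket
#lang racket/base
(require kafka kafka/consumer)

;; Assumes a broker at 127.0.0.1:9092 and an "example-topic" topic.
(define client (make-client))
(define consumer (make-consumer client "example-group" "example-topic"))

(let loop ([batches 0])
  (unless (= batches 3)
    (define-values (type data)
      (sync (consume-evt consumer 1000)))
    (case type
      [(rebalance)
       ;; The group was rebalanced and uncommitted offsets were
       ;; discarded, so issue a fresh consume-evt before committing.
       (loop batches)]
      [(records)
       (for ([r (in-vector data)])
         (printf "~s => ~s~n" (record-key r) (record-value r)))
       ;; Commit only after the whole batch has been processed.
       (consumer-commit consumer)
       (loop (add1 batches))])))

(consumer-stop consumer)
(disconnect-all client)
```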
procedure
(consumer-stop c) → void?
c : consumer?
2.1 Records
Records represent individual key-value pairs on a topic.
procedure
r : record?
procedure
r : record?
procedure
(record-key r) → (or/c #f bytes?)
r : record?
procedure
(record-value r) → bytes?
r : record?
procedure
(record-headers r) → (hash/c string? bytes?)
r : record?
2.2 Limitations
Consumers have several limitations at the moment, some of which will be addressed in future versions.
2.2.1 Compression
At the moment, only 'gzip is supported when producing records and 'gzip, 'lz4, 'snappy and 'zstd are supported when consuming records. Fetching a batch of records that is compressed using any other method will fail silently.
2.2.2 Group Assignment
Only brokers that implement client-side assignment are supported (Apache Kafka versions 0.11 and up). At the moment, only the range and round-robin group assignment strategies are implemented.
2.2.3 Error Detection
Batches retrieved from the broker contain a CRC code for error detection, but the library does not validate these at the moment.
3 Producer
(require kafka/producer) | package: kafka-lib |
Producers publish data to one or more topics. They batch data internally by topic & partition, and they are thread-safe.
procedure
(make-producer c
               [#:acks acks
                #:compression compression
                #:flush-interval interval
                #:max-batch-bytes max-bytes
                #:max-batch-size max-size]) → producer?
c : client?
acks : (or/c 'none 'leader 'full) = 'leader
compression : (or/c 'none 'gzip) = 'gzip
interval : exact-positive-integer? = 60000
max-bytes : exact-positive-integer? = (* 100 1024 1024)
max-size : exact-positive-integer? = 1000
Data is batched internally by topic & partition. Within each batch, the data is compressed according to the #:compression method.
The producer automatically flushes its data every #:flush-interval milliseconds, whenever the total size of all its batches exceeds #:max-batch-bytes, or whenever the total number of records across all of its batches exceeds #:max-batch-size, whichever comes first.
During a flush, calling produce on a producer blocks until the flush completes.
procedure
(produce p
         topic
         key
         value
         [#:partition partition
          #:headers headers]) → evt?
p : producer?
topic : string?
key : (or/c #f bytes?)
value : (or/c #f bytes?)
partition : exact-nonnegative-integer? = 0
headers : (hash/c string? (or/c #f bytes?)) = (hash)
Typically, you would call this function in a loop to produce a set of data, collect the results, then sync them to ensure they've been written to the log.
Each result event may be synced at most once.
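The loop described above might look like the following sketch, assuming a broker at the default address and an existing topic named "example-topic" (both assumptions):

```racket
#lang racket/base
(require kafka kafka/producer)

;; Assumes a broker at 127.0.0.1:9092 and an "example-topic" topic.
(define client (make-client))
(define p (make-producer client))

;; Produce a batch of records, collecting one result event each.
(define evts
  (for/list ([i (in-range 10)])
    (produce p
             "example-topic"
             (string->bytes/utf-8 (number->string i))
             (string->bytes/utf-8 (format "value-~a" i)))))

;; Flush, then sync each result event exactly once to confirm the
;; records were written to the log.
(producer-flush p)
(for-each sync evts)

(producer-stop p)
(disconnect-all client)
```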
procedure
(producer-flush p) → void?
p : producer?
procedure
(producer-stop p) → void?
p : producer?
3.1 Limitations
3.1.1 Compression
Kafka supports snappy, lz4, and zstd compression in addition to gzip, but this library only supports gzip at the moment.