clj-kafka.consumer.zk

consumer

(consumer m)

Uses information in Zookeeper to connect to Kafka. More info on settings is available here: https://kafka.apache.org/08/configuration.html

Recommended for use with with-resource:
(with-resource [c (consumer m)] shutdown (take 5 (messages c "test")))

Keys:
zookeeper.connect : host:port for Zookeeper, e.g. 127.0.0.1:2181
group.id : consumer group, e.g. group1
auto.offset.reset : what to do if an offset is out of range, e.g. smallest, largest
auto.commit.interval.ms : how often, in milliseconds, the consumed offsets are committed to Zookeeper
auto.commit.enable : if set to true, the consumer periodically commits to Zookeeper the latest consumed offset of each partition
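As a minimal sketch, the keys above can be collected into a plain Clojure map and passed as m. The host, group and reset policy reuse the examples given above; the 1000 ms commit interval is an arbitrary assumption, and the values are written as strings on the assumption that they are handed to Kafka as properties.

```clojure
;; Illustrative consumer settings; adjust host, group and interval as needed.
(def config
  {"zookeeper.connect"       "127.0.0.1:2181"  ; host:port for Zookeeper
   "group.id"                "group1"          ; consumer group
   "auto.offset.reset"       "smallest"        ; or "largest"
   "auto.commit.enable"      "true"            ; periodically commit offsets
   "auto.commit.interval.ms" "1000"})          ; assumed value, in milliseconds
```

The map would then be supplied to the constructor, as in (consumer config).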

create-message-stream

(create-message-stream consumer topic)
(create-message-stream consumer topic key-decoder value-decoder)

Creates a single message stream for the given topic.

create-message-streams

(create-message-streams consumer topic-count-map)
(create-message-streams consumer topic-count-map key-decoder value-decoder)

Creates multiple message streams for consuming multiple topics, or a single topic using multiple threads. topic-count-map is a map from a topic name to the number of streams desired for that topic.
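The shape of topic-count-map can be sketched as follows. The helper topic-counts and the topic names are hypothetical and not part of clj-kafka. The coercion to Integer rests on the assumption that the underlying Kafka Java API expects Integer stream counts, whereas Clojure integer literals are Longs.

```clojure
;; Hypothetical helper (not part of clj-kafka): coerce stream counts to
;; java.lang.Integer, since Clojure integer literals are Longs.
(defn topic-counts
  [m]
  (into {} (for [[topic n] m] [topic (int n)])))

;; Two streams for "test", one for "events" (topic names are illustrative).
(def counts (topic-counts {"test" 2, "events" 1}))

(comment
  ;; Assumed usage; needs a running Kafka and Zookeeper, with c being a
  ;; consumer created via (consumer m).
  (create-message-streams c counts))
```

Each stream would typically be consumed on its own thread, which is why the count is given per topic.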

default-decoder

(default-decoder)

Creates the default decoder that reads message keys and values as byte arrays.

messages

(messages consumer topic & {:keys [key-decoder value-decoder], :or {key-decoder (default-decoder), value-decoder (default-decoder)}})

Provides an easy way to consume a sequence of KafkaMessage messages from the named topic. Consumes on a single thread and returns a lazy sequence.
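Because the default decoders yield byte arrays, message payloads usually need decoding before use. The sketch below shows one way to do that; value->string is a hypothetical helper, and it assumes that a KafkaMessage exposes its payload under :value and that the producer wrote UTF-8 text.

```clojure
;; Hypothetical helper (not part of clj-kafka): decode a message payload.
;; Assumes the payload is a byte array stored under :value, holding UTF-8 text.
(defn value->string
  [message]
  (String. ^bytes (:value message) "UTF-8"))

;; A plain map stands in for a KafkaMessage so the helper can be tried
;; without a broker.
(def sample {:topic "test" :value (.getBytes "hello" "UTF-8")})

(value->string sample)

(comment
  ;; Assumed usage against a live consumer; doall realises the lazy
  ;; sequence before with-resource shuts the consumer down.
  (with-resource [c (consumer m)]
    shutdown
    (doall (map value->string (take 5 (messages c "test"))))))
```

Since messages returns a lazy sequence, forcing it inside the with-resource body is one way to avoid reading from a consumer that has already been shut down.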

shutdown

(shutdown consumer)

Closes the connection to Zookeeper and stops consuming messages.

stream-seq

(stream-seq stream)

Returns a lazy sequence of KafkaMessage messages from the stream.