rd_kafka
的常见发布和订阅配置选项的整理。这些选项可以用于设置 Kafka 生产者(发布者)和消费者(订阅者)的行为和属性。
发布者配置选项:
-
bootstrap.servers
:Kafka 集群的地址列表,用于引导连接。
示例代码:rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
-
message.send.max.retries
:消息发送的最大重试次数。
示例代码:rd_kafka_conf_set(conf, "message.send.max.retries", "3", errstr, sizeof(errstr));
-
request.required.acks
:生产者需要接收到的分区写入确认数。
示例代码:rd_kafka_conf_set(conf, "request.required.acks", "-1", errstr, sizeof(errstr));
-
compression.codec
:消息压缩编解码算法。
示例代码:rd_kafka_conf_set(conf, "compression.codec", "gzip", errstr, sizeof(errstr));
-
message.timeout.ms
:消息发送超时时间(毫秒)。
示例代码:rd_kafka_conf_set(conf, "message.timeout.ms", "5000", errstr, sizeof(errstr));
-
partitioner
:生产者使用的分区器。
示例代码:rd_kafka_conf_set(conf, "partitioner", "random", errstr, sizeof(errstr));
-
max.in.flight.requests.per.connection
:单个连接上允许的未完成请求的最大数量。
示例代码:rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "5", errstr, sizeof(errstr));
-
max.poll.interval.ms
:消费者在轮询期间可空闲的最大时间(毫秒)。
示例代码:rd_kafka_conf_set(conf, "max.poll.interval.ms", "300000", errstr, sizeof(errstr));
-
message.max.bytes
:单个消息的最大字节数。
示例代码:rd_kafka_conf_set(conf, "message.max.bytes", "1000000", errstr, sizeof(errstr));
-
socket.send.buffer.bytes
:发送套接字的缓冲区大小(字节)。
示例代码:rd_kafka_conf_set(conf, "socket.send.buffer.bytes", "65536", errstr, sizeof(errstr));
-
socket.receive.buffer.bytes
:接收套接字的缓冲区大小(字节)。
示例代码:rd_kafka_conf_set(conf, "socket.receive.buffer.bytes", "65536", errstr, sizeof(errstr));
-
enable.idempotence
:启用幂等性发送,确保消息的顺序性和唯一性。
示例代码:rd_kafka_conf_set(conf, "enable.idempotence", "true", errstr, sizeof(errstr));
-
retries
:消息发送的总重试次数,包括网络错误和可重试的应用程序错误。
示例代码:rd_kafka_conf_set(conf, "retries", "5", errstr, sizeof(errstr));
14 在 librdkafka 中,设置自定义分区器回调函数的方式是通过rd_kafka_conf_set_partitioner_cb
函数来实现。以下是一个正确的示例代码:int32_t my_partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) {
// 自定义分区逻辑 // 返回一个分区号(0 到 partition_cnt-1) } // 创建 Kafka 配置对象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); // 设置自定义分区器回调函数 rd_kafka_conf_set_partitioner_cb(conf, my_partitioner_cb);
订阅者配置选项:
-
bootstrap.servers
:Kafka 集群的地址列表,用于引导连接。
示例代码:rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", errstr, sizeof(errstr));
-
group.id
:消费者所属的消费者组的唯一标识符。
示例代码:rd_kafka_conf_set(conf, "group.id", "my-consumer-group", errstr, sizeof(errstr));
-
auto.offset.reset
:当消费者在启动时没有有效的偏移量时,对应的消费位置。
示例代码:rd_kafka_conf_set(conf, "auto.offset.reset","earliest", errstr, sizeof(errstr));
-
enable.auto.commit
:启用自动提交消费位移。
示例代码:rd_kafka_conf_set(conf, "enable.auto.commit", "true", errstr, sizeof(errstr));
-
auto.commit.interval.ms
:自动提交消费位移的间隔时间(毫秒)。
示例代码:rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", errstr, sizeof(errstr));
-
max.poll.records
:每次轮询从单个分区中返回的最大记录数。
示例代码:rd_kafka_conf_set(conf, "max.poll.records", "100", errstr, sizeof(errstr));
-
fetch.wait.max.ms
:在没有可用消息时,消费者等待获取新消息的最大时间(毫秒)。
示例代码:rd_kafka_conf_set(conf, "fetch.wait.max.ms", "100", errstr, sizeof(errstr));
-
session.timeout.ms
:消费者组中消费者被认为失效之前的超时时间(毫秒)。
示例代码:rd_kafka_conf_set(conf, "session.timeout.ms", "6000", errstr, sizeof(errstr));
-
heartbeat.interval.ms
:心跳间隔时间(毫秒),用于检测消费者组中的消费者是否存活。
示例代码:rd_kafka_conf_set(conf, "heartbeat.interval.ms", "2000", errstr, sizeof(errstr));
-
fetch.max.bytes
:单次拉取请求从代理返回的最大数据量(字节)。
示例代码:rd_kafka_conf_set(conf, "fetch.max.bytes", "1048576", errstr, sizeof(errstr));
-
queued.min.messages
:当消息排队的消息数低于此阈值时,轮询将阻塞(等待更多消息)。
示例代码:rd_kafka_conf_set(conf, "queued.min.messages", "1000", errstr, sizeof(errstr));
-
queued.max.messages.kbytes
:待处理消息队列的最大总大小(千字节)。
示例代码:rd_kafka_conf_set(conf, "queued.max.messages.kbytes", "10240", errstr, sizeof(errstr));
这些是一些常见的 rd_kafka
发布和订阅配置选项。
https://blog.csdn.net/dwjlyl/article/details/127432628