登 录
注 册
< 大 数 据
Flink
Hadoop
Spark
Hive
HBase
Kafka
其他框架
分布式消息队列
Kafka命令行操作
生产者与幂等性
ExactlyOnce与事务
分区消费策略
手动提交offset
Kafka对接Flume
热门推荐>>>
中台架构
中台建设与架构
Hadoop
源码分析-NN启动(三)
HBase
HBased对接Hive
Linux
Nginx高可用
Python
数据导出工具
Flink
3分钟搭建Flink SQL测试环境
深度学习
卷积神经网络
数据结构与算法
选择合适的算法
MySQL
数据备份恢复
计算机系统
信号量同步线程
Hive
Hive调优参数大全
其他框架
Azkaban Flow1.0与2.0
ClickHouse
表引擎-其他类型
技术成长
最好的职业建议
精选书单
技术成长书单—机器学习
技术资讯
数据在线:计算将成为公共服务
开发工具
IntelliJ IDEA 20年发展回顾(二)
系统工具
Mac命令行工具
虚拟化
内存虚拟化概述
云原生
云原生构建现代化应用
云服务
一文搞懂公有云、私有云...
Java
Spring Boot依赖注入与Runners
Go
Go函数与方法
SQL
SQL模板
安全常识
一文读懂SSO
当前位置:
首页
>>
Kafka
>>
手动提交offset
手动提交offset
2020-07-04 15:08:52 星期六 阅读:1385
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的API。 手动提交 offset 的方法有两种: **`commitSync(同步提交)`** **`commitAsync(异步提交)`** 两者的相同点:都会将本次 poll 的一批数据最高的偏移量提交; 两者的不同点:commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。 手动提交的问题:无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据的重复消费。所以`对于特别严谨的场景,可以自定义维护offset的存储(维护难度较大)`,可以将消费者的offset存放到MySQL(不存储到zk或者broker集群),利用MySQL的事务功能将消费者的offset存储与消费逻辑进行绑定。 ####手动同步提交 同步提交的offset有失败重试机制,所以更加可靠。具体实现方式为: 1、在构造消费者配置信息的时候关闭自动提交功能 ``` // 关闭自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); ``` 2、在消费完一个批次之后调用同步提交的方法手动提交 ``` // 同步提交,当前线程会阻塞直到offset提交成功 consumer.commitSync(); ``` 提交位置如下 ``` while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<Integer, String> record : records) { System.out.println(record.key() + " " + record.value() + "-> offset:" + record.offset()); } // 同步提交,当前线程会阻塞直到offset提交成功 consumer.commitSync(); } ``` ####手动异步提交 虽然同步提交offset更可靠一些,但是由于会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此在生产环境中,会选用异步提交offset的方式。具体的实现方式为: 1、在构造消费者配置信息的时候关闭自动提交功能 ``` // 关闭自动提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); ``` 2、在消费完一个批次之后调用异步提交的方法提交 ``` //异步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) { System.out.println("Commit failed for " + offsets); } } }); ``` 提交位置如下所示 ``` while (true) { ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(100)); for (ConsumerRecord<Integer, String> record : records) { System.out.println(record.key() + " " + record.value() + "-> offset:" + record.offset()); } //异步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) { System.out.println("Commit failed for " + offsets); } } }); } ```