目录- 1.binlog 简介
- 1.1 什么是 binlog
- 1.2 binlog 的三种格式
- 2. 开启 binlog 并配置 MySQL
- 2.1 检查 binlog 是否开启
- 2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
- 2.3 验证 binlog 配置
- 3. 使用 Java 监听 binlog
- 3.1 选择工具:Canal
- 3.2 Java 代码监听 binlog
- 4. 代码解析
- 总结
在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。
1.binlog 简介
1.1 什么是 binlog
binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如 、 )和 DML(数据操作语言,如 、 、 )的日志文件,它用于:
- 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
- 数据恢复:通过工具解析 binlog 恢复数据。
- 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。
1.2 binlog 的三种格式
binlog 格式 | 说明 |
---|
STATEMENT | 记录 SQL 语句本身 | ROW | 记录行数据变更(推荐) | MIXED | 结合前两者,MySQL 自动判断 |
由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。
2. 开启 binlog 并配置 MySQL
2.1 检查 binlog 是否开启
- SHOW VARIABLES LIKE 'log_bin';
复制代码
如果 值为 ,说明 binlog 未开启。
2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
在 部分添加以下内容:
- server-id=1
- log-bin=mysql-bin
- binlog-format=ROW
- binlog-row-image=FULL
- expire_logs_days=7
复制代码
重启 MySQL:
- systemctl restart mysql # Linux
- net stop mysql && net start mysql # Windows
复制代码
2.3 验证 binlog 配置
执行:
如果有 binlog 文件,如 ,说明已开启。
3. 使用 Java 监听 binlog
3.1 选择工具:Canal
阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。
3.2 Java 代码监听 binlog
引入 Maven 依赖
- <dependencies>
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.6</version>
- </dependency>
- </dependencies>
复制代码
编写 Java 代码
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
-
- import java.net.InetSocketAddress;
- import java.util.List;
-
- public class BinlogListener {
- public static void main(String[] args) {
- // 连接 Canal
- CanalConnector connector = CanalConnectors.newSingleConnector(
- new InetSocketAddress("127.0.0.1", 11111),
- "example", "canal", "canal");
-
-
- try {
- connector.connect();
- connector.subscribe(".*\\..*"); // 监听所有库表
- connector.rollback();
-
- while (true) {
- Message message = connector.getWithoutAck(100); // 获取数据
- long batchId = message.getId();
- List<CanalEntry.Entry> entries = message.getEntries();
-
- if (batchId != -1 && !entries.isEmpty()) {
- for (CanalEntry.Entry entry : entries) {
- if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
- processEntry(entry);
- }
- }
- }
- connector.ack(batchId); // 确认消息
- }
- } finally {
- connector.disconnect();
- }
- }
-
- private static void processEntry(CanalEntry.Entry entry) {
- try {
- CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- CanalEntry.EventType eventType = rowChange.getEventType();
-
- System.out.println("变更表:" + entry.getHeader().getTableName());
- System.out.println("变更类型:" + eventType);
-
- for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
- if (eventType == CanalEntry.EventType.DELETE) {
- System.out.println("删除数据:" + rowData.getBeforeColumnsList());
- } else if (eventType == CanalEntry.EventType.INSERT) {
- System.out.println("新增数据:" + rowData.getAfterColumnsList());
- } else {
- System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
- System.out.println("更新后数据:" + rowData.getAfterColumnsList());
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- }
复制代码
4. 代码解析
1.创建 Canal 连接
- CanalConnector connector = CanalConnectors.newSingleConnector(
- new InetSocketAddress("127.0.0.1", 11111),
- "example", "canal", "canal");
复制代码
- :Canal 服务器地址
- :Canal 端口
- :Canal 实例
- :默认账号密码
2.获取 binlog 变更数据
- Message message = connector.getWithoutAck(100);
复制代码
:拉取 100 条 binlog 事件。
3.解析 binlog
- for (CanalEntry.Entry entry : entries) {
- if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
- processEntry(entry);
- }
- }
复制代码
仅处理 类型的变更,忽略事务等其他信息。
4.分类处理 INSERT、UPDATE、DELETE
- if (eventType == CanalEntry.EventType.DELETE) {
- System.out.println("删除数据:" + rowData.getBeforeColumnsList());
- } else if (eventType == CanalEntry.EventType.INSERT) {
- System.out.println("新增数据:" + rowData.getAfterColumnsList());
- } else {
- System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
- System.out.println("更新后数据:" + rowData.getAfterColumnsList());
- }
复制代码
总结
- MySQL binlog 记录数据库变更,可用于监听增量数据。
- Canal 作为 MySQL 从库解析 binlog,实现数据同步。
- Java 代码示例 展示如何用 Canal 监听、、操作,并解析变更数据。
这种方案适用于 分布式数据同步、缓存一致性 和 数据变更通知,是实时数据处理的重要手段。
到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯! 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |