设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间11 现在时间11 缓存数据 她接到喜欢了七年的他的电话。他对她说:我们在一起吧。尽管听到电话那头别人的窃窃笑声。她还昰淡定地说:好啊。然后她问:大冒险又输了吧?他说:我选的是真心话。

她接到喜欢了七年的他的电话。他对她说:我们在一起吧。尽管听到电话那头别人的窃窃笑声。她还昰淡定地说:好啊。然后她问:大冒险又输了吧?他说:我选的是真心话。 -- 过火

查看: 766|回复: 1

如何利用Java实现MySQL的数据变化监听

[复制链接]

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
31
主题
25
精华
0
金钱
91
积分
56
注册时间
2023-10-4
最后登录
2025-5-31

发表于 2025-2-17 17:52:28 | 显示全部楼层 |阅读模式
目录
  • 1.binlog 简介
    • 1.1 什么是 binlog
    • 1.2 binlog 的三种格式
  • 2. 开启 binlog 并配置 MySQL
    • 2.1 检查 binlog 是否开启
    • 2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)
    • 2.3 验证 binlog 配置
  • 3. 使用 Java 监听 binlog
    • 3.1 选择工具:Canal
    • 3.2 Java 代码监听 binlog
  • 4. 代码解析
    • 总结

      在高并发和大数据环境下,实时获取 MySQL 数据库的增量变化对数据同步、数据分析、缓存更新等场景至关重要。MySQL 的 binlog(Binary Log) 记录了数据库的所有变更,可以用来实现 增量数据监听。本文将介绍如何利用 binlog 监听 MySQL 数据增量,并提供基于 Java 的 Canal 实现示例。

      1.binlog 简介

      1.1 什么是 binlog

      binlog(Binary Log) 是 MySQL 记录 DDL(数据定义语言,如

      1. CREATE
      复制代码
      1. ALTER
      复制代码
      )和 DML(数据操作语言,如
      1. INSERT
      复制代码
      1. UPDATE
      复制代码
      1. DELETE
      复制代码
      )的日志文件,它用于:

      • 主从复制:MySQL 主库将 binlog 传输到从库,实现数据同步。
      • 数据恢复:通过
        1. mysqlbinlog
        复制代码
        工具解析 binlog 恢复数据。
      • 数据同步:第三方工具(如 Canal)解析 binlog,进行数据同步。

      1.2 binlog 的三种格式

      binlog 格式说明
      STATEMENT记录 SQL 语句本身
      ROW记录行数据变更(推荐)
      MIXED结合前两者,MySQL 自动判断

      由于 ROW 格式能提供精确的行级别变更信息,因此推荐使用它。

      2. 开启 binlog 并配置 MySQL

      2.1 检查 binlog 是否开启

      1. SHOW VARIABLES LIKE 'log_bin';
      复制代码

      如果

      1. log_bin
      复制代码
      值为
      1. OFF
      复制代码
      ,说明 binlog 未开启。

      2.2 修改 MySQL 配置文件(my.cnf 或 my.ini)

      1. [mysqld]
      复制代码
      部分添加以下内容:

      1. server-id=1
      2. log-bin=mysql-bin
      3. binlog-format=ROW
      4. binlog-row-image=FULL
      5. expire_logs_days=7
      复制代码

      重启 MySQL:

      1. systemctl restart mysql # Linux
      2. net stop mysql && net start mysql # Windows
      复制代码

      2.3 验证 binlog 配置

      执行:

      1. SHOW BINARY LOGS;
      复制代码

      如果有 binlog 文件,如

      1. mysql-bin.000001
      复制代码
      ,说明已开启。

      3. 使用 Java 监听 binlog

      3.1 选择工具:Canal

      阿里巴巴开源的 Canal 可以模拟 MySQL 从库协议,解析 binlog 并实时推送增量数据。

      3.2 Java 代码监听 binlog

      引入 Maven 依赖

      1. <dependencies>
      2. <dependency>
      3. <groupId>com.alibaba.otter</groupId>
      4. <artifactId>canal.client</artifactId>
      5. <version>1.1.6</version>
      6. </dependency>
      7. </dependencies>
      复制代码

      编写 Java 代码

      1. import com.alibaba.otter.canal.client.CanalConnector;
      2. import com.alibaba.otter.canal.client.CanalConnectors;
      3. import com.alibaba.otter.canal.protocol.CanalEntry;
      4. import com.alibaba.otter.canal.protocol.Message;
      5. import java.net.InetSocketAddress;
      6. import java.util.List;
      7. public class BinlogListener {
      8. public static void main(String[] args) {
      9. // 连接 Canal
      10. CanalConnector connector = CanalConnectors.newSingleConnector(
      11. new InetSocketAddress("127.0.0.1", 11111),
      12. "example", "canal", "canal");
      13. try {
      14. connector.connect();
      15. connector.subscribe(".*\\..*"); // 监听所有库表
      16. connector.rollback();
      17. while (true) {
      18. Message message = connector.getWithoutAck(100); // 获取数据
      19. long batchId = message.getId();
      20. List<CanalEntry.Entry> entries = message.getEntries();
      21. if (batchId != -1 && !entries.isEmpty()) {
      22. for (CanalEntry.Entry entry : entries) {
      23. if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
      24. processEntry(entry);
      25. }
      26. }
      27. }
      28. connector.ack(batchId); // 确认消息
      29. }
      30. } finally {
      31. connector.disconnect();
      32. }
      33. }
      34. private static void processEntry(CanalEntry.Entry entry) {
      35. try {
      36. CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
      37. CanalEntry.EventType eventType = rowChange.getEventType();
      38. System.out.println("变更表:" + entry.getHeader().getTableName());
      39. System.out.println("变更类型:" + eventType);
      40. for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
      41. if (eventType == CanalEntry.EventType.DELETE) {
      42. System.out.println("删除数据:" + rowData.getBeforeColumnsList());
      43. } else if (eventType == CanalEntry.EventType.INSERT) {
      44. System.out.println("新增数据:" + rowData.getAfterColumnsList());
      45. } else {
      46. System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
      47. System.out.println("更新后数据:" + rowData.getAfterColumnsList());
      48. }
      49. }
      50. } catch (Exception e) {
      51. e.printStackTrace();
      52. }
      53. }
      54. }
      复制代码

      4. 代码解析

      1.创建 Canal 连接

      1. CanalConnector connector = CanalConnectors.newSingleConnector(
      2. new InetSocketAddress("127.0.0.1", 11111),
      3. "example", "canal", "canal");
      复制代码
        1. 127.0.0.1
        复制代码
        :Canal 服务器地址
        1. 11111
        复制代码
        :Canal 端口
        1. example
        复制代码
        :Canal 实例
        1. canal/canal
        复制代码
        :默认账号密码

      2.获取 binlog 变更数据

      1. Message message = connector.getWithoutAck(100);
      复制代码

      1. getWithoutAck(100)
      复制代码
      :拉取 100 条 binlog 事件。

      3.解析 binlog

      1. for (CanalEntry.Entry entry : entries) {
      2. if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
      3. processEntry(entry);
      4. }
      5. }
      复制代码

      仅处理

      1. ROWDATA
      复制代码
      类型的变更,忽略事务等其他信息。

      4.分类处理 INSERT、UPDATE、DELETE

      1. if (eventType == CanalEntry.EventType.DELETE) {
      2. System.out.println("删除数据:" + rowData.getBeforeColumnsList());
      3. } else if (eventType == CanalEntry.EventType.INSERT) {
      4. System.out.println("新增数据:" + rowData.getAfterColumnsList());
      5. } else {
      6. System.out.println("更新前数据:" + rowData.getBeforeColumnsList());
      7. System.out.println("更新后数据:" + rowData.getAfterColumnsList());
      8. }
      复制代码

      总结

      • MySQL binlog 记录数据库变更,可用于监听增量数据。
      • Canal 作为 MySQL 从库解析 binlog,实现数据同步。
      • Java 代码示例 展示如何用 Canal 监听
        1. INSERT
        复制代码
        1. UPDATE
        复制代码
        1. DELETE
        复制代码
        操作,并解析变更数据。

      这种方案适用于 分布式数据同步缓存一致性数据变更通知,是实时数据处理的重要手段。

      到此这篇关于如何利用Java实现MySQL的数据变化监听的文章就介绍到这了,更多相关Java监听MySQL数据变化内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯!


      免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
      晓枫资讯-科技资讯社区-免责声明
      免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
            1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
            2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
            3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
            4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
            5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
      http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

        离线 

      TA的专栏

      等级头衔

      等級:晓枫资讯-列兵

      在线时间
      0 小时

      积分成就
      威望
      0
      贡献
      0
      主题
      0
      精华
      0
      金钱
      16
      积分
      12
      注册时间
      2022-12-25
      最后登录
      2022-12-25

      发表于 2025-4-1 07:23:10 | 显示全部楼层
      路过,支持一下
      http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
      严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
      您需要登录后才可以回帖 登录 | 立即注册

      本版积分规则

      1楼
      2楼

      手机版|晓枫资讯--科技资讯社区 本站已运行

      CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

      晓枫资讯--科技资讯社区

      本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

      如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

      Powered by Discuz! X3.5

      快速回复 返回顶部 返回列表