设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间15 现在时间15 缓存数据 哪怕他只唱了几句,哪怕别人不仔细听都听不到他的声音,他还是要固执的在自己的名字后面写上他的名字,从他们两个名字写在一起的第一天起,就注定了一辈子不会分开[爱心]

哪怕他只唱了几句,哪怕别人不仔细听都听不到他的声音,他还是要固执的在自己的名字后面写上他的名字,从他们两个名字写在一起的第一天起,就注定了一辈子不会分开[爱心] -- 怎么了,没什么

查看: 963|回复: 3

详解Flink同步Kafka数据到ClickHouse分布式表

[复制链接]

  离线 

TA的专栏

  • 打卡等级:常驻代表
  • 打卡总天数:31
  • 打卡月天数:0
  • 打卡总奖励:397
  • 最近打卡:2025-06-26 20:31:25
等级头衔

等級:晓枫资讯-上等兵

在线时间
0 小时

积分成就
威望
0
贡献
370
主题
314
精华
0
金钱
1447
积分
752
注册时间
2023-2-11
最后登录
2025-6-26

发表于 2023-3-4 18:28:01 | 显示全部楼层 |阅读模式
引言

业务需要一种OLAP引擎,可以做到实时写入存储和查询计算功能,提供高效、稳健的实时数据服务,最终决定
  1. ClickHouse
复制代码

什么是ClickHouse?

ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。
列式数据库更适合于OLAP场景(对于大多数查询而言,处理速度至少提高了100倍),下面详细解释了原因(通过图片更有利于直观理解),图片来源于ClickHouse中文官方文档。
行式
192836n4iponii2utjozp6.webp

列式
192836oiig1zz080gzgzyl.webp

我们使用Flink编写程序,消费kafka里面的主题数据,清洗、归一,写入到clickhouse里面去。
这里的关键点,由于第一次使用,无法分清应该建立什么格式的clickhouse表,出现了一些问题,最大的问题就是程序将数据写入了,查询发现数据不完整,只有一部分。我也在网上查了一些原因,总结下来。
为什么有时看不到已经创建好的表并且查询结果一直抖动时多时少?
常见原因1:
建表流程存在问题。ClickHouse的分布式集群搭建并没有原生的分布式DDL语义。如果您在自建ClickHouse集群时使用create table创建表,查询虽然返回了成功,但实际这个表只在当前连接的Server上创建了。下次连接重置换一个Server,您就看不到这个表了。
解决方案:
建表时,请使用create table <table_name> on cluster default语句,on cluster default声明会把这条语句广播给default集群的所有节点进行执行。示例代码如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再创建一个分布式表引擎,建表语句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));
常见原因2:
ReplicatedMergeTree存储表配置有问题。ReplicatedMergeTree表引擎是对应MergeTree表引擎的主备同步增强版,在单副本实例上限定只能创建MergeTree表引擎,在双副本实例上只能创建ReplicatedMergeTree表引擎。
解决方案:
在双副本实例上建表时,请使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)为固定配置,无需修改。
这里引出了复制表的概念,这里介绍一下,只有 MergeTree 系列里的表可支持副本:
  1. ReplicatedMergeTree
  2. ReplicatedSummingMergeTree
  3. ReplicatedReplacingMergeTree
  4. ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree
  5. ReplicatedVersionedCollapsingMergeTree
  6. ReplicatedGraphiteMergeTree
复制代码
副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表。副本不依赖分片。每个分片有它自己的独立副本。

创建复制表

先做好准备工作,该建表的建表,然后编写程序。在表引擎名称上加上 Replicated 前缀。例如:ReplicatedMergeTree。

  • 首先创建一个分布式数据库
  1. create database test on cluster default_cluster;
复制代码

  • 创建本地表
由于clickhouse是分布式的,创建本地表本来应该在每个节点上创建的,但是指定on cluster关键字可以直接完成,建表语句如下:
  1. CREATE TABLE test.test_data_shade on cluster default_cluster
  2. (
  3.     `data` Map(String, String),
  4.     `uid` String,
  5.     `remote_addr` String,
  6.     `time` Datetime64,
  7.     `status` Int32,
  8.     ...其它字段省略
  9.     `dt` String
  10. )
  11. ENGINE = ReplicatedMergeTree()
  12. partition by dt
  13. order by (dt, sipHash64(uid));
复制代码
这里表引擎为ReplicatedMergeTree,即有副本的表,根据dt按天分区,提升查询效率,sipHash64是一个hash函数,根据uid散列使得相同uid数据在同一个分片上面,如果有去重需求,速度更快,因为可以计算每个分片去重,再汇总一下即可。

  • 创建分布式表
  1. CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));
复制代码
在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表写入或读取数据,Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

通过jdbc写入

这个我是看的官方文档,里面有2种选择,感兴趣的同学可以都去尝试一下。
192836uwrhco8x8zrf8f3w.png

这里贴一下我的Pom依赖
  1. <dependency>
  2.     <groupId>ru.yandex.clickhouse</groupId>
  3.     <artifactId>clickhouse-jdbc</artifactId>
  4.     <version>0.3.1-patch</version>
  5.     <classifier>shaded</classifier>
  6.     <exclusions>
  7.         <exclusion>
  8.             <groupId>*</groupId>
  9.             <artifactId>*</artifactId>
  10.         </exclusion>
  11.     </exclusions>
  12. </dependency>
复制代码
Flink主程序,消费kafka,做清洗,然后写入clickhouse,这都是常规操作,这里贴一下关键代码吧。
192837wb1756liibn7z7iy.png

连接clickhouse有2种方式,8123端口的http方式,和基于9000端口的tcp方式。
这里官方推荐的是连接驱动是0.3.2:
  1. <dependency>
  2.     <!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
  3.     <groupId>com.clickhouse</groupId>
  4.     <artifactId>clickhouse-jdbc</artifactId>
  5.     <version>0.3.2-patch11</version>
  6.     <classifier>all</classifier>
  7.     <exclusions>
  8.         <exclusion>
  9.             <groupId>*</groupId>
  10.             <artifactId>*</artifactId>
  11.         </exclusion>
  12.     </exclusions>
  13. </dependency>
复制代码
  1. Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.
复制代码
192837m6f6bf9mpbbbot1t.png

官方推荐升级到0.3.2,上面表格给出了升级方法,文档地址:
github.com/ClickHouse/…
以上就是详解Flink同步Kafka数据到ClickHouse分布式表的详细内容,更多关于Flink数据同步Kafka ClickHouse的资料请关注晓枫资讯其它相关文章!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
      1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
      2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
      3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
      4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
      5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
20
积分
20
注册时间
2022-12-29
最后登录
2022-12-29

发表于 2023-4-19 05:05:20 | 显示全部楼层
谢谢分享~~~~~
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
21
积分
22
注册时间
2022-12-28
最后登录
2022-12-28

发表于 2024-10-31 20:06:46 | 显示全部楼层
顶顶更健康!!!
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
19
积分
18
注册时间
2022-12-29
最后登录
2022-12-29

发表于 昨天 14:51 | 显示全部楼层
感谢楼主分享。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1楼
2楼
3楼
4楼

手机版|晓枫资讯--科技资讯社区 本站已运行

CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

晓枫资讯--科技资讯社区

本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

Powered by Discuz! X3.5

快速回复 返回顶部 返回列表