设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间20 现在时间20 缓存数据 和聪明人交流,和靠谱的人恋爱,和进取的人共事,和幽默的人随行。晚安!

和聪明人交流,和靠谱的人恋爱,和进取的人共事,和幽默的人随行。晚安!

查看: 121|回复: 0

Kafka的基本使用及环境安装

[复制链接]

  离线 

TA的专栏

  • 打卡等级:热心大叔
  • 打卡总天数:233
  • 打卡月天数:0
  • 打卡总奖励:3331
  • 最近打卡:2025-11-21 11:29:45
等级头衔

等級:晓枫资讯-上等兵

在线时间
0 小时

积分成就
威望
0
贡献
366
主题
348
精华
0
金钱
4457
积分
783
注册时间
2023-1-7
最后登录
2025-11-21

发表于 2025-8-27 23:51:37 | 显示全部楼层 |阅读模式

认识Kafka

消息队列

  1. 消息队列是分布式系统和现代应用架构中至关重要的<strong>中间件</strong>。它的核心作用是<strong>解耦</strong>、<strong>异步</strong>和<strong>削峰填谷</strong>,像一个高效的“通信员”和“缓冲池”协调不同组件之间的工作。
复制代码

消息队列的核心概念

  1. 生产者: 产生消息(数据、任务请求、事件通知)并发送到队列的应用程序或服务。

  2. 消息队列: 一个临时的、持久化的存储区域(通常基于内存、磁盘或数据库),用于存放生产者发送的消息。消息按照先进先出的顺序存储,但很多队列支持优先级、延迟等特性。

  3. 消费者: 从队列中获取消息并进行处理的应用程序或服务。

  4. 消息: 队列中传输的数据单元,通常包含有效载荷(实际数据)和元数据(如ID、时间戳、优先级等)。

核心价值与解决的问题

  1. 解耦:

    • 问题: 系统组件(服务)之间直接调用会导致紧密耦合。一个组件的变更、故障或性能瓶颈会直接影响其他依赖它的组件。扩展也变得困难。

    • 解决: 生产者只需将消息发送到队列,无需知道谁(消费者)会处理它,消费者只需从队列订阅消息,无需知道消息是谁(生产者)发送的。双方只依赖队列,不直接依赖对方,大大降低了耦合度。系统更灵活、更易于维护和扩展。

  2. 异步:

    • 问题: 同步调用要求调用方(生产者)必须等待被调用方(消费者)处理完成并返回结果才能继续执行。如果处理耗时很长,调用方会被阻塞,资源利用率低,用户体验差(如网页卡顿)。

    • 解决: 生产者发送消息到队列后即可返回,无需等待消费者处理。消费者在后台异步地从队列拉取消息进行处理。这显著提高了系统的吞吐量和响应速度。

  3. 削峰填谷:

    • 问题: 系统流量往往存在高峰和低谷。高峰期如果请求量远超消费者处理能力,会导致系统过载、崩溃或请求超时。低谷期资源又可能闲置。

    • 解决: 队列作为缓冲区,在流量高峰时积压请求,平滑地将大量请求暂存起来。消费者按照自己的稳定处理能力从队列中拉取消息进行处理,避免了瞬间洪峰压垮下游系统。在流量低谷时,消费者可以继续处理队列中积压的消息。

  4. 冗余与可靠性:

    • 问题: 直接调用时,如果消费者临时不可用(故障、重启、维护),生产者的请求会丢失或失败。

    • 解决: 消息队列通常提供消息持久化功能(将消息写入磁盘)。即使消费者暂时离线,消息也会安全存储在队列中,待消费者恢复后继续处理,确保消息不丢失。许多队列还提供确认机制(ACK),消费者处理成功后才会从队列中移除消息。

  5. 可伸缩性:

    • 问题: 单一消费者处理能力有限,难以应对增长的业务量。

    • 解决: 可以很容易地增加消费者的数量(水平扩展),让多个消费者并行地从同一个队列中拉取消息进行处理,显著提高系统的整体吞吐量。队列本身也可以做成分布式集群来应对高吞吐量需求。

  6. 顺序保证:

    • 问题: 在分布式环境中保证消息处理的严格顺序很困难。

    • 解决: 虽然完全全局有序很难,但许多消息队列能保证分区有序队列有序(在单个队列/分区内,消息按照发送顺序被消费)。这对于某些需要保证因果关系的业务场景(如账户流水)非常重要。

  7. 缓冲:

    • 问题: 生产者和消费者的处理速度不一致。

    • 解决: 队列天然提供了缓冲能力,允许生产者和消费者以各自不同的速率工作,不会互相拖累。

常见的消息队列有RabbitMQ,Kafka,RocketMQ。这里主要介绍Kafka。 

Kafka

  1. <strong>Kafka</strong> 通常指 <strong>Apache Kafka</strong>,这是一个开源的、分布式的、高吞吐量、低延迟的<strong>流处理平台</strong>。它最初由 LinkedIn 开发,后来捐赠给了 Apache 软件基金会,并迅速成为大数据和实时数据处理领域的核心基础设施之一。
复制代码

  Kafka 不仅仅是一个消息队列,它是一个高吞吐、低延迟、分布式、持久化、可水平扩展的流数据平台。它设计之初就是为了处理持续产生、体量巨大、需要实时处理的“数据流”

ZooKeeper是一个开源的分布式应用程序协调软件,而Kafka是分布式事件处理平台,底层是使用分布式架构设计,所以Kafka的多个节点之间是采用zookeeper来实现协调调度的。

ZooKeeper

ZooKeeper是一个开源的分布式应用程序协调软件,而Kafka是分布式事件处理平台,底层是使用分布式架构设计,所以Kafka的多个节点之间是采用zookeeper来实现协调调度的。

Zookeeper的核心作用

  1. ZooKeeper的数据存储结构可以简单地理解为一个Tree结构,而Tree结构上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在ZooKeeper节点中。
  2. ZooKeeper创建数据节点时,会根据业务场景创建临时节点或永久(持久)节点。永久节点就是无论客户端是否连接上ZooKeeper都一直存在的节点,而临时节点指的是客户端连接时创建,断开连接后删除的节点。同时,ZooKeeper也提供了Watch(监控)机制用于监控节点的变化,然后通知对应的客户端进行相应的变化。Kafka软件中就内置了ZooKeeper的客户端,用于进行ZooKeeper的连接和通信。
复制代码

Kafka的基本使用

环境安装

我们这里先安装简单的Windows单机环境。在安装之前务必先安装Java8。

下载Kafka:Kafka下载地址Apache Kafka: A Distributed Streaming Platform.

https://kafka.apache.org/downloads

选择版本为2.13-3.8.0

1.jpeg

下载完成后进行解压,解压目录放在非系统盘根目录下。为了访问方便,可以将解压后的文件夹名称修改为Kafka

Kafka的文件目录

bin

linux系统下可执行脚本文件

bin/windows

windows系统下可执行脚本文件

config

配置文件

libs

依赖类库

licenses

许可信息

site-docs

文档

logs

服务日志

启动zookeeper

当前版本的Kafka软件仍然依赖Zookeeper,所以启动Kafka之前,需要先启动Zookeeper,Kafka软件内置了Zookeeper,所以无需额外安装,直接调用启动脚本即可。

 1. 进入Kafka解压缩文件夹的config目录,修改zookeeper.properties配置文件

  1. 修改dataDir配置,用于设置ZooKeeper数据存储位置,该路径如果不存在会自动创建。
  2. dataDir=D:/kafka/data/zk
复制代码

2.jpeg

在kafka解压缩后的目录中创建Zookeeper启动脚本文件:zk.cmd。

  1. 输入:
  2. <span>call bin/windows/zookeeper-server-start.bat config/zookeeper.properties</span>
复制代码

上述指令就是调用zookeeper启动命令,同时指定配置文件 

双击启动即可:

3.jpeg

 启动完成。

启动Kafka

进入Kafka解压缩文件夹的config目录,修改server.properties配置文件.

4.jpeg

设置Kafka数据的存储目录。如果文件目录不存在,会自动生成。

在kafka解压缩后的目录中创建Kafka启动脚本文件:kfk.cmd。

  1. 输入:
  2. call bin/windows/kafka-server-start.bat config/server.properties
复制代码

双击启动即可: 

 

5.jpeg

DOS窗口中,输入jps指令,查看当前启动的软件进程:

6.jpeg

这里名称为QuorumPeerMain的就是ZooKeeper软件进程,名称为Kafka的就是Kafka系统进程。此时,说明Kafka已经可以正常使用了。 

消息主题

  1.   在发布订阅模型中,为了让消费者对感兴趣的消息进行消费,而不是消费所有消息,所以就定义了主题(Topic),也就是说将不同的消息进行分类,分成不同的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送,而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
复制代码

有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。而对于初学者来讲,掌握基本的命令行操作是必要的。所以接下来,我们采用命令行进行操作。

创建主题

使用命令行方式创建主题test

打开DOS窗口,在确保Zookeeper和Kafkfa启动的情况下,进入Kafkfa解压目录下的bin/windows目录。

输入如下命令创建主题test: kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test

7.jpeg

test主题创建完成。

查询主题

输入如下命令进行主题查询:kafka-topics.bat --bootstrap-server localhost:9092 --list

8.jpeg

修改主题

kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

上述命令将test主题的分区数量设置为2.关于分区的信息,后面会详细介绍。

发送数据

命令行操作

使用命令行方式发送:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test

9.jpeg

上述操作就是在控制台生成数据,hello kafka 这里的数据需要回车,才会发送到Kafka服务器。

JavaAPI操作

引入依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>3.8.0</version>
  5. </dependency>
复制代码

 编写生产者

  1. public class ProducerTest {
  2. public static void main(String[] args) {
  3. // 配置属性集合
  4. Map<String, Object> configMap = new HashMap<>();
  5. // 配置属性:Kafka服务器集群地址
  6. configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  7. // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
  8. configMap.put(
  9. ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  10. "org.apache.kafka.common.serialization.StringSerializer");
  11. configMap.put(
  12. ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  13. "org.apache.kafka.common.serialization.StringSerializer");
  14. // 创建Kafka生产者对象,建立Kafka连接
  15. // 构造对象时,需要传递配置参数
  16. KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
  17. // 准备数据,定义泛型
  18. // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
  19. for (int i = 0; i < 10; i++) {
  20. ProducerRecord<String, String> record = new ProducerRecord<String, String>(
  21. "test", "key" + i, "value" + i
  22. );
  23. // 生产(发送)数据
  24. producer.send(record);
  25. }
  26. // 关闭生产者连接
  27. producer.close();
  28. }
  29. }
复制代码

消费数据

命令行操作

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

10.jpeg

JavaAPI操作

  1. public class ConsumerTest {
  2. public static void main(String[] args) {
  3. // 创建配置对象
  4. Map<String, Object> configMap = new HashMap<>();
  5. configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  6. // 反序列化类配置
  7. configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  8. configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  9. // 组ID配置
  10. configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
  11. // 创建消费者对象
  12. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);
  13. // 从kafka主题中获取对象 订阅主题
  14. consumer.subscribe(Collections.singleton("test"));
  15. // 消费者从Kafka主题中拉取数据
  16. while (true) {
  17. ConsumerRecords<String, String> datas = consumer.poll(100);
  18. for (ConsumerRecord<String, String> data : datas) {
  19. System.out.println(data);
  20. }
  21. }
  22. // 关闭消费者对象
  23. // consumer.close();
  24. }
  25. }
复制代码

到此这篇关于Kafka的基本使用的文章就介绍到这了,更多相关Kafka使用内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
      1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
      2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
      3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
      4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
      5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|晓枫资讯--科技资讯社区 本站已运行

CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

晓枫资讯--科技资讯社区

本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

Powered by Discuz! X3.5

快速回复 返回顶部 返回列表