认识Kafka
消息队列
- 消息队列是分布式系统和现代应用架构中至关重要的<strong>中间件</strong>。它的核心作用是<strong>解耦</strong>、<strong>异步</strong>和<strong>削峰填谷</strong>,像一个高效的“通信员”和“缓冲池”协调不同组件之间的工作。
复制代码
消息队列的核心概念
生产者: 产生消息(数据、任务请求、事件通知)并发送到队列的应用程序或服务。
消息队列: 一个临时的、持久化的存储区域(通常基于内存、磁盘或数据库),用于存放生产者发送的消息。消息按照先进先出的顺序存储,但很多队列支持优先级、延迟等特性。
消费者: 从队列中获取消息并进行处理的应用程序或服务。
消息: 队列中传输的数据单元,通常包含有效载荷(实际数据)和元数据(如ID、时间戳、优先级等)。
核心价值与解决的问题
解耦:
异步:
削峰填谷:
冗余与可靠性:
可伸缩性:
顺序保证:
缓冲:
常见的消息队列有RabbitMQ,Kafka,RocketMQ。这里主要介绍Kafka。
Kafka
- <strong>Kafka</strong> 通常指 <strong>Apache Kafka</strong>,这是一个开源的、分布式的、高吞吐量、低延迟的<strong>流处理平台</strong>。它最初由 LinkedIn 开发,后来捐赠给了 Apache 软件基金会,并迅速成为大数据和实时数据处理领域的核心基础设施之一。
复制代码
Kafka 不仅仅是一个消息队列,它是一个高吞吐、低延迟、分布式、持久化、可水平扩展的流数据平台。它设计之初就是为了处理持续产生、体量巨大、需要实时处理的“数据流”。
ZooKeeper是一个开源的分布式应用程序协调软件,而Kafka是分布式事件处理平台,底层是使用分布式架构设计,所以Kafka的多个节点之间是采用zookeeper来实现协调调度的。
ZooKeeper
ZooKeeper是一个开源的分布式应用程序协调软件,而Kafka是分布式事件处理平台,底层是使用分布式架构设计,所以Kafka的多个节点之间是采用zookeeper来实现协调调度的。
Zookeeper的核心作用
- ZooKeeper的数据存储结构可以简单地理解为一个Tree结构,而Tree结构上的每一个节点可以用于存储数据,所以一般情况下,我们可以将分布式系统的元数据(环境信息以及系统配置信息)保存在ZooKeeper节点中。
- ZooKeeper创建数据节点时,会根据业务场景创建临时节点或永久(持久)节点。永久节点就是无论客户端是否连接上ZooKeeper都一直存在的节点,而临时节点指的是客户端连接时创建,断开连接后删除的节点。同时,ZooKeeper也提供了Watch(监控)机制用于监控节点的变化,然后通知对应的客户端进行相应的变化。Kafka软件中就内置了ZooKeeper的客户端,用于进行ZooKeeper的连接和通信。
复制代码
Kafka的基本使用
环境安装
我们这里先安装简单的Windows单机环境。在安装之前务必先安装Java8。
下载Kafka:Kafka下载地址Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/downloads
选择版本为2.13-3.8.0
下载完成后进行解压,解压目录放在非系统盘根目录下。为了访问方便,可以将解压后的文件夹名称修改为Kafka
Kafka的文件目录
bin | linux系统下可执行脚本文件 |
bin/windows | windows系统下可执行脚本文件 |
config | 配置文件 |
libs | 依赖类库 |
licenses | 许可信息 |
site-docs | 文档 |
logs | 服务日志 |
启动zookeeper
当前版本的Kafka软件仍然依赖Zookeeper,所以启动Kafka之前,需要先启动Zookeeper,Kafka软件内置了Zookeeper,所以无需额外安装,直接调用启动脚本即可。
1. 进入Kafka解压缩文件夹的config目录,修改zookeeper.properties配置文件
- 修改dataDir配置,用于设置ZooKeeper数据存储位置,该路径如果不存在会自动创建。
- dataDir=D:/kafka/data/zk
复制代码
在kafka解压缩后的目录中创建Zookeeper启动脚本文件:zk.cmd。
- 输入:
- <span>call bin/windows/zookeeper-server-start.bat config/zookeeper.properties</span>
复制代码
上述指令就是调用zookeeper启动命令,同时指定配置文件
双击启动即可:
启动完成。
启动Kafka
进入Kafka解压缩文件夹的config目录,修改server.properties配置文件.
设置Kafka数据的存储目录。如果文件目录不存在,会自动生成。
在kafka解压缩后的目录中创建Kafka启动脚本文件:kfk.cmd。
- 输入:
- call bin/windows/kafka-server-start.bat config/server.properties
复制代码
双击启动即可:
DOS窗口中,输入jps指令,查看当前启动的软件进程:
这里名称为QuorumPeerMain的就是ZooKeeper软件进程,名称为Kafka的就是Kafka系统进程。此时,说明Kafka已经可以正常使用了。
消息主题
- 在发布订阅模型中,为了让消费者对感兴趣的消息进行消费,而不是消费所有消息,所以就定义了主题(Topic),也就是说将不同的消息进行分类,分成不同的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送,而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
复制代码
有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。而对于初学者来讲,掌握基本的命令行操作是必要的。所以接下来,我们采用命令行进行操作。
创建主题
使用命令行方式创建主题test
打开DOS窗口,在确保Zookeeper和Kafkfa启动的情况下,进入Kafkfa解压目录下的bin/windows目录。
输入如下命令创建主题test: kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
test主题创建完成。
查询主题
输入如下命令进行主题查询:kafka-topics.bat --bootstrap-server localhost:9092 --list
修改主题
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2
上述命令将test主题的分区数量设置为2.关于分区的信息,后面会详细介绍。
发送数据
命令行操作
使用命令行方式发送:
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
上述操作就是在控制台生成数据,hello kafka 这里的数据需要回车,才会发送到Kafka服务器。
JavaAPI操作
引入依赖
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>3.8.0</version>
- </dependency>
复制代码
编写生产者
- public class ProducerTest {
- public static void main(String[] args) {
- // 配置属性集合
- Map<String, Object> configMap = new HashMap<>();
- // 配置属性:Kafka服务器集群地址
- configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
- configMap.put(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
- configMap.put(
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
- "org.apache.kafka.common.serialization.StringSerializer");
- // 创建Kafka生产者对象,建立Kafka连接
- // 构造对象时,需要传递配置参数
- KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
- // 准备数据,定义泛型
- // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
- for (int i = 0; i < 10; i++) {
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(
- "test", "key" + i, "value" + i
- );
- // 生产(发送)数据
- producer.send(record);
- }
- // 关闭生产者连接
- producer.close();
- }
- }
复制代码
消费数据
命令行操作
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
JavaAPI操作
- public class ConsumerTest {
- public static void main(String[] args) {
- // 创建配置对象
- Map<String, Object> configMap = new HashMap<>();
- configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- // 反序列化类配置
- configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // 组ID配置
- configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
- // 创建消费者对象
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);
- // 从kafka主题中获取对象 订阅主题
- consumer.subscribe(Collections.singleton("test"));
- // 消费者从Kafka主题中拉取数据
- while (true) {
- ConsumerRecords<String, String> datas = consumer.poll(100);
- for (ConsumerRecord<String, String> data : datas) {
- System.out.println(data);
- }
- }
- // 关闭消费者对象
- // consumer.close();
- }
- }
复制代码
到此这篇关于Kafka的基本使用的文章就介绍到这了,更多相关Kafka使用内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!