设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间06 现在时间06 缓存数据 只有内心祥和,才不会被生活所左右,所以一定要从容淡泊。

只有内心祥和,才不会被生活所左右,所以一定要从容淡泊。

查看: 1522|回复: 3

SpringBoot 整合 Avro 与 Kafka的详细过程

[复制链接]

  离线 

TA的专栏

  • 打卡等级:即来则安
  • 打卡总天数:25
  • 打卡月天数:0
  • 打卡总奖励:338
  • 最近打卡:2025-03-18 17:57:49
等级头衔

等級:晓枫资讯-上等兵

在线时间
0 小时

积分成就
威望
0
贡献
351
主题
329
精华
0
金钱
1395
积分
736
注册时间
2023-2-10
最后登录
2025-3-18

发表于 2024-12-3 22:53:40 | 显示全部楼层 |阅读模式
目录
  • 一、环境介绍
  • 二、Avro 文件
  • 三、为 Kafka 主题生成 Avro消息
  • 四、AvroConfig 配置类
  • 五、通过 kafkaTemplate 发送消息
  • 六、从 Kafka主题消费 Avro消息反序列化
  • 七、反序列化的配置类
  • 八、消费者消费消息

【需求】:生产者发送数据至 kafka 序列化使用 Avro,消费者通过 Avro 进行反序列化,并将数据通过 MyBatisPlus 存入数据库。

一、环境介绍

【1】Apache Avro 1.8;【2】Spring Kafka 1.2;【3】Spring Boot 1.5;【4】Maven 3.5;

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.codenotfound</groupId>
  6. <artifactId>spring-kafka-avro</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <name>spring-kafka-avro</name>
  9. <description>Spring Kafka - Apache Avro Serializer Deserializer Example</description>
  10. <url>https://www.codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html</url>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>1.5.4.RELEASE</version>
  15. </parent>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. <spring-kafka.version>1.2.2.RELEASE</spring-kafka.version>
  19. <avro.version>1.8.2</avro.version>
  20. </properties>
  21. <dependencies>
  22. <!-- spring-boot -->
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>
  32. <!-- spring-kafka -->
  33. <dependency>
  34. <groupId>org.springframework.kafka</groupId>
  35. <artifactId>spring-kafka</artifactId>
  36. <version>${spring-kafka.version}</version>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.kafka</groupId>
  40. <artifactId>spring-kafka-test</artifactId>
  41. <version>${spring-kafka.version}</version>
  42. <scope>test</scope>
  43. </dependency>
  44. <!-- avro -->
  45. <dependency>
  46. <groupId>org.apache.avro</groupId>
  47. <artifactId>avro</artifactId>
  48. <version>${avro.version}</version>
  49. </dependency>
  50. </dependencies>
  51. <build>
  52. <plugins>
  53. <!-- spring-boot-maven-plugin -->
  54. <plugin>
  55. <groupId>org.springframework.boot</groupId>
  56. <artifactId>spring-boot-maven-plugin</artifactId>
  57. </plugin>
  58. <!-- avro-maven-plugin -->
  59. <plugin>
  60. <groupId>org.apache.avro</groupId>
  61. <artifactId>avro-maven-plugin</artifactId>
  62. <version>${avro.version}</version>
  63. <executions>
  64. <execution>
  65. <phase>generate-sources</phase>
  66. <goals>
  67. <goal>schema</goal>
  68. </goals>
  69. <configuration>
  70. <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
  71. <outputDirectory>${project.build.directory}/generated/avro</outputDirectory>
  72. </configuration>
  73. </execution>
  74. </executions>
  75. </plugin>
  76. </plugins>
  77. </build>
  78. </project>
复制代码

二、Avro 文件

【1】Avro 依赖于由使用JSON定义的原始类型组成的架构。对于此示例,我们将使用Apache Avro入门指南中的“用户”模式,如下所示。该模式存储在src / main / resources / avro下的 user.avsc文件中。我这里使用的是 electronicsPackage.avsc。namespace 指定你生成 java 类时指定的 package 路径,name 表时生成的文件。

  1. {"namespace": "com.yd.cyber.protocol.avro",
  2. "type": "record",
  3. "name": "ElectronicsPackage",
  4. "fields": [
  5. {"name":"package_number","type":["string","null"],"default": null},
  6. {"name":"frs_site_code","type":["string","null"],"default": null},
  7. {"name":"frs_site_code_type","type":["string","null"],"default":null},
  8. {"name":"end_allocate_code","type":["string","null"],"default": null},
  9. {"name":"code_1","type":["string","null"],"default": null},
  10. {"name":"aggregat_package_code","type":["string","null"],"default": null}
  11. ]
  12. }
复制代码

【2】Avro附带了代码生成功能,该代码生成功能使我们可以根据上面定义的“用户”模式自动创建Java类。一旦生成了相关的类,就无需直接在程序中使用架构。这些类可以使用 avro-tools.jar 或项目是Maven 项目,调用 Maven Projects 进行 compile 自动生成 electronicsPackage.java 文件:如下是通过 maven 的方式

1.png

【3】这将导致生成一个 electronicsPackage.java 类,该类包含架构和许多 Builder构造 electronicsPackage对象的方法。

2.png

三、为 Kafka 主题生成 Avro消息

Kafka Byte 在其主题中存储和传输数组。但是,当我们使用 Avro对象时,我们需要在这些 Byte数组之间进行转换。在0.9.0.0版之前,Kafka Java API使用 Encoder/ Decoder接口的实现来处理转换,但是在新API中,这些已经被 Serializer/ Deserializer接口实现代替。Kafka附带了许多 内置(反)序列化器,但不包括Avro。为了解决这个问题,我们将创建一个 AvroSerializer类,该类Serializer专门为 Avro对象实现接口。然后,我们实现将 serialize() 主题名称和数据对象作为输入的方法,在本例中,该对象是扩展的 Avro对象 SpecificRecordBase。该方法将Avro对象序列化为字节数组并返回结果。这个类属于通用类,一次配置多次使用。

  1. package com.yd.cyber.web.avro;
  2. import java.io.ByteArrayOutputStream;
  3. import java.io.IOException;
  4. import java.util.Map;
  5. import org.apache.avro.io.BinaryEncoder;
  6. import org.apache.avro.io.DatumWriter;
  7. import org.apache.avro.io.EncoderFactory;
  8. import org.apache.avro.specific.SpecificDatumWriter;
  9. import org.apache.avro.specific.SpecificRecordBase;
  10. import org.apache.kafka.common.errors.SerializationException;
  11. import org.apache.kafka.common.serialization.Serializer;
  12. /**
  13. * avro序列化类
  14. * @author zzx
  15. * @creat 2020-03-11-19:17
  16. */
  17. public class AvroSerializer<T extends SpecificRecordBase> implements Serializer<T> {
  18. @Override
  19. public void close() {}
  20. @Override
  21. public void configure(Map<String, ?> arg0, boolean arg1) {}
  22. @Override
  23. public byte[] serialize(String topic, T data) {
  24. if(data == null) {
  25. return null;
  26. }
  27. DatumWriter<T> writer = new SpecificDatumWriter<>(data.getSchema());
  28. ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
  29. BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(byteArrayOutputStream , null);
  30. try {
  31. writer.write(data, binaryEncoder);
  32. binaryEncoder.flush();
  33. byteArrayOutputStream.close();
  34. }catch (IOException e) {
  35. throw new SerializationException(e.getMessage());
  36. }
  37. return byteArrayOutputStream.toByteArray();
  38. }
  39. }
复制代码

四、AvroConfig 配置类

Avro 配置信息在 AvroConfig 配置类中,现在,我们需要更改,AvroConfig 开始使用我们的自定义 Serializer实现。这是通过将“ VALUE_SERIALIZER_CLASS_CONFIG”属性设置为 AvroSerializer该类来完成的。此外,我们更改了ProducerFactory 和KafkaTemplate 通用类型,使其指定 ElectronicsPackage 而不是 String。当我们有多个序列化的时候,这个配置文件需要多次需求,添加自己需要序列化的对象。

  1. package com.yd.cyber.web.avro;
  2. /**
  3. * @author zzx
  4. * @creat 2020-03-11-20:23
  5. */
  6. @Configuration
  7. @EnableKafka
  8. public class AvroConfig {
  9. @Value("${spring.kafka.bootstrap-servers}")
  10. private String bootstrapServers;
  11. @Value("${spring.kafka.producer.max-request-size}")
  12. private String maxRequestSize;
  13. @Bean
  14. public Map<String, Object> avroProducerConfigs() {
  15. Map<String, Object> props = new HashMap<>();
  16. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  17. props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
  18. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  19. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroSerializer.class);
  20. return props;
  21. }
  22. @Bean
  23. public ProducerFactory<String, ElectronicsPackage> elProducerFactory() {
  24. return new DefaultKafkaProducerFactory<>(avroProducerConfigs());
  25. }
  26. @Bean
  27. public KafkaTemplate<String, ElectronicsPackage> elKafkaTemplate() {
  28. return new KafkaTemplate<>(elProducerFactory());
  29. }
  30. }
复制代码

五、通过 kafkaTemplate 发送消息

最后就是通过 Controller类调用 kafkaTemplate 的 send 方法接受一个Avro electronicsPackage对象作为输入。请注意,我们还更新了 kafkaTemplate 泛型类型。

  1. package com.yd.cyber.web.controller.aggregation;
  2. import com.yd.cyber.protocol.avro.ElectronicsPackage;
  3. import com.yd.cyber.web.vo.ElectronicsPackageVO;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.beans.BeanUtils;
  7. import org.springframework.kafka.core.KafkaTemplate;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.RequestMapping;
  10. import org.springframework.web.bind.annotation.RestController;
  11. import javax.annotation.Resource;
  12. /**
  13. * <p>
  14. * InnoDB free: 4096 kB 前端控制器
  15. * </p>
  16. *
  17. * @author zzx
  18. * @since 2020-04-19
  19. */
  20. @RestController
  21. @RequestMapping("/electronicsPackageTbl")
  22. public class ElectronicsPackageController {
  23. //日誌
  24. private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageController.class);
  25. @Resource
  26. private KafkaTemplate<String,ElectronicsPackage> kafkaTemplate;
  27. @GetMapping("/push")
  28. public void push(){
  29. ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();
  30. electronicsPackageVO.setElectId(9);
  31. electronicsPackageVO.setAggregatPackageCode("9");
  32. electronicsPackageVO.setCode1("9");
  33. electronicsPackageVO.setEndAllocateCode("9");
  34. electronicsPackageVO.setFrsSiteCodeType("9");
  35. electronicsPackageVO.setFrsSiteCode("9");
  36. electronicsPackageVO.setPackageNumber("9");
  37. ElectronicsPackage electronicsPackage = new ElectronicsPackage();
  38. BeanUtils.copyProperties(electronicsPackageVO,electronicsPackage);
  39. //发送消息
  40. kafkaTemplate.send("Electronics_Package",electronicsPackage);
  41. log.info("Electronics_Package TOPIC 发送成功");
  42. }
  43. }
复制代码

六、从 Kafka主题消费 Avro消息反序列化

收到的消息需要反序列化为 Avro格式。为此,我们创建一个 AvroDeserializer 实现该 Deserializer接口的类。该 deserialize()方法将主题名称和Byte数组作为输入,然后将其解码回Avro对象。从 targetType类参数中检索需要用于解码的模式,该类参数需要作为参数传递给 AvroDeserializer构造函数。

  1. package com.yd.cyber.web.avro;
  2. import java.io.ByteArrayInputStream;
  3. import java.io.IOException;
  4. import java.util.Arrays;
  5. import java.util.Map;
  6. import org.apache.avro.generic.GenericRecord;
  7. import org.apache.avro.io.BinaryDecoder;
  8. import org.apache.avro.io.DatumReader;
  9. import org.apache.avro.io.DecoderFactory;
  10. import org.apache.avro.specific.SpecificDatumReader;
  11. import org.apache.avro.specific.SpecificRecordBase;
  12. import org.apache.kafka.common.errors.SerializationException;
  13. import org.apache.kafka.common.serialization.Deserializer;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import javax.xml.bind.DatatypeConverter;
  17. /**
  18. * avro反序列化
  19. * @author fuyx
  20. * @creat 2020-03-12-15:19
  21. */
  22. public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {
  23. //日志系统
  24. private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializer.class);
  25. protected final Class<T> targetType;
  26. public AvroDeserializer(Class<T> targetType) {
  27. this.targetType = targetType;
  28. }
  29. @Override
  30. public void close() {}
  31. @Override
  32. public void configure(Map<String, ?> arg0, boolean arg1) {}
  33. @Override
  34. public T deserialize(String topic, byte[] data) {
  35. try {
  36. T result = null;
  37. if(data == null) {
  38. return null;
  39. }
  40. LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
  41. ByteArrayInputStream in = new ByteArrayInputStream(data);
  42. DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());
  43. BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
  44. result = (T) userDatumReader.read(null, decoder);
  45. LOGGER.debug("deserialized data='{}'", result);
  46. return result;
  47. } catch (Exception ex) {
  48. throw new SerializationException(
  49. "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
  50. } finally {
  51. }
  52. }
  53. }
复制代码

七、反序列化的配置类

我将反序列化的配置和序列化的配置都放置在 AvroConfig 配置类中。在 AvroConfig 需要被这样更新了AvroDeserializer用作值“VALUE_DESERIALIZER_CLASS_CONFIG”属性。我们还更改了 ConsumerFactory 和 ConcurrentKafkaListenerContainerFactory通用类型,以使其指定 ElectronicsPackage 而不是 String。将 DefaultKafkaConsumerFactory 通过1个新的创造 AvroDeserializer 是需要 “User.class”作为构造函数的参数。需要使用Class targetType,AvroDeserializer 以将消费 byte[]对象反序列化为适当的目标对象(在此示例中为 ElectronicsPackage 类)。

  1. @Configuration
  2. @EnableKafka
  3. public class AvroConfig {
  4. @Value("${spring.kafka.bootstrap-servers}")
  5. private String bootstrapServers;
  6. @Value("${spring.kafka.producer.max-request-size}")
  7. private String maxRequestSize;
  8. @Bean
  9. public Map<String, Object> consumerConfigs() {
  10. Map<String, Object> props = new HashMap<>();
  11. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  12. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  13. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroDeserializer.class);
  14. props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro");
  15. return props;
  16. }
  17. @Bean
  18. public ConsumerFactory<String, ElectronicsPackage> consumerFactory() {
  19. return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
  20. new AvroDeserializer<>(ElectronicsPackage.class));
  21. }
  22. @Bean
  23. public ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> kafkaListenerContainerFactory() {
  24. ConcurrentKafkaListenerContainerFactory<String, ElectronicsPackage> factory =
  25. new ConcurrentKafkaListenerContainerFactory<>();
  26. factory.setConsumerFactory(consumerFactory());
  27. return factory;
  28. }
  29. }
复制代码

八、消费者消费消息

消费者通过 @KafkaListener 监听对应的 Topic ,这里需要注意的是,网上直接获取对象的参数传的是对象,比如这里可能需要传入 ElectronicsPackage 类,但是我这样写的时候,error日志总说是返回序列化的问题,所以我使用 GenericRecord 对象接收,也就是我反序列化中定义的对象,是没有问题的。然后我将接收到的消息通过 mybatisplus 存入到数据库。

  1. package com.zzx.cyber.web.controller.dataSource.intercompany;
  2. import com.zzx.cyber.web.service.ElectronicsPackageService;
  3. import com.zzx.cyber.web.vo.ElectronicsPackageVO;
  4. import org.apache.avro.generic.GenericRecord;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.beans.BeanUtils;
  8. import org.springframework.kafka.annotation.KafkaListener;
  9. import org.springframework.stereotype.Controller;
  10. import javax.annotation.Resource;
  11. /**
  12. * @desc:
  13. * @author: zzx
  14. * @creatdate 2020/4/1912:21
  15. */
  16. @Controller
  17. public class ElectronicsPackageConsumerController {
  18. //日志
  19. private static final Logger log = LoggerFactory.getLogger(ElectronicsPackageConsumerController.class);
  20. //服务层
  21. @Resource
  22. private ElectronicsPackageService electronicsPackageService;
  23. /**
  24. * 扫描数据测试
  25. * @param genericRecordne
  26. */
  27. @KafkaListener(topics = {"Electronics_Package"})
  28. public void receive(GenericRecord genericRecordne) throws Exception {
  29. log.info("数据接收:electronicsPackage + "+ genericRecordne.toString());
  30. //业务处理类,mybatispuls 自动生成的类
  31. ElectronicsPackageVO electronicsPackageVO = new ElectronicsPackageVO();
  32. //将收的数据复制过来
  33. BeanUtils.copyProperties(genericRecordne,electronicsPackageVO);
  34. try {
  35. //落库
  36. log.info("数据入库");
  37. electronicsPackageService.save(electronicsPackageVO);
  38. } catch (Exception e) {
  39. throw new Exception("插入异常"+e);
  40. }
  41. }
  42. }
复制代码

到此这篇关于SpringBoot 整合 Avro 与 Kafka的文章就介绍到这了,更多相关SpringBoot 整合 Avro 与 Kafka内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯!


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
      1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
      2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
      3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
      4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
      5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
21
积分
22
注册时间
2022-12-26
最后登录
2022-12-26

发表于 2025-1-23 21:32:28 | 显示全部楼层
路过,支持一下
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
24
积分
28
注册时间
2022-12-25
最后登录
2022-12-25

发表于 2025-2-12 16:24:19 | 显示全部楼层
感谢楼主分享。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
20
积分
20
注册时间
2022-12-29
最后登录
2022-12-29

发表于 2025-4-10 16:33:47 | 显示全部楼层
感谢楼主,顶。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1楼
2楼
3楼
4楼

手机版|晓枫资讯--科技资讯社区 本站已运行

CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

晓枫资讯--科技资讯社区

本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

Powered by Discuz! X3.5

快速回复 返回顶部 返回列表