设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间01 现在时间01 缓存数据 当你走完一段之后回头看,你会发现,那些真正能被记得的事真的是没有多少,真正无法忘记的人屈指可数,真正有趣的日子不过是那么一些,而真正需要害怕的也是寥寥无几。

当你走完一段之后回头看,你会发现,那些真正能被记得的事真的是没有多少,真正无法忘记的人屈指可数,真正有趣的日子不过是那么一些,而真正需要害怕的也是寥寥无几。

查看: 1251|回复: 4

普通java项目集成kafka方式

[复制链接]

  离线 

TA的专栏

  • 打卡等级:热心大叔
  • 打卡总天数:203
  • 打卡月天数:0
  • 打卡总奖励:3078
  • 最近打卡:2023-08-27 01:50:51
等级头衔

等級:晓枫资讯-上等兵

在线时间
0 小时

积分成就
威望
0
贡献
430
主题
387
精华
0
金钱
4328
积分
837
注册时间
2022-12-26
最后登录
2025-8-25

发表于 2024-11-29 21:33:25 来自手机 | 显示全部楼层 |阅读模式
目录
  • 依赖
  • Kafka配置读取类
  • producer
  • consumer
  • 使用
  • 总结

现在假设一种需求,我方业务系统要与某服务平台通过kafka交互,异步获取服务,而系统架构可能老旧,不是spring cloud桶,不是spring boot,只是java普通项目或者 java web项目

依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>3.1.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka_2.11</artifactId>
  9. <version>2.4.1</version>
  10. </dependency>
复制代码

Kafka配置读取类

本文后边没用到,直接填配置了,简单点

但如果生产需要,还是有这个类比较好,可以从不同配置中读取,同时给个默认值

  1. import org.apache.commons.lang3.StringUtils;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import java.io.File;
  5. import java.io.FileInputStream;
  6. import java.io.IOException;
  7. import java.util.Properties;
  8. /**
  9. * kafka配置读取类
  10. *
  11. * @author zzy
  12. */
  13. public class KafkaProperties {
  14. private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);
  15. private static Properties serverProps = new Properties();
  16. private static Properties clientProps = new Properties();
  17. private static Properties producerProps = new Properties();
  18. private static Properties consumerProps = new Properties();
  19. private static KafkaProperties instance = null;
  20. private KafkaProperties() {
  21. String filePath = System.getProperty("user.dir") + File.separator
  22. + "kafkaConf" + File.separator;
  23. File file;
  24. FileInputStream fis = null;
  25. try {
  26. file = new File(filePath + "producer.properties");
  27. if (file.exists()) {
  28. fis = new FileInputStream(filePath + "producer.properties");
  29. producerProps.load(fis);
  30. }
  31. file = new File(filePath + "consumer.properties");
  32. if (file.exists()) {
  33. fis = new FileInputStream(filePath + "consumer.properties");
  34. consumerProps.load(fis);
  35. }
  36. file = new File(filePath + "server.properties");
  37. if (file.exists()) {
  38. fis = new FileInputStream(filePath + "server.properties");
  39. serverProps.load(fis);
  40. }
  41. file = new File(filePath + "client.properties");
  42. if (file.exists()) {
  43. fis = new FileInputStream(filePath + "client.properties");
  44. clientProps.load(fis);
  45. }
  46. } catch (Exception e) {
  47. LOG.error("init kafka props error." + e.getMessage());
  48. } finally {
  49. if (fis != null) {
  50. try {
  51. fis.close();
  52. } catch (IOException e) {
  53. LOG.error("close kafka properties fis error." + e);
  54. }
  55. }
  56. }
  57. }
  58. /**
  59. * 获取懒汉式单例
  60. */
  61. public static synchronized KafkaProperties getInstance() {
  62. if (instance == null) {
  63. instance = new KafkaProperties();
  64. }
  65. return instance;
  66. }
  67. /**
  68. * 获取配置,获取不到时使用参数的默认配置
  69. */
  70. public String getValue(String key, String defaultValue) {
  71. String value;
  72. if (StringUtils.isEmpty(key)) {
  73. LOG.error("key is null or empty");
  74. }
  75. value = getPropsValue(key);
  76. if (value == null) {
  77. LOG.warn("kafka property getValue return null, the key is " + key);
  78. value = defaultValue;
  79. }
  80. LOG.info("kafka property getValue, key:" + key + ", value:" + value);
  81. return value;
  82. }
  83. private String getPropsValue(String key) {
  84. String value = serverProps.getProperty(key);
  85. if (value == null) {
  86. value = producerProps.getProperty(key);
  87. }
  88. if (value == null) {
  89. value = consumerProps.getProperty(key);
  90. }
  91. if (value == null) {
  92. value = clientProps.getProperty(key);
  93. }
  94. return value;
  95. }
  96. }
复制代码

producer

  1. import org.apache.kafka.clients.producer.Callback;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.clients.producer.RecordMetadata;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import java.util.Properties;
  8. import java.util.concurrent.ExecutionException;
  9. /**
  10. * kafka producer
  11. * @author zzy
  12. */
  13. public class KafkaProFormal {
  14. public static final Logger LOG = LoggerFactory.getLogger(KafkaProFormal.class);
  15. private Properties properties = new Properties();
  16. private final String bootstrapServers = "bootstrap.servers";
  17. private final String clientId = "client.id";
  18. private final String keySerializer = "key.serializer";
  19. private final String valueSerializer = "value.serializer";
  20. //private final String securityProtocol = "security.protocol";
  21. //private final String saslKerberosServiceName = "sasl.kerberos.service.name";
  22. //private final String kerberosDomainName = "kerberos.domain.name";
  23. private final String maxRequestSize = "max.request.size";
  24. private KafkaProducer<String, String> producer;
  25. private volatile static KafkaProFormal kafkaProFormal;
  26. private KafkaProFormal(String servers) {
  27. properties.put(bootstrapServers, servers);
  28. properties.put(keySerializer, "org.apache.kafka.common.serialization.StringSerializer");
  29. properties.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer");
  30. producer = new KafkaProducer<String, String>(properties);
  31. }
  32. public static KafkaProFormal getInstance(String servers) {
  33. if(kafkaProFormal == null) {
  34. synchronized(KafkaProFormal.class) {
  35. if(kafkaProFormal == null) {
  36. kafkaProFormal = new KafkaProFormal(servers);
  37. }
  38. }
  39. }
  40. return kafkaProFormal;
  41. }
  42. public void sendStringWithCallBack(String topic, String message, boolean asyncFlag) {
  43. ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
  44. long startTime = System.currentTimeMillis();
  45. if(asyncFlag) {
  46. //异步发送
  47. producer.send(record, new KafkaCallBack(startTime, message));
  48. } else {
  49. //同步发送
  50. try {
  51. producer.send(record, new KafkaCallBack(startTime, message)).get();
  52. } catch (InterruptedException e) {
  53. LOG.error("InterruptedException occured : {0}", e);
  54. } catch (ExecutionException e) {
  55. LOG.error("ExecutionException occured : {0}", e);
  56. }
  57. }
  58. }
  59. }
  60. class KafkaCallBack implements Callback {
  61. private static Logger LOG = LoggerFactory.getLogger(KafkaCallBack.class);
  62. private String key;
  63. private long startTime;
  64. private String message;
  65. KafkaCallBack(long startTime, String message) {
  66. this.startTime = startTime;
  67. this.message = message;
  68. }
  69. @Override
  70. public void onCompletion(RecordMetadata metadata, Exception exception) {
  71. long elapsedTime = System.currentTimeMillis() - startTime;
  72. if(metadata != null) {
  73. LOG.info("Record(" + key + "," + message + ") sent to partition(" + metadata.partition()
  74. + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms.");
  75. } else {
  76. LOG.error("metadata is null." + "Record(" + key + "," + message + ")", exception);
  77. }
  78. }
  79. }
复制代码

consumer

  1. import kafka.utils.ShutdownableThread;
  2. import org.apache.kafka.clients.consumer.ConsumerRecords;
  3. import org.apache.kafka.clients.consumer.KafkaConsumer;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import java.time.Duration;
  7. import java.util.Properties;
  8. import java.util.Set;
  9. /**
  10. * kafka consumer
  11. * @author zzy
  12. */
  13. public abstract class KafkaConFormal extends ShutdownableThread {
  14. private static final Logger LOG = LoggerFactory.getLogger(KafkaConFormal.class);
  15. private Set<String> topics;
  16. private final String bootstrapServers = "bootstrap.servers";
  17. private final String groupId = "group.id";
  18. private final String keyDeserializer = "key.deserializer";
  19. private final String valueDeserializer = "value.deserializer";
  20. private final String enableAutoCommit = "enable.auto.commit";
  21. private final String autoCommitIntervalMs = "auto.commit.interval.ms";
  22. private final String sessionTimeoutMs = "session.timeout.ms";
  23. private KafkaConsumer<String, String> consumer;
  24. public KafkaConFormal(String topic) {
  25. super("KafkaConsumerExample", false);
  26. topics.add(topic);
  27. Properties props = new Properties();
  28. props.put(bootstrapServers, "your servers");
  29. props.put(groupId, "TestGroup");
  30. props.put(enableAutoCommit, "true");
  31. props.put(autoCommitIntervalMs, "1000");
  32. props.put(sessionTimeoutMs, "30000");
  33. props.put(keyDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
  34. props.put(valueDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
  35. consumer = new KafkaConsumer<>(props);
  36. }
  37. /**
  38. * subscribe and handle the msg
  39. */
  40. @Override
  41. public void doWork() {
  42. consumer.subscribe(topics);
  43. ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  44. dealRecords(records);
  45. }
  46. /**
  47. * 实例化consumer时,进行对消费信息的处理
  48. * @param records records
  49. */
  50. public abstract void dealRecords(ConsumerRecords<String, String> records);
  51. public void setTopics(Set<String> topics) {
  52. this.topics = topics;
  53. }
  54. }
复制代码

使用

  1. KafkaProFormal producer = KafkaProFormal.getInstance("kafka server1.1.1.1:9092,2.2.2.2:9092");
  2. KafkaConFormal consumer = new KafkaConFormal("consume_topic") {
  3. @Override
  4. public void dealRecords(ConsumerRecords<String, String> records) {
  5. for (ConsumerRecord<String, String> record: records) {
  6. producer.sendStringWithCallBack("target_topic", record.value(), true);
  7. }
  8. }
  9. };
  10. consumer.start();
复制代码

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持晓枫资讯。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
      1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
      2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
      3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
      4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
      5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
11
积分
2
注册时间
2024-6-7
最后登录
2024-6-7

发表于 2024-11-30 19:45:48 | 显示全部楼层
路过,支持一下
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

  • 打卡等级:无名新人
  • 打卡总天数:2
  • 打卡月天数:0
  • 打卡总奖励:31
  • 最近打卡:2025-03-29 06:30:00
等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
44
积分
6
注册时间
2024-11-16
最后登录
2025-3-29

发表于 2025-1-19 08:46:56 | 显示全部楼层
感谢楼主,顶。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

  • 打卡等级:常驻代表
  • 打卡总天数:37
  • 打卡月天数:0
  • 打卡总奖励:469
  • 最近打卡:2025-09-14 14:21:11
等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
527
积分
80
注册时间
2023-1-1
最后登录
2025-9-14

发表于 2025-9-11 22:43:18 | 显示全部楼层
感谢楼主分享。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

  • 打卡等级:常驻代表
  • 打卡总天数:37
  • 打卡月天数:1
  • 打卡总奖励:389
  • 最近打卡:2025-12-05 22:18:06
等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
450
积分
78
注册时间
2022-12-30
最后登录
2025-12-5

发表于 1 小时前 | 显示全部楼层
顶顶更健康!!!
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1楼
2楼
3楼
4楼
5楼

手机版|晓枫资讯--科技资讯社区 本站已运行

CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

晓枫资讯--科技资讯社区

本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

Powered by Discuz! X3.5

快速回复 返回顶部 返回列表