
离线 TA的专栏
- 打卡等级:热心大叔
- 打卡总天数:203
- 打卡月天数:0
- 打卡总奖励:3078
- 最近打卡:2023-08-27 01:50:51
|
目录- 依赖
- Kafka配置读取类
- producer
- consumer
- 使用
- 总结
现在假设一种需求:我方业务系统要与某服务平台通过 Kafka 交互,异步获取服务,而系统架构可能老旧,不是 Spring Cloud 全家桶,也不是 Spring Boot,只是 Java 普通项目或者 Java Web 项目。
依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>

<!-- kafka core: provides kafka.utils.ShutdownableThread used by the consumer.
     The version (and Scala suffix) must match kafka-clients above —
     kafka_2.11:2.4.1 would transitively pull kafka-clients 2.4.1 and
     conflict with the 3.1.0 declared explicitly. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>3.1.0</version>
</dependency>
复制代码
Kafka配置读取类
本文后边没用到,直接填配置了,简单点
但如果生产需要,还是有这个类比较好,可以从不同配置中读取,同时给个默认值
producer
- import org.apache.kafka.clients.producer.Callback;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
-
- /**
- * kafka producer
- * @author zzy
- */
- public class KafkaProFormal {
-
- public static final Logger LOG = LoggerFactory.getLogger(KafkaProFormal.class);
-
- private Properties properties = new Properties();
-
- private final String bootstrapServers = "bootstrap.servers";
- private final String clientId = "client.id";
- private final String keySerializer = "key.serializer";
- private final String valueSerializer = "value.serializer";
- //private final String securityProtocol = "security.protocol";
- //private final String saslKerberosServiceName = "sasl.kerberos.service.name";
- //private final String kerberosDomainName = "kerberos.domain.name";
- private final String maxRequestSize = "max.request.size";
-
- private KafkaProducer<String, String> producer;
-
- private volatile static KafkaProFormal kafkaProFormal;
-
- private KafkaProFormal(String servers) {
- properties.put(bootstrapServers, servers);
- properties.put(keySerializer, "org.apache.kafka.common.serialization.StringSerializer");
- properties.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer");
-
- producer = new KafkaProducer<String, String>(properties);
- }
-
- public static KafkaProFormal getInstance(String servers) {
- if(kafkaProFormal == null) {
- synchronized(KafkaProFormal.class) {
- if(kafkaProFormal == null) {
- kafkaProFormal = new KafkaProFormal(servers);
- }
- }
- }
-
- return kafkaProFormal;
- }
-
- public void sendStringWithCallBack(String topic, String message, boolean asyncFlag) {
- ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
- long startTime = System.currentTimeMillis();
- if(asyncFlag) {
- //异步发送
- producer.send(record, new KafkaCallBack(startTime, message));
- } else {
- //同步发送
- try {
- producer.send(record, new KafkaCallBack(startTime, message)).get();
- } catch (InterruptedException e) {
- LOG.error("InterruptedException occured : {0}", e);
- } catch (ExecutionException e) {
- LOG.error("ExecutionException occured : {0}", e);
- }
- }
- }
- }
-
- class KafkaCallBack implements Callback {
- private static Logger LOG = LoggerFactory.getLogger(KafkaCallBack.class);
-
- private String key;
-
- private long startTime;
-
- private String message;
-
- KafkaCallBack(long startTime, String message) {
- this.startTime = startTime;
- this.message = message;
- }
-
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long elapsedTime = System.currentTimeMillis() - startTime;
-
- if(metadata != null) {
- LOG.info("Record(" + key + "," + message + ") sent to partition(" + metadata.partition()
- + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms.");
- } else {
- LOG.error("metadata is null." + "Record(" + key + "," + message + ")", exception);
- }
- }
- }
复制代码
consumer
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
-
- /**
- * kafka consumer
- * @author zzy
- */
- public abstract class KafkaConFormal extends ShutdownableThread {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConFormal.class);
-
- private Set<String> topics;
-
- private final String bootstrapServers = "bootstrap.servers";
- private final String groupId = "group.id";
- private final String keyDeserializer = "key.deserializer";
- private final String valueDeserializer = "value.deserializer";
- private final String enableAutoCommit = "enable.auto.commit";
- private final String autoCommitIntervalMs = "auto.commit.interval.ms";
- private final String sessionTimeoutMs = "session.timeout.ms";
-
- private KafkaConsumer<String, String> consumer;
-
- public KafkaConFormal(String topic) {
- super("KafkaConsumerExample", false);
-
- topics.add(topic);
-
- Properties props = new Properties();
- props.put(bootstrapServers, "your servers");
- props.put(groupId, "TestGroup");
- props.put(enableAutoCommit, "true");
- props.put(autoCommitIntervalMs, "1000");
- props.put(sessionTimeoutMs, "30000");
- props.put(keyDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(valueDeserializer, "org.apache.kafka.common.serialization.StringDeserializer");
-
- consumer = new KafkaConsumer<>(props);
- }
-
- /**
- * subscribe and handle the msg
- */
- @Override
- public void doWork() {
- consumer.subscribe(topics);
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-
- dealRecords(records);
- }
-
- /**
- * 实例化consumer时,进行对消费信息的处理
- * @param records records
- */
- public abstract void dealRecords(ConsumerRecords<String, String> records);
-
- public void setTopics(Set<String> topics) {
- this.topics = topics;
- }
- }
复制代码
使用
// The first getInstance call fixes the bootstrap servers for the whole JVM.
// (Fix: the original string "kafka server1.1.1.1:9092,..." had the words
// "kafka server" fused into the address and was not a valid server list.)
KafkaProFormal producer = KafkaProFormal.getInstance("1.1.1.1:9092,2.2.2.2:9092");

// Bridge consumer: forwards every record from consume_topic to target_topic
// via an asynchronous producer send.
KafkaConFormal consumer = new KafkaConFormal("consume_topic") {
    @Override
    public void dealRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            producer.sendStringWithCallBack("target_topic", record.value(), true);
        }
    }
};

consumer.start();
复制代码
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持晓枫资讯。 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:  进行删除处理。
4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
|