设为首页收藏本站
网站公告 | 这是第一条公告
     

 找回密码
 立即注册
缓存时间01 现在时间01 缓存数据 你是我生命中所能经历的,最最深切的感觉。

你是我生命中所能经历的,最最深切的感觉。

查看: 830|回复: 2

Laravel中Kafka的使用详解

[复制链接]

  离线 

TA的专栏

  • 打卡等级:热心大叔
  • 打卡总天数:205
  • 打卡月天数:0
  • 打卡总奖励:3013
  • 最近打卡:2023-08-27 04:24:37
等级头衔

等級:晓枫资讯-上等兵

在线时间
0 小时

积分成就
威望
0
贡献
388
主题
361
精华
0
金钱
4155
积分
765
注册时间
2022-12-22
最后登录
2025-3-15

发表于 2023-2-12 20:12:27 | 显示全部楼层 |阅读模式
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:
  1. app/Tools/Kafka.php
复制代码
  1. <?php
  2. namespace App\Tools;

  3. use Illuminate\Config\Repository;

  4. use Illuminate\Support\Facades\DB;
  5. use Monolog\Logger;
  6. use Monolog\Handler\StreamHandler;

  7. use Illuminate\Http\Request;

  8. class Kafka
  9. {
  10.   public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
  11.   public $topic = 'test';//管道名称
  12.   public $partition = 0;

  13.   protected $producer = null;
  14.   protected $consumer = null;

  15.   public function __construct()
  16.   {
  17.     if (empty($this->broker_list)) {
  18.       throw new InvalidConfigException("broker not config");
  19.     }
  20.     $rk = new \RdKafka\Producer();
  21.     if (empty($rk)) {
  22.       throw new InvalidConfigException("producer error");
  23.     }
  24.     $rk->setLogLevel(LOG_DEBUG);
  25.     if (!$rk->addBrokers($this->broker_list)) {
  26.       throw new InvalidConfigException("producer error");
  27.     }
  28.     $this->producer = $rk;
  29.   }

  30.   /**
  31.    * 生产者
  32.    * @param array $messages
  33.    * @return mixed
  34.    */
  35.   public function send($messages = [],$topic)
  36.   {
  37.     $topic = $this->producer->newTopic($topic);
  38.     return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  39.   }

  40.   /**
  41.    * 消费者
  42.    */
  43.   public function consumer($object, $callback){
  44.     $conf = new \RdKafka\Conf();
  45.     $conf->set('group.id', 0);
  46.     $conf->set('metadata.broker.list', $this->broker_list);

  47.     $topicConf = new \RdKafka\TopicConf();
  48.     $topicConf->set('auto.offset.reset', 'smallest');

  49.     $conf->setDefaultTopicConf($topicConf);

  50.     $consumer = new \RdKafka\KafkaConsumer($conf);

  51.     $consumer->subscribe([$this->topic]);

  52.     echo "waiting for messages.....\n";
  53.     while(true) {
  54.       $message = $consumer->consume(120*1000);
  55.       switch ($message->err) {
  56.         case RD_KAFKA_RESP_ERR_NO_ERROR:
  57.           echo "message payload....";
  58.           $object->$callback($message->payload);
  59.           break;
  60.       }
  61.       sleep(1);
  62.     }
  63.   }
  64. }
  65. ?>
复制代码
在控制器中如何使用:
首先再头部导入这个类:
  1. use App\Tools\Kafka;
复制代码
下面是使用生产者实例:
  1. public function test(){

  2.    $topic = 'tool';//输入使用管道名称
  3.    $data['shop_id'] = 58;
  4.    $data['bar_code']=586;
  5.    $data['goods_num'] = 1;
  6.    $data['goods_unit'] = '个';

  7. $Kafka = new Kafka();
  8. $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
  9. var_dump($Error_Msg);


  10.   }
复制代码
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
  1. <?php

  2. $conf = new RdKafka\Conf();

  3. $conf->set('group.id', 'myConsumerGroup');

  4. $rk = new RdKafka\Consumer($conf);
  5. $rk->addBrokers("localhost:9092");

  6. $topicConf = new RdKafka\TopicConf();
  7. $topicConf->set('auto.commit.interval.ms', 100);
  8. $topicConf->set('offset.store.method', 'file');
  9. $topicConf->set('offset.store.path', sys_get_temp_dir());
  10. $topicConf->set('auto.offset.reset', 'smallest');

  11. $topic = $rk->newTopic("tool", $topicConf);//读取的管道

  12. // Start consuming partition 0
  13. $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

  14. while (true) {
  15.   $message = $topic->consume(0, 120*10000);
  16.   switch ($message->err) {
  17.     case RD_KAFKA_RESP_ERR_NO_ERROR:
  18.     //没有错误打印信息
  19.       $message = json_decode(json_encode($message),true);
  20.       $data = json_decode($message['payload'],true);
  21.       var_dump($data);
  22.       break;
  23.     case RD_KAFKA_RESP_ERR__PARTITION_EOF:
  24.       echo "等待接收信息\n";
  25.       break;
  26.     case RD_KAFKA_RESP_ERR__TIMED_OUT:
  27.       echo "超时\n";
  28.       break;
  29.     default:
  30.       throw new \Exception($message->errstr(), $message->err);
  31.       break;
  32.   }
  33. sleep(1);
  34. }

  35. ?>
复制代码
到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索晓枫资讯以前的文章或继续浏览下面的相关文章希望大家以后多多支持晓枫资讯!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
晓枫资讯-科技资讯社区-免责声明
免责声明:以上内容为本网站转自其它媒体,相关信息仅为传递更多信息之目的,不代表本网观点,亦不代表本网站赞同其观点或证实其内容的真实性。
      1、注册用户在本社区发表、转载的任何作品仅代表其个人观点,不代表本社区认同其观点。
      2、管理员及版主有权在不事先通知或不经作者准许的情况下删除其在本社区所发表的文章。
      3、本社区的文章部分内容可能来源于网络,仅供大家学习与参考,如有侵权,举报反馈:点击这里给我发消息进行删除处理。
      4、本社区一切资源不代表本站立场,并不代表本站赞同其观点和对其真实性负责。
      5、以上声明内容的最终解释权归《晓枫资讯-科技资讯社区》所有。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
17
积分
14
注册时间
2022-12-24
最后登录
2022-12-24

发表于 2025-2-14 12:53:58 | 显示全部楼层
感谢楼主分享。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~

  离线 

TA的专栏

等级头衔

等級:晓枫资讯-列兵

在线时间
0 小时

积分成就
威望
0
贡献
0
主题
0
精华
0
金钱
13
积分
6
注册时间
2023-9-28
最后登录
2023-9-28

发表于 2025-4-9 04:12:44 | 显示全部楼层
感谢楼主,顶。
http://bbs.yzwlo.com 晓枫资讯--游戏IT新闻资讯~~~
严禁发布广告,淫秽、色情、赌博、暴力、凶杀、恐怖、间谍及其他违反国家法律法规的内容。!晓枫资讯-社区
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

1楼
2楼
3楼

手机版|晓枫资讯--科技资讯社区 本站已运行

CopyRight © 2022-2025 晓枫资讯--科技资讯社区 ( BBS.yzwlo.com ) . All Rights Reserved .

晓枫资讯--科技资讯社区

本站内容由用户自主分享和转载自互联网,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。

如有侵权、违反国家法律政策行为,请联系我们,我们会第一时间及时清除和处理! 举报反馈邮箱:点击这里给我发消息

Powered by Discuz! X3.5

快速回复 返回顶部 返回列表