RabbitMQ学习之(五):一个基于PHP的RabbitMQ操作类详解

//amqp.php类文件 
<?php 
 
/**
 * Thin convenience wrapper around the php-amqp extension.
 *
 * On construction it connects to the broker, opens a channel, declares a
 * direct exchange and a queue, and binds the queue to the exchange with the
 * given routing key. send() publishes one JSON-encoded message; get() drains
 * the queue and returns the decoded messages.
 *
 * Both send() and get() close the connection when they finish, so an instance
 * is good for exactly one send() or one get() call.
 */
class Amqp
{
    /** @var string Exchange name. */
    public $e_name;

    /** @var string Queue name. */
    public $q_name;

    /** @var string Routing key used both for binding and for publishing. */
    public $k_route;

    /** @var AMQPChannel Channel opened on the connection. */
    public $channel;

    /** @var AMQPConnection Broker connection (declared explicitly; it used to be a dynamic property). */
    public $conn;

    /** @var AMQPExchange Declared exchange used by send() (declared explicitly; it used to be a dynamic property). */
    public $ex;

    /**
     * Connect to the broker and declare the exchange, the queue and the binding.
     *
     * @param array  $config  Connection settings (host, port, vhost, login, password).
     *                        A list whose first element is such an array is also accepted,
     *                        and the key 'user' is treated as an alias of 'login'.
     * @param string $e_name  Exchange name.
     * @param string $q_name  Queue name.
     * @param string $k_route Routing key.
     *
     * @throws AMQPConnectionException When the broker cannot be reached.
     */
    public function __construct($config, $e_name, $q_name, $k_route)
    {
        $this->e_name = $e_name;
        $this->q_name = $q_name;
        $this->k_route = $k_route;

        // Create the connection and the channel.
        $this->conn = new AMQPConnection($this->normalizeCredentials($config));
        if (!$this->conn->connect()) {
            // A value returned from a constructor is discarded by `new`, so a
            // failure has to be raised instead of returned.
            throw new AMQPConnectionException('Cannot connect to the broker!');
        }
        $this->channel = new AMQPChannel($this->conn);
        $this->CreateExchange();
        $this->CreateQueue();
    }

    /**
     * Bring the supplied settings into the flat shape AMQPConnection reads.
     *
     * @param mixed $config Settings as passed to the constructor.
     *
     * @return mixed The normalised settings; non-array input is returned untouched.
     */
    private function normalizeCredentials($config)
    {
        if (!is_array($config)) {
            return $config;
        }

        // Accept array(array('host' => ...)) by using its first entry.
        if (isset($config[0]) && is_array($config[0])) {
            $config = $config[0];
        }

        // php-amqp reads the user name from 'login'; honour 'user' as an alias.
        if (isset($config['user']) && !isset($config['login'])) {
            $config['login'] = $config['user'];
        }

        if (isset($config['port']) && is_numeric($config['port'])) {
            $config['port'] = (int) $config['port'];
        }

        return $config;
    }

    /**
     * Declare the direct exchange named $e_name and keep it in $this->ex.
     *
     * @return void
     */
    public function CreateExchange()
    {
        $ex = new AMQPExchange($this->channel);
        $ex->setName($this->e_name);
        $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
        // Flags are kept as they were: re-declaring an existing exchange with
        // different flags is rejected by the broker.
        $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $ex->declareExchange();
        $this->ex = $ex;
    }

    /**
     * Declare the queue named $q_name and bind it to the exchange with $k_route.
     *
     * @return void
     */
    public function CreateQueue()
    {
        $q = new AMQPQueue($this->channel);
        $q->setName($this->q_name);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        // The queue must exist before it can be bound.
        $q->declareQueue();
        // Bind the queue to the exchange under the routing key.
        $q->bind($this->e_name, $this->k_route);
    }

    /**
     * Publish one message inside a channel transaction, then disconnect.
     *
     * @param mixed $msg Payload; it is JSON-encoded before publishing.
     *
     * @return array array('status' => <value returned by AMQPExchange::publish()>)
     *
     * @throws InvalidArgumentException When $msg cannot be JSON-encoded.
     */
    public function send($msg)
    {
        $message = json_encode($msg);
        if ($message === false) {
            throw new InvalidArgumentException(
                'Message cannot be JSON-encoded: ' . json_last_error_msg()
            );
        }

        try {
            $this->channel->startTransaction();
            // Publish the message under the configured routing key.
            $status = $this->ex->publish($message, $this->k_route);
            $this->channel->commitTransaction();
        } finally {
            // Always release the connection; closing it also abandons an
            // uncommitted transaction.
            $this->conn->disconnect();
        }

        return array('status' => $status);
    }

    /**
     * Fetch every message currently available on the queue, then disconnect.
     *
     * Messages are fetched with AMQP_AUTOACK, so they are acknowledged as soon
     * as they are delivered.
     *
     * @return array List of JSON-decoded message bodies (associative arrays for JSON objects).
     */
    public function get()
    {
        $q = new AMQPQueue($this->channel);
        $q->setName($this->q_name);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);

        $return = array();
        try {
            $q->declareQueue();
            // AMQPQueue::get() yields no envelope once the queue is empty, so
            // the result is checked before it is dereferenced.
            while (($envelope = $q->get(AMQP_AUTOACK)) instanceof AMQPEnvelope) {
                $return[] = json_decode($envelope->getBody(), true);
            }
        } finally {
            $this->conn->disconnect();
        }

        return $return;
    }

}
//config.php配置文件 
// Configuration array handed back to whichever script includes this file.
// NOTE(review): 'amqp' is a list holding one settings array, so the settings
// themselves live at ['amqp'][0], not directly at ['amqp'] — confirm that the
// consumer of this value expects that shape.
// NOTE(review): php-amqp's AMQPConnection appears to read the user name from
// the key 'login', not 'user' — verify against the extension documentation.
return array( 
    'amqp'=>array(     
                array(     
                        'host' => '127.0.0.1', 
                        'port' => '5672', 
                        'vhost' => '/', 
                        'user' => 'admin', 
                        'password' => 'admin' 
                 ) 
            ), 
);
//send.php(加入队列文件|生产者) 
// Producer example: publish a single message through the Amqp helper class.
require_once('amqp.php');

$e_name = 'e_guest';            // exchange name
$k_route = 'k_route_feedpush';  // routing key
$q_name = 'q_guest_feedpush';   // queue name
// NOTE(review): the consumer example uses k_route_sendemail / q_guest_sendemail;
// the routing key and queue must match for it to receive this message.

// config() is not defined in this file — presumably a framework helper; verify.
// It is looked up once and the result reused.
$config = config('amqp');

$amqp = new Amqp($config, $e_name, $q_name, $k_route);

$msg = array('test', '123');

// send() JSON-encodes the payload, publishes it and closes the connection;
// it returns array('status' => ...).
$re = $amqp->send($msg);
//get.php(接收并处理文件|消费者) 
 
// Consumer example: read the messages waiting on the queue via the Amqp helper class.
require_once('amqp.php');

// config.php returns the configuration array; only its 'amqp' entry is used here.
$settings = require('config.php');

$exchangeName = 'e_guest';           // exchange name
$routingKey   = 'k_route_sendemail'; // routing key
$queueName    = 'q_guest_sendemail'; // queue name

$amqp = new Amqp($settings['amqp'], $exchangeName, $queueName, $routingKey);

// get() returns the list of JSON-decoded message bodies.
$re = $amqp->get();

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/aiops/2354.html

(0)
上一篇 2021年7月16日 00:42
下一篇 2021年7月16日 00:42

相关推荐

发表回复

登录后才能评论