RabbitMQ学习之(五):一个基于PHP的RabbitMQ操作类详解

//amqp.php类文件 
<?php 
/**
 * Thin wrapper around the php-amqp extension that wires one direct exchange
 * to one queue through one routing key, and offers a JSON send()/get() pair.
 *
 * NOTE: both send() and get() disconnect from the broker when they finish,
 * so an instance is good for a single send() or a single get() call.
 */
class Amqp
{
    /** @var string Exchange name. */
    public $e_name;
    /** @var string Queue name. */
    public $q_name;
    /** @var string Routing key used for both binding and publishing. */
    public $k_route;
    /** @var AMQPChannel Channel opened on the connection. */
    public $channel;
    /** @var AMQPConnection Broker connection (previously an undeclared dynamic property). */
    public $conn;
    /** @var AMQPExchange Declared exchange (previously an undeclared dynamic property). */
    public $ex;

    /**
     * Connects to the broker, opens a channel, then declares the exchange and
     * the queue and binds them together.
     *
     * @param array  $config  Credentials for AMQPConnection (host, port, vhost,
     *                        login, password). A list whose first element is
     *                        such an array is also accepted, as is a 'user'
     *                        key in place of 'login'.
     * @param string $e_name  Exchange name.
     * @param string $q_name  Queue name.
     * @param string $k_route Routing key.
     *
     * @throws AMQPConnectionException When the broker cannot be reached.
     */
    public function __construct($config, $e_name, $q_name, $k_route)
    {
        $this->e_name = $e_name;
        $this->q_name = $q_name;
        $this->k_route = $k_route;

        // Create the connection and the channel.
        $this->conn = new AMQPConnection($this->normalizeConfig($config));
        if (!$this->conn->connect()) {
            // A constructor's return value is discarded by `new`, so the
            // failure has to be reported with an exception.
            throw new AMQPConnectionException('Cannot connect to the broker!');
        }
        $this->channel = new AMQPChannel($this->conn);
        $this->CreateExchange();
        $this->CreateQueue();
    }

    /**
     * Declares the direct exchange named by $e_name and keeps it in $this->ex.
     *
     * The exchange is flagged both durable and auto-delete.
     */
    public function CreateExchange()
    {
        $ex = new AMQPExchange($this->channel);
        $ex->setName($this->e_name);
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $ex->declare();
        $this->ex = $ex;
    }

    /**
     * Declares the queue named by $q_name and binds it to the exchange with
     * the routing key.
     */
    public function CreateQueue()
    {
        $q = $this->buildQueue();
        // The queue must exist on the broker before it can be bound.
        $q->declare();
        $q->bind($this->e_name, $this->k_route);
    }

    /**
     * JSON-encodes $msg and publishes it to the exchange inside a channel
     * transaction, then disconnects.
     *
     * @param mixed $msg Any JSON-encodable value.
     *
     * @return array array('status' => result of AMQPExchange::publish()).
     *
     * @throws InvalidArgumentException When $msg cannot be JSON-encoded.
     */
    public function send($msg)
    {
        $message = json_encode($msg);
        if ($message === false) {
            throw new InvalidArgumentException(
                'Message cannot be JSON-encoded (json error code ' . json_last_error() . ')'
            );
        }

        $this->channel->startTransaction();
        // Publish the message with the configured routing key.
        $status = $this->ex->publish($message, $this->k_route);
        $this->channel->commitTransaction();
        $this->conn->disconnect();

        return array('status' => $status);
    }

    /**
     * Drains the queue: fetches messages with auto-ack until none is left,
     * JSON-decodes each body into an array, then disconnects.
     *
     * @return array List of decoded message bodies (empty when the queue is empty).
     */
    public function get()
    {
        $q = $this->buildQueue();
        $q->declare();

        $return = array();
        // AMQPQueue::get() yields no envelope once the queue is empty; testing
        // the fetched value itself avoids calling getBody() on a non-object
        // when the queue drains between a count check and the fetch.
        while (($envelope = $q->get(AMQP_AUTOACK)) instanceof AMQPEnvelope) {
            $return[] = json_decode($envelope->getBody(), true);
        }

        $this->conn->disconnect();

        return $return;
    }

    /**
     * Builds (without declaring) the queue object with the name and flags
     * shared by CreateQueue() and get().
     */
    private function buildQueue()
    {
        $q = new AMQPQueue($this->channel);
        $q->setName($this->q_name);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);

        return $q;
    }

    /**
     * Maps the accepted config shapes onto the credentials array that
     * AMQPConnection reads.
     */
    private function normalizeConfig($config)
    {
        if (!is_array($config)) {
            return $config;
        }
        // Accept a list wrapping the settings: array(array('host' => ...)).
        if (isset($config[0]) && is_array($config[0])) {
            $config = $config[0];
        }
        // AMQPConnection reads the user name from 'login'; accept 'user' as an alias.
        if (isset($config['user']) && !isset($config['login'])) {
            $config['login'] = $config['user'];
        }

        return $config;
    }
}
//config.php配置文件 
// Connection settings for the RabbitMQ broker. The 'amqp' entry is a list
// whose first element is the credentials array handed to AMQPConnection.
return array(
    'amqp' => array(
        array(
            'host' => '127.0.0.1',
            'port' => '5672',
            'vhost' => '/',
            // AMQPConnection reads the user name from the 'login' key.
            'login' => 'admin',
            'password' => 'admin'
        )
    ),
);
//send.php(加入队列文件|生产者) 
// Producer: publishes one test message through the Amqp helper class.
require_once('amqp.php');
// Load the settings from config.php; its 'amqp' entry is a list whose first
// element holds the connection settings.
$config = require('config.php');
$config_amqp = $config['amqp'][0];
$e_name = 'e_guest'; // exchange name
$k_route = 'k_route_feedpush'; // routing key
$q_name = 'q_guest_feedpush'; // queue name
$amqp = new Amqp($config_amqp, $e_name, $q_name, $k_route);
$msg = array('test', '123');
// $re is array('status' => <publish result>).
$re = $amqp->send($msg);
//get.php(接收并处理文件|消费者) 
 
// Consumer: drains the queue through the Amqp helper class.
require_once('amqp.php');
$config = require('config.php');
// The 'amqp' entry is a list; its first element holds the connection settings.
$config_amqp = $config['amqp'][0];
// NOTE(review): a producer only reaches this consumer when both use the same
// exchange, routing key and queue name - confirm these match the producer.
$e_name = 'e_guest'; // exchange name
$k_route = 'k_route_sendemail'; // routing key
$q_name = 'q_guest_sendemail'; // queue name
$amqp = new Amqp($config_amqp, $e_name, $q_name, $k_route);
// $re is the list of JSON-decoded message bodies that were waiting in the queue.
$re = $amqp->get();

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/2354.html

(0)
上一篇 2021年7月16日
下一篇 2021年7月16日

相关推荐

发表回复

登录后才能评论