Kafka学习之(四):PHP操作Kafka详解

简单测试

环境:CentOS 6.4,PHP 7,Kafka服务器IP:192.168.9.154,PHP服务器IP:192.168.9.157

在192.168.9.157(PHP服务器)上创建以下目录和文件:生产者脚本、消费者脚本 consumer.php,以及 modules/kafka.php(Kafka类)。

//生产者 
<?php 
// Producer entry script: publishes one test message and prints a marker.
// The class file is located relative to this script (__DIR__) rather than the
// current working directory, so the script can be launched from any directory.
require __DIR__ . '/modules/kafka.php';

$rk = new kafka();
$rk->send(['hello my kafka']);
echo 'OK~';
//消费者 
<?php 
// Consumer entry script: starts the blocking consume loop (run it from a terminal).
// The class file is located relative to this script (__DIR__) rather than the
// current working directory, so the script can be launched from any directory.
require __DIR__ . '/modules/kafka.php';

$rk = new kafka();
$rk->consumer();
//Kafka类 
<?php 
/**
 * Minimal producer/consumer wrapper around the php-rdkafka extension.
 *
 * The constructor always builds a producer; consumer() builds its own
 * high-level consumer and then loops forever.
 */
class kafka
{
    /** @var string Broker address(es) as host:port; several may be given, comma separated. */
    public $broker_list = '192.168.9.154:9092';

    /** @var string Topic used by both send() and consumer(). */
    public $topic = 'mytopic';

    /**
     * @var int Partition number (0 here).
     * NOTE(review): send() does not use this value; it lets librdkafka pick
     * the partition (RD_KAFKA_PARTITION_UA) - confirm which is intended.
     */
    public $partition = 0;

    /** @var string Log file intended for use while consuming; not written to by this class. */
    public $logFile = './kafkalog/info.log';

    /** @var \RdKafka\Producer|null Producer created by the constructor. */
    protected $producer = null;

    /** @var \RdKafka\KafkaConsumer|null Consumer created by consumer(). */
    protected $consumer = null;

    /**
     * Create the producer and register the configured brokers.
     *
     * Problems are reported with echo and do not stop construction.
     */
    public function __construct()
    {
        if (empty($this->broker_list)) {
            echo 'broker not config';
        }

        // `new` either returns an object or throws, so no emptiness check is needed.
        $rk = new \RdKafka\Producer();
        $rk->setLogLevel(LOG_DEBUG);

        if (!$rk->addBrokers($this->broker_list)) {
            echo 'producer error2';
        }

        $this->producer = $rk;
    }

    /**
     * Producer side: publish one message to the configured topic.
     *
     * The payload is json_encode([$message]), i.e. the message wrapped in a
     * one-element array.
     *
     * @param mixed $message Data to publish.
     * @return void
     */
    public function send($message = [])
    {
        $topic = $this->producer->newTopic($this->topic);

        // Second argument is msgflags and must be 0 (or RD_KAFKA_MSG_F_BLOCK);
        // it is not a partition number.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode([$message]));

        // produce() only enqueues the message. Drain the queue before returning,
        // otherwise a short-lived script can exit before anything is sent.
        if (method_exists($this->producer, 'flush')) {
            if ($this->producer->flush(10000) !== RD_KAFKA_RESP_ERR_NO_ERROR) {
                echo 'producer flush error';
            }
        } else {
            // Older php-rdkafka versions have no flush(): poll until the queue is empty.
            while ($this->producer->getOutQLen() > 0) {
                $this->producer->poll(50);
            }
        }
    }

    /**
     * Consumer side: subscribe to the configured topic and process messages forever.
     *
     * This method never returns; run it from the command line.
     *
     * @return void
     */
    public function consumer()
    {
        $conf = new \RdKafka\Conf();
        // Configuration values are strings.
        $conf->set('group.id', '0');
        $conf->set('metadata.broker.list', $this->broker_list);

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.offset.reset', 'smallest');
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new \RdKafka\KafkaConsumer($conf);
        $consumer->subscribe([$this->topic]);
        $this->consumer = $consumer;

        echo "wating for message....\n";

        while (true) {
            // Block for up to one second waiting for a message. This replaces a
            // non-blocking poll followed by sleep(1), which limited the loop to
            // one message per second.
            $message = $consumer->consume(1000);

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo '要处理消息了~~~';
                    // Placeholder: hand the payload to the real processing code here.
                    $messageInfo = $message->payload;
                    break;

                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    // Nothing to read right now; keep waiting.
                    break;

                default:
                    // Report the error and keep the loop alive.
                    echo $message->errstr() . "\n";
                    break;
            }
        }
    }
}

记住消费者PHP文件要在终端运行:php consumer.php。

这里就不测试了。

工作代码

/**
     * Send a customer's login information to Kafka.
     *
     * The topic comes from the application parameter
     * 'customer_login_info_topic' when it is set, otherwise
     * 'e_user_login_info' is used.
     *
     * @param mixed $param Login information; JSON-encoded before sending.
     */
    public function sendCustomerLoginInfoToKafka($param)
    {
        $topic = 'e_user_login_info';
        if (isset(Yii::app()->params['customer_login_info_topic'])) {
            $topic = Yii::app()->params['customer_login_info_topic'];
        }

        $client = new CustomerLoginInfoServiceClient();
        $client->add($topic, json_encode($param));
    }
/**
 * Customer login information: client side of the service interface.
 */
class CustomerLoginInfoServiceClient
{
    /**
     * Hand a message to EdjKafka for the given topic.
     *
     * The message goes straight to Kafka; the Java service is no longer called.
     *
     * @param string $topic Topic name.
     * @param string $msg   Message payload.
     */
    public function add($topic, $msg)
    {
        $kafka = EdjKafka::getInstance();
        $envelope = array("topic" => $topic, "payload" => $msg);
        $kafka->putin($envelope);
    }
}
/**
 * Queues Kafka messages as tasks and, when a task is run, forwards the
 * message to KafkaProducer.
 */
class EdjKafka
{
    /** Shared instance handed out by getInstance(). */
    private static $instance;

    /**
     * Return the shared instance, creating it on first use.
     *
     * @param string $className Class to instantiate on the first call.
     * @return object
     */
    public static function getInstance($className = __CLASS__)
    {
        if (!self::$instance) {
            self::$instance = new $className();
        }

        return self::$instance;
    }

    /**
     * Enqueue a task on the 'phptokafka_0000' queue that names putin_job()
     * as the method to call with $params.
     *
     * @param array $params Array with "topic" and "payload" entries.
     */
    public function putin($params)
    {
        $job = array();
        $job['class'] = __CLASS__;
        $job['method'] = 'putin_job';
        $job['params'] = $params;

        Queue::model()->putin($job, 'phptokafka_0000');
    }

    /**
     * Task body: pass the topic and payload to KafkaProducer.
     *
     * @param array $params Array with "topic" and "payload" entries.
     */
    public function putin_job($params)
    {
        $producer = KafkaProducer::getInstance();
        $producer->putin($params["topic"], $params["payload"]);
    }
}
<?php 
require_once(Yii::app()->basePath.'/vendors/kafka/autoload.php');  //kafka包在最下面 
 
/**
 * Shared producer that sends payloads to Kafka through the bundled
 * \Kafka\Produce client, spreading messages over a topic's partitions.
 */
class KafkaProducer
{
    /** Shared instance handed out by getInstance(). */
    private static $instance;

    /** Underlying \Kafka\Produce client. */
    private $producer;

    /** Per-topic cache: topic => array("count" => int, "expire" => unix timestamp). */
    private $partitionCountMap = array();

    /**
     * Return the shared instance, creating it on first use.
     *
     * @param string $className Class to instantiate on the first call.
     * @return object
     */
    public static function getInstance($className = __CLASS__)
    {
        if (empty(self::$instance)) {
            self::$instance = new $className();
        }
        return self::$instance;
    }

    /**
     * Build the client from the 'kafka' => 'brokers' application parameter.
     */
    public function __construct()
    {
        $brokers = Yii::app()->params['kafka']['brokers'];

        // Namespace separators must be backslashes.
        $newProducer = \Kafka\Produce::getInstance($brokers, 3000, $brokers);
        $newProducer->setRequireAck(-1);
        $this->producer = $newProducer;
    }

    /**
     * Number of available partitions for a topic, cached for 180 seconds.
     *
     * @param string $topic Topic name.
     * @param bool   $force Skip the cache and refresh the metadata now.
     * @return int Partition count; 0 is logged as an error.
     */
    private function getPartitionCount($topic, $force = false)
    {
        $now = time();

        // Partitions are looked up at most once every 3 minutes per topic.
        if (!$force && array_key_exists($topic, $this->partitionCountMap) && $this->partitionCountMap[$topic]["expire"] > $now) {
            return $this->partitionCountMap[$topic]["count"];
        }

        // Fetch the partitions currently available for the topic.
        $this->producer->getClient()->refreshMetadata();
        $partitions = $this->producer->getAvailablePartitions($topic);
        EdjLog::info(__METHOD__.'|'.$topic.'|get partition|'.json_encode($partitions));
        $partitionCount = count($partitions);
        if ($partitionCount == 0) {
            EdjLog::error(__METHOD__."|".$topic."|topic partitions count 0");
        }

        $this->partitionCountMap[$topic] = array("count" => $partitionCount, "expire" => $now + 180);

        return $partitionCount;
    }

    /**
     * Send one payload to a topic.
     *
     * Does nothing when the topic is empty or has no available partitions.
     * Client exceptions are logged and force a partition refresh; they are
     * not rethrown.
     *
     * @param string $topic   Topic name.
     * @param string $payload Message body.
     * @return void
     */
    public function putin($topic, $payload)
    {
        if (empty($topic)) {
            return;
        }

        $partitionCount = $this->getPartitionCount($topic);

        if ($partitionCount != 0) {
            try {
                // The partition is chosen from the current second.
                // NOTE(review): assumes partition ids are 0..count-1 - confirm.
                $pid = time() % $partitionCount;
                $this->producer->setMessages($topic, $pid, array($payload));
                $this->producer->send();
                EdjLog::debugLog(__METHOD__.'|'.$topic.'|'.$pid);
            } catch (\Kafka\Exception $e) {
                EdjLog::error(__METHOD__.'|'.$e->getMessage());
                // Refresh the cached partition count after a failure.
                $this->getPartitionCount($topic, true);
            }
        }
    }
}
<?php 
return [
    // Broker list: same IP, different ports.
    'brokers' => '123.123.123.123:9092,123.123.123.123:9093,123.123.123.123:9094',
    // Mapping to topic names; using the class name as the key is recommended.
    // The test and production environments use different config files.
    'topicmap' => [
        'RDriverPositionToKafka' => 'driver_location_test',
        'ROrderToKafka' => 'order_test',
        'SubmitOrderAutoService_saveOrderInfoJob' => 'finished_order_picture',
        'vip_customer_change' => 'vip_customer_change',
    ],
];
链接:https://pan.baidu.com/s/1xiHAt8mbxpdPLhqZbKL1LQ  
提取码:l92h  //kafka包
<?php 
/**
 * Redis-based queue.
 */
class Queue
{
    /** Instances created by model(), keyed by class name. */
    private static $_models;

    /** Optional maximum length per queue type, read by trim(); empty by default. */
    public $queue_max_length = array(
    );

    /**
     * Name of the Redis list the next get()/add() works on. Set by the public
     * entry points. Declared explicitly because creating it dynamically is
     * deprecated as of PHP 8.2.
     */
    public $queue_name;

    /**
     * Return the shared instance for a class, creating it on first use.
     *
     * @param string $className Class to instantiate.
     * @return object
     */
    public static function model($className = __CLASS__)
    {
        if (!isset(self::$_models[$className])) {
            self::$_models[$className] = new $className(null);
        }
        return self::$_models[$className];
    }

    /**
     * Look up the Redis zone for a queue type via QueuePool.
     *
     * @param string $type Queue type.
     * @return mixed Zone array (keys used here: 'redis', 'brpop', 'name'), or a falsy value.
     */
    private function select_redis($type)
    {
        return QueuePool::model()->get_zone($type);
    }

    /**
     * Trim a queue to the maximum length configured in $queue_max_length, if any.
     *
     * @param string $queue_name Full queue name, including the "queue_" prefix.
     */
    private function trim($queue_name)
    {
        $type = str_replace("queue_", "", $queue_name);
        $max = 0;
        if (isset($this->queue_max_length[$type])) {
            $max = intval($this->queue_max_length[$type]);
        }
        if ($max > 0) {
            $zone = $this->select_redis($type);
            if ($zone) {
                $zone['redis']->lTrim($queue_name, 0, $max - 1);
            } else {
                EdjLog::error("can not find zone, queue name: " . $type);
                return;
            }
        }
    }

    /**
     * Read from the current queue when $params is null, otherwise add $params to it.
     *
     * @param mixed $params Data to enqueue, or null to dequeue.
     * @return mixed Result of get() or add().
     */
    private function getOrAdd($params)
    {
        if ($params === null) {
            return $this->get();
        }
        return $this->add($params);
    }

    /**
     * Pop one entry from the current queue in the zone for $type and decode it.
     *
     * Uses brPop with the zone's 'brpop' value when that value is truthy,
     * otherwise rPop.
     *
     * @param string $type Queue type used to select the zone.
     * @return mixed Decoded entry, null when nothing was decoded, or array() when no zone is found.
     */
    private function popFromZone($type)
    {
        $zone = $this->select_redis($type);
        if (!$zone) {
            EdjLog::error("can not find zone, queue name: " . $type);
            return array();
        }

        if ($zone['brpop']) {
            $json = '';
            $result = $zone['redis']->brPop($this->queue_name, $zone['brpop']);
            if (!empty($result) && isset($result[1])) {
                $json = $result[1];
            }
        } else {
            $json = $zone['redis']->rPop($this->queue_name);
        }

        return json_decode($json, true);
    }

    /**
     * Unified public entry point: put data into (or, when $params is null,
     * read from) the queue for the given type.
     *
     * An empty type is treated as 'error'; a type with no base queue name
     * falls back to 'queue_error'.
     *
     * Both arguments are required. $params previously had a default value in
     * front of the required $type, which PHP 8 deprecates and already treats
     * as required.
     *
     * @author sunhongjing 2013-07-04
     * @param mixed  $params Data to enqueue, or null to dequeue.
     * @param string $type   Queue type.
     * @return mixed
     */
    public function putin($params, $type)
    {
        $type = empty($type) ? 'error' : strtolower($type);
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        if (!empty($base_qname)) {
            $this->queue_name = 'queue_' . $base_qname;
        } else {
            $this->queue_name = 'queue_error';
        }
        return $this->getOrAdd($params);
    }

    /**
     * Take one entry from the queue for the given type.
     *
     * @author sunhongjing 2013-07-09
     * @param string $type Queue type.
     * @return mixed Decoded entry; array() when the type has no base queue name or no zone is found.
     */
    public function getit($type = 'default')
    {
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        if (empty($base_qname)) {
            return array();
        }
        $this->queue_name = 'queue_' . $base_qname;

        return $this->popFromZone($type);
    }

    /**
     * Return the list of queue types from QNameManager.
     *
     * @author sunhongjing 2013-07-04
     * @return array The list, or array() (with an error logged) when it is empty.
     */
    public function getQueueTypeList()
    {
        $list = QNameManager::model()->findall();
        if ($list) {
            return $list;
        }
        EdjLog::error("Error: get queue list from database");
        return array();
    }

    /**
     * Write to or read from the position queue.
     *
     * @param array $params Data to enqueue, or null to dequeue.
     * @return mixed
     */
    public function position($params = null)
    {
        $this->queue_name = 'queue_position';
        return $this->getOrAdd($params);
    }

    /**
     * Heartbeat queue.
     *
     * @param mixed $params Data to enqueue, or null to dequeue.
     * @return mixed
     */
    public function heartbeat($params = null)
    {
        $this->queue_name = 'queue_heartbeat';
        return $this->getOrAdd($params);
    }

    /**
     * Highest-priority queue.
     *
     * @param mixed $params Data to enqueue, or null to dequeue.
     * @return mixed
     */
    public function task($params = null)
    {
        $this->queue_name = 'queue_task';
        return $this->getOrAdd($params);
    }

    /**
     * Queue for saving logs to the database.
     *
     * @param mixed $params Data to enqueue, or null to dequeue.
     * @return mixed
     */
    public function dumplog($params = null)
    {
        $this->queue_name = 'queue_dumplog';
        return $this->getOrAdd($params);
    }

    /**
     * Return the number of entries in every known queue.
     *
     * Keys have the form "queue_<name>@<zone>", with "_P<level>" appended
     * when the zone name does not already end in "P<digits>", and a leading
     * "!" when the length exceeds the entry's max.
     *
     * @return array Map of display key to length.
     */
    public function length()
    {
        $queue = $this->getQueueTypeList();
        $queue_length = array();
        $reg = "/P[0-9]+$/";

        foreach ($queue as $item) {
            $base_qname = $item->base_qname;
            $zone = $this->select_redis($base_qname);
            $key = 'queue_' . $base_qname;

            if (!$zone) {
                EdjLog::error("can not find zone, queue name: " . $key);
                continue;
            }

            // Read the length before the key is decorated for display.
            $len = $zone['redis']->lLen($key);
            if (isset($item->max) && $len > $item->max) {
                $key = '!' . $key;
            }

            if (preg_match($reg, $zone['name'])) {
                $pkey = $key . '@' . $zone['name'];
            } else {
                $pkey = $key . '@' . $zone['name'] . "_P" . $item->level;
            }
            $queue_length[$pkey] = $len;
        }

        return $queue_length;
    }

    /**
     * Pop one entry from the current queue.
     *
     * @return mixed Decoded entry, or array() when no zone is found.
     */
    private function get()
    {
        $type = str_replace("queue_", "", $this->queue_name);
        return $this->popFromZone($type);
    }

    /**
     * JSON-encode $params and push it onto the current queue.
     *
     * @param mixed $params Data to enqueue.
     * @return mixed Result of lPush, or 0 when no zone is found or the write throws.
     */
    private function add($params)
    {
        $json = json_encode($params);
        $type = str_replace("queue_", "", $this->queue_name);
        $zone = $this->select_redis($type);
        $return = 0;

        if ($zone) {
            try {
                $return = $zone['redis']->lPush($this->queue_name, $json);
            } catch (Exception $e) {
                EdjLog::error("write redis error,msg:" . $e->getMessage());
            }
        } else {
            EdjLog::error("can not find zone, queue name: " . $type);
        }

        return $return;
    }

    /**
     * Run one task taken from a queue.
     *
     * The task must contain 'method' and 'params'; 'class' defaults to
     * "QueueProcess". Exceptions raised while running it are logged, not rethrown.
     *
     * @param array $task Task description.
     * @return void
     */
    public function processTask($task)
    {
        if (!isset($task['method'], $task['params'])) {
            $task_content = json_encode($task);
            EdjLog::error("can not run task due to no 'method' or 'params' specified, task is $task_content");
            return;
        }

        $method = $task['method'];
        $params = $task['params'];
        $class = isset($task['class']) ? $task['class'] : "QueueProcess";
        EdjLog::info("REDIS_QUEUE_OUT CLASS:$class METHOD:$method PARAMS:" . json_encode($params));

        try {
            $queue_process = new $class();
            // Throws ReflectionException when the method does not exist.
            new ReflectionMethod($queue_process, $method);
            call_user_func_array(array($queue_process, $method), array($params));
        } catch (Exception $e) {
            $errmsg = $e->getMessage();
            EdjLog::error("execption queue_run method:$method err: $errmsg");
        }
    }

    /**
     * Return the length of the queue for one type.
     *
     * @param string $type Queue type; an empty value is treated as 'error'.
     * @return int Queue length, or 0 when no zone is found.
     */
    public function getLengthByType($type)
    {
        $type = empty($type) ? 'error' : strtolower($type);
        $base_qname = QNameManagerService::model()->get_base_qname($type);
        $zone = $this->select_redis($base_qname);
        $key = 'queue_' . $base_qname;
        $len = 0;

        if ($zone) {
            $len = $zone['redis']->lLen($key);
        } else {
            EdjLog::error("can not find zone, queue name: " . $base_qname);
        }

        return $len;
    }
}

 

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/2316.html

(0)
上一篇 2021年7月16日
下一篇 2021年7月16日

相关推荐

发表回复

登录后才能评论