MetaQ 简单使用 demo 详解(分类:编程语言)

MetaQ 的生产者代码

import java.io.BufferedReader; 
import java.io.InputStreamReader; 
 
import com.taobao.metamorphosis.Message; 
import com.taobao.metamorphosis.client.MessageSessionFactory; 
import com.taobao.metamorphosis.client.MetaClientConfig; 
import com.taobao.metamorphosis.client.MetaMessageSessionFactory; 
import com.taobao.metamorphosis.client.producer.MessageProducer; 
import com.taobao.metamorphosis.client.producer.SendResult; 
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; 
 
/**
 * Demo MetaQ producer: reads lines from standard input and sends each line as one
 * message to the topic named by {@code Constant.METATOPIC}.
 */
public class MetaqProvider {

    /**
     * Connects to the ZooKeeper address in {@code Constant.METAZK}, publishes the topic,
     * then forwards every line typed on standard input until end-of-input is reached.
     * The outcome of each send is reported on stdout (success) or stderr (failure).
     *
     * @param args unused
     * @throws Exception if the client cannot be created, a send fails with an exception,
     *                   or reading standard input fails
     */
    public static void main(String[] args) throws Exception {
        final ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkConnect = Constant.METAZK;

        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        metaClientConfig.setZkConfig(zkConfig);

        final String topic = Constant.METATOPIC;

        final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
        try {
            final MessageProducer producer = sessionFactory.createProducer();
            try {
                // The topic must be published before messages can be sent to it.
                producer.publish(topic);

                // System.in is deliberately not closed; it belongs to the JVM, not to this demo.
                final BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
                String line;
                while ((line = reader.readLine()) != null) {
                    // NOTE: getBytes() encodes with the JVM default charset; the consumer must
                    // decode with the same charset for non-ASCII text to round-trip.
                    final SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
                    if (sendResult.isSuccess()) {
                        System.out.println("Send message successfully,sent to " + sendResult.getPartition());
                    } else {
                        System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
                    }
                }
            } finally {
                // Release the producer once input is exhausted or an error occurs.
                producer.shutdown();
            }
        } finally {
            // Shut the factory down so the client does not linger after main() returns.
            sessionFactory.shutdown();
        }
    }

}

 

MetaQ 的消费者代码

import com.taobao.metamorphosis.Message; 
import com.taobao.metamorphosis.client.MessageSessionFactory; 
import com.taobao.metamorphosis.client.MetaClientConfig; 
import com.taobao.metamorphosis.client.MetaMessageSessionFactory; 
import com.taobao.metamorphosis.client.consumer.ConsumerConfig; 
import com.taobao.metamorphosis.client.consumer.MessageConsumer; 
import com.taobao.metamorphosis.client.consumer.MessageListener; 
import com.taobao.metamorphosis.exception.MetaClientException; 
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; 
 
public class MetaqConsumer { 
    private static final MetaClientConfig metaClientConfig = new MetaClientConfig(); 
    private static final ZKConfig zkConfig = new ZKConfig(); 
    private static final String topic = Constant.METATOPIC; 
    private static final String group = Constant.METAGROUP; 
     
    public static void main(String[] args) throws Exception { 
          zkConfig.zkConnect = Constant.METAZK; 
            metaClientConfig.setZkConfig(zkConfig); 
            MessageSessionFactory sessionFactory; 
            MessageConsumer consumer; 
            try { 
                sessionFactory = new MetaMessageSessionFactory(metaClientConfig); 
                consumer = sessionFactory.createConsumer(new ConsumerConfig(group)); 
                consumer.subscribe(topic, 1024 * 1024, new MessageListener() { 
                    public void recieveMessages(Message message) { 
                        System.out.println("Receive message " + new String(message.getData())); 
                    }   
                    public Executor getExecutor() { 
                        return null; 
                    } 
                }); 
                consumer.completeSubscribe(); 
            } catch (MetaClientException e) { 
                e.printStackTrace(); 
            } 
    } 
     
}
# MetaQ configuration: ZooKeeper address, consumer group and topic.
# NOTE(review): presumably the source of Constant.METAZK / METAGROUP / METATOPIC - confirm.
meta.zk=192.168.46.161:2181 
meta.group=meta-gp 
meta.topic=meta-tpc

 

<!-- MetaQ start: Metamorphosis client and tools artifacts, both at version 1.4.6.2 --> 
 <dependency> 
     <groupId>com.taobao.metamorphosis</groupId> 
     <artifactId>metamorphosis-client</artifactId> 
     <version>1.4.6.2</version> 
</dependency> 
<dependency> 
     <groupId>com.taobao.metamorphosis</groupId> 
     <artifactId>metamorphosis-tools</artifactId> 
      <version>1.4.6.2</version> 
</dependency> 
<!-- MetaQ end -->

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11172.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论