Metaq的生产者代码
import java.io.BufferedReader; import java.io.InputStreamReader; import com.taobao.metamorphosis.Message; import com.taobao.metamorphosis.client.MessageSessionFactory; import com.taobao.metamorphosis.client.MetaClientConfig; import com.taobao.metamorphosis.client.MetaMessageSessionFactory; import com.taobao.metamorphosis.client.producer.MessageProducer; import com.taobao.metamorphosis.client.producer.SendResult; import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; public class MetaqProvider { public static void main(String[] args) throws Exception { final MetaClientConfig metaClientConfig = new MetaClientConfig(); final ZKConfig zkConfig = new ZKConfig(); zkConfig.zkConnect = Constant.METAZK; final String topic = Constant.METATOPIC; metaClientConfig.setZkConfig(zkConfig); MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig); MessageProducer producer = sessionFactory.createProducer(); producer.publish(topic); BufferedReader reader = new BufferedReader(new InputStreamReader(System.in)); String line = null; while ((line = reader.readLine()) != null) { SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes())); if (!sendResult.isSuccess()) { System.err.println("Send message failed,error message:" + sendResult.getErrorMessage()); } else { System.out.println("Send message successfully,sent to " + sendResult.getPartition()); } } } }
Metaq消费者代码
import com.taobao.metamorphosis.Message; import com.taobao.metamorphosis.client.MessageSessionFactory; import com.taobao.metamorphosis.client.MetaClientConfig; import com.taobao.metamorphosis.client.MetaMessageSessionFactory; import com.taobao.metamorphosis.client.consumer.ConsumerConfig; import com.taobao.metamorphosis.client.consumer.MessageConsumer; import com.taobao.metamorphosis.client.consumer.MessageListener; import com.taobao.metamorphosis.exception.MetaClientException; import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; public class MetaqConsumer { private static final MetaClientConfig metaClientConfig = new MetaClientConfig(); private static final ZKConfig zkConfig = new ZKConfig(); private static final String topic = Constant.METATOPIC; private static final String group = Constant.METAGROUP; public static void main(String[] args) throws Exception { zkConfig.zkConnect = Constant.METAZK; metaClientConfig.setZkConfig(zkConfig); MessageSessionFactory sessionFactory; MessageConsumer consumer; try { sessionFactory = new MetaMessageSessionFactory(metaClientConfig); consumer = sessionFactory.createConsumer(new ConsumerConfig(group)); consumer.subscribe(topic, 1024 * 1024, new MessageListener() { public void recieveMessages(Message message) { System.out.println("Receive message " + new String(message.getData())); } public Executor getExecutor() { return null; } }); consumer.completeSubscribe(); } catch (MetaClientException e) { e.printStackTrace(); } } }
#metaq的配置信息
meta.zk=192.168.46.161:2181
meta.group=meta-gp
meta.topic=meta-tpc
<!-- MetaQ start --> <dependency> <groupId>com.taobao.metamorphosis</groupId> <artifactId>metamorphosis-client</artifactId> <version>1.4.6.2</version> </dependency> <dependency> <groupId>com.taobao.metamorphosis</groupId> <artifactId>metamorphosis-tools</artifactId> <version>1.4.6.2</version> </dependency> <!-- MetaQ end -->
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/11172.html