MetaQ对接SparkStreaming示例代码详解编程语言

由于JavaReceiverInputDStream<String> lines = ssc.receiverStream(Receiver<T> receiver) 中 没有直接对接MetaQ的工具,当然可以实用使用spark streaming已经有的工具进行转接,这里不建议,所以可以继承Receiver类重写onStart()方法

import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.concurrent.Executor; 
 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.receiver.Receiver; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import com.dinpay.bdp.rcp.domain.Order; 
import com.taobao.metamorphosis.Message; 
import com.taobao.metamorphosis.client.MessageSessionFactory; 
import com.taobao.metamorphosis.client.MetaClientConfig; 
import com.taobao.metamorphosis.client.MetaMessageSessionFactory; 
import com.taobao.metamorphosis.client.consumer.ConsumerConfig; 
import com.taobao.metamorphosis.client.consumer.MessageConsumer; 
import com.taobao.metamorphosis.client.consumer.MessageListener; 
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; 
 
public abstract class MetaQReceiver<T> extends Receiver<T>{ 
    private static final long serialVersionUID = -3240967436204273248L; 
    Logger logger=LoggerFactory.getLogger(MetaQReceiver.class); 
    private static final DateFormat df = new SimpleDateFormat("yyyyMMdd"); 
     
    private String zkConnect; 
    private String zkRoot; 
    private String topic; 
    private String group; 
 
    public MetaQReceiver(String zkConnect,String zkRoot,String topic,String group) { 
        super(StorageLevel.MEMORY_ONLY()); 
        this.zkConnect=zkConnect; 
        this.zkRoot=zkRoot; 
        this.topic=topic; 
        this.group=group; 
         
    } 
     
    @Override 
    public void onStart() { 
        try{ 
            final MetaClientConfig metaClientConfig = new MetaClientConfig(); 
            final ZKConfig zkConfig = new ZKConfig(); 
            zkConfig.zkConnect = this.zkConnect;// "127.0.0.1:2181"; 
            zkConfig.zkRoot = this.zkRoot;// "/meta"; 
            metaClientConfig.setZkConfig(zkConfig); 
            final MessageSessionFactory sessionFactory = new MetaMessageSessionFactory( 
                    metaClientConfig); 
            ConsumerConfig consumerConfig = new ConsumerConfig(group); 
            // 默认最大获取延迟为5秒,这里设置成100毫秒,请根据实际应用要求做设置。 
            consumerConfig.setMaxDelayFetchTimeInMills(100); 
            final MessageConsumer consumer = sessionFactory 
                    .createConsumer(consumerConfig); 
            // subscribe topic 
            consumer.subscribe(topic, 1024 * 1024, new MessageListener() { 
                @Override 
                public void recieveMessages(final Message message) { 
                    try{ 
                        //T t=message2Object(new String(message.getData(),"utf-8")); 
                        logger.info("Receive message " + new String(message.getData())); 
                        String orderJson = new String(message.getData()); 
                        Order order = ParameterDataUtil.getObject(orderJson, Order.class); 
                        String cardNo = order.getCard_no(); 
                        String yyyyMMdd = df.format(new Date()); 
                        String payclassId = order.getPayclass_id(); 
                        String cntKey = "DK_CNT_" + cardNo + "_" + payclassId + "_" + yyyyMMdd; 
                        logger.info(cntKey); 
                        System.out.println(cntKey); 
                        T result = (T) cntKey; 
                        if(result!=null){ 
                            store(result); 
                        } 
                    }catch(Exception e){ 
                        logger.error("message2Object error",e); 
                    } 
                } 
                @Override 
                public Executor getExecutor() { 
                    return null; 
                } 
            }); 
            consumer.completeSubscribe(); 
        }catch(Exception e){ 
            throw new RuntimeException("metaq error",e); 
        } 
    } 
     
    @Override 
    public void onStop() { 
    } 
     
    //public abstract T message2Object(String message) throws Exception; 
}

下面该段代码可以减掉,若有需要转Object可以在此进行处理

public class MetaQReceiverStreaming extends MetaQReceiver<String>{ 
     
    private static final long serialVersionUID = -2290689243756756929L; 
     
    public MetaQReceiverStreaming(String zkConnect, String zkRoot, String topic, String group) { 
        super(zkConnect, zkRoot, topic, group); 
    } 
 
    /*@Override 
    public String message2Object(String message) throws Exception { 
        return message; 
    }*/ 
     
}

 

接下来通过spark streaming进行metaq的消息处理

ort java.util.Arrays; 
import java.util.List; 
 
import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
 
import com.dinpay.bdp.rcp.util.Constant; 
import com.dinpay.bdp.rcp.util.MetaQReceiverStreaming; 
import com.google.common.base.Optional; 
import com.sun.xml.bind.v2.runtime.reflect.opt.Const; 
 
import scala.Tuple2;   
/** 
 * @author ll 
 */ 
public class MetaqStreamingCount { 
     
     public static void main(String[] args) { 
         String zkConnect=Constant.METAZK; 
         String zkRoot="/meta"; 
         String topic=Constant.METATOPIC; 
         String group=Constant.METAGROUP;  
          
         //屏蔽日志 
         Logger.getLogger("org.apache.spark").setLevel(Level.OFF); 
         SparkConf sparkConf = new SparkConf().setAppName("MetaqStreamingCount").setMaster("local[2]");   
         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));  
          
         JavaReceiverInputDStream<String> lines = ssc.receiverStream(new MetaQReceiverStreaming(zkConnect,zkRoot,topic,group)); 
          
         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
              
            @Override 
            public Iterable<String> call(String line) throws Exception { 
                return Arrays.asList(line.split(" ")); 
            } 
        }); 
          
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 
            @Override 
            public Tuple2<String, Integer> call(String word) { 
                return new Tuple2<>(word, 1); 
            } 
        }); 
         
        JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(new Function2<List<Integer>, Optional<Integer>,  
                Optional<Integer>>() {//对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) 
             
            @Override 
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state){ 
                //第一个参数就是key传进来的数据,第二个参数是已经有的数据 
                Integer updateValue = 0;//如果第一次,state没有,updateValue为0,如果有就获取 
                if(state.isPresent()){ 
                    updateValue = state.get(); 
                } 
                //遍历batch传进来的数据可以一直加,随着时间的流式会不断的累加相同key的value结果 
                for (Integer value : values) { 
                    updateValue += value; 
                } 
                return Optional.of(updateValue);//返回更新的值 
            } 
        }); 
 
        wordsCount.print(); 
        //需要将结果保存到Codis中 
        ssc.checkpoint("checkpoint"); 
        ssc.start(); 
        ssc.awaitTermination(); 
        ssc.close(); 
         
        } 
}

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/11171.html

(0)
上一篇 2021年7月19日 10:46
下一篇 2021年7月19日 10:46

相关推荐

发表回复

登录后才能评论