MetaQ对接SparkStreaming示例代码详解

由于 JavaReceiverInputDStream<String> lines = ssc.receiverStream(Receiver<T> receiver) 中没有直接对接MetaQ的工具。当然也可以使用Spark Streaming已有的工具进行转接,但这里不建议这样做;更好的做法是继承Receiver类并重写onStart()方法。

import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.concurrent.Executor; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.receiver.Receiver; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import com.dinpay.bdp.rcp.domain.Order; 
import com.taobao.metamorphosis.Message; 
import com.taobao.metamorphosis.client.MessageSessionFactory; 
import com.taobao.metamorphosis.client.MetaClientConfig; 
import com.taobao.metamorphosis.client.MetaMessageSessionFactory; 
import com.taobao.metamorphosis.client.consumer.ConsumerConfig; 
import com.taobao.metamorphosis.client.consumer.MessageConsumer; 
import com.taobao.metamorphosis.client.consumer.MessageListener; 
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig; 
/**
 * Spark Streaming {@link Receiver} that subscribes to a MetaQ (Metamorphosis) topic and
 * stores one counter key per received message.
 *
 * <p>Each message body is decoded as a JSON-encoded {@link Order}; the key
 * {@code DK_CNT_<cardNo>_<payclassId>_<yyyyMMdd>} is built from it and handed to
 * {@link #store(Object)}. The key is always a {@code String}, so subclasses are expected to
 * bind {@code T} to {@code String}.
 *
 * <p>The MetaQ client objects are created in {@link #onStart()} and released in
 * {@link #onStop()}; they are {@code transient} because the receiver itself is serialized.
 *
 * @param <T> element type stored into Spark (effectively {@code String}, see above)
 */
public abstract class MetaQReceiver<T> extends Receiver<T> {

    private static final long serialVersionUID = -3240967436204273248L;

    // Static so the logger is not part of the serialized receiver state.
    static final Logger logger = LoggerFactory.getLogger(MetaQReceiver.class);

    /** Day pattern used as the suffix of the counter key. */
    private static final String DAY_PATTERN = "yyyyMMdd";

    /** Maximum number of bytes fetched per request when subscribing (1 MiB). */
    private static final int MAX_FETCH_SIZE = 1024 * 1024;

    private final String zkConnect;
    private final String zkRoot;
    private final String topic;
    private final String group;

    // Created in onStart(), released in onStop(); never serialized.
    private transient MessageSessionFactory sessionFactory;
    private transient MessageConsumer consumer;

    /**
     * @param zkConnect ZooKeeper connect string used by MetaQ, e.g. {@code "127.0.0.1:2181"}
     * @param zkRoot    ZooKeeper root path used by MetaQ, e.g. {@code "/meta"}
     * @param topic     topic to subscribe to
     * @param group     consumer group name
     */
    public MetaQReceiver(String zkConnect, String zkRoot, String topic, String group) {
        super(StorageLevel.MEMORY_ONLY());
        this.zkConnect = zkConnect;
        this.zkRoot = zkRoot;
        this.topic = topic;
        this.group = group;
    }

    /**
     * Creates the MetaQ session factory and consumer and subscribes to the topic.
     *
     * @throws RuntimeException wrapping any failure while setting up the subscription; the
     *                          partially created client objects are shut down first
     */
    @Override
    public void onStart() {
        try {
            final MetaClientConfig metaClientConfig = new MetaClientConfig();
            final ZKConfig zkConfig = new ZKConfig();
            zkConfig.zkConnect = this.zkConnect;
            zkConfig.zkRoot = this.zkRoot;
            metaClientConfig.setZkConfig(zkConfig);
            sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

            final ConsumerConfig consumerConfig = new ConsumerConfig(group);
            // The default maximum fetch delay is 5 seconds; it is lowered to 100 ms here.
            // Tune this to the needs of the application.
            consumerConfig.setMaxDelayFetchTimeInMills(100);
            consumer = sessionFactory.createConsumer(consumerConfig);

            consumer.subscribe(topic, MAX_FETCH_SIZE, new MessageListener() {
                @Override
                public void recieveMessages(final Message message) {
                    handleMessage(message);
                }

                @Override
                public Executor getExecutor() {
                    // No dedicated executor is supplied.
                    // NOTE(review): presumably the client then runs the listener on its own
                    // fetch threads - confirm against the MetaQ client documentation.
                    return null;
                }
            });
            consumer.completeSubscribe();
        } catch (Exception e) {
            releaseResources();
            throw new RuntimeException("metaq error", e);
        }
    }

    /** Shuts down the MetaQ consumer and session factory created by {@link #onStart()}. */
    @Override
    public void onStop() {
        releaseResources();
    }

    /**
     * Decodes one message, builds its counter key and stores it. Failures are logged and the
     * message is skipped, so one bad message does not stop the subscription.
     */
    private void handleMessage(final Message message) {
        try {
            // Decode once, with an explicit charset instead of the platform default.
            // NOTE(review): assumes producers publish UTF-8 JSON - confirm.
            final String orderJson =
                    new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8);
            logger.info("Receive message {}", orderJson);

            final Order order = ParameterDataUtil.getObject(orderJson, Order.class);
            final String cntKey = buildCountKey(order, new Date());
            logger.info(cntKey);

            // The key is a String; the cast only succeeds at runtime when T is String.
            @SuppressWarnings("unchecked")
            final T result = (T) cntKey;
            store(result);
        } catch (Exception e) {
            logger.error("message2Object error", e);
        }
    }

    /** Builds {@code DK_CNT_<cardNo>_<payclassId>_<yyyyMMdd>} for the given order and day. */
    private static String buildCountKey(final Order order, final Date day) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call rather
        // than a shared static one.
        final DateFormat dayFormat = new SimpleDateFormat(DAY_PATTERN);
        return "DK_CNT_" + order.getCard_no() + "_" + order.getPayclass_id() + "_"
                + dayFormat.format(day);
    }

    /** Best-effort shutdown of the MetaQ client objects; safe to call more than once. */
    private void releaseResources() {
        if (consumer != null) {
            try {
                consumer.shutdown();
            } catch (Exception e) {
                logger.warn("failed to shut down MetaQ consumer", e);
            } finally {
                consumer = null;
            }
        }
        if (sessionFactory != null) {
            try {
                sessionFactory.shutdown();
            } catch (Exception e) {
                logger.warn("failed to shut down MetaQ session factory", e);
            } finally {
                sessionFactory = null;
            }
        }
    }
}

下面这段代码可以省略;若需要将消息转换为Object,可以在此处进行处理。

/**
 * Concrete {@link MetaQReceiver} whose stored elements are {@code String}s.
 *
 * <p>It adds no behavior of its own; it only fixes the element type and forwards the
 * connection settings to the base class.
 */
public class MetaQReceiverStreaming extends MetaQReceiver<String> {

    private static final long serialVersionUID = -2290689243756756929L;

    /**
     * Forwards all four settings unchanged to the {@link MetaQReceiver} constructor.
     *
     * @param zkConnect ZooKeeper connect string
     * @param zkRoot    ZooKeeper root path
     * @param topic     topic to subscribe to
     * @param group     consumer group name
     */
    public MetaQReceiverStreaming(
            String zkConnect,
            String zkRoot,
            String topic,
            String group) {
        super(zkConnect, zkRoot, topic, group);
    }

    // Disabled identity conversion hook, kept for reference:
    //
    // @Override
    // public String message2Object(String message) throws Exception {
    //     return message;
    // }
}

 

接下来通过spark streaming进行metaq的消息处理

import java.util.Arrays; 
import java.util.List; 
import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import com.dinpay.bdp.rcp.util.Constant; 
import com.dinpay.bdp.rcp.util.MetaQReceiverStreaming; 
import com.google.common.base.Optional; 
import com.sun.xml.bind.v2.runtime.reflect.opt.Const; 
import scala.Tuple2;   
/**
 * Driver that consumes MetaQ messages through {@link MetaQReceiverStreaming} and keeps a
 * running count per space-separated token of every received line.
 *
 * @author ll
 */
public class MetaqStreamingCount {

    /**
     * Builds the streaming job, starts it and blocks until it terminates.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String zkConnect = Constant.METAZK;
        String zkRoot = "/meta";
        String topic = Constant.METATOPIC;
        String group = Constant.METAGROUP;

        // Silence Spark's own logging.
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

        SparkConf sparkConf = new SparkConf().setAppName("MetaqStreamingCount").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        try {
            // updateStateByKey keeps state across batches and therefore needs a checkpoint
            // directory; configure it before the stateful stream is defined.
            ssc.checkpoint("checkpoint");

            JavaReceiverInputDStream<String> lines =
                    ssc.receiverStream(new MetaQReceiverStreaming(zkConnect, zkRoot, topic, group));

            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });

            JavaPairDStream<String, Integer> pairs =
                    words.mapToPair(new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String word) {
                            return new Tuple2<>(word, 1);
                        }
                    });

            // Accumulate the values of each key across batches.
            JavaPairDStream<String, Integer> wordsCount = pairs.updateStateByKey(
                    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                        /**
                         * @param values values that arrived for the key in the current batch
                         * @param state  total accumulated so far; absent the first time a key is seen
                         * @return the new running total for the key
                         */
                        @Override
                        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                            // Start from the previous total, or 0 for a new key; a primitive
                            // accumulator avoids re-boxing on every addition.
                            int total = state.or(0);
                            for (Integer value : values) {
                                total += value;
                            }
                            return Optional.of(total);
                        }
                    });

            wordsCount.print();
            // TODO: the results still need to be saved to Codis.

            ssc.start();
            ssc.awaitTermination();
        } finally {
            // Release the context even when start() or awaitTermination() throws.
            ssc.close();
        }
    }
}

 

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/11171.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论