SparkStreaming和Drools结合的HelloWorld版详解大数据

SparkStreaming 与 Drools 框架结合的测试示例

package com.dinpay.bdp.rcp.service; 
import java.math.BigDecimal; 
import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.Arrays; 
import java.util.Date; 
import org.apache.commons.lang3.StringUtils; 
import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.api.java.function.VoidFunction2; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.Time; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.kie.api.KieServices; 
import org.kie.api.runtime.KieContainer; 
import org.kie.api.runtime.KieSession; 
import com.dinpay.bdp.rcp.metaq.MetaQReceiver; 
import com.dinpay.bdp.rcp.streaming.StreamingUtil; 
import com.dinpay.bdp.rcp.util.CodisUtil; 
import com.dinpay.bdp.rcp.util.Constant; 
import com.dinpay.dpp.rcp.po.Order; 
import redis.clients.jedis.Jedis; 
import scala.Tuple2; 
/**
 * Spark Streaming + Drools hello-world driver.
 *
 * <p>Receives {@code Order} messages from MetaQ, derives a per-system / per-card / per-day
 * counter key for each order, writes the incremented counter to Codis, and inserts each
 * (key, count) pair into the Drools session named {@code "helloworld"} so that the rules
 * on the classpath can react to it.
 *
 * @author ll-t150
 */
public class SparkDroolsTest {
    public static Logger logger = Logger.getLogger(SparkDroolsTest.class);

    /** Date pattern used for the day suffix of the Codis counter keys. */
    private static final String DAY_PATTERN = "yyyyMMdd";

    /**
     * Kept for backward compatibility with existing callers. {@link SimpleDateFormat} is not
     * thread-safe, so the code in this class that runs inside Spark tasks formats dates with
     * a fresh instance instead of sharing this one.
     */
    public static final DateFormat df = new SimpleDateFormat(DAY_PATTERN);

    /**
     * Builds the streaming job (MetaQ receiver, counter computation, Codis/Drools sink), starts
     * it and blocks until the streaming context terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String zkConnect = Constant.METAZK;
        String zkRoot = "/meta";
        String topic = Constant.ORDERTOPIC;
        String group = Constant.STREAMGROUP;
        // Silence Spark's own logging.
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        logger.info("metaq configuration:" + zkConnect + "--" + topic + "--" + group);
        SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        try {
            // Pull messages from MetaQ.
            JavaReceiverInputDStream<Order> lines =
                    ssc.receiverStream(new MetaQReceiver(zkConnect, zkRoot, topic, group));
            // Identity flatMap: every received order is passed through unchanged.
            JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() {
                @Override
                public Iterable<Order> call(Order order) throws Exception {
                    return Arrays.asList(new Order[]{order});
                }
            });
            // Per-card, per-day order count; the visible code does not filter by order status.
            JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words);
            save2Codis(cardCntPairs);
            ssc.start();
            ssc.awaitTermination();
        } finally {
            // Release the streaming context even when start/awaitTermination fails.
            ssc.close();
        }
    }

    /**
     * Maps every order to a pair of (counter key, stored count + 1).
     *
     * <p>The key is {@code <systemId>_CNT_<payerCardNo>_<yyyyMMdd>} where the date is the
     * processing date, not a date taken from the order. The current count is read from Codis;
     * a missing or empty value is treated as zero.
     *
     * <p>NOTE(review): the read here and the write in {@link #save2Codis} are not atomic, so
     * several orders with the same key in one batch all read the same stored value. Consider
     * an atomic increment if exact counts matter.
     *
     * @param words stream of received orders
     * @param <T>   value type expected by the caller; the values produced are {@code Integer}
     * @return stream of (counter key, incremented count) pairs
     */
    @SuppressWarnings({ "unchecked", "serial" })
    public static <T> JavaPairDStream<String, T> getCardJavaPair(JavaDStream<Order> words) {
        JavaPairDStream<String, T> pairs = null;
        // Count computation.
        pairs = (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Order order) {
                // Fresh formatter per call: SimpleDateFormat must not be shared across task threads.
                String day = new SimpleDateFormat(DAY_PATTERN).format(new Date());
                String cardCntkey = order.getSystemId() + "_CNT_" + order.getPayerCardNo() + "_" + day;
                // Look the key up in Codis; if present, the new count is the stored value plus one.
                String value;
                Jedis jedis = CodisUtil.getJedisPool().getResource();
                try {
                    value = jedis.get(cardCntkey);
                } finally {
                    // Always hand the connection back to the pool (it was leaked before).
                    jedis.close();
                }
                if (StringUtils.isEmpty(value)) {
                    return new Tuple2<String, Integer>(cardCntkey, 1);
                }
                return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1);
            }
        });
        return pairs;
    }

    /**
     * Stores every computed pair in Codis and then runs the Drools rules against it.
     *
     * <p>For each tuple the value is written under the tuple's key, after which a
     * {@code ChannAmount} fact (channel = key, amount = value parsed as int) is inserted into
     * a new {@code "helloworld"} session and all rules are fired.
     *
     * @param pair stream of (key, value) pairs to persist and evaluate
     * @param <T>  value type; its string form must be parseable as an int
     */
    @SuppressWarnings("serial")
    public static <T> void save2Codis(JavaPairDStream<String, T> pair) {
        pair.foreachRDD(new VoidFunction2<JavaPairRDD<String, T>, Time>() {
            @Override
            public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception {
                rdd.foreach(new VoidFunction<Tuple2<String, T>>() {
                    @Override
                    public void call(Tuple2<String, T> tp) throws Exception {
                        Jedis jedis = CodisUtil.getJedisPool().getResource();
                        try {
                            jedis.set(tp._1(), String.valueOf(tp._2()));
                            logger.info(tp._1() + ">>>" + tp._2() + ",保存到Codis完成!");
                        } finally {
                            // Return the connection even if the write fails.
                            jedis.close();
                        }

                        KieServices kieServices = KieServices.Factory.get();
                        KieContainer kieContainer = kieServices.getKieClasspathContainer();
                        KieSession kieSession = kieContainer.newKieSession("helloworld");
                        try {
                            ChannAmount objectChannel = new ChannAmount();
                            objectChannel.setAmount(Integer.parseInt(String.valueOf(tp._2())));
                            objectChannel.setChannel(tp._1());
                            kieSession.insert(objectChannel);
                            kieSession.fireAllRules();
                        } finally {
                            // A session is created per record, so it must be disposed per record.
                            kieSession.dispose();
                        }
                    }
                });
            }
        });
    }
}

 

关于配置文件的设置

SparkStreaming和Drools结合的HelloWorld版详解大数据

kmodule.xml文件

<?xml version="1.0" encoding="UTF-8"?> 
<!-- KIE module descriptor: declares the knowledge bases and the named sessions that can be
     created from the classpath KieContainer. -->
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule"> 
<!-- Knowledge base built from the "rules" package (the package declared by riskMonitor.drl).
     Its session name "helloworld" is the one requested by
     SparkDroolsTest via kieContainer.newKieSession("helloworld"). -->
<kbase name="rules" packages="rules"> 
<ksession name="helloworld"/> 
</kbase> 
<!-- NOTE(review): the "dtables" and "process" knowledge bases below are not referenced by the
     Java code shown alongside this file; presumably template leftovers - confirm before removing. -->
<kbase name="dtables" packages="dtables"> 
<ksession name="ksession-dtables"/> 
</kbase> 
<kbase name="process" packages="process"> 
<ksession name="ksession-process"/> 
</kbase> 
</kmodule>

 

riskMonitor.drl内容

package rules; 
import com.dinpay.bdp.rcp.service.ChannAmount; 
// Fires once for every inserted ChannAmount fact whose amount is greater than 2.
// NOTE(review): the previous comment described a binding "m" for the inserted object, but this
// rule declares no binding; the pattern below matches the fact anonymously.
rule "channel" 
when 
ChannAmount(amount>2) 
then 
System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 "); 
end

 

测试OK!

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9048.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论