Spark Streaming 和 Drools 结合的 HelloWorld 版详解大数据

Spark Streaming 结合 Drools 规则引擎框架的测试示例

package com.dinpay.bdp.rcp.service; 
 
import java.math.BigDecimal; 
import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.Arrays; 
import java.util.Date; 
 
import org.apache.commons.lang3.StringUtils; 
import org.apache.log4j.Level; 
import org.apache.log4j.Logger; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.api.java.function.VoidFunction2; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.Time; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.kie.api.KieServices; 
import org.kie.api.runtime.KieContainer; 
import org.kie.api.runtime.KieSession; 
 
import com.dinpay.bdp.rcp.metaq.MetaQReceiver; 
import com.dinpay.bdp.rcp.streaming.StreamingUtil; 
import com.dinpay.bdp.rcp.util.CodisUtil; 
import com.dinpay.bdp.rcp.util.Constant; 
import com.dinpay.dpp.rcp.po.Order; 
 
import redis.clients.jedis.Jedis; 
import scala.Tuple2; 
 
/**
 * Spark Streaming + Drools demo.
 *
 * <p>Reads {@link Order} messages from MetaQ, derives a per-card, per-day
 * counter key for every order, stores the incremented counter in Codis and
 * inserts each updated counter into the Drools session named
 * {@code "helloworld"} so that the rules can react to it.
 *
 * @author ll-t150
 */
public class SparkDroolsTest {

    public static Logger logger = Logger.getLogger(SparkDroolsTest.class);

    /**
     * Day formatter used as the suffix of the Codis counter key.
     *
     * <p>{@link SimpleDateFormat} is not thread-safe, and Spark runs tasks on
     * several threads of the same JVM. Inside this class it is therefore only
     * used through {@link #currentDay()}, which synchronizes on the instance.
     */
    public static final DateFormat df = new SimpleDateFormat("yyyyMMdd");

    /**
     * Builds the streaming pipeline (MetaQ -> per-card counter -> Codis + Drools)
     * and blocks until the streaming context terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String zkConnect = Constant.METAZK;
        String zkRoot = "/meta";
        String topic = Constant.ORDERTOPIC;
        String group = Constant.STREAMGROUP;
        // Silence Spark's own logging so that only the application log is visible.
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
        logger.info("metaq configuration:" + zkConnect + "--" + topic + "--" + group);
        SparkConf sparkConf = new SparkConf().setAppName("SparkDroolsTest").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        try {
            // Receive orders from MetaQ.
            JavaReceiverInputDStream<Order> lines =
                    ssc.receiverStream(new MetaQReceiver(zkConnect, zkRoot, topic, group));

            // Identity flatMap: every received order is emitted unchanged.
            JavaDStream<Order> words = lines.flatMap(new FlatMapFunction<Order, Order>() {
                @Override
                public Iterable<Order> call(Order order) throws Exception {
                    return Arrays.asList(new Order[]{order});
                }
            });

            // Per-card daily transaction count; the stream is not filtered by order status here.
            JavaPairDStream<String, Integer> cardCntPairs = getCardJavaPair(words);
            save2Codis(cardCntPairs);
            ssc.start();
            ssc.awaitTermination();
        } finally {
            // Release the streaming context even when start/awaitTermination fails.
            ssc.close();
        }
    }

    /**
     * Maps every order to a {@code (counterKey, newCount)} pair.
     *
     * <p>The key is {@code <systemId>_CNT_<payerCardNo>_<yyyyMMdd>}. The new
     * count is the value currently stored in Codis under that key plus one, or
     * {@code 1} when the key is missing or empty. This method only reads from
     * Codis; writing the new value back is done by {@link #save2Codis}.
     *
     * @param words stream of orders
     * @param <T>   value type expected by the caller; the produced values are
     *              {@link Integer}s, so callers should bind {@code T} to Integer
     * @return stream of counter key / incremented count pairs
     */
    @SuppressWarnings({ "unchecked", "serial" })
    public static <T> JavaPairDStream<String, T> getCardJavaPair(JavaDStream<Order> words) {
        // Unchecked cast: the pair function below always yields Integer values.
        return (JavaPairDStream<String, T>) words.mapToPair(new PairFunction<Order, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Order order) {
                String cardCntkey = order.getSystemId() + "_CNT_" + order.getPayerCardNo() + "_" + currentDay();
                Jedis jedis = CodisUtil.getJedisPool().getResource();
                try {
                    // Look the key up in Codis; an existing value is incremented, otherwise start at 1.
                    String value = jedis.get(cardCntkey);
                    if (StringUtils.isEmpty(value)) {
                        return new Tuple2<String, Integer>(cardCntkey, 1);
                    }
                    return new Tuple2<String, Integer>(cardCntkey, Integer.parseInt(value) + 1);
                } finally {
                    // Always hand the connection back; it is borrowed once per record.
                    jedis.close();
                }
            }
        });
    }

    /**
     * Stores every computed pair in Codis and then evaluates the Drools rules
     * of the {@code "helloworld"} session against it.
     *
     * <p>The value is written with {@code String.valueOf} and parsed back with
     * {@code Integer.parseInt} for the rule fact, so it must have an integer
     * string representation.
     *
     * @param pair stream of key / value pairs to persist and evaluate
     * @param <T>  value type of the pairs
     */
    @SuppressWarnings("serial")
    public static <T> void save2Codis(JavaPairDStream<String, T> pair) {
        pair.foreachRDD(new VoidFunction2<JavaPairRDD<String, T>, Time>() {
            @Override
            public void call(JavaPairRDD<String, T> rdd, Time time) throws Exception {
                rdd.foreach(new VoidFunction<Tuple2<String, T>>() {
                    @Override
                    public void call(Tuple2<String, T> tp) throws Exception {
                        Jedis jedis = CodisUtil.getJedisPool().getResource();
                        try {
                            jedis.set(tp._1(), String.valueOf(tp._2()));
                        } finally {
                            // Return the connection even when the write fails.
                            jedis.close();
                        }
                        logger.info(tp._1() + ">>>" + tp._2() + ",保存到Codis完成!");

                        KieServices kieServices = KieServices.Factory.get();
                        KieContainer kieContainer = kieServices.getKieClasspathContainer();
                        KieSession kieSession = kieContainer.newKieSession("helloworld");
                        try {
                            ChannAmount objectChannel = new ChannAmount();
                            objectChannel.setAmount(Integer.parseInt(String.valueOf(tp._2())));
                            objectChannel.setChannel(tp._1());
                            kieSession.insert(objectChannel);
                            kieSession.fireAllRules();
                        } finally {
                            // A session is created per record; dispose it to release its resources.
                            kieSession.dispose();
                        }
                    }
                });
            }
        });
    }

    /**
     * Formats the current date as {@code yyyyMMdd}.
     *
     * <p>Synchronizes on {@link #df} because the formatter is shared between
     * concurrently running task threads.
     */
    private static String currentDay() {
        synchronized (df) {
            return df.format(new Date());
        }
    }

}

 

关于配置文件的设置

Spark Streaming 和 Drools 结合的 HelloWorld 版详解大数据

kmodule.xml文件

<?xml version="1.0" encoding="UTF-8"?> 
<!-- KIE module descriptor: declares three knowledge bases, each with one named session. -->
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule"> 
    <!-- Knowledge base built from the "rules" package; its session "helloworld" is the
         name passed to newKieSession("helloworld") in SparkDroolsTest. -->
    <kbase name="rules" packages="rules"> 
        <ksession name="helloworld"/> 
    </kbase> 
    <!-- NOTE(review): presumably for decision tables; no "dtables" package is shown here - confirm it exists. -->
    <kbase name="dtables" packages="dtables"> 
        <ksession name="ksession-dtables"/> 
    </kbase> 
    <!-- NOTE(review): presumably for process definitions; no "process" package is shown here - confirm it exists. -->
    <kbase name="process" packages="process"> 
        <ksession name="ksession-process"/> 
    </kbase> 
</kmodule>

 

riskMonitor.drl内容

package rules; 
 
import com.dinpay.bdp.rcp.service.ChannAmount; 
// Rule "channel": matches every ChannAmount fact in the session whose amount is
// greater than 2 and prints a fixed message for each match.
// The matched fact is not bound to a variable, so the consequence cannot read it.
// NOTE(review): the printed text mentions a 5-minute channel window; confirm it
// matches what the inserted facts actually represent.
rule "channel" 
    when 
        ChannAmount(amount>2) 
    then 
        System.out.println("Drools规则实现:该渠道最近5分钟交易金额超过2次 "); 
end

 

测试OK!

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9048.html

(0)
上一篇 2021年7月19日 09:10
下一篇 2021年7月19日 09:10

相关推荐

发表回复

登录后才能评论