Spark项目之电商用户行为分析大数据平台之(十二)Spark上下文构建及模拟数据生成详解大数据

一、模拟生成数据

Spark项目之电商用户行为分析大数据平台之(十二)Spark上下文构建及模拟数据生成详解大数据

  1 package com.bw.test; 
  2  
  3 import java.util.ArrayList; 
  4 import java.util.Arrays; 
  5 import java.util.List; 
  6 import java.util.Random; 
  7 import java.util.UUID; 
  8  
  9 import com.bw.util.DateUtils; 
 10 import com.bw.util.StringUtils; 
 11 import org.apache.spark.api.java.JavaRDD; 
 12 import org.apache.spark.api.java.JavaSparkContext; 
 13 import org.apache.spark.sql.DataFrame; 
 14 import org.apache.spark.sql.Row; 
 15 import org.apache.spark.sql.RowFactory; 
 16 import org.apache.spark.sql.SQLContext; 
 17 import org.apache.spark.sql.types.DataTypes; 
 18 import org.apache.spark.sql.types.StructType; 
 19  
 20  
 21 /** 
 22  * 模拟数据程序 
 23  * @author Administrator 
 24  * 
 25  */ 
 26 public class MockData { 
 27  
 28     /** 
 29      * 模拟数据 
 30      * @param sc 
 31      * @param sqlContext 
 32      */ 
 33     public static void mock(JavaSparkContext sc, 
 34             SQLContext sqlContext) { 
 35         List<Row> rows = new ArrayList<Row>(); 
 36          
 37         String[] searchKeywords = new String[] {"火锅", "蛋糕", "重庆辣子鸡", "重庆小面", 
 38                 "呷哺呷哺", "新辣道鱼火锅", "国贸大厦", "太古商场", "日本料理", "温泉"}; 
 39         String date = DateUtils.getTodayDate(); 
 40         String[] actions = new String[]{"search", "click", "order", "pay"}; 
 41         Random random = new Random(); 
 42          
 43         for(int i = 0; i < 100; i++) { 
 44             //生产100个userID 
 45             long userid = random.nextInt(100);     
 46              
 47             for(int j = 0; j < 10; j++) { 
 48                 //每个userID有10个sessionID 
 49                 String sessionid = UUID.randomUUID().toString().replace("-", "");   
 50                 String baseActionTime = date + " " + random.nextInt(23); 
 51                  
 52                 Long clickCategoryId = null; 
 53                 //每个sessionID可能会做0-100之间的action操作 
 54                 for(int k = 0; k < random.nextInt(100); k++) { 
 55                     long pageid = random.nextInt(10);     
 56                     String actionTime = baseActionTime + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))) + ":" + StringUtils.fulfuill(String.valueOf(random.nextInt(59))); 
 57                     String searchKeyword = null; 
 58                     Long clickProductId = null; 
 59                     String orderCategoryIds = null; 
 60                     String orderProductIds = null; 
 61                     String payCategoryIds = null; 
 62                     String payProductIds = null; 
 63                      
 64                     String action = actions[random.nextInt(4)]; 
 65                     if("search".equals(action)) { 
 66                         searchKeyword = searchKeywords[random.nextInt(10)];    
 67                     } else if("click".equals(action)) { 
 68                         if(clickCategoryId == null) { 
 69                             clickCategoryId = Long.valueOf(String.valueOf(random.nextInt(100)));     
 70                         } 
 71                         clickProductId = Long.valueOf(String.valueOf(random.nextInt(100)));   
 72                     } else if("order".equals(action)) { 
 73                         orderCategoryIds = String.valueOf(random.nextInt(100));   
 74                         orderProductIds = String.valueOf(random.nextInt(100)); 
 75                     } else if("pay".equals(action)) { 
 76                         payCategoryIds = String.valueOf(random.nextInt(100));   
 77                         payProductIds = String.valueOf(random.nextInt(100)); 
 78                     } 
 79                      
 80                     Row row = RowFactory.create(date, userid, sessionid,  
 81                             pageid, actionTime, searchKeyword, 
 82                             clickCategoryId, clickProductId, 
 83                             orderCategoryIds, orderProductIds, 
 84                             payCategoryIds, payProductIds,  
 85                             Long.valueOf(String.valueOf(random.nextInt(10))));     
 86                     rows.add(row); 
 87                 } 
 88             } 
 89         } 
 90          
 91         JavaRDD<Row> rowsRDD = sc.parallelize(rows); 
 92          
 93         StructType schema = DataTypes.createStructType(Arrays.asList( 
 94                 DataTypes.createStructField("date", DataTypes.StringType, true), 
 95                 DataTypes.createStructField("user_id", DataTypes.LongType, true), 
 96                 DataTypes.createStructField("session_id", DataTypes.StringType, true), 
 97                 DataTypes.createStructField("page_id", DataTypes.LongType, true), 
 98                 DataTypes.createStructField("action_time", DataTypes.StringType, true), 
 99                 DataTypes.createStructField("search_keyword", DataTypes.StringType, true), 
100                 DataTypes.createStructField("click_category_id", DataTypes.LongType, true), 
101                 DataTypes.createStructField("click_product_id", DataTypes.LongType, true), 
102                 DataTypes.createStructField("order_category_ids", DataTypes.StringType, true), 
103                 DataTypes.createStructField("order_product_ids", DataTypes.StringType, true), 
104                 DataTypes.createStructField("pay_category_ids", DataTypes.StringType, true), 
105                 DataTypes.createStructField("pay_product_ids", DataTypes.StringType, true), 
106                 DataTypes.createStructField("city_id", DataTypes.LongType, true))); 
107  
108         DataFrame df = sqlContext.createDataFrame(rowsRDD, schema); 
109  
110         df.registerTempTable("user_visit_action");   
111         for(Row _row : df.take(1)) { 
112             System.out.println(_row);   
113         } 
114          
115         /** 
116          * ================================================================== 
117          */ 
118          
119         rows.clear(); 
120         String[] sexes = new String[]{"male", "female"}; 
121         for(int i = 0; i < 100; i ++) { 
122             long userid = i; 
123             String username = "user" + i; 
124             String name = "name" + i; 
125             int age = random.nextInt(60); 
126             String professional = "professional" + random.nextInt(100); 
127             String city = "city" + random.nextInt(100); 
128             String sex = sexes[random.nextInt(2)]; 
129              
130             Row row = RowFactory.create(userid, username, name, age,  
131                     professional, city, sex); 
132             rows.add(row); 
133         } 
134          
135         rowsRDD = sc.parallelize(rows); 
136          
137         StructType schema2 = DataTypes.createStructType(Arrays.asList( 
138                 DataTypes.createStructField("user_id", DataTypes.LongType, true), 
139                 DataTypes.createStructField("username", DataTypes.StringType, true), 
140                 DataTypes.createStructField("name", DataTypes.StringType, true), 
141                 DataTypes.createStructField("age", DataTypes.IntegerType, true), 
142                 DataTypes.createStructField("professional", DataTypes.StringType, true), 
143                 DataTypes.createStructField("city", DataTypes.StringType, true), 
144                 DataTypes.createStructField("sex", DataTypes.StringType, true))); 
145          
146         DataFrame df2 = sqlContext.createDataFrame(rowsRDD, schema2); 
147         for(Row _row : df2.take(1)) { 
148             System.out.println(_row);   
149         } 
150          
151         df2.registerTempTable("user_info");   
152          
153         /** 
154          * ================================================================== 
155          */ 
156         rows.clear(); 
157          
158         int[] productStatus = new int[]{0, 1}; 
159          
160         for(int i = 0; i < 100; i ++) { 
161             long productId = i; 
162             String productName = "product" + i; 
163             String extendInfo = "{/"product_status/": " + productStatus[random.nextInt(2)] + "}";     
164              
165             Row row = RowFactory.create(productId, productName, extendInfo); 
166             rows.add(row); 
167         } 
168          
169         rowsRDD = sc.parallelize(rows); 
170          
171         StructType schema3 = DataTypes.createStructType(Arrays.asList( 
172                 DataTypes.createStructField("product_id", DataTypes.LongType, true), 
173                 DataTypes.createStructField("product_name", DataTypes.StringType, true), 
174                 DataTypes.createStructField("extend_info", DataTypes.StringType, true))); 
175          
176         DataFrame df3 = sqlContext.createDataFrame(rowsRDD, schema3); 
177         for(Row _row : df3.take(1)) { 
178             System.out.println(_row);   
179         } 
180          
181         df3.registerTempTable("product_info");  
182     } 
183      
184 }

二、构建Spark上下文

Spark项目之电商用户行为分析大数据平台之(十二)Spark上下文构建及模拟数据生成详解大数据

 1 import com.bw.conf.ConfigurationManager; 
 2 import com.bw.constant.Constants; 
 3 import com.bw.test.MockData; 
 4 import org.apache.spark.SparkConf; 
 5 import org.apache.spark.api.java.JavaSparkContext; 
 6 import org.apache.spark.sql.SQLContext; 
 7  
 8  
 9 /** 
10  * 用户访问session分析Spark作业 
11  * 
12  * */ 
13 public class UserVisitSessionAnalyzeSpark { 
14  
15     public static void main(String[] args) { 
16         //构建Spark上下文 
17         SparkConf sparkConf = new SparkConf(); 
18         //Spark作业本地运行 
19         sparkConf.setMaster("local"); 
20         //为了符合大型企业的开发需求,不能出现硬编码,创建一个Constants接口类,定义一些常量 
21         sparkConf.setAppName(Constants.SPARK_APP_NAME_SESSION); 
22  
23         JavaSparkContext jsc = new JavaSparkContext(sparkConf); 
24         SQLContext sqlContext = new SQLContext(jsc); 
25  
26         mockData(jsc,sqlContext); 
27         jsc.stop(); 
28     } 
29  
30  
31     /** 
32      * 生成模拟数据(只有本地模式,才会去生成模拟数据) 
33      * @param sc 
34      * @param sqlContext 
35      */ 
36     private static void mockData(JavaSparkContext sc, SQLContext sqlContext) { 
37         boolean local = ConfigurationManager.getBoolean(Constants.SPARK_LOCAL); 
38         if(local) { 
39             MockData.mock(sc, sqlContext); 
40         } 
41     } 
42 }

三、打印的测试数据

3.1 user_visit_action

用户访问行为记录(下面这条示例数据是一次支付行为:订单字段为 null,支付品类和支付商品字段有值)

Spark项目之电商用户行为分析大数据平台之(十二)Spark上下文构建及模拟数据生成详解大数据

[2018-05-23,34,4ad62c0824194e5687467bb84b9beeb9,3,2018-05-23 18:27:37,null,null,null,null,null,8,64,8]

3.2 user_info

Spark项目之电商用户行为分析大数据平台之(十二)Spark上下文构建及模拟数据生成详解大数据

[0,user0,name0,26,professional11,city4,male]

3.3 product_info

Spark项目之电商用户行为分析大数据平台之(十二)Spark上下文构建及模拟数据生成详解大数据

[0,product0,{"product_status": 1}]

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/9016.html

(0)
上一篇 2021年7月19日
下一篇 2021年7月19日

相关推荐

发表回复

登录后才能评论