sparkmlib使用Pipeline实现简单的逻辑回归详解大数据

MLib 机器学习算法的标准API可以很方便的把多个算法整合到一个pipeline,并可以把整个过程形象的比如机器学习算法流;

Pipeline包括三个阶段:

sparkmlib使用Pipeline实现简单的逻辑回归详解大数据

第一阶段:Tokenier会把每个一个文件分成word

第二阶段:把word转化为特征向量

第三阶段:用特征向量和列别训练模型

下面就以一个简单的逻辑回归为例说明使用Pipeline开发;

定义一个类:

public class Person implements Serializable{ 
	public static final long serialVersionUID = 1L; 
	public  long id; 
	public  String text; 
	public  double label; 
	 
	public Person(){}; 
	public Person(long id, String text) { 
		this.id = id; 
		this.text = text; 
	} 
	public Person(long id, String text, double label) { 
		super(); 
		this.id = id; 
		this.text = text; 
		this.label = label; 
	} 
	public long getId() { 
		return id; 
	} 
	public void setId(long id) { 
		this.id = id; 
	} 
	public String getText() { 
		return text; 
	} 
	public void setText(String text) { 
		this.text = text; 
	} 
	public double getLabel() { 
		return label; 
	} 
	public void setLabel(double label) { 
		this.label = label; 
	} 
	@Override 
	public String toString() { 
		return "Person [id=" + id + ", text=" + text + ", label=" + label + "]"; 
	} 
}
public static void main(String[] args) { 
		 
		SparkSession sparkSession = SparkSession 
			      .builder() 
			      .appName("loi").master("local[1]") 
			      .getOrCreate(); 
		// Create an RDD of SparkIn objects from a text file 
		JavaRDD<String> dataRDD = sparkSession 
				                   .read() 
				                   . textFile("E:/sparkMlib/sparkMlib/src/mllib/testLine.txt") 
				                   .javaRDD(); 
		JavaRDD<Person> dataPerson = dataRDD.map(new Function<String,Person>(){ 
			public Person call(String line) throws Exception { 
				String[] parts = line.split(","); 
				Person sparkIn = new Person(); 
				sparkIn.setId(Long.parseLong(parts[0])); 
				sparkIn.setText(parts[1]); 
				sparkIn.setLabel(Double.parseDouble(parts[2])); 
				return sparkIn; 
			} 
		}); 
		// Apply a schema to an RDD of JavaBeans to get a DataFrame 
		Dataset<Row> training  = sparkSession.createDataFrame(dataPerson, Person.class); 
		//把document组成words 
		Tokenizer tokenizer = new Tokenizer() 
				.setInputCol("text") 
				.setOutputCol("words"); 
		//words特征向量化 
		HashingTF hashigTF = new HashingTF() 
				.setNumFeatures(1000) 
				.setInputCol(tokenizer.getOutputCol()) 
				.setOutputCol("features"); 
		//选择模型 
		LogisticRegression lr = new LogisticRegression() 
				.setMaxIter(30) 
				.setRegParam(0.001); 
		//组建pipeline流 
		Pipeline pipeline =new Pipeline() 
				.setStages(new PipelineStage[]{tokenizer,hashigTF,lr}); 
		//训练模型 
		PipelineModel model = pipeline.fit(training); 
		 
		//组织预测数据 
		Dataset<Row> test =sparkSession.createDataFrame( 
				Arrays.asList( 
						new Person(4,"spark i j k"), 
				        new Person(5,"l m n"), 
						new Person(6,"spark hadoop spark"), 
						new Person(7,"apache hadoop")), 
				Person.class); 
		Dataset<Row> predictions =model.transform(test); 
		for(Row r:predictions.select("id", "text", "probability", "prediction").collectAsList()){ 
			System.out.println(r.get(0)+":"+r.get(1)+":"+r.get(2)+":"+r.get(3)); 
		} 
		 
	}

sparkmlib使用Pipeline实现简单的逻辑回归详解大数据

以上就是用P
ipeline实现的一个简单的逻辑回归模型;



原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/bigdata/9381.html

(0)
上一篇 2021年7月19日 09:31
下一篇 2021年7月19日 09:31

相关推荐

发表回复

登录后才能评论