1. API Usage
1.1. Batch (offline) API
1.1.1. Sample program (review)
Java
package org.apache.flink.quickstart;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by wangsenfeng on 2017/10/17.
 */
public class WordCountExample {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Build a DataSet from strings
        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        // Split the lines, group by word, and count the occurrences of each word
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        // Print the result
        wordCounts.print();
    }

    // FlatMapFunction that splits each line into (word, 1) tuples
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
Scala
package org.apache.flink.quickstart

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]) {
    // Set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Load data from strings
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    // Split the lines, map each word to a tuple, group by word, and count per group
    val counts = text.flatMap { _.toLowerCase.split(" ").filter { _.nonEmpty } }
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // Print the result
    counts.print()
  }
}
1.1.2. DataSet Transformations (Operators)
More transformation operations:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
Java
package org.apache.flink.dataset.transformation;

import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by wangsenfeng on 2017/11/15.
 */
public class DataSetTransformationApi {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // map
        /*DataSet<Tuple2<Integer, Integer>> intPairs = env.fromElements(
                new Tuple2<Integer, Integer>(1, 2), new Tuple2<Integer, Integer>(1, 2));
        DataSet<Integer> intSums = intPairs.map(new IntAdder());
        intSums.print();*/

        // flatMap
        /*DataSet<String> textLines = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        DataSet<String> words = textLines.flatMap(new Tokenizer());
        words.print();*/

        // mapPartition
        /*DataSet<String> textLines = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
        counts.print();*/

        // filter
        DataSet<Integer> intNumbers = env.fromElements(-1, -2, -3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0);
        DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
        naturalNumbers.print();
    }
}

// MapFunction that adds two integer values
class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
    @Override
    public Integer map(Tuple2<Integer, Integer> in) {
        return in.f0 + in.f1;
    }
}

// FlatMapFunction that tokenizes a String by non-word characters and emits all String tokens
class Tokenizer implements FlatMapFunction<String, String> {
    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String token : value.split("\\W")) {
            out.collect(token);
        }
    }
}

// MapPartitionFunction that counts the elements of each partition
class PartitionCounter implements MapPartitionFunction<String, Long> {
    public void mapPartition(Iterable<String> values, Collector<Long> out) {
        long c = 0;
        for (String s : values) {
            c++;
        }
        out.collect(c);
    }
}

// FilterFunction that keeps only non-negative numbers
class NaturalNumberFilter implements FilterFunction<Integer> {
    @Override
    public boolean filter(Integer number) {
        return number >= 0;
    }
}
Scala
package org.apache.flink.transformation

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

/**
 * Created by wangsenfeng on 2017/11/15.
 */
object DataSetTransformationApi {
  def main(args: Array[String]) {
    // Set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // map
    /*val intPairs: DataSet[(Int, Int)] = env.fromElements((1, 2), (3, 4))
    val intSums = intPairs.map { pair => pair._1 + pair._2 }
    intSums.print()*/

    // flatMap
    /*val textLines: DataSet[String] = env.fromElements("Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    val words = textLines.flatMap { _.split(" ") }
    words.print()*/

    // mapPartition
    /*val textLines: DataSet[String] = env.fromElements("Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    val counts = textLines.mapPartition { in => Some(in.size) }
    counts.print()*/

    // filter
    /*val intNumbers: DataSet[Int] = env.fromElements(-1, -2, -3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
    val naturalNumbers = intNumbers.filter { _ > 0 }
    naturalNumbers.print()*/

    // Reduce on DataSet grouped by key expression
    /*val words: DataSet[WC] = env.fromElements(new WC("wang", 1), new WC("sen", 2), new WC("feng", 2),
      new WC("wang", 1), new WC("sen", 2), new WC("feng", 2))
    val wordCounts = words.groupBy("word").reduce {
      (w1, w2) => new WC(w1.word, w1.count + w2.count)
    }
    wordCounts.print()*/

    // Reduce on DataSet grouped by KeySelector function
    /*val words: DataSet[WC] = env.fromElements(new WC("wang", 1), new WC("sen", 2), new WC("feng", 2),
      new WC("wang", 1), new WC("sen", 2), new WC("feng", 2))
    val wordCounts = words.groupBy { _.word } reduce {
      (w1, w2) => new WC(w1.word, w1.count + w2.count)
    }
    wordCounts.print()*/
  }
}

/**
 * An ordinary POJO-style type; with a case class the accessors come for free,
 * otherwise getters/setters would be added here.
 */
case class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
}
1.1.3. Data Sources
Data sources create the initial data sets, for example from files or from Java collections. Flink builds data sets by means of InputFormats. The available source methods on the ExecutionEnvironment are listed below.
File-based:
- readTextFile(path) : TextInputFormat – Reads the file line by line and returns each line as a String.
- readTextFileWithValue(path) : TextValueInputFormat – Reads the file line by line and returns each line as a StringValue. StringValues are mutable strings.
- readCsvFile(path) : CsvInputFormat – Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic Java types and their Value counterparts as field types.
- readFileOfPrimitives(path, Class) : PrimitiveInputFormat – Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.
- readFileOfPrimitives(path, delimiter, Class) : PrimitiveInputFormat – Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer, using the given delimiter.
- readHadoopFile(FileInputFormat, Key, Value, path) : FileInputFormat – Creates a JobConf and reads the file from the specified path with the specified FileInputFormat, Key class and Value class, returning them as Tuple2<Key, Value>.
- readSequenceFile(Key, Value, path) : SequenceFileInputFormat – Creates a JobConf and reads the file from the specified path with type SequenceFileInputFormat, Key class and Value class, returning them as Tuple2<Key, Value>.
Collection-based:
- fromCollection(Collection) – Creates a DataSet from a Java java.util.Collection. All elements in the collection must be of the same type (a short sketch of the collection-based sources follows this list).
- fromCollection(Iterator, Class) – Creates a DataSet from an iterator. The class specifies the data type of the elements returned by the iterator.
- fromElements(T ...) – Creates a DataSet from the given sequence of objects. All objects must be of the same type.
- fromParallelCollection(SplittableIterator, Class) – Creates a DataSet from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
- generateSequence(from, to) – Generates the sequence of numbers in the given interval, in parallel.
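The following minimal sketch ties the collection-based sources together. The class name CollectionSources and the sample values are made up for illustration; NumberSequenceIterator is a SplittableIterator implementation that ships with Flink.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.NumberSequenceIterator;

import java.util.Arrays;
import java.util.List;

public class CollectionSources {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // fromCollection(Collection): all elements must have the same type
        List<String> names = Arrays.asList("wang", "sen", "feng");
        DataSet<String> nameSet = env.fromCollection(names);
        nameSet.print();

        // fromCollection(Iterator, Class): the class argument tells Flink the element type
        DataSet<String> iterSet = env.fromCollection(names.iterator(), String.class);
        iterSet.print();

        // fromParallelCollection(SplittableIterator, Class): the iterator is split across parallel tasks
        DataSet<Long> parallelNumbers =
                env.fromParallelCollection(new NumberSequenceIterator(1, 100), Long.class);
        parallelNumbers.print();

        // generateSequence(from, to): shorthand for a parallel number sequence
        DataSet<Long> numbers = env.generateSequence(1, 100);
        numbers.print();
    }
}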
Generic:
- readFile(inputFormat, path) / FileInputFormat – Accepts a file input format.
- createInput(inputFormat) / InputFormat – Accepts a generic input format.
Example
package org.apache.org.apache.datasource;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Created by wangsenfeng on 2017/11/15.
 */
public class FlinkDataSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read a file from the local file system
        DataSet<String> localLines = env.readTextFile("file:///c:/words.txt");
        localLines.print();

        // Read a file from HDFS
        DataSet<String> hdfsLines = env.readTextFile("hdfs://master1:9000/words.txt");
        hdfsLines.print();

        // Create a DataSet from the given elements
        DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
        value.print();

        // Generate a number sequence
        DataSet<Long> numbers = env.generateSequence(1, 100);
        numbers.print();
    }
}
More examples
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read a text file from the local file system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");

// Read a text file from HDFS
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");

// Read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .types(Integer.class, String.class, Double.class);

// Read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .includeFields("10010")  // take the first and the fourth field
        .types(String.class, Double.class);

// Read a CSV file with three fields into the matching fields of a POJO
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
        .pojoType(Person.class, "name", "age", "zipcode");

// Read data from a Hadoop file using the TextInputFormat
DataSet<Tuple2<LongWritable, Text>> tuples =
        env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class,
                "hdfs://nnHost:nnPort/path/to/file");

// Read a Hadoop sequence file using the SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
        env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");

// Create a DataSet from the given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");

// Generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer>> dbData =
        env.createInput(
                JDBCInputFormat.buildJDBCInputFormat()
                        .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                        .setDBUrl("jdbc:derby:memory:persons")
                        .setQuery("select name, age from persons")
                        .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                        .finish()
        );
Note: Flink's program compiler needs to infer the data types of the items returned by an InputFormat. If this information cannot be inferred automatically, the type information must be provided manually, as in the JDBC example above.
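As a hedged illustration of supplying the type information by hand, the createInput overload that accepts an explicit TypeInformation can be used. MyCustomInputFormat below is a hypothetical stand-in for any InputFormat whose produced type Flink cannot infer; only the type-hint mechanism itself is the point of this fragment:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// MyCustomInputFormat is hypothetical: an InputFormat<Tuple2<String, Integer>, ?> whose
// produced type cannot be inferred automatically.
DataSet<Tuple2<String, Integer>> data = env.createInput(
        new MyCustomInputFormat(),
        // Explicit type information for the records produced by the format
        TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class));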
Recursively traversing the input path directory
When the input path is a directory, files in nested sub-directories are not enumerated by default: only the files in the top-level directory are read, and the nested ones are ignored. Recursive enumeration of nested files is enabled through the recursive.file.enumeration configuration parameter, as shown below.
// Set up the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Create a configuration object
Configuration parameters = new Configuration();

// Set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);

// Pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
        .withParameters(parameters);
Reading compressed files
For supported compression formats, no additional InputFormat needs to be specified; Flink recognizes the file extension and decompresses the file automatically. Note, however, that compressed files may not be readable in parallel and may only be read sequentially, which can limit the scalability of a job.
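As a minimal sketch (the path is illustrative), a GZip-compressed text file is read exactly like a plain text file; Flink picks the decompression codec from the file extension:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// The .gz file is decompressed transparently; no special InputFormat is required
DataSet<String> compressedLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/logs.gz");
compressedLines.print();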
1.1.4. Data Sinks
Data sinks consume DataSets and store or return them through OutputFormats. The available sink methods are:
- writeAsText() : TextOutputFormat – Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
- writeAsFormattedText() : TextOutputFormat – Writes elements line-wise as Strings. The Strings are obtained by calling a user-defined format() method for each element.
- writeAsCsv(...) : CsvOutputFormat – Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
- print() / printToErr() / print(String msg) / printToErr(String msg) – Prints the toString() value of each element to the standard out / standard error stream. Optionally, a prefix (msg) can be provided, which is prepended to the output. This helps distinguish between different calls to print. If the parallelism is greater than 1, the output is also prepended with the identifier of the task which produced it.
- write() : FileOutputFormat – Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
- output() : OutputFormat – The most generic output method, for data sinks that are not file based (such as storing the result in a database).
Example
package org.apache.flink.datasink;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.core.fs.FileSystem;

/**
 * Created by wangsenfeng on 2017/11/15.
 */
public class FlinkDataSink {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> textData = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        // Write the DataSet to the local file system
        DataSink<String> stringDataSink = textData.writeAsText("file:///F:/flinkdatasink1.txt");

        // Write the DataSet to HDFS
        textData.writeAsText("hdfs://master1:9000/flinkdatasink1.txt");

        // Write the DataSet to the local file system, overwriting the file if it exists
        textData.writeAsText("file:///F:/flinkdatasink1.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();
    }
}
More examples
// Text data
DataSet<String> textData = // [...]

// Write the DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");

// Write the DataSet to a file on HDFS
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// Write the DataSet to a file on the local file system, overwriting existing files
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// Write tuples as CSV, using "|" as the field delimiter, e.g. "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// Write tuples in the text format "(a, b, c)" instead of CSV
values.writeAsText("file:///path/to/the/result/file");

// Write Strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
        new TextFormatter<Tuple2<Integer, Integer>>() {
            public String format(Tuple2<Integer, Integer> value) {
                return value.f1 + " - " + value.f0;
            }
        });

// Write data to a relational database using the generic output() method
DataSet<Tuple3<String, Integer, Double>> myResult = // [...]
myResult.output(
        // build and configure the OutputFormat
        JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
                .setDBUrl("jdbc:derby:memory:persons")
                .setQuery("insert into persons (name, age, height) values (?,?,?)")
                .finish()
);
Locally sorted output
The output of a data sink can be sorted locally, i.e. within each output partition, on specified fields in specified orders. Globally sorted output is currently not supported; a workaround is sketched after the example below.
DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]

// Sort the String field ascending; it is the second field of the tuple, hence index 1
tData.sortPartition(1, Order.ASCENDING).print();

// Sort by the first Integer field ascending and the third Double field descending
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();

// Sort by the "author" field of the BookPojo in the tuple's first position, descending
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);

// Sort on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);

// Sort the String elements in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);
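One commonly used workaround for a globally sorted result, sketched here under the assumption that the whole data set fits into a single task, is to reduce the sorted operator to a parallelism of 1 so that all elements end up in one sorted partition (the output path is illustrative):

// All elements are shipped to a single task, so the single output partition is globally sorted
tData.sortPartition(0, Order.ASCENDING)
     .setParallelism(1)
     .writeAsText("file:///path/to/globally/sorted/result");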
1.1.5. Broadcast Variables
Broadcast variables make a data set available to all parallel instances of an operation, in addition to the operation's regular input. This is useful for auxiliary data sets or data-dependent parameterization. The data set is accessible inside the operator as a Collection. Broadcast variables are kept in memory on every node and should therefore not be too large; they are best suited to simple things such as function parameters or a handful of Flink configuration values.
Broadcast: a data set is registered as a broadcast set via withBroadcastSet(DataSet, String).
Access: the broadcast set is accessed inside the target operator via getRuntimeContext().getBroadcastVariable(String).
package org.apache.flink.brodcast;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;

import java.util.Collection;
import java.util.Iterator;

/**
 * Created by wangsenfeng on 2017/11/15.
 */
public class FlinkBrodCast {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1. The DataSet to be broadcast
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

        DataSet<String> data = env.fromElements("a", "b");

        // 2. The broadcast variable can be accessed from any rich function of any DataSet
        MapOperator<String, String> broadcastValue = data.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                // 4. Access the broadcast variable as a Collection
                Collection<Integer> broadcastSet =
                        getRuntimeContext().getBroadcastVariable("broadcastSetName");
                Iterator<Integer> iterator = broadcastSet.iterator();
                while (iterator.hasNext()) {
                    System.out.println(iterator.next() + "===============");
                }
            }

            @Override
            public String map(String value) throws Exception {
                // do something
                return value;
            }
        });

        // 3. Broadcast the DataSet under the name "broadcastSetName"
        broadcastValue.withBroadcastSet(toBroadcast, "broadcastSetName");

        broadcastValue.print();
    }
}
1.1.6. Distributed Cache
Flink offers a distributed cache, similar to Hadoop's, which makes files locally accessible to the parallel instances of user functions. A program registers a file or directory under a name through the ExecutionEnvironment; when the program is executed, Flink automatically copies the file or directory to the local file system of every worker node, and user functions can then look it up by the registered name and access it locally.
Registering cached files
package org.apache.flink.DistrbitCache;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

/**
 * Created by wangsenfeng on 2017/11/15.
 */
public class FlinkDistributCache {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register a file from HDFS under the name "hdfsFile"
        env.registerCachedFile("hdfs://master1:9000/words.txt", "hdfsFile");

        // Register a local executable file
        // env.registerCachedFile("file:///c:/words.txt", "localExecFile", true);

        // Define an arbitrary program that reads the cached data
        DataSet<String> input = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        DataSet<Integer> result = input.map(new MyMapper());
        result.print();
    }
}
Accessing cached files
If a function needs to access a cached file, it must implement RichFunction (e.g. extend one of the Rich* function classes), because the lookup goes through the RuntimeContext.
// Extend a RichFunction to get access to the RuntimeContext
final class MyMapper extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {
        // Access the cached file via the RuntimeContext and the DistributedCache
        File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

        // Read the file
        try {
            System.out.println("File name: " + myFile.getName() + "============");
            System.out.println("Cache path: " + myFile.getPath() + "============");
            BufferedReader br = new BufferedReader(new FileReader(myFile));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Integer map(String value) throws Exception {
        // Use the content of the cached file
        return 2;
    }
}