1. Using the Technology
1.1. Batch (Offline) API
1.1.1. Sample Program (Review)
Java:
package org.apache.flink.quickstart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Created by wangsenfeng on 2017/10/17.
*/
public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();
        // build a DataSet from string elements
        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
        // split the lines, group by the word (field 0) and sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
        // print the result
        wordCounts.print();
    }

    // FlatMapFunction that splits each line into (word, 1) tuples
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}
Scala:
package org.apache.flink.quickstart
import org.apache.flink.api.scala._
object WordCount {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // build a DataSet from string elements
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    // split the lines, map each word to a (word, 1) tuple, group by the word and sum the counts
    val counts = text.flatMap { _.toLowerCase.split(" ").filter { _.nonEmpty } }
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    // print the result
    counts.print()
  }
}
1.1.2. Operators (DataSet Transformations)
More transformation operators are documented at:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
Java:


package org.apache.flink.dataset.transformation;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* Created by wangsenfeng on 2017/11/15.
*/
public class DataSetTransformationApi {
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
// map function
/*DataSet<Tuple2<Integer, Integer>> intPairs = env.fromElements(new Tuple2<Integer, Integer> (1,2),new Tuple2<Integer, Integer> (1,2));
DataSet<Integer> intSums = intPairs.map(new IntAdder());
intSums.print();*/
// flatMap function
/*DataSet<String> textLines = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
DataSet<String> words = textLines.flatMap(new Tokenizer());
words.print();*/
// mapPartition
/*DataSet<String> textLines = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
DataSet<Long> counts = textLines.mapPartition(new PartitionCounter());
counts.print();*/
// filter
DataSet<Integer> intNumbers = env.fromElements(-1, -2, -3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0);
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
naturalNumbers.print();
}
}
// MapFunction that adds two integer values
class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> in) {
return in.f0 + in.f1;
}
}
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
class Tokenizer implements FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) {
for (String token : value.split("//W")) {
out.collect(token);
}
}
}
class PartitionCounter implements MapPartitionFunction<String, Long> {
public void mapPartition(Iterable<String> values, Collector<Long> out) {
long c = 0;
for (String s : values) {
c++;
}
out.collect(c);
}
}
class NaturalNumberFilter implements FilterFunction<Integer> {
    @Override
    public boolean filter(Integer number) {
        return number >= 0;
    }
}
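The Scala listing that follows also demonstrates reduce on a grouped DataSet, which the Java file above does not cover. Below is a minimal Java sketch of the same pattern; the class name GroupedReduceExample and the WC POJO are illustrative additions, not part of the original code.

package org.apache.flink.dataset.transformation;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
public class GroupedReduceExample {
    // simple POJO mirroring the Scala case class WC further below
    public static class WC {
        public String word;
        public int count;
        public WC() {}
        public WC(String word, int count) { this.word = word; this.count = count; }
        @Override
        public String toString() { return word + ": " + count; }
    }
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<WC> words = env.fromElements(
            new WC("wang", 1), new WC("sen", 2), new WC("feng", 2), new WC("wang", 1));
        // group by the POJO field "word" and add up the counts within each group
        DataSet<WC> wordCounts = words
            .groupBy("word")
            .reduce(new ReduceFunction<WC>() {
                @Override
                public WC reduce(WC w1, WC w2) {
                    return new WC(w1.word, w1.count + w2.count);
                }
            });
        wordCounts.print();
    }
}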
Scala:

package org.apache.flink.transformation
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
/**
* Created by wangsenfeng on 2017/11/15.
*/
object DataSetTransformationApi {
def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
/**
* map function
*/
/*val intPairs: DataSet[(Int, Int)] = env.fromElements((1,2),(3,4))
val intSums = intPairs.map { pair => pair._1 + pair._2 }
intSums.print()*/
/**
* flatMap function
*/
/*val textLines: DataSet[String] = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?")
val words = textLines.flatMap { _.split(" ") }
words.print()*/
/**
* mapPartition function
*/
/*val textLines: DataSet[String] = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?")
val counts = textLines.mapPartition { in => Some(in.size) }
counts.print()*/
/**
* filter function
*/
/*val intNumbers: DataSet[Int] = env.fromElements(-1, -2, -3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
val naturalNumbers = intNumbers.filter { _ > 0 }
naturalNumbers.print()*/
/**
* Reduce on DataSet Grouped by Key Expression
*/
/*val words: DataSet[WC] =env.fromElements(new WC("wang",1),new WC("sen",2),new WC("feng",2),new WC("wang",1),new WC("sen",2),new WC("feng",2))
val wordCounts = words.groupBy("word").reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
wordCounts.print()*/
/**
* Reduce on DataSet Grouped by KeySelector Function
*/
/*val words: DataSet[WC] = env.fromElements(new WC("wang", 1), new WC("sen", 2), new WC("feng", 2), new WC("wang", 1), new WC("sen", 2), new WC("feng", 2))
val wordCounts = words.groupBy {
_.word
} reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
wordCounts.print()*/
}
}
/**
* some ordinary POJO
*/
case class WC(word: String, count: Int) {
  def this() {
    this(null, -1)
  }
  /**
   * add getters and setters here, or simply rely on the case class fields
   */
}
1.1.3. Data Sources
Data Sources create the initial DataSet, e.g. from files or from Java collections. Flink reads data through InputFormats; the most common methods are listed below:
Reading from files:
- readTextFile(path) : TextInputFormat – Reads files line by line and returns each line as a String.
- readTextFileWithValue(path) : TextValueInputFormat – Reads files line by line and returns each line as a StringValue. StringValues are mutable strings.
- readCsvFile(path) : CsvInputFormat – Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic Java types and their Value counterparts as field types.
- readFileOfPrimitives(path, Class) : PrimitiveInputFormat – Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.
- readFileOfPrimitives(path, delimiter, Class) : PrimitiveInputFormat – Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer, using the given delimiter.
- readHadoopFile(FileInputFormat, Key, Value, path) : FileInputFormat – Creates a JobConf, reads the file from the given path with the specified FileInputFormat, Key class and Value class, and returns the records as Tuple2<Key, Value>.
- readSequenceFile(Key, Value, path) : SequenceFileInputFormat – Creates a JobConf, reads the file from the given path with type SequenceFileInputFormat, Key class and Value class, and returns the records as Tuple2<Key, Value>.
Reading from collections:
- fromCollection(Collection) – Creates a DataSet from a java.util.Collection. All elements in the collection must be of the same type (a small sketch follows this list).
- fromCollection(Iterator, Class) – Creates a DataSet from an iterator. The class specifies the data type of the elements returned by the iterator.
- fromElements(T ...) – Creates a DataSet from the given sequence of objects. All objects must be of the same type.
- fromParallelCollection(SplittableIterator, Class) – Creates a DataSet from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
- generateSequence(from, to) – Generates the sequence of numbers in the given interval, in parallel.
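The example in the next block only shows fromElements and generateSequence, so here is a minimal sketch of fromCollection as a supplement; the list contents are illustrative, and java.util.Arrays / java.util.List imports are assumed.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// a collection built on the client side; all elements must be of the same type
List<String> data = Arrays.asList("flink", "spark", "hadoop");
DataSet<String> words = env.fromCollection(data);
words.print();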
Generic:
- readFile(inputFormat, path) / FileInputFormat – Accepts a file input format.
- createInput(inputFormat) / InputFormat – Accepts a generic input format.
Example:
package org.apache.org.apache.datasource;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* Created by wangsenfeng on 2017/11/15.
*/
public class FlinkDataSource {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read a file from the local file system
DataSet<String> localLines = env.readTextFile("file:///c:/words.txt");
localLines.print();
// read a file from HDFS
DataSet<String> hdfsLines = env.readTextFile("hdfs://master1:9000/words.txt");
hdfsLines.print();
// create a DataSet from the given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
value.print();
// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 100);
numbers.print();
}
}
More examples:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read a text file from the local file system
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
// read a text file from HDFS
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
// read a CSV file with three fields
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.types(Integer.class, String.class, Double.class);
// read a CSV file with five fields, taking only two of them
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.includeFields("10010") // take the first and the fourth field
.types(String.class, Double.class);
// read a CSV file with three fields into the corresponding fields of a Person POJO
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.pojoType(Person.class, "name", "age", "zipcode");
// read a file from the specified path of type TextInputFormat
DataSet<Tuple2<LongWritable, Text>> tuples = env.readHadoopFile(new TextInputFormat(),
LongWritable.class, Text.class,
"hdfs://nnHost:nnPort/path/to/file");
// read a SequenceFile using SequenceFileInputFormat
DataSet<Tuple2<IntWritable, Text>> tuples =
env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file");
// create a DataSet from the given elements
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar");
// generate a number sequence
DataSet<Long> numbers = env.generateSequence(1, 10000000);
// read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer>> dbData = env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);
Note: Flink's program compiler needs to infer the data types of the elements returned by an InputFormat. If this information cannot be inferred automatically, the type information must be provided manually, as shown in the example above.
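Besides the setRowTypeInfo call shown above, createInput also has an overload that accepts an explicit TypeInformation for exactly this purpose. A minimal sketch, where MyTupleInputFormat stands for a hypothetical user-defined InputFormat whose element type cannot be inferred automatically:
// MyTupleInputFormat is a placeholder for a custom InputFormat; the second argument
// tells Flink which element type createInput() produces
DataSet<Tuple2<String, Integer>> data = env.createInput(
    new MyTupleInputFormat(),
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));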
Recursively traversing directories in the input path:
When a file-based source points to a directory, nested sub-directories are not enumerated by default: only the files directly inside the base directory are read, and files in nested directories are ignored. To read nested files as well, enable recursive enumeration via the recursive.file.enumeration parameter:
// set up the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a configuration object
Configuration parameters = new Configuration();
// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true);
// pass the configuration to the data source
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files").withParameters(parameters);
Reading compressed files:
For supported compression formats such as DEFLATE (.deflate) and GZip (.gz, .gzip), no additional InputFormat configuration is needed: Flink recognizes the file extension and decompresses automatically. Note, however, that compressed files may not be read in parallel but only sequentially, which can affect the scalability of a job.
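A minimal sketch (the path is illustrative): reading a gzip-compressed text file needs no extra configuration, the file extension alone triggers decompression.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// the .gz extension is recognized automatically and the file is decompressed on the fly
DataSet<String> compressedLines = env.readTextFile("hdfs:///path/to/logs/app.log.gz");
compressedLines.print();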

1.1.4. Data Sinks
Data Sinks consume DataSets and store or return them through OutputFormats:
- writeAsText() : TextOutputFormat – Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
- writeAsFormattedText() : TextOutputFormat – Writes elements line-wise as Strings. The Strings are obtained by calling a user-defined format() method for each element.
- writeAsCsv(...) : CsvOutputFormat – Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
- print() / printToErr() / print(String msg) / printToErr(String msg) – Prints the toString() value of each element to the standard out / standard error stream. Optionally, a prefix (msg) can be provided, which is prepended to the output; this helps to distinguish different calls to print. If the parallelism is greater than 1, the output is also prepended with the identifier of the task that produced it.
- write() : FileOutputFormat – Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
- output() : OutputFormat – The most generic output method, for data sinks that are not file based (such as storing the result in a database).
Example:
package org.apache.flink.datasink;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSink;
import org.apache.flink.core.fs.FileSystem;
/**
* Created by wangsenfeng on 2017/11/15.
*/
public class FlinkDataSink {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> textData = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?");
// write the DataSet to the local file system
DataSink<String> stringDataSink = textData.writeAsText("file:///F:/flinkdatasink1.txt");
// write the DataSet to HDFS
textData.writeAsText("hdfs://master1:9000/flinkdatasink1.txt");
// write the DataSet to the local file system, overwriting an existing file
textData.writeAsText("file:///F:/flinkdatasink1.txt", FileSystem.WriteMode.OVERWRITE);
env.execute();
}
}
More examples:
// text data
DataSet<String> textData = // [...]
// write the DataSet to a file on the local file system
textData.writeAsText("file:///my/result/on/localFS");
// write the DataSet to a file on HDFS
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");
// write the DataSet to the local file system, overwriting an existing file
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// write tuples as lines with the pipe character as field delimiter, e.g. "a|b|c"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// write tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.writeAsText("file:///path/to/the/result/file");
// write values as strings using a user-defined TextFormatter object
values.writeAsFormattedText("file:///path/to/the/result/file",
new TextFormatter<Tuple3<String, Integer, Double>>() {
public String format (Tuple3<String, Integer, Double> value) {
return value.f1 + " - " + value.f0;
}
});
// write data to a relational database using the generic output() method
DataSet<Tuple3<String, Integer, Double>> myResult = // [...]
myResult.output(
// build and configure OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
.finish()
);
Locally sorted output:
Globally sorted output is currently not supported (see the sketch after the examples below for a common workaround).
DataSet<Tuple3<Integer, String, Double>> tData = // [...]
DataSet<Tuple2<BookPojo, Double>> pData = // [...]
DataSet<String> sData = // [...]
// sort the output on the String field in ascending order; the String is the tuple's second field, hence position 1
tData.sortPartition(1, Order.ASCENDING).print();
// sort the output on the Double field (position 2) in descending order and on the Integer field (position 0) in ascending order
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print();
// sort the output on the nested BookPojo field "author" (inside tuple field 0) in descending order
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...);
// sort the output on the full tuple in ascending order
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...);
// sort the elements in descending order
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);
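A commonly used workaround for the missing global sort (a sketch, not from the original text): forcing the sorted operator to a parallelism of 1 produces a single, totally ordered output partition. This costs all parallelism, so it is only reasonable for small results.
// a single partition makes the partition-local sort effectively a global sort
tData.sortPartition(0, Order.ASCENDING)
    .setParallelism(1)
    .writeAsText("file:///path/to/sorted/output");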
1.1.5. Broadcast Variables
Broadcast variables make a DataSet available to all parallel instances of an operation, in addition to its regular input. This is useful for auxiliary data sets or data-dependent parameterization; inside the operator the data set is accessible as a Collection. Broadcast variables are kept in memory on every node, so they should not be too large. They are suited to simple things such as passing parameters to a function or sharing a bit of Flink configuration.
Broadcast: a DataSet is registered for broadcast via withBroadcastSet(DataSet, String).
Access: the broadcast variable is accessed via getRuntimeContext().getBroadcastVariable(String) inside the target operator.
package org.apache.flink.brodcast;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import java.util.Collection;
import java.util.Iterator;
/**
* Created by wangsenfeng on 2017/11/15.
*/
public class FlinkBrodCast {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 1. the DataSet that will be broadcast
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");
// 2. the broadcast variable can be accessed from any rich operator on any DataSet
MapOperator<String, String> broadcastValue = data.map(new RichMapFunction<String, String>() {
@Override
public void open(Configuration parameters) throws Exception {
// 4. access the broadcast variable as a Collection
Collection<Integer> broadcastSet =
getRuntimeContext().getBroadcastVariable("broadcastSetName");
Iterator<Integer> iterator = broadcastSet.iterator();
while (iterator.hasNext()){
System.out.println(iterator.next()+"===============");
}
}
@Override
public String map(String value) throws Exception {
//do something
return value;
}
});
// 3. broadcast the DataSet under the name "broadcastSetName"
broadcastValue.withBroadcastSet(toBroadcast, "broadcastSetName");
broadcastValue.print();
}
}
1.1.6. Distributed Cache
Flink offers a distributed cache, similar to Hadoop's, that makes files locally accessible to the parallel instances of user functions. A program registers a file or directory under a name through the ExecutionEnvironment. When the program is executed, Flink automatically copies the file or directory to the local file system of every worker, and user functions can then look it up by the registered name and access it locally.
Registering cached files:
package org.apache.flink.DistrbitCache;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
/**
* Created by wangsenfeng on 2017/11/15.
*/
public class FlinkDistributCache {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// register a file from HDFS under the name "hdfsFile"
env.registerCachedFile("hdfs://master1:9000/words.txt", "hdfsFile");
// register a local file as an executable file
// env.registerCachedFile("file:///c:/words.txt", "localExecFile", true);
// define a simple program that reads the cached data during execution
DataSet<String> input = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?");
DataSet<Integer> result = input.map(new MyMapper());
result.print();
}
}
Accessing cached files:
A user function that needs to access a cached file must extend one of the RichFunction classes, because access goes through the RuntimeContext.
// extend a RichFunction to have access to the RuntimeContext
final class MyMapper extends RichMapFunction<String, Integer> {
@Override
public void open(Configuration config) {
// access the cached file via the RuntimeContext and the DistributedCache
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
// read the file
try {
System.out.println("file name: " + myFile.getName() + "============");
System.out.println("cache path: " + myFile.getPath() + "============");
BufferedReader br = new BufferedReader(new FileReader(myFile));
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
br.close();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public Integer map(String value) throws Exception {
// use content of cached file
return 2;
}
}