Flink Advanced (Part 3): Big Data Explained

1. Using the Technology

1.1. Batch (Offline) API

1.1.1. Sample Program (Review)

• Java

package org.apache.flink.quickstart; 
 
import org.apache.flink.api.common.functions.FlatMapFunction; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.util.Collector; 
 
/** 
 * Created by wangsenfeng on 2017/10/17. 
 */ 
public class WordCountExample { 
    public static void main(String[] args) throws Exception { 
        // set up the execution environment 
        final ExecutionEnvironment env = 
                ExecutionEnvironment.getExecutionEnvironment(); 
        // build a DataSet from a collection of strings 
        DataSet<String> text = env.fromElements( 
                "Who's there?", 
                "I think I hear them. Stand, ho! Who's there?"); 
        // split each line into words, group by word, and sum the counts per word 
        DataSet<Tuple2<String, Integer>> wordCounts = text 
                .flatMap(new LineSplitter()) 
                .groupBy(0) 
                .sum(1); 
        // print the result 
        wordCounts.print(); 
    } 
    // FlatMapFunction that splits a line into (word, 1) tuples 
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { 
        @Override 
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { 
            for (String word : line.split(" ")) { 
                out.collect(new Tuple2<String, Integer>(word, 1)); 
            } 
        } 
    }
}

• Scala

package org.apache.flink.quickstart 
 
import org.apache.flink.api.scala._ 
 
object WordCount { 
  def main(args: Array[String]) { 
    // set up the execution environment 
    val env = ExecutionEnvironment.getExecutionEnvironment 
    // load data from string elements 
    val text = env.fromElements( 
      "Who's there?", 
      "I think I hear them. Stand, ho! Who's there?") 
    // split the lines, build (word, 1) tuples, group by word, and sum the counts 
    val counts = text.flatMap { _.toLowerCase.split(" ").filter { _.nonEmpty } } 
      .map((_,1)) 
      .groupBy(0) 
      .sum(1) 
    // print the result 
    counts.print() 
  }
}

1.1.2. Operators: DataSet Transformations

More transformation operators:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
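
The sample class below only exercises map, flatMap, mapPartition and filter. As a minimal hedged sketch (class name and data are illustrative, not from the original), a join between two tuple DataSets looks like this:

package org.apache.flink.dataset.transformation; 
 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
 
public class JoinSketch { 
    public static void main(String[] args) throws Exception { 
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
        // (userId, userName) -- illustrative data 
        DataSet<Tuple2<Integer, String>> users = env.fromElements( 
                new Tuple2<Integer, String>(1, "alice"), 
                new Tuple2<Integer, String>(2, "bob")); 
        // (userId, orderAmount) -- illustrative data 
        DataSet<Tuple2<Integer, Double>> orders = env.fromElements( 
                new Tuple2<Integer, Double>(1, 10.5), 
                new Tuple2<Integer, Double>(2, 7.0)); 
        // join on field 0 of both inputs; the default join pairs up the matching elements 
        DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, Double>>> joined = 
                users.join(orders).where(0).equalTo(0); 
        joined.print(); 
    } 
}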

• Java


package org.apache.flink.dataset.transformation; 
 
import org.apache.flink.api.common.functions.*; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.util.Collector; 
 
/** 
 * Created by wangsenfeng on 2017/11/15. 
 */ 
public class DataSetTransformationApi { 
    public static void main(String[] args) throws Exception { 
        // set up the execution environment 
        final ExecutionEnvironment env = 
                ExecutionEnvironment.getExecutionEnvironment(); 
        // map function 
        /*DataSet<Tuple2<Integer, Integer>> intPairs = env.fromElements(new Tuple2<Integer, Integer> (1,2),new Tuple2<Integer, Integer> (1,2)); 
        DataSet<Integer> intSums = intPairs.map(new IntAdder()); 
        intSums.print();*/ 
        // flatMap function 
        /*DataSet<String> textLines = env.fromElements( 
                "Who's there?", 
                "I think I hear them. Stand, ho! Who's there?"); 
          DataSet<String> words = textLines.flatMap(new Tokenizer()); 
        words.print();*/ 
 
        // mapPartition 
        /*DataSet<String> textLines =  env.fromElements( 
                "Who's there?", 
                "I think I hear them. Stand, ho! Who's there?"); 
        DataSet<Long> counts = textLines.mapPartition(new PartitionCounter()); 
        counts.print();*/ 
 
 
        // filter 
        DataSet<Integer> intNumbers = env.fromElements(-1, -2, -3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0); 
        DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter()); 
        naturalNumbers.print(); 
    } 
} 
 
// MapFunction that adds two integer values 
class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> { 
    @Override 
    public Integer map(Tuple2<Integer, Integer> in) { 
        return in.f0 + in.f1; 
    } 
} 
 
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens. 
class Tokenizer implements FlatMapFunction<String, String> { 
    @Override 
    public void flatMap(String value, Collector<String> out) { 
        for (String token : value.split("\\W+")) { 
            out.collect(token); 
        } 
    } 
} 
 
class PartitionCounter implements MapPartitionFunction<String, Long> { 
 
    public void mapPartition(Iterable<String> values, Collector<Long> out) { 
        long c = 0; 
        for (String s : values) { 
            c++; 
        } 
        out.collect(c); 
    } 
} 
 
class NaturalNumberFilter implements FilterFunction<Integer> { 
    @Override 
    public boolean filter(Integer number) { 
        return number >= 0; 
    }
}

• Scala


package org.apache.flink.transformation 
 
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} 
import org.apache.flink.api.scala._ 
 
/** 
  * Created by wangsenfeng on 2017/11/15. 
  */ 
object DataSetTransformationApi { 
  def main(args: Array[String]) { 
    // set up the execution environment 
    val env = ExecutionEnvironment.getExecutionEnvironment 
 
    /** 
      * map function 
      */ 
 
    /*val intPairs: DataSet[(Int, Int)] = env.fromElements((1,2),(3,4)) 
    val intSums = intPairs.map { pair => pair._1 + pair._2 } 
    intSums.print()*/ 
 
    /** 
      * flatMap function 
      */ 
    /*val textLines: DataSet[String] = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?") 
    val words = textLines.flatMap { _.split(" ") } 
    words.print()*/ 
 
    /** 
      * mapPartition function 
      */ 
    /*val textLines: DataSet[String] = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?") 
    val counts = textLines.mapPartition { in => Some(in.size) } 
    counts.print()*/ 
 
    /** 
      * filter function 
      */ 
    /*val intNumbers: DataSet[Int] = env.fromElements(-1, -2, -3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0) 
    val naturalNumbers = intNumbers.filter { _ > 0 } 
    naturalNumbers.print()*/ 
 
    /** 
      * Reduce on DataSet Grouped by Key Expression 
      */ 
    /*val words: DataSet[WC] =env.fromElements(new WC("wang",1),new WC("sen",2),new WC("feng",2),new WC("wang",1),new WC("sen",2),new WC("feng",2)) 
    val wordCounts = words.groupBy("word").reduce { 
        (w1, w2) => new WC(w1.word, w1.count + w2.count) 
      } 
    wordCounts.print()*/ 
 
    /** 
      * Reduce on DataSet Grouped by KeySelector Function 
      */ 
    /*val words: DataSet[WC] = env.fromElements(new WC("wang", 1), new WC("sen", 2), new WC("feng", 2), new WC("wang", 1), new WC("sen", 2), new WC("feng", 2)) 
    val wordCounts = words.groupBy { 
      _.word 
    } reduce { 
      (w1, w2) => new WC(w1.word, w1.count + w2.count) 
    } 
    wordCounts.print()*/ 
  } 
} 
 
/** 
  * some ordinary POJO 
  */ 
 
case class WC(val word: String, val count: Int) { 
  def this() { 
    this(null, -1) 
  } 
 
  /** 
    * add getters/setters here, or simply rely on the case class 
    */
}

1.1.3. Data Sources

Data sources create the initial DataSet, for example from files or from Java collections. Internally, Flink reads input data through an InputFormat; for reference see:

https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java

 

• File-based sources

  • readTextFile(path) : TextInputFormat – reads the file line by line and returns each line as a String.
  • readTextFileWithValue(path) : TextValueInputFormat – reads the file line by line and returns each line as a StringValue. StringValue is a mutable string.
  • readCsvFile(path) : CsvInputFormat – parses files of comma- (or another character-) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic Java types and their Value counterparts as field types.
  • readFileOfPrimitives(path, Class) : PrimitiveInputFormat – parses newline- (or another char sequence-) delimited primitive data types such as String or Integer (see the sketch after this list).
  • readFileOfPrimitives(path, delimiter, Class) : PrimitiveInputFormat – parses newline- (or another char sequence-) delimited primitive data types such as String or Integer, using the given delimiter.
  • readHadoopFile(FileInputFormat, Key, Value, path) : FileInputFormat – creates a JobConf and reads the file from the specified path with the specified FileInputFormat, Key class and Value class, returning the records as Tuple2<Key, Value>.
  • readSequenceFile(Key, Value, path) : SequenceFileInputFormat – creates a JobConf and reads the file from the specified path with type SequenceFileInputFormat, Key class and Value class, returning the records as Tuple2<Key, Value>.
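
A minimal hedged sketch of readFileOfPrimitives (the path is illustrative and assumes a text file with one integer per line):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
// read a file containing one integer per line into a DataSet<Integer> (illustrative path) 
DataSet<Integer> numbers = env.readFileOfPrimitives("file:///path/to/numbers.txt", Integer.class); 
numbers.print();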

 

• Collection-based sources

  • fromCollection(Collection) – creates a DataSet from a java.util.Collection. All elements in the collection must be of the same type (see the sketch after this list).
  • fromCollection(Iterator, Class) – creates a DataSet from an iterator. The class specifies the data type of the elements returned by the iterator.
  • fromElements(T ...) – creates a DataSet from the given sequence of objects. All objects must be of the same type.
  • fromParallelCollection(SplittableIterator, Class) – creates a DataSet from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
  • generateSequence(from, to) – generates the sequence of numbers in the given interval.
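
A minimal hedged sketch of fromCollection (the data is illustrative; java.util.Arrays and java.util.List are assumed to be imported):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
// build a DataSet from an ordinary java.util.List; all elements must share the same type 
List<String> names = Arrays.asList("flink", "spark", "hadoop"); 
DataSet<String> nameSet = env.fromCollection(names); 
nameSet.print();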

 

• Generic

  • readFile(inputFormat, path) / FileInputFormat – Accepts a file input format (see the sketch after this list).
  • createInput(inputFormat) / InputFormat – Accepts a generic input format.
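
A minimal hedged sketch of the generic readFile() with Flink's TextInputFormat (org.apache.flink.api.java.io.TextInputFormat and org.apache.flink.core.fs.Path are assumed to be imported; the path is illustrative):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
// the generic readFile() takes any FileInputFormat plus the path to read 
DataSet<String> lines = env.readFile( 
        new TextInputFormat(new Path("file:///path/to/file.txt")), 
        "file:///path/to/file.txt"); 
lines.print();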

 

• Example

package org.apache.org.apache.datasource; 
 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
 
/** 
 * Created by wangsenfeng on 2017/11/15. 
 */ 
public class FlinkDataSource { 
    public static void main(String[] args) throws Exception { 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
        // read a file from the local file system 
        DataSet<String> localLines = env.readTextFile("file:///c:/words.txt"); 
        localLines.print(); 
 
        // read a file from HDFS 
        DataSet<String> hdfsLines = env.readTextFile("hdfs://master1:9000/words.txt"); 
        hdfsLines.print(); 
 
        // create a DataSet from the given elements 
        DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar"); 
        value.print(); 
 
        // generate a number sequence 
        DataSet<Long> numbers = env.generateSequence(1, 100); 
        numbers.print(); 
    } 
}

• More examples

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
// read a file from the local file system 
DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile"); 
// read a file from HDFS 
DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile"); 
 
// read a CSV file with three fields 
DataSet<Tuple3<Integer, String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") 
	                       .types(Integer.class, String.class, Double.class); 
 
// read a CSV file with five fields, taking only two of them 
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file") 
                           .includeFields("10010")  // take the first and the fourth field 
	                       .types(String.class, Double.class); 
 
// read a CSV file with three fields into the fields of a Person POJO 
DataSet<Person> csvInput = env.readCsvFile("hdfs:///the/CSV/file") 
                         .pojoType(Person.class, "name", "age", "zipcode"); 
 
// read a Hadoop file using Hadoop's TextInputFormat 
DataSet<Tuple2<LongWritable, Text>> tuples = env.readHadoopFile(new TextInputFormat(),  
												LongWritable.class, Text.class,  
												"hdfs://nnHost:nnPort/path/to/file"); 
 
// read a Hadoop SequenceFile using SequenceFileInputFormat 
DataSet<Tuple2<IntWritable, Text>> tuples = 
 env.readSequenceFile(IntWritable.class, Text.class, "hdfs://nnHost:nnPort/path/to/file"); 
 
// create a DataSet from the given elements 
DataSet<String> value = env.fromElements("Foo", "bar", "foobar", "fubar"); 
 
// generate a number sequence 
DataSet<Long> numbers = env.generateSequence(1, 10000000); 
 
// read data from a relational database using the JDBC input format 
DataSet<Tuple2<String, Integer>> dbData = env.createInput(     
					 JDBCInputFormat.buildJDBCInputFormat() 
                     .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") 
                     .setDBUrl("jdbc:derby:memory:persons") 
                     .setQuery("select name, age from persons") 
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)) 
                     .finish() 
    ); 

Note: Flink's program compiler needs to infer the data types returned by an InputFormat. If this information cannot be inferred automatically, the type information must be provided manually, as in the examples above.
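
A minimal hedged sketch of supplying type information manually with returns() and a TypeHint (org.apache.flink.api.common.typeinfo.TypeHint is assumed to be imported; the data is illustrative):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
DataSet<String> words = env.fromElements("who", "is", "there"); 
// the result type of the lambda cannot always be inferred, so hint it explicitly 
DataSet<Tuple2<String, Integer>> pairs = words 
        .map(w -> new Tuple2<String, Integer>(w, 1)) 
        .returns(new TypeHint<Tuple2<String, Integer>>() {}); 
pairs.print();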

• Recursively traversing directories in the input path

When reading from a directory, files in nested sub-directories are not read by default; only the files in the top-level directory are read and the rest are ignored. To read nested files recursively, enable the recursive.file.enumeration configuration parameter:

// set up the execution environment 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
// create a configuration object 
Configuration parameters = new Configuration(); 
// enable recursive enumeration of nested files 
parameters.setBoolean("recursive.file.enumeration", true); 
// pass the configuration to the data source 
DataSet<String> logs = env.readTextFile("file:///path/with.nested/files") 
                          .withParameters(parameters);

• Reading compressed files

For the following compression formats, no extra InputFormat configuration is needed: Flink recognizes them by file extension and decompresses them automatically (see the sketch below). However, compressed files may not be read in parallel but only sequentially, which can affect the scalability of a job.

(The original shows a table of the compression formats detected by file extension, e.g. DEFLATE (.deflate) and GZip (.gz, .gzip); see the official documentation for the full list.)
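
As a hedged sketch (the path is illustrative), a GZip-compressed text file can be read with the ordinary readTextFile call and is decompressed transparently based on its extension:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
// the .gz extension is detected automatically and the file is decompressed while reading 
DataSet<String> compressedLines = env.readTextFile("hdfs://master1:9000/logs/access.log.gz"); 
compressedLines.print();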

1.1.4. Data Sinks

• Data sinks consume DataSets and store or return them through an OutputFormat.

  • writeAsText() : TextOutputFormat – writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
  • writeAsFormattedText() : TextOutputFormat – writes elements line-wise as Strings. The Strings are obtained by calling a user-defined format() method for each element.
  • writeAsCsv(...) : CsvOutputFormat – writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the object.
  • print() / printToErr() / print(String msg) / printToErr(String msg) – prints the toString() value of each element to standard out / standard error. Optionally a prefix (msg) can be provided, which is prepended to the output; this helps to distinguish between different calls to print. If the parallelism is greater than 1, the output is also prefixed with the identifier of the task that produced it.
  • write() : FileOutputFormat – method and base class for custom file outputs. Supports custom object-to-bytes conversion (see the sketch after this list).
  • output() : OutputFormat – the most generic output method, for data sinks that are not file based (such as storing the result in a database).
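
A minimal hedged sketch of the generic write() with an explicit TextOutputFormat (org.apache.flink.api.java.io.TextOutputFormat and org.apache.flink.core.fs.Path are assumed to be imported; the path is illustrative):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
DataSet<String> textData = env.fromElements("Who's there?", "I think I hear them."); 
// write() accepts any FileOutputFormat; here the plain text format is used explicitly 
textData.write(new TextOutputFormat<String>(new Path("file:///tmp/flink-write-out")), 
        "file:///tmp/flink-write-out"); 
// write() is a lazy sink, so the job must be triggered explicitly 
env.execute();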

 

• Example

package org.apache.flink.datasink; 
 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.operators.DataSink; 
import org.apache.flink.core.fs.FileSystem; 
 
/** 
 * Created by wangsenfeng on 2017/11/15. 
 */ 
public class FlinkDataSink { 
    public static void main(String[] args) throws Exception{ 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
        DataSet<String> textData = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?"); 
         
        // write the DataSet to the local file system 
        DataSink<String> stringDataSink = textData.writeAsText("file:///F:/flinkdatasink1.txt"); 
 
        // write the DataSet to HDFS 
        textData.writeAsText("hdfs://master1:9000/flinkdatasink1.txt"); 
 
        // write the DataSet to the local file system, overwriting the file if it exists 
        textData.writeAsText("file:///F:/flinkdatasink1.txt", FileSystem.WriteMode.OVERWRITE); 
        env.execute(); 
    } 
}

• More examples

// text data 
DataSet<String> textData = // [...] 
 
// write the DataSet to the local file system 
textData.writeAsText("file:///my/result/on/localFS"); 
 
// write the DataSet to HDFS 
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS"); 
 
// write the DataSet to the local file system, overwriting the file if it exists 
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE); 
 
// write as CSV, using "|" as the field delimiter, e.g. "a|b|c" 
DataSet<Tuple3<String, Integer, Double>> values = // [...] 
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|"); 
 
// write the tuples as text in "(a, b, c)" format instead of CSV 
values.writeAsText("file:///path/to/the/result/file"); 
 
// write the values as Strings using a user-defined TextFormatter 
values.writeAsFormattedText("file:///path/to/the/result/file", 
    new TextFormatter<Tuple3<String, Integer, Double>>() { 
        public String format (Tuple3<String, Integer, Double> value) { 
            return value.f1 + " - " + value.f0; 
        } 
}); 
// write data to a relational database using the generic output() method 
DataSet<Tuple3<String, Integer, Double>> myResult = // [...] 
myResult.output( 
    // build and configure OutputFormat 
    JDBCOutputFormat.buildJDBCOutputFormat() 
                    .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") 
                    .setDBUrl("jdbc:derby:memory:persons") 
                    .setQuery("insert into persons (name, age, height) values (?,?,?)") 
                    .finish() 
    );

• Locally sorted output

Globally sorted output is currently not supported.

DataSet<Tuple3<Integer, String, Double>> tData = // [...] 
DataSet<Tuple2<BookPojo, Double>> pData = // [...] 
DataSet<String> sData = // [...] 
// sort the String field in ascending order; it is the second tuple field, hence index 1 
tData.sortPartition(1, Order.ASCENDING).print(); 
// sort by the first (Integer) field ascending and the third (Double) field descending 
tData.sortPartition(2, Order.DESCENDING).sortPartition(0, Order.ASCENDING).print(); 
// sort by the author field of the BookPojo in tuple field 0, in descending order 
pData.sortPartition("f0.author", Order.DESCENDING).writeAsText(...); 
// sort on all tuple fields in ascending order 
tData.sortPartition("*", Order.ASCENDING).writeAsCsv(...); 
// sort the String elements in descending order 
sData.sortPartition("*", Order.DESCENDING).writeAsText(...);

1.1.5. Broadcast Variables

Broadcast variables allow you to make a DataSet available to all parallel instances of an operation, in addition to its regular input. This is useful for auxiliary data sets or data-dependent parameterization. The DataSet is then accessible inside the operator as a Collection. Broadcast variables are held in memory on every node, so they should not be too large; they are suited to simple things such as function parameters or some Flink configuration values.

• Broadcast: broadcast sets are registered by name via withBroadcastSet(DataSet, String).

• Access: the broadcast set is accessed inside the target operator via getRuntimeContext().getBroadcastVariable(String).

package org.apache.flink.brodcast; 
 
import org.apache.flink.api.common.functions.RichMapFunction; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.operators.MapOperator; 
import org.apache.flink.configuration.Configuration; 
 
import java.util.Collection; 
import java.util.Iterator; 
 
/** 
 * Created by wangsenfeng on 2017/11/15. 
 */ 
public class FlinkBrodCast { 
    public static void main(String[] args) throws Exception{ 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
        // 1. the DataSet that will be broadcast 
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3); 
        DataSet<String> data = env.fromElements("a", "b"); 
        // 2. the broadcast variable can be accessed inside any rich function of any operator 
        MapOperator<String, String> broadcastValue = data.map(new RichMapFunction<String, String>() { 
            @Override 
            public void open(Configuration parameters) throws Exception { 
                // 4. access the broadcast variable and obtain it as a Collection 
                Collection<Integer> broadcastSet = 
                        getRuntimeContext().getBroadcastVariable("broadcastSetName"); 
                Iterator<Integer> iterator = broadcastSet.iterator(); 
                while (iterator.hasNext()){ 
                    System.out.println(iterator.next()+"==============="); 
                } 
            } 
 
            @Override 
            public String map(String value) throws Exception { 
                //do something 
                return value; 
            } 
        }); 
        // 3. broadcast the DataSet under the name "broadcastSetName" 
        broadcastValue.withBroadcastSet(toBroadcast, "broadcastSetName"); 
        broadcastValue.print(); 
    } 
}

1.1.6. Distributed Cache

Flink provides a distributed cache, similar to Hadoop's, that makes files accessible to the parallel instances of user functions. An application registers a file or directory under a name through the ExecutionEnvironment; when the program runs, Flink automatically copies the cached file to the local file system of every node, and the application can then access the file or directory locally under the registered name.

• Registering a cached file

package org.apache.flink.DistrbitCache; 
 
import org.apache.flink.api.common.functions.RichMapFunction; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.configuration.Configuration; 
 
import java.io.BufferedReader; 
import java.io.File; 
import java.io.FileReader; 
 
/** 
 * Created by wangsenfeng on 2017/11/15. 
 */ 
public class FlinkDistributCache { 
    public static void main(String[] args) throws Exception { 
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
 
        // register a cached file from HDFS under the name "hdfsFile" 
        env.registerCachedFile("hdfs://master1:9000/words.txt", "hdfsFile"); 
 
        // register a local executable file 
       // env.registerCachedFile("file:///c:/words.txt", "localExecFile", true); 
 
        // define a simple job whose user function reads the cached data 
        DataSet<String> input = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?"); 
        DataSet<Integer> result = input.map(new MyMapper()); 
        result.print(); 
 
    } 
}

• Accessing a cached file

If a user function needs to access the cached file, the function must extend a RichFunction, because access goes through the RuntimeContext.

// extend a RichFunction to have access to the RuntimeContext 
final class MyMapper extends RichMapFunction<String, Integer> { 
 
    @Override 
    public void open(Configuration config) { 
 
        // access the cached file through the RuntimeContext and the DistributedCache 
        File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile"); 
        // read the file 
        try { 
            System.out.println("file name: " + myFile.getName() + "============"); 
            System.out.println("cache path: " + myFile.getPath() + "============"); 
            BufferedReader br = new BufferedReader(new FileReader(myFile)); 
            String line; 
            while ((line = br.readLine()) != null) { 
                System.out.println(line); 
            } 
            br.close(); 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
 
    } 
 
    @Override 
    public Integer map(String value) throws Exception { 
        // use content of cached file 
        return 2; 
    } 
}
