《Flink官方文档》监控Wikipedia 编辑流

原文链接 译者:ivansong

在本指南中,我们会从头开始,从从创建一个Flink项目到在一个Flink集群上运行一个流分析程序。

Wikipedia 提供了一个记录所有wiki的编辑的IRC通道。我们将会接入这个通道,计算每个用户在给定的时间窗口上编辑的字节数。用Flink能足够简单地在短时间内实现,但是给了你一个创建更复杂的分析程序的好的基础。

构建一个maven项目

我们将会用一个Flink Maven Archetype来创建我们的项目结构。关于这个请看Java API Quickstart章节来获取更多的细节。为了我们的目的,在命令窗口中运行如下:

$ mvn archetype:generate /
    -DarchetypeGroupId=org.apache.flink /
    -DarchetypeArtifactId=flink-quickstart-java /
    -DarchetypeVersion=1.2.0 /
    -DgroupId=wiki-edits /
    -DartifactId=wiki-edits /
    -Dversion=0.1 /
    -Dpackage=wikiedits /
    -DinteractiveMode=false

如果你原意,你可以编辑groupId,artifactId和 package 。用上面的参数,Maven会创建一个如下的项目结构:

$ tree wiki-edits
wiki-edits/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── wikiedits
        │       ├── BatchJob.java
        │       ├── SocketTextStreamWordCount.java
        │       ├── StreamingJob.java
        │       └── WordCount.java
        └── resources
            └── log4j.properties

在根目录的pom.xml已经添加了 Flink的依赖,同时在src/main/java目录中用几个Flink程序的示例。我们要从头开始,所以可以删除这些示例程序:

$ rm wiki-edits/src/main/java/wikiedits/*.java

最后一步,我们为了能在程序中使用Wikipedia需要添加Flink Wikipedia连接器的依赖。编辑pom.xml中依赖的部分,如下:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

注意添加的这个flink-connector-wikiedits_2.11依赖(这个示例和Wikipedia 连接器是受Apache Samza的Hello Samza示例启发

下面是编码时间。启动你最喜欢的IDE并引入这个maven项目或者打开一个文本编辑器,创建文件src/main/java/wikiedits/WikipediaAnalysis.java:

package wikiedits;
public class WikipediaAnalysis {
  public static void main(String[] args) throws Exception {
  }
}

这个程序非常简单,但接下来我们会填充它。注意的是我不会在这里引入一些声明,因为IDE能自动引入。如果你简单地想跳过前面在你的编辑器里直接输入,在这块儿内容的结束的地方,我会展示有引入的声明的完整的代码。
一个Flink程序的第一步是创建一个StreamExecutionEnvironment对象(如果你写的是一个批处理任务那就是ExecutionEnvironment对象)。这个对象可以用来设置执行参数,为从外部系统读取创建资源。因此,我们添加这个到main方法中:

   StreamExecutionEnvironme see = StreamExecutionEnvironment.getExecutionEnvironment();

接下来我们会创建一个读取 Wikipedia IRC 日志的资源:

DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

这行代码创建了一个我们能进一步运行的WikipediaEditEvent元素DateStream对象。为了本示例的目的,假设五秒钟,我们监控这段时间内在特定窗口中用户产生的添加或者删除的字节数。为此,我们首先应该指定此用户名上的我们想要的输入流,意思是在这个流的操作上我们应该绑定用户名。在我们的示例中,窗口中编辑的字节总数应该是每一个唯一的用户。对于键入的流我们应该提供一个KeySelector,如下:

KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
            return event.getUser();
        }
    });

这段代码给我们一个以String作为key即用户名作为key的WikipediaEditEvent 的Stream。现在我们可以指定想要的加在这个l流 上的窗口了,并且可以统计一个基于在这些窗口上的元素的结果。一个窗口指定了一个在其上执行运算的Steam的一个片段。在计算一个无穷流的元素的聚合时,窗口是很有必要的。在示例中,我们会假设我们想要聚合五秒中编辑的字节总数:

DataStream<Tuple2<String, Long>> result = keyedEdits
    .timeWindow(Time.seconds(5))
    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
            acc.f0 = event.getUser();
            acc.f1 += event.getByteDiff();
            return acc;
        }
    });

第一个调用,.timeWindow(),指定了我们想要有五秒钟的滚动(非重叠)窗口。第二个调用为每个唯一键在每个窗口片段上指定了一个Fold 转换。在例子中,我们从一个初始值为(“”, 0L)开始,为每个用户添加在该时间内每次编辑的字节码差异。结果流现在包含了一个每个用户五秒中产生的Tuple2<String, Long>。

下面仅仅要做的是将这个流输出到控制台,并开始执行:

result.print();

see.execute();

最后一个调用对启动一个真实的Flink任务是必要的。所有的操作,如创建资源,转换,聚合,这些仅是建立了内部操作的图。仅仅是当调用execute()方法的时候,这个操作图才会被扔到一个集群中执行或者在你本机执行。

下面是完整的代码:

package wikiedits;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class WikipediaAnalysis {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());

    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });

    DataStream<Tuple2<String, Long>> result = keyedEdits
      .timeWindow(Time.seconds(5))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });

    result.print();

    see.execute();
  }
}

你可以用maven在你的IDE或者命令行中运行这个示例:

$ mvn clean package
$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis

第一个命令是构建我们的项目,第二个命令是执行我们的主要类。输出的内容跟下面相似:

1> (Fenix down,114)
6> (AnomieBOT,155)
8> (BD2412bot,-3690)
7> (IgnorantArmies,49)
3> (Ckh3111,69)
5> (Slade360,0)
7> (Narutolovehinata5,2195)
6> (Vuyisa2001,79)
4> (Ms Sarah Welch,269)
4> (KasparBot,-245)

每行前面的数字告诉你每个并行的实例产生的输出。

本示例应该能让你开始写自己的Flink程序。为了学习更多,你可以打开我们的基本概念DataStream API的指南。如果你想学习如何构建一个Flink集群在自己机器上并将结果写入Kafka,请看接下来的激励练习。

激励练习:在一个Flink集群上运行,并将结果写入Kafka

请按照我们的快速开始里面的内容来在你的机器上构建一个Flink分布式,再参考Kafka的快速开始来安装Kafka,然后我们继续。

第一步,我们为了能使用Kafka连接器,需要添加Flink Kafka连接器的依赖。将这个添加的pom.xml文件的依赖模块中:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

接下来,我们需要修改我们的程序。我们会移除print(),替换成使用Kafka 接收器。新的代码示例如下:

result
    .map(new MapFunction<Tuple2<String,Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));

也需要引入相关的类:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.api.common.functions.MapFunction;

注意我们首先是如何使用一个MapFunction将Tuple2<String, Long>的流转换成一个字符串流的。我们就是在做这个,因为这个对写入简单的字符串到Kafka更容易。因而,我们创建了一个Kafka接收器。你肯能要适配一下你设置的主机名和端口号。”wiki-result”是运行我们程序之前我们将会创建的Kafka 流的名字。因为我们需要一个在集群上运行的jar文件,故用Maven 命令构建这个项目:

$ mvn clean package

产生的jar文件会在target的子文件夹中: target/wiki-edits-0.1.jar。我们接下来会用到这个。现在我们准备安装一个Flink集群,并在其上运行写入到Kafka的程序。到你安装的Flink目录下,开启一个本地的集群:

$ cd my/flink/directory
$ bin/start-local.sh

我们也需要创建这个Kafka Topic,以便我们的程序能写入:

$ cd my/kafka/directory
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results

现在我们准备在本地的Flink集群上运行我们的jar文件:

$ cd my/flink/directory
$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar

如果一切按照计划执行,命令行输出会跟下面的相似:

03/08/2016 15:09:27 Job execution switched to status RUNNING.
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING

你可以看到各个操作是如何开始运行的。只有两个操作是因为由于性能原因窗口后面的操作折叠成了一个。在Flink中,我们称这个为chaining。

你可以用Kafka 控制台消费者通过检测Kafka主题来观察程序的输出:

bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic wiki-result

你可以查看运行在 http://localhost:8081上面的Flink仪表盘。你可以对你的集群资源和运行的任务有个整体的感知:

《Flink官方文档》监控Wikipedia 编辑流
如果你点击运行的任务,你会看到一个可以观察单个操作的视图,例如,看到执行的元素的数量:

《Flink官方文档》监控Wikipedia 编辑流

结束了我们的Flink之旅,如果你有如何问题,请不要犹豫在我们的Mailing Lists提问。

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/100743.html

(0)
上一篇 2021年8月22日 01:08
下一篇 2021年8月22日 01:17

相关推荐

发表回复

登录后才能评论