Flink 初探


本文针对flink1.3做的记录。

1. Flink简介

Flink官网:https://flink.apache.org/

Flink中文文档地址(目前翻译了大部分):http://flink-cn.shinonomelab.com/

首先我们先了解两种数据集的大致类型

  • 无界数据集:无穷的数据集,数据在源源不断的产生。无界数据集的典型代表有物理传感器的测量数据、金融交易记录、机器日志数据、以及我们经常接触到的页面用户交互数据等。

  • 有界数据集:不变的数据集,最终不会改变的。有界数据集的典型代表有上一年用户行为数据、上个月用户活跃记录等。

Flink是针对无界数据集的持续计算。数据源源不断的进入Flink程序,然后Flink程序根据预定义的代码逻辑实现相关功能。

2. Flink的两种运行模型

如果我们使用过SparkStreaming和Storm就会知道,SparkStreaming是属于微批次的方式进行数据的实时处理。Storm属于对数据的的实时处理。

而Flink同时满足了上面两种计算的方式:

流式计算:只要数据产生就一直处于计算状态。

批次处理:完成一个时间段的任务计算后,释放资源,等待下一个批次的计算。

3. Flink的特性

  • 应用于流处理的开源框架。
  • Flink支持流式计算以及灵活的基于时间的窗口化操作。
  • 结果精准:即使是无序数据或延迟到达的数据也会保证精准。
  • 有状态:保持每次计算的结果,实现累加。
  • 容错:轻量级的,资源占用少,能够保证数据零丢失。出错以后能够恢复或重新计算,维护exactly once的应用状态(对数据算且只算一次)。
  • 大规模:可以在几千台节点上运行,高吞吐量,低延迟。
  • 检查点:Flink通过检查点机制实现exactly once,在出现故障时可以体现出来。版本化机制。

4. Flink流计算模型以及有界数据集

有界数据集是无界数据集的一种特例。

有界数据集在Flink内部是一种最终状态数据集进行处理的。

在Flink内部有界于无界数据集的差别很小,可能使用同一套计算执行引擎的API操作同时处理有界与无界数据集。

Flink 核心模块架构图

6. 搭建单机版

下载地址: https://flink.apache.org/downloads.html

将flink 上传并解压即可

启动本地flink

./start-local.sh

JobManager 进程

7. WebUI界面

http://slave:8081/#/overview

Flink webUI 截图

8. WordCount代码实例

引入依赖

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.0</version>
</dependency>

代码示例

package com.hnbian;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 功能说明:
 * author: haonan.bian
 * yum install nc
 * nc -lp 8965
 */
public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {

        //定义连接端口
        final int port = 8965 ;

        // 得到执行环境对象
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //连接套socket之后,读取输入data
        DataStream<String> text = env.socketTextStream("slave", port, "\n");

        // parse the data, group it, window it, and aggregate the countsDataStream
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });

        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        public String toString() {
            return word + " : " + count;
        }
    }
}

安装netcat工具,并且启动nc服务

windows环境

启动nc命令:Cmd> nc -lL -p 9999

运行代码并查看结果。

Linux环境

启动nc命令:nc -lk -p 9999

将代码打包上传到Linux服务器,并且启动:Bin/flink run -c com.hnbian.className myflink.jar

代码启动之后可以通过nc服务发送数据看刚写的代码能否统计到数据。

我们可以在WebUI 中找标准作业管理器点击标准输出 即可看到输出日志。如下图:

输出日志


文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
CentOS 安装 rz sz lrzsz CentOS 安装 rz sz lrzsz
服务器版本: [root@node3 ~]# lsb_release -a LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-
2019-10-08
下一篇 
关于maven 编译时source 1.5 中不支持 lambda 表达式 的问题记录 关于maven 编译时source 1.5 中不支持 lambda 表达式 的问题记录
1. 错误说明首先贴上错误截图 原因是Maven Compiler 插件默认会加 -source 1.5 及 -target 1.5 参数来编译 当我们使用1.8 中的lambda 表达式时需要将source 版本调高 2. 解决办法在p
2019-09-11
  目录