Flink系列 2. standalone 模式部署与任务提交


1. Flink部署

下载安装文件 https://flink.apache.org/downloads.html#apache-flink-1100

这里部署standalone模式,只需要进入bin目录执行 start-cluster.sh 即可启动


# 解压后 查看文件
MacBook-Pro:Downloads hnbian$ cd flink-1.10.0
MacBook-Pro:flink-1.10.0 hnbian$ ll
total 1112
-rw-r--r--@  1 hnbian  staff    11K  1 24 17:01 LICENSE
drwxr-xr-x@  3 hnbian  staff    96B  1 24 17:01 plugins
-rw-r--r--@  1 hnbian  staff   1.3K  1 24 17:01 README.txt
drwxr-xr-x@ 22 hnbian  staff   704B  2  8 02:54 opt
drwxr-xr-x@  7 hnbian  staff   224B  2  8 02:54 lib
drwxr-xr-x@  7 hnbian  staff   224B  2  8 02:54 examples
drwxr-xr-x@ 29 hnbian  staff   928B  2  8 02:54 bin
drwxr-xr-x@ 30 hnbian  staff   960B  2  8 02:55 licenses
-rw-r--r--@  1 hnbian  staff   538K  2  8 02:55 NOTICE
drwxr-xr-x@ 15 hnbian  staff   480B  4 29 21:25 conf
drwxr-xr-x@  6 hnbian  staff   192B  4 29 21:30 log
  • 启动集群
MacBook-Pro:flink-1.10.0 hnbian$ cd bin
MacBook-Pro:bin hnbian$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Pro.local.
Starting taskexecutor daemon on host MacBook-Pro.local.
  • 查看web界面

http://localhost:8081/

Flink webUI

2. standalone 模式下任务提交流程

standalone 模式下 flink 任务提交流程

3. 编写wordcount 代码

3.1 引入依赖

    <properties>
        <flink.version>1.10.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

3.2 编写 wordcount 代码


import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * @Author haonan.bian
  * @Description 数据流的WordCount
  * @Date 2020-04-28 22:31
  * 发送数据
  * nc -lk 8888
  **/
object StreamWordCount extends App{


  //--host localhost --port 8888
  val parameters = ParameterTool.fromArgs(args)

  val host = parameters.get("host")
  val port = parameters.getInt("port")

  val env = StreamExecutionEnvironment.getExecutionEnvironment


  val inputDataStream = env.socketTextStream(host,port)

  inputDataStream.flatMap(_.split(" "))
    .map((_,1))
    .keyBy(0)
    .sum(1)
    .print()
    //.setParallelism(2) // 设置并行度 默认CPU线程数

  // 执行作业
  env.execute("StreamWordCount")

  /**
    * 在IDE中测试打印结果如下 
    * 前面的数字是线程编号
    *
    * 11> (xiah,1)
    * 9> (xiaog,1)
    * 7> (xiaom,1)
    * 4> (hello,1)
    * 4> (hello,2)
    * 4> (hello,3)
    * 9> (xiaod,1)
    * 4> (hello,4)
    */
}

4. webUI中提交任务

先将代码打包,打包时需要带上依赖

  1. 提交任务

提交任务

  1. 发送数据

MacBook-Pro:bin hnbian$ nc -lk 8888
hello
hello world
  1. 查看任务执行情况

查看任务执行情况

  1. 查看打印结果

查看打印结果

5. 使用命令提交任务


./flink run -c com.hnbian.flink.wordcount.StreamWordCount -p 2 \
> /FlinkCode/target/FlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar \
> --host localhost --port 8888
Job has been submitted with JobID 0fdf981b60838d7ae3655a03b98c2311
  • 查看任务是否提交成功

使用命令提交的任务

  • 使用命令行关闭任务
# 查看任务列表
MacBook-Pro:bin hnbian$ ./flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.04.2020 22:32:34 : 0fdf981b60838d7ae3655a03b98c2311 : StreamWordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

# 关闭任务
MacBook-Pro:bin hnbian$ ./flink cancel 0fdf981b60838d7ae3655a03b98c2311
Cancelling job 0fdf981b60838d7ae3655a03b98c2311.
Cancelled job 0fdf981b60838d7ae3655a03b98c2311.
MacBook-Pro:bin hnbian$

文章作者: hnbian
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 hnbian !
评论
 上一篇
Flink系列 3. Flink On Yarn 两种部署模式与提交任务 Flink系列 3. Flink On Yarn 两种部署模式与提交任务
1. 介绍Flink 支持多种部署方式 如 Local、Standalone、Yarn、K8S 等,但是现在企业中大多数的大数据平台都以 Yarn 作为资源管理器,所以 Flink On Yarn 模式也在企业中用的非常多,下面就介绍一下F
2020-05-01
下一篇 
Ambari  环境启动时遇到的一些问题记录 Ambari 环境启动时遇到的一些问题记录
前段时间在虚拟机安装ambari之后一直也没有做测试,今天起来将集群启动发现有些组件启动时遇见一些问题,在这里记录一下。 1. DataNode 无法启动DataNode启动成功后就死掉了。 1.1 查看DataNode日志寻找原因首先在D
2020-04-28
  目录