Flink Series 2. Standalone Mode Deployment and Job Submission


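1. Deploying Flink

Download the release from https://flink.apache.org/downloads.html#apache-flink-1100

This post deploys Flink in standalone mode: after extracting the archive, go to the bin directory and run start-cluster.sh to start the cluster.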

# After extracting the archive, list the files
MacBook-Pro:Downloads hnbian$ cd flink-1.10.0
MacBook-Pro:flink-1.10.0 hnbian$ ll
total 1112
-rw-r--r--@ 1 hnbian staff 11K 1 24 17:01 LICENSE
drwxr-xr-x@ 3 hnbian staff 96B 1 24 17:01 plugins
-rw-r--r--@ 1 hnbian staff 1.3K 1 24 17:01 README.txt
drwxr-xr-x@ 22 hnbian staff 704B 2 8 02:54 opt
drwxr-xr-x@ 7 hnbian staff 224B 2 8 02:54 lib
drwxr-xr-x@ 7 hnbian staff 224B 2 8 02:54 examples
drwxr-xr-x@ 29 hnbian staff 928B 2 8 02:54 bin
drwxr-xr-x@ 30 hnbian staff 960B 2 8 02:55 licenses
-rw-r--r--@ 1 hnbian staff 538K 2 8 02:55 NOTICE
drwxr-xr-x@ 15 hnbian staff 480B 4 29 21:25 conf
drwxr-xr-x@ 6 hnbian staff 192B 4 29 21:30 log

  • Start the cluster
MacBook-Pro:flink-1.10.0 hnbian$ cd bin
MacBook-Pro:bin hnbian$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host MacBook-Pro.local.
Starting taskexecutor daemon on host MacBook-Pro.local.
  • Open the web UI

http://localhost:8081/

[Figure: Flink web UI]
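The same address also serves Flink's REST API, so the cluster can be checked from a terminal as well as from the browser. The sketch below assumes a local cluster on the default port 8081 and that curl is installed; `FLINK_REST` and `cluster_overview` are names made up for this example. It queries the `/overview` endpoint, which returns a JSON summary of the cluster.

```shell
# Assumption: the cluster runs locally on the default REST port 8081.
FLINK_REST="${FLINK_REST:-http://localhost:8081}"

# Hypothetical helper: fetch the cluster overview (JSON) from the REST API.
cluster_overview() {
  curl --silent --fail --max-time 5 "$FLINK_REST/overview"
}

if overview="$(cluster_overview)"; then
  echo "cluster is up: $overview"
else
  echo "no response from $FLINK_REST; has start-cluster.sh been run?"
fi
```

If the second message appears, the log directory of the Flink installation is the first place to look.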

2. Job submission flow in standalone mode

[Figure: Flink job submission flow in standalone mode]

3. Writing the WordCount code

3.1 Adding dependencies

<properties>
    <flink.version>1.10.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

3.2 The WordCount program

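// The package matches the entry class passed to `flink run -c` in section 5
package com.hnbian.flink.wordcount

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * @Author haonan.bian
 * @Description Streaming WordCount
 * @Date 2020-04-28 22:31
 * To feed it data, run:
 * nc -lk 8888
 **/
object StreamWordCount extends App {

  // Program arguments: --host localhost --port 8888
  val parameters = ParameterTool.fromArgs(args)

  val host = parameters.get("host")
  val port = parameters.getInt("port")

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Read lines of text from the socket
  val inputDataStream = env.socketTextStream(host, port)

  inputDataStream
    .flatMap(_.split(" ")) // split each line into words
    .map((_, 1))           // pair each word with a count of 1
    .keyBy(0)              // group by the word (tuple field 0)
    .sum(1)                // keep a running sum of the counts (tuple field 1)
    .print()
    //.setParallelism(2)   // set the parallelism; in the IDE it defaults to the number of CPU threads

  // Run the job
  env.execute("StreamWordCount")

  /**
   * Output when run in the IDE.
   * The leading number is the index of the parallel subtask (thread) that printed the record.
   *
   * 11> (xiah,1)
   * 9> (xiaog,1)
   * 7> (xiaom,1)
   * 4> (hello,1)
   * 4> (hello,2)
   * 4> (hello,3)
   * 9> (xiaod,1)
   * 4> (hello,4)
   */
}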
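
To get a feel for the output before starting anything, the running counts can be imitated with ordinary shell tools. This is only an analogy for the flatMap, map, keyBy and sum chain, not Flink itself: it runs in a single process, so there are no subtask prefixes, and `running_word_count` is a name made up for this sketch. It is fed the same two lines that are typed into nc later in this post.

```shell
# Imitate the streaming word count on a fixed input:
# put each word on its own line, then print an updated (word,count)
# pair for every word seen, the way a keyed rolling sum would.
running_word_count() {
  tr -s ' ' '\n' | awk 'NF { count[$1]++; printf "(%s,%d)\n", $1, count[$1] }'
}

printf 'hello\nhello world\n' | running_word_count
# prints:
# (hello,1)
# (hello,2)
# (world,1)
```

Every incoming word produces a new record with the updated count, which is why hello shows up repeatedly in the IDE output above.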

4. Submitting the job from the web UI

First package the code into a jar; the jar needs to include its dependencies.
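
The jar name used later in this post ends in jar-with-dependencies, which is the suffix produced by the maven-assembly-plugin descriptor of the same name. The post does not show that part of the pom.xml, so the sketch below rests on assumptions: the plugin is configured and bound to the package phase, a Scala compiler plugin is set up, and the command is run from the project root.

```shell
# Assumptions: run from the project root; pom.xml configures maven-assembly-plugin
# with the jar-with-dependencies descriptor bound to the package phase.
JAR="target/FlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar"

if command -v mvn >/dev/null 2>&1 && [ -f pom.xml ]; then
  mvn clean package -DskipTests   # build the jar without running the tests
  ls -lh "$JAR"                   # confirm that the fat jar was produced
else
  echo "run this from the project root with Maven installed"
fi
```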

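  1. Submit the job

[Figure: submitting the job]

  2. Send data through the nc listener (it must already be listening when the job starts, because the socket source connects to it on startup)

MacBook-Pro:bin hnbian$ nc -lk 8888
hello
hello world

  3. Check the job's execution status

[Figure: checking the job's execution status]

  4. View the printed results

[Figure: viewing the printed results]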

5. Submitting the job from the command line

./flink run -c com.hnbian.flink.wordcount.StreamWordCount -p 2 \
  /FlinkCode/target/FlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar \
  --host localhost --port 8888
Job has been submitted with JobID 0fdf981b60838d7ae3655a03b98c2311
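
In the command above, -c names the entry class and -p sets the parallelism; everything after the jar path is handed to the program, where ParameterTool picks it up. The JobID in the last line is what the cancel command needs later, so a script may want to capture it. A minimal sketch, assuming the confirmation line keeps the format shown above; `extract_job_id` is a hypothetical helper, and the -d flag (detached mode) is an addition that the original command does not use.

```shell
# Hypothetical helper: pull the JobID out of the confirmation line
# that `flink run` prints (format taken from the output above).
extract_job_id() {
  sed -n 's/^Job has been submitted with JobID \([0-9a-f]\{32\}\)$/\1/p'
}

# Offline check against the line shown above.
sample='Job has been submitted with JobID 0fdf981b60838d7ae3655a03b98c2311'
job_id="$(printf '%s\n' "$sample" | extract_job_id)"
echo "$job_id"   # prints 0fdf981b60838d7ae3655a03b98c2311

# Against a live cluster, from the bin directory (-d submits in detached mode,
# so the command returns once the job has been accepted):
# job_id="$(./flink run -d -c com.hnbian.flink.wordcount.StreamWordCount -p 2 \
#   /FlinkCode/target/FlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar \
#   --host localhost --port 8888 | extract_job_id)"
```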

  • Check that the job was submitted

[Figure: the job submitted from the command line]

  • Cancel the job from the command line
# List the jobs
MacBook-Pro:bin hnbian$ ./flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.04.2020 22:32:34 : 0fdf981b60838d7ae3655a03b98c2311 : StreamWordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

# Cancel the job
MacBook-Pro:bin hnbian$ ./flink cancel 0fdf981b60838d7ae3655a03b98c2311
Cancelling job 0fdf981b60838d7ae3655a03b98c2311.
Cancelled job 0fdf981b60838d7ae3655a03b98c2311.
MacBook-Pro:bin hnbian$
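
Copying the JobID by hand gets tedious when jobs are restarted often. The sketch below looks the ID up by job name instead. It assumes that `flink list` keeps the "date time : JobID : name (STATUS)" layout shown in the listing above; `job_ids_by_name` is a hypothetical helper, not part of the Flink CLI.

```shell
# Hypothetical helper: print the JobIDs of running jobs whose name is $1.
# Reads `flink list` output on stdin and relies on the line layout shown above.
job_ids_by_name() {
  awk -F' : ' -v name="$1" '
    NF == 3 && $3 == (name " (RUNNING)") { print $2 }
  '
}

# Offline check against the line from the listing above.
sample='29.04.2020 22:32:34 : 0fdf981b60838d7ae3655a03b98c2311 : StreamWordCount (RUNNING)'
printf '%s\n' "$sample" | job_ids_by_name StreamWordCount
# prints 0fdf981b60838d7ae3655a03b98c2311

# Against a live cluster, from the bin directory:
# ./flink list | job_ids_by_name StreamWordCount | xargs -n 1 ./flink cancel
```

Once no jobs are left, stop-cluster.sh in the same bin directory shuts the standalone cluster down.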


Author: hnbian
Copyright notice: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit hnbian when reposting.