1. Flink部署
下载安装文件 https://flink.apache.org/downloads.html#apache-flink-1100
这里部署standalone模式,只需要进入bin目录执行 start-cluster.sh 即可启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
MacBook-Pro:Downloads hnbian$ cd flink-1.10.0 MacBook-Pro:flink-1.10.0 hnbian$ ll total 1112 -rw-r--r--@ 1 hnbian staff 11K 1 24 17:01 LICENSE drwxr-xr-x@ 3 hnbian staff 96B 1 24 17:01 plugins -rw-r--r--@ 1 hnbian staff 1.3K 1 24 17:01 README.txt drwxr-xr-x@ 22 hnbian staff 704B 2 8 02:54 opt drwxr-xr-x@ 7 hnbian staff 224B 2 8 02:54 lib drwxr-xr-x@ 7 hnbian staff 224B 2 8 02:54 examples drwxr-xr-x@ 29 hnbian staff 928B 2 8 02:54 bin drwxr-xr-x@ 30 hnbian staff 960B 2 8 02:55 licenses -rw-r--r--@ 1 hnbian staff 538K 2 8 02:55 NOTICE drwxr-xr-x@ 15 hnbian staff 480B 4 29 21:25 conf drwxr-xr-x@ 6 hnbian staff 192B 4 29 21:30 log
|
1 2 3 4 5
| MacBook-Pro:flink-1.10.0 hnbian$ cd bin MacBook-Pro:bin hnbian$ ./start-cluster.sh Starting cluster. Starting standalonesession daemon on host MacBook-Pro.local. Starting taskexecutor daemon on host MacBook-Pro.local.
|
http://localhost:8081/

2. standalone 模式下任务提交流程

3. 编写wordcount 代码
3.1 引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <properties> <flink.version>1.10.0</flink.version> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.11</artifactId> <version>${flink.version}</version> </dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
|
3.2 编写 wordcount 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object StreamWordCount extends App{
val parameters = ParameterTool.fromArgs(args)
val host = parameters.get("host") val port = parameters.getInt("port")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputDataStream = env.socketTextStream(host,port)
inputDataStream.flatMap(_.split(" ")) .map((_,1)) .keyBy(0) .sum(1) .print()
env.execute("StreamWordCount")
}
|
4. webUI中提交任务
先将代码打包,打包时需要带上依赖
- 提交任务

- 发送数据
1 2 3 4 5
| MacBook-Pro:bin hnbian$ nc -lk 8888 hello hello world
|
- 查看任务执行情况

- 查看打印结果

5. 使用命令提交任务
1 2 3 4 5 6
| ./flink run -c com.hnbian.flink.wordcount.StreamWordCount -p 2 \ > /FlinkCode/target/FlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar \ > --host localhost --port 8888 Job has been submitted with JobID 0fdf981b60838d7ae3655a03b98c2311
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14
| MacBook-Pro:bin hnbian$ ./flink list Waiting for response... ------------------ Running/Restarting Jobs ------------------- 29.04.2020 22:32:34 : 0fdf981b60838d7ae3655a03b98c2311 : StreamWordCount (RUNNING) -------------------------------------------------------------- No scheduled jobs.
MacBook-Pro:bin hnbian$ ./flink cancel 0fdf981b60838d7ae3655a03b98c2311 Cancelling job 0fdf981b60838d7ae3655a03b98c2311. Cancelled job 0fdf981b60838d7ae3655a03b98c2311. MacBook-Pro:bin hnbian$
|