Flink
安装和运行
单点安装
下载 flink 安装包
在 官网下载地址 选择适当版本的flink 安装包,这里我选择 Apache Flink 1.6.2 only
;下载安装包到 /path/to/flink-1.6.2
中
解压运行 flink
1 | cd /path/to/ |
到这里我们就已经把 单机版flink
成功的运行起来了,我们可以访问 localhost:8081
看到 flink
管理页面
flink
集群搭建准备工作
JAVA 环境配置
这里不赘述如何配置 JAVA 环境,只需要注意使用 JAVA 1.8+ 即可
集群机器互信
不管是 standalone
部署模式还是依赖 hadoop yarn
搭建集群,都需要在集群机器之间设置互信,实现 ssh
相互免密登录
“1. 生成公钥/秘钥
1 | ssh-keygen -t rsa #一直回车即可 |
生成了 ~/.ssh/id_rsa.pub
和 ~/.ssh/id_rsa
- 公钥认证
将机器A 上面的 ~/.ssh/id_rsa.pub
追加到机器B 的公钥认证文件 ~/.ssh/authorized_keys
里面去;
再将机器B 上面的 ~/.ssh/id_rsa.pub
追加到机器A 的公钥认证文件 ~/.ssh/authorized_keys
里面去
这样我们就可以在机器A、B之间互相免密码登陆了
flink on yarn
集群搭建
在 10.0.0.1
,10.0.0.2
,10.0.0.3
三台机器上尝试搭建 flink
集群
hadoop yarn
安装配置
下载解压 hadoop
安装包
这里我用的 cdh
版本的 hadoop
,可以在这里下载,然后解压
1 | tar -xvf hadoop-2.6.0-cdh5.11.0.tar.gz |
配置 hadoop
在 /path/to/hadoop-2.6.0-cdh5.11.0/etc/hadoop/
文件夹下配置如下七个文件 hadoop-env.sh
,yarn-env.sh
,slaves
,core-site.xml
,hdfs-site.xml
,mapred-site.xml
,yarn-site.xml
在 hadoop-env.sh
1 | export JAVA_HOME=/path/to/java/home |
在 yarn-env.sh
中配置 JAVA_HOME
1 | export JAVA_HOME=/path/to/java/home |
在 slaves
中配置 slave 节点的ip 或者host
1 | "10.0.0.2 |
修改 core-site.xml
配置 hadoop
集群文件系统主机和端口、hadoop
临时目录
1 | <configuration> |
修改 hdfs-site.xml
配置 hadoop
集群文件系统主机和端口、hadoop
临时目录
1 | <configuration> |
standalone
部署模式集群搭建
举例说明,搭建
下载、解压安装包
修改 flink-conf.yaml
文件配置
修改 /path/to/flink-1.6.2/conf
文件中的 flink-conf.yaml
文件,参数说明如下:
1 | # java安装路径,如果没有指定则默认使用系统的$JAVA_HOME环境变量。建议设置此值,因为之前我曾经在standalone模式中启动flink集群,报找不到JAVA_HOME的错误。config.sh中(Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.) |
flink on Yarn
启动 yarn session 运行 flink job
1 | bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -s 32 |
yarn-session.sh
使用说明
1 | Usage: |
yarn
会话管理
1 | $yarn application --help |
直接提交 flink job 到 yarn 集群
1 | bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar |
yarn
日常维护
查看 yarn session
中的 flink job
1 | ./bin/flink list -m yarn-cluster -yid <Yarn Application Id> -r |
yarn
日志查看
1 | yarn logs -applicationId <applicationId> |
yarn application
下线
1 | yarn application -kill <applicationId> |
触发 savepoints
取消 flink job
1 | ./bin/flink cancel -s [savepointDirectory] <jobID> |
执行上述指令将得到如下提示
1 | Cancelling job <jobID> with savepoint to <savepointDirectory>. |
从 savepoints
恢复启动 flink job
1 | ./bin/flink run -s <savepointDirectory>/<savepointID> -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar |
这里 -s
参数值是执行cancel
指令的时候得到的 savepoint 保存地址<savepointDirectory>/<savepointID>
flink run
指令
1 | $./bin/flink run --help |