checkpoint
Chandy Lamport algorithm
barrier
checkpoint configurations
checkpoint mode
- exactly-once
- at-least-once
是否可以给单个算子指定 exactly-once
或者 at-least-once
checkpoint interval
checkpoint timeout
minimum time between checkpoints
number of concurrent checkpoints
externalized checkpoints
“1. ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
: 当 flink job 被取消的时候保存 checkpoint
,也就是说当我们主动取消 job 的时候需要我手动删除
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
: 当 flink job 被取消的时候删除checkpoint
, 只有 job 状态是故障失败时checkpoint
才会被保存。
retain state checkpoint
1 | state.checkpoints.dir: hdfs:///checkpoints/ |
state
keyed state
Keyed Stream
的状态
ListState
List 类型的状态集合
ValueState
单个状态
MapState
Map 类型的状态集合
ReducingState
合并统一类型的多个状态数据到一个状态数据
AggregatingState
operator state
org.apache.flink.api.common.state.OperatorStateStore
用于注册 operator state
getUnionListState
: 获取分布式集群中的 ListState
getListState
: 获取单点的 ListState
state backend
“1. asynchronus
- synchronus
memory state backends
缺点
“1. 每个 state
大小限制是 5M
state
的大小不能超过akka frame size
aggregate state
必须存放在Job Manager
的内存中
fs state backends
该模式需要配置文件系统URL,支持 hdfs(hdfs://namenode:40010/flink/checkpoints)
或 本地文件(file:///data/flink/checkpoints)
将未完成的数据存储在 TaskManager
的内存中,而将 state snapshot
写入到文件系统或者文件夹中,最小化元数据被保存在JobManager的内存中(在高可用模式下,被保存在元数据checkpoint中)。
该模式默认 异步
写入 state backend
,也可以改为 同步
写入
1 | new FsStateBackend(path, false); // true: 异步,false:同步 |
rocksdb state backends
该模式需要配置文件系统URL,支持 hdfs(hdfs://namenode:40010/flink/checkpoints)
或 本地文件(file:///data/flink/checkpoints)
将未完成的数据存储在 rocksDB
中,而将 state snapshot
写入配置的文件系统或者文件夹中,最小化元数据被保存在JobManager的内存中(在高可用模式下,被保存在元数据checkpoint中)。
RocksDBStateBackend
只支持异步快照模式
缺点:
“1. 因为RocksDB的JNI的API基于byte[],状态中每个key和每个value所支持的最大值各为2^31字节。
注意:state使用了RocksDB的合并算子(如ListState),状态的大小很容易累积超过2^31字节,下一次状态恢复就会失败。这是当前RocksDB JNI的局限性。
RocksDBStateBackend是当前唯一一种提供增量checkpoint的state backend.
restart strategies(重启策略)
Event Time
Processing time
flink
开始处理事件的时间
Event time
时间发生的原始时间,由事件发生器
自主设置
Ingestion time
Flink Source
接收到事件的时间
operator
operator lifecycle
1 | // initialization phase |
注意:initializeState()
包含operator state的初始化(例如register keyed state),也包含任务失败后从checkpoint中恢复state的逻辑。