Flink的状态管理围绕作用域与结构划分,核心分为两类:
keyBy算子生成),状态与Key一一对应(相同Key的数据访问同一状态),状态分布在不同算子任务中。Flink通过富函数(RichFunction)和CheckpointedFunction接口提供状态访问与管理能力:
RuntimeContext获取托管状态(如ValueState、ListState),适用于需要状态的业务逻辑(如RichFlatMapFunction、RichMapFunction)。ListState统计每个用户的异常行为次数,当次数超过阈值时触发报警。public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {
private transient ListState<Long> abnormalData;
private Long threshold;
@Override
public void open(Configuration parameters) {
// 通过状态描述器获取状态实例(未创建则自动初始化)
abnormalData = getRuntimeContext().getListState(
new ListStateDescriptor<>("abnormalData", Long.class)
);
threshold = parameters.getLong("threshold", 5L);
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) {
if (value.f1 > threshold) {
abnormalData.add(value.f1); // 更新状态
out.collect(new Tuple2<>(value.f0, Collections.singletonList(value.f1)));
}
}
}
ListState、UnionListState),适用于需要自定义状态恢复逻辑的场景(如Source/Sink算子)。snapshotState(保存状态快照)、restoreState(恢复状态)。CheckpointedFunction保存消费偏移量,故障时恢复偏移量以保证Exactly-Once语义。Flink通过Checkpoint机制实现状态的持久化与故障恢复,核心流程如下:
env.enableCheckpointing(interval)设置(如每10秒一次),Flink会定期向所有算子任务发送Barrier(屏障)。Serializable)保存为快照,并发送确认消息给JobManager。状态后端负责状态的存储、访问及Checkpoint的持久化,Flink提供三类可插拔后端:
// 代码中配置
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints", true));
// 或通过flink-conf.yaml配置
state.backend: rocksdb
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。