flink模板工程
"Less code, more efficiency!" — 代码少一点,开发快一点!🔥
# 🌟 背景
Flink Job 开发太繁琐?配置重复?👀
每次都要写一大堆 connector 相关代码?😩
想要更简单高效地构建 JetStream Flink Job?🔥
没问题!我们来解决这些痛点!💡
FlinkContext —— 让 Flink Job 开发更轻松!
# ❓ 遇到的问题
- 大量重复代码:每个 Flink Job 都需要写相同的 source/sink 连接代码。
- 复杂的参数管理:运行模式、检查点、重启策略……参数太多,难管理!
- 不同数据源适配:NATS JetStream、Redis……如何轻松切换?
# 💡 解决方案:FlinkContext
我们封装了一个 FlinkContext,它解决了:
✅ 减少重复代码:所有 Flink Job 通用的部分都封装好了。
✅ 开箱即用:仅需关注 operator(算子逻辑),其他交给 FlinkContext。
✅ 支持多种数据源:JetStream、NATS、Redis,轻松对接!
# 🏗 FlinkContext 设计
FlinkContext 主要封装了以下几个关键模块:
# 🛠 核心功能
配置项 | 释义 | 默认值 | 说明 |
---|---|---|---|
setRuntimeMode | 运行模式 | RuntimeExecutionMode.STREAMING | 流式执行模式 |
enableCheckpointing | 检查点执行周期 | 2000ms | 检查点每2秒执行一次 |
addDefaultKryoSerializer | 默认序列化器 | ProtobufSerializer.class | 默认使用protobuf序列化器 |
setRestartStrategy | 重启策略Task 故障恢复 (opens new window) | -- | 1. 每个时间间隔的最大故障次数;2. 测量故障率的时间间隔;3. 延时 |
# 🎯 如何使用?
public static void main(String[] args) throws Exception {
FlinkContext context = new FlinkContext<>() {
@Override
public DataStream operator(DataStream<?> dataStream) {
return dataStream
.map(item -> ConversionManager.protoConversion((MessageProxy) item))
.filter(Objects::nonNull)
.map((MapFunction<EventModel, DeviceEvent>) value ->
new DeviceEvent(value.getProxy().getRobotId(), value.getEvent().getTimestamp().getSeconds()))
.returns(TypeInformation.of(DeviceEvent.class))
.name("source-job");
}
};
context.start();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
🎉 这样,我们的 Flink Job 就完成了!
# 🔌 Source & Sink 封装
# 📥 Source(数据源)
public class SourceHolder {
private static final Map<String, SourceBase<?>> holder = new HashMap<>();
static { holder.put(ConnectorConstants.SOURCE_JETSTREAM, new JetStreamSourceBase<>()); }
public static SourceBase<?> getSource(String key) { return holder.get(key); }
}
2
3
4
5
# 📤 Sink(数据汇)
public class SinkHolder {
private static final Map<String, SinkBase<?>> holder = new HashMap<>();
static {
holder.put(ConnectorConstants.SINK_JETSTREAM, new JetStreamSinkBase());
holder.put(ConnectorConstants.SINK_NATS, new NatsSinkBase());
holder.put(ConnectorConstants.SINK_REDIS, new RedisSinkBase());
}
public static SinkBase<?> getSink(String key) { return holder.get(key); }
}
2
3
4
5
6
7
8
9
# ⚙ 环境变量 & 配置管理
变量名 | 含义 | 是否必填 | 参考值 |
---|---|---|---|
SOURCE_URL | nats作为数据源的连接地址 | nats作为数据源时,必填 | nats://localhost:4222 |
SINK_URL | nats作为数据汇的连接地址 | nats作为数据汇时,必填 | nats://localhost:4222 |
SOURCE_CREDENTIAL | nats作为数据源的连接凭证 | nats作为数据源时,必填 | /path/file |
SINK_CREDENTIAL | nats作为数据汇的连接凭证 | nats作为数据汇时,必填 | /path/file |
REDIS_HOST | redis连接地址 | redis作为数据汇时,必填 | redis.sz |
REDIS_PORT | redis连接端口 | redis作为数据汇时,必填 | 6379 |
REDIS_PASSWORD | redis连接密码 | redis作为数据汇时,必填 | 85269262 |
REDIS_DATABASE | redis连接数据库(不填则默认0) | 非必填 | 0 |
SOURCE_TYPE | 数据源的类型:支持JetStream | 必填 | JETSTREAM |
SINK_TYPES | 数据汇的类型:支持JetStream nats Redis。多个逗号分隔 | 必填 | NATS,REDIS |
SOURCE_STREAMS | nats作为数据源时,需要处理的stream名称。多个逗号分隔 | nats作为数据源时,必填 | out-FR00015,out-FR00011 |
SOURCE_CONSUMER | nats作为数据源时的消费者名称 | nats作为数据源时,必填 | TPL |
SOURCE_DELIVERY_POLICY | nats作为数据源时的消费策略:支持EARLIEST, LAST, LATEST, FROM_TIME, FROM_STREAM_SEQUENCE | nats作为数据源时,必填 | LAST |
SOURCE_FILTER_SUBJECT | nats作为数据源时,消费者消费的主题。全消费:> | nats作为数据源时,必填 | > |
SINK_SUBJECT | nats作为数据汇时,汇入的主题 | nats作为数据汇时,必填 | flink.tpl |
FLINK_JOB_NAME | Flink Job名称 | 必填 | FLINK_TPL_JOB |
# 🛠 支持两种方式配置
- 环境变量(适用于部署)
- 系统变量(适用于本地调试)
# 📝 环境变量示例
environment:
- SOURCE_URL=nats://localhost:4222
- SINK_URL=nats://localhost:4222
- SOURCE_CREDENTIAL=/path/file
- SINK_CREDENTIAL=/path/file
- SOURCE_TYPE=JETSTREAM
- SINK_TYPES=NATS,REDIS
2
3
4
5
6
7
# 📄 系统变量示例(flink.properties)
SOURCE_TYPE=JETSTREAM
SINK_TYPES=NATS,REDIS
FLINK_JOB_NAME=FLINK_TPL_JOB
SOURCE_STREAMS=out-FR00015
SINK_SUBJECT=flink.tpl
2
3
4
5
🚀 优先级:环境变量 > 系统变量
📌 Tips:如果环境变量未设置,则会使用 flink.properties 中的配置。
# 🤖 策略模式优化事件解析
在数据处理时,我们可能会有不同的 eventSource,如何更优雅地解析?🤔
策略模式 🏆:避免大量 if-else,解耦代码!
# ✨ 实现方式
- 定义解析接口
public interface EventParser {
void parse(EventModel event);
}
2
3
- 实现不同 Event 解析类
public class DeviceEventParser implements EventParser {
public void parse(EventModel event) {
// 设备事件解析逻辑
}
}
2
3
4
5
- 管理不同的解析策略
public class EventParserHolder {
private static final Map<String, EventParser> parsers = new HashMap<>();
static { parsers.put("DEVICE_EVENT", new DeviceEventParser()); }
public static EventParser getParser(String type) { return parsers.get(type); }
}
2
3
4
5
- 使用策略模式解析事件
EventParser parser = EventParserHolder.getParser(eventType);
if (parser != null) { parser.parse(event); }
2
🎯 这样,新增 Event 解析时,只需添加一个新的实现类,无需改动现有代码!
System.out.println("Enjoy your Flink coding journey! 🚀");
让你的 Flink 任务跑起来,就这么简单! 🏃💨
# 📌 部署方式
Flink 提供了两种主要的部署模式,我们分别看看如何高效部署它们👇
# 🌟 Session Mode(会话模式)
👉 官方文档:Flink Standalone Docker - Session Mode (opens new window)
# 💼 CI/CD 一键部署
- CI: 使用
Earthfile
,在 CI 环境下打包 JAR 🎁 - CD: 读取打包产物,使用 Flink Client 提交 Job 🚀
# 🛠️ 本地调测 - Web UI 提交 Job
# 1️⃣ 先构建 JAR
在 IDEA 中执行 Gradle 构建 👇
./gradlew clean build
🔹 产物位置:build/libs/xxx-all.jar
# 2️⃣ 提交 Job
- 访问 Flink Web UI(默认
http://localhost:8081
) - 左侧菜单 ➝
Submit New Job
- 上传 JAR,填写运行参数 ➝
Submit
- 左侧菜单 ➝
Running Jobs
查看运行状态 ✅
# 🚀 Application Mode(应用模式)
👉 官方文档:Flink Standalone Docker - Application Mode (opens new window)
# 💼 CI/CD 部署方案
CI(打包阶段)
- EarthFile ✅ 复用现有 Quarkus 打包流程
- Dockerfile ✅ 复用,注意 ENTRYPOINT
- docker-entrypoint ✅ 复用,作为 Flink Job 启动类
CD(部署阶段)
docker-compose.yml
指定正确的 镜像 tag 和 JAR 文件名称- 指定 Job 运行环境(
-env.active xxx
) services - jobmanager - command
配置启动命令
📝 JetStream 处理逻辑
如果
consumer
不存在,Flink 会自动创建,无需手动管理savepoint
🎯
# 🔧 统一软件版本
统一 Flink 版本,确保环境一致!📌
组件 | 版本 | 依赖声明 |
---|---|---|
Flink | 1.15.2 | flink-streaming-java:1.15.2 |
JNATS | 2.16.9 | io.nats:jnats:2.16.9 |
Protobuf | 3.21.9 | com.google.protobuf:protobuf-java:3.21.9 |
Java | 11 | JavaVersion.VERSION_11 |
# 🎯 总结
💡 无论是 Session Mode 还是 Application Mode,流程都很简单!
✅ CI/CD 无缝集成
✅ 本地调试 & Web UI 提交 Job
✅ 统一版本管理,确保稳定运行
附录参考
链接 | 备注 |
---|---|
Flink WebUI 详解 (opens new window) | 作者很详细的说明了webUI的相关操作,介绍了常用操作,一些参数配置含义,对于新手上手非常友好。 |
Checkpointing (opens new window) | 主要关注配置部分 |
生成 Watermark (opens new window) | 水位线的生成策略,使用功能之前建议阅读一下官方文档的描述,便于理解的更加深刻。 |
窗 (opens new window) | 很详细的描述了窗口包括生命周期,不同的窗口含义以及侧重,如何触发以及程序编写等。 |
Savepoints (opens new window) | 详细介绍了savepoint,以及如何触发使用。同时包含了使用savepoint的常见问题,罗列在下面,很具有参考意义 |
概览 (opens new window) | dataStreamAPI快速入门,如何快速开发一个Flink Job |
本仓库的意义也想实现该目的,将该文章同样附录在此,方便检索 | |
docker-compose编排 | |
github.dev (opens new window) | 理解dockerFile docker-entrypoint docker-compose几个文件相互关系 |
juejin.cn (opens new window) | 完整的介绍了Flink的各种部署方式,同时各种部署方式的架构图也同样出具,很具有参考性 |
1. What's the difference between flink application cluster and job cluster (opens new window) | |
2. Developing job for Flink (opens new window) | SO上概念辨析以及如何部署Flink Job,高赞回答 |