Jorgen's blog Jorgen's blog
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
收藏
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jorgen

Love it, make mistakes, learn, keep grinding.
首页
  • 平台架构
  • 混合式开发记录
  • 推送服务
  • 数据分析
  • 实时调度
  • 架构思想

    • 分布式
  • 编程框架工具

    • 编程语言
    • 框架
    • 开发工具
  • 数据存储与处理

    • 数据库
    • 大数据
  • 消息、缓存与搜索

    • 消息队列
    • 搜索与日志分析
  • 前端与跨端开发

    • 前端技术
    • Android
  • 系统与运维

    • 操作系统
    • 容器化与 DevOps
  • 物联网与安全

    • 通信协议
    • 安全
    • 云平台
收藏
  • 关于我
  • 终身学习
  • 关于时间的感悟
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 平台架构
  • 技术选型
  • 开发脚手架
  • UI规范
  • 开发规范
  • 代码分支管理模型
  • 需求分析与管理
  • 权限设计
  • 树形组织设计
  • 协议设计
  • 指令交互
  • OTA
  • 规则引擎
  • 数据流转
  • 报告生成与导出
  • 监控设备接入
  • 时序数据库
  • 平台监控
  • 云⛈
  • 接口设计
  • 安全传输
  • CI&CD
  • 缓存
  • 消息处理引擎
  • 性能调优🔥
  • 线上事故🔥
  • 混合式开发记录
  • 推送服务
  • 机器人通信协议
  • 数据分析
  • flink模板工程
    • 🌟 背景
      • ❓ 遇到的问题
      • 💡 解决方案:FlinkContext
    • 🏗 FlinkContext 设计
      • 🛠 核心功能
      • 🎯 如何使用?
    • 🔌 Source & Sink 封装
      • 📥 Source(数据源)
      • 📤 Sink(数据汇)
    • ⚙ 环境变量 & 配置管理
      • 🛠 支持两种方式配置
      • 📝 环境变量示例
      • 📄 系统变量示例(flink.properties)
    • 🤖 策略模式优化事件解析
      • ✨ 实现方式
    • 📌 部署方式
      • 🌟 Session Mode(会话模式)
      • 💼 CI/CD 一键部署
      • 🛠️ 本地调测 - Web UI 提交 Job
      • 1️⃣ 先构建 JAR
      • 2️⃣ 提交 Job
      • 🚀 Application Mode(应用模式)
      • 💼 CI/CD 部署方案
    • 🔧 统一软件版本
    • 🎯 总结
  • 实时调度
  • 机器人模块化设计
  • STM32入门
  • 开发日志
Jorgen
2024-01-18
目录

flink模板工程

"Less code, more efficiency!" — 代码少一点,开发快一点!🔥

# 🌟 背景

Flink Job 开发太繁琐?配置重复?👀
每次都要写一大堆 connector 相关代码?😩
想要更简单高效地构建 JetStream Flink Job?🔥

没问题!我们来解决这些痛点!💡

FlinkContext —— 让 Flink Job 开发更轻松!

# ❓ 遇到的问题

  • 大量重复代码:每个 Flink Job 都需要写相同的 source/sink 连接代码。
  • 复杂的参数管理:运行模式、检查点、重启策略……参数太多,难管理!
  • 不同数据源适配:NATS JetStream、Redis……如何轻松切换?

# 💡 解决方案:FlinkContext

我们封装了一个 FlinkContext,它解决了:

✅ 减少重复代码:所有 Flink Job 通用的部分都封装好了。

✅ 开箱即用:仅需关注 operator(算子逻辑),其他交给 FlinkContext。

✅ 支持多种数据源:JetStream、NATS、Redis,轻松对接!

# 🏗 FlinkContext 设计

FlinkContext 主要封装了以下几个关键模块:

# 🛠 核心功能

配置项 释义 默认值 说明
setRuntimeMode 运行模式 RuntimeExecutionMode.STREAMING 流式执行模式
enableCheckpointing 检查点执行周期 2000ms 检查点每2秒执行一次
addDefaultKryoSerializer 默认序列化器 ProtobufSerializer.class 默认使用protobuf序列化器
setRestartStrategy 重启策略Task 故障恢复 (opens new window) -- 1. 每个时间间隔的最大故障次数;2. 测量故障率的时间间隔;3. 延时

# 🎯 如何使用?

public static void main(String[] args) throws Exception {
    FlinkContext context = new FlinkContext<>() {
        @Override
        public DataStream operator(DataStream<?> dataStream) {
            return dataStream
                    .map(item -> ConversionManager.protoConversion((MessageProxy) item))
                    .filter(Objects::nonNull)
                    .map((MapFunction<EventModel, DeviceEvent>) value -> 
                        new DeviceEvent(value.getProxy().getRobotId(), value.getEvent().getTimestamp().getSeconds()))
                    .returns(TypeInformation.of(DeviceEvent.class))
                    .name("source-job");
        }
    };
    context.start();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

🎉 这样,我们的 Flink Job 就完成了!


# 🔌 Source & Sink 封装

# 📥 Source(数据源)

public class SourceHolder {
    private static final Map<String, SourceBase<?>> holder = new HashMap<>();
    static { holder.put(ConnectorConstants.SOURCE_JETSTREAM, new JetStreamSourceBase<>()); }
    public static SourceBase<?> getSource(String key) { return holder.get(key); }
}
1
2
3
4
5

# 📤 Sink(数据汇)

public class SinkHolder {
    private static final Map<String, SinkBase<?>> holder = new HashMap<>();
    static {
        holder.put(ConnectorConstants.SINK_JETSTREAM, new JetStreamSinkBase());
        holder.put(ConnectorConstants.SINK_NATS, new NatsSinkBase());
        holder.put(ConnectorConstants.SINK_REDIS, new RedisSinkBase());
    }
    public static SinkBase<?> getSink(String key) { return holder.get(key); }
}
1
2
3
4
5
6
7
8
9

# ⚙ 环境变量 & 配置管理

变量名 含义 是否必填 参考值
SOURCE_URL nats作为数据源的连接地址 nats作为数据源时,必填 nats://localhost:4222
SINK_URL nats作为数据汇的连接地址 nats作为数据汇时,必填 nats://localhost:4222
SOURCE_CREDENTIAL nats作为数据源的连接凭证 nats作为数据源时,必填 /path/file
SINK_CREDENTIAL nats作为数据汇的连接凭证 nats作为数据汇时,必填 /path/file
REDIS_HOST redis连接地址 redis作为数据汇时,必填 redis.sz
REDIS_PORT redis连接端口 redis作为数据汇时,必填 6379
REDIS_PASSWORD redis连接密码 redis作为数据汇时,必填 85269262
REDIS_DATABASE redis连接数据库(不填则默认0) 非必填 0
SOURCE_TYPE 数据源的类型:支持JetStream 必填 JETSTREAM
SINK_TYPES 数据汇的类型:支持JetStream nats Redis。多个逗号分隔 必填 NATS,REDIS
SOURCE_STREAMS nats作为数据源时,需要处理的stream名称。多个逗号分隔 nats作为数据源时,必填 out-FR00015,out-FR00011
SOURCE_CONSUMER nats作为数据源时的消费者名称 nats作为数据源时,必填 TPL
SOURCE_DELIVERY_POLICY nats作为数据源时的消费策略:支持EARLIEST, LAST, LATEST, FROM_TIME, FROM_STREAM_SEQUENCE nats作为数据源时,必填 LAST
SOURCE_FILTER_SUBJECT nats作为数据源时,消费者消费的主题。全消费:> nats作为数据源时,必填 >
SINK_SUBJECT nats作为数据汇时,汇入的主题 nats作为数据汇时,必填 flink.tpl
FLINK_JOB_NAME Flink Job名称 必填 FLINK_TPL_JOB

# 🛠 支持两种方式配置

  1. 环境变量(适用于部署)
  2. 系统变量(适用于本地调试)

# 📝 环境变量示例

environment:
  - SOURCE_URL=nats://localhost:4222
  - SINK_URL=nats://localhost:4222
  - SOURCE_CREDENTIAL=/path/file
  - SINK_CREDENTIAL=/path/file
  - SOURCE_TYPE=JETSTREAM
  - SINK_TYPES=NATS,REDIS
1
2
3
4
5
6
7

# 📄 系统变量示例(flink.properties)

SOURCE_TYPE=JETSTREAM
SINK_TYPES=NATS,REDIS
FLINK_JOB_NAME=FLINK_TPL_JOB
SOURCE_STREAMS=out-FR00015
SINK_SUBJECT=flink.tpl
1
2
3
4
5

🚀 优先级:环境变量 > 系统变量
📌 Tips:如果环境变量未设置,则会使用 flink.properties 中的配置。


# 🤖 策略模式优化事件解析

在数据处理时,我们可能会有不同的 eventSource,如何更优雅地解析?🤔

策略模式 🏆:避免大量 if-else,解耦代码!

# ✨ 实现方式

  1. 定义解析接口
public interface EventParser {
    void parse(EventModel event);
}
1
2
3
  1. 实现不同 Event 解析类
public class DeviceEventParser implements EventParser {
    public void parse(EventModel event) {
        // 设备事件解析逻辑
    }
}
1
2
3
4
5
  1. 管理不同的解析策略
public class EventParserHolder {
    private static final Map<String, EventParser> parsers = new HashMap<>();
    static { parsers.put("DEVICE_EVENT", new DeviceEventParser()); }
    public static EventParser getParser(String type) { return parsers.get(type); }
}
1
2
3
4
5
  1. 使用策略模式解析事件
EventParser parser = EventParserHolder.getParser(eventType);
if (parser != null) { parser.parse(event); }
1
2

🎯 这样,新增 Event 解析时,只需添加一个新的实现类,无需改动现有代码!


System.out.println("Enjoy your Flink coding journey! 🚀");
1

让你的 Flink 任务跑起来,就这么简单! 🏃💨

# 📌 部署方式

Flink 提供了两种主要的部署模式,我们分别看看如何高效部署它们👇


# 🌟 Session Mode(会话模式)

👉 官方文档:Flink Standalone Docker - Session Mode (opens new window)

# 💼 CI/CD 一键部署

  1. CI: 使用 Earthfile,在 CI 环境下打包 JAR 🎁
  2. CD: 读取打包产物,使用 Flink Client 提交 Job 🚀

# 🛠️ 本地调测 - Web UI 提交 Job

# 1️⃣ 先构建 JAR

在 IDEA 中执行 Gradle 构建 👇

./gradlew clean build
1

🔹 产物位置:build/libs/xxx-all.jar

# 2️⃣ 提交 Job
  1. 访问 Flink Web UI(默认 http://localhost:8081)
  2. 左侧菜单 ➝ Submit New Job
  3. 上传 JAR,填写运行参数 ➝ Submit
  4. 左侧菜单 ➝ Running Jobs 查看运行状态 ✅

# 🚀 Application Mode(应用模式)

👉 官方文档:Flink Standalone Docker - Application Mode (opens new window)

# 💼 CI/CD 部署方案

  1. CI(打包阶段)

    • EarthFile ✅ 复用现有 Quarkus 打包流程
    • Dockerfile ✅ 复用,注意 ENTRYPOINT
    • docker-entrypoint ✅ 复用,作为 Flink Job 启动类
  2. CD(部署阶段)

    • docker-compose.yml 指定正确的 镜像 tag 和 JAR 文件名称
    • 指定 Job 运行环境(-env.active xxx)
    • services - jobmanager - command 配置启动命令

📝 JetStream 处理逻辑

如果 consumer 不存在,Flink 会自动创建,无需手动管理 savepoint 🎯

# 🔧 统一软件版本

统一 Flink 版本,确保环境一致!📌

组件 版本 依赖声明
Flink 1.15.2 flink-streaming-java:1.15.2
JNATS 2.16.9 io.nats:jnats:2.16.9
Protobuf 3.21.9 com.google.protobuf:protobuf-java:3.21.9
Java 11 JavaVersion.VERSION_11

# 🎯 总结

💡 无论是 Session Mode 还是 Application Mode,流程都很简单!

✅ CI/CD 无缝集成

✅ 本地调试 & Web UI 提交 Job

✅ 统一版本管理,确保稳定运行

附录参考

链接 备注
Flink WebUI 详解 (opens new window) 作者很详细的说明了webUI的相关操作,介绍了常用操作,一些参数配置含义,对于新手上手非常友好。
Checkpointing (opens new window) 主要关注配置部分
生成 Watermark (opens new window) 水位线的生成策略,使用功能之前建议阅读一下官方文档的描述,便于理解的更加深刻。
窗 (opens new window) 很详细的描述了窗口包括生命周期,不同的窗口含义以及侧重,如何触发以及程序编写等。
Savepoints (opens new window) 详细介绍了savepoint,以及如何触发使用。同时包含了使用savepoint的常见问题,罗列在下面,很具有参考意义
概览 (opens new window) dataStreamAPI快速入门,如何快速开发一个Flink Job
本仓库的意义也想实现该目的,将该文章同样附录在此,方便检索
docker-compose编排
github.dev (opens new window) 理解dockerFile docker-entrypoint docker-compose几个文件相互关系
juejin.cn (opens new window) 完整的介绍了Flink的各种部署方式,同时各种部署方式的架构图也同样出具,很具有参考性
1. What's the difference between flink application cluster and job cluster (opens new window)
2. Developing job for Flink (opens new window) SO上概念辨析以及如何部署Flink Job,高赞回答
#模板工程#Apache Flink
上次更新: 2025/03/09, 15:45:50
数据分析
实时调度

← 数据分析 实时调度→

最近更新
01
STM32入门
03-09
02
ADB调试
03-09
03
微信小程序学习记录
02-09
更多文章>
Theme by Vdoing | Copyright © 2019-2025 Jorgen | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式