1. 初识Apache Paimon为什么说它是流数据湖的“新宠”大家好我是老张在数据领域摸爬滚打了十几年从早期的Hadoop生态一路跟到现在的流批一体。今天想和大家聊聊一个让我眼前一亮的项目——Apache Paimon。你可能听过数据湖也用过流处理引擎但把两者无缝融合还能提供实时更新和高效查询的Paimon算是一个“狠角色”。简单来说Apache Paimon是一个开源的流数据湖平台。什么叫“流数据湖”你可以把它想象成一个既能像数据湖那样海纳百川、低成本存储海量历史数据又能像消息队列比如Kafka那样实时处理源源不断的数据流的“超级综合体”。它最吸引我的地方就是解决了我们做实时数仓时的一个老大难问题如何让实时写入的数据立刻就能被分析查询同时还能保证历史数据的批处理效率。我最早接触Paimon当时还叫Flink Table Store是在一个金融风控项目里。业务方要求我们能实时追踪每一笔交易并在几秒钟内识别出异常模式同时风控模型又需要定期比如每天对全量历史数据进行复杂的批量训练。传统的架构往往需要维护两套系统一套KafkaFlink做实时流一套Hive/ Iceberg做批处理和历史查询链路复杂数据一致性更是让人头疼。而Paimon的出现让我们用一张表就同时支撑了流式写入、实时点查和批量分析开发和运维成本直接砍半。它的核心能力可以概括为“流批一体”和“实时更新”。数据进来无论是来自数据库的CDC变更流还是来自业务系统的实时事件Paimon都能以流的方式高效写入。在底层它巧妙地采用了LSM树日志结构合并树的存储结构这让它在大规模数据更新时表现非常出色避免了传统数据湖格式如Parquet在更新时重写整个文件的性能瓶颈。对外它提供标准的表接口用熟悉的Flink SQL就能操作大大降低了学习门槛。接下来我们就从零开始一步步搭建环境并深入它的核心功能。2. 手把手搭建你的第一个Paimon流数据湖环境纸上得来终觉浅绝知此事要躬行。光说概念没意思咱们直接动手搭一个。Paimon本身是一个存储格式它需要依赖一个计算引擎来读写数据最亲密的伙伴自然是Apache Flink。下面的步骤是我在本地和测试环境反复验证过的你跟着做大概率能一次成功。2.1 基础环境准备与Flink部署首先我们需要一个计算引擎。这里我选择Apache Flink 1.17.1版本这是目前Paimon官方兼容性较好的一个稳定版本。你完全可以用更新的版本但需要注意对应Paimon Jar包的版本匹配。# 1. 下载Flink wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz # 解压 tar -xvf flink-1.17.1-bin-scala_2.12.tgz cd flink-1.17.1 # 2. 关键配置修改 conf/flink-conf.yaml # 我习惯用vim你用任何编辑器都行 vim conf/flink-conf.yaml在flink-conf.yaml文件中我建议你至少配置下面这几项这对后续稳定运行很重要# 设置全局编码避免中文乱码问题 env.java.opts.all: -Dfile.encodingUTF-8 # 关闭类加载器泄漏检查避免一些依赖冲突的警告 classloader.check-leaked-classloader: false # 每个TaskManager的slot数根据你的机器核数来我测试机4核就设4 taskmanager.numberOfTaskSlots: 4 # 开启Checkpoint间隔10秒这是流处理容错的基础 execution.checkpointing.interval: 10s # 状态后端使用RocksDB这是生产级应用的标准选择状态数据量大时比内存后端更可靠 state.backend: rocksdb # 指定Checkpoint的存储路径我这里是HDFS如果你用本地文件系统可以改成 file:///path/to/checkpoints state.checkpoints.dir: hdfs://mycluster/flink/checkpoints # 开启增量Checkpoint对于状态很大的作业可以显著提升Checkpoint效率 state.backend.incremental: true注意state.checkpoints.dir的HDFS地址需要替换成你自己的集群地址。如果你还没有HDFS环境只是想快速体验可以暂时先注释掉这几行Flink会使用默认的本地路径但请注意这仅适用于测试。2.2 引入Paimon与必备ConnectorFlink准备好了现在把主角Paimon请进来。Paimon以Jar包的形式提供我们需要把它放到Flink的lib目录下。# 进入Flink的lib目录 cd lib/ # 下载Paimon与Flink 1.17集成的Jar包 # 注意这里以0.5-SNAPSHOT版本为例生产环境建议使用官方发布的稳定版本如0.4.0-incubating wget https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.17/0.5-SNAPSHOT/paimon-flink-1.17-0.5-20230802.034234-105.jar光有Paimon还不够我们通常需要从其他数据源如MySQL、Kafka摄取数据或者将数据同步到其他系统。所以我习惯一次性把常用的Connector也下载好。# 下载Flink Hive连接器如果你想用Hive Catalog或与Hive集成 wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.1/flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar # 下载Flink MySQL CDC连接器用于实时捕获数据库变更这是流数据湖的重要数据来源 wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar # 下载Flink Kafka连接器用于消费实时消息流 wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar下载完成后回到Flink根目录我们就可以启动集群了。如果你有YARN环境可以用yarn-session模式提交这样资源管理更方便。没有的话用standalone模式启动本地集群也行。cd ../ # 使用YARN Session模式启动Flink集群-d参数表示后台运行 ./bin/yarn-session.sh -d # 启动SQL客户端并连接到YARN Session ./bin/sql-client.sh -s yarn-session进入SQL客户端后执行几个简单命令测试一下环境-- 设置结果展示模式为表格看起来更直观 SET sql-client.execution.result-mode tableau; SHOW DATABASES; SELECT 1 1;如果都能正常返回结果恭喜你基础环境已经就绪你可以通过YARN的Web UI找到Flink集群的ApplicationMaster链接点进去就能看到Flink自己的管理界面确认我们的SQL客户端作业正在运行。3. 核心概念与架构LSM树如何让数据湖“流”起来环境搭好了在动手建表之前我们有必要深入了解一下Paimon的“内功心法”。理解了它的核心架构和设计思想后面无论是调优还是排错你都会更有底气。Paimon的架构简单概括就是“湖存储的外壳LSM树的内核流批一体的接口”。3.1 统一存储一张表应对所有场景在传统Lambda架构里我们经常面临一个困境实时链路用Kafka批处理链路用Hive两套存储、两套计算数据口径对齐是个噩梦。Paimon提出的“统一存储”理念就是想终结这种分裂。你可以把Paimon表想象成一个具有“多重人格”的超级表。当Flink以批处理模式Batch Mode读取它时它表现得就像一个标准的Hive表你可以进行全量扫描、复杂关联执行那些跑一整天的批处理作业。当Flink以流处理模式Streaming Mode读取它时它又摇身一变成了一个永远不会过期、能回溯历史的“超级消息队列”你可以持续不断地读取最新的数据变更Changelog。这种统一性意味着开发人员只需要面对一套APIFlink SQL运维人员只需要维护一套存储大大简化了系统复杂度。3.2 LSM树实时更新的秘密武器数据湖格式如Parquet、ORC擅长一次性写入、多次读取但面对频繁的更新Update和删除Delete操作时往往需要重写整个文件成本极高。这正是Paimon引入LSM树Log-Structured Merge-Tree的原因。LSM树不是什么新概念它在RocksDB、LevelDB等KV存储中广泛应用核心思想是“先内存后磁盘分层合并”。Paimon借鉴了这个思想并将其应用在表格式上。我来打个比方LSM树就像一个有多个层级的工作台。第一层L0内存缓冲区新来的数据插入、更新、删除首先被快速写入内存中的一个有序结构Sorted Run。这里速度极快因为是内存操作。第二层及以下L1, L2...磁盘文件当内存缓冲区满了Paimon会将其中的数据排序后作为一个小的、有序的数据文件SSTable刷到磁盘上这就是一个Sorted Run。磁盘上会有很多这样的Sorted Run。合并Compaction随着小文件越来越多查询时需要合并的文件也越多性能就会下降。因此Paimon会有一个后台的合并任务将多个小的、可能有键值重叠的Sorted Run合并成更大的、键值范围更清晰有序的新文件。这个过程就是Compaction。这样做的好处是什么写入极快因为大部分写入都是顺序追加Append-Only到内存或新文件避免了随机写磁盘。更新高效更新操作只是写入一条带有新时间戳的记录真正的合并留到后台的Compaction去做。查询优化通过分层和合并保证了查询时不需要扫描太多文件并且可以利用主键进行高效查找。3.3 文件布局与核心概念了解了LSM树我们再看看Paimon在磁盘上是怎么组织文件的。理解了文件布局对排查问题、理解时间旅行等功能至关重要。base-path/ ├── snapshot/ # 存放所有快照的元数据文件JSON格式 ├── manifest/ # 存放清单列表和清单文件记录了每个快照包含哪些数据文件 └── partitionxxx/ # 分区目录如果表有分区 └── bucket-xx/ # 桶目录 ├── data/ # 存放实际的列式数据文件ORC/Parquet格式 └── changelog/ # 可选存放变更日志文件快照Snapshot这是Paimon实现时间旅行Time Travel和流式读取的基础。每次数据提交Commit都会生成一个新的快照记录了表在某个时刻的完整状态。你可以通过指定快照ID或时间戳查询历史上的任意一个版本的数据这对于数据审计、回滚、增量同步场景非常有用。分区Partition和Hive分区一样用于根据日期、地区等维度对数据进行物理隔离加速针对分区列的查询和过滤。主键字段必须包含所有分区字段这是一个重要的约束。桶Bucket这是Paimon进行横向扩展和并行读写的最小单元。未分区表或分区内的数据会通过哈希主键或指定的Bucket Key分散到多个桶中。每个桶独立维护一套LSM树结构。桶的数量决定了写作业的最大并行度也影响了文件的大小。我的一般经验是尽量让每个桶的数据量在1GB左右太小会导致文件碎片化太大则影响并行度。一致性保证Paimon使用两阶段提交来保证写入的原子性。只要多个写入作业不修改同一个桶它们的提交就是完全隔离的。如果修改了同一个桶则提供快照隔离级别最终状态可能是两次提交的混合但不会丢失任何更改。4. 实战入门从Catalog管理到表操作全解析理论部分消化得差不多了吧咱们撸起袖子开始真正的实战。这部分我会带你过一遍使用Paimon最核心的操作流程从创建Catalog到管理表每一步都有详细的代码和我的经验之谈。4.1 管理数据的入口Catalog详解Catalog在Paimon里是元数据的“管家”它知道你的表存在哪里、结构是什么。Paimon主要支持两种Catalog文件系统Catalog和Hive Catalog。文件系统Catalog是最简单直接的它把元数据表结构等以JSON文件的形式和表数据一起存放在你指定的文件系统路径下比如HDFS或S3。这种方式部署简单不依赖外部服务。-- 在Flink SQL客户端中执行 CREATE CATALOG fs_catalog WITH ( type paimon, warehouse hdfs://mycluster/paimon/warehouse -- 指定仓库路径 ); USE CATALOG fs_catalog;创建成功后执行SHOW CATALOGS;你就能看到fs_catalog了。之后在这个Catalog下创建的表其元数据和数据文件都会存放在hdfs://mycluster/paimon/warehouse目录下。Hive Catalog则更适合已经存在Hive数仓的环境。它把Paimon表的元数据存储在Hive Metastore里。这样做的好处是Paimon表可以直接被Hive、Spark、Presto这些已经集成Hive Metastore的引擎查询生态互通性极好。使用前确保你的Hive Metastore服务已经启动hive --service metastore 。然后创建CatalogCREATE CATALOG hive_catalog WITH ( type paimon, metastore hive, uri thrift://your-hms-host:9083, -- Hive Metastore地址 hive-conf-dir /path/to/your/hive/conf, -- Hive配置文件目录 warehouse hdfs://mycluster/paimon/hive_warehouse ); USE CATALOG hive_catalog;踩坑提示使用Hive Catalog时数据库名、表名和字段名最好全部使用小写因为Hive Metastore对大小写的处理可能因版本和配置而异统一小写能避免很多意想不到的问题。为了不用每次启动SQL客户端都手动创建Catalog我习惯把初始化语句写在一个文件里比如conf/sql-client-init.sql然后通过-i参数指定。./bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql4.2 表的创建与管理多种姿势任君选择进入Catalog后我们就可以创建表了。Paimon支持多种建表方式适应不同场景。1. 创建带主键的管理表这是最常用的方式主键表支持高效的更新和删除操作。-- 创建一个用户行为表以日期和小时分区主键包含分区键和用户ID CREATE TABLE user_behavior_pk ( user_id BIGINT, item_id BIGINT, behavior STRING, dt STRING COMMENT 日期格式yyyyMMdd, hh STRING COMMENT 小时, PRIMARY KEY (dt, hh, user_id) NOT ENFORCED -- 主键约束由Paimon管理Flink不检查 ) PARTITIONED BY (dt, hh) WITH ( bucket 4 -- 指定桶数为4 );NOT ENFORCED表示这个主键约束由Paimon在写入时保证唯一性而不是由Flink SQL引擎在计算过程中检查。PARTITIONED BY指定了分区字段。WITH子句可以设置很多表属性bucket 4就是指定了哈希桶的数量。2. 使用CTASCREATE TABLE AS SELECT建表这是一种快速建表并灌入数据的方式特别适合数据迁移或中间表创建。-- 从一个已存在的表可以是任何Flink支持的表创建并填充数据 CREATE TABLE user_behavior_new WITH (file.format parquet) -- 新表使用Parquet格式 AS SELECT * FROM some_source_table;3. 创建外部表外部表的特点是删除表时底层的数据文件不会被删除。当你只想临时查询某个Paimon表或者表数据由其他流程管理时可以使用外部表。-- 注意这里使用的是default_catalog而不是paimon catalog USE CATALOG default_catalog; CREATE TABLE external_paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( connector paimon, -- 指定连接器类型 path hdfs://mycluster/paimon/warehouse/mydb/mytable, -- 指向已存在的Paimon表路径 auto-create false -- 路径不存在时不自动创建 );4. 修改表结构业务需求总是在变表结构也需要调整。Paimon支持完整的模式演化Schema Evolution。-- 添加新列 ALTER TABLE my_table ADD (new_column1 INT COMMENT 新增列1, new_column2 STRING); -- 修改列名 ALTER TABLE my_table RENAME COLUMN old_name TO new_name; -- 修改列数据类型需谨慎确保数据兼容性 ALTER TABLE my_table MODIFY column_name DOUBLE; -- 删除列 ALTER TABLE my_table DROP COLUMN column_to_drop; -- 修改表属性比如调整合并参数 ALTER TABLE my_table SET (compaction.max.file-num 10);修改表结构在流处理场景下是平滑进行的新的流写入作业会使用新的Schema而旧的快照依然可以用旧的Schema读取这个特性非常实用。4.3 一个电商场景的完整示例让我们结合一个简单的电商用户点击流场景把上面的操作串起来。假设我们有一个Kafka Topicuser_clicks实时接收用户点击日志。-- 1. 在Hive Catalog下创建一张Paimon主键表用于存储去重后的用户最新点击 USE CATALOG hive_catalog; CREATE TABLE dwd_user_last_click ( user_id BIGINT, item_id BIGINT, click_time TIMESTAMP(3), page_id STRING, dt STRING, PRIMARY KEY (dt, user_id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( bucket 4, merge-engine deduplicate -- 使用去重合并引擎保留同一主键的最新记录 ); -- 2. 创建一个临时视图从Kafka读取原始数据假设我们已经定义了Kafka表kafka_user_clicks -- 这里省略了Kafka源表的DDL定义... CREATE TEMPORARY VIEW click_stream AS SELECT user_id, item_id, CAST(event_time AS TIMESTAMP(3)) AS click_time, page_id, DATE_FORMAT(CAST(event_time AS TIMESTAMP(3)), yyyyMMdd) AS dt FROM kafka_user_clicks; -- 3. 将流数据持续写入Paimon表 -- 这里使用 INSERT INTO 流式写入Flink作业会一直运行 INSERT INTO dwd_user_last_click SELECT user_id, item_id, click_time, page_id, dt FROM click_stream;执行上面的INSERT INTO语句后一个Flink流作业就会启动源源不断地将Kafka里的点击数据经过处理后写入Paimon表。由于我们设置了主键和去重合并引擎这张表始终保存着每个用户当天最新的点击记录。与此同时你可以随时启动另一个Flink作业批或流模式对这张表进行查询分析实现真正的流批一体查询。5. 高级功能与性能调优让你的数据湖飞起来基础功能跑通后我们来看看Paimon的一些高级特性和调优点这些是让它能在生产环境稳定、高效运行的关键。5.1 合并引擎与Changelog生成这是Paimon非常强大的两个特性。merge-engine决定了当多条记录具有相同主键时如何合并它们。默认是deduplicate即保留最后一条根据时间戳或顺序这适用于维表、最新状态表。你还可以设置为partial-update部分更新适用于有多列且更新可能来自不同来源的场景可以按列合并。aggregation聚合比如对同一主键的数值列进行sum、max等聚合非常适合实时聚合场景。changelog-producer决定了如何为流读取生成变更日志。流计算经常需要知道数据的“增删改”I, -U, U, -D。Paimon可以从存储的数据中还原出完整的Changelog。常用的模式有none不生成流读只能看到最新的快照数据。lookup在读取时通过查找数据来生成延迟低但会给读取端带来额外开销。full-compaction在后台合并Compaction时生成资源消耗在写入端读取效率高但有一定延迟。在我的一个实时库存项目中就使用了merge-engine aggregation和changelog-producer full-compaction的组合。订单和退货流同时写入Paimon自动对库存数量进行累加和扣减并生成完整的变更流供下游对账服务消费非常省心。5.2 性能调优要点根据我的经验想让Paimon跑得又快又稳有几个参数需要特别关注Bucket数量这是影响并行度和文件大小的首要参数。公式可以粗略估算为桶数 ≈ 期望表总大小 / 1GB。同时桶数也最好等于你写入作业并行度的整数倍以避免数据倾斜。比如你计划这张表最终有100GB数据写入并行度是4那么设置bucket 8或16可能是个不错的起点。Compaction相关参数合并Compaction是LSM树平衡读写性能的关键。compaction.max.file-num触发Compaction的Sorted Run数量阈值。调小可以加速查询因为文件少但会增加写放大更频繁的合并。默认值通常够用如果查询性能压力大可以适当调小。compaction.early-max.file-num早期L0到L1合并的阈值。对于写入速度非常快、希望尽快合并掉内存中数据的场景可以调小这个值。对于写入吞吐量极高的场景可以考虑开启write-only true并设置compaction.schedule parallel让合并任务在一个独立的Flink作业中执行避免影响主写入作业的稳定性。文件格式Paimon支持ORC默认、Parquet和Avro。ORC在查询性能上通常有更好的表现尤其是涉及列筛选时。Parquet的生态兼容性更广。你可以通过file.format参数指定。内存调优写入时数据先缓存在内存的写缓冲区Write Buffer中。通过write-buffer-size可以调整其大小默认64MB。对于写入量大的作业适当调大如256MB或512MB可以减少刷盘次数提升写入吞吐。但也要考虑TaskManager的堆内存大小。5.3 时间旅行与增量读取这是数据湖相比消息队列的一大优势。你可以轻松查询历史数据。-- 读取指定时间戳的历史数据时间旅行 SELECT * FROM my_table /* OPTIONS(scan.timestamp-millis1690876800000) */; -- 读取指定快照ID的数据 SELECT * FROM my_table /* OPTIONS(scan.snapshot-id12345) */; -- 流式读取从某个快照开始的增量数据增量读取 SELECT * FROM my_table /* OPTIONS(scan.modeincremental, incremental-between12345,67890) */;在数据回溯、版本对比、增量ETL等场景下这些功能简直就是“神器”。有一次我们上游数据源出问题污染了当天部分数据就是利用时间旅行功能将表回滚到问题发生前的快照快速恢复了业务。走到这里你已经从零开始搭建了一个可用的Paimon环境理解了它的核心架构并完成了基本的表和数据处理操作。Paimon的魅力在于它用相对简单的接口封装了背后复杂而精巧的流批一体存储能力。当然每个生产环境都有其独特性在真正上生产前务必在你的业务数据规模和模式上进行充分的性能测试调整好桶数、合并策略等参数。多看看监控指标比如Compaction的频率、文件数量分布等这些都能帮助你更好地驾驭这个强大的流数据湖引擎。