admin管理员组

文章数量:1531666

第6章 Hudi核心概念介绍

更为详细的使用文档请参考《尚硅谷大数据之Hudi》。

6.1 基本概念

6.1.1 时间轴(TimeLine)

Hudi的核心是维护表上在不同的即时时间(instants)执行的所有操作的时间轴(timeline),这有助于提供表的即时视图,同时还有效地支持按到达顺序检索数据。一个instant由以下三个部分组成:

1Instant action:在表上执行的操作类型

  • COMMITS:一次commit表示将一批数据原子性地写入一个表。
  • CLEANS:清除表中不再需要的旧版本文件的后台活动。
  • DELTA_COMMIT:增量提交指的是将一批数据原子性地写入一个MergeOnRead类型的表,其中部分或所有数据可以写入增量日志。
  • COMPACTION:合并Hudi内部差异数据结构的后台活动,例如:将更新操作从基于行的log日志文件合并到列式存储的数据文件。在内部,COMPACTION体现为timeline上的特殊提交。
  • ROLLBACK:表示当commit/delta_commit不成功时进行回滚,其会删除在写入过程中产生的部分文件。
  • SAVEPOINT:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。

2Instant time

通常是一个时间戳(例如:20190117010349),它按照动作开始时间的顺序单调增加。

3State

  • REQUESTED:表示某个action已经调度,但尚未执行。
  • INFLIGHT:表示action当前正在执行。
  • COMPLETED:表示timeline上的action已经完成。

4)两个时间概念

区分两个重要的时间概念:

  • Arrival time: 数据到达 Hudi 的时间,commit time。
  • Event time: record 中记录的时间。

上图中采用时间(小时)作为分区字段,从 10:00 开始陆续产生各种 commits,10:20 来了一条 9:00 的数据,根据event time该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 (commit time)之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到。

6.1.2 文件布局(File Layout

Hudi将一个表映射为如下文件结构

Hudi存储分为两个部分:

(1)元数据:.hoodie目录对应着表的元数据信息,包括表的版本管理(Timeline)、归档目录(存放过时的instant也就是版本),一个instant记录了一次提交(commit)的行为、时间戳和状态,Hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据;

(2)数据:和hive一样,以分区方式存放数据;分区里面存放着Base File(.parquet)和Log File(.log.*);

(1)Hudi将数据表组织成分布式文件系统基本路径(basepath)下的目录结构

(2)表被划分为多个分区,这些分区是包含该分区的数据文件的文件夹,非常类似于Hive表

(3)在每个分区中,文件被组织成文件组,由文件ID唯一标识

(4)每个文件组包含几个文件片(FileSlice)

(5)每个文件片包含:

  • 一个基本文件(.parquet):在某个commit/compaction即时时间(instant time)生成的(MOR可能没有)
  • 多个日志文件(.log.*),这些日志文件包含自生成基本文件以来对基本文件的插入/更新(COW没有)

(6)Hudi采用了多版本并发控制(Multiversion Concurrency Control, MVCC)

  • compaction操作:合并日志和基本文件以产生新的文件片
  • clean操作:清除不使用的/旧的文件片以回收文件系统上的空间

(7)Hudi的base file(parquet 文件)在 footer 的 meta 去记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测。只有不在 BloomFilter 的 key 才需要扫描整个文件消灭假阳。

(8)Hudi 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包含 magic number、size、content、footer 等信息,用于数据读、校验和过滤。

6.1.3 索引(Index)

1)原理

Hudi通过索引机制提供高效的upserts,具体是将给定的hoodie key(record key + partition path)与文件id(文件组)建立唯一映射。这种映射关系,数据第一次写入文件后保持不变,所以,一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。

Hudi 为了消除不必要的读写,引入了索引的实现。在有了索引之后,更新的数据可以快速被定位到对应的 File Group。上图为例,白色是基本文件,黄色是更新数据,有了索引机制,可以做到:避免读取不需要的文件、避免更新不必要的文件、无需将更新数据与历史数据做分布式关联,只需要在 File Group 内做合并。

2)索引选项

Index类型

原理

优点

缺点

Bloom Index

默认配置,使用布隆过滤器来判断记录存在与否,也可选使用record key的范围裁剪需要的文件

效率高,不依赖外部系统,数据和索引保持一致性

因假阳性问题,还需回溯原文件再查找一遍

Simple Index

update/delete操作的新数据和老数据进行join

实现最简单,无需额外的资源

性能比较差

HBase Index

把index存放在HBase里面。在插入 File Group定位阶段所有task向HBase发送 Batch Get 请求,获取 Record Key 的 Mapping 信息

对于小批次的keys,查询效率高

需要外部的系统,增加了运维压力

Flink State-based Index

HUDI 在 0.8.0 版本中实现的 Flink witer,采用了 Flink 的 state 作为底层的 index 存储,每个 records 在写入之前都会先计算目标 bucket ID。

不同于 BloomFilter Index,避免了每次重复的文件 index 查找

注意:Flink只有一种state based index,其他index是Spark可选配置。

3)全局索引与非全局索引

global index里面存放了一张表里所有record的key,而non-global index是每个partition都有一个对应的index,里面只存放了本partition的key。所以如果用户使用non-global index,就必须保证同一个key的record不会出现在多个partition里面。

从index的维护成本和写入性能的角度考虑,维护一个global index的难度更大,对写入性能的影响也更大,所以需要non-global index。

bloom和simple index都有全局选项:

  • hoodie.index.type=GLOBAL_BLOOM
  • hoodie.index.type=GLOBAL_SIMPLE
  • HBase索引本质上是一个全局索引

4)索引的选择

(1)普通索引:主要用于非分区表和分区不会发生分区列值变更的表。当然如果你不关心多分区主键重复的情况也是可以使用。他的优势是只会加载upsert数据中的分区下的每个文件中的索引,相对于全局索引需要扫描的文件少。并且索引只会命中当前分区的fileid 文件,需要重写的快照也少相对全局索引高效。但是某些情况下我们的设置的分区列的值就是会变那么必须要使用全局索引保证数据不重复,这样upsert 写入速度就会慢一些。其实对于非分区表他就是个分区列值不会变且只有一个分区的表,很适合普通索引,如果非分区表硬要用全局索引其实和普通索引性能和效果是一样的。

(2)全局索引:分区表场景要考虑分区值变更,需要加载所有分区文件的索引比普通索引慢。

(3)布隆索引:加载fileid 文件页脚布隆过滤器,加载少量数据数据就能判断数据是否在文件存在。缺点是有一定的误判,但是merge机制可以避免重复数据写入。parquet文件多会影响索引加载速度。适合没有分区变更和非分区表。主键如果是类似自增的主键布隆索引可以提供更高的性能,因为布隆索引记录的有最大key和最小key加速索引查找。

(4)全局布隆索引:解决分区变更场景,原理和布隆索引一样,在分区表中比普通布隆索引慢。

(5)简易索引:直接加载文件里的数据不会像布隆索引一样误判,但是加载的数据要比布隆索引要多,left join 关联的条数也要比布隆索引多。大多数场景没布隆索引高效,但是极端情况命中所有的parquet文件,那么此时还不如使用简易索引加载所有文件里的数据进行判断。

(6)全局简易索引:解决分区变更场景,原理和简易索引一样,在分区表中比普通简易索引慢。建议优先使用全局布隆索引。

(7)HBase索引:不受分区变跟场景的影响,操作算子要比布隆索引少,在大量的分区和文件的场景中比布隆全局索引高效。因为每条数据都要查询hbase ,upsert数据量很大会对hbase有负载的压力需要考虑hbase集群承受压力,适合微批分区表的写入场景 。在非分区表中数量不大文件也少,速度和布隆索引差不多,这种情况建议用布隆索引。

(8)内存索引:用于测试不适合生产环境。

6.1.4 表类型(Table Types

1Copy On Write

在COW表中,只有数据文件/基本文件(.parquet),没有增量日志文件(.log.*)。

对每一个新批次写入都将创建相应数据文件的新版本(新的FileSlice),新版本文件包括旧版本文件的记录以及来自传入批次的记录(全量最新)。

假设我们有 3 个文件组,其中包含如下数据文件。

我们进行一批新的写入,在索引后,我们发现这些记录与File group 1 和File group 2 匹配,然后有新的插入,我们将为其创建一个新的文件组(File group 4)。

因此data_file1 和 data_file2 都将创建更新的版本,data_file1 V2 是data_file1 V1 的内容与data_file1 中传入批次匹配记录的记录合并。

由于在写入期间进行合并,COW 会产生一些写入延迟。但是COW 的优势在于它的简单性,不需要其他表服务(如压缩),也相对容易调试。

2)Merge On Read

MOR表中,包含 列存的基本文件(.parquet)和行存的增量日志文件(基于行的avro格式,.log.*)。

顾名思义,MOR表的合并成本在读取端。因此在写入期间我们不会合并或创建较新的数据文件版本。标记/索引完成后,对于具有要更新记录的现有数据文件,Hudi 创建增量日志文件并适当命名它们,以便它们都属于一个文件组。

读取端将实时合并基本文件及其各自的增量日志文件。每次的读取延迟都比较高(因为查询时进行合并),所以 Hudi 使用压缩机制来将数据文件和日志文件合并在一起并创建更新版本的数据文件。

用户可以选择内联或异步模式运行压缩。Hudi也提供了不同的压缩策略供用户选择,最常用的一种是基于提交的数量。例如可以将压缩的最大增量日志配置为 4。这意味着在进行 4 次增量写入后,将对数据文件进行压缩并创建更新版本的数据文件。压缩完成后,读取端只需要读取最新的数据文件,而不必关心旧版本文件。

MOR表的写入行为,依据 index 的不同会有细微的差别:

  • 对于 BloomFilter 这种无法对 log file 生成 index 的索引方案,对于 INSERT 消息仍然会写 base file (parquet format),只有 UPDATE 消息会 append log 文件(因为 base file 已经记录了该 UPDATE 消息的 FileGroup ID)。
  • 对于可以对 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次写入都是 log format,并且会不断追加和 roll over。

3)COW与MOR的对比

CopyOnWrite

MergeOnRead

数据延迟

查询延迟

Update(I/O) 更新成本

高(重写整个Parquet文件)

低(追加到增量日志)

Parquet文件大小

低(更新成本I/O高)

较大(低更新成本)

写放大

低(取决于压缩策略)

6.1.5 查询类型(Query Types

Hudi支持如下三种查询类型:

1Snapshot Queries

快照查询,可以查询指定commit/delta commit即时操作后表的最新快照。

在读时合并(MOR)表的情况下,它通过即时合并最新文件片的基本文件和增量文件来提供近实时表(几分钟)。

对于写时复制(COW),它可以替代现有的parquet表(或相同基本文件类型的表),同时提供upsert/delete和其他写入方面的功能,可以理解为查询最新版本的Parquet数据文件。

下图是COW的快照查询:

2)Incremental Queries

增量查询,可以查询给定commit/delta commit即时操作以来新写入的数据。有效的提供变更流来启用增量数据管道。

3Read Optimized Queries

读优化查询,可查看给定的commit/compact即时操作的表的最新快照。仅将最新文件片的基本/列文件暴露给查询,并保证与非Hudi表相同的列查询性能。

下图是MOR表的快照查询与读优化查询的对比:

Read Optimized Queries是对Merge On Read表类型快照查询的优化。

Snapshot

Read Optimized

数据延迟

查询延迟

高(合并列式基础文件+行式增量日志文件)

低(原始列式基础文件)

4)不同表支持的查询类型

6.2 数据写

6.2.1 写操作

(1)UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小 => CDC 导入

(2)INSERT:跳过 index,写入效率更高 => Log Deduplication

(3)BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)

6.2.2 写流程(UPSERT)

1Copy On Write

(1)先对 records 按照 record key 去重

(2)首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)

(3)对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice)

(4)对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSlice

2Merge On Read

(1)先对 records 按照 record key 去重(可选)

(2)首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)

(3)如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果  log file 可建索引,尝试 append 小的 log file,如果没有就新写一个  FileGroup + FileSlice + base file

(4)如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小的小文件,会 merge base file,生成新的 file slice)

(5)log file 大小达到阈值会 roll over 一个新的

6.2.3 写流程(INSERT)

1Copy On Write

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file

2Merge On Read

(1)先对 records 按照 record key 去重(可选)

(2)不会创建 Index

(3)如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file

6.2.4 写流程(INSERT OVERWRITE)

在同一分区中创建新的文件组集。现有的文件组被标记为 "删除"。根据新记录的数量创建新的文件组

1COW

在插入分区之前

插入相同数量的记录覆盖

插入覆盖更多的记录

插入重写1条记录

分区包含

file1-t0.parquet,file2-t0.parquet。

分区将添加file3-t1.parquet,file4-t1.parquet。file1, file2在t1后的元数据中被标记为无效。

分区将添加

file3-t1.parquet,

file4-t1.parquet

file5-t1.parquet

...

fileN-t1.parquet

file1, file2在t1后的元数据中被标记为无效

分区将添加file3-t1.parquet。file1, file2在t1后的元数据中被标记为无效。

2MOR

在插入分区之前

插入相同数量的记录覆盖

插入覆盖更多的记录

插入重写1条记录

分区包含

file1-t0.parquet,

file2-t0.parquet。

.file1-t00.log

file3-t1.parquet

file4-t1.parquet

file1, file2在t1后的元数据中被标记为无效。

file3-t1.parquet,

file4-t1.parquet

...

fileN-t1.parquet

file1, file2在t1后的元数据中被标记为无效

分区将添加file3-t1.parquet。file1, file2在t1后的元数据中被标记为无效。

3)优点

(1)COW和MOR在执行方面非常相似。不干扰MOR的compaction。

(2)减少parquet文件大小。

(3)不需要更新关键路径中的外部索引。索引实现可以检查文件组是否无效(类似于在HBaseIndex中检查commit是否无效的方式)。

(4)可以扩展清理策略,在一定的时间窗口后删除旧文件组。

4)缺点

(1)需要转发以前提交的元数据。

  • 在t1,比如file1被标记为无效,我们在t1mit中存储 "invalidFiles=file1"(或者在MOR中存储deltacommit)
  • 在t2,比如file2也被标记为无效。我们转发之前的文件,并在t2mit中标记 "invalidFiles=file1, file2"(或MOR的deltacommit)

(2)忽略磁盘中存在的parquet文件也是Hudi的一个新行为, 可能容易出错,我们必须认识到新的行为,并更新文件系统的所有视图来忽略它们。这一点可能会在实现其他功能时造成问题。

6.2.5 Key 生成策略

用来生成 HoodieKey(record key + partition path),目前支持以下策略:

  • 支持多个字段组合 record keys
  • 支持多个字段组合的 parition path (可定制时间格式,Hive style path name)
  • 非分区表

6.2.6 删除策略

1)逻辑删:将 value 字段全部标记为 null

2)物理删:

(1)通过 OPERATION_OPT_KEY  删除所有的输入记录

(2)配置 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 删除所有的输入记录

(3)在输入记录添加字段:_hoodie_is_deleted

6.2.7 总结

通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:

(1)写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。

(2)对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。

6.3 数据读

6.3.1 Snapshot读

读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件

6.3.2 Incremantal读

https://hudi.apache/docs/querying_data.html#spark-incr-query

当前的 Spark data source 可以指定消费的起始和结束 commit 时间,读取 commit 增量的数据集。但是内部的实现不够高效:拉取每个 commit 的全部目标文件再按照系统字段 _hoodie_commit_time_  apply 过滤条件。

6.3.3 Streaming读

0.8.0 版本的 HUDI Flink writer 支持实时的增量订阅,可用于同步 CDC 数据,日常的数据同步 ETL pipeline。Flink 的 streaming 读做到了真正的流式读取,source 定期监控新增的改动文件,将读取任务下派给读 task。

6.4 Compaction

(1)没有base file:走copy on write insert流程,直接merge所有的log file并写base file

(2)有base file:走copy on write upsert流程,先读log file建index,再读base file,最后读log file写新的base file

Flink和Spark streaming的writer都可以apply异步的compaction策略,按照间隔commits数或者时间来触发compaction任务,在独立的pipeline中执行。

第7章 湖仓一体项目的优势

7.1使用Hudi搭建数仓

从上一节我们对Hudi的简单介绍中可以知道,使用Hudi构建湖仓一体,有如下优势:

(1)沿用传统数仓中的分层思想

(2)不需要做复杂的拉链表

(3)不需要做全量增量表,延迟低,近似为实时的离线数仓

(4)Hudi自动解决小文件问题

(5)所有层的数据均可在Hive中通过Hive的同步表进行查询(底层只有一份数据)

7.2使用HiveCatalog持久化元数据

HiveCatalog作为表元数据持久化的介质,在生产环境我们一般采用HiveCatalog来管理元数据,这样的好处是不需要重复使用DDL创建表,只需要关心业务逻辑的SQL,简化了开发的流程,可以节省很多时间。

在Flink中创建表,表的元数据可以保存到hive的元数据中,元数据不会丢失。

Flink将表的元数据保存在hive的元数据中,在hive中可以看到Flink的表,但是不能对flink的表进行查询。

需要在启动Flink SQL Client时候显式指定配置文件,启动命令见6.3.3。

将sql-client-init.sql文件放在/opt/module/flink/conf目录下。内容如下:

#sql-client配置sql文件

vim conf/sql-client-init.sql

CREATE CATALOG myhive WITH (

    'type' = 'hive',

    'default-database' = 'default',

    'hive-conf-dir' = '/opt/module/hive/conf',

    'hadoop-conf-dir'='/opt/module/hadoop/etc/hadoop'

);

-- set the HiveCatalog as the current catalog of the session

USE CATALOG myhive;

7.3使用Flink SQL进行开发

基于yarn-session的SQL-client可以在提交任务时候动态申请TM个数。

cd /opt/module/flink

#基于yarn模式的sql-client(TM个数动态申请),启动命令:

bin/yarn-session.sh -d

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

#进入SQL client之后可以设置展示模式,分别有table,tableau,changelog三种模式。

SET sql-client.execution.result-mode=tableau;

7.4引入Hive的系统函数作为Flink的内置函数

在Flink SQL Client中执行

load module hive with ('hive-version'='3.1.2');

可以将Hive的UDF加载为Flink的一个module。

使用

SHOW MODULES;

命令可以查看所有模块的状态

这样做的好处就是,原本Flink中不支持Hive的语法,例如split,在加载hive模块之后就可以进行使用了。

有的同学会有如下疑问,如果一个函数(例如concat)在Hive和Flink中都有,那么在加载时候会解析为哪个模块的呢?

注意:Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数时,会有以下三种情况:

1. 如果两个 Module 都启用的话,Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF

2. 如果只有一个 Module 启用的话,Flink 就只会从启用的 Module 解析 UDF

3. 如果两个 Module 都没有启用,Flink 就无法解析这个 UDF

需要补充说明的是:

1. 如果出现第一种情况时,用户也可以改变使用 Module 的顺序。比如用户可以使用

USE MODULE hive,core

将Hive Module设为第一个使用及解析的Module。

2. 另外,用户可以使用

USE MODULES hive

去禁用默认的core Module,注意,禁用不是卸载 Module,用户之后还可以再次启用Module,并且使用

USE MODULES core

命令去将core Module设置为启用的。

3. 如果使用未加载的Module,则会直接抛出异常。

4. 禁用和卸载Module的区别在于禁用依然会在TableEnvironment保留Module,用户依然可以使用list命令看到禁用的Module。

7.5集成StreamX进行Flink SQL调度

后期可以集成StreamX对Flink SQL进行管理和调度

第8章 湖仓一体环境准备

8.1 编译Hudi源码(了解)

本次项目我们采用Hudi进行开发,目前,Hudi的最新版本是0.12.0(发版日期为2022-08-16)。我们在Hudi官网下载到的是Hudi的源码包,这就需要我们针对我们特定版本的Hadoop和Hive等组件进行编译。

数据采集部分的组件安装请参考

组件

版本

Java

1.8.212

Maven

3.6.3

Hadoop

3.1.3

Flume

1.9.0

Kafka

3.0.0

Hudi

0.12.0

Hive

3.1.2

8.1.1 安装Maven

(1)上传apache-maven-3.6.3-bin.tar.gz到/opt/software目录,并解压更名

tar -zxvf apache-maven-3.6.3-bin.tar.gz -C /opt/module/

mv apache-maven-3.6.3 maven

(2)添加环境变量到/etc/profile中

sudo vim /etc/profile

#MAVEN_HOME

export MAVEN_HOME=/opt/module/maven

export PATH=$PATH:$MAVEN_HOME/bin

(3)测试安装结果

source /etc/profile

mvn -v

2)修改为阿里镜像

(1)修改setting.xml,指定为阿里仓库地址

vim /opt/module/maven/conf/settings.xml

<!-- 添加阿里云镜像-->

<mirror>

        <id>nexus-aliyun</id>

        <mirrorOf>central</mirrorOf>

        <name>Nexus aliyun</name>

        <url>http://maven.aliyun/nexus/content/groups/public</url>

</mirror>

8.1.2 编译Hudi源码

源码下载地址:https://github/apache/hudi/releases/tag/release-0.12.0

解压hudi源码包到/opt/software文件夹下

cd /opt/software

tar -zxvf hudi-release-0.12.0.tar.gz

8.1.3修改pom文件

vim /opt/software/hudi-0.12.0/pom.xml

1)新增repository加速依赖下载

<repository>

        <id>nexus-aliyun</id>

        <name>nexus-aliyun</name>

        <url>http://maven.aliyun/nexus/content/groups/public/</url>

        <releases>

            <enabled>true</enabled>

        </releases>

        <snapshots>

            <enabled>false</enabled>

        </snapshots>

    </repository>

2)修改依赖的组件版本

<hadoop.version>3.1.3</hadoop.version>

<hive.version>3.1.2</hive.version>

修改源码使其兼容Hadoop3

Hudi默认依赖的hadoop2,要兼容hadoop3,除了修改版本,还需要修改如下代码:

vim /opt/software/hudi-0.12.0/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java

修改第110行,原先只有一个参数,添加第二个参数null:

8.1.4手动安装Kafka依赖

有几个kafka的依赖需要手动安装,否则编译报错如下:

[ERROR] Failed to execute goal on project hudi-utilities_2.12: Could not resolve dependencies for project org.apache.hudi:hudi-utilities_2.12:jar:0.12.0: The following artifacts could not be resolved: io.confluent:kafka-avro-serializer:jar:5.3.4, io.confluent:common-config:jar:5.3.4, io.confluent:common-utils:jar:5.3.4, io.confluent:kafka-schema-registry-client:jar:5.3.4: Failure to find io.confluent:kafka-avro-serializer:jar:5.3.4 in https://maven.aliyun/repository/public was cached in the local repository, resolution will not be reattempted until the update interval of aliyunmaven has elapsed or updates are forced -> [Help 1]

下载jar包

通过网址下载:http://packages.confluent.io/archive/5.3/confluent-5.3.4-2.12.zip

解压后找到以下jar包,上传服务器hadoop102任意位置

jar包放在了本课程的资料包中。

  • common-config-5.3.4.jar
  • common-utils-5.3.4.jar
  • kafka-avro-serializer-5.3.4.jar
  • kafka-schema-registry-client-5.3.4.jar

install到maven本地仓库

mvn install:install-file -DgroupId=io.confluent -DartifactId=common-config -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-config-5.3.4.jar

mvn install:install-file -DgroupId=io.confluent -DartifactId=common-utils -Dversion=5.3.4 -Dpackaging=jar -Dfile=./common-utils-5.3.4.jar

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-avro-serializer -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-avro-serializer-5.3.4.jar

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=5.3.4 -Dpackaging=jar -Dfile=./kafka-schema-registry-client-5.3.4.jar

8.1.5解决spark模块依赖冲突

修改了Hive版本为3.1.2,其携带的jetty是0.9.3,hudi本身用的0.9.4,存在依赖冲突,修改hudi-spark-bundle的pom文件,排除低版本jetty,添加hudi指定版本的jetty

vim /opt/software/hudi-0.12.0/packaging/hudi-spark-bundle/pom.xml

在388行的位置,修改如下(红色部分):

<!-- Hive -->

    <dependency>

      <groupId>${hive.groupid}</groupId>

      <artifactId>hive-service</artifactId>

      <version>${hive.version}</version>

      <scope>${spark.bundle.hive.scope}</scope>

      <exclusions>

        <exclusion>

          <artifactId>guava</artifactId>

          <groupId>com.google.guava</groupId>

        </exclusion>

        <exclusion>

          <groupId>org.eclipse.jetty</groupId>

          <artifactId>*</artifactId>

        </exclusion>

        <exclusion>

          <groupId>org.pentaho</groupId>

          <artifactId>*</artifactId>

        </exclusion>

      </exclusions>

    </dependency>

    <dependency>

      <groupId>${hive.groupid}</groupId>

      <artifactId>hive-service-rpc</artifactId>

      <version>${hive.version}</version>

      <scope>${spark.bundle.hive.scope}</scope>

    </dependency>

    <dependency>

      <groupId>${hive.groupid}</groupId>

      <artifactId>hive-jdbc</artifactId>

      <version>${hive.version}</version>

      <scope>${spark.bundle.hive.scope}</scope>

      <exclusions>

        <exclusion>

          <groupId>javax.servlet</groupId>

          <artifactId>*</artifactId>

        </exclusion>

        <exclusion>

          <groupId>javax.servlet.jsp</groupId>

          <artifactId>*</artifactId>

        </exclusion>

        <exclusion>

          <groupId>org.eclipse.jetty</groupId>

          <artifactId>*</artifactId>

        </exclusion>

      </exclusions>

    </dependency>

    <dependency>

      <groupId>${hive.groupid}</groupId>

      <artifactId>hive-metastore</artifactId>

      <version>${hive.version}</version>

      <scope>${spark.bundle.hive.scope}</scope>

      <exclusions>

        <exclusion>

          <groupId>javax.servlet</groupId>

          <artifactId>*</artifactId>

        </exclusion>

        <exclusion>

          <groupId>org.datanucleus</groupId>

          <artifactId>datanucleus-core</artifactId>

        </exclusion>

        <exclusion>

          <groupId>javax.servlet.jsp</groupId>

          <artifactId>*</artifactId>

        </exclusion>

        <exclusion>

          <artifactId>guava</artifactId>

          <groupId>com.google.guava</groupId>

        </exclusion>

      </exclusions>

    </dependency>

    <dependency>

      <groupId>${hive.groupid}</groupId>

      <artifactId>hive-common</artifactId>

      <version>${hive.version}</version>

      <scope>${spark.bundle.hive.scope}</scope>

      <exclusions>

        <exclusion>

          <groupId>org.eclipse.jetty.orbit</groupId>

          <artifactId>javax.servlet</artifactId>

        </exclusion>

        <exclusion>

          <groupId>org.eclipse.jetty</groupId>

          <artifactId>*</artifactId>

        </exclusion>

      </exclusions>

</dependency>

    <!-- 增加hudi配置版本的jetty -->

    <dependency>

      <groupId>org.eclipse.jetty</groupId>

      <artifactId>jetty-server</artifactId>

      <version>${jetty.version}</version>

    </dependency>

    <dependency>

      <groupId>org.eclipse.jetty</groupId>

      <artifactId>jetty-util</artifactId>

      <version>${jetty.version}</version>

    </dependency>

    <dependency>

      <groupId>org.eclipse.jetty</groupId>

      <artifactId>jetty-webapp</artifactId>

      <version>${jetty.version}</version>

    </dependency>

    <dependency>

      <groupId>org.eclipse.jetty</groupId>

      <artifactId>jetty-http</artifactId>

      <version>${jetty.version}</version>

    </dependency>

否则在使用spark向hudi表插入数据时,会报错如下:

java.lang.NoSuchMethodError: org.apache.hudi.apache.jetty.server.session.SessionHandler.setHttpOnly(Z)V

8.1.6执行编译命令

cd /opt/software/hudi-0.12.0/

mvn clean package -DskipTests -Dspark3.2 -Dflink1.13 -Dscala-2.12 -Dhadoop.version=3.1.3 -Pflink-bundle-shade-hive3

我们最后实际用到的几个包为:

/opt/software/hudi-0.11.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar

/opt/software/hudi-0.11.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar

/opt/software/hudi-0.11.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar

这几个编译好的jar包放在本课程的资料包中。

8.2 Hudi集成Flink

8.2.1解压Flink并配置环境变量

cd /opt/software

tar -zxvf flink-1.13.6-bin-scala_2.12.tgz -C /opt/module/

mv /opt/module/flink-1.13.6 /opt/module/flink

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

sudo source /etc/profile.d/my_env.sh

8.2.2修改Flink的配置文件

修改flink-conf.yaml

vim /opt/module/flink/conf/flink-conf.yaml

classloader.check-leaked-classloader: false

#根据机器性能进行tm的slot设置

taskmanager.numberOfTaskSlots: 4

state.backend: rocksdb

# checkpoint的时间根据集群性能和数据量确定

execution.checkpointing.interval: 30000

state.checkpoints.dir: hdfs://hadoop102:8020/ckps

state.backend.incremental: true

8.2.3将Hudi集成至Flink

我们将编译好的hudi-flink1.13-bundle_2.12-0.11.0.jar放到Flink的lib目录下

cp /opt/software/hudi-0.11.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink/lib/

8.2.4解决guava依赖冲突

cp /opt/module/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink/lib/

8.2.5将项目用到的connector的jar包放入flink的lib中

#需要下载的jar放入flink的lib

https://repo.maven.apache/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.6/flink-sql-connector-kafka_2.12-1.13.6.jar

https://repo1.maven/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar

https://repo.maven.apache/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.13.6/flink-sql-connector-hive-3.1.2_2.12-1.13.6.jar

需要注意:

(1) hive-connector必须解决guava冲突。使用压缩软件打开jar,删除 com目录下的google文件夹

(2) 解决找不到hadoop的依赖问题

cp /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar flink/lib

8.2.6 将元数据集成至HiveCatalog

需要在flink/conf目录下提前准备配置元数据的配置文件

将sql-client-init.sql文件放在/opt/module/flink/conf目录下。内容如下:

#sql-client配置sql文件

vim conf/sql-client-init.sql

CREATE CATALOG myhive WITH (

    'type' = 'hive',

    'default-database' = 'default',

    'hive-conf-dir' = '/opt/module/hive/conf',

    'hadoop-conf-dir'='/opt/module/hadoop/etc/hadoop'

);

-- set the HiveCatalog as the current catalog of the session

USE CATALOG myhive;

在启动sql client时候显式指定

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

8.3 Hudi集成Hive

8.3.1 解压Hive

把apache-hive-3.1.2-bin.tar.gz上传到Linux的/opt/software目录下

解压apache-hive-3.1.2-bin.tar.gz到/opt/module/目录下面

tar -zxvf /opt/software/apache-hive-3.1.3-bin.tar.gz -C /opt/module/

修改apache-hive-3.1.2-bin.tar.gz的名称为hive

mv /opt/module/apache-hive-3.1.2-bin/ /opt/module/hive

8.3.2 将Hudi集成至Hive

将 hudi-hadoop-mr-bundle-0.12.0.jar和hudi-hive-sync-bundle-0.12.0.jar放到hive节点的lib目录下;

cp /opt/software/hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar /opt/module/hive/lib/

cp /opt/software/hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar /opt/module/hive/lib/

8.3.3 配置Hive与环境变量

修改/etc/profile.d/my_env.sh,添加环境变量

sudo vim /etc/profile.d/my_env.sh

添加内容

#HIVE_HOME

export HIVE_HOME=/opt/module/hive

export PATH=$PATH:$HIVE_HOME/bin

source操作

source /etc/profile.d/my_env.sh

将MySQL的JDBC驱动拷贝到Hive的lib目录下

cp /opt/software/mysql-connector-java-5.1.37.jar $HIVE_HOME/lib

在$HIVE_HOME/conf目录下新建hive-site.xml文件

[atguigu@hadoop102 software]$ vim $HIVE_HOME/conf/hive-site.xml

添加如下内容:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

    <!-- jdbc连接的URL -->

    <property>

        <name>javax.jdo.option.ConnectionURL</name>

        <value>jdbc:mysql://hadoop102:3306/metastore?useSSL=false&useUnicode=true&characterEncoding=UTF-8</value>

    </property>

    <!-- jdbc连接的Driver-->

    <property>

        <name>javax.jdo.option.ConnectionDriverName</name>

        <value>com.mysql.jdbc.Driver</value>

    </property>

    <!-- jdbc连接的username-->

    <property>

        <name>javax.jdo.option.ConnectionUserName</name>

        <value>root</value>

    </property>

    <!-- jdbc连接的password -->

    <property>

        <name>javax.jdo.option.ConnectionPassword</name>

        <value>123456</value>

    </property>

    <!-- Hive默认在HDFS的工作目录 -->

    <property>

        <name>hive.metastore.warehouse.dir</name>

        <value>/user/hive/warehouse</value>

    </property>

    <!-- Hive元数据存储的验证 -->

    <property>

        <name>hive.metastore.schema.verification</name>

        <value>false</value>

    </property>

    <!-- 元数据存储授权  -->

    <property>

        <name>hive.metastore.event.db.notification.api.auth</name>

        <value>false</value>

    </property>

    <!-- 指定hiveserver2连接的host -->

    <property>

        <name>hive.server2.thrift.bind.host</name>

        <value>hadoop102</value>

    </property>

    <!-- 指定hiveserver2连接的端口号 -->

    <property>

        <name>hive.server2.thrift.port</name>

        <value>10000</value>

    </property>

    <!-- hiveserver2高可用参数,开启此参数可以提高hiveserver2启动速度 -->

    <property>

        <name>hive.server2.active.passive.ha.enable</name>

        <value>true</value>

    </property>

    <!-- 指定metastore服务的地址 -->

    <property>

        <name>hive.metastore.uris</name>

        <value>thrift://hadoop102:9083</value>

    </property>

    <!-- 打印表名 -->

    <property>

        <name>hive.cli.print.header</name>

        <value>true</value>

    </property>

    <!-- 打印库名 -->

    <property>

        <name>hive.cli.print.current.db</name>

        <value>true</value>

    </property></configuration>

8.3.4 初始化Hive元数据库

登录MySQL

mysql -uroot -p123456

新建Hive元数据库

mysql> create database metastore;

mysql> quit;

初始化Hive元数据库(修改为采用MySQL存储元数据)

bin/schematool -dbType mysql -initSchema -verbose

8.3.5 启动Hive Metastore和Hiveserver2服务(附脚本)

启动hiveserver2和metastore服务的命令如下:

bin/hive --service hiveserver2

bin/hive --service metastore

也可编写脚本进行一键启停

vim $HIVE_HOME/bin/hiveservices.sh

#!/bin/bash

HIVE_LOG_DIR=$HIVE_HOME/logs

if [ ! -d $HIVE_LOG_DIR ]

then

mkdir -p $HIVE_LOG_DIR

fi

#检查进程是否运行正常,参数1为进程名,参数2为进程端口

function check_process()

{

    pid=$(ps -ef 2>/dev/null | grep -v grep | grep -i $1 | awk '{print $2}')

    ppid=$(netstat -nltp 2>/dev/null | grep $2 | awk '{print $7}' | cut -d '/' -f 1)

    echo $pid

    [[ "$pid" =~ "$ppid" ]] && [ "$ppid" ] && return 0 || return 1

}

function hive_start()

{

    metapid=$(check_process HiveMetastore 9083)

    cmd="nohup hive --service metastore >$HIVE_LOG_DIR/metastore.log 2>&1 &"

    [ -z "$metapid" ] && eval $cmd || echo "Metastroe服务已启动"

    server2pid=$(check_process HiveServer2 10000)

    cmd="nohup hive --service hiveserver2 >$HIVE_LOG_DIR/hiveServer2.log 2>&1 &"

    [ -z "$server2pid" ] && eval $cmd || echo "HiveServer2服务已启动"

}

function hive_stop()

{

metapid=$(check_process HiveMetastore 9083)

    [ "$metapid" ] && kill $metapid || echo "Metastore服务未启动"

    server2pid=$(check_process HiveServer2 10000)

    [ "$server2pid" ] && kill $server2pid || echo "HiveServer2服务未启动"

}

case $1 in

"start")

    hive_start

    ;;

"stop")

    hive_stop

    ;;

"restart")

    hive_stop

    sleep 2

    hive_start

    ;;

"status")

    check_process HiveMetastore 9083 >/dev/null && echo "Metastore服务运行正常" || echo "Metastore服务运行异常"

    check_process HiveServer2 10000 >/dev/null && echo "HiveServer2服务运行正常" || echo "HiveServer2服务运行异常"

    ;;

*)

    echo Invalid Args!

    echo 'Usage: '$(basename $0)' start|stop|restart|status'

    ;;

esac

给脚本执行权限

chmod u+x $HIVE_HOME/bin/hiveservices.sh

启动hiveserver2和metastore服务

hiveservices.sh start

8.3.6 使用外部工具连接Hive(可选)

可使用Navicat/Datagrip等外部数据库工具进行连接,端口为10000。

8.4 模拟数据准备

通常企业在开始搭建数仓时,业务系统中会存在历史数据,一般是业务数据库存在历史数据,而用户行为日志无历史数据。假定数仓上线的日期为2020-06-14,为模拟真实场景,需准备以下数据。

8.4.1日志数据

用户行为日志,一般是没有历史数据的,故日志只需要准备2020-06-14一天的数据。具体操作如下:

(1)生成模拟数据

(1)启动日志采集通道,包括Flume、Kafka等

(2)修改两个日志服务器(hadoop102、hadoop103)中的

/opt/module/applog/application.yml配置文件,将mock.date参数改为2020-06-14。

(3)执行日志生成脚本lg.sh。

(4)观察Kafka的topic中是否出现相应数据

(2)使用Flink SQL将Kafka中的数据映射为表

库为kafka_log

CREATE TABLE `kafka_log`.`kafka_topic_log`  (

  `common`   ROW<ar STRING,ba STRING,ch STRING,is_new STRING,md STRING,mid STRING,os STRING,uid STRING,vc STRING>,

  `page`     ROW<during_time STRING,item STRING,item_type STRING,last_page_id STRING,page_id STRING,source_type STRING> ,

  `actions`  ARRAY<ROW<action_id STRING,item STRING,item_type STRING,ts BIGINT>>,

  `displays` ARRAY<ROW<display_type STRING,item STRING,item_type STRING,`order` STRING,pos_id STRING>>,

  `start`    ROW<entry STRING,loading_time BIGINT,open_ad_id BIGINT,open_ad_ms BIGINT,open_ad_skip_ms BIGINT>,

  `err`      ROW<error_code BIGINT,msg STRING>,

  `ts`       BIGINT

) WITH (

    'connector' = 'kafka',

    'topic' = 'topic_log',

    'properties.bootstrap.servers' = 'hadoop102:9092,hadoop103:9092,hadoop104:9092',

    'properties.group.id' = 'hudi_source',

    'scan.startup.mode' = 'latest-offset',

    'format' = 'json',

    'json.fail-on-missing-field'='false',

    'json.ignore-parse-errors' = 'true'

);

8.4.2业务数据

业务数据一般存在历史数据,此处需准备2020-06-10至2020-06-14的数据。具体操作如下。(1)生成模拟数据

1修改hadoop102节点上的/opt/module/db_log/application.properties文件,将mock.date、mock.clear,mock.clear.user三个参数调整为如图所示的值。

2执行模拟生成业务数据的命令,生成第一天2020-06-10的历史数据。

[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-01-22.jar

3修改/opt/module/db_log/application.properties文件,将mock.date、mock.clear,mock.clear.user三个参数调整为如图所示的值。

4执行模拟生成业务数据的命令,生成第二天2020-06-11的历史数据。

[atguigu@hadoop102 db_log]$ java -jar gmall2020-mock-db-2021-10-10.jar

5之后只修改/opt/module/db_log/application.properties文件中的mock.date参数,依次改为2020-06-12,2020-06-13,2020-06-14,并分别生成对应日期的数据。

(2)使用Flink CDC将MySQL的数据映射为Flink中的表

库为flink_cdc

表中除了有业务数据还有维度数据

-- 创建mysql-cdc表,34张

CREATE TABLE `flink_cdc`.`activity_info_cdc`  (

  `id` bigint,

  `activity_name` varchar(200),

  `activity_type` varchar(10),

  `activity_desc` varchar(2000),

  `start_time` timestamp(0),

  `end_time` timestamp(0),

  `create_time` timestamp(0),

  PRIMARY KEY(id) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'activity_info');

    

CREATE TABLE `flink_cdc`.`activity_rule_cdc`  (

  `id` int,

  `activity_id` int,

  `activity_type` varchar(20),

  `condition_amount` decimal(16, 2),

  `condition_num` bigint,

  `benefit_amount` decimal(16, 2),

  `benefit_discount` decimal(10, 2),

  `benefit_level` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'activity_rule');

    

# 未用

CREATE TABLE flink_cdc.`activity_sku_cdc`  (

  `id` bigint,

  `activity_id` bigint,

  `sku_id` bigint,

  `create_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'activity_sku');

# 未用

CREATE TABLE flink_cdc.`base_attr_info_cdc`  (

  `id` bigint,

  `attr_name` varchar(100),

  `category_id` bigint,

  `category_level` int,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_attr_info');

    

# 未用

CREATE TABLE flink_cdc.`base_attr_value_cdc`  (

  `id` bigint,

  `value_name` varchar(100),

  `attr_id` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_attr_value');

    

CREATE TABLE flink_cdc.`base_category1_cdc`  (

  `id` bigint,

  `name` varchar(10),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_category1');

    

CREATE TABLE flink_cdc.`base_category2_cdc`  (

  `id` bigint,

  `name` varchar(200),

  `category1_id` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_category2');

    

CREATE TABLE flink_cdc.`base_category3_cdc`  (

  `id` bigint,

  `name` varchar(200),

  `category2_id` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_category3');

    

    

CREATE TABLE flink_cdc.`base_dic_cdc`  (

  `dic_code` varchar(10),

  `dic_name` varchar(100),

  `parent_code` varchar(10),

  `create_time` timestamp(0),

  `operate_time` timestamp(0),

  PRIMARY KEY (`dic_code`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_dic');

    

    

CREATE TABLE flink_cdc.`base_province_cdc`  (

  `id` bigint,

  `name` varchar(20),

  `region_id` varchar(20),

  `area_code` varchar(20),

  `iso_code` varchar(20),

  `iso_3166_2` varchar(20)

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_province');

    

CREATE TABLE flink_cdc.`base_region_cdc`  (

  `id` varchar(20),

  `region_name` varchar(20)

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_region');

    

    

CREATE TABLE flink_cdc.`base_trademark_cdc`  (

  `id` bigint,

  `tm_name` varchar(100),

  `logo_url` varchar(200),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'base_trademark');

    

    

CREATE TABLE flink_cdc.`cart_info_cdc`  (

  `id` bigint,

  `user_id` varchar(200),

  `sku_id` bigint,

  `cart_price` decimal(10, 2),

  `sku_num` int,

  `img_url` varchar(200),

  `sku_name` varchar(200),

  `is_checked` int,

  `create_time` timestamp(0),

  `operate_time` timestamp(0),

  `is_ordered` bigint,

  `order_time` timestamp(0),

  `source_type` varchar(20),

  `source_id` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'cart_info');

    

CREATE TABLE flink_cdc.`comment_info_cdc`  (

  `id` bigint,

  `user_id` bigint,

  `nick_name` varchar(20),

  `head_img` varchar(200),

  `sku_id` bigint,

  `spu_id` bigint,

  `order_id` bigint,

  `appraise` varchar(10),

  `comment_txt` varchar(2000),

  `create_time` timestamp(0),

  `operate_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'comment_info');

    

CREATE TABLE flink_cdc.`coupon_info_cdc`  (

  `id` bigint,

  `coupon_name` varchar(100),

  `coupon_type` varchar(10),

  `condition_amount` decimal(10, 2),

  `condition_num` bigint,

  `activity_id` bigint,

  `benefit_amount` decimal(16, 2),

  `benefit_discount` decimal(10, 2),

  `create_time` timestamp(0),

  `range_type` varchar(10),

  `limit_num` int,

  `taken_count` int,

  `start_time` timestamp(0),

  `end_time` timestamp(0),

  `operate_time` timestamp(0),

  `expire_time` timestamp(0),

  `range_desc` varchar(500),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'coupon_info');

    

# 未用

CREATE TABLE flink_cdc.`coupon_range_cdc`  (

  `id` bigint,

  `coupon_id` bigint,

  `range_type` varchar(10),

  `range_id` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'coupon_range');

    

CREATE TABLE flink_cdc.`coupon_use_cdc`  (

  `id` bigint,

  `coupon_id` bigint,

  `user_id` bigint,

  `order_id` bigint,

  `coupon_status` varchar(10),

  `create_time` timestamp(0),

  `get_time` timestamp(0),

  `using_time` timestamp(0),

  `used_time` timestamp(0),

  `expire_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'coupon_use');

    

CREATE TABLE flink_cdc.`favor_info_cdc`  (

  `id` bigint,

  `user_id` bigint,

  `sku_id` bigint,

  `spu_id` bigint,

  `is_cancel` varchar(1),

  `create_time` timestamp(0),

  `cancel_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'favor_info');

    

    

CREATE TABLE flink_cdc.`order_detail_cdc`  (

  `id` bigint,

  `order_id` bigint,

  `sku_id` bigint,

  `sku_name` varchar(200),

  `img_url` varchar(200),

  `order_price` decimal(10, 2),

  `sku_num` bigint,

  `create_time` timestamp(0),

  `source_type` varchar(20),

  `source_id` bigint,

  `split_total_amount` decimal(16, 2),

  `split_activity_amount` decimal(16, 2),

  `split_coupon_amount` decimal(16, 2),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'order_detail');

CREATE TABLE flink_cdc.`order_detail_activity_cdc`  (

  `id` bigint,

  `order_id` bigint,

  `order_detail_id` bigint,

  `activity_id` bigint,

  `activity_rule_id` bigint,

  `sku_id` bigint,

  `create_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'order_detail_activity');

    

    

CREATE TABLE flink_cdc.`order_detail_coupon_cdc`  (

  `id` bigint,

  `order_id` bigint,

  `order_detail_id` bigint,

  `coupon_id` bigint,

  `coupon_use_id` bigint,

  `sku_id` bigint,

  `create_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'order_detail_coupon');

    

CREATE TABLE flink_cdc.`order_info_cdc`  (

  `id` bigint,

  `consignee` varchar(100),

  `consignee_tel` varchar(20),

  `total_amount` decimal(10, 2),

  `order_status` varchar(20),

  `user_id` bigint,

  `payment_way` varchar(20),

  `delivery_address` varchar(1000),

  `order_comment` varchar(200),

  `out_trade_no` varchar(50),

  `trade_body` varchar(200),

  `create_time` timestamp(0),

  `operate_time` timestamp(0),

  `expire_time` timestamp(0),

  `process_status` varchar(20),

  `tracking_no` varchar(100),

  `parent_order_id` bigint,

  `img_url` varchar(200),

  `province_id` int,

  `activity_reduce_amount` decimal(16, 2),

  `coupon_reduce_amount` decimal(16, 2),

  `original_total_amount` decimal(16, 2),

  `feight_fee` decimal(16, 2),

  `feight_fee_reduce` decimal(16, 2),

  `refundable_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'order_info');

    

    

CREATE TABLE flink_cdc.`order_refund_info_cdc`  (

  `id` bigint,

  `user_id` bigint,

  `order_id` bigint,

  `sku_id` bigint,

  `refund_type` varchar(20),

  `refund_num` bigint,

  `refund_amount` decimal(16, 2),

  `refund_reason_type` varchar(200),

  `refund_reason_txt` varchar(20),

  `refund_status` varchar(10),

  `create_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'order_refund_info');

    

CREATE TABLE flink_cdc.`order_status_log_cdc`  (

  `id` bigint,

  `order_id` bigint,

  `order_status` varchar(11),

  `operate_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'order_status_log');

    

    

CREATE TABLE flink_cdc.`payment_info_cdc`  (

  `id` int,

  `out_trade_no` varchar(50),

  `order_id` bigint,

  `user_id` bigint,

  `payment_type` varchar(20),

  `trade_no` varchar(50),

  `total_amount` decimal(10, 2),

  `subject` varchar(200),

  `payment_status` varchar(20),

  `create_time` timestamp(0),

  `callback_time` timestamp(0),

  `callback_content` string,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'payment_info');

    

    

CREATE TABLE flink_cdc.`refund_payment_cdc`  (

  `id` int,

  `out_trade_no` varchar(50),

  `order_id` bigint,

  `sku_id` bigint,

  `payment_type` varchar(20),

  `trade_no` varchar(50),

  `total_amount` decimal(10, 2),

  `subject` varchar(200),

  `refund_status` varchar(30),

  `create_time` timestamp(0),

  `callback_time` timestamp(0),

  `callback_content` string,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'refund_payment');

    

CREATE TABLE flink_cdc.`sku_attr_value_cdc`  (

  `id` bigint,

  `attr_id` bigint,

  `value_id` bigint,

  `sku_id` bigint,

  `attr_name` varchar(30),

  `value_name` varchar(30),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'sku_attr_value');

    

CREATE TABLE flink_cdc.`sku_info_cdc`  (

  `id` bigint,

  `spu_id` bigint,

  `price` decimal(10, 0),

  `sku_name` varchar(200),

  `sku_desc` varchar(2000),

  `weight` decimal(10, 2),

  `tm_id` bigint,

  `category3_id` bigint,

  `sku_default_img` varchar(300),

  `is_sale` boolean,

  `create_time` timestamp(0),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'sku_info');

    

    

CREATE TABLE flink_cdc.`sku_sale_attr_value_cdc`  (

  `id` bigint,

  `sku_id` bigint,

  `spu_id` int,

  `sale_attr_value_id` bigint,

  `sale_attr_id` bigint,

  `sale_attr_name` varchar(30),

  `sale_attr_value_name` varchar(30),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'sku_sale_attr_value');

    

    

CREATE TABLE flink_cdc.`spu_info_cdc`  (

  `id` bigint,

  `spu_name` varchar(200),

  `description` varchar(1000),

  `category3_id` bigint,

  `tm_id` bigint,

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'spu_info');

    

    

# 未用

CREATE TABLE flink_cdc.`spu_sale_attr_cdc`  (

  `id` bigint,

  `spu_id` bigint,

  `base_sale_attr_id` bigint,

  `sale_attr_name` varchar(20),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'spu_sale_attr');

    

    

# 未用

CREATE TABLE flink_cdc.`spu_sale_attr_value_cdc`  (

  `id` bigint,

  `spu_id` bigint,

  `base_sale_attr_id` bigint,

  `sale_attr_value_name` varchar(20),

  `sale_attr_name` varchar(20),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'spu_sale_attr_value');

    

    

# 未用

CREATE TABLE flink_cdc.`user_address_cdc`  (

  `id` bigint,

  `user_id` bigint,

  `province_id` bigint,

  `user_address` varchar(500),

  `consignee` varchar(40),

  `phone_num` varchar(40),

  `is_default` varchar(1),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'user_address');

    

    

CREATE TABLE flink_cdc.`user_info_cdc`  (

  `id` bigint,

  `login_name` varchar(200),

  `nick_name` varchar(200),

  `passwd` varchar(200),

  `name` varchar(200),

  `phone_num` varchar(200),

  `email` varchar(200),

  `head_img` varchar(200),

  `user_level` varchar(200),

  `birthday` date,

  `gender` varchar(1),

  `create_time` timestamp(0),

  `operate_time` timestamp(0),

  `status` varchar(200),

  PRIMARY KEY (`id`) NOT ENFORCED

) WITH (

    'connector' = 'mysql-cdc',

    'hostname' = 'hadoop102',

    'port' = '3306',

    'server-time-zone' = 'Asia/Shanghai',

    'username' = 'root',

    'password' = '123456',

    'database-name' = 'gmall',

    'table-name' = 'user_info');

本文标签: Hudi