实时数仓三分钟搞定Flink Cdc
发布时间:2022-01-14 16:11:14 所属栏目:大数据 来源:互联网
导读:Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC)从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。 特点 支持读取数据库快照,并
Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC)从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。 特点 支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义 对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。 对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。 使用场景 数据库之间的增量数据同步 审计日志 数据库之上的实时物化视图 基于CDC的维表join … Flink提供的 table format Flink提供了一系列可以用于table connector的table format,具体如下: Formats Supported Connectors CSV Apache Kafka, Filesystem JSON Apache Kafka, Filesystem, Elasticsearch Apache Avro Apache Kafka, Filesystem Debezium CDC Apache Kafka Canal CDC Apache Kafka Apache Parquet Filesystem Apache ORC Filesystem 使用过程中的注意点 使用MySQL CDC的注意点 如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖: <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>1.0.0</version> </dependency> 如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。 使用canal-json的注意点 如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖: <!-- universal --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.11.0</version> </dependency> 如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。由于Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动添加依赖包,否则会报如下错误: [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen mysql-cdc 使用changelog-json的注意点 如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖: <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-format-changelog-json</artifactId> <version>1.0.0</version> </dependency> 如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。 mysql-cdc的操作实践 创建MySQL数据源表 在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下: Flink SQL Cli创建CDC数据源 启动 Flink 集群,再启动 SQL CLI,执行下面命令: -- 创建订单信息表 在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert。 在SQL CLI中创建订单详情表: 查询结果如下: WHERE order_status = '2' -- 订单已支付 GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd'); 查询表看一下结果: 再查一下kafka的数据: {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"} 当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再观察数据. (编辑:信阳站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |