SpringBoot整合Flink CDC,实时追踪mysql数据变动

前端 / 2024-08-27 / 原文

前言
Flink CDC(Flink Change Data Capture)是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

Flink CDC的应用场景广泛,包括但不限于实时数据仓库更新、实时数据同步和迁移以及实时数据处理等。它还能确保数据一致性,并在数据发生变更时准确地进行捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,便于数据的捕获和处理。

接下来,将详细介绍MySQL CDC的使用。MySQL CDC连接器允许从MySQL数据库中读取快照数据和增量数据。

  1. MySQL开启Binlog
    MySQL中开启binlog功能,需要修改配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini)的[mysqld]部分设置相关参数:

[mysqld]
server-id=1

设置日志格式为行级格式

binlog-format=Row

设置binlog日志文件的前缀

log-bin=mysql-bin

指定需要记录二进制日志的数据库

binlog_do_db=testjpa
1
2
3
4
5
6
7
8
除了开启binlog功能外,还需要为Flink CDC配置相应的权限,以确保其能够正常连接到MySQL并读取数据。这包括授予Flink CDC连接MySQL的用户必要的权限,如SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。

检查是否已开启binlog功能:

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1
2
3
4
5
6
至此,MySQL的相关配置已完成。

  1. 创建Spring Boot项目
    首先,你需要创建一个Spring Boot项目。可以使用Spring Initializr(https://start.spring.io/)来快速生成项目。

  2. 添加依赖
    在pom.xml中添加Apache Flink和Flink CDC的依赖。以下是必要的依赖:

org.apache.flink flink-java 1.14.0 org.apache.flink flink-streaming-java_2.12 1.14.0 org.apache.flink flink-connector-mysql-cdc 2.0.0 org.springframework.boot spring-boot-starter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 4. 配置Flink和MySQL CDC 在Spring Boot的application.yml或application.properties文件中配置Flink和MySQL数据库连接:

flink:
checkpoint:
interval: 10000
parallelism: 1

spring:
datasource:
url: jdbc:mysql://localhost:3306/your_database
username: your_username
password: your_password
1
2
3
4
5
6
7
8
9
10
5. 实现数据实时追踪
创建一个服务类来实现数据的实时追踪:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkCdcService {

public void startDataStreaming() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // 使用Flink CDC连接MySQL
    String name = "inventory";
    tableEnv.executeSql("CREATE TABLE " + name + " (" +
        "  id INT," +
        "  name STRING," +
        "  description STRING," +
        "  weight DECIMAL(10, 3)" +
        ") WITH (" +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = 'localhost'," +
        "  'port' = '3306'," +
        "  'username' = 'your_username'," +
        "  'password' = 'your_password'," +
        "  'database-name' = 'your_database'," +
        "  'table-name' = 'your_table'" +
        ")");

    // 查询并打印结果
    DataStream<String> dataStream = tableEnv.sqlQuery("SELECT * FROM " + name).execute().print();

    try {
        env.execute("Flink CDC Demo");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
6. 启动Spring Boot应用
在你的Spring Boot应用的启动类中调用FlinkCdcService的startDataStreaming方法来启动数据追踪:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcApplication implements CommandLineRunner {

@Autowired
private FlinkCdcService flinkCdcService;

public static void main(String[] args) {
    SpringApplication.run(FlinkCdcApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
    flinkCdcService.startDataStreaming();
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
7. 运行并测试
运行Spring Boot应用,并在MySQL数据库中做出一些数据变动。你应该能在控制台看到实时打印的数据变动。
————————————————

                        版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。