普元数据集成平台 普元数据集成平台
产品介绍
安装部署
快速入门
操作指南
FAQ
  • Kafka 到 PostgreSQL 的数据同步示例
  • 准备数据
  • 新建数据同步作业
  • 画布中拖拽图元
  • 配置"Kafka数据源"组件属性
  • 配置"SQL"组件属性
  • 配置"关系型数据库目标"组件属性
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# Kafka 到 PostgreSQL 的数据同步示例

本示例主要介绍 Kafka 到 PostgreSQL 的数据同步示例场景开发,该场景根据 Kafka 接收到的消息内容向 PostgreSQL 数据源的表 person 分别执行 INSERT、UPDATE、DELETE 的操作。主要步骤如下:

  • 准备数据
  • 新建数据同步作业
  • 画布中拖拽图元
  • 配置"Kafka数据源"组件属性
  • 配置"SQL"组件属性
  • 配置"关系型数据库目标"组件属性
  • 通用配置
  • 保存草稿
  • 运行
  • 提交版本

# 准备数据

在 PostgreSQL 数据源中创建一个表 person,并初始化一些数据,比如:要同步 UPDATE、DELETE 的数据。

CREATE TABLE public.person (
	pkid varchar NULL,
	id varchar NULL,
	name varchar NULL,
	loading_date varchar NULL,
	delete_flag varchar NULL,
	mod_user varchar NULL,
	mod_user_id varchar NULL
);

INSERT INTO person(pkid,id,name,loading_date,delete_flag,mod_user,mod_user_id) 
VALUES('279', '20211128', '陈丽', '2023-03-14 00:00:00.0', '1','annoy','75589');

INSERT INTO person(pkid,id,name,loading_date,delete_flag,mod_user,mod_user_id) 
VALUES('5', '20211128', '测试', '2023-03-14 00:00:00.0', '1','test','00001');

在 Kafka 数据源中创建名称为 person 的 Topic。并创建三个消息,具体如下:

消息一:表示 INSERT 操作的数据

{
  "_source_schema": "PUBLIC",
  "_source_table": "PERSON",
  "_committime": "2023-03-14 14:57:35.863",
  "_optype": "INSERT",
  "_seqno": "2261",
  "record": {
    "PKID": "825",
    "ID": "20211128",
    "NAME": "陈丽",
    "LOADING_DATE": "2023-03-14 00:00:00.0",
    "DELETE_FLAG": "1",
    "MOD_USER": "annoy",
    "MOD_USER_ID": "75589"
  }
}

消息二:表示 UPDATE 操作的数据

{
    "_source_schema": "PUBLIC",
    "_source_table": "PERSON",
    "_committime": "2023-03-14 18:13:43.622",
    "_optype": "UPDATE",
    "_seqno": "2264",
    "record": {
        "PKID": "279",
        "ID": "20210582",
        "NAME": "赵欣",
        "LOADING_DATE": "2023-03-14 00:00:00.0",
        "DELETE_FLAG": "1",
        "MOD_USER": "admin",
        "MOD_USER_ID": "94950"
    },
    "key": {
        "PKID": "279"
    }
}

消息三:表示 DELETE 操作的数据

{
    "_source_schema": "PUBLIC",
    "_source_table": "PERSON",
    "_committime": "2023-03-17 15:02:05.19",
    "_optype": "DELETE",
    "_seqno": "2267",
    "record": {
        "PKID": "5"
    },
    "key": {
        "PKID": "5"
    }
}

# 新建数据同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:udf-kafka2pg。

# 画布中拖拽图元

在画布中分别拖拽 1 个"Kafka数据源"图元、6 个"SQL"图元、3 个"关系型数据库目标"图元,并建立连线。

kafka2pg

# 配置"Kafka数据源"组件属性

在"Kafka数据源"图元上右键,点击【编辑】按钮,弹出"Kafka数据源"组件的弹窗。按照需求进行属性设置,点击【确定】按钮。

kafka2pg

kafka2pg

# 配置"SQL"组件属性

在"SQL"图元上右键,点击【编辑】按钮,弹出"SQL"组件的弹窗。分别设置"SQL"、"SQL1"、"SQL2"、"INSERT"、"UPDATE"、"DELETE"图元的属性。

⚠️ 提示:
查询 SQL 中 select 后边的 record 是"Kafka数据源"属性中"数据结构"中定义的字段。

查询 SQL 定义的含义是过滤出"Kafka数据源"的"person"主题中 _optype 为 INSERT 的数据。

kafka2pg

⚠️ 提示:查询 SQL 定义的含义是过滤出"Kafka数据源"的"person"主题中 _optype 为 UPDATE 的数据。

kafka2pg

⚠️ 提示:查询 SQL 定义的含义是过滤出"Kafka数据源"的"person"主题中 _optype 为 DELETE 的数据。

kafka2pg

后续三个节点编码分别为"INSERT"、"UPDATE"、"DELETE"的组件中,自定义查询 SQL 时用到了自定义函数 QDMX和hutool-all-5.8.20.jar将两个 jar 文件放至 ${seatunnel部署路径}/apache-seatunnel-2.3.5/lib/ 下。

函数具体编码如下:

package org.apache.seatunnel.example.engine;


import cn.hutool.core.util.StrUtil;
import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author primeton
 * @create 2023-08-18
 */
@AutoService(ZetaUDF.class)
public class QdmxUDF implements ZetaUDF {

    @Override
    public String functionName() {
        return "QDMX";
    }

    @Override
    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> list) {
        return BasicType.STRING_TYPE;
    }

    //list 参数实例:(也就是kafka 解析过来的数据)
    //SeaTunnelRow{tableId=, kind=+I, fields=[{key1=value1,key2=value2,.....}]}
    @Override
    public Object evaluate(List<Object> list) {
        String str = list.get(0).toString();
        //1 Remove the prefix
        str = StrUtil.replace(str, "SeaTunnelRow{tableId=, kind=+I, fields=[{", "");
        //2 Remove the suffix
        str = StrUtil.sub(str, -1, 0);
        //3 build Map key value
        Map<String, String> map = parseToMap(str);
        if ("null".equals(map.get(list.get(1).toString())))
            return "";
        //4 return the value of the key
        return map.get(list.get(1).toString());
    }

    public static Map<String, String> parseToMap(String input) {
        Map<String, String> map = new HashMap<>();
        //去除大括号 在字符串阶段去除
        input = input.replaceAll("[{}]", "");
        //拆分键值对
        String[] pairs = input.split(", ");

        for (String pair : pairs) {
            String[] keyValue = pair.split("=");
            if (keyValue.length == 2) {
                String key = keyValue[0].trim();
                String value = keyValue[1].trim();
                map.put(key, value);
            }
        }
        return map;
    }
}

节点编码为"INSERT"的查询 SQL 中编写的 SQL 语句如下:

select
  QDMX(record, 'PKID') as PKID,
  QDMX(record, 'ID') AS ID,
  QDMX(record, 'NAME') AS NAME,
  QDMX(record, 'LOADING_DATE') AS LOADING_DATE,
  QDMX(record, 'DELETE_FLAG') AS DELETE_FLAG,
  QDMX(record, 'MOD_USER') AS MOD_USER,
  QDMX(record, 'MOD_USER_ID') AS MOD_USER_ID
from
  SQL

kafka2pg

节点编码为"UPDATE"的查询 SQL 中编写的 SQL 语句如下:

select
  QDMX(record, 'PKID') as PKID,
  QDMX(record, 'ID') AS ID,
  QDMX(record, 'NAME') AS NAME,
  QDMX(record, 'LOADING_DATE') AS LOADING_DATE,
  QDMX(record, 'DELETE_FLAG') AS DELETE_FLAG,
  QDMX(record, 'MOD_USER') AS MOD_USER,
  QDMX(record, 'MOD_USER_ID') AS MOD_USER_ID,
  QDMX(record, 'PKID') as PKID
from
  SQL1

kafka2pg

节点编码为"DELETE"的查询 SQL 中编写的 SQL 语句如下:

select
  QDMX(record, 'PKID') as PKID
from
  SQL2

kafka2pg

⚠️ 提示:查询 SQL 中 from 后边的参数值是与其连接的前一个组件的"节点编码"。

# 配置"关系型数据库目标"组件属性

在"关系型数据库目标"图元上右键,点击【编辑】按钮,弹出"关系型数据库目标"组件的弹窗。分别设置"PG1"、"PG2"、"PG3"图元的属性。 其他属性使用默认值。

kafka2pg

kafka2pg

kafka2pg

# 通用配置

在通用配置中可以配置任务优先级、Worker 分组、本地参数、超时告警、选择引擎、部署方式、引用参数。

kafka2pg

# 保存草稿

如果所有组件属性都已设置完毕,点击【保存】按钮,可以看到保存过的历史草稿,并可以随意切换草稿。(草稿只保存最近 10 个)

可以参考示例关系型表数据同步示例 中的"保存草稿"说明。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

kafka2pg

可以在 PostgreSQL 数据库表 person 中看到 UPDATE、INSERT 的数据,pkid 为 5 的数据已经被删除。

kafka2pg

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

可以参考示例关系型表数据同步示例 中的"提交版本"说明。

← Kafka到PG的数据同步示例 Elasticsearch 同步到 MySQL示例 →