普元数据集成平台 普元数据集成平台
产品介绍
安装部署
快速入门
操作指南
FAQ
  • Kafka复杂Json数据写入文件
  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 配置组件属性
  • 运行
  • 查看数据
  • 提交版本

# Kafka复杂Json数据写入文件

本示例主要演示从Kafka表中读取数据,通过JsonPath组件进行字段配置,将处理后的数据写入文件中。

主要步骤如下:

  • 准备数据
  • 新建同步作业
  • 拖拽图元
  • 运行
  • 查看数据
  • 提交版本

# 准备数据

1、在Kafka中创建主题example-userinfo,在主题中写入数据:

Kafka数据:
{
  "_id": "670f897bf887ef30bee8ca2e",
  "id": 1,
  "name": "张三",
  "age": 18,
  "hobby": [
    "音乐",
    "美术",
    "舞蹈"
  ],
  "birthday": "2024-10-16",
  "leader": true,
  "salary": 1E+5,
  "createTime": "1970-01-21",
  "address": {
    "city": "New York",
    "zip": "10001",
    "coordinates": {
      "lng": -74.006,
      "lat": 40.7128
    }
  }
}

2、在seatunnel目录下创建localfile文件夹,复制目录路径备用。 此示例的目录路径为:/home/seatunnel/apache-seatunnel-7.0.2/localfile

# 新建同步作业

点击数据同步上的【...】,选择弹出菜单【新建数据同步作业】,作业名称为:KafkaSource-JsonPath-SQL-LocalFile。

# 拖拽图元

依次拖拽数据源中的Kafka Source组件、转换中的JsonPath组件、SQL组件和目标中的LocalFile Sink组件,依次连线。如下图所示:

kafkasource

# 配置组件属性

1、双击"Kafka Source"组件,根据下图所示步骤依次配置。

kafkasource

2、双击"LocalFile Sink"组件,根据下图所示步骤依次配置。

kafkasource

3、双击"SQL"组件,根据下图所示步骤依次配置。
查询SQL:select id, name, age, hobby, birthday, city from JsonPath1729735209476
其中JsonPath1729735209476为JsonPath组件的节点编码

kafkasource

4、双击"JsonPath"组件,根据下图所示步骤依次配置。

kafkasource

5、Ctrl+S保存该模型。

# 运行

点击【运行】按钮,可以运行已经开发完毕的场景,在日志栏可以看运行日志及运行结果。

kafkasource

# 查看数据

在/home/seatunnel/apache-seatunnel-7.0.2/localfile下查看数据。

kafkasource

# 提交版本

当草稿运行正常后,点击【提交】按钮可以将该版本提交到作业调度,每次修改提交都会生成新的版本,可以看到提交的历史版本,并可以随意切换版本。

提交后的版本,可以在作业调度中进行"定时"调度配置。

kafkasource