普元数据集成平台 普元数据集成平台
产品介绍
安装部署
快速入门
操作指南
FAQ
  • Kafka Sink组件使用说明
  • 组件说明
  • 配置项说明
  • FAQ

# Kafka Sink组件使用说明

  • 组件说明
  • 配置项说明
  • 使用示例
    • 读取文件数据写入Kafka
    • 将neo4j数据库数据经过处理后写入Kafka
  • FAQ

# 组件说明

Kafka Sink 连接器。用于将数据写入 Kafka。

# 配置项说明

配置名称 数据类型 是否必填 默认值 描述
节点名称 String 是 - 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。
节点编码 String 是 - 当前创建的节点编码,以此标识此组件,由用户自定义且不可为空。命名可包含字母、数字、下划线。
选择数据源 String 是 - 从下拉选项中选择列出的当前项目已经关联的数据源。
主题名称 String 是 - 填写主题名称。
消息投递语义 String 是 NON 消息投递语义支持三种:NON、EXACTLY_ONCE、AT_LEAST_ONCE。
EXACTLY_ONCE,生产者将在 Kafka 事务中写入所有消息,这些消息将在检查点上提交给 Kafka。
AT_LEAST_ONCE,生产者将等待 Kafka 缓冲区中所有未完成的消息被 Kafka 生产者在检查点上确认。
NON 不提供任何保证:如果 Kafka 代理出现问题,消息可能会丢失,并且消息可能会重复。
使用消息分区策略 Boolean 是 否 分为两种,指定分区和分配分区
指定分区:所有消息都会发送到这个分区。
分配分区:
可以根据消息的内容来决定发送哪个 partition。该参数的作用是分发信息。
比如一共有五个分区,config中的assign_partitions字段如下:assign_partitions = ["shoe", "clothing"]
然后包含“shoe”的消息将被发送到分区0,因为“shoe”在assign_partitions中被订阅为0,而包含“clothing”的消息将被发送到分区1。对于其他消息,将使用哈希算法来将它们分成剩余的分区。
这个函数按MessageContentPartitioner类实现了org.apache.kafka.clients.producer.Partitioner接口。如果我们需要自定义分区,我们也需要实现这个接口。
⚠️ 当“分区“、”分配分区字段“ 配置项同时设置时,消息发送到那个分区,以”分区“配置项设置的分区为准,所有消息都会发送到”分区“配置项所指定的那个分区。
⚠️ 当没有设置 “分区“ 配置项时,设置”分配分区“ 才会生效。
是键取值列 String 否 - 配置哪些字段作为 kafka 消息的 key。如果你想使用来自上游数据的字段值作为键,你可以为这个属性分配字段名称。 如果未设置分区键字段,则将发送空消息键。 消息 key 的格式为 json,如果 key 设置为 name,例如'{"name":"Jack"}'。 所选字段必须是上游中的现有字段。
数据格式 String 是 json 数据格式支持:json、text。默认格式为 json。默认的字段分隔符是","。如果自定义分隔符,请添加“字段分隔符”选项。
可选参数 否 - 其他参数,用户可以根据需求进行配置。

# FAQ

← Inceptor LocalFile →