普元数据开发平台 普元数据开发平台
产品介绍
安装部署
快速入门
操作指南
应用集成
FAQ
  • Kafka输入组件使用说明
  • 组件说明
  • 配置项说明
  • FAQ

# Kafka输入组件使用说明

  • 组件说明
  • 配置项说明
  • 使用示例
    • kafka配置用例
    • 读取kafka中的json数据输出到DB示例
  • FAQ

# 组件说明

Kafka输入组件用于从 Apache Kafka 数据库中读取数据。

# 配置项说明

配置名称 数据类型 是否必须 默认值 描述
节点名称 String 是 - 当前创建的节点名称,由用户自定义且不可为空。命名可包含字母、数字、下划线。
选择数据源 String 是 - 当前输入绑定的数据源名称,从下拉选项中列出的指定的关联类型(Kafka)的数据源进行选择。
topic String 是 - Kafka 主题名称。
消费组名称 String 是 - Kafka 消费组名称 。
超时时间(ms) Integer 是 300000 读取超时时间。
启用超时终止 Boolean 是 false 超时终止的开关。如果启用则超时后会终止运行。
字段名称 String 是 KafkaMessageName 要读取的字段名称。
键值 String 是 - 通过键值设置更多的参数。

# FAQ

Q: 当使用的kafka需要认证时,DWS模型和DI Server要怎么配置?

A:不同的认证方式,使用的配置项不同,具体如下:

  • 选择 SSL+JKS 双向认证的kafka数据源时:

    步骤1. IDE中,kafka输入和kafka输出组件会自动增加以下配置,其中ssl.keystore.location和 ssl.keystore.location需要修改为步骤2中jks文件的路径:

    security.protocol = SSL
    ssl.keystore.location = ${DI_HOME}/ssl/{datasourceCode}/{envType}/kafka-keystore.jks
    ssl.keystore.password = keystore-password
    ssl.key.password = key-password
    ssl.truststore.location = ${DI_HOME}/ssl/{datasourceCode}/{envType}/kafka-truststore.jks
    ssl.truststore.password = truststore-password
    ssl.client.auth = required
    ssl.endpoint.identification.algorithm =    #设置值为一个空格
    ssl.client.auth = required
    

    参数说明:

    security.protocol:设置为 SSL,表示使用 SSL/TLS 协议。

    ssl.truststore.location:信任库(TrustStore)文件的路径(JKS 或 PEM 格式)。

    ssl.truststore.location:信任库(TrustStore)文件的路径(JKS 或 PEM 格式)。

    ssl.keystore.location:密钥库(KeyStore)文件的路径(JKS 或 PEM 格式)。

    ssl.keystore.password:密钥库的密码。

    ssl.key.password:私钥的密码(如果与密钥库密码不同则设置;如果与密钥库密码相同则不添加此参数)。

    ssl.client.auth 有以下三种配置选项:

    ​ none:(默认值):服务器不要求客户端提供证书。仅服务器向客户端提供证书,客户端验证服务器的证书。

    ​ requested:服务器请求客户端提供证书,但即使客户端未提供证书,连接仍然可以建立。这是一种宽松的双向认证模式。

    ​ required:服务器要求客户端必须提供有效证书。如果客户端未提供证书或证书无效,连接将被拒绝。这是一种严格的双向认证模式。

    ssl.endpoint.identification.algorithm:设置端点验证算法,通常为 HTTPS 或留空。这里设置为一个空格

image-20250224165221516

步骤2. DI Server需要在${DI_HOME}/ssl/{datasourceCode}/{envType}/目录下放置kafka-keystore.jks、kafka-truststore.jks认证文件

注意:
DI_HOME 即Primeton DI安装目录
datasourceCode为模型中使用DWS系统中的kafka数据源编码
envType为环境类型,如dev、test、pro
ssl及下层目录不存在时,可手动创建
文件夹层级格式不可修改,每层都必须有
  • 选择SASL_PLAINTEXT 认证,且认证机制为SCRAM-SHA-256或SCRAM-SHA-512的kafka数据源时:

    IDE中,kafka输入和kafka输出组件会自动增加以下配置;DI Server不需要其他配置:

    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = SCRAM-SHA-256  或 SCRAM-SHA-512
    sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="<your_username>" password="<your_password>";
    

    参数说明:

    security.protocol:设置为 SASL_PLAINTEXT,使用 SASL 认证,但数据传输不加密(明文传输)。

    sasl.mechanism:指定 SASL 认证机制,SCRAM-SHA-256:使用 SCRAM-SHA-256 算法认证;SCRAM-SHA-512:使用 SCRAM-SHA-512 算法认证。

    sasl.jaas.config:配置 SASL 认证的用户名和密码。

    image-20250109160442481

  • 选择kerberos 认证方式的kafka数据源时:

    1. IDE中,kafka输入和kafka输出组件会自动增加以下配置:
    security.protocol = SASL_PLAINTEXT
    sasl.mechanism = GSSAPI
    

    image-20250109170848237

    1. DI Server需要在 ${DI Server}安装目录/diserver/kerberos 目录下放置krb5.conf、krb5.keytab、jaas.conf和config.properties文件。

    jaas.conf 文件内容:

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        useTicketCache=false
        serviceName="kafka"
        keyTab="/path/to/krb5.keytab"     		#krb5.keytab文件存放路径
        principal="kafka/server@KAFKA.COM";		#kafka票据信息
    };
    

    config.properties文件内容:

    username.client.kerberos.principal=kafka/server@KAFKA.COM     #kafka票据信息
    

    krb5.conf和krb5.keytab文件从kafka服务器获取。

    以上文件修改后,需要重启DI Server服务。

    image-20250109170425274

← JSON输入 MongoDB输入 →