【问题标题】:KSQL: UDF does not accept parameters (STRING, STRING)KSQL:UDF 不接受参数(字符串、字符串)
【发布时间】:2020-11-02 16:49:02
【问题描述】:

我在尝试使用 UDF 设置 ETL 管道时遇到了 KSQL 问题。在 ETL 过程中的某个时刻,我需要将特定信息与数据中的描述字段 (VARCHAR) 隔离开来。一个虚构的上下文示例:

description = "species=dog.sex=male.color=blonde.age=10."(真实数据格式相同)

我编写了一个简单的 UDF 来按需隔离任何信息。它看起来像这样:

package com.my.package;

/** IMPORTS **/
import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;

/** ClASS DEFINITION **/
@UdfDescription(name = "extract_from_description",
                author = "Me",
                version = "0.0.1",
                description = "Given a description and a request for information, isolates and returns the requested information. Pass requested tag as 'tag='".) 
public class Extract_From_Description {

    @Udf(description = "Given a description and a request for information, isolates and returns the requested information. Pass requested tag as 'tag='.)
    public String extract_from_description(final String description, final String request) {
        return description.split(request)[1].split("\\.")[0];
    }
}

我可以上传和注册函数就好了,它在我运行时列出并正确描述:

ksql> list functions;
ksql> describe function EXTRACT_FROM_DESCRIPTION;

我这样调用函数来创建一个新流:

CREATE STREAM result AS
    SELECT recordId,
           OtherVariables,
           EXTRACT_FROM_DESCRIPTION(description, 'species=') AS species
    FROM parent_stream
    EMIT CHANGES;

我收到一个我无法理解的错误:

函数“extract_from_description”不接受参数(STRING、STRING)。 有效的替代方案是:

显然 KSQL 无法正确解释函数的输入应该是什么(看起来它不需要输入?)我不知道为什么。我已经阅读了文档,看看我是否以一种奇怪的方式定义了我的函数,但找不到示例和我的函数之间的任何区别。我确实注意到应该有几种方法来定义函数接受的输入并尝试了所有方法,但结果总是相同的。

我使用 Maven 为这个函数创建了 jar 文件(JDK1.8.0_201)。谁能帮我弄清楚发生了什么?

TL;DR: My KSQL UDF 不接受类型为 (String, String) 的输入,即使函数指定输入应为 (String, String) 类型

【问题讨论】:

    标签: java apache-kafka jar user-defined-functions ksqldb


    【解决方案1】:

    找到问题,在这里为可能遇到相同问题的任何人回答。 您需要使用@UdfParameter 指定参数,如下所示:

    import io.confluent.ksql.function.udf.UdfParameter; // add this to the list of imports
    
    // add @UdfParameter(name) to each input variable
    public String extract_from_description(@UdfParameter(value = "description") final String description, @UdfParameter(value = "request") final String request){
               
      function body
    
    }
    

    【讨论】:

    • 我将参数标记为@UDFParameter,并没有将它们标记为“最终”,花了 3 个小时在上面。你的帖子有帮助,谢谢。 docs.ksqldb.io/en/latest/how-to-guides/… 以上来自 confluent 的教程应更新以包含此内容。
    猜你喜欢
    • 2017-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-23
    • 1970-01-01
    • 2013-05-05
    • 2016-07-18
    • 2014-08-05
    相关资源
    最近更新 更多