【问题标题】:KafkaStreams How to instantiate a ConsumerRecordFactory?KafkaStreams 如何实例化 ConsumerRecordFactory?
【发布时间】:2019-04-18 16:04:26
【问题描述】:

我正在尝试使用 Kafka Streams 提供的 ConsumerRecordFactory,主要是 confluent doc 来测试流应用程序,这是我到目前为止的代码:

// Properties of the application
Properties streamsConfiguration = new Properties();

// Give the Streams application a unique name.  The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
        "input-topic",
        new StringSerializer(),
        new IntegerSerializer()
);

// Create a test record
ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);

我的问题是当我编译我的代码时出现以下错误:

Error:(70, 52) java: reference to create is ambiguous
  both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
  and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match

所以我知道 kafka 流定义了泛型方法 create(K,V,long),并且当我使用非泛型类型创建工厂时,我创建了一个与第一个冲突的新方法。

我的问题是我应该如何实例化我的ConsumerRecordFactory

我尝试使用ConsumerRecordFactory&lt;Object, Integer&gt; 使我的工厂更通用,但推断的类型不匹配。而且我找不到其他示例,confluent github repo kafka-streams-examples 似乎没有使用ConsumerRecordFactory,而this SO answer 似乎使用与文档相同的代码。

(我知道问题更多在于 java 而不是 kafka 流,但我认为用apache-kafka-streams 标记它是接触习惯ConsumerRecordFactory 的人的好方法)

【问题讨论】:

    标签: apache-kafka-streams inferred-type


    【解决方案1】:

    以下代码存在一些问题:

    // Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
            "input-topic",
            new StringSerializer(),
            new IntegerSerializer() );
    
    // Create a test record
    ConsumerRecordFactory<byte[], byte[]> record = factory.create("key", 42L);
    
    1. 您在ConsumerRecordFactory 中将valueType 定义为Integer,但在create() 方法中您传递的是Long 类型值。
    2. factory.create() 返回 ConsumerRecord 而不是 ConsumerRecordFactory

    关于方法的歧义,你是对的。所以避免这个问题,使用以下:

    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
            new StringSerializer(),
            new IntegerSerializer()
    );
    // Use ConsumerRecord here instead of ConsumerRecordFactory
    ConsumerRecord<byte[], byte[]> record = factory.create("input-topic","key", 42);
    

    【讨论】:

    • 您对类型错误的看法是正确的,这是过去的错误副本,但不是我的实际代码。关于您的解决方案,它似乎正在工作的工厂,但我不明白为什么不在工厂中定义主题名称而是在记录中解决问题?
    • 太棒了。当您的 是字符串时,就会出现这种歧义。但是,如果您定义了其他一些密钥类型,它将起作用,因为方法签名将是唯一的。您可以在这里找到更多详细信息:kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
    • 如果您认为它可以解决您的问题,请随时接受/支持答案:)
    • 是的,我只是在阅读文档并在接受之前进行了更多测试:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-06-18
    • 2016-02-07
    • 2017-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多