【问题标题】:Elasticsearch sink only get new messages and not the previous one using kafka-connect-elasticsearch + timestamp SMTElasticsearch sink 只获取新消息,而不是使用 kafka-connect-elasticsearch + timestamp SMT 的前一个消息
【发布时间】:2019-05-24 02:05:48
【问题描述】:

我正在使用 kafka-connect-elasticsearch 插件从我的 kafka 获取消息到 Elasticsearch。 我在 kafka 中的数据包含一个日期字段(时间戳格式)。

我的第一个问题是,当我使用这个插件时,Elasticsearch 索引没有将日期字段识别为日期类型,而是将其识别为长 ... 我在我的连接器配置中使用 SMT 转换来解决这个问题。

这是我当前的配置,允许我在 Elastic 中推送数据:

{
  "name": "elasticsearch-sink-test",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "test.test",
    "key.ignore": "true",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "name": "elasticsearch-sink-test",
    "Batch.size": 100,
    "max.buffered.records": 1000,
    "Max.retries": 10,
    "Retry.backoff.ms": 1000,
    "flush.timeout.ms": 20000,
    "max.in.flight.requests": 3
    "transforms": "date",
    "transforms.date.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.date.target.type": "Date",
    "transforms.date.field": "date",
    "transforms.date.format": "yyyy-MM-dd HH:mm:ss"
  }
}

我现在的问题是: Elasticsearch 不会获取存储在 kafka 中的所有先前消息,而只会获取新消息(在我启动 Elasticsearch 连接器后,所有新消息都被推送到 kafka)。

如何配置连接器以使弹性获取所有消息? 是否有任何解决方法使弹性“理解”日期字段是时间戳?

(有关信息,我的数据源是带有 debezium CDC 连接器的 MongoDB)

提前谢谢你

【问题讨论】:

    标签: elasticsearch apache-kafka apache-kafka-connect


    【解决方案1】:

    如何配置连接器以使弹性获取所有消息?

    就像普通的 Kafka 消费者一样,您需要将偏移量设置为最早的

    consumer.auto.offset.reset=earliest 
    

    make elastic“理解”日期字段是时间戳是否有任何解决方法?

    是的,在 Elasticsearch 中使用索引或动态映射。默认情况下,所有摄取的数字都只是数值。只有正确格式化的日期字符串才会真正被索引为日期。如果您不控制 Elasticsearch 服务器或索引设置,这通常是由该系统的管理员设置的

    【讨论】:

    • 感谢您的回答,但还是不行。我在配置文件的末尾添加了“consumer.auto.offset.reset”:“earliset”,没有任何变化。这是设置偏移量的好方法吗?如果没有怎么做,请
    • 既然你已经启动了一次连接器,那么它被设置为最新的偏移量......你需要重置消费者组或重命名连接器。我已验证此设置有效,但您在评论中最早拼写错误
    • 现在完美运行。我重命名了连接器并更正了错字...谢谢
    猜你喜欢
    • 2019-06-11
    • 2021-09-19
    • 1970-01-01
    • 2015-07-17
    • 2017-03-26
    • 1970-01-01
    • 1970-01-01
    • 2021-01-26
    • 2021-07-15
    相关资源
    最近更新 更多