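【Question title】: S3 connector with HourlyPartitioner failing
【Posted】: 2021-05-30 10:27:33
【Question description】:

Writing to S3 through the S3 sink connector works without any problems when we use the default configuration. When we switch to hourly partitioning, however, it fails with the error below. Please look at the configuration and the error message and help us resolve this issue.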

Default:

{
  "value.converter.schemas.enable": "false",
  "name": "tibconew1-test-s3standard-default-sink-connector",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.tolerance": "all",
  "topics": [
    "test.s3custom.default.dax.shipment.data",
    "test.s3custom.default.dax.shipment.data",
    "test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn"
  ],
  "topics.regex": "",
  "errors.deadletterqueue.topic.name": "dlq_test.s3custom.default.dax.shipment.data",
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "1000",
  "s3.bucket.name": "test-stg-raw",
  "s3.region": "us-east-1",
  "s3.credentials.provider.class": "com.amazonaws.auth.InstanceProfileCredentialsProvider",
  "s3.acl.canned": "bucket-owner-full-control",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "streams_dir",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}

Hourly:

{
  "value.converter.schema.registry.url": "https://confschema.test-dsol-core.testdigital-stg.com",
  "value.converter.schemas.enable": "false",
  "name": "test.s3custom.hourly.tibco.dax_shipment.dpp_asn.sink-connector",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "2",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.tolerance": "all",
  "topics": [
    "test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn"
  ],
  "topics.regex": "",
  "errors.deadletterqueue.topic.name": "dlq_test.s3custom.hourly.onprem.tibco.dax_shipment.dpp_asn.sink",
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "10",
  "s3.bucket.name": "test-stg-raw",
  "s3.region": "us-east-1",
  "s3.credentials.provider.class": "com.amazonaws.auth.InstanceProfileCredentialsProvider",
  "s3.acl.canned": "bucket-owner-full-control",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "streams_dir",
  "partitioner.class": "io.confluent.connect.storage.partitioner.HourlyPartitioner",
  "locale": "en-US",
  "timezone": "America/Chicago",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "DPP_ASN.LST_UPDT_TS"
} 

Error:

【Comments】:

    Tags: apache-kafka-connect confluent-platform s3-kafka-connector


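    【Solution 1】:

    We finally found the cause. The timestamp received in the payload was in an invalid format: it contained an extra space. We corrected the format on the source side. For the hourly partitioner, the connector expects the value to be based on the hour. Hourly partitioning: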

    io.confluent.connect.storage.partitioner.HourlyPartitioner is equivalent to the TimeBasedPartitioner with path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH and

    The message was: "LST_UPDT_TS":"2021-02-01 07:16:23.567"

    Corrected to: "LST_UPDT_TS":"2015-08-01T17:00:00.69243-05:00"
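
The fix above can be sketched in a few lines of Python. This is only an illustration of the idea, not the connector's own code: it takes a timestamp in the rejected shape (date and time separated by a space), rewrites it as ISO 8601 with an explicit UTC offset, and shows the hour-based directory that the quoted path.format would give for it. The function names and the fixed -06:00 offset (US Central standard time, which ignores daylight saving) are assumptions made for this example.

```python
from datetime import datetime, timedelta, timezone


def parse_source_timestamp(raw: str, utc_offset_hours: int = -6) -> datetime:
    """Parse 'YYYY-MM-DD HH:MM:SS.fff' and attach a fixed UTC offset.

    The default offset of -6 hours is an assumption for this sketch; a real
    pipeline would need the actual zone rules, including daylight saving.
    """
    naive = datetime.strptime(raw.strip(), "%Y-%m-%d %H:%M:%S.%f")
    return naive.replace(tzinfo=timezone(timedelta(hours=utc_offset_hours)))


def to_iso8601(ts: datetime) -> str:
    """Render the timestamp as ISO 8601 with a 'T' separator and an offset."""
    return ts.isoformat(timespec="milliseconds")


def hourly_path(ts: datetime) -> str:
    """Build a directory following 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH."""
    return f"year={ts:%Y}/month={ts:%m}/day={ts:%d}/hour={ts:%H}"


if __name__ == "__main__":
    ts = parse_source_timestamp("2021-02-01 07:16:23.567")
    print(to_iso8601(ts))   # 2021-02-01T07:16:23.567-06:00
    print(hourly_path(ts))  # year=2021/month=02/day=01/hour=07
```

In practice this kind of conversion belongs on the producer side, as in the fix described here, so that the field named in timestamp.field already carries a parseable value when the record reaches the connector.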
