【发布时间】:2020-02-01 21:45:24
【问题描述】:
我从here 部署了 Kafka。我还像这样添加到docker-compose.yml Postgres 容器中:
postgres:
image: postgres
hostname: kafka-postgres
container_name: kafka-postgres
depends_on:
- ksql-server
- broker
- schema-registry
- connect
ports:
- 5432:5432
创建了一个主题页面浏览量。
我还使用设置创建了 DatagenConnector 并运行它。
{
"name": "datagen-pageviews",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"kafka.topic": "pageviews",
"max.interval": "100",
"iterations": "999999999",
"quickstart": "pageviews"
}
据我所知,连接器为主题定义了一个模式:
{
"type": "record",
"name": "pageviews",
"namespace": "ksql",
"fields": [
{
"name": "viewtime",
"type": "long"
},
{
"name": "userid",
"type": "string"
},
{
"name": "pageid",
"type": "string"
}
],
"connect.name": "ksql.pageviews"
}
我的下一步是创建 JdbcSinkConnector,它将数据从 Kafka 主题传输到 Postgres 表。那行得通。连接器的设置:
{
"name": "from-kafka-to-pg",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": [
"pageviews"
],
"connection.url": "jdbc:postgresql://kafka-postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "********",
"auto.create": "true",
"auto.evolve": "true"
}
然后我尝试自己向该主题发送消息。但失败并出现错误:
[2020-02-01 21:16:11,750] ERROR 在任务 to-pg-0 中遇到错误。 使用类执行阶段“VALUE_CONVERTER” 'io.confluent.connect.avro.AvroConverter',其中消耗的记录是 {topic='pageviews', partition=0, offset=23834, 时间戳=1580591160374,时间戳类型=创建时间}。 (org.apache.kafka.connect.runtime.errors.LogReporter) org.apache.kafka.connect.errors.DataException:反序列化失败 Avro 的主题页面浏览量数据:在 io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) 在 org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) 在 org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) 在 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) 在 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起: org.apache.kafka.common.errors.SerializationException: 错误 反序列化 id -1 的 Avro 消息原因: org.apache.kafka.common.errors.SerializationException:未知的魔法 字节!
所以发送方法很重要。我就是这样做的(Python,confluent-kafka-python):
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.poll(0)
producer.produce(topic, json.dumps({
'viewtime': 123,
'userid': 'user_1',
'pageid': 'page_1'
}).encode('utf8'), on_delivery=kafka_delivery_report)
producer.flush()
也许我应该提供带有消息的架构(AvroProducer)?
【问题讨论】:
-
您使用的是哪个库?
kafka-python或confluent-kafka-python?
标签: python apache-kafka avro apache-kafka-connect kafka-producer-api