【问题标题】:Kafka producer Spring Cloud app won't publish anythingKafka 生产者 Spring Cloud 应用程序不会发布任何内容
【发布时间】:2022-02-01 19:18:19
【问题描述】:

我开始将我的 Spring Cloud 应用程序与托管在 docker-compose 文件中的 kafka 和 zookeeper 连接起来,它们已连接,但是当我运行应用程序并期望 Producer 发布消息时,什么也得不到……我没有知道错误是什么,我按照这个例子:https://github.com/ihuaylupo/manning-smia/tree/master/chapter10/licensing-service/src/main/java/com/optimagrowth/license/events

工作流程是来自 github repo 示例,例如,当我在服务上调用和 endPoint Post 时,我希望我的 kafka 生产者发布关于主题的消息,并让 Kafka 消费者使用该消息,但一切正常 - 端点对 DB 执行,除了 kafka 生产和消费消息,甚至没有错误知道我在哪里做错了......

Docker 编写文件:

 zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    

  kafkaserver:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 #kafka 192.168.99.100 #kafka - ip because i want to access kafka and zookeeper from outside of containers, i.e localhost
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
      - listeners=PLAINTEXT://:9092
      - advertised.listeners=PLAINTEXT://192.168.99.100:9092
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper

Kafka 消费者 Spring Cloud 属性:

spring.cloud.stream.bindings.inboundOrgChanges.destination=orgChangeTopic
spring.cloud.stream.bindings.inboundOrgChanges.content-type=application/json
spring.cloud.stream.bindings.inboundOrgChanges.group=studentsGroup
spring.cloud.stream.kafka.binder.brokers=localhost #kafka
spring.cloud.stream.kafka.binder.zkNodes=localhost

Kafka 生产者 Spring Cloud 属性:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=http://192.168.99.100:2181
spring.cloud.stream.kafka.binder.brokers=http://192.168.99.100:9092

卡夫卡制作人...

@EnableBinding(Source.class) // - on my main Spring app
@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

@Getter @Setter @ToString
public class OrganizationChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;

    public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }
}

@Service class ServiceEx {
    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization findById(String organizationId) {
        Optional<Organization> opt = repository.findById(organizationId);
        simpleSourceBean.publishOrganizationChange("GET", organizationId);
        return (opt.isPresent()) ? opt.get() : null;
    }   //wont do anything }

编辑: Docker-compose 文件:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - spring-cloud-network

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      - spring-cloud-network

  facultate:
    container_name: facultate
    build: C:\Users\marius\com.balabasciuc.springmicroservicesinaction\facultateservice
    restart: on-failure
    ports:
      - "1002:1002"
    environment:
      SPRING_CLOUD_CONFIG_URI: "http://config:7070"
      EUREKA_HOST: server
      EUREKA_PORT: 9001
      DATABASE_HOST: database
     # SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: kafka:9092
    depends_on:
      - kafka
      - zookeeper
      - server
    networks:
      - spring-cloud-network
volumes:
  simple:
    driver: local
networks:
  spring-cloud-network:
    driver: bridge

Spring Cloud 生产者道具:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=kafka
spring.cloud.stream.kafka.binder.zkNodes=zookeeper

Docker 日志:

facultate    | 2022-02-01 11:15:47.246  INFO 1 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
facultate    | 2022-02-01 11:15:47.247  INFO 1 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
facultate    | 2022-02-01 11:15:47.251  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
facultate    | 2022-02-01 11:15:47.876  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
facultate    | 2022-02-01 11:15:47.876  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
facultate    | 2022-02-01 11:15:48.138  INFO 1 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: orgChangeTopic
facultate    | 2022-02-01 11:15:48.150  INFO 1 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
facultate    |  bootstrap.servers = [kafka:9092]
facultate    |  client.dns.lookup = use_all_dns_ips
facultate    |  client.id =
facultate    |  connections.max.idle.ms = 300000
facultate    |  default.api.timeout.ms = 60000
facultate    |  metadata.max.age.ms = 300000
facultate    |  metric.reporters = []
facultate    |  metrics.num.samples = 2
facultate    |  metrics.recording.level = INFO
facultate    |  metrics.sample.window.ms = 30000
facultate    |  receive.buffer.bytes = 65536
facultate    |  reconnect.backoff.max.ms = 1000
facultate    |  reconnect.backoff.ms = 50
facultate    |  request.timeout.ms = 30000
facultate    |  retries = 2147483647
facultate    |  retry.backoff.ms = 100
facultate    |  sasl.client.callback.handler.class = null
facultate    |  sasl.jaas.config = null
facultate    |  sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate    |  sasl.kerberos.min.time.before.relogin = 60000
facultate    |  sasl.kerberos.service.name = null
facultate    |  sasl.kerberos.ticket.renew.jitter = 0.05
facultate    |  sasl.kerberos.ticket.renew.window.factor = 0.8
facultate    |  sasl.login.callback.handler.class = null
facultate    |  sasl.login.class = null
facultate    |  sasl.login.refresh.buffer.seconds = 300
facultate    |  sasl.login.refresh.min.period.seconds = 60
facultate    |  sasl.login.refresh.window.factor = 0.8
facultate    |  sasl.login.refresh.window.jitter = 0.05
facultate    |  sasl.mechanism = GSSAPI
facultate    |  security.protocol = PLAINTEXT
facultate    |  security.providers = null
facultate    |  send.buffer.bytes = 131072
facultate    |  socket.connection.setup.timeout.max.ms = 30000
facultate    |  socket.connection.setup.timeout.ms = 10000
facultate    |  ssl.cipher.suites = null
facultate    |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate    |  ssl.endpoint.identification.algorithm = https
facultate    |  ssl.engine.factory.class = null
facultate    |  ssl.key.password = null
facultate    |  ssl.keymanager.algorithm = SunX509
facultate    |  ssl.keystore.certificate.chain = null
facultate    |  ssl.keystore.key = null
facultate    |  ssl.keystore.location = null
facultate    |  ssl.keystore.password = null
facultate    |  ssl.keystore.type = JKS
facultate    |  ssl.protocol = TLSv1.3
facultate    |  ssl.provider = null
facultate    |  ssl.secure.random.implementation = null
facultate    |  ssl.trustmanager.algorithm = PKIX
facultate    |  ssl.truststore.certificates = null
facultate    |  ssl.truststore.location = null
facultate    |  ssl.truststore.password = null
facultate    |  ssl.truststore.type = JKS
facultate    |
facultate    | 2022-02-01 11:15:48.614  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
facultate    | 2022-02-01 11:15:48.618  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
facultate    | 2022-02-01 11:15:48.619  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1643714148612
facultate    | 2022-02-01 11:15:53.683  INFO 1 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
facultate    | 2022-02-01 11:15:53.767  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
facultate    | 2022-02-01 11:15:53.775  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
facultate    | 2022-02-01 11:15:53.775  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
facultate    | 2022-02-01 11:15:53.899  INFO 1 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
facultate    |  acks = 1
facultate    |  batch.size = 16384
facultate    |  bootstrap.servers = [kafka:9092]
facultate    |  buffer.memory = 33554432
facultate    |  client.dns.lookup = use_all_dns_ips
facultate    |  client.id = producer-1
facultate    |  compression.type = none
facultate    |  connections.max.idle.ms = 540000
facultate    |  delivery.timeout.ms = 120000
facultate    |  enable.idempotence = true
facultate    |  interceptor.classes = []
facultate    |  key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate    |  linger.ms = 0
facultate    |  max.block.ms = 60000
facultate    |  max.in.flight.requests.per.connection = 5
facultate    |  max.request.size = 1048576
facultate    |  metadata.max.age.ms = 300000
facultate    |  metadata.max.idle.ms = 300000
facultate    |  metric.reporters = []
facultate    |  metrics.num.samples = 2
facultate    |  metrics.recording.level = INFO
facultate    |  metrics.sample.window.ms = 30000
facultate    |  partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
facultate    |  receive.buffer.bytes = 32768
facultate    |  reconnect.backoff.max.ms = 1000
facultate    |  reconnect.backoff.ms = 50
facultate    |  request.timeout.ms = 30000
facultate    |  retries = 2147483647
facultate    |  retry.backoff.ms = 100
facultate    |  sasl.client.callback.handler.class = null
facultate    |  sasl.jaas.config = null
facultate    |  sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate    |  sasl.kerberos.min.time.before.relogin = 60000
facultate    |  sasl.kerberos.service.name = null
facultate    |  sasl.kerberos.ticket.renew.jitter = 0.05
facultate    |  sasl.kerberos.ticket.renew.window.factor = 0.8
facultate    |  sasl.login.callback.handler.class = null
facultate    |  sasl.login.class = null
facultate    |  sasl.login.refresh.buffer.seconds = 300
facultate    |  sasl.login.refresh.min.period.seconds = 60
facultate    |  sasl.login.refresh.window.factor = 0.8
facultate    |  sasl.login.refresh.window.jitter = 0.05
facultate    |  sasl.mechanism = GSSAPI
facultate    |  security.protocol = PLAINTEXT
facultate    |  security.providers = null
facultate    |  send.buffer.bytes = 131072
facultate    |  socket.connection.setup.timeout.max.ms = 30000
facultate    |  socket.connection.setup.timeout.ms = 10000
facultate    |  ssl.cipher.suites = null
facultate    |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate    |  ssl.endpoint.identification.algorithm = https
facultate    |  ssl.engine.factory.class = null
facultate    |  ssl.key.password = null
facultate    |  ssl.keymanager.algorithm = SunX509
facultate    |  ssl.keystore.certificate.chain = null
facultate    |  ssl.keystore.key = null
facultate    |  ssl.keystore.location = null
facultate    |  ssl.keystore.password = null
facultate    |  ssl.keystore.type = JKS
facultate    |  ssl.protocol = TLSv1.3
facultate    |  ssl.provider = null
facultate    |  ssl.secure.random.implementation = null
facultate    |  ssl.trustmanager.algorithm = PKIX
facultate    |  ssl.truststore.certificates = null
facultate    |  ssl.truststore.location = null
facultate    |  ssl.truststore.password = null
facultate    |  ssl.truststore.type = JKS
facultate    |  transaction.timeout.ms = 60000
facultate    |  transactional.id = null
facultate    |  value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate    |
facultate    | 2022-02-01 11:15:54.147  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
facultate    | 2022-02-01 11:15:54.148  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
facultate    | 2022-02-01 11:15:54.148  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1643714154147
facultate    | 2022-02-01 11:15:54.293  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: RK0OLlwdRKK-oQ5dsWuHBw
facultate    | 2022-02-01 11:15:54.431  INFO 1 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application-1.output' has 1 subscriber(s).

Kafka 日志容器:

kafka        | [2022-02-01 10:57:12,073] INFO [Partition dresses-0 broker=1001] Log loaded for partition dresses-0 with initial high watermark 0 (kafka.cluster.Partition)
zookeeper    | 2022-02-01 10:57:09,689 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001f44830000 type:multi cxid:0x4e zxid:0x31 txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/prefer
red_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka        | [2022-02-01 11:15:52,573] INFO Creating topic orgChangeTopic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
kafka        | [2022-02-01 11:15:53,447] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(orgChangeTopic-0) (kafka.server.ReplicaFetcherManager)
kafka        | [2022-02-01 11:15:53,484] INFO [Log partition=orgChangeTopic-0, dir=/kafka/kafka-logs-bde9c032b736] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka        | [2022-02-01 11:15:53,488] INFO Created log for partition orgChangeTopic-0 in /kafka/kafka-logs-bde9c032b736/orgChangeTopic-0 with properties {} (kafka.log.LogManager)
kafka        | [2022-02-01 11:15:53,505] INFO [Partition orgChangeTopic-0 broker=1001] No checkpointed highwatermark is found for partition orgChangeTopic-0 (kafka.cluster.Partition)

如果我发布一些内容以生成为 kafka 生成的消息,则日志中不会出现任何内容...或 kafka

【问题讨论】:

  • 1) Kafka 和 Zookeeper 都是 HTTP 服务。为什么要在属性中添加http://? 2) 正如我在同一台主机上have answered to you before...You shouldn't be using IP addresses to connect to Docker containers 3) 您的客户端中应该有错误日志以解决连接问题
  • 另外,listenersadvertised.listeners 不是有效的环境变量,所以我建议回到您之前的问题中,其中 compose 文件和 spring 属性更正确
  • 您的配置现在看起来更好了。我不确定我是否理解这个问题了;什么是“slk”?更改 docker 映像不应该解决任何问题。
  • 嘿@OneCricketeer 它有效...抱歉浪费了您的时间,非常感谢您的耐心等待,我有时只是愚蠢
  • 嘿...它基本上只是为 Spring 云应用程序启用了调试日志记录,这样我可以看到从生产者'logging:level:com.netflix:WARN org.springframework 发布的消息。 web: WARN com.project.springmicroservicesinaction: DEBUG'

标签: java apache-kafka spring-kafka spring-cloud-stream


【解决方案1】:

基于注解的编程模型(诸如EnableBindingSourceSink 等)在 Spring Cloud Stream 中已被弃用,并将在框架即将发布的 4.0 行中完全删除。

您可以使用以下功能样式添加源:

public Supplier< OrganizationChangeModel> source() {
  return () -> {
    //return OrganizationChangeModel here. 
  }
}

在这种情况下,绑定名称变为source-out-0,因为供应商方法被命名为source

但是,在您的情况下,由于您想使用 REST 端点以编程方式发布,我建议您使用 StreamBridge API 来执行此操作。有关详细信息,请参阅 StreamBridge docs。基本思路是使用StreamBridgesend方法通过输出绑定发布数据。

Spring Cloud Stream samples 存储库中的很多示例都使用此版本的 dockerized Kafka。您可能想比较它是如何在那里设置的。

【讨论】:

  • 仅此一项并不能改变生产者的配置不正确的事实
猜你喜欢
  • 1970-01-01
  • 2021-06-18
  • 1970-01-01
  • 1970-01-01
  • 2021-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多