【Question title】: Neo4j Sink not receiving Kafka topic events
【Posted】: 2019-09-16 19:08:19
【Question】:

I set up a Neo4j instance with Docker Compose, including the Streams plugin and the APOC procedures. The configuration is as follows:

neo4j-sink:
    image: neo4j:3.5
    hostname: neo4j-sink
    container_name: neo4j-sink
#    depends_on:
#      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
       - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: 0.0.0.0:2181
      NEO4J_kafka_bootstrap_servers: 0.0.0.0:9092
#      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max_size: 2G
      NEO4J_kafka_max_poll_records: 10000
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_kafka_group_id: "neo4j_sink_1"
      NEO4J_enable_auto_commit: "true"
      NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "WITH event.value as payload MERGE (a:Article {id: payload.ID}) ON CREATE set a.descr = payload.og_description"

This results in the sink initializing and listening on the topic:

neo4j-sink    | 2019-09-14 09:45:54.962+0000 INFO  Starting the Kafka Sink
neo4j-sink    | 2019-09-14 09:45:55.473+0000 INFO  Creating Sink daemon Job
neo4j-sink    | 2019-09-14 09:45:55.478+0000 DEBUG Subscribed topics with Cypher queries: {ARTICLECOMMIT=WITH event.value as payload MERGE (a:Article {id: payload.ID}) ON CREATE set a.descr = payload.og_description}
neo4j-sink    | 2019-09-14 09:45:55.479+0000 DEBUG Subscribed topics with CDC configuration: {CDC_SOURCE_ID=[], CDC_SCHEMA=[]}
neo4j-sink    | 2019-09-14 09:45:55.480+0000 INFO  Kafka Sink started
neo4j-sink    | 2019-09-14 09:45:55.480+0000 INFO  Streams Sink module initialised

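But when I send a message, either from the command-line producer or through my stream, nothing is registered in Neo4j. I can confirm that the topic is receiving messages from my stream; I checked with kafka-console-consumer and listened in on the stream. Please advise if I am doing something wrong.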

I tried this:

$ kafka-console-producer --broker-list 0.0.0.0:9092 --topic ARTICLECOMMIT
>{"ID":12345,"og_descrpition":"12312312"}

This was the most basic attempt at publishing to the topic, but Neo4j did not receive any event. Can anyone help?

【Comments】:

    Tags: neo4j apache-kafka apache-kafka-connect neo4j-apoc


    【Solution 1】:

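    I think there may be two issues. First, I believe your Cypher is incorrect: you aren't publishing an event that wraps its data in a payload property (the query reads it from event.value), only one that has ID and og_description at the top level. The following seems more likely to work: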

    NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "MERGE (a:Article {id: event.ID}) ON CREATE set a.descr = event.og_description"
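
To make the mismatch concrete, here is a rough Python model of the property lookups each query performs. It is only a sketch: it assumes, as this answer does, that `event` is bound to the parsed JSON value of the message, and `lookup` is a hypothetical helper that mimics Cypher returning null for a missing property. The message is copied verbatim from the console-producer attempt in the question.

```python
import json

# Message exactly as typed into kafka-console-producer in the question.
raw = '{"ID":12345,"og_descrpition":"12312312"}'

# Assumption: the sink binds `event` to the parsed message value.
event = json.loads(raw)


def lookup(obj, *path):
    """Follow a property path, returning None where Cypher would yield null."""
    for key in path:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


original_id = lookup(event, "value", "ID")   # path read by the original query
corrected_id = lookup(event, "ID")           # path read by the corrected query
descr = lookup(event, "og_description")      # key as spelled in both queries

print(original_id, corrected_id, descr)      # prints: None 12345 None
```

Under that assumption the original query merges on a null id, while the corrected one finds 12345. The sketch also shows that the test message spells its second key `og_descrpition`, so neither query would find a description in that particular message.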
    

    However, if that were the only error, you would see an error in the logs similar to the following:

    neo4j-sink | 2019-09-16 10:56:39.543+0000 ERROR Error while executing the query Cannot merge node using null property value for id
    

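    Since you don't mention seeing any errors, the most likely candidate is that the plugin isn't actually listening to the topic and is failing silently. I ran into exactly this while trying to reproduce the problem locally.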

    To check, I would use:

    kafka-consumer-groups --bootstrap-server 0.0.0.0:9092 --list --all-groups
    

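    You should see neo4j_sink_1 listed as a registered consumer group. If you don't, there is a configuration problem; from what I can see, the most likely cause is that your Kafka instance isn't actually listening on host 0.0.0.0, port 9092.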
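
If the Kafka CLI tools are not at hand, a plain TCP probe can test the same suspicion. The following is a minimal sketch, assuming a Python interpreter is available wherever you run it (the stock Neo4j image may not ship one); `can_connect` is a hypothetical helper name, not part of any Kafka or Neo4j tooling.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and failed DNS lookups.
        return False
```

Run from inside the neo4j-sink container, `can_connect("0.0.0.0", 9092)` would be expected to return False, since from there that address typically refers to the container itself rather than to the broker. A True result only proves that the bootstrap address is reachable: the client afterwards talks to whatever address the broker advertises, so the advertised listener has to be resolvable from the same container as well.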

    The following docker-compose.yml is what I have working locally; it starts a single-node Kafka cluster and Neo4j with the modified version of the Cypher query:

    version: "3.7"
    
    services:
      zoo1:
        image: zookeeper:3.4.9
        hostname: zoo1
        ports:
          - "2181:2181"
        environment:
            ZOO_MY_ID: 1
            ZOO_PORT: 2181
            ZOO_SERVERS: server.1=zoo1:2888:3888    
        volumes:
          - ./zk-single-kafka-single/zoo1/data:/data
          - ./zk-single-kafka-single/zoo1/datalog:/datalog
    
      kafka1:
        image: confluentinc/cp-kafka:5.3.0
        hostname: kafka1
        ports:
          - "9092:9092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 1
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1    
        volumes:
          - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
        depends_on:
          - zoo1
    
      core1:
        image: neo4j:3.5
        hostname: neo4j-sink
        container_name: neo4j-sink
        ports:
          - "7474:7474"
          - "7687:7687"
        volumes:
          - ./core1_plugins:/plugins
          - ./core1_logs:/logs
        environment:
          NEO4J_kafka_zookeeper_connect: zoo1:2181
          NEO4J_kafka_bootstrap_servers: kafka1:19092
          NEO4J_kafka_group_id: "neo4j_sink"
          NEO4J_kafka_group_instance_id: "neo4j_sink"
          NEO4J_streams_sink_topic_cypher_ARTICLECOMMIT: "MERGE (a:Article {id: event.ID}) ON CREATE set a.descr = event.og_description"
          NEO4J_dbms_logs_debug_level: DEBUG
          NEO4J_streams_sink_enabled: "true"
          NEO4J_streams_procedures_enabled: "true"
        depends_on:
          - kafka1
    

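    One final note: there's a breaking issue with version 3.5.3 of the Neo4j Streams plugin, which throws an exception at startup once Kafka is configured correctly. I used version 3.5.2 to avoid the regression; apparently a fix will ship in the next release.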

    【Comments】:
