【发布时间】:2014-05-28 06:00:07
【问题描述】:
我正在运行其网站中提到的这个 kafka 生产者示例
代码:
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "host.broker-1:9093, host.broker-2:9093, host.broker-3:9095");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "test.app.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
public class SimplePartitioner implements Partitioner{
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
更多细节:
我在主机代理[1-3]远程的主机(调用是生产者)上运行此应用程序
我可以从生产者主机 ping 和 ssh 代理主机。
在server.properties中提供了advertised.host.name(它们在brokers中分别命名为server[1-3].properties
属性:
broker.id=1
port=9093
host.name=host.broker.internal.name
advertised.host.name=host-broker1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/1/kafka-logs-1,/data/2/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
知道如何解决这个错误吗?
【问题讨论】:
-
显然,您的生产者无法向 Kafka 集群发送消息。最常见的原因是生产者配置错误或集群已关闭,或存在网络问题(例如,广告代理名称是否可从生产者主机路由?)。很难从这么多信息中说出更多信息。
-
@sandris 请现在看一下描述,如果有帮助的话。
-
检查您是否可以从运行 java 应用程序的计算机远程登录:“telnet host.broker-1 9093”从 kafka Broker 远程登录到 zookeeper:“telnet zk1 2181”我将删除所有 dns 名称并尝试原始 IP 地址。例如: "props.put("metadata.broker.list", "0.0.0.0:9093")" 并删除属性: "host.name=host.broker.internal.name" "advertised.host.name=主机代理1"
标签: apache-kafka