【发布时间】:2019-12-23 10:11:10
【问题描述】:
我使用以下代码将 Flink 连接到 ElasticSearch。但是用 Flink 运行时,会显示很多错误。程序首先从一个端口输入数据,然后根据编写的程序读取命令行中的每一行。然后显示字数。主要问题是连接到弹性搜索时,不幸的是在连接时出错。这些是错误吗?将 Minimal Flink 连接到 Elastic Search 需要哪些类?
public class Elastic {
public static void main(String[] args) throws Exception {
// the port to connect to
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1);
text.print().setParallelism(1);
env.execute("Socket Window WordCount");
List<HttpHost> httpHosts = new ArrayList<HttpHost>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
httpHosts.add(new HttpHost("my-ip",9200,"http"));
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<String, String>();
json.put("data", element);
return Requests.indexRequest()
.index("iran")
.type("int")
.source(json);
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);
esSinkBuilder.setBulkFlushMaxActions(1);
final Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
esSinkBuilder.setRestClientFactory(new RestClientFactory() {
@Override
public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
restClientBuilder.setDefaultHeaders(defaultHeaders)
.setMaxRetryTimeoutMillis(10000)
.setPathPrefix("a")
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder.setSocketTimeout(10000);
}
});
}
});
text.addSink(esSinkBuilder.build());
}
// Data type for words with count
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
我的弹性搜索版本:7.5.0 我的 flink 版本:1.8.3
我的错误:
sudo /etc/flink-1.8.3/bin/flink run -c org.apache.flink.Elastic /root/FlinkElastic-1.0.jar --port 9000
------------------------------------------------------------
The program finished with the following exception:
java.lang.RuntimeException: Could not look up the main(String[]) method from the class
org.apache.flink.Elastic:
org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction
at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:527)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:246)
... 7 more
Caused by: java.lang.NoClassDefFoundError:
org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:521)
... 7 more
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more
我的 pom:
<groupId>org.apache.flink</groupId>
<artifactId>FlinkElastic</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>6</source>
<target>6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.3</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.8.3</version>
</dependency>
</dependencies>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
【问题讨论】:
-
“橡胶轮胎”这句话似乎格格不入。
-
我没明白你的意思。
-
重读您的问题。在第一段中,您谈到了连接到橡胶轮胎。
-
我真为你感到羞耻。这个词拼错了。主要词是elasticsearch。我编辑了帖子。
标签: elasticsearch apache-flink