【Posted】: 2021-07-05 14:19:47
【Problem description】:
Flink SQL> INSERT INTO es_sink SELECT 'hello';
[INFO] Submitting SQL update statement to the cluster...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.elasticsearch.table.RowElasticsearchSinkFunction.<init>(Lorg/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch7/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory;Ljava/util/function/Function;)V
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.lambda$getSinkRuntimeProvider$0(Elasticsearch7DynamicSink.java:129)
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:161)
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:130)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226)
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226)
at org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:518)
at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:507)
at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:409)
at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
【Discussion】:
-
Could you check whether the connector jar matches your Flink version?
-
root@8fcd04ab3185:/opt/flink/lib# flink -v
Version: 1.13.1, Commit ID: a7f3192
root@8fcd04ab3185:/opt/flink/lib# ls |grep elastic
flink-sql-connector-elasticsearch6_2.11-1.13.1.jar
flink-sql-connector-elasticsearch7_2.11-1.13.1.jar
root@8fcd04ab3185:/opt/flink/lib#
-
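In general, you should not put connectors into lib; bundle them with your user code instead. For SQL, things may be a bit different. How are you executing the query?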
-
SQL: INSERT INTO es_sink SELECT 'hello'; The same statement runs fine on Flink 1.12.4.
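-
The directory listing in this thread shows both the elasticsearch6 and the elasticsearch7 SQL connector jars in /opt/flink/lib. One possible cause of a NoSuchMethodError like the one above, which nobody in the thread confirmed, is that both jars ship a class with the same fully qualified name (here RowElasticsearchSinkFunction) and the copy that gets loaded is not the one the elasticsearch7 sink was compiled against. A minimal sketch for checking this, assuming only that the jars are ordinary zip archives; the function name find_duplicate_classes and its parameters are made up for illustration and are not part of Flink:

```python
import zipfile
from collections import defaultdict
from pathlib import Path


def find_duplicate_classes(lib_dir, package_prefix=""):
    """Return {class entry: [jar names]} for .class files present in more than one jar.

    lib_dir        -- directory holding the jars (e.g. /opt/flink/lib, as in the thread)
    package_prefix -- optional path prefix inside the jar used to narrow the search
    """
    owners = defaultdict(list)
    # Sort so that the jar lists in the result are deterministic.
    for jar_path in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar_path) as jar:
            for name in jar.namelist():
                if name.endswith(".class") and name.startswith(package_prefix):
                    owners[name].append(jar_path.name)
    # Keep only the classes that appear in at least two jars.
    return {cls: jars for cls, jars in owners.items() if len(jars) > 1}


# Example call (the paths are assumptions based on the listing in the thread):
# dups = find_duplicate_classes(
#     "/opt/flink/lib",
#     "org/apache/flink/streaming/connectors/elasticsearch/table/",
# )
# for cls, jars in sorted(dups.items()):
#     print(cls, "->", ", ".join(jars))
```

If the class from the stack trace shows up under both jars, keeping only the connector jar that matches the target Elasticsearch version in lib and restarting the SQL client would be a reasonable next experiment.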