【Question title】:Flink SQL 1.13.0 & 1.13.1 Elasticsearch sink fails
【Posted】:2021-07-05 14:19:47
【Question description】:
Flink SQL> INSERT INTO es_sink SELECT 'hello';
[INFO] Submitting SQL update statement to the cluster...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.elasticsearch.table.RowElasticsearchSinkFunction.<init>(Lorg/apache/flink/streaming/connectors/elasticsearch/table/IndexGenerator;Ljava/lang/String;Lorg/apache/flink/api/common/serialization/SerializationSchema;Lorg/apache/flink/elasticsearch7/shaded/org/elasticsearch/common/xcontent/XContentType;Lorg/apache/flink/streaming/connectors/elasticsearch/table/RequestFactory;Ljava/util/function/Function;)V
    at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSink.lambda$getSinkRuntimeProvider$0(Elasticsearch7DynamicSink.java:129)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:161)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:130)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
    at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:226)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:226)
    at org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:518)
    at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:507)
    at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:409)
    at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more

【Comments】:

  • Could you check whether the connector jar matches your Flink version?
  • root@8fcd04ab3185:/opt/flink/lib# flink -v
    Version: 1.13.1, Commit ID: a7f3192
    root@8fcd04ab3185:/opt/flink/lib# ls |grep elastic
    flink-sql-connector-elasticsearch6_2.11-1.13.1.jar
    flink-sql-connector-elasticsearch7_2.11-1.13.1.jar
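  • In general, you should not put connectors into lib; bundle them with your user code instead. For SQL it may be a bit different. How are you executing the query?
  • SQL: INSERT INTO es_sink SELECT 'hello'; The same SQL runs fine on Flink 1.12.4.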

Tags: apache-flink flink-sql


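【Solution 1】:

Based on your comments, it looks like you have conflicting ES versions on your classpath.

Rather than putting the jars into lib, it is better to execute the sqlClient with the -l option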
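
One way to check for the classpath conflict suggested above is to see whether more than one jar in a directory ships the class named in the NoSuchMethodError. The following is a minimal sketch, not an official Flink tool: the helper name `jars_containing` is hypothetical, and it only inspects top-level `*.jar` files in the directory it is given.

```python
import zipfile
from pathlib import Path


def jars_containing(lib_dir, class_name):
    """Return the sorted file names of jars in lib_dir that contain class_name.

    Hypothetical helper for illustration; class_name is a fully qualified
    Java class name such as the one reported in the NoSuchMethodError.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            # Skip files that end in .jar but are not valid archives
            continue
    return hits
```

For example, calling `jars_containing("/opt/flink/lib", "org.apache.flink.streaming.connectors.elasticsearch.table.RowElasticsearchSinkFunction")` on the directory shown in the comments would list every jar there that bundles that class. If more than one jar is returned, the JVM may load the class from a jar other than the one the Elasticsearch 7 sink was compiled against, which would be consistent with the constructor lookup failing.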

【Comments】:

  • root@1475b3cda2fc:/opt/flink/sql/hids_project# ls userlib/
    flink-sql-connector-elasticsearch7_2.11-1.13.1.jar
  • sql-client.sh -i tables/tmp_es.sql -l userlib/
  • Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.elasticsearch.table.RowElasticsearchSinkFunction