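【Question title】: Flink SQL writes no data to ES cluster?
【Posted】: 2020-12-09 10:20:30
【Question description】:

I want to query data in MySQL or Hive with SQL and write it to my Elasticsearch cluster. The program runs successfully, but no data shows up in ES.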

  1. Software versions:
  • flink: 1.11
  • es: 6.2.2
  • hive: 1.2.1
  • mysql: 5.7
  2. Here is my code

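import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

public class HiveExample {
    public static void main(String[] args) throws DatabaseNotExistException {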
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tabEnv = TableEnvironment.create(settings);


        String sql =
                "insert into user_action_es_sink " +
                        "select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from dragonfly.web_page limit 10" ;


        String sporeUserAuthCreateTableSQL = "CREATE TABLE users (\n" +
                "  `id` INT,\n" +
                "  `userid` INT,\n" +
                "  `type` INT,\n" +
                "   PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/spore',\n" +
                "  'table-name' = 'spore_user_auth',\n" +
                "  'driver' = 'com.mysql.jdbc.Driver',\n" +
                "  'username' = 'xxxx',\n" +
                "  'password'  = 'xxxx'\n" +
                ")";

        tabEnv.executeSql(sporeUserAuthCreateTableSQL);

        String esTable = "CREATE TABLE user_action_es_sink (\n" +
                "  uid INT,\n" +
                "  appid INT,\n" +
                "  prepage_id INT,\n" +
                "  page_id INT,\n" +
                "  action_id STRING,\n" +
                "  page_name STRING,\n" +
                "  action_name STRING,\n" +
                "  prepage_name STRING,\n" +
                "  stat_time BIGINT,\n" +
                "  dt DATE\n" +
//                "  PRIMARY KEY (uid,dt) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-6',\n" +
                "  'hosts' = 'http://localhost:9200',\n" +
                "  'index' = 'mytest',\n" +
                "  'document-type' = 'user_action'\n" +
//                "  'sink.bulk-flush.max-size' = '0',\n" +
//                "  'sink.bulk-flush.max-actions' = '0',\n" +
//                "  'sink.bulk-flush.interval' = '0'\n"+
//                "  'format' = 'json',\n" +
//                "  'json.fail-on-missing-field' = 'false',\n"+
//                "  'json.ignore-parse-errors' = 'true'\n" +
                ")";

        tabEnv.executeSql(esTable);

        tabEnv.executeSql("insert into user_action_es_sink select 100123,5,11,1,'a','b','111','bbb',cast(11111 as bigint),cast('2020-11-11' as date) from users limit 10").print();


    }
}

My pom file:



<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-orc-nohive_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>1.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <!--<scope>test</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.13</version>
        </dependency>
    </dependencies>

The code reports no exception, but no data is written, and I cannot tell what is causing the problem.

Thanks for your help :)

【Question comments】:

    Tags: java elasticsearch apache-flink flink-sql


    【Solution 1】:

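    The executeSql function works asynchronously. If you test in your IDE, the main function exits right after calling executeSql, and the underlying mini cluster shuts down once main has finished, so the job never gets to write anything. This only happens in local testing; a production cluster is always running, so submitted jobs execute normally.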
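
The behaviour described above can be reproduced without Flink. The following is a minimal sketch and only an analogy: `AsyncSubmitDemo`, `newMiniCluster` and `submitInsert` are hypothetical names, a daemon-thread executor stands in for the embedded mini cluster, and a `CompletableFuture` stands in for the job handle behind `executeSql`. None of it is Flink API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncSubmitDemo {

    // Stand-in for the embedded mini cluster: a daemon thread dies when main() returns.
    static ExecutorService newMiniCluster() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-cluster");
            t.setDaemon(true);
            return t;
        });
    }

    // Stand-in for executeSql("insert into ..."): returns at once, writes rows later.
    static CompletableFuture<Void> submitInsert(ExecutorService cluster, List<Integer> sink) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(300); // simulated job start-up and execution time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            for (int i = 0; i < 10; i++) {
                sink.add(i);
            }
        }, cluster);
    }

    // Number of rows visible right after submission, without waiting.
    static int rowsWithoutWaiting() {
        List<Integer> sink = new CopyOnWriteArrayList<>();
        submitInsert(newMiniCluster(), sink);
        return sink.size();
    }

    // Number of rows visible after blocking on the returned future.
    static int rowsAfterWaiting() {
        List<Integer> sink = new CopyOnWriteArrayList<>();
        submitInsert(newMiniCluster(), sink).join();
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + rowsWithoutWaiting());
        System.out.println("after waiting:   " + rowsAfterWaiting());
    }
}
```

Because the worker is a daemon thread, a `main` that returns right after `submitInsert` lets the JVM exit before any row reaches the sink, which matches the symptom in the question. Blocking on the future keeps the process alive until the write completes.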

    If you want to wait for the job to finish when running in the IDE, you can use the following approach.

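    // Blocks until the job finishes; the final get() throws InterruptedException and ExecutionException.
    tabEnv.executeSql("insert into user_action_es_sink select xxx ")
          .getJobClient().get()
          .getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();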
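
A bounded wait can be useful in local tests so that a stuck job does not hang the IDE run. The sketch below uses only JDK types: the `Optional<CompletableFuture<T>>` parameter is a stand-in for what `getJobClient()` followed by `getJobExecutionResult(...)` yields in the snippet above, and `BoundedWait`, `awaitJob` and `describe` are hypothetical names. It assumes that the job client is absent for statements that start no job (for example DDL), which is why the empty case is handled explicitly instead of calling `get()` on the `Optional`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {

    // Waits for the job result, failing fast if no job was started or the timeout expires.
    static <T> T awaitJob(Optional<CompletableFuture<T>> jobResult, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture<T> future = jobResult.orElseThrow(
                () -> new IllegalStateException("statement did not start a job"));
        return future.get(timeout, unit);
    }

    // Demo helper: waits up to 200 ms and reports the outcome as a string.
    static String describe(Optional<CompletableFuture<String>> jobResult) {
        try {
            return awaitJob(jobResult, 200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMED_OUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "FAILED";
        }
    }

    // A job whose result is already available.
    static String finishedJob() {
        return describe(Optional.of(CompletableFuture.completedFuture("FINISHED")));
    }

    // A job that never completes within the timeout.
    static String stuckJob() {
        return describe(Optional.of(new CompletableFuture<String>()));
    }

    public static void main(String[] args) {
        System.out.println(finishedJob());
        System.out.println(stuckJob());
    }
}
```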
    

    In Flink 1.12, there is a simpler way to do this:

    // await() blocks until the job finishes; it throws InterruptedException and ExecutionException.
    tabEnv.executeSql("insert into user_action_es_sink select xxx ")
          .await();
    

    【Comments】:
