【问题标题】:spark Athena connector火花雅典娜连接器
【发布时间】:2017-12-06 11:01:55
【问题描述】:

我需要在 spark 中使用 Athena,但是 spark 在使用 JDBC 驱动程序时使用preparedStatement,它给了我一个例外 “com.amazonaws.athena.jdbc.NotImplementedException:方法 Connection.prepareStatement 尚未实现”

请告诉我如何在 spark 中连接 Athena

【问题讨论】:

    标签: pyspark amazon-athena


    【解决方案1】:

    我不知道您将如何从 Spark 连接到 Athena,但您不需要 - 您可以非常轻松地从 Spark 查询 Athena 包含(或者更准确地说是“寄存器”)的数据。

    雅典娜分为两部分

    1. Hive Metastore(现在称为 Glue 数据目录),其中包含数据库和表名称以及所有基础文件之间的映射
    2. Presto 查询引擎,可将您的 SQL 转换为针对这些文件的数据操作

    当您启动 EMR 集群(v5.8.0 及更高版本)时,您可以指示它连接到您的 Glue 数据目录。这是“创建集群”对话框中的复选框。当您选中此选项时,您的 Spark SqlContext 将连接到 Glue 数据目录,您将能够看到 Athena 中的表。

    然后您可以正常查询这些表。

    请参阅https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html 了解更多信息。

    【讨论】:

    • 您能否提供一个“然后您可以正常查询这些表”的示例。我的查询返回所有空值
    • 如果您运行(从 Athena)describe table mytable,您应该能够看到数据的位置。检查它是否正确。您不希望将表数据保存在 EMR 集群上,而是需要在 s3 上。
    【解决方案2】:

    您可以使用这个 JDBC 驱动程序:SimbaAthenaJDBC

    <dependency>
        <groupId>com.syncron.amazonaws</groupId>
        <artifactId>simba-athena-jdbc-driver</artifactId>
        <version>2.0.2</version>
    </dependency>
    

    使用:

    SparkSession spark = SparkSession
        .builder()
        .appName("My Spark Example")
        .getOrCreate();
    
    Class.forName("com.simba.athena.jdbc.Driver");
    
    Properties connectionProperties = new Properties();
    connectionProperties.put("User", "AWSAccessKey");
    connectionProperties.put("Password", "AWSSecretAccessKey");
    connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
    connectionProperties.put("AwsCredentialsProviderClass", 
        "com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
    connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
    connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");
    
    List<String> predicateList =
        Stream
            .of("id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
            .collect(Collectors.toList());
    String[] predicates = new String[predicateList.size()];
    predicates = predicateList.toArray(predicates);
    
    Dataset<Row> data =
        spark.read()
            .jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
                "my_env.my_table", predicates, connectionProperties);
    

    您也可以在 Flink 应用程序中使用此驱动程序:

    TypeInformation[] fieldTypes = new TypeInformation[] {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO
    };
    
    RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
    
    JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.simba.athena.jdbc.Driver")
        .setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
        .setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
        .setRowTypeInfo(rowTypeInfo)
        .finish();
    
    DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);
    

    【讨论】:

    • 在第一个示例中,我如何使用数据库名称而不是在 jdbc url 中指定表名。
    • @rakeeee "my_env.my_table" = "my_database.my_table"。如果要连接到额外的表,请进行额外的 JDBC 连接。
    • @Wil 为什么我们需要将UserPassword 放在属性中?...我想/my-folder/.athenaCredentials 中已经提供了?另外,.athenaCredentials 文件的格式应该是什么?我尝试了这里建议的格式simba.com/products/Athena/doc/JDBC_InstallGuide/content/jdbc/…。 ...但我仍然收到此错误...Caused by: com.simba.athena.support.exceptions.GeneralException: [Simba][AthenaJDBC](100071) An error has been thrown from the AWS Athena client. The security token included in the request is invalid
    • @Raj 上面有 2 个不同的示例,一个用于 Flink,一个用于 Spark。在撰写本文时,这就是我让驱动程序与 Flink 一起工作的方式,但我想 Flink 的较新版本将支持凭证文件。
    【解决方案3】:

    您无法将 Spark 直接连接到 Athena。 Athena 只是针对 s3 的Prestodb 的一个实现。与 Presto 不同,Athena 不能针对 HDFS 上的数据。

    但是,如果您想使用 Spark 查询 s3 中的数据,那么您很幸运有 HUE,它可以让您从 Spark on Elastic Map Reduce (EMR) 查询 s3 中的数据。

    另请参阅: Developer Guide for Hadoop User Experience (HUE) on EMR.

    【讨论】:

    • 实际上可以使用 Athena jdbc 驱动程序将 spark 连接到 Athena,如前所述。
    【解决方案4】:

    如果您想使用雅典娜的数据,@Kirk Broadhurst 的回复是正确的。 如果你想使用 Athena 引擎,那么,github 上有一个库可以解决preparedStatement 的问题。

    请注意,由于缺乏 Maven 等方面的经验,我没有成功使用该库

    【讨论】:

      【解决方案5】:

      其实你可以使用 B2W 的 Spark Athena Driver。

      https://github.com/B2W-BIT/athena-spark-driver

      【讨论】:

        猜你喜欢
        • 2021-01-27
        • 2021-06-14
        • 1970-01-01
        • 1970-01-01
        • 2018-10-19
        • 2021-01-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多