【问题标题】:spark dealing with carbondataspark 处理 carbondata
【发布时间】:2018-05-17 07:24:02
【问题描述】:

下面是我试图用来在 S3 中创建 carbondata 表的代码 sn-p。然而,尽管在 hadoopconfiguration 中设置了 aws 凭证,它仍然抱怨没有设置密钥和访问密钥。这里有什么问题?

 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.CarbonSession._
 val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("s3n://url")
carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId","<accesskey>")
   carbon.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey","<secretaccesskey>")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table(id string,name string,city string,age Int) STORED BY 'carbondata'")

最后一个命令产生错误:

java.lang.IllegalArgumentException:AWS 访问密钥 ID 和密钥 必须将访问密钥指定为用户名或密码 (分别)s3n URL,或通过设置 fs.s3n.awsAccessKeyId 或 fs.s3n.awsSecretAccessKey 属性(分别)

Spark Version : 2.2.1
Command used to start spark-shell:
$SPARK_PATH/bin/spark-shell --jars /localpath/jar/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2/apache-carbondata-1.3.1-bin-spark2.2.1-hadoop2.7.2.jar,/localpath/jar/spark-avro_2.11-4.0.0.jar --packages com.amazonaws:aws-java-sdk-pom:1.9.22,org.apache.hadoop:hadoop-aws:2.7.2,org.slf4j:slf4j-simple:1.7.21,asm:asm:3.2,org.xerial.snappy:snappy-java:1.1.7.1,com.databricks:spark-avro_2.11:4.0.0

更新:

发现 S3 支持仅在 1.4.0 RC1 中可用。所以我构建了 RC1 并针对相同的代码测试了下面的代码。但我似乎仍然遇到了问题。任何帮助表示赞赏。 代码:

import org.apache.spark.sql.CarbonSession._
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.CarbonCommonConstants
object sample4 {
def main(args: Array[String]) {
val (accessKey, secretKey, endpoint) = getKeyOnPrefix("s3n://")
//val rootPath = new File(this.getClass.getResource("/").getPath
//                            + "../../../..").getCanonicalPath
val path = "/localpath/sample/data1.csv"
val spark = SparkSession
      .builder()
      .master("local")
      .appName("S3UsingSDKExample")
      .config("spark.driver.host", "localhost")
      .config(accessKey, "<accesskey>")
      .config(secretKey, "<secretkey>")
      //.config(endpoint, "s3-us-east-1.amazonaws.com")
      .getOrCreateCarbonSession()
      spark.sql("Drop table if exists carbon_table")

    spark.sql(
      s"""
         | CREATE TABLE if not exists carbon_table(
         | shortField SHORT,
         | intField INT,
         | bigintField LONG,
         | doubleField DOUBLE,
         | stringField STRING,
         | timestampField TIMESTAMP,
         | decimalField DECIMAL(18,2),
         | dateField DATE,
         | charField CHAR(5),
         | floatField FLOAT
         | )
         | STORED BY 'carbondata'
         | LOCATION 's3n://bucketName/table/carbon_table'
         | TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
       """.stripMargin)

}


def getKeyOnPrefix(path: String): (String, String, String) = {
    val endPoint = "spark.hadoop." + ENDPOINT
    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
    } else {
      throw new Exception("Incorrect Store Path")
    }
  }
  def getSparkMaster(args: Array[String]): String = {
    if (args.length == 6) args(5)
    else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
    else "local"
  }
}

错误:

18/05/17 12:23:22 ERROR SegmentStatusManager: main Failed to read metadata of load
org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.ServiceException: Request Error: Empty key

我还尝试了(也尝试过 s3、s3n、s3a 协议)中的示例代码:

https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala

运行为:

S3Example.main(Array("accesskey","secretKey","s3://bucketName/path/carbon_table","https://bucketName.s3.amazonaws.com","local"))

错误堆栈跟踪:

org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException:请求错误:空键在 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:175) 在 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221) 在 sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) 在 com.sun.proxy.$Proxy21.retrieveINode(未知来源) org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340) 在 org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426) 在 org.apache.carbondata.core.datastore.filesystem.AbstractDFSCarbonFile.isFileExist(AbstractDFSCarbonFile.java:426) 在 org.apache.carbondata.core.datastore.impl.FileFactory.isFileExist(FileFactory.java:201) 在 org.apache.carbondata.core.statusmanager.SegmentStatusManager.readTableStatusFile(SegmentStatusManager.java:246) 在 org.apache.carbondata.core.statusmanager.SegmentStatusManager.readLoadMetadata(SegmentStatusManager.java:197) 在 org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(ManageDictionaryAndBTree.java:101) 在 org.apache.spark.sql.hive.CarbonFileMetastore.dropTable(CarbonFileMetastore.scala:460) 在 org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand.processMetadata(CarbonCreateTableCommand.scala:148) 在 org.apache.spark.sql.execution.command.MetadataCommand.run(package.scala:68) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:67) 在 org.apache.spark.sql.Dataset.(Dataset.scala:183) 在 org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:107) 在 org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:96) 在 org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:144) 在 org.apache.spark.sql.CarbonSession.sql(CarbonSession.scala:94) 在 $line19.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$S3Example$.main(:68) at $line26.$read$$iw$$iw $$iw$$iw$$iw$$iw$$iw$$iw.(:31) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:36) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:38) 在 $line26.$read$$iw$$iw$$iw$$iw$$iw.(:40) 在 $line26.$read$$iw$$iw$$iw$$iw.(:42) 在 $line26.$read$$iw$$iw$$iw.(:44) 在 $line26.$read$$iw$$iw.(:46) 在 $line26.$read$$iw.(:48) 在 $line26.$read.(:50) 在 $line26.$read$.(:54) 在 $line26.$read$.() 在 $line26.$eval$.$print$lzycompute(:7) 在 $line26.$eval$.$print(:6) 在 $line26.$eval.$print() 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786) 在 scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047) 在 scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638) 在 scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637) 在 scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31) 在 scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19) 在 scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637) 在 scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569) 在 scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565) 在 scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807) 在 scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681) 在 scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395) 在 scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) 在 scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909) 在 scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97) 在 scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909) 在 org.apache.spark.repl.Main$.doMain(Main.scala:74) 在 org.apache.spark.repl.Main$.main(Main.scala:54) 在 org.apache.spark.repl.Main.main(Main.scala) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775) 在 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 在 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 在 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) 在 org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 引起 作者:org.jets3t.service.S3ServiceException:请求错误:空键 在 org.jets3t.service.S3Service.getObject(S3Service.java:1470) 在 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:163)

是我传递错误的任何论点。 我可以使用 aws cli 访问 s3 路径:

aws s3 ls s3://bucketName/path

存在于 S3 中。

【问题讨论】:

    标签: scala apache-spark amazon-s3 carbon-data


    【解决方案1】:

    你可以用这个例子试试https://github.com/apache/carbondata/blob/master/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala

    在创建 carbonSession 之后,您必须先提供 aws 凭据属性才能触发。

    如果您已经创建了 sparkContext 而没有提供 aws 属性。然后,即使您将其提供给 carbonContext,它也不会获取这些属性。

    【讨论】:

    • 看起来在最新版本 1.3.1 中,S3 支持不可用。在上面的示例代码中,它引用了 - CarbonCommonConstants.S3_PREFIX。但是在作为 1.3.1 的一部分的源代码中:carbondata-parent-1.3.1/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java,我看到这个常量甚至没有定义。
    • 是的,它在 1.4.0-RC1 版本中提供了支持
    • 好的,感谢您的澄清!让我试试 RC1,看看它是否适合我!
    • 我尝试过 1.4.0 RC1。你能检查我上面粘贴的代码,让我知道我做错了什么。感谢您的帮助!
    【解决方案2】:

    您好 vikas 查看您的异常空密钥仅意味着您的访问密钥和密钥未在 carbon 会话中绑定,因为当我们提供 s3 实现时,我们编写的逻辑是,如果用户没有提供任何密钥,那么它然后它们的值应为空

    为了让事情变得简单 首先使用此命令构建 carbon 数据 jar

    mvn -Pspark-2.1 清洁包 然后用这个命令执行 spark submit

    ./spark-submit --jars file:///home/anubhav/Downloads/softwares/spark-2.2.1-bin-hadoop2.7/carbonlib/apache-carbondata-1.4.0-SNAPSHOT-bin- spark2.2.1-hadoop2.7.2.jar --class org.apache.carbondata.examples.S3Example /home/anubhav/Documents/carbondata/carbondata/carbondata/examples/spark2/target/carbondata-examples-spark2-1.4.0- SNAPSHOT.jar 本地

    用你的替换我的 jar 路径,看看它应该可以工作,它对我有用

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-01-11
      • 1970-01-01
      • 2017-01-24
      • 2016-05-26
      • 2020-03-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多