【Question title】: Change metastore URI in Spark
【Posted】: 2018-12-17 19:58:08
【Question】:

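At my job there are different environments (Development, Preproduction and Production), and each one has certain tables in its Hive metastore. My user has permission to access and query all of these metastores via beeline, but I want to access them from a spark-shell session using sqlContext (or HiveContext).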

For example, when I ssh into the Preproduction environment and start a spark-shell session, it automatically creates a sqlContext variable that I can use to run queries against the Preproduction metastore.

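I can also run queries against the Production metastore from Preproduction using beeline, so I tried changing some of the Hive configuration (How to connect to a Hive metastore programmatically in SparkSQL?). I changed the following properties:

hive.metastore.uris and hive.server2.authentication.kerberos.principal, setting them to the corresponding values from the Production environment.

My code in spark-shell: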

    System.setProperty("hive.server2.authentication.kerberos.principal","hive/URL@URL2")
    System.setProperty("hive.metastore.uris","thrift://URLTOPRODUCTION:9083")
    import org.apache.spark.sql.hive.HiveContext
    val hive=new HiveContext(sc)
    val df=hive.sql("SELECT * FROM DB.Table limit 10")

But when I execute the last statement of the code block above, I get the following error.

java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1

    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:406)

    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)

    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)

    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:762)

    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:693)

    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)

    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)

    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)

    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)

    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)

    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)

    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:449)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:447)

    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)

    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)

    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)

    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)

    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)

    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)

    at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)

    at scala.Option.getOrElse(Option.scala:120)

    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)

    at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)

    at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)

    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:504)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:503)

    at scala.Option.getOrElse(Option.scala:120)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToParquetRelation(HiveMetastoreCatalog.scala:503)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:565)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:545)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)

    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

    at scala.collection.AbstractIterator.to(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)

    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

    at scala.collection.Iterator$class.foreach(Iterator.scala:727)

    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)

    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)

    at scala.collection.AbstractIterator.to(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)

    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)

    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)

    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:545)

    at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:539)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)

    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)

    at scala.collection.immutable.List.foldLeft(List.scala:84)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)

    at scala.collection.immutable.List.foreach(List.scala:318)

    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)

    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)

    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)

    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)

    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)

    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)

    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:829)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)

    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)

    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)

    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41)

    at $iwC$$iwC$$iwC.<init>(<console>:43)

    at $iwC$$iwC.<init>(<console>:45)

    at $iwC.<init>(<console>:47)

    at <init>(<console>:49)

    at .<init>(<console>:53)

    at .<clinit>(<console>)

    at .<init>(<console>:7)

    at .<clinit>(<console>)

    at $print(<console>)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)

    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)

    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)

    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)

    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)

    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)

    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)

    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)

    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)

    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)

    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)

    at org.apache.spark.repl.Main$.main(Main.scala:35)

    at org.apache.spark.repl.Main.main(Main.scala)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:498)

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

   Caused by: java.net.UnknownHostException: nameservice1

    ... 141 more

I am using the Cloudera distribution with Spark 1.6.0 and Scala 2.10.5.

Is there a solution? Thanks in advance.

【Comments】:

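  • By the way, properties starting with hive.server2. have no effect on the metastore, i.e. they are not what you need.
  • Did you add the relevant Hadoop conf files, starting with core-site.xml and hdfs-site.xml, to a directory on the CLASSPATH (or to the directory matching $HADOOP_CONF_DIR)?
  • Yes, my spark-shell session is using these files: when I inspect the configuration of the HiveContext I created, it has all the properties defined in them. As I said, this works fine in the Preproduction environment, but I get this error when I try to connect to the Production metastore.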
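
One way to check the point raised in the comments above: the stack trace fails while resolving `nameservice1` as a host name, which would be consistent with the HDFS client configuration not declaring that logical nameservice. This is an assumption, since the thread does not confirm it. The sketch below tests whether a nameservice appears in `dfs.nameservices`. The helper name `nameserviceIsConfigured` is hypothetical, and the configuration is passed in as a lookup function so the check can be tried without a cluster.

```scala
// Hypothetical helper: reports whether a logical HDFS nameservice is declared
// in a Hadoop configuration. `lookup` returns the value for a key, or null if unset.
def nameserviceIsConfigured(lookup: String => String, nameservice: String): Boolean = {
  // dfs.nameservices holds a comma-separated list of logical nameservice IDs.
  val declared = Option(lookup("dfs.nameservices"))
    .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
    .getOrElse(Seq.empty)
  declared.contains(nameservice)
}

// In spark-shell, where `sc` already exists, the check could be run as:
//   nameserviceIsConfigured(key => sc.hadoopConfiguration.get(key), "nameservice1")
```

If this returns false for the nameservice named in the exception, the session probably has no definition for the cluster that the table locations point to.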

Tags: scala apache-spark hive


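【Solution 1】:

Finally, after reviewing the configuration of the sqlContext variable that spark-shell creates automatically on the server, I saw that it contains many URLs and configuration variables, and that I do not have permissions on HDFS or on the other servers I would need in order to run queries against the PROD metastore.

I knew that querying the PROD metastore with beeline works, which means I can query it via JDBC, so I took the JDBC URL that is passed to beeline.

Then I used this JDBC URL to connect to the database via JDBC, using the native Java methods and utilities (called from Scala):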

    /* hive-jdbc-0.14.0.jar must be on the classpath to connect to Hive via JDBC. */
    import java.sql.{Connection, DriverManager, ResultSet}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import scala.collection.mutable.ArrayBuffer

    /* Connect to the Prod metastore via JDBC and run the query as if against an ordinary database.
       Note that with this method the query itself does not use distributed computing. */
    val url = "jdbc:hive2://URL_TO_PROD_METASTORE/BD;CREDENTIALS OR URL TO KERBEROS"
    val query = "SELECT * FROM BD.TABLE LIMIT 100"
    val driver = "org.apache.hive.jdbc.HiveDriver"
    Class.forName(driver).newInstance
    val conn: Connection = DriverManager.getConnection(url)
    val r: ResultSet = conn.createStatement.executeQuery(query)
    val columnCount = r.getMetaData.getColumnCount

    /* Read every column of every result row as a string, build one Row per result row,
       and collect the Rows in a buffer. */
    val list = ArrayBuffer[Row]()
    while (r.next()) {
      val values = (1 to columnCount).map(i => r.getString(i))
      list += Row.fromSeq(values)
    }

    /* To turn the results into a DataFrame we need a StructType with the column names
       (every column is typed as a string) plus the rows collected above. */
    val fields = (1 to columnCount).map(i => StructField(r.getMetaData.getColumnName(i), StringType))
    val struct = StructType(fields)
    val rdd = sc.parallelize(list)
    val df = sqlContext.createDataFrame(rdd, struct)
    r.close()
    conn.close()
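
The snippet above closes the result set and the connection only if every earlier step succeeds. As a minimal sketch, a small loan-pattern helper can guarantee the close call even when the query throws. The name `withResource` is hypothetical, not part of Spark or the Hive JDBC driver. It relies on `java.sql.Connection`, `Statement` and `ResultSet` being `AutoCloseable`, which holds from Java 7 onward.

```scala
// Hypothetical helper: runs `body` with the resource and always closes the
// resource afterwards, whether `body` returns normally or throws.
def withResource[R <: AutoCloseable, A](resource: R)(body: R => A): A =
  try body(resource) finally resource.close()

// Possible use with the JDBC objects from the answer (not tested against a real cluster):
//   withResource(DriverManager.getConnection(url)) { conn =>
//     withResource(conn.createStatement) { stmt =>
//       withResource(stmt.executeQuery(query)) { rs =>
//         // read the rows from `rs` here
//       }
//     }
//   }
```

Nesting the calls closes the result set first, then the statement, then the connection, which is the reverse of the order in which they were opened.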

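Note that this question is related to one of my other answers, because the best way to export Hive query results to CSV is with Spark (How to export a Hive table into a CSV file?). That is why I wanted to query the Prod metastore from a Spark session on the PRE server.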
