【问题标题】:Newly created spark executor running in kubernetes doesn't know ignite configuration新创建的运行在 kubernetes 中的 spark executor 不知道 ignite 配置
【发布时间】:2020-02-05 20:04:48
【问题描述】:

我有一个在 kubernetes 上运行的 spark 驱动程序和执行程序,并且执行程序与 apache ignite 实例进行对话。但是如果executor-1 死了,executor-2 将由驱动程序创建。

现在新创建的executor-2 正在抱怨执行者 2):

class org.apache.ignite.IgniteIllegalStateException: Ignite instance with provided name doesn't exist. Did you call Ignition.start(..) to start an Ignite instance? [name=shared-grid]

at org.apache.ignite.internal.IgnitionEx.grid(IgnitionEx.java:1390)
at org.apache.ignite.Ignition.ignite(Ignition.java:531)
at org.apache.ignite.spark.impl.package$.ignite(package.scala:86)
at org.apache.ignite.spark.impl.IgniteRelationProvider$$anonfun$configProvider$1$2.apply(IgniteRelationProvider.scala:226)
at org.apache.ignite.spark.impl.IgniteRelationProvider$$anonfun$configProvider$1$2.apply(IgniteRelationProvider.scala:223)
at org.apache.ignite.spark.Once.apply(IgniteContext.scala:224)
at org.apache.ignite.spark.IgniteContext.ignite(IgniteContext.scala:145)
at org.apache.ignite.spark.impl.IgniteSQLDataFrameRDD.compute(IgniteSQLDataFrameRDD.scala:65)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

我们需要做些什么来将 ignite 实例详细信息传递给新创建的 executor 实例

【问题讨论】:

  • shared-grid 是什么?它应该在哪个 JVM 中运行?

标签: apache-spark ignite


【解决方案1】:

您是否使用过 IgniteSparkSession?我找不到票,但它看起来像是一个已知问题,有时 IgniteSparkSession 无法启动物理分布式集群上的内部客户端。下一个代码:

IgniteSparkSession igniteSession = IgniteSparkSession.builder()
               .appName("Spark Ignite catalog example")
               .igniteConfig(configPath)
               .getOrCreate();

会产生以下异常:

class org.apache.ignite.IgniteIllegalStateException: Ignite instance with provided name doesn't exist. Did you call Ignition.start(..) to start an Ignite instance? [name=grid]

作为解决方法,您可以在启动作业之前尝试使用每个 spark 节点的提供配置启动客户端节点,但我不确定它是否会正常工作。

我建议在当前问题无法解决之前避免使用 IgniteSparkSession

请使用 DataFrame API 语法:

String configPath = "client.xml";

SparkConf sparkConf = new SparkConf()
 .setAppName("Example");

SparkSession session = SparkSession.builder()
 .config(sparkConf)
 .getOrCreate();

Dataset < Row > csvDataset = session.read()
 .format("csv")
 .option("sep", ",")
 .option("header", true)
 .load("person.csv");

Dataset < Row > resultDF = csvDataset
 .select("id", "name", "city_id", "company")
 .sort("id")
 .limit(10000);

for (int i = 0; i < 10; i++) {
 DataFrameWriter < Row > df = resultDF
  .write()
  .format(IgniteDataFrameSettings.FORMAT_IGNITE())
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), configPath)
  .option(IgniteDataFrameSettings.OPTION_TABLE(), "Person")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS(), "id, city_id")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS(), "template=partitioned,backups=1")
  .mode(Append);

 df.save();
}

session.close();

此代码运行良好。我将检查 JIRA 问题。可能我会创建一个新的。

更新:这是新票https://issues.apache.org/jira/browse/IGNITE-12637

【讨论】:

  • 是的。我们使用 IgniteSparkSession
  • 知道这个问题是否在最新的 ignite 版本中得到修复
  • 否,但您可以尝试至少在 Ignite developer 上启动一个线程。可能有人可以在那里提供帮助。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-12-14
  • 1970-01-01
  • 1970-01-01
  • 2016-12-27
  • 2021-08-18
  • 1970-01-01
  • 2018-02-03
相关资源
最近更新 更多