【问题标题】:Using spark-kernel comm API使用 spark-kernel 通信 API
【发布时间】:2016-06-16 10:36:45
【问题描述】:

我最近开始使用 spark-kernel。

如教程和示例代码中所述,我能够设置客户端并使用它在 spark-kernel 上执行代码 sn-ps 并检索此 example code 中给出的结果。

现在,我需要使用 spark-kernel 提供的 comm API。我试过这个tutorial,但我无法让它工作。事实上,我不知道如何让它发挥作用。

我尝试了以下代码,但是当我运行此代码时,我在内核上收到此错误“Received invalid target for Comm Open: my_target”。

package examples
import scala.runtime.ScalaRunTime._
import scala.collection.mutable.ListBuffer
import com.ibm.spark.kernel.protocol.v5.MIMEType
import com.ibm.spark.kernel.protocol.v5.client.boot.ClientBootstrap
import com.ibm.spark.kernel.protocol.v5.client.boot.layers.{StandardHandlerInitialization, StandardSystemInitialization}
import com.ibm.spark.kernel.protocol.v5.content._
import com.typesafe.config.{Config, ConfigFactory}
import Array._

object commclient extends App{
val profileJSON: String = """
{
      "stdin_port" : 48691,
      "control_port" : 44808,
      "hb_port" : 49691,
      "shell_port" : 40544,
      "iopub_port" : 43462,
      "ip" : "127.0.0.1",
      "transport" : "tcp",
      "signature_scheme" : "hmac-sha256",
      "key" : ""
}
""".stripMargin

val config: Config = ConfigFactory.parseString(profileJSON)
val client = (new ClientBootstrap(config)
    with StandardSystemInitialization
    with StandardHandlerInitialization).createClient()

def printResult(result: ExecuteResult) = {
    println(s"${result.data.get(MIMEType.PlainText).get}")
}
def printStreamContent(content:StreamContent) = {
    println(s"${content.text}")
}
def printError(reply:ExecuteReplyError) = {
    println(s"Error was: ${reply.ename.get}")
}

client.comm.register("my_target").addMsgHandler {
(commWriter, commId, data) =>
    println(data)
    commWriter.close()
}

// Initiate the Comm connection
client.comm.open("my_target")

}

谁能告诉我如何运行这段代码:

// Register the callback to respond to being opened from the client
kernel.comm.register("my target").addOpenHandler { 
    (commWriter, commId, targetName, data) =>
        commWriter.writeMsg(Map("response" -> "Hello World!"))
}

如果有人能指点我完成有关使用 comm API 的工作示例,我将不胜感激。

任何帮助将不胜感激。谢谢

【问题讨论】:

    标签: api apache-spark kernel comm


    【解决方案1】:

    您可以使用您的客户端在一个程序中运行此服务器(内核)端注册一次。然后您的其他程序可以使用此通道与内核通信。 这是我在上面提到的第一个程序中运行注册的一种方式:

    client.execute(
    """
    // Register the callback to respond to being opened from the client
    kernel.comm.register("my target").
        addOpenHandler {
            (commWriter, commId, targetName, data) =>
                commWriter.writeMsg(org.apache.toree.kernel.protocol.v5.MsgData("response" -> "Toree Hello World!"))
        }.
        addMsgHandler {
            (commWriter, _, data) =>
                if (!data.toString.contains("closing")) {
                    commWriter.writeMsg(data)
                } else {
                    commWriter.writeMsg(org.apache.toree.kernel.protocol.v5.MsgData("closing" -> "done"))
                }
        }
    """.stripMargin
    )
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-17
      • 1970-01-01
      • 2019-12-21
      • 1970-01-01
      • 2011-09-03
      相关资源
      最近更新 更多