【发布时间】:2018-01-16 18:08:05
【问题描述】:
我在 Spark 2.2.0 中有一个不可序列化的类异常。 以下过程是我在 Scala 中尝试执行的操作:
- 从 HDFS 读取一组 JPEG 图像。
- 构建
java.awt.image.BufferedImageS的数组。 - 要提取
java.awt.image.BufferedImage缓冲区并将其存储在每个图像的二维数组中,方法是构建一个包含图像缓冲区信息Array[Array[Int]]的二维数组数组。 - 使用
sc.parallelize方法将Array[Array[Int]]转换为org.apache.spark.rdd.RDD[Array[Array[Int]]]。 - 通过转换初始
org.apache.spark.rdd.RDD[Array[Array[Int]]]来分布式执行图像处理操作。
这是代码:
import org.apache.spark.sql.SparkSession
import javax.imageio.ImageIO
import java.io.ByteArrayInputStream
def binarize(image: Array[Array[Int]], threshold: Int) : Array[Array[Int]] = {
val height = image.size
val width = image(0).size
val result = Array.ofDim[Int](height, width)
for (i <- 0 until height) {
for (j <- 0 until width){
result(i)(j) = if (image(i)(j) <= threshold) 0 else 255
}
}
result
}
object imageTestObj {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
val sc = spark.sparkContext
val saveToHDFS = false
val threshold: Int = 128
val partitions = 32
val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"
val files = sc.binaryFiles(inPathStr).collect
val AWTImageArray = files.map { binFile =>
val input = binFile._2.open()
val name = binFile._1
var buffer: Array[Byte] = Array.fill(input.available)(0)
input.readFully(buffer)
ImageIO.read(new ByteArrayInputStream(buffer))
}
val ImgBuffers = AWTImageArray.map { image =>
val height = image.getHeight
val width = image.getWidth
val buffer = Array.ofDim[Int](height, width)
for (i <- 0 until height) {
for (j <- 0 until width){
buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
}
}
buffer
}
val inputImages = sc.parallelize(ImgBuffers, partitions).cache()
val op1 = inputImages.map(image => binarize(image, threshold))
}
}
这个算法有一个众所周知的异常:
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: java.awt.image.BufferedImage
Serialization stack:
- object not serializable (class: java.awt.image.BufferedImage, ...
我不明白为什么 Spark 在应用程序中创建第一个 RDD 之前使用它时会尝试序列化 BufferedImage 类。如果我尝试创建RDD[BufferedImage],不是应该序列化BufferedImage 类吗?
谁能解释一下这是怎么回事?
提前谢谢你...
【问题讨论】:
-
你是对的。对不起。我更新了我的问题。
-
请参阅下面的更新答案。
标签: apache-spark serialization rdd bufferedimage