【发布时间】:2011-10-16 03:41:27
【问题描述】:
我发现 Scala 的 collection.mutable.PriorityQueue 有一些奇怪的行为。我正在实现外部排序,并用 100 万条记录对其进行测试。每次运行测试并验证结果时,都会有 10-20 条记录没有被正确排序。我把 Scala 的 PriorityQueue 实现换成 java.util.PriorityQueue 之后,每次都能得到正确的结果。有什么想法吗?
这是代码(抱歉有点长……)。我使用来自 http://sortbenchmark.org/ 的工具 gensort(gensort -a 1000000)生成数据,并用 valsort 验证结果。
/** Sorts the lines of a text file with an external merge sort.
  *
  * The input is read line by line and split into blocks. Each block is sorted
  * in memory and written to a temporary file; the temporary files are then
  * k-way merged into the output file.
  *
  * @param inFileName    path of the file whose lines are to be sorted
  * @param outFileName   path of the file that receives the sorted lines
  * @param maxBlockBytes optional bound on the estimated size (2 bytes per char)
  *                      of one in-memory block; when `None` the bound is
  *                      derived from the input length and the free heap
  * @param ord           ordering used for both the in-memory sort and the merge
  * @return the number of lines written to the output file
  */
def externalSort(inFileName: String, outFileName: String,
                 maxBlockBytes: Option[Long] = None)
                (implicit ord: Ordering[String]): Int = {
  val MaxTempFiles = 1024
  val TempBufferSize = 4096
  val inFile = new java.io.File(inFileName)

  /** Splits the input into blocks and writes each block, sorted, to a temp file. */
  def partitionAndSort(): List[java.io.File] = {

    /** Chooses the block size: the caller's bound, or one derived from free memory. */
    def getBlockSize: Long = maxBlockBytes.getOrElse {
      val perFile = inFile.length / MaxTempFiles
      val freeMem = Runtime.getRuntime.freeMemory()
      if (perFile < freeMem / 2) {
        freeMem / 2
      } else {
        if (perFile >= freeMem)
          System.err.println("Not enough free memory to use external sort.")
        perFile
      }
    }

    /** Sorts one block and writes it to a fresh temp file. */
    def writeSorted(block: List[String]): java.io.File = {
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()
      val out = new java.io.PrintWriter(tmp)
      try {
        // Pass the ordering explicitly so the caller's ordering is always used.
        for (l <- block.sorted(ord)) out.println(l)
      } finally {
        out.close()
      }
      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    // Long, not Int: the block size can exceed 2 GiB on large heaps.
    var currentSize = 0L

    val source = scala.io.Source.fromFile(inFile)
    try {
      for (line <- source.getLines()) {
        if (buf.nonEmpty && currentSize > blockSize) {
          tmpFiles ::= writeSorted(buf)
          buf = List[String]()
          currentSize = 0L
        }
        buf ::= line
        currentSize += line.length.toLong * 2 // 2 bytes per char
      }
    } finally {
      source.close()
    }
    // Test the buffer itself: a block made only of empty lines has size 0
    // but must still be written out.
    if (buf.nonEmpty) tmpFiles ::= writeSorted(buf)
    tmpFiles
  }

  /** Merges the sorted temp files into the output file; returns the line count. */
  def mergeSortedFiles(fs: List[java.io.File]): Int = {

    /** Reader over one sorted temp file that exposes its current (unconsumed) line. */
    class TempFileBuffer(val file: java.io.File) {
      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      private var cur: Option[String] = fetch()

      /** Reads the next line; closes the reader once the end of file is reached. */
      private def fetch(): Option[String] = {
        val next = Option(in.readLine()) // readLine signals EOF with null
        if (next.isEmpty) in.close()
        next
      }

      def currentLine: String =
        cur.getOrElse(throw new NoSuchElementException("temp file exhausted: " + file))

      def isEmpty: Boolean = cur.isEmpty

      def readNextLine(): Unit = {
        if (cur.isDefined) cur = fetch()
      }

      /** Releases the underlying reader; safe to call more than once. */
      def close(): Unit = in.close()
    }

    val byCurrentLine = new Ordering[TempFileBuffer] {
      def compare(o1: TempFileBuffer, o2: TempFileBuffer): Int =
        ord.compare(o1.currentLine, o2.currentLine)
    }
    // scala.collection.mutable.PriorityQueue dequeues the GREATEST element, so
    // the ordering is reversed to pop the smallest current line first.
    val pq = new scala.collection.mutable.PriorityQueue[TempFileBuffer]()(
      byCurrentLine.reverse)

    var count = 0
    try {
      // Seed the queue with the first line of every partition.
      for (tmp <- fs) {
        val buf = new TempFileBuffer(tmp)
        if (buf.isEmpty) buf.file.delete() else pq += buf
      }
      val out = new java.io.PrintWriter(new java.io.File(outFileName))
      try {
        while (pq.nonEmpty) {
          val buf = pq.dequeue()
          out.println(buf.currentLine)
          count += 1
          // Advance only while the buffer is outside the queue, so the heap
          // never holds an element whose key has changed.
          buf.readNextLine()
          if (buf.isEmpty) {
            buf.file.delete() // don't need anymore
          } else {
            pq += buf
          }
        }
      } finally {
        out.close()
      }
    } finally {
      // On failure, release readers that are still queued (no-op on success).
      pq.foreach(_.close())
    }
    count
  }

  mergeSortedFiles(partitionAndSort())
}
【问题讨论】:
-
Scala 版本是 2.9.0.1
-
我最近在为 Codility 上的一道测试题编写代码:在大型数据集上,使用 Java 的 PriorityQueue 的版本可以通过,而使用 Scala 的 PriorityQueue 的版本却不行。他们声称使用的是 Scala 2.12。我不确定这些链接能保留多久…… Java 版本:app.codility.com/demo/results/demoXCWYCR-6PZ;Scala 版本:app.codility.com/demo/results/demoKABUXG-7FV
-
我在代码中犯了一个错误。我以为我可以按顺序从 PriorityQueue 中获取元素,但实际上只能通过使用 dequeue 函数按排序顺序获取元素。有效的代码是这个:app.codility.com/demo/results/demoMU2HYZ-S3H
标签: scala priority-queue