Skip to content

Commit 0be792a

Browse files
scwfdavies
authored andcommitted
[SPARK-12222] [CORE] Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception
Jira: https://issues.apache.org/jira/browse/SPARK-12222 Deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception: ``` com.esotericsoftware.kryo.KryoException: Buffer underflow. at com.esotericsoftware.kryo.io.Input.require(Input.java:156) at com.esotericsoftware.kryo.io.Input.skip(Input.java:131) at com.esotericsoftware.kryo.io.Input.skip(Input.java:264) ``` This is caused by a bug of kryo's `Input.skip(long count)`(EsotericSoftware/kryo#119) and we call this method in `KryoInputDataInputBridge`. Instead of upgrade kryo's version, this pr bypass the kryo's `Input.skip(long count)` by directly call another `skip` method in kryo's Input.java(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), i.e. write the bug-fixed version of `Input.skip(long count)` in KryoInputDataInputBridge's `skipBytes` method. more detail link to apache#9748 (comment) Author: Fei Wang <wangfei1@huawei.com> Closes apache#10213 from scwf/patch-1. (cherry picked from commit 3934562) Signed-off-by: Davies Liu <davies.liu@gmail.com>
1 parent 9e82273 commit 0be792a

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,15 @@ private[serializer] class KryoInputDataInputBridge(input: KryoInput) extends Dat
398398
override def readUTF(): String = input.readString() // readString in kryo does utf8
399399
override def readInt(): Int = input.readInt()
400400
override def readUnsignedShort(): Int = input.readShortUnsigned()
401-
override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
401+
override def skipBytes(n: Int): Int = {
402+
var remaining: Long = n
403+
while (remaining > 0) {
404+
val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
405+
input.skip(skip)
406+
remaining -= skip
407+
}
408+
n
409+
}
402410
override def readFully(b: Array[Byte]): Unit = input.read(b)
403411
override def readFully(b: Array[Byte], off: Int, len: Int): Unit = input.read(b, off, len)
404412
override def readLine(): String = throw new UnsupportedOperationException("readLine")

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,21 @@
1717

1818
package org.apache.spark.serializer
1919

20-
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
20+
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileOutputStream, FileInputStream}
2121

2222
import scala.collection.JavaConverters._
2323
import scala.collection.mutable
2424
import scala.reflect.ClassTag
2525

2626
import com.esotericsoftware.kryo.Kryo
27+
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
28+
29+
import org.roaringbitmap.RoaringBitmap
2730

2831
import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite}
2932
import org.apache.spark.scheduler.HighlyCompressedMapStatus
3033
import org.apache.spark.serializer.KryoTest._
34+
import org.apache.spark.util.Utils
3135
import org.apache.spark.storage.BlockManagerId
3236

3337
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
@@ -350,6 +354,28 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
350354
assert(thrown.getMessage.contains(kryoBufferMaxProperty))
351355
}
352356

357+
test("SPARK-12222: deserialize RoaringBitmap throw Buffer underflow exception") {
358+
val dir = Utils.createTempDir()
359+
val tmpfile = dir.toString + "/RoaringBitmap"
360+
val outStream = new FileOutputStream(tmpfile)
361+
val output = new KryoOutput(outStream)
362+
val bitmap = new RoaringBitmap
363+
bitmap.add(1)
364+
bitmap.add(3)
365+
bitmap.add(5)
366+
bitmap.serialize(new KryoOutputDataOutputBridge(output))
367+
output.flush()
368+
output.close()
369+
370+
val inStream = new FileInputStream(tmpfile)
371+
val input = new KryoInput(inStream)
372+
val ret = new RoaringBitmap
373+
ret.deserialize(new KryoInputDataInputBridge(input))
374+
input.close()
375+
assert(ret == bitmap)
376+
Utils.deleteRecursively(dir)
377+
}
378+
353379
test("getAutoReset") {
354380
val ser = new KryoSerializer(new SparkConf).newInstance().asInstanceOf[KryoSerializerInstance]
355381
assert(ser.getAutoReset)

0 commit comments

Comments
 (0)