
Commit 9870e5c

JoshRosen authored and Andrew Or committed
[SPARK-12251] Document and improve off-heap memory configurations
This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs.

- Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6).
- Deprecate `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix.
- Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (apache#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion.
- Document these configurations on the configuration page.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#10237 from JoshRosen/SPARK-12251.

(cherry picked from commit 23a9e62)
Signed-off-by: Andrew Or <andrew@databricks.com>
1 parent d0307de commit 9870e5c
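For readers skimming the diff, here is a minimal, illustrative sketch (not part of the patch) of how the renamed configuration pair is meant to be used together after this change; the app name and the "2g" value are arbitrary examples, not defaults:

import org.apache.spark.SparkConf

// Off-heap execution memory must be enabled AND given a positive size;
// after this patch, enabling it with size == 0 fails validation up front.
val conf = new SparkConf()
  .setAppName("offheap-example")            // illustrative name
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2g")   // any value > 0; "2g" is just an example

// The old key is still accepted for backwards compatibility but is deprecated:
// setting spark.unsafe.offHeap is translated to spark.memory.offHeap.enabled
// and logs a deprecation warning.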

File tree

15 files changed, +65 -22 lines


core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
     "spark.streaming.fileStream.minRememberDuration" -> Seq(
       AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
     "spark.yarn.max.executor.failures" -> Seq(
-      AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
+      AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
+    "spark.memory.offHeap.enabled" -> Seq(
+      AlternateConfig("spark.unsafe.offHeap", "1.6"))
     )
 
   /**

core/src/main/scala/org/apache/spark/memory/MemoryManager.scala

Lines changed: 8 additions & 2 deletions
@@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager(
 
   storageMemoryPool.incrementPoolSize(storageMemory)
   onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
-  offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0))
+  offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))
 
   /**
    * Total available memory for storage, in bytes. This amount can vary over time, depending on
@@ -182,7 +182,13 @@ private[spark] abstract class MemoryManager(
    * sun.misc.Unsafe.
    */
   final val tungstenMemoryMode: MemoryMode = {
-    if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
+    if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
+      require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
+        "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
+      MemoryMode.OFF_HEAP
+    } else {
+      MemoryMode.ON_HEAP
+    }
   }
 
   /**
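As a rough illustration of the new require check above (a sketch, not code from the patch): because tungstenMemoryMode is a val, the check runs as soon as a concrete MemoryManager is constructed, so a misconfiguration fails fast with an IllegalArgumentException instead of surfacing later as an OOM.

import org.apache.spark.SparkConf

// Misconfigured: off-heap enabled but no size set (defaults to 0).
// Constructing a MemoryManager with this conf now throws IllegalArgumentException:
//   "requirement failed: spark.memory.offHeap.size must be > 0 when
//    spark.memory.offHeap.enabled == true"
val badConf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")

// Fixed: pair the flag with a positive size ("512m" here is an arbitrary example).
val goodConf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "512m")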

core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java

Lines changed: 17 additions & 4 deletions
@@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite {
   public void leakedPageMemoryIsDetected() {
     final TaskMemoryManager manager = new TaskMemoryManager(
       new StaticMemoryManager(
-        new SparkConf().set("spark.unsafe.offHeap", "false"),
+        new SparkConf().set("spark.memory.offHeap.enabled", "false"),
         Long.MAX_VALUE,
         Long.MAX_VALUE,
         1),
@@ -41,8 +41,10 @@ public void leakedPageMemoryIsDetected() {
 
   @Test
   public void encodePageNumberAndOffsetOffHeap() {
-    final TaskMemoryManager manager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "true")), 0);
+    final SparkConf conf = new SparkConf()
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "1000");
+    final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
     final MemoryBlock dataPage = manager.allocatePage(256, null);
     // In off-heap mode, an offset is an absolute address that may require more than 51 bits to
     // encode. This test exercises that corner-case:
@@ -55,7 +57,7 @@ public void encodePageNumberAndOffsetOffHeap() {
   @Test
   public void encodePageNumberAndOffsetOnHeap() {
     final TaskMemoryManager manager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+      new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
     final MemoryBlock dataPage = manager.allocatePage(256, null);
     final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
     Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
@@ -104,4 +106,15 @@ public void cooperativeSpilling() {
     assert(manager.cleanUpAllAllocatedMemory() == 0);
   }
 
+  @Test
+  public void offHeapConfigurationBackwardsCompatibility() {
+    // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which
+    // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251).
+    final SparkConf conf = new SparkConf()
+      .set("spark.unsafe.offHeap", "true")
+      .set("spark.memory.offHeap.size", "1000");
+    final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
+    assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP);
+  }
+
 }

core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java

Lines changed: 4 additions & 2 deletions
@@ -35,7 +35,7 @@ public class PackedRecordPointerSuite {
 
   @Test
   public void heap() throws IOException {
-    final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
+    final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
     final TaskMemoryManager memoryManager =
       new TaskMemoryManager(new TestMemoryManager(conf), 0);
     final MemoryBlock page0 = memoryManager.allocatePage(128, null);
@@ -54,7 +54,9 @@ public void heap() throws IOException {
 
   @Test
   public void offHeap() throws IOException {
-    final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "true");
+    final SparkConf conf = new SparkConf()
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "10000");
     final TaskMemoryManager memoryManager =
       new TaskMemoryManager(new TestMemoryManager(conf), 0);
     final MemoryBlock page0 = memoryManager.allocatePage(128, null);

core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@
 public class ShuffleInMemorySorterSuite {
 
   final TestMemoryManager memoryManager =
-    new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+    new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
   final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
   final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager);
 
@@ -64,7 +64,7 @@ public void testBasicSorting() throws Exception {
       "Lychee",
       "Mango"
     };
-    final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
+    final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
     final TaskMemoryManager memoryManager =
       new TaskMemoryManager(new TestMemoryManager(conf), 0);
     final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);

core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java

Lines changed: 1 addition & 1 deletion
@@ -108,7 +108,7 @@ public void setUp() throws IOException {
     spillFilesCreated.clear();
     conf = new SparkConf()
       .set("spark.buffer.pageSize", "1m")
-      .set("spark.unsafe.offHeap", "false");
+      .set("spark.memory.offHeap.enabled", "false");
     taskMetrics = new TaskMetrics();
     memoryManager = new TestMemoryManager(conf);
     taskMemoryManager = new TaskMemoryManager(memoryManager, 0);

core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java

Lines changed: 2 additions & 2 deletions
@@ -84,8 +84,8 @@ public void setup() {
     memoryManager =
       new TestMemoryManager(
         new SparkConf()
-          .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
-          .set("spark.memory.offHeapSize", "256mb"));
+          .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator())
+          .set("spark.memory.offHeap.size", "256mb"));
     taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
 
     tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ public class UnsafeExternalSorterSuite {
 
   final LinkedList<File> spillFilesCreated = new LinkedList<File>();
   final TestMemoryManager memoryManager =
-    new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+    new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
   final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
   // Use integer comparison for comparing prefixes (which are partition ids, in this case)
   final PrefixComparator prefixComparator = new PrefixComparator() {

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java

Lines changed: 2 additions & 2 deletions
@@ -46,7 +46,7 @@ private static String getStringFromDataPage(Object baseObject, long baseOffset,
   @Test
   public void testSortingEmptyInput() {
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+      new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
     final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
     final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer,
       memoryManager,
@@ -71,7 +71,7 @@ public void testSortingOnlyByIntegerPrefix() throws Exception {
       "Mango"
     };
     final TaskMemoryManager memoryManager = new TaskMemoryManager(
-      new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+      new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
     final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
     final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
     final Object baseObject = dataPage.getBaseObject();

core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
       conf.clone
         .set("spark.memory.fraction", "1")
         .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
-        .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+        .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
       maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
       maxStorageMemory = 0,
       numCores = 1)
