Skip to content

Commit f1122dd

Browse files
nongliyhuai
authored and committed
[SPARK-11328][SQL] Improve error message when hitting this issue
The issue is that the output committer is not idempotent and retry attempts will fail because the output file already exists. It is not safe to clean up the file as this output committer is by design not retryable. Currently, the job fails with a confusing file exists error. This patch is a stop gap to tell the user to look at the top of the error log for the proper message. This is difficult to test locally as Spark is hardcoded not to retry. Manually verified by upping the retry attempts. Author: Nong Li <nong@databricks.com> Author: Nong Li <nongli@gmail.com> Closes apache#10080 from nongli/spark-11328. (cherry picked from commit 47a0abc) Signed-off-by: Yin Huai <yhuai@databricks.com>
1 parent d77bf0b commit f1122dd

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,24 @@ private[sql] abstract class BaseWriterContainer(
124124
}
125125
}
126126

127+
protected def newOutputWriter(path: String): OutputWriter = {
128+
try {
129+
outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
130+
} catch {
131+
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
132+
if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
133+
// Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
134+
// attempts, the task will fail because the output file is created from a prior attempt.
135+
// This often means the most visible error to the user is misleading. Augment the error
136+
// to tell the user to look for the actual error.
137+
throw new SparkException("The output file already exists but this could be due to a " +
138+
"failure from an earlier attempt. Look through the earlier logs or stage page for " +
139+
"the first error.\n File exists error: " + e)
140+
}
141+
throw e
142+
}
143+
}
144+
127145
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
128146
val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
129147

@@ -234,7 +252,7 @@ private[sql] class DefaultWriterContainer(
234252
executorSideSetup(taskContext)
235253
val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
236254
configuration.set("spark.sql.sources.output.path", outputPath)
237-
val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
255+
val writer = newOutputWriter(getWorkPath)
238256
writer.initConverter(dataSchema)
239257

240258
var writerClosed = false
@@ -403,7 +421,7 @@ private[sql] class DynamicPartitionWriterContainer(
403421
val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
404422
configuration.set(
405423
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
406-
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
424+
val newWriter = super.newOutputWriter(path.toString)
407425
newWriter.initConverter(dataSchema)
408426
newWriter
409427
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
4141
* no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
4242
* left empty).
4343
*/
44-
private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
44+
private[datasources] class DirectParquetOutputCommitter(
45+
outputPath: Path, context: TaskAttemptContext)
4546
extends ParquetOutputCommitter(outputPath, context) {
4647
val LOG = Log.getLog(classOf[ParquetOutputCommitter])
4748

0 commit comments

Comments
 (0)