Skip to content

Commit 065d974

Browse files
committed (author and date not captured in this extract)
addition of connection close in HBaseRelation
1 parent a324f6f commit 065d974

File tree

6 files changed

+21
-10
lines changed

6 files changed

+21
-10
lines changed

src/main/scala/org/apache/spark/sql/hbase/HBaseCatalog.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ private[hbase] class HBaseCatalog(@transient hbaseContext: SQLContext,
181181
"""The directory of a certain regionserver is not accessible,
182182
|please add 'cd ~' before 'start regionserver' in your regionserver start script.""")
183183
}
184+
metadataTable.close()
184185
} else {
185186
deploySuccessfully_internal = Some(true)
186187
}

src/main/scala/org/apache/spark/sql/hbase/HBaseRelation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -160,12 +160,18 @@ private[hbase] case class HBaseRelation(
160160
logger.debug(s"HBaseRelation config has zkPort="
161161
+ s"${getConf.get("hbase.zookeeper.property.clientPort")}")
162162

163-
@transient lazy val connection_ =
163+
@transient var internal_connection : Connection = _
164+
165+
def connection_ = {
164166
if (connection == null) {
165-
ConnectionFactory.createConnection(getConf)
167+
if (internal_connection == null) {
168+
internal_connection = ConnectionFactory.createConnection(getConf)
169+
}
170+
internal_connection
166171
} else {
167172
connection
168173
}
174+
}
169175

170176
@transient private var htable_ : Table = _
171177

@@ -188,11 +194,15 @@ private[hbase] case class HBaseRelation(
188194
refs.indexWhere(_.exprId == partitionKeys(keyIndex).exprId)
189195
}
190196

191-
def closeHTable() = {
197+
def close() = {
192198
if (htable_ != null) {
193199
htable_.close()
194200
htable_ = null
195201
}
202+
if (connection == null && internal_connection != null) {
203+
internal_connection.close()
204+
internal_connection = null
205+
}
196206
}
197207

198208
// corresponding logical relation
@@ -712,7 +722,7 @@ private[hbase] case class HBaseRelation(
712722
if (puts.nonEmpty) {
713723
htable.put(puts.toList)
714724
}
715-
closeHTable()
725+
close()
716726
}
717727

718728

src/main/scala/org/apache/spark/sql/hbase/HBaseSQLReaderRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ class HBaseSQLReaderRDD(val relation: HBaseRelation,
194194
def close() = {
195195
try {
196196
scanner.close()
197-
relation.closeHTable()
197+
relation.close()
198198
} catch {
199199
case e: Exception => logWarning("Exception in scanner.close", e)
200200
}

src/main/scala/org/apache/spark/sql/hbase/SparkSqlRegionObserver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ class HBaseCoprocessorSQLReaderRDD(var relation: HBaseRelation,
8989
def close() = {
9090
try {
9191
scanner.close()
92-
relation.closeHTable()
92+
relation.close()
9393
} catch {
9494
case e: Exception => logWarning("Exception in scanner.close", e)
9595
}

src/main/scala/org/apache/spark/sql/hbase/execution/hbaseCommands.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ case class BulkLoadIntoTableCommand(
284284
if (recordsWritten > 0) {
285285
load.doBulkLoad(targetPath, relation.connection_.getAdmin, relation.htable,
286286
relation.connection_.getRegionLocator(relation.hTableName))
287-
relation.closeHTable()
287+
relation.close()
288288
}
289289
}
290290
1
@@ -304,7 +304,7 @@ case class BulkLoadIntoTableCommand(
304304
load.doBulkLoad(tablePath, relation.connection_.getAdmin, relation.htable,
305305
relation.connection_.getRegionLocator(relation.hTableName))
306306
}
307-
relation.closeHTable()
307+
relation.close()
308308
logDebug(s"finish BulkLoad on table ${relation.htable.getName}:" +
309309
s" ${System.currentTimeMillis()}")
310310
Seq.empty[Row]

src/test/scala/org/apache/spark/sql/hbase/TestHbase.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ object TestHbase {
2929
def hsc: HBaseSQLContext = {
3030
if (hsc_ == null) {
3131
hsc_ = new HBaseSQLContext(new SparkContext("local", "TestSQLContext", new SparkConf(true)
32-
.set("spark.hadoop.hbase.zookeeper.quorum", "localhost")
33-
.set("spark.hadoop.dfs.replication", "1")))
32+
.set("spark.hadoop.hbase.zookeeper.quorum", "localhost")))
3433
}
3534
hsc_
3635
}
@@ -50,6 +49,7 @@ object TestHbase {
5049

5150
def stop: Unit = {
5251
hsc_.catalog.stopAdmin()
52+
hsc_.catalog.connection.close()
5353
hsc.sparkContext.stop()
5454
hsc_ = null
5555
testUtil.cleanupDataTestDirOnTestFS()

0 commit comments

Comments (0)