Skip to content

Commit c7fea66

Browse files
zhouxinyuvongosling
authored andcommitted
[ROCKETMQ-335] Reload server certificate, private key and root ca when these are changed (apache#207)
1 parent 69043c0 commit c7fea66

File tree

7 files changed

+391
-1
lines changed

7 files changed

+391
-1
lines changed

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,14 @@
7373
import org.apache.rocketmq.common.stats.MomentStatsItem;
7474
import org.apache.rocketmq.remoting.RPCHook;
7575
import org.apache.rocketmq.remoting.RemotingServer;
76+
import org.apache.rocketmq.remoting.common.TlsMode;
7677
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
7778
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
7879
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
7980
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
8081
import org.apache.rocketmq.remoting.netty.RequestTask;
82+
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
83+
import org.apache.rocketmq.srvutil.FileWatchService;
8184
import org.apache.rocketmq.store.DefaultMessageStore;
8285
import org.apache.rocketmq.store.MessageArrivingListener;
8386
import org.apache.rocketmq.store.MessageStore;
@@ -136,6 +139,7 @@ public class BrokerController {
136139
private InetSocketAddress storeHost;
137140
private BrokerFastFailure brokerFastFailure;
138141
private Configuration configuration;
142+
private FileWatchService fileWatchService;
139143

140144
public BrokerController(
141145
final BrokerConfig brokerConfig,
@@ -387,6 +391,45 @@ public void run() {
387391
}
388392
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
389393
}
394+
395+
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
396+
// Register a listener to reload SslContext
397+
try {
398+
fileWatchService = new FileWatchService(
399+
new String[] {
400+
TlsSystemConfig.tlsServerCertPath,
401+
TlsSystemConfig.tlsServerKeyPath,
402+
TlsSystemConfig.tlsServerTrustCertPath
403+
},
404+
new FileWatchService.Listener() {
405+
boolean certChanged, keyChanged = false;
406+
@Override
407+
public void onChanged(String path) {
408+
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
409+
log.info("The trust certificate changed, reload the ssl context");
410+
reloadServerSslContext();
411+
}
412+
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
413+
certChanged = true;
414+
}
415+
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
416+
keyChanged = true;
417+
}
418+
if (certChanged && keyChanged) {
419+
log.info("The certificate and private key changed, reload the ssl context");
420+
certChanged = keyChanged = false;
421+
reloadServerSslContext();
422+
}
423+
}
424+
private void reloadServerSslContext() {
425+
((NettyRemotingServer) remotingServer).loadSslContext();
426+
((NettyRemotingServer) fastRemotingServer).loadSslContext();
427+
}
428+
});
429+
} catch (Exception e) {
430+
log.warn("FileWatchService created error, can't load the certificate dynamically");
431+
}
432+
}
390433
}
391434

392435
return result;
@@ -594,6 +637,10 @@ public void shutdown() {
594637
this.fastRemotingServer.shutdown();
595638
}
596639

640+
if (this.fileWatchService != null) {
641+
this.fileWatchService.shutdown();
642+
}
643+
597644
if (this.messageStore != null) {
598645
this.messageStore.shutdown();
599646
}
@@ -662,6 +709,10 @@ public void start() throws Exception {
662709
this.fastRemotingServer.start();
663710
}
664711

712+
if (this.fileWatchService != null) {
713+
this.fileWatchService.start();
714+
}
715+
665716
if (this.brokerOuterAPI != null) {
666717
this.brokerOuterAPI.start();
667718
}

namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvController.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
import org.apache.rocketmq.namesrv.routeinfo.BrokerHousekeepingService;
3131
import org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager;
3232
import org.apache.rocketmq.remoting.RemotingServer;
33+
import org.apache.rocketmq.remoting.common.TlsMode;
3334
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
3435
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
36+
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
37+
import org.apache.rocketmq.srvutil.FileWatchService;
3538
import org.slf4j.Logger;
3639
import org.slf4j.LoggerFactory;
3740

@@ -54,6 +57,7 @@ public class NamesrvController {
5457
private ExecutorService remotingExecutor;
5558

5659
private Configuration configuration;
60+
private FileWatchService fileWatchService;
5761

5862
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
5963
this.namesrvConfig = namesrvConfig;
@@ -95,6 +99,44 @@ public void run() {
9599
}
96100
}, 1, 10, TimeUnit.MINUTES);
97101

102+
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
103+
// Register a listener to reload SslContext
104+
try {
105+
fileWatchService = new FileWatchService(
106+
new String[] {
107+
TlsSystemConfig.tlsServerCertPath,
108+
TlsSystemConfig.tlsServerKeyPath,
109+
TlsSystemConfig.tlsServerTrustCertPath
110+
},
111+
new FileWatchService.Listener() {
112+
boolean certChanged, keyChanged = false;
113+
@Override
114+
public void onChanged(String path) {
115+
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
116+
log.info("The trust certificate changed, reload the ssl context");
117+
reloadServerSslContext();
118+
}
119+
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
120+
certChanged = true;
121+
}
122+
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
123+
keyChanged = true;
124+
}
125+
if (certChanged && keyChanged) {
126+
log.info("The certificate and private key changed, reload the ssl context");
127+
certChanged = keyChanged = false;
128+
reloadServerSslContext();
129+
}
130+
}
131+
private void reloadServerSslContext() {
132+
((NettyRemotingServer) remotingServer).loadSslContext();
133+
}
134+
});
135+
} catch (Exception e) {
136+
log.warn("FileWatchService created error, can't load the certificate dynamically");
137+
}
138+
}
139+
98140
return true;
99141
}
100142

@@ -111,12 +153,20 @@ private void registerProcessor() {
111153

112154
public void start() throws Exception {
113155
this.remotingServer.start();
156+
157+
if (this.fileWatchService != null) {
158+
this.fileWatchService.start();
159+
}
114160
}
115161

116162
public void shutdown() {
117163
this.remotingServer.shutdown();
118164
this.remotingExecutor.shutdown();
119165
this.scheduledExecutorService.shutdown();
166+
167+
if (this.fileWatchService != null) {
168+
this.fileWatchService.shutdown();
169+
}
120170
}
121171

122172
public NamesrvConfig getNamesrvConfig() {

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public abstract class NettyRemotingAbstract {
9393
/**
9494
* SSL context via which to create {@link SslHandler}.
9595
*/
96-
protected SslContext sslContext;
96+
protected volatile SslContext sslContext;
9797

9898
/**
9999
* Constructor, specifying capacity of one-way and asynchronous semaphores.

remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ public Thread newThread(Runnable r) {
139139
});
140140
}
141141

142+
loadSslContext();
143+
}
144+
145+
public void loadSslContext() {
142146
TlsMode tlsMode = TlsSystemConfig.tlsMode;
143147
log.info("Server is running in TLS {} mode", tlsMode.getName());
144148

remoting/src/test/java/org/apache/rocketmq/remoting/TlsTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.rocketmq.remoting.common.TlsMode;
2626
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
2727
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
28+
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
2829
import org.apache.rocketmq.remoting.netty.TlsHelper;
2930
import org.apache.rocketmq.remoting.protocol.LanguageCode;
3031
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -134,6 +135,9 @@ else if ("noClientAuthFailure".equals(name.getMethodName())) {
134135
clientConfig.setUseTLS(false);
135136
} else if ("serverRejectsSSLClient".equals(name.getMethodName())) {
136137
tlsMode = TlsMode.DISABLED;
138+
} else if ("reloadSslContextForServer".equals(name.getMethodName())) {
139+
tlsClientAuthServer = false;
140+
tlsServerNeedClientAuth = "none";
137141
}
138142

139143
remotingServer = RemotingServerTest.createRemotingServer();
@@ -156,6 +160,26 @@ public void basicClientServerIntegrationTest() throws Exception {
156160
requestThenAssertResponse();
157161
}
158162

163+
@Test
164+
public void reloadSslContextForServer() throws Exception {
165+
requestThenAssertResponse();
166+
167+
//Use new cert and private key
168+
tlsClientKeyPath = getCertsPath("badClient.key");
169+
tlsClientCertPath = getCertsPath("badClient.pem");
170+
171+
((NettyRemotingServer) remotingServer).loadSslContext();
172+
173+
//Request Again
174+
requestThenAssertResponse();
175+
176+
//Start another client
177+
NettyClientConfig clientConfig = new NettyClientConfig();
178+
clientConfig.setUseTLS(true);
179+
RemotingClient remotingClient = RemotingServerTest.createRemotingClient(clientConfig);
180+
requestThenAssertResponse(remotingClient);
181+
}
182+
159183
@Test
160184
public void serverNotNeedClientAuth() throws Exception {
161185
requestThenAssertResponse();
@@ -289,6 +313,10 @@ private static RemotingCommand createRequest() {
289313
}
290314

291315
private void requestThenAssertResponse() throws Exception {
316+
requestThenAssertResponse(remotingClient);
317+
}
318+
319+
private void requestThenAssertResponse(RemotingClient remotingClient) throws Exception {
292320
RemotingCommand response = remotingClient.invokeSync("localhost:8888", createRequest(), 1000 * 3);
293321
assertTrue(response != null);
294322
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.srvutil;
19+
20+
import com.google.common.base.Strings;
21+
import java.io.File;
22+
import java.io.IOException;
23+
import java.nio.file.Files;
24+
import java.nio.file.Path;
25+
import java.nio.file.Paths;
26+
import java.security.MessageDigest;
27+
import java.security.NoSuchAlgorithmException;
28+
import java.util.ArrayList;
29+
import java.util.List;
30+
import org.apache.rocketmq.common.ServiceThread;
31+
import org.apache.rocketmq.common.UtilAll;
32+
import org.apache.rocketmq.common.constant.LoggerName;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
35+
36+
public class FileWatchService extends ServiceThread {
37+
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
38+
39+
private final List<String> watchFiles;
40+
private final List<String> fileCurrentHash;
41+
private final Listener listener;
42+
private static final int WATCH_INTERVAL = 500;
43+
private MessageDigest md = MessageDigest.getInstance("MD5");
44+
45+
public FileWatchService(final String[] watchFiles,
46+
final Listener listener) throws Exception {
47+
this.listener = listener;
48+
this.watchFiles = new ArrayList<>();
49+
this.fileCurrentHash = new ArrayList<>();
50+
51+
for (int i = 0; i < watchFiles.length; i++) {
52+
if (!Strings.isNullOrEmpty(watchFiles[i]) && new File(watchFiles[i]).exists()) {
53+
this.watchFiles.add(watchFiles[i]);
54+
this.fileCurrentHash.add(hash(watchFiles[i]));
55+
}
56+
}
57+
}
58+
59+
@Override
60+
public String getServiceName() {
61+
return "FileWatchService";
62+
}
63+
64+
@Override
65+
public void run() {
66+
log.info(this.getServiceName() + " service started");
67+
68+
while (!this.isStopped()) {
69+
try {
70+
this.waitForRunning(WATCH_INTERVAL);
71+
72+
for (int i = 0; i < watchFiles.size(); i++) {
73+
String newHash;
74+
try {
75+
newHash = hash(watchFiles.get(i));
76+
} catch (Exception ignored) {
77+
log.warn(this.getServiceName() + " service has exception when calculate the file hash. ", ignored);
78+
continue;
79+
}
80+
if (!newHash.equals(fileCurrentHash.get(i))) {
81+
fileCurrentHash.set(i, newHash);
82+
listener.onChanged(watchFiles.get(i));
83+
}
84+
}
85+
} catch (Exception e) {
86+
log.warn(this.getServiceName() + " service has exception. ", e);
87+
}
88+
}
89+
log.info(this.getServiceName() + " service end");
90+
}
91+
92+
private String hash(String filePath) throws IOException, NoSuchAlgorithmException {
93+
Path path = Paths.get(filePath);
94+
md.update(Files.readAllBytes(path));
95+
byte[] hash = md.digest();
96+
return UtilAll.bytes2string(hash);
97+
}
98+
99+
public interface Listener {
100+
/**
101+
* Will be called when the target files are changed
102+
* @param path the changed file path
103+
*/
104+
void onChanged(String path);
105+
}
106+
}

0 commit comments

Comments
 (0)