Skip to content

Commit 34d511f

Browse files
authored
Merge pull request apolloconfig#529 from nobodyiam/clean-release-message
Clean release message on demand and do async notification if there are too many clients to notify
2 parents cac111a + ad94cc7 commit 34d511f

File tree

5 files changed

+162
-1
lines changed

5 files changed

+162
-1
lines changed

apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/config/BizConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,11 @@ public TimeUnit releaseMessageCacheScanIntervalTimeUnit() {
105105
return TimeUnit.SECONDS;
106106
}
107107

108+
public int releaseMessageNotificationBatch() {
109+
return getIntProperty("apollo.release-message.notification.batch", 100);
110+
}
111+
112+
public int releaseMessageNotificationBatchIntervalInMilli() {
113+
return getIntProperty("apollo.release-message.notification.batch.interval", 100);
114+
}
108115
}

apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/message/DatabaseMessageSender.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,50 @@
11
package com.ctrip.framework.apollo.biz.message;
22

3+
import com.google.common.collect.Queues;
4+
35
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
46
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
7+
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
58
import com.ctrip.framework.apollo.tracer.Tracer;
69
import com.ctrip.framework.apollo.tracer.spi.Transaction;
710

811
import org.slf4j.Logger;
912
import org.slf4j.LoggerFactory;
1013
import org.springframework.beans.factory.annotation.Autowired;
1114
import org.springframework.stereotype.Component;
15+
import org.springframework.transaction.annotation.Transactional;
1216

17+
import java.util.List;
1318
import java.util.Objects;
19+
import java.util.concurrent.BlockingQueue;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import javax.annotation.PostConstruct;
1426

1527
/**
1628
* @author Jason Song(song_s@ctrip.com)
1729
*/
1830
@Component
1931
public class DatabaseMessageSender implements MessageSender {
2032
private static final Logger logger = LoggerFactory.getLogger(DatabaseMessageSender.class);
33+
private static final int CLEAN_QUEUE_MAX_SIZE = 100;
34+
private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
35+
private final ExecutorService cleanExecutorService;
36+
private final AtomicBoolean cleanStopped;
2137

2238
@Autowired
2339
private ReleaseMessageRepository releaseMessageRepository;
2440

41+
public DatabaseMessageSender() {
42+
cleanExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("DatabaseMessageSender", true));
43+
cleanStopped = new AtomicBoolean(false);
44+
}
45+
2546
@Override
47+
@Transactional
2648
public void sendMessage(String message, String channel) {
2749
logger.info("Sending message {} to channel {}", message, channel);
2850
if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
@@ -33,7 +55,8 @@ public void sendMessage(String message, String channel) {
3355
Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
3456
Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
3557
try {
36-
releaseMessageRepository.save(new ReleaseMessage(message));
58+
ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
59+
toClean.offer(newMessage.getId());
3760
transaction.setStatus(Transaction.SUCCESS);
3861
} catch (Throwable ex) {
3962
logger.error("Sending message to database failed", ex);
@@ -42,4 +65,45 @@ public void sendMessage(String message, String channel) {
4265
transaction.complete();
4366
}
4467
}
68+
69+
@PostConstruct
70+
private void initialize() {
71+
cleanExecutorService.submit(() -> {
72+
while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) {
73+
try {
74+
Long rm = toClean.poll(1, TimeUnit.SECONDS);
75+
if (rm != null) {
76+
cleanMessage(rm);
77+
} else {
78+
TimeUnit.SECONDS.sleep(5);
79+
}
80+
} catch (Throwable ex) {
81+
Tracer.logError(ex);
82+
}
83+
}
84+
});
85+
}
86+
87+
private void cleanMessage(Long id) {
88+
boolean hasMore = true;
89+
//double check in case the release message is rolled back
90+
ReleaseMessage releaseMessage = releaseMessageRepository.findOne(id);
91+
if (releaseMessage == null) {
92+
return;
93+
}
94+
while (hasMore && !Thread.currentThread().isInterrupted()) {
95+
List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(
96+
releaseMessage.getMessage(), releaseMessage.getId());
97+
98+
releaseMessageRepository.delete(messages);
99+
hasMore = messages.size() == 100;
100+
101+
messages.forEach(toRemove -> Tracer.logEvent(
102+
String.format("ReleaseMessage.Clean.%s", toRemove.getMessage()), String.valueOf(toRemove.getId())));
103+
}
104+
}
105+
106+
void stopClean() {
107+
cleanStopped.set(true);
108+
}
45109
}

apollo-biz/src/main/java/com/ctrip/framework/apollo/biz/repository/ReleaseMessageRepository.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
1919

2020
ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);
2121

22+
List<ReleaseMessage> findFirst100ByMessageAndIdLessThanOrderByIdAsc(String message, Long id);
23+
2224
@Query("select message, max(id) as id from ReleaseMessage where message in :messages group by message")
2325
List<Object[]> findLatestReleaseMessagesGroupByMessages(@Param("messages") Collection<String> messages);
2426
}

apollo-configservice/src/main/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.google.gson.Gson;
1313
import com.google.gson.reflect.TypeToken;
1414

15+
import com.ctrip.framework.apollo.biz.config.BizConfig;
1516
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
1617
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
1718
import com.ctrip.framework.apollo.biz.message.Topics;
@@ -22,6 +23,7 @@
2223
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
2324
import com.ctrip.framework.apollo.core.ConfigConsts;
2425
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
26+
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
2527
import com.ctrip.framework.apollo.tracer.Tracer;
2628

2729
import org.slf4j.Logger;
@@ -41,6 +43,9 @@
4143
import java.util.List;
4244
import java.util.Map;
4345
import java.util.Set;
46+
import java.util.concurrent.ExecutorService;
47+
import java.util.concurrent.Executors;
48+
import java.util.concurrent.TimeUnit;
4449

4550
/**
4651
* @author Jason Song(song_s@ctrip.com)
@@ -61,6 +66,8 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
6166
new TypeToken<List<ApolloConfigNotification>>() {
6267
}.getType();
6368

69+
private final ExecutorService largeNotificationBatchExecutorService;
70+
6471
@Autowired
6572
private WatchKeysUtil watchKeysUtil;
6673

@@ -76,6 +83,14 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
7683
@Autowired
7784
private Gson gson;
7885

86+
@Autowired
87+
private BizConfig bizConfig;
88+
89+
public NotificationControllerV2() {
90+
largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create
91+
("NotificationControllerV2", true));
92+
}
93+
7994
@RequestMapping(method = RequestMethod.GET)
8095
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
8196
@RequestParam(value = "appId") String appId,
@@ -220,6 +235,27 @@ public void handleMessage(ReleaseMessage message, String channel) {
220235
//create a new list to avoid ConcurrentModificationException
221236
List<DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>> results =
222237
Lists.newArrayList(deferredResults.get(content));
238+
239+
//do async notification if too many clients
240+
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
241+
largeNotificationBatchExecutorService.submit(() -> {
242+
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
243+
bizConfig.releaseMessageNotificationBatch());
244+
for (int i = 0; i < results.size(); i++) {
245+
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
246+
try {
247+
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
248+
} catch (InterruptedException e) {
249+
//ignore
250+
}
251+
}
252+
logger.debug("Async notify {}", results.get(i));
253+
results.get(i).setResult(notification);
254+
}
255+
});
256+
return;
257+
}
258+
223259
logger.debug("Notify {} clients for key {}", results.size(), content);
224260

225261
for (DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result : results) {

apollo-configservice/src/test/java/com/ctrip/framework/apollo/configservice/controller/NotificationControllerV2Test.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.google.common.collect.Sets;
88
import com.google.gson.Gson;
99

10+
import com.ctrip.framework.apollo.biz.config.BizConfig;
1011
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
1112
import com.ctrip.framework.apollo.biz.message.Topics;
1213
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
@@ -27,6 +28,7 @@
2728
import org.springframework.web.context.request.async.DeferredResult;
2829

2930
import java.util.List;
31+
import java.util.concurrent.TimeUnit;
3032

3133
import static org.junit.Assert.assertEquals;
3234
import static org.junit.Assert.assertTrue;
@@ -57,6 +59,9 @@ public class NotificationControllerV2Test {
5759
private NamespaceUtil namespaceUtil;
5860
@Mock
5961
private WatchKeysUtil watchKeysUtil;
62+
@Mock
63+
private BizConfig bizConfig;
64+
6065
private Gson gson;
6166

6267
private Multimap<String, DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>>
@@ -66,11 +71,16 @@ public class NotificationControllerV2Test {
6671
public void setUp() throws Exception {
6772
controller = new NotificationControllerV2();
6873
gson = new Gson();
74+
75+
when(bizConfig.releaseMessageNotificationBatch()).thenReturn(100);
76+
when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(5);
77+
6978
ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService);
7079
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
7180
ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil);
7281
ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil);
7382
ReflectionTestUtils.setField(controller, "gson", gson);
83+
ReflectionTestUtils.setField(controller, "bizConfig", bizConfig);
7484

7585
someAppId = "someAppId";
7686
someCluster = "someCluster";
@@ -283,6 +293,48 @@ public void testPollNotificationWithMultipleNamespacesAndHandleMessage() throws
283293
assertEquals(someId, notification.getNotificationId());
284294
}
285295

296+
@Test
297+
public void testPollNotificationWithHandleMessageInBatch() throws Exception {
298+
String someWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
299+
.join(someAppId, someCluster, defaultNamespace);
300+
int someBatch = 1;
301+
int someBatchInterval = 10;
302+
303+
Multimap<String, String> watchKeysMap =
304+
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey));
305+
306+
String notificationAsString =
307+
transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId);
308+
309+
when(watchKeysUtil
310+
.assembleAllWatchKeys(someAppId, someCluster, Sets.newHashSet(defaultNamespace),
311+
someDataCenter)).thenReturn(watchKeysMap);
312+
313+
when(bizConfig.releaseMessageNotificationBatch()).thenReturn(someBatch);
314+
when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(someBatchInterval);
315+
316+
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
317+
deferredResult = controller
318+
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
319+
someClientIp);
320+
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
321+
anotherDeferredResult = controller
322+
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
323+
someClientIp);
324+
325+
long someId = 1;
326+
ReleaseMessage someReleaseMessage = new ReleaseMessage(someWatchKey);
327+
someReleaseMessage.setId(someId);
328+
329+
controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);
330+
331+
assertTrue(!anotherDeferredResult.hasResult());
332+
333+
TimeUnit.MILLISECONDS.sleep(someBatchInterval * 3);
334+
335+
assertTrue(anotherDeferredResult.hasResult());
336+
}
337+
286338
private String transformApolloConfigNotificationsToString(
287339
String namespace, long notificationId) {
288340
List<ApolloConfigNotification> notifications =

0 commit comments

Comments
 (0)