1
1
package org .skywalking .apm .collector .cluster .zookeeper ;
2
2
3
+ import java .util .HashSet ;
3
4
import java .util .Iterator ;
4
5
import java .util .LinkedHashMap ;
5
6
import java .util .List ;
6
7
import java .util .Map ;
7
8
import java .util .Set ;
8
- import java .util .HashSet ;
9
9
import org .apache .zookeeper .CreateMode ;
10
10
import org .apache .zookeeper .WatchedEvent ;
11
11
import org .apache .zookeeper .Watcher ;
@@ -42,14 +42,14 @@ public ClusterZKDataMonitor() {
42
42
registrations = new LinkedHashMap <>();
43
43
}
44
44
45
- @ Override public void process (WatchedEvent event ) {
45
+ @ Override public synchronized void process (WatchedEvent event ) {
46
46
logger .info ("changed path {}, event type: {}" , event .getPath (), event .getType ().name ());
47
47
if (listeners .containsKey (event .getPath ())) {
48
48
List <String > paths ;
49
49
try {
50
50
paths = client .getChildren (event .getPath (), true );
51
51
ClusterDataListener listener = listeners .get (event .getPath ());
52
- Set <String > remoteNodes = new HashSet <String >();
52
+ Set <String > remoteNodes = new HashSet <>();
53
53
Set <String > notifiedNodes = listener .getAddresses ();
54
54
if (CollectionUtils .isNotEmpty (paths )) {
55
55
for (String serverPath : paths ) {
@@ -65,9 +65,12 @@ public ClusterZKDataMonitor() {
65
65
}
66
66
}
67
67
}
68
- for (String address : notifiedNodes ) {
68
+
69
+ String [] notifiedNodeArray = notifiedNodes .toArray (new String [notifiedNodes .size ()]);
70
+ for (int i = notifiedNodeArray .length - 1 ; i >= 0 ; i --) {
71
+ String address = notifiedNodeArray [i ];
69
72
if (remoteNodes .isEmpty () || !remoteNodes .contains (address )) {
70
- logger .info ("path children has been changed , path and data: {}" , event .getPath () + "/" + address );
73
+ logger .info ("path children has been remove , path and data: {}" , event .getPath () + "/" + address );
71
74
listener .removeAddress (address );
72
75
listener .serverQuitNotify (address );
73
76
}
0 commit comments