Skip to content

Commit 14de7b5

Browse files
committed
- Changed TP.ProbeHandler --> DiagnosticsHandler.ProbeHandler
- LargeMergeTest: using only *one* DiagnosticsHandler now
1 parent 8d395b9 commit 14de7b5

File tree

7 files changed

+52
-25
lines changed

7 files changed

+52
-25
lines changed

src/org/jgroups/JChannel.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@
1010
import org.jgroups.logging.Log;
1111
import org.jgroups.logging.LogFactory;
1212
import org.jgroups.protocols.TP;
13-
import org.jgroups.stack.AddressGenerator;
14-
import org.jgroups.stack.Protocol;
15-
import org.jgroups.stack.ProtocolStack;
16-
import org.jgroups.stack.StateTransferInfo;
13+
import org.jgroups.stack.*;
1714
import org.jgroups.util.*;
1815
import org.jgroups.util.UUID;
1916
import org.w3c.dom.Element;
@@ -83,7 +80,7 @@ public class JChannel extends Channel {
8380

8481
protected long sent_msgs=0, received_msgs=0, sent_bytes=0, received_bytes=0;
8582

86-
private final TP.ProbeHandler probe_handler=new MyProbeHandler();
83+
private final DiagnosticsHandler.ProbeHandler probe_handler=new MyProbeHandler();
8784

8885

8986

@@ -1015,7 +1012,7 @@ private TimeScheduler getTimer() {
10151012

10161013
/* ------------------------------- End of Private Methods ---------------------------------- */
10171014

1018-
class MyProbeHandler implements TP.ProbeHandler {
1015+
class MyProbeHandler implements DiagnosticsHandler.ProbeHandler {
10191016

10201017
public Map<String, String> handleProbe(String... keys) {
10211018
Map<String, String> map=new HashMap<String, String>(2);

src/org/jgroups/blocks/RequestCorrelator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import org.jgroups.logging.Log;
77
import org.jgroups.logging.LogFactory;
88
import org.jgroups.protocols.TP;
9+
import org.jgroups.stack.DiagnosticsHandler;
910
import org.jgroups.stack.Protocol;
1011
import org.jgroups.util.Buffer;
1112
import org.jgroups.util.Util;
@@ -628,10 +629,10 @@ public String toString() {
628629

629630

630631

631-
private static class MyProbeHandler implements TP.ProbeHandler {
632+
private static class MyProbeHandler implements DiagnosticsHandler.ProbeHandler {
632633
private final ConcurrentMap<Long,RspCollector> requests;
633634

634-
public MyProbeHandler(ConcurrentMap<Long, RspCollector> requests) {
635+
private MyProbeHandler(ConcurrentMap<Long,RspCollector> requests) {
635636
this.requests=requests;
636637
}
637638

src/org/jgroups/protocols/pbcast/GMS.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.jgroups.logging.Log;
88
import org.jgroups.protocols.TP;
99
import org.jgroups.protocols.pbcast.GmsImpl.Request;
10+
import org.jgroups.stack.DiagnosticsHandler;
1011
import org.jgroups.stack.Protocol;
1112
import org.jgroups.util.*;
1213
import org.jgroups.util.Queue;
@@ -27,7 +28,7 @@
2728
* @author Bela Ban
2829
*/
2930
@MBean(description="Group membership protocol")
30-
public class GMS extends Protocol implements TP.ProbeHandler {
31+
public class GMS extends Protocol implements DiagnosticsHandler.ProbeHandler {
3132
private static final String CLIENT="Client";
3233
private static final String COORD="Coordinator";
3334
private static final String PART="Participant";
@@ -179,7 +180,7 @@ public boolean getMergeKillerRunning() {
179180
}
180181

181182
@ManagedAttribute(description="Stringified version of merge_id")
182-
public String getMergeIdAsString() {return merger.getMergeIdAsString();}
183+
public String getMergeId() {return merger.getMergeIdAsString();}
183184

184185
@ManagedOperation
185186
public String printPreviousMembers() {
@@ -275,7 +276,7 @@ public boolean isCoordinator() {
275276
return coord != null && local_addr != null && local_addr.equals(coord);
276277
}
277278

278-
public MergeId getMergeId() {
279+
public MergeId _getMergeId() {
279280
return impl instanceof CoordGmsImpl? ((CoordGmsImpl)impl).getMergeId() : null;
280281
}
281282

src/org/jgroups/protocols/pbcast/NAKACK.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* @author Bela Ban
3434
*/
3535
@MBean(description="Reliable transmission multipoint FIFO protocol")
36-
public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, TP.ProbeHandler {
36+
public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand, DiagnosticsHandler.ProbeHandler {
3737
private static final int NUM_REBROADCAST_MSGS=3;
3838

3939
/* ----------------------------------------------------- Properties --------------------- ------------------------------------ */
@@ -187,11 +187,6 @@ public class NAKACK extends Protocol implements Retransmitter.RetransmitCommand,
187187
protected final BoundedList<String> digest_history=new BoundedList<String>(10);
188188

189189

190-
191-
public NAKACK() {
192-
}
193-
194-
195190
public long getXmitRequestsReceived() {return xmit_reqs_received.get();}
196191
public long getXmitRequestsSent() {return xmit_reqs_sent.get();}
197192
public long getXmitResponsesReceived() {return xmit_rsps_received.get();}

src/org/jgroups/stack/ProtocolStack.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public class ProtocolStack extends Protocol {
4646
private volatile boolean stopped=true;
4747

4848

49-
private final TP.ProbeHandler props_handler=new TP.ProbeHandler() {
49+
private final DiagnosticsHandler.ProbeHandler props_handler=new DiagnosticsHandler.ProbeHandler() {
5050

5151
public Map<String, String> handleProbe(String... keys) {
5252
for(String key: keys) {

tests/junit/org/jgroups/protocols/GMS_MergeTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ static void _testMergeRequestTimeout(String props, String cluster_name) throws E
100100
merge_request.putHeader(GMS_ID, hdr);
101101
GMS gms=(GMS)c1.getProtocolStack().findProtocol(GMS.class);
102102
gms.setMergeTimeout(2000);
103-
MergeId merge_id=gms.getMergeId();
103+
MergeId merge_id=gms._getMergeId();
104104
assert merge_id == null;
105105
System.out.println("starting merge");
106106
gms.up(new Event(Event.MSG, merge_request));
@@ -109,13 +109,13 @@ static void _testMergeRequestTimeout(String props, String cluster_name) throws E
109109
System.out.println("sleeping for " + timeout + " ms, then fetching merge_id: should be null (cancelled by the MergeCanceller)");
110110
long target_time=System.currentTimeMillis() + timeout;
111111
while(System.currentTimeMillis() < target_time) {
112-
merge_id=gms.getMergeId();
112+
merge_id=gms._getMergeId();
113113
if(merge_id == null)
114114
break;
115115
Util.sleep(500);
116116
}
117117

118-
merge_id=gms.getMergeId();
118+
merge_id=gms._getMergeId();
119119
System.out.println("merge_id = " + merge_id);
120120
assert merge_id == null : "MergeCanceller didn't kick in";
121121
}

tests/junit/org/jgroups/tests/LargeMergeTest.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,22 @@
22

33
import org.jgroups.Global;
44
import org.jgroups.JChannel;
5+
import org.jgroups.logging.Log;
6+
import org.jgroups.logging.LogFactory;
57
import org.jgroups.protocols.*;
68
import org.jgroups.protocols.pbcast.GMS;
79
import org.jgroups.protocols.pbcast.NAKACK;
810
import org.jgroups.protocols.pbcast.STABLE;
11+
import org.jgroups.stack.DiagnosticsHandler;
912
import org.jgroups.stack.ProtocolStack;
1013
import org.jgroups.util.*;
14+
import org.jgroups.util.ThreadFactory;
1115
import org.testng.annotations.AfterMethod;
1216
import org.testng.annotations.BeforeMethod;
1317
import org.testng.annotations.Test;
1418

19+
import java.io.IOException;
20+
import java.net.InetAddress;
1521
import java.util.HashMap;
1622
import java.util.Map;
1723
import java.util.concurrent.*;
@@ -23,14 +29,21 @@
2329
*/
2430
@Test(groups=Global.FUNCTIONAL,sequential=true)
2531
public class LargeMergeTest {
26-
static final int NUM=50; // number of members
32+
static final int NUM=80; // number of members
2733

2834
protected final JChannel[] channels=new JChannel[NUM];
2935

36+
protected MyDiagnosticsHandler handler;
37+
38+
3039

3140
@BeforeMethod
3241
void setUp() throws Exception {
33-
42+
handler=new MyDiagnosticsHandler(InetAddress.getByName("224.0.75.75"), 7500,
43+
LogFactory.getLog("DiagnosticsHandler"),
44+
new DefaultSocketFactory(),
45+
new DefaultThreadFactory(Util.getGlobalThreadGroup(), "", false));
46+
3447
ThreadGroup test_group=new ThreadGroup("LargeMergeTest");
3548
TimeScheduler timer=new TimeScheduler2(new DefaultThreadFactory(test_group, "Timer", true, true),
3649
5,10,
@@ -44,14 +57,14 @@ void setUp() throws Exception {
4457

4558

4659

60+
4761
System.out.print("Connecting channels: ");
4862
for(int i=0; i < NUM; i++) {
4963
SHARED_LOOPBACK shared_loopback=(SHARED_LOOPBACK)new SHARED_LOOPBACK().setValue("enable_bundling", false);
50-
// shared_loopback.setValue("enable_diagnostics",false);
5164
shared_loopback.setTimer(timer);
5265
shared_loopback.setOOBThreadPool(oob_thread_pool);
5366
shared_loopback.setDefaultThreadPool(thread_pool);
54-
67+
shared_loopback.setDiagnosticsHandler(handler);
5568

5669
channels[i]=Util.createChannel(shared_loopback,
5770
new DISCARD().setValue("discard_all",true),
@@ -85,11 +98,13 @@ void tearDown() throws Exception {
8598
stack.stopStack(cluster_name);
8699
stack.destroy();
87100
}
101+
handler.destroy();
88102
}
89103

90104

91105

92106
public void testClusterFormationAfterMerge() {
107+
// Util.keyPress("<enter>");
93108
System.out.println("\nEnabling message traffic between members to start the merge");
94109
for(JChannel ch: channels) {
95110
Discovery ping=(Discovery)ch.getProtocolStack().findProtocol(PING.class);
@@ -142,4 +157,22 @@ public void testClusterFormationAfterMerge() {
142157
}
143158

144159

160+
161+
protected class MyDiagnosticsHandler extends DiagnosticsHandler {
162+
163+
protected MyDiagnosticsHandler(InetAddress diagnostics_addr, int diagnostics_port, Log log, SocketFactory socket_factory, ThreadFactory thread_factory) {
164+
super(diagnostics_addr,diagnostics_port,log,socket_factory,thread_factory);
165+
}
166+
167+
public void start() throws IOException {
168+
super.start();
169+
}
170+
171+
public void stop() {
172+
}
173+
174+
public void destroy() {
175+
super.stop();
176+
}
177+
}
145178
}

0 commit comments

Comments
 (0)