Skip to content

Commit e8f0edb

Browse files
author
Vadym Kochetov
committed
ESDK-3918 Altered examples to be consistent.
1 parent ffda51c commit e8f0edb

File tree

8 files changed

+4
-379
lines changed

8 files changed

+4
-379
lines changed

Java/Eta/Applications/PerfTools/src/main/java/com/thomsonreuters/upa/perftools/common/ChannelHandler.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import java.util.concurrent.locks.Lock;
1212
import java.util.concurrent.locks.ReentrantLock;
1313

14-
import com.thomsonreuters.upa.shared.network.ChannelHelper;
1514
import com.thomsonreuters.upa.transport.Channel;
1615
import com.thomsonreuters.upa.transport.ChannelState;
1716
import com.thomsonreuters.upa.transport.Error;
@@ -397,8 +396,6 @@ private void initializeChannel(ClientChannelInfo clientChannelInfo, Error error)
397396
{
398397
case TransportReturnCodes.CHAN_INIT_IN_PROGRESS:
399398
if (_inProgInfo.flags() == InProgFlags.SCKT_CHNL_CHANGE) {
400-
clientChannelInfo.socketFdValue =
401-
ChannelHelper.defineFdValueOfSelectableChannel(channel.selectableChannel());
402399
requestFlush(clientChannelInfo);
403400
}
404401
break;
@@ -534,7 +531,6 @@ private void processActiveChannel(ClientChannelInfo clientChannelInfo)
534531
_initializingChannelList.remove(clientChannelInfo);
535532
clientChannelInfo.parentQueue = _activeChannelList;
536533
clientChannelInfo.parentQueue.add(clientChannelInfo);
537-
clientChannelInfo.socketFdValue = ChannelHelper.defineFdValueOfSelectableChannel(clientChannelInfo.channel.selectableChannel());
538534
}
539535

540536
/**

Java/Eta/Applications/PerfTools/src/main/java/com/thomsonreuters/upa/perftools/common/ClientChannelInfo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,4 @@ public class ClientChannelInfo
2222
public long nextSendPingTime; // Time before which a ping should be sent for this channel.
2323
public Queue<ClientChannelInfo> parentQueue; // Reference back to the list this channel is an element of.
2424
public ReactorChannel reactorChannel; // Use the VA Reactor instead of the UPA Channel for sending and receiving
25-
public int socketFdValue; // Defined Socket FD Value.
2625
}

Java/Eta/Applications/PerfTools/src/main/java/com/thomsonreuters/upa/perftools/common/LoginHandler.java

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@
88
import com.thomsonreuters.upa.transport.TransportBuffer;
99
import com.thomsonreuters.upa.valueadd.domainrep.rdm.login.*;
1010

11-
import java.util.Objects;
12-
13-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
14-
1511
/**
1612
* This is the Login handler for the UPA Consumer and NIProvider application. It
1713
* provides methods for encoding and sending of login request, as well as
@@ -25,21 +21,18 @@ public class LoginHandler
2521

2622
public static final int MAX_MSG_SIZE = 1024;
2723
public static int TRANSPORT_BUFFER_SIZE_REQUEST = MAX_MSG_SIZE;
28-
public static int TRANSPORT_BUFFER_SIZE_RTT = MAX_MSG_SIZE;
2924
public static int TRANSPORT_BUFFER_SIZE_CLOSE = MAX_MSG_SIZE;
3025

3126
private ConsumerLoginState loginState = ConsumerLoginState.PENDING_LOGIN;
3227

3328
// For requests
3429
private String _userName;
3530
private String _applicationName;
36-
private boolean enableRtt;
3731

3832
private int _role = Login.RoleTypes.CONS;
3933

4034
private LoginRequest _loginRequest = (LoginRequest)LoginMsgFactory.createMsg();
4135
private LoginClose _loginClose = (LoginClose)LoginMsgFactory.createMsg();
42-
private LoginRTT loginRTT = (LoginRTT) LoginMsgFactory.createMsg();
4336

4437
private LoginRefresh _loginRefresh = (LoginRefresh)LoginMsgFactory.createMsg();
4538
private LoginStatus _loginStatus = (LoginStatus)LoginMsgFactory.createMsg();
@@ -54,7 +47,6 @@ public LoginHandler()
5447
_loginRequest.rdmMsgType(LoginMsgType.REQUEST);
5548
_loginStatus.rdmMsgType(LoginMsgType.STATUS);
5649
_loginRefresh.rdmMsgType(LoginMsgType.REFRESH);
57-
loginRTT.rdmMsgType(LoginMsgType.RTT);
5850
}
5951

6052
/**
@@ -108,14 +100,6 @@ public void role(int role)
108100
_role = role;
109101
}
110102

111-
public void enableRtt(boolean enableRtt) {
112-
this.enableRtt = enableRtt;
113-
}
114-
115-
public boolean enableRtt() {
116-
return this.enableRtt;
117-
}
118-
119103
/**
120104
* Sends a login request to a channel. This consists of getting a message
121105
* buffer, setting the login request information, encoding the login
@@ -150,10 +134,6 @@ public TransportBuffer getRequestMsg(Channel channel, Error error, EncodeIterato
150134
_loginRequest.attrib().applicationName().data(_applicationName);
151135
}
152136

153-
if (enableRtt) {
154-
_loginRequest.attrib().applyHasSupportRoundTripLatencyMonitoring();
155-
}
156-
157137
_loginRequest.applyHasRole();
158138
_loginRequest.role(_role);
159139

@@ -211,32 +191,6 @@ public TransportBuffer getCloseMsg(Channel channel, Error error, EncodeIterator
211191
return msgBuf;
212192
}
213193

214-
/**
215-
* Get transport buffer for earlier prepared RTT message (obtained from Provider)
216-
* @param channel - channel instance
217-
* @param error - error buffer for showing warnings and errors to the end user.
218-
* @param encodeIterator - instance for encoding login RTT.
219-
* @return
220-
*/
221-
public TransportBuffer getRttMsg(Channel channel, Error error, EncodeIterator encodeIterator) {
222-
final TransportBuffer transportBuffer = channel.getBuffer(TRANSPORT_BUFFER_SIZE_RTT, false, error);
223-
224-
if (Objects.isNull(transportBuffer)) {
225-
return null;
226-
}
227-
228-
encodeIterator.clear();
229-
encodeIterator.setBufferAndRWFVersion(transportBuffer, channel.majorVersion(), channel.minorVersion());
230-
231-
int ret = loginRTT.encode(encodeIterator);
232-
if (ret != CodecReturnCodes.SUCCESS) {
233-
error.text("Encoding of login RTT failed: <" + CodecReturnCodes.toString(ret) + ">");
234-
return null;
235-
}
236-
237-
return transportBuffer;
238-
}
239-
240194
/**
241195
* Processes login response. This consists of looking at the msg class and
242196
* decoding message into corresponding RDM login message. For every
@@ -261,8 +215,6 @@ public int processResponse(Msg msg, DecodeIterator dIter, Error error)
261215
case MsgClasses.UPDATE:
262216
System.out.println("Received Login Update");
263217
return CodecReturnCodes.SUCCESS;
264-
case MsgClasses.GENERIC:
265-
return handleLoginRtt(msg, dIter, error);
266218
case MsgClasses.CLOSE:
267219
System.out.println("Received Login Close");
268220
loginState = ConsumerLoginState.CLOSED;
@@ -344,29 +296,4 @@ else if (state.streamState() == StreamStates.CLOSED)
344296
}
345297
return CodecReturnCodes.SUCCESS;
346298
}
347-
348-
private int handleLoginRtt(Msg msg, DecodeIterator dIter, Error error) {
349-
if (enableRtt) {
350-
loginRTT.clear();
351-
int ret = loginRTT.decode(dIter, msg);
352-
if (ret != CodecReturnCodes.SUCCESS) {
353-
error.text("Decoding of login RTT failed: <" + CodecReturnCodes.toString(ret) + ">");
354-
return ret;
355-
}
356-
357-
}
358-
return CodecReturnCodes.SUCCESS;
359-
}
360-
361-
public void logRttResponse(int socketId) {
362-
System.out.printf("\nReceived login RTT message from Provider %d.\n", socketId);
363-
System.out.printf("\tTicks: %du\n", NANOSECONDS.toMicros(loginRTT.ticks()));
364-
if (loginRTT.checkHasRTLatency()) {
365-
System.out.printf("\tLast Latency: %du\n", NANOSECONDS.toMicros(loginRTT.rtLatency()));
366-
}
367-
if (loginRTT.checkHasTCPRetrans()) {
368-
System.out.printf("\tProvider side TCP Retransmissions: %du\n", loginRTT.tcpRetrans());
369-
}
370-
System.out.println();
371-
}
372299
}

Java/Eta/Applications/PerfTools/src/main/java/com/thomsonreuters/upa/perftools/common/ProviderPerfConfig.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,6 @@ public class ProviderPerfConfig
5353
private static boolean _directWrite; // direct write enabled
5454
private static boolean _useReactor; // Use the VA Reactor instead of the UPA Channel for sending and receiving.
5555

56-
private static boolean enableRtt; // enable RTT feature
57-
5856
static
5957
{
6058
CommandLine.programName("upajProvPerf");
@@ -89,7 +87,6 @@ public class ProviderPerfConfig
8987
CommandLine.addOption("runTime", 360, "Runtime of the application, in seconds");
9088
CommandLine.addOption("threads", 1, "Number of provider threads to create");
9189
CommandLine.addOption("reactor", false, "Use the VA Reactor instead of the UPA Channel for sending and receiving");
92-
CommandLine.addOption("rtt", false, "application (provider) supports calculation of Round Trip Latency");
9390
}
9491

9592
private ProviderPerfConfig()
@@ -135,7 +132,6 @@ public static void init(String[] args)
135132
String latencyGenMsgRate = CommandLine.value("genericMsgLatencyRate");
136133
_directWrite = CommandLine.booleanValue("directWrite");
137134
_useReactor = CommandLine.booleanValue("reactor");
138-
enableRtt = CommandLine.booleanValue("rtt");
139135
try
140136
{
141137
_runTime = CommandLine.intValue("runTime");
@@ -283,8 +279,7 @@ private static void createConfigString(BindOptions bindOptions)
283279
" Service Name: " + _serviceName + "\n" +
284280
" Service ID: " + _serviceId + "\n" +
285281
" Open Limit: " + _openLimit + "\n" +
286-
" Use Reactor: " + (_useReactor ? "Yes" : "No") + "\n" +
287-
" RTT enabled: " + (enableRtt ? "Yes" : "No") + "\n";
282+
" Use Reactor: " + (_useReactor ? "Yes" : "No") + "\n";
288283
}
289284

290285
/**
@@ -874,12 +869,4 @@ public static boolean useReactor()
874869
{
875870
return _useReactor;
876871
}
877-
878-
/**
879-
* Flag which determines that application (provider) supports calculation of Round Trip Latency (or not).
880-
* @return true when provider has support of RTT feature. False when this feature is disabled.
881-
*/
882-
public static boolean enableRtt() {
883-
return enableRtt;
884-
}
885872
}

Java/Eta/Applications/PerfTools/src/main/java/com/thomsonreuters/upa/perftools/upajconsperf/ConsPerfConfig.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ public class ConsPerfConfig
5959
private boolean _useWatchlist; /* Use the VA Reactor watchlist instead of the UPA Channel for sending and receiving. */
6060
private boolean _busyRead; /* If set, the application will continually read rather than using notification. */
6161

62-
private boolean enableRtt;
63-
6462
{
6563
CommandLine.programName("upajConsPerf");
6664
CommandLine.addOption("steadyStateTime", 300, "Time consumer will run the steady-state portion of the test. Also used as a timeout during the startup-state portion");
@@ -97,7 +95,6 @@ public class ConsPerfConfig
9795
CommandLine.addOption("reactor", false, "Use the VA Reactor instead of the UPA Channel for sending and receiving");
9896
CommandLine.addOption("watchlist", false, "Use the VA Reactor watchlist instead of the UPA Channel for sending and receiving");
9997
CommandLine.addOption("busyRead", false, "If set, the application will continually read rather than using notification.");
100-
CommandLine.addOption("rtt", false, "Enables rtt support by a consumer. If provider make distribution of RTT messages, consumer will return back them. In another case, consumer will ignore them.");
10198
}
10299

103100
/**
@@ -140,7 +137,6 @@ public void init (String[] args, int maxThreads)
140137
_useReactor = CommandLine.booleanValue("reactor");
141138
_useWatchlist = CommandLine.booleanValue("watchlist");
142139
_busyRead = CommandLine.booleanValue("busyRead");
143-
enableRtt = CommandLine.booleanValue("rtt");
144140
try
145141
{
146142
_steadyStateTime = CommandLine.intValue("steadyStateTime");
@@ -333,8 +329,7 @@ else if (_useReactor)
333329
" Tick Rate: " + _ticksPerSec + "\n" +
334330
" Prime JVM: " + (_primeJVM ? "Yes" : "No") + "\n" +
335331
" Reactor/Watchlist Usage: " + reactorWatchlistUsageString + "\n" +
336-
" Busy Read: " + (_busyRead ? "Yes" : "No") + "\n" +
337-
" Enable RTT: " + (enableRtt ? "Yes" : "No") + "\n";
332+
" Busy Read: " + (_busyRead ? "Yes" : "No") + "\n";
338333
}
339334

340335
/* APPLICATION configuration */
@@ -720,14 +715,4 @@ public boolean busyRead()
720715
{
721716
return _busyRead;
722717
}
723-
724-
/**
725-
* Flag which determines support of RTT feature by a consumer.
726-
*
727-
* @return Returns true when consumer will return back RTT messages from provider.
728-
* Return false when consumer will ignore them.
729-
*/
730-
public boolean enableRtt() {
731-
return enableRtt;
732-
}
733718
}

Java/Eta/Applications/PerfTools/src/main/java/com/thomsonreuters/upa/perftools/upajconsperf/ConsumerThread.java

Lines changed: 2 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import com.thomsonreuters.upa.rdm.DomainTypes;
5252
import com.thomsonreuters.upa.rdm.InstrumentNameTypes;
5353
import com.thomsonreuters.upa.rdm.Login;
54-
import com.thomsonreuters.upa.shared.network.ChannelHelper;
5554
import com.thomsonreuters.upa.transport.Channel;
5655
import com.thomsonreuters.upa.transport.ChannelInfo;
5756
import com.thomsonreuters.upa.transport.ConnectOptions;
@@ -77,7 +76,6 @@
7776
import com.thomsonreuters.upa.valueadd.domainrep.rdm.directory.DirectoryUpdate;
7877
import com.thomsonreuters.upa.valueadd.domainrep.rdm.directory.Service;
7978
import com.thomsonreuters.upa.valueadd.domainrep.rdm.login.LoginMsg;
80-
import com.thomsonreuters.upa.valueadd.domainrep.rdm.login.LoginRTT;
8179
import com.thomsonreuters.upa.valueadd.domainrep.rdm.login.LoginRefresh;
8280
import com.thomsonreuters.upa.valueadd.reactor.ConsumerCallback;
8381
import com.thomsonreuters.upa.valueadd.reactor.ConsumerRole;
@@ -101,8 +99,6 @@
10199
import com.thomsonreuters.upa.valueadd.reactor.ReactorReturnCodes;
102100
import com.thomsonreuters.upa.valueadd.reactor.ReactorSubmitOptions;
103101

104-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
105-
106102
/** Provides the logic that consumer connections use in upajConsPerf for
107103
* connecting to a provider, requesting items, and processing the received
108104
* refreshes and updates.
@@ -114,8 +110,6 @@ public class ConsumerThread implements Runnable, ResponseCallback, ConsumerCallb
114110
private static final int LATENCY_RANDOM_ARRAY_SET_COUNT = 20;
115111

116112
public static final int MAX_MSG_SIZE = 1024;
117-
public static int TRANSPORT_BUFFER_SIZE_REQUEST = MAX_MSG_SIZE;
118-
public static int TRANSPORT_BUFFER_SIZE_CLOSE = MAX_MSG_SIZE;
119113

120114
private ConsumerThreadInfo _consThreadInfo; /* thread information */
121115
private ConsPerfConfig _consPerfConfig; /* configuration information */
@@ -184,8 +178,6 @@ public class ConsumerThread implements Runnable, ResponseCallback, ConsumerCallb
184178
private Buffer _fieldDictionaryName = CodecFactory.createBuffer(); // Use the VA Reactor instead of the UPA Channel for sending and receiving
185179
private Buffer _enumTypeDictionaryName = CodecFactory.createBuffer(); // Use the VA Reactor instead of the UPA Channel for sending and receiving
186180

187-
private Map<Channel, Integer> fdSocketValueMap = new HashMap<>();
188-
189181
{
190182
_eIter = CodecFactory.createEncodeIterator();
191183
_dIter = CodecFactory.createDecodeIterator();
@@ -431,9 +423,8 @@ private void connect()
431423
System.exit(-1);
432424
}
433425
}
426+
434427
if (handshake == TransportReturnCodes.SUCCESS) {
435-
final int fdSocketValue = ChannelHelper.defineFdValueOfSelectableChannel(_channel.selectableChannel());
436-
fdSocketValueMap.put(_channel, fdSocketValue);
437428
break;
438429
}
439430

@@ -511,9 +502,6 @@ private void connect()
511502
{
512503
_role.rdmLoginRequest().userName().data(_consPerfConfig.username());
513504
}
514-
if (_consPerfConfig.enableRtt()) {
515-
_role.rdmLoginRequest().attrib().applyHasSupportRoundTripLatencyMonitoring();
516-
}
517505
_role.initDefaultRDMDirectoryRequest();
518506
// enable watchlist if configured
519507
if (_consPerfConfig.useWatchlist())
@@ -570,8 +558,7 @@ public void run()
570558
_loginHandler.applicationName("upajConsPerf");
571559
_loginHandler.userName(_consPerfConfig.username());
572560
_loginHandler.role(Login.RoleTypes.CONS);
573-
_loginHandler.enableRtt(_consPerfConfig.enableRtt());
574-
561+
575562
// Send login request message
576563
TransportBuffer msg = _loginHandler.getRequestMsg(_channel, _error, _eIter);
577564
if (msg != null)
@@ -997,15 +984,6 @@ private void processLoginResp()
997984
}
998985
}
999986

1000-
if (_consPerfConfig.enableRtt()
1001-
&& Objects.equals(MsgClasses.GENERIC, _responseMsg.msgClass())
1002-
&& Objects.equals(DataTypes.ELEMENT_LIST, _responseMsg.containerType())) {
1003-
final TransportBuffer loginRttMsgBuffer = _loginHandler.getRttMsg(_channel, _error, _eIter);
1004-
write(loginRttMsgBuffer);
1005-
_loginHandler.logRttResponse(fdSocketValueMap.get(_channel));
1006-
return;
1007-
}
1008-
1009987
//Handle login states
1010988
ConsumerLoginState loginState = _loginHandler.loginState();
1011989
if (loginState == ConsumerLoginState.OK_SOLICITED)
@@ -1911,10 +1889,6 @@ public int reactorChannelEventCallback(ReactorChannelEvent event)
19111889
return ReactorCallbackReturnCodes.FAILURE;
19121890
}
19131891

1914-
//define socket id
1915-
final int fdSocketValue = ChannelHelper.defineFdValueOfSelectableChannel(_reactorChannel.channel().selectableChannel());
1916-
fdSocketValueMap.put(_reactorChannel.channel(), fdSocketValue);
1917-
19181892
System.out.printf("Channel active. " + _reactorChannnelInfo.channelInfo().toString() + "\n");
19191893

19201894
break;
@@ -1942,10 +1916,6 @@ public int reactorChannelEventCallback(ReactorChannelEvent event)
19421916
System.out.println("selector register failed: " + e.getLocalizedMessage());
19431917
return ReactorCallbackReturnCodes.SUCCESS;
19441918
}
1945-
1946-
//define socket id
1947-
final int fdSocketValue = ChannelHelper.defineFdValueOfSelectableChannel(_reactorChannel.channel().selectableChannel());
1948-
fdSocketValueMap.put(_reactorChannel.channel(), fdSocketValue);
19491919
break;
19501920
}
19511921
case ReactorChannelEventTypes.CHANNEL_READY:
@@ -2088,19 +2058,6 @@ public int rdmLoginMsgCallback(RDMLoginMsgEvent event)
20882058
closeChannelAndShutDown("Provider for this connection does not support posting.");
20892059
}
20902060
break;
2091-
case RTT:
2092-
LoginRTT loginRTT = (LoginRTT) event.rdmLoginMsg();
2093-
System.out.printf("\nReceived login RTT message from Provider %d.\n",
2094-
fdSocketValueMap.get(event.reactorChannel().channel()));
2095-
System.out.printf("\tTicks: %du\n", NANOSECONDS.toMicros(loginRTT.ticks()));
2096-
if (loginRTT.checkHasRTLatency()) {
2097-
System.out.printf("\tLast Latency: %du\n", NANOSECONDS.toMicros(loginRTT.rtLatency()));
2098-
}
2099-
if (loginRTT.checkHasTCPRetrans()) {
2100-
System.out.printf("\tProvider side TCP Retransmissions: %du\n", loginRTT.tcpRetrans());
2101-
}
2102-
System.out.println("RTT Response sent to provider by reactor.\n");
2103-
break;
21042061
default:
21052062
break;
21062063
}

0 commit comments

Comments
 (0)