Skip to content

Commit 995afb4

Browse files
committed
ESDK-1724
modify removeUserRequestFromClosedStream to handle pendingRequests properly
1 parent d6b41eb commit 995afb4

File tree

3 files changed

+158
-42
lines changed

3 files changed

+158
-42
lines changed

Java/Eta/ValueAdd/src/main/java/com/thomsonreuters/upa/valueadd/reactor/WlItemHandler.java

+35-42
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.nio.ByteBuffer;
44
import java.util.ArrayList;
55
import java.util.HashMap;
6+
import java.util.Iterator;
67
import java.util.LinkedList;
78
import java.util.Map;
89

@@ -564,7 +565,12 @@ else if (requestMsg.checkStreaming())
564565

565566
if (!requestMsg.checkPrivateStream() && _watchlist.loginHandler().supportSingleOpen())
566567
{
567-
addToPendingRequestTable(wlRequest, submitOptions);
568+
// wlRequest._requestMsg may not be set yet. However, if submitOptions.serviceName is not set,
569+
// addToPendingRequestTable uses the serviceId from wlRequst._requestMsg.msgKey so just
570+
// set it here
571+
if (submitOptions.serviceName() == null)
572+
wlRequest.requestMsg().msgKey().serviceId(requestMsg.msgKey().serviceId());
573+
addToPendingRequestTable(wlRequest, submitOptions);
568574
}
569575
}
570576
return ret;
@@ -1535,56 +1541,43 @@ private int removeUserRequestFromClosedStream(WlRequest wlRequest)
15351541
}
15361542
}
15371543

1538-
// remove from _pendingRequestByIdTable
1539-
for (int intKey : _pendingRequestByIdTable.keySet())
1544+
Iterator<Map.Entry<Integer, LinkedList<WlRequest>>> I = _pendingRequestByIdTable.entrySet().iterator();
1545+
while (I.hasNext())
15401546
{
1541-
LinkedList<WlRequest> wlRequestList = _pendingRequestByIdTable.remove(intKey);
1542-
1543-
for (int i = 0; i < wlRequestList.size(); i++)
1544-
{
1545-
WlRequest wlRequestInList = wlRequestList.get(i);
1546-
1547-
if (wlRequestInList.requestMsg().streamId() == wlRequest.requestMsg().streamId())
1548-
{
1549-
wlRequestList.remove(i);
1550-
_pendingRequestByIdTable.put(intKey, wlRequestList);
1551-
break;
1552-
}
1553-
else
1554-
{
1555-
_pendingRequestListPool.add(wlRequestList);
1556-
}
1557-
}
1547+
Map.Entry<Integer, LinkedList<WlRequest>> entry = I.next();
1548+
LinkedList<WlRequest> pendingRequests = entry.getValue();
1549+
Iterator<WlRequest> K = pendingRequests.iterator();
1550+
while (K.hasNext())
1551+
if (K.next().requestMsg().streamId() == wlRequest.requestMsg().streamId())
1552+
K.remove();
1553+
if (pendingRequests.isEmpty())
1554+
{
1555+
I.remove();
1556+
_pendingRequestListPool.add(pendingRequests);
1557+
}
15581558
}
15591559

15601560
// remove from _pendingRequestByNameTable
1561-
for (String stringKey : _pendingRequestByNameTable.keySet())
1562-
{
1563-
LinkedList<WlRequest> wlRequestList = _pendingRequestByNameTable.remove(stringKey);
1564-
1565-
for (int i = 0; i < wlRequestList.size(); i++)
1566-
{
1567-
WlRequest wlRequestInList = wlRequestList.get(i);
1568-
1569-
if (wlRequestInList.requestMsg().streamId() == wlRequest.requestMsg().streamId())
1570-
{
1571-
wlRequestList.remove(i);
1572-
if (wlRequestList.size() > 0)
1573-
{
1574-
_pendingRequestByNameTable.put(stringKey, wlRequestList);
1575-
}
1576-
break;
1577-
}
1578-
else
1579-
{
1580-
_pendingRequestListPool.add(wlRequestList);
1581-
}
1561+
Iterator<Map.Entry<String, LinkedList<WlRequest>>> J = _pendingRequestByNameTable.entrySet().iterator();
1562+
while (J.hasNext())
1563+
{
1564+
Map.Entry<String, LinkedList<WlRequest>> entry = J.next();
1565+
LinkedList<WlRequest> pendingRequests = entry.getValue();
1566+
Iterator<WlRequest> K = pendingRequests.iterator();
1567+
while(K.hasNext())
1568+
if (K.next().requestMsg().streamId() == wlRequest.requestMsg().streamId()) {
1569+
K.remove();
15821570
}
1571+
if (pendingRequests.isEmpty())
1572+
{
1573+
J.remove();
1574+
_pendingRequestListPool.add(pendingRequests);
1575+
}
15831576
}
15841577

15851578
closeWlRequest(wlRequest);
15861579
repoolWlRequest(wlRequest);
1587-
1580+
15881581
return ret;
15891582
}
15901583

Java/Eta/ValueAdd/src/test/java/com/thomsonreuters/upa/valueadd/reactor/ReactorWatchlistJUnitNew.java

+122
Original file line numberDiff line numberDiff line change
@@ -19533,4 +19533,126 @@ public void dualRequestWithDifferentServiceTest()
1953319533

1953419534
TestReactorComponent.closeSession(consumer, provider);
1953519535
}
19536+
19537+
@Test
19538+
public void closeWhileDisconnectedTest()
19539+
{
19540+
for (boolean useServiceNames : new boolean[] {true, false} )
19541+
{
19542+
ReactorSubmitOptions submitOptions = ReactorFactory.createReactorSubmitOptions();
19543+
Msg msg = CodecFactory.createMsg();
19544+
RequestMsg requestMsg = (RequestMsg)msg;
19545+
CloseMsg closeMsg = (CloseMsg)msg;
19546+
19547+
/* Create reactors. */
19548+
TestReactor consumerReactor = new TestReactor();
19549+
TestReactor providerReactor = new TestReactor();
19550+
19551+
/* Create consumer. */
19552+
Consumer consumer = new Consumer(consumerReactor);
19553+
ConsumerRole consumerRole = (ConsumerRole)consumer.reactorRole();
19554+
consumerRole.initDefaultRDMLoginRequest();
19555+
consumerRole.initDefaultRDMDirectoryRequest();
19556+
consumerRole.channelEventCallback(consumer);
19557+
consumerRole.loginMsgCallback(consumer);
19558+
consumerRole.directoryMsgCallback(consumer);
19559+
consumerRole.dictionaryMsgCallback(consumer);
19560+
consumerRole.defaultMsgCallback(consumer);
19561+
consumerRole.watchlistOptions().enableWatchlist(true);
19562+
consumerRole.watchlistOptions().channelOpenCallback(consumer);
19563+
consumerRole.watchlistOptions().requestTimeout(3000);
19564+
19565+
/* Create provider. */
19566+
Provider provider = new Provider(providerReactor);
19567+
ProviderRole providerRole = (ProviderRole)provider.reactorRole();
19568+
providerRole.channelEventCallback(provider);
19569+
providerRole.loginMsgCallback(provider);
19570+
providerRole.directoryMsgCallback(provider);
19571+
providerRole.dictionaryMsgCallback(provider);
19572+
providerRole.defaultMsgCallback(provider);
19573+
19574+
/* Connect the consumer and provider. Setup login & directory streams automatically. */
19575+
ConsumerProviderSessionOptions opts = new ConsumerProviderSessionOptions();
19576+
opts.setupDefaultLoginStream(true);
19577+
opts.setupDefaultDirectoryStream(true);
19578+
provider.bind(opts);
19579+
TestReactor.openSession(consumer, provider, opts);
19580+
19581+
final class TestData
19582+
{
19583+
private Integer streamId;
19584+
private Integer serviceId;
19585+
private String serviceName;
19586+
19587+
public TestData(Integer idStream, Integer idService, String nameService)
19588+
{
19589+
streamId = idStream;
19590+
serviceId = idService;
19591+
serviceName = nameService;
19592+
}
19593+
public Integer streamId() { return streamId; }
19594+
public Integer serviceId() { return serviceId; }
19595+
public String serviceName() { return serviceName; }
19596+
}
19597+
ArrayList<TestData> requestData = new ArrayList<TestData>();
19598+
requestData.add(new TestData(5, 1, Provider.defaultService().info().serviceName().toString()));
19599+
requestData.add(new TestData(6, 2, "DIRECT_FEED2"));
19600+
requestData.add(new TestData(7, 3, "DIRECT_FEED3"));
19601+
19602+
// send consumer requests
19603+
for (TestData td : requestData)
19604+
{
19605+
requestMsg.clear();
19606+
requestMsg.msgClass(MsgClasses.REQUEST);
19607+
requestMsg.streamId(td.streamId());
19608+
requestMsg.domainType(DomainTypes.MARKET_PRICE);
19609+
requestMsg.applyStreaming();
19610+
requestMsg.msgKey().applyHasName();
19611+
requestMsg.msgKey().name().data("TRI.N");
19612+
19613+
submitOptions.clear();
19614+
if (useServiceNames)
19615+
submitOptions.serviceName(td.serviceName());
19616+
else {
19617+
requestMsg.msgKey().applyHasServiceId();
19618+
requestMsg.msgKey().serviceId(td.serviceId());
19619+
}
19620+
assertTrue(consumer.submit(requestMsg, submitOptions) >= ReactorReturnCodes.SUCCESS);
19621+
}
19622+
19623+
/* Consumer receives status. */
19624+
consumerReactor.dispatch(2);
19625+
consumerReactor.pollEvent();
19626+
consumerReactor.pollEvent();
19627+
19628+
/* Provider receives the request for the known service. */
19629+
providerReactor.dispatch(1);
19630+
providerReactor.pollEvent();
19631+
19632+
/* Provider disconnects. */
19633+
provider.closeChannel();
19634+
19635+
// Consumer receives four events
19636+
consumerReactor.dispatch(4);
19637+
assertEquals(TestReactorEventTypes.CHANNEL_EVENT, consumerReactor.pollEvent().type());
19638+
assertEquals(TestReactorEventTypes.LOGIN_MSG, consumerReactor.pollEvent().type());
19639+
assertEquals(TestReactorEventTypes.DIRECTORY_MSG, consumerReactor.pollEvent().type());
19640+
assertEquals(TestReactorEventTypes.MSG, consumerReactor.pollEvent().type());
19641+
19642+
/* Consumer closes requests. */
19643+
for (TestData td : requestData )
19644+
{
19645+
closeMsg.clear();
19646+
closeMsg.msgClass(MsgClasses.CLOSE);
19647+
closeMsg.streamId(td.streamId());
19648+
closeMsg.domainType(DomainTypes.MARKET_PRICE);
19649+
closeMsg.containerType(DataTypes.NO_DATA);
19650+
submitOptions.clear();
19651+
submitOptions.serviceName(td.serviceName());
19652+
assertTrue(consumer.submitAndDispatch(closeMsg, submitOptions) >= ReactorReturnCodes.SUCCESS);
19653+
}
19654+
19655+
TestReactorComponent.closeSession(consumer, provider);
19656+
}
19657+
}
1953619658
}

Java/Eta/ValueAdd/src/test/java/com/thomsonreuters/upa/valueadd/reactor/TestReactorEventTypes.java

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public enum TestReactorEventTypes {
1313
LOGIN_MSG, /** RDMLoginMsgEvent */
1414
DIRECTORY_MSG, /** RDMDirectoryMsgEvent */
1515
DICTIONARY_MSG, /** RDMDictionaryMsgEvent */
16+
STATUS_MSG,
1617
MSG, /** ReactorMsgEvent */
1718
TUNNEL_STREAM_STATUS, /** TunnelStreamStatusEvent */
1819
TUNNEL_STREAM_MSG, /** TunnelStreamMsgEvent */

0 commit comments

Comments
 (0)