@@ -19533,4 +19533,126 @@ public void dualRequestWithDifferentServiceTest()
19533
19533
19534
19534
TestReactorComponent.closeSession(consumer, provider);
19535
19535
}
19536
+
19537
+ @Test
19538
+ public void closeWhileDisconnectedTest()
19539
+ {
19540
+ for (boolean useServiceNames : new boolean[] {true, false} )
19541
+ {
19542
+ ReactorSubmitOptions submitOptions = ReactorFactory.createReactorSubmitOptions();
19543
+ Msg msg = CodecFactory.createMsg();
19544
+ RequestMsg requestMsg = (RequestMsg)msg;
19545
+ CloseMsg closeMsg = (CloseMsg)msg;
19546
+
19547
+ /* Create reactors. */
19548
+ TestReactor consumerReactor = new TestReactor();
19549
+ TestReactor providerReactor = new TestReactor();
19550
+
19551
+ /* Create consumer. */
19552
+ Consumer consumer = new Consumer(consumerReactor);
19553
+ ConsumerRole consumerRole = (ConsumerRole)consumer.reactorRole();
19554
+ consumerRole.initDefaultRDMLoginRequest();
19555
+ consumerRole.initDefaultRDMDirectoryRequest();
19556
+ consumerRole.channelEventCallback(consumer);
19557
+ consumerRole.loginMsgCallback(consumer);
19558
+ consumerRole.directoryMsgCallback(consumer);
19559
+ consumerRole.dictionaryMsgCallback(consumer);
19560
+ consumerRole.defaultMsgCallback(consumer);
19561
+ consumerRole.watchlistOptions().enableWatchlist(true);
19562
+ consumerRole.watchlistOptions().channelOpenCallback(consumer);
19563
+ consumerRole.watchlistOptions().requestTimeout(3000);
19564
+
19565
+ /* Create provider. */
19566
+ Provider provider = new Provider(providerReactor);
19567
+ ProviderRole providerRole = (ProviderRole)provider.reactorRole();
19568
+ providerRole.channelEventCallback(provider);
19569
+ providerRole.loginMsgCallback(provider);
19570
+ providerRole.directoryMsgCallback(provider);
19571
+ providerRole.dictionaryMsgCallback(provider);
19572
+ providerRole.defaultMsgCallback(provider);
19573
+
19574
+ /* Connect the consumer and provider. Setup login & directory streams automatically. */
19575
+ ConsumerProviderSessionOptions opts = new ConsumerProviderSessionOptions();
19576
+ opts.setupDefaultLoginStream(true);
19577
+ opts.setupDefaultDirectoryStream(true);
19578
+ provider.bind(opts);
19579
+ TestReactor.openSession(consumer, provider, opts);
19580
+
19581
+ final class TestData
19582
+ {
19583
+ private Integer streamId;
19584
+ private Integer serviceId;
19585
+ private String serviceName;
19586
+
19587
+ public TestData(Integer idStream, Integer idService, String nameService)
19588
+ {
19589
+ streamId = idStream;
19590
+ serviceId = idService;
19591
+ serviceName = nameService;
19592
+ }
19593
+ public Integer streamId() { return streamId; }
19594
+ public Integer serviceId() { return serviceId; }
19595
+ public String serviceName() { return serviceName; }
19596
+ }
19597
+ ArrayList<TestData> requestData = new ArrayList<TestData>();
19598
+ requestData.add(new TestData(5, 1, Provider.defaultService().info().serviceName().toString()));
19599
+ requestData.add(new TestData(6, 2, "DIRECT_FEED2"));
19600
+ requestData.add(new TestData(7, 3, "DIRECT_FEED3"));
19601
+
19602
+ // send consumer requests
19603
+ for (TestData td : requestData)
19604
+ {
19605
+ requestMsg.clear();
19606
+ requestMsg.msgClass(MsgClasses.REQUEST);
19607
+ requestMsg.streamId(td.streamId());
19608
+ requestMsg.domainType(DomainTypes.MARKET_PRICE);
19609
+ requestMsg.applyStreaming();
19610
+ requestMsg.msgKey().applyHasName();
19611
+ requestMsg.msgKey().name().data("TRI.N");
19612
+
19613
+ submitOptions.clear();
19614
+ if (useServiceNames)
19615
+ submitOptions.serviceName(td.serviceName());
19616
+ else {
19617
+ requestMsg.msgKey().applyHasServiceId();
19618
+ requestMsg.msgKey().serviceId(td.serviceId());
19619
+ }
19620
+ assertTrue(consumer.submit(requestMsg, submitOptions) >= ReactorReturnCodes.SUCCESS);
19621
+ }
19622
+
19623
+ /* Consumer receives status. */
19624
+ consumerReactor.dispatch(2);
19625
+ consumerReactor.pollEvent();
19626
+ consumerReactor.pollEvent();
19627
+
19628
+ /* Provider receives the request for the known service. */
19629
+ providerReactor.dispatch(1);
19630
+ providerReactor.pollEvent();
19631
+
19632
+ /* Provider disconnects. */
19633
+ provider.closeChannel();
19634
+
19635
+ // Consumer receives four events
19636
+ consumerReactor.dispatch(4);
19637
+ assertEquals(TestReactorEventTypes.CHANNEL_EVENT, consumerReactor.pollEvent().type());
19638
+ assertEquals(TestReactorEventTypes.LOGIN_MSG, consumerReactor.pollEvent().type());
19639
+ assertEquals(TestReactorEventTypes.DIRECTORY_MSG, consumerReactor.pollEvent().type());
19640
+ assertEquals(TestReactorEventTypes.MSG, consumerReactor.pollEvent().type());
19641
+
19642
+ /* Consumer closes requests. */
19643
+ for (TestData td : requestData )
19644
+ {
19645
+ closeMsg.clear();
19646
+ closeMsg.msgClass(MsgClasses.CLOSE);
19647
+ closeMsg.streamId(td.streamId());
19648
+ closeMsg.domainType(DomainTypes.MARKET_PRICE);
19649
+ closeMsg.containerType(DataTypes.NO_DATA);
19650
+ submitOptions.clear();
19651
+ submitOptions.serviceName(td.serviceName());
19652
+ assertTrue(consumer.submitAndDispatch(closeMsg, submitOptions) >= ReactorReturnCodes.SUCCESS);
19653
+ }
19654
+
19655
+ TestReactorComponent.closeSession(consumer, provider);
19656
+ }
19657
+ }
19536
19658
}
0 commit comments