Skip to content

Commit 50b7dff

Browse files
committed
ESDK-3854: Implement posting for EMA
1 parent 1a4ba58 commit 50b7dff

File tree

18 files changed

+3130
-2624
lines changed

18 files changed

+3130
-2624
lines changed

Java/Ema/Core/src/main/java/com/thomsonreuters/ema/access/ActiveServerConfig.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ abstract class ActiveServerConfig extends BaseConfig
1919
final static boolean DEFAULT_ACCEPT_MSG_SAMEKEY_BUT_DIFF_STREAM = false;
2020
final static boolean DEFAULT_ACCEPT_MSG_THAT_CHANGES_SERVICE = false;
2121
final static boolean DEFAULT_ACCEPT_MSG_WITHOUT_QOS_IN_RANGE = false;
22+
final static boolean DEFAULT_ENFORCE_ACK_ID_VALIDATION = false;
2223
final static int DEFAULT_CONNECTION_PINGTIMEOUT = 60000;
2324
final static int DEFAULT_CONNECTION_MINPINGTIMEOUT = 20000;
2425
final static int DEFAULT_SERVER_SYS_SEND_BUFFER_SIZE = 65535;
@@ -33,7 +34,8 @@ abstract class ActiveServerConfig extends BaseConfig
3334
boolean acceptMessageSameKeyButDiffStream;
3435
boolean acceptMessageThatChangesService;
3536
boolean acceptMessageWithoutQosInRange;
36-
37+
boolean enforceAckIDValidation;
38+
3739
private LongObject serviceId = new LongObject();
3840
private HashMap<LongObject, ServiceDictionaryConfig> serviceDictionaryConfigMap;
3941

@@ -49,6 +51,7 @@ abstract class ActiveServerConfig extends BaseConfig
4951
acceptMessageSameKeyButDiffStream = DEFAULT_ACCEPT_MSG_SAMEKEY_BUT_DIFF_STREAM;
5052
acceptMessageThatChangesService = DEFAULT_ACCEPT_MSG_THAT_CHANGES_SERVICE;
5153
acceptMessageWithoutQosInRange = DEFAULT_ACCEPT_MSG_WITHOUT_QOS_IN_RANGE;
54+
enforceAckIDValidation = DEFAULT_ENFORCE_ACK_ID_VALIDATION;
5255
}
5356

5457
void clear()
@@ -60,20 +63,22 @@ void clear()
6063
acceptMessageSameKeyButDiffStream = DEFAULT_ACCEPT_MSG_SAMEKEY_BUT_DIFF_STREAM;
6164
acceptMessageThatChangesService = DEFAULT_ACCEPT_MSG_THAT_CHANGES_SERVICE;
6265
acceptMessageWithoutQosInRange = DEFAULT_ACCEPT_MSG_WITHOUT_QOS_IN_RANGE;
66+
enforceAckIDValidation = DEFAULT_ENFORCE_ACK_ID_VALIDATION;
6367
serviceDictionaryConfigMap.clear();
6468
}
6569

6670
StringBuilder configTrace()
6771
{
6872
super.configTrace();
69-
traceStr.append("\n\t defaultServiceName: ").append(defaultServiceName)
70-
.append("\n\t acceptMessageWithoutAcceptingRequests: ").append(acceptMessageWithoutAcceptingRequests)
71-
.append("\n\t acceptDirMessageWithoutMinFilters: ").append(acceptDirMessageWithoutMinFilters)
72-
.append("\n\t acceptMessageWithoutBeingLogin: ").append(acceptMessageWithoutBeingLogin)
73-
.append("\n\t acceptMessageSameKeyButDiffStream: ").append(acceptMessageSameKeyButDiffStream)
74-
.append("\n\t acceptMessageThatChangesService: ").append(acceptMessageThatChangesService)
75-
.append("\n\t acceptMessageWithoutQosInRange: ").append(acceptMessageWithoutQosInRange);
76-
73+
traceStr.append("\n\t defaultServiceName: ").append(defaultServiceName)
74+
.append("\n\t acceptMessageWithoutAcceptingRequests: ").append(acceptMessageWithoutAcceptingRequests)
75+
.append("\n\t acceptDirMessageWithoutMinFilters: ").append(acceptDirMessageWithoutMinFilters)
76+
.append("\n\t acceptMessageWithoutBeingLogin: ").append(acceptMessageWithoutBeingLogin)
77+
.append("\n\t acceptMessageSameKeyButDiffStream: ").append(acceptMessageSameKeyButDiffStream)
78+
.append("\n\t acceptMessageThatChangesService: ").append(acceptMessageThatChangesService)
79+
.append("\n\t acceptMessageWithoutQosInRange: ").append(acceptMessageWithoutQosInRange)
80+
.append("\n\t enforceAckIDValidation: ").append(enforceAckIDValidation);
81+
7782
return traceStr;
7883
}
7984

@@ -107,9 +112,9 @@ void setServiceDictionaryConfigCollection(Collection<ServiceDictionaryConfig> se
107112
while(iterator.hasNext())
108113
{
109114
ServiceDictionaryConfig serviceDictionaryConfig = iterator.next();
110-
115+
111116
if( ( serviceDictionaryConfig.dictionaryProvidedList.size() != 0 ) || ( serviceDictionaryConfig.dictionaryUsedList.size() != 0 ) )
112-
serviceDictionaryConfigMap.put( new LongObject().value(serviceDictionaryConfig.serviceId), serviceDictionaryConfig);
117+
serviceDictionaryConfigMap.put( new LongObject().value(serviceDictionaryConfig.serviceId), serviceDictionaryConfig);
113118
}
114119
}
115120
}

Java/Ema/Core/src/main/java/com/thomsonreuters/ema/access/ConfigManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ class ConfigManager
237237
public static final int IProviderAcceptMessageSameKeyButDiffStream = 811;
238238
public static final int IProviderAcceptMessageThatChangesService = 812;
239239
public static final int IProviderAcceptMessageWithoutQosInRange = 813;
240+
public static final int IProviderEnforceAckIDValidation = 814;
240241

241242
// Server: Global
242243
public static final int ServerGroup = 900;
@@ -477,6 +478,7 @@ class ConfigManager
477478
IProviderTagDict.add( "AcceptMessageSameKeyButDiffStream", IProviderAcceptMessageSameKeyButDiffStream);
478479
IProviderTagDict.add( "AcceptMessageThatChangesService", IProviderAcceptMessageThatChangesService);
479480
IProviderTagDict.add( "AcceptMessageWithoutQosInRange", IProviderAcceptMessageWithoutQosInRange);
481+
IProviderTagDict.add( "EnforceAckIDValidation", IProviderEnforceAckIDValidation );
480482
IProviderTagDict.add( "FieldDictionaryFragmentSize", DictionaryFieldDictFragmentSize);
481483
IProviderTagDict.add( "EnumTypeFragmentSize", DictionaryEnumTypeFragmentSize);
482484
IProviderTagDict.add( "XmlTraceFileName",XmlTraceFileName );
@@ -674,7 +676,8 @@ class ConfigManager
674676
"DictionaryRequestTimeOut",
675677
"DirectoryRequestTimeOut",
676678
"DisconnectOnGap",
677-
"EnableSessionManagement",
679+
"EnableSessionManagement",
680+
"EnforceAckIDValidation",
678681
"EnumTypeFragmentSize",
679682
"FieldDictionaryFragmentSize",
680683
"GuaranteedOutputBuffers",

Java/Ema/Core/src/main/java/com/thomsonreuters/ema/access/DirectoryServiceStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1893,7 +1893,7 @@ ServiceIdInteger createServiceIdInteger()
18931893

18941894
}
18951895

1896-
class ServiceIdInteger extends VaNode
1896+
static class ServiceIdInteger extends VaNode
18971897
{
18981898
int _value;
18991899

Java/Ema/Core/src/main/java/com/thomsonreuters/ema/access/ItemInfo.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.thomsonreuters.ema.access;
22

3+
import java.util.concurrent.ConcurrentHashMap;
4+
35
import com.thomsonreuters.upa.codec.Buffer;
46
import com.thomsonreuters.upa.codec.CodecFactory;
57
import com.thomsonreuters.upa.codec.MsgKey;
@@ -15,6 +17,7 @@ class ItemInfo extends VaNode
1517
private LongObject _streamId;
1618
private MsgKey _msgKey;
1719
private boolean _sentRefresh;
20+
private java.util.Map<Long, Integer> _postIdsCount;
1821

1922
class ItemInfoFlags
2023
{
@@ -165,6 +168,39 @@ void setSentRefresh()
165168
_sentRefresh = true;
166169
}
167170

171+
public void addPostId(long postId){
172+
if(_postIdsCount == null){
173+
_postIdsCount = new ConcurrentHashMap<>();
174+
}
175+
Integer oldCount = _postIdsCount.get(postId);
176+
if(oldCount == null){
177+
_postIdsCount.put(postId, 1);
178+
}
179+
else{
180+
_postIdsCount.put(postId, oldCount + 1);
181+
}
182+
183+
}
184+
185+
public boolean removePostId(long id){
186+
if(_postIdsCount == null){
187+
return false;
188+
}
189+
Integer count = _postIdsCount.get(id);
190+
if(count == null)
191+
{
192+
return false;
193+
}
194+
else if(count > 1){
195+
_postIdsCount.put(id, count-1);
196+
}
197+
else
198+
{
199+
_postIdsCount.remove(id);
200+
}
201+
return true;
202+
}
203+
168204
@Override
169205
public int hashCode()
170206
{
@@ -219,5 +255,17 @@ void clear()
219255
_streamId.clear();
220256
_domainType = 0;
221257
_msgKey.clear();
258+
if(_postIdsCount != null) {
259+
_postIdsCount.clear();
260+
}
261+
}
262+
263+
@Override
264+
public void returnToPool() {
265+
if(_postIdsCount != null) {
266+
_postIdsCount.clear();
267+
}
268+
269+
super.returnToPool();
222270
}
223271
}

Java/Ema/Core/src/main/java/com/thomsonreuters/ema/access/JUnitTestConnect.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public class JUnitTestConnect
174174
public static final int IProviderAcceptMessageSameKeyButDiffStream = ConfigManager.IProviderAcceptMessageSameKeyButDiffStream;
175175
public static final int IProviderAcceptMessageThatChangesService = ConfigManager.IProviderAcceptMessageThatChangesService;
176176
public static final int IProviderAcceptMessageWithoutQosInRange = ConfigManager.IProviderAcceptMessageWithoutQosInRange;
177+
public static final int IProviderEnforceAckIDValidation = ConfigManager.IProviderEnforceAckIDValidation;
177178

178179
// Server: Global
179180
public static final int Server = ConfigManager.Server;
@@ -1259,6 +1260,8 @@ else if (configParam == IProviderAcceptMessageWithoutBeingLogin)
12591260
return activeConfig.acceptMessageWithoutBeingLogin;
12601261
else if (configParam == IProviderAcceptMessageWithoutQosInRange)
12611262
return activeConfig.acceptMessageWithoutQosInRange;
1263+
else if (configParam == IProviderEnforceAckIDValidation)
1264+
return activeConfig.enforceAckIDValidation;
12621265
}
12631266
else if (type == ConfigGroupTypeServer)
12641267
{

0 commit comments

Comments
 (0)