Skip to content

Commit 7c145ba

Browse files
ESDK-3844 fixed oauth2 token refresh after previous failure both for Reactor and Watchlist layers
1 parent 1e96f99 commit 7c145ba

File tree

5 files changed

+826
-50
lines changed

5 files changed

+826
-50
lines changed

Java/Eta/ValueAdd/src/main/java/com/thomsonreuters/upa/valueadd/reactor/Reactor.java

+20-11
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.thomsonreuters.upa.codec.StreamStates;
3131
import com.thomsonreuters.upa.rdm.ClassesOfService;
3232
import com.thomsonreuters.upa.rdm.DomainTypes;
33+
import com.thomsonreuters.upa.rdm.Login;
3334
import com.thomsonreuters.upa.transport.Channel;
3435
import com.thomsonreuters.upa.transport.ChannelState;
3536
import com.thomsonreuters.upa.transport.ConnectOptions;
@@ -797,7 +798,9 @@ else if(role.type() == ReactorRoleTypes.NIPROVIDER)
797798
reactorChannel._loginRequestForEDP = (LoginRequest)LoginMsgFactory.createMsg();
798799
reactorChannel._loginRequestForEDP.rdmMsgType(LoginMsgType.REQUEST);
799800

800-
loginRequest.copy(reactorChannel._loginRequestForEDP);
801+
loginRequest.copy(reactorChannel._loginRequestForEDP);
802+
reactorChannel._loginRequestForEDP.userNameType(Login.UserIdTypes.AUTHN_TOKEN);
803+
reactorChannel._loginRequestForEDP.flags(reactorChannel._loginRequestForEDP.flags() & ~LoginRequestFlags.HAS_PASSWORD);
801804
}
802805
}
803806

@@ -966,7 +969,7 @@ private void createRestClient(ReactorErrorInfo errorInfo)
966969
{
967970
public void onNewAuthToken(ReactorChannel reactorChannel, ReactorAuthTokenInfo authTokenInfo, ReactorErrorInfo errorInfo)
968971
{
969-
if (reactorChannel != null && (reactorChannel.state() == State.READY || reactorChannel.state() == State.EDP_RT))
972+
if (reactorChannel != null && (reactorChannel.state() == State.UP || reactorChannel.state() == State.READY || reactorChannel.state() == State.EDP_RT))
970973
{
971974
// only send reissue if watchlist enabled
972975
if (reactorChannel.watchlist() != null)
@@ -982,14 +985,12 @@ public void onError(ReactorChannel reactorChannel, ReactorErrorInfo errorInfo)
982985
reactorChannel.reactor().sendChannelWarningEvent(reactorChannel, errorInfo);
983986

984987
if(reactorChannel.sessionMgntState() == ReactorChannel.SessionMgntState.REQ_AUTH_TOKEN_USING_REFRESH_TOKEN ||
985-
reactorChannel.sessionMgntState() == ReactorChannel.SessionMgntState.REQ_AUTH_TOKEN_USING_PASSWORD)
988+
reactorChannel.sessionMgntState() == ReactorChannel.SessionMgntState.REQ_AUTH_TOKEN_USING_PASSWORD ||
989+
reactorChannel.sessionMgntState() == ReactorChannel.SessionMgntState.REQ_FAILURE_FOR_TOKEN_SERVICE)
986990
{
987-
if (reactorChannel.state() != ReactorChannel.State.EDP_RT && reactorChannel.state() != ReactorChannel.State.EDP_RT_FAILED)
991+
if (reactorChannel.handlesTokenReissueFailed() )
988992
{
989-
if (reactorChannel.handlesTokenReissueFailed() )
990-
{
991-
reactorChannel.reactor().sendAuthTokenWorkerEvent(reactorChannel, reactorChannel._reactorAuthTokenInfo);
992-
}
993+
reactorChannel.reactor().sendAuthTokenWorkerEvent(reactorChannel, reactorChannel._reactorAuthTokenInfo);
993994
}
994995
}
995996
}
@@ -1129,9 +1130,17 @@ void loginReissue(ReactorChannel reactorChannel, String authToken, ReactorErrorI
11291130
}
11301131

11311132
if (loginRequest != null)
1132-
{
1133-
loginRequest.applyNoRefresh();
1134-
encodeAndWriteLoginRequest(loginRequest, reactorChannel, errorInfo);
1133+
{
1134+
_reactorLock.lock();
1135+
1136+
try
1137+
{
1138+
reactorChannel.watchlist().loginHandler().sendLoginRequest(true, errorInfo);
1139+
}
1140+
finally
1141+
{
1142+
_reactorLock.unlock();
1143+
}
11351144
}
11361145
}
11371146

Java/Eta/ValueAdd/src/main/java/com/thomsonreuters/upa/valueadd/reactor/RestClient.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,9 @@ public int RestResponseCallback(RestResponse response, RestEvent event)
302302
errorText = response.jsonObject().toString();
303303
}
304304

305+
/* This is used to indicate that the token is no longer valid */
306+
reactorChannel._loginRequestForEDP.userName().data("");
307+
305308
reactorChannel.reactor().populateErrorInfo(_errorInfo, ReactorReturnCodes.FAILURE, "RestClient.RestResponseCallback",
306309
"Failed REST request from HTTP status code " + response.statusCode() + " for user: " + ((ConsumerRole)reactorChannel.role()).rdmLoginRequest().userName().toString() +
307310
". Text: " + errorText);
@@ -358,22 +361,27 @@ public int RestErrorCallback(RestEvent event)
358361
{
359362
ReactorChannel reactorChannel = (ReactorChannel)event.userSpecObj();
360363

361-
onError( reactorChannel, event.errorInfo() );
362-
if (reactorChannel != null && reactorChannel.state() == State.EDP_RT)
364+
if (reactorChannel != null)
363365
{
364366
if(reactorChannel.sessionMgntState() == SessionMgntState.REQ_AUTH_TOKEN_USING_REFRESH_TOKEN ||
365367
reactorChannel.sessionMgntState() == SessionMgntState.REQ_AUTH_TOKEN_USING_PASSWORD)
366368
{
367369
reactorChannel.sessionMgntState(SessionMgntState.REQ_FAILURE_FOR_TOKEN_SERVICE);
370+
/* This is used to indicate that the token is no longer valid */
371+
reactorChannel._loginRequestForEDP.userName().data("");
368372
}
369373
else
370374
{
371375
reactorChannel.sessionMgntState(SessionMgntState.REQ_FAILURE_FOR_SERVICE_DISCOVERY);
372376
}
373-
374-
reactorChannel.state(State.EDP_RT_FAILED);
377+
378+
if (reactorChannel.state() == State.EDP_RT) {
379+
reactorChannel.state(State.EDP_RT_FAILED);
380+
}
375381
}
376-
382+
383+
onError( reactorChannel, event.errorInfo() );
384+
377385
return ReactorReturnCodes.SUCCESS;
378386
}
379387

Java/Eta/ValueAdd/src/main/java/com/thomsonreuters/upa/valueadd/reactor/WlDirectoryHandler.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -1310,20 +1310,23 @@ int loginStreamClosed()
13101310

13111311
if (_stream.state().streamState() == StreamStates.OPEN)
13121312
{
1313-
if (_watchlist.loginHandler().wlStream().state().streamState() == StreamStates.CLOSED_RECOVER)
1313+
if (_watchlist.loginHandler().wlStream().state().streamState() == StreamStates.CLOSED_RECOVER || _watchlist.reactorChannel().enableSessionManagement())
13141314
{
13151315
// login stream in close recover state
13161316
_stream.state().clear();
13171317
_stream.state().streamState(StreamStates.CLOSED_RECOVER);
13181318
_stream.state().dataState(DataStates.SUSPECT);
1319+
1320+
// clear service cache
1321+
_serviceCache.clearCache(false);
13191322
}
1320-
1321-
// clear service cache
1322-
_serviceCache.clearCache(false);
13231323

13241324
// close stream if login stream is closed
1325-
if (_watchlist.loginHandler().wlStream().state().streamState() == StreamStates.CLOSED)
1325+
else if (_watchlist.loginHandler().wlStream().state().streamState() == StreamStates.CLOSED)
13261326
{
1327+
// clear service cache
1328+
_serviceCache.clearCache(false);
1329+
13271330
closeDirectoryStream();
13281331
}
13291332
}

Java/Eta/ValueAdd/src/main/java/com/thomsonreuters/upa/valueadd/reactor/WlLoginHandler.java

+51-29
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,7 @@ public int readMsg(WlStream wlStream, DecodeIterator dIter, Msg msg, ReactorErro
748748
_stream.close();
749749
_stream.returnToPool();
750750
_stream = null;
751+
_loginRequestForEDP = null;
751752
}
752753
break;
753754
case StreamStates.OPEN:
@@ -1147,50 +1148,57 @@ void channelDown()
11471148
}
11481149
}
11491150

1150-
/* Handles channel up event. */
1151-
void channelUp(ReactorErrorInfo errorInfo)
1151+
/* Sends login request */
1152+
void sendLoginRequest(boolean noRefresh, ReactorErrorInfo errorInfo)
11521153
{
11531154
boolean newStream = false;
11541155
boolean newRequest = false;
11551156

11561157
_userloginStreamOpen = true;
1157-
1158-
// notify login stream that channel is up
1159-
if (_stream != null)
1160-
{
1161-
_stream.channelUp();
1162-
}
1163-
1158+
11641159
LoginRequest loginRequest = null;
11651160

11661161
if (_watchlist.reactorChannel().enableSessionManagement())
11671162
{
1163+
String authToken = _watchlist.reactorChannel()._loginRequestForEDP.userName().toString();
1164+
1165+
/* Don't send a login request as the access token is invalid */
1166+
if(authToken.isEmpty())
1167+
return;
1168+
11681169
if (_loginRequestForEDP == null
11691170
&& _watchlist.role().rdmLoginRequest() != null)
11701171
{
11711172
newRequest = true;
11721173
_loginRequestForEDP = (LoginRequest) LoginMsgFactory.createMsg();
11731174
_loginRequestForEDP.rdmMsgType(LoginMsgType.REQUEST);
11741175

1175-
_watchlist.reactorChannel()._loginRequestForEDP.copy(_loginRequestForEDP);
1176-
1177-
// create login stream if not created yet
1178-
if (_stream == null)
1179-
{
1180-
newStream = true;
1176+
_watchlist.reactorChannel()._loginRequestForEDP.copy(_loginRequestForEDP);
1177+
}
1178+
else
1179+
{
1180+
_loginRequestForEDP.userName().data(authToken);
1181+
}
1182+
1183+
loginRequest = _loginRequestForEDP;
1184+
1185+
if(noRefresh)
1186+
loginRequest.applyNoRefresh();
1187+
1188+
// create login stream if not created yet
1189+
if (_stream == null)
1190+
{
1191+
newStream = true;
11811192

1182-
_loginRequestForEDP.streamId(_loginStreamId);
1193+
_loginRequestForEDP.streamId(_loginStreamId);
11831194

1184-
// create stream
1185-
_stream = ReactorFactory.createWlStream();
1186-
_stream.handler(this);
1187-
_stream.watchlist(_watchlist);
1188-
_stream.streamId(_loginRequestForEDP.streamId());
1189-
_stream.domainType(_loginRequestForEDP.domainType());
1190-
}
1191-
1195+
// create stream
1196+
_stream = ReactorFactory.createWlStream();
1197+
_stream.handler(this);
1198+
_stream.watchlist(_watchlist);
1199+
_stream.streamId(_loginRequestForEDP.streamId());
1200+
_stream.domainType(_loginRequestForEDP.domainType());
11921201
}
1193-
loginRequest = _loginRequestForEDP;
11941202
}
11951203
else
11961204
{
@@ -1219,10 +1227,11 @@ void channelUp(ReactorErrorInfo errorInfo)
12191227
}
12201228
}
12211229
_loginRequest.rdmMsgType(LoginMsgType.REQUEST);
1222-
loginRequest = _loginRequest;
1223-
}
1230+
loginRequest = _loginRequest;
12241231

1225-
1232+
if(noRefresh)
1233+
loginRequest.applyNoRefresh();
1234+
}
12261235

12271236
// send login request via stream
12281237
if (loginRequest != null && _stream != null)
@@ -1231,7 +1240,7 @@ void channelUp(ReactorErrorInfo errorInfo)
12311240
{
12321241
loginRequest.flags(loginRequest.flags() & ~LoginRequestFlags.PAUSE_ALL);
12331242
}
1234-
if (loginRequest.checkNoRefresh())
1243+
if (!noRefresh && loginRequest.checkNoRefresh())
12351244
{
12361245
loginRequest.flags(loginRequest.flags() & ~LoginRequestFlags.NO_REFRESH);
12371246
}
@@ -1273,6 +1282,7 @@ void channelUp(ReactorErrorInfo errorInfo)
12731282
if (newRequest)
12741283
{
12751284
_loginRequest = null;
1285+
_loginRequestForEDP = null;
12761286
}
12771287

12781288
// if new stream, return stream to pool
@@ -1283,6 +1293,18 @@ void channelUp(ReactorErrorInfo errorInfo)
12831293
}
12841294
}
12851295
}
1296+
1297+
/* Handles channel up event. */
1298+
void channelUp(ReactorErrorInfo errorInfo)
1299+
{
1300+
// notify login stream that channel is up
1301+
if (_stream != null)
1302+
{
1303+
_stream.channelUp();
1304+
}
1305+
1306+
sendLoginRequest(false, errorInfo);
1307+
}
12861308

12871309
@Override
12881310
public int requestTimeout(WlStream wlStream, ReactorErrorInfo errorInfo)

0 commit comments

Comments
 (0)