Skip to content

Commit 2aba7ed

Browse files
hgiasaclexi-lambda
authored andcommitted
add query execution time and response size to ws-server logs in websocket transport (hasura#3584)
1 parent dbfc9e2 commit 2aba7ed

File tree

5 files changed

+134
-59
lines changed

5 files changed

+134
-59
lines changed

server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Poll.hs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
2626
, newSinkId
2727
, SubscriberMap
2828
, OnChange
29+
, LGQResponse
30+
, LiveQueryResponse(..)
31+
, LiveQueryMetadata(..)
2932
) where
3033

3134
import Hasura.Prelude
@@ -35,6 +38,7 @@ import qualified Control.Concurrent.STM as STM
3538
import qualified Crypto.Hash as CH
3639
import qualified Data.Aeson.Extended as J
3740
import qualified Data.ByteString as BS
41+
import qualified Data.ByteString.Lazy as BL
3842
import qualified Data.HashMap.Strict as Map
3943
import qualified Data.Time.Clock as Clock
4044
import qualified Data.UUID as UUID
@@ -64,7 +68,21 @@ data Subscriber
6468
, _sOnChangeCallback :: !OnChange
6569
}
6670

67-
type OnChange = GQResponse -> IO ()
71+
-- | live query onChange metadata, used for adding more extra analytics data
72+
data LiveQueryMetadata
73+
= LiveQueryMetadata
74+
{ _lqmExecutionTime :: !Clock.NominalDiffTime
75+
}
76+
77+
data LiveQueryResponse
78+
= LiveQueryResponse
79+
{ _lqrPayload :: !BL.ByteString
80+
, _lqrExecutionTime :: !Clock.NominalDiffTime
81+
}
82+
83+
type LGQResponse = GQResult LiveQueryResponse
84+
85+
type OnChange = LGQResponse -> IO ()
6886

6987
newtype SubscriberId = SubscriberId { _unSinkId :: UUID.UUID }
7088
deriving (Show, Eq, Hashable, J.ToJSON)
@@ -159,9 +177,10 @@ pushResultToCohort
159177
:: GQResult EncJSON
160178
-- ^ a response that still needs to be wrapped with each 'Subscriber'’s root 'G.Alias'
161179
-> Maybe ResponseHash
180+
-> LiveQueryMetadata
162181
-> CohortSnapshot
163182
-> IO ()
164-
pushResultToCohort result respHashM cohortSnapshot = do
183+
pushResultToCohort result respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
165184
prevRespHashM <- STM.readTVarIO respRef
166185
-- write to the current websockets if needed
167186
sinks <-
@@ -176,8 +195,10 @@ pushResultToCohort result respHashM cohortSnapshot = do
176195
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
177196
pushResultToSubscribers = A.mapConcurrently_ $ \(Subscriber alias action) ->
178197
let aliasText = G.unName $ G.unAlias alias
179-
wrapWithAlias response =
180-
encJToLBS $ encJFromAssocList [(aliasText, response)]
198+
wrapWithAlias response = LiveQueryResponse
199+
{ _lqrPayload = encJToLBS $ encJFromAssocList [(aliasText, response)]
200+
, _lqrExecutionTime = dTime
201+
}
181202
in action (wrapWithAlias <$> result)
182203

183204
-- -------------------------------------------------------------------------------------------------
@@ -301,11 +322,14 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do
301322
queryInit <- Clock.getCurrentTime
302323
mxRes <- runExceptT . runLazyTx' pgExecCtx $ executeMultiplexedQuery pgQuery queryVars
303324
queryFinish <- Clock.getCurrentTime
304-
Metrics.add (_rmQuery metrics) $
305-
realToFrac $ Clock.diffUTCTime queryFinish queryInit
306-
let operations = getCohortOperations cohortSnapshotMap mxRes
325+
let dt = Clock.diffUTCTime queryFinish queryInit
326+
queryTime = realToFrac dt
327+
lqMeta = LiveQueryMetadata dt
328+
operations = getCohortOperations cohortSnapshotMap lqMeta mxRes
329+
Metrics.add (_rmQuery metrics) queryTime
330+
307331
-- concurrently push each unique result
308-
A.mapConcurrently_ (uncurry3 pushResultToCohort) operations
332+
A.mapConcurrently_ (uncurry4 pushResultToCohort) operations
309333
pushFinish <- Clock.getCurrentTime
310334
Metrics.add (_rmPush metrics) $
311335
realToFrac $ Clock.diffUTCTime pushFinish queryFinish
@@ -316,8 +340,8 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do
316340
where
317341
Poller cohortMap _ = handler
318342

319-
uncurry3 :: (a -> b -> c -> d) -> (a, b, c) -> d
320-
uncurry3 f (a, b, c) = f a b c
343+
uncurry4 :: (a -> b -> c -> d -> e) -> (a, b, c, d) -> e
344+
uncurry4 f (a, b, c, d) = f a b c d
321345

322346
getCohortSnapshot (cohortVars, handlerC) = do
323347
let Cohort resId respRef curOpsTV newOpsTV = handlerC
@@ -331,11 +355,11 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do
331355
getQueryVars cohortSnapshotMap =
332356
Map.toList $ fmap _csVariables cohortSnapshotMap
333357

334-
getCohortOperations cohortSnapshotMap = \case
358+
getCohortOperations cohortSnapshotMap actionMeta = \case
335359
Left e ->
336360
-- TODO: this is internal error
337361
let resp = GQExecError [encodeGQErr False e]
338-
in [ (resp, Nothing, snapshot)
362+
in [ (resp, Nothing, actionMeta, snapshot)
339363
| (_, snapshot) <- Map.toList cohortSnapshotMap
340364
]
341365
Right responses ->
@@ -345,4 +369,4 @@ pollQuery metrics batchSize pgExecCtx pgQuery handler = do
345369
-- from Postgres strictly and (2) even if we didn’t, hashing will have to force the
346370
-- whole thing anyway.
347371
respHash = mkRespHash (encJToBS result)
348-
in (GQSuccess result, Just respHash,) <$> Map.lookup respId cohortSnapshotMap
372+
in (GQSuccess result, Just respHash, actionMeta,) <$> Map.lookup respId cohortSnapshotMap

server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,11 @@ import Hasura.Server.Auth (AuthMode, UserAuth
4343
import Hasura.Server.Context
4444
import Hasura.Server.Cors
4545
import Hasura.Server.Utils (RequestId, diffTimeToMicro,
46-
getRequestId)
46+
getRequestId, withElapsedTime)
4747

4848
import qualified Hasura.GraphQL.Execute as E
4949
import qualified Hasura.GraphQL.Execute.LiveQuery as LQ
50+
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as LQ
5051
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
5152
import qualified Hasura.Logging as L
5253

@@ -85,9 +86,20 @@ data WSConnData
8586
type WSServer = WS.WSServer WSConnData
8687

8788
type WSConn = WS.WSConn WSConnData
89+
8890
sendMsg :: (MonadIO m) => WSConn -> ServerMsg -> m ()
89-
sendMsg wsConn =
90-
liftIO . WS.sendMsg wsConn . encodeServerMsg
91+
sendMsg wsConn msg =
92+
liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse (encodeServerMsg msg) Nothing
93+
94+
sendMsgWithMetadata :: (MonadIO m) => WSConn -> ServerMsg -> LQ.LiveQueryMetadata -> m ()
95+
sendMsgWithMetadata wsConn msg (LQ.LiveQueryMetadata execTime) =
96+
liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo
97+
where
98+
bs = encodeServerMsg msg
99+
wsInfo = Just $ WS.WSEventInfo
100+
{ WS._wseiQueryExecutionTime = Just $ realToFrac execTime
101+
, WS._wseiResponseSize = Just $ BL.length bs
102+
}
91103

92104
data OpDetail
93105
= ODStarted
@@ -192,7 +204,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do
192204
expTime <- liftIO $ STM.atomically $ do
193205
connState <- STM.readTVar $ (_wscUser . WS.getData) wsConn
194206
case connState of
195-
CSNotInitialised _ -> STM.retry
207+
CSNotInitialised _ -> STM.retry
196208
CSInitError _ -> STM.retry
197209
CSInitialised _ expTimeM _ ->
198210
maybe STM.retry return expTimeM
@@ -310,8 +322,9 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
310322
logOpEv ODStarted (Just reqId)
311323
-- log the generated SQL and the graphql query
312324
L.unLogger logger $ QueryLog query genSql reqId
313-
resp <- liftIO $ runExceptT action
314-
either (postExecErr reqId) sendSuccResp resp
325+
(dt, resp) <- withElapsedTime $ liftIO $ runExceptT action
326+
let lqMeta = LQ.LiveQueryMetadata dt
327+
either (postExecErr reqId) (`sendSuccResp` lqMeta) resp
315328
sendCompleted (Just reqId)
316329

317330
runRemoteGQ :: E.ExecutionCtx -> RequestId -> UserInfo -> [H.Header]
@@ -323,15 +336,16 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
323336
err400 NotSupported "subscription to remote server is not supported"
324337

325338
-- if it's not a subscription, use HTTP to execute the query on the remote
326-
resp <- runExceptT $ flip runReaderT execCtx $
339+
(dt, resp) <- withElapsedTime $ runExceptT $ flip runReaderT execCtx $
327340
E.execRemoteGQ reqId userInfo reqHdrs q rsi opDef
328-
either (postExecErr reqId) (sendRemoteResp reqId . _hrBody) resp
341+
let ocMeta = LQ.LiveQueryMetadata dt
342+
either (postExecErr reqId) (\val -> sendRemoteResp reqId (_hrBody val) ocMeta) resp
329343
sendCompleted (Just reqId)
330344

331-
sendRemoteResp reqId resp =
345+
sendRemoteResp reqId resp meta =
332346
case J.eitherDecodeStrict (encJToBS resp) of
333347
Left e -> postExecErr reqId $ invalidGqlErr $ T.pack e
334-
Right res -> sendMsg wsConn $ SMData $ DataMsg opId (GRRemote res)
348+
Right res -> sendMsgWithMetadata wsConn (SMData $ DataMsg opId $ GRRemote res) meta
335349

336350
invalidGqlErr err = err500 Unexpected $
337351
"Failed parsing GraphQL response from remote: " <> err
@@ -357,19 +371,19 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
357371

358372
sendStartErr e = do
359373
let errFn = getErrFn errRespTy
360-
sendMsg wsConn $ SMErr $ ErrorMsg opId $ errFn False $
361-
err400 StartFailed e
374+
sendMsg wsConn $
375+
SMErr $ ErrorMsg opId $ errFn False $ err400 StartFailed e
362376
logOpEv (ODProtoErr e) Nothing
363377

364378
sendCompleted reqId = do
365-
sendMsg wsConn $ SMComplete $ CompletionMsg opId
379+
sendMsg wsConn (SMComplete $ CompletionMsg opId)
366380
logOpEv ODCompleted reqId
367381

368382
postExecErr reqId qErr = do
369383
let errFn = getErrFn errRespTy
370384
logOpEv (ODQueryErr qErr) (Just reqId)
371-
sendMsg wsConn $ SMData $ DataMsg opId $
372-
GRHasura $ GQExecError $ pure $ errFn False qErr
385+
sendMsg wsConn $ SMData $
386+
DataMsg opId $ GRHasura $ GQExecError $ pure $ errFn False qErr
373387

374388
-- why wouldn't pre exec error use graphql response?
375389
preExecErr reqId qErr = do
@@ -378,11 +392,11 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
378392
let err = case errRespTy of
379393
ERTLegacy -> errFn False qErr
380394
ERTGraphqlCompliant -> J.object ["errors" J..= [errFn False qErr]]
381-
sendMsg wsConn $ SMErr $ ErrorMsg opId err
395+
sendMsg wsConn (SMErr $ ErrorMsg opId err)
382396

383397
sendSuccResp encJson =
384-
sendMsg wsConn $ SMData $ DataMsg opId $
385-
GRHasura $ GQSuccess $ encJToLBS encJson
398+
sendMsgWithMetadata wsConn
399+
(SMData $ DataMsg opId $ GRHasura $ GQSuccess $ encJToLBS encJson)
386400

387401
withComplete :: ExceptT () IO () -> ExceptT () IO a
388402
withComplete action = do
@@ -391,9 +405,12 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
391405
throwError ()
392406

393407
-- on change, send message on the websocket
394-
liveQOnChange resp =
395-
WS.sendMsg wsConn $ encodeServerMsg $ SMData $
396-
DataMsg opId (GRHasura resp)
408+
liveQOnChange :: LQ.OnChange
409+
liveQOnChange (GQSuccess (LQ.LiveQueryResponse bs dTime)) =
410+
sendMsgWithMetadata wsConn (SMData $ DataMsg opId $ GRHasura $ GQSuccess bs) $
411+
LQ.LiveQueryMetadata dTime
412+
liveQOnChange resp = sendMsg wsConn $ SMData $ DataMsg opId $ GRHasura $
413+
LQ._lqrPayload <$> resp
397414

398415
catchAndIgnore :: ExceptT () IO () -> IO ()
399416
catchAndIgnore m = void $ runExceptT m

server/src-lib/Hasura/GraphQL/Transport/WebSocket/Protocol.hs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,13 @@ instance J.FromJSON ClientMsg where
5656
parseJSON = J.withObject "ClientMessage" $ \obj -> do
5757
t <- obj J..: "type"
5858
case t of
59-
"connection_init" -> CMConnInit <$> obj J..:? "payload"
60-
"start" -> CMStart <$> J.parseJSON (J.Object obj)
61-
"stop" -> CMStop <$> J.parseJSON (J.Object obj)
59+
"connection_init" -> CMConnInit <$> obj J..:? "payload"
60+
"start" -> CMStart <$> J.parseJSON (J.Object obj)
61+
"stop" -> CMStop <$> J.parseJSON (J.Object obj)
6262
"connection_terminate" -> return CMConnTerm
63-
_ -> fail $ "unexpected type for ClientMessage: " <> t
63+
_ -> fail $ "unexpected type for ClientMessage: " <> t
6464

6565
-- server to client messages
66-
6766
data DataMsg
6867
= DataMsg
6968
{ _dmId :: !OperationId

0 commit comments

Comments
 (0)