@@ -43,10 +43,11 @@ import Hasura.Server.Auth (AuthMode, UserAuth
43
43
import Hasura.Server.Context
44
44
import Hasura.Server.Cors
45
45
import Hasura.Server.Utils (RequestId , diffTimeToMicro ,
46
- getRequestId )
46
+ getRequestId , withElapsedTime )
47
47
48
48
import qualified Hasura.GraphQL.Execute as E
49
49
import qualified Hasura.GraphQL.Execute.LiveQuery as LQ
50
+ import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as LQ
50
51
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
51
52
import qualified Hasura.Logging as L
52
53
@@ -85,9 +86,20 @@ data WSConnData
85
86
type WSServer = WS. WSServer WSConnData
86
87
87
88
type WSConn = WS. WSConn WSConnData
89
+
88
90
sendMsg :: (MonadIO m ) => WSConn -> ServerMsg -> m ()
89
- sendMsg wsConn =
90
- liftIO . WS. sendMsg wsConn . encodeServerMsg
91
+ sendMsg wsConn msg =
92
+ liftIO $ WS. sendMsg wsConn $ WS. WSQueueResponse (encodeServerMsg msg) Nothing
93
+
94
+ sendMsgWithMetadata :: (MonadIO m ) => WSConn -> ServerMsg -> LQ. LiveQueryMetadata -> m ()
95
+ sendMsgWithMetadata wsConn msg (LQ. LiveQueryMetadata execTime) =
96
+ liftIO $ WS. sendMsg wsConn $ WS. WSQueueResponse bs wsInfo
97
+ where
98
+ bs = encodeServerMsg msg
99
+ wsInfo = Just $ WS. WSEventInfo
100
+ { WS. _wseiQueryExecutionTime = Just $ realToFrac execTime
101
+ , WS. _wseiResponseSize = Just $ BL. length bs
102
+ }
91
103
92
104
data OpDetail
93
105
= ODStarted
@@ -192,7 +204,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead = do
192
204
expTime <- liftIO $ STM. atomically $ do
193
205
connState <- STM. readTVar $ (_wscUser . WS. getData) wsConn
194
206
case connState of
195
- CSNotInitialised _ -> STM. retry
207
+ CSNotInitialised _ -> STM. retry
196
208
CSInitError _ -> STM. retry
197
209
CSInitialised _ expTimeM _ ->
198
210
maybe STM. retry return expTimeM
@@ -310,8 +322,9 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
310
322
logOpEv ODStarted (Just reqId)
311
323
-- log the generated SQL and the graphql query
312
324
L. unLogger logger $ QueryLog query genSql reqId
313
- resp <- liftIO $ runExceptT action
314
- either (postExecErr reqId) sendSuccResp resp
325
+ (dt, resp) <- withElapsedTime $ liftIO $ runExceptT action
326
+ let lqMeta = LQ. LiveQueryMetadata dt
327
+ either (postExecErr reqId) (`sendSuccResp` lqMeta) resp
315
328
sendCompleted (Just reqId)
316
329
317
330
runRemoteGQ :: E. ExecutionCtx -> RequestId -> UserInfo -> [H. Header ]
@@ -323,15 +336,16 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
323
336
err400 NotSupported " subscription to remote server is not supported"
324
337
325
338
-- if it's not a subscription, use HTTP to execute the query on the remote
326
- resp <- runExceptT $ flip runReaderT execCtx $
339
+ (dt, resp) <- withElapsedTime $ runExceptT $ flip runReaderT execCtx $
327
340
E. execRemoteGQ reqId userInfo reqHdrs q rsi opDef
328
- either (postExecErr reqId) (sendRemoteResp reqId . _hrBody) resp
341
+ let ocMeta = LQ. LiveQueryMetadata dt
342
+ either (postExecErr reqId) (\ val -> sendRemoteResp reqId (_hrBody val) ocMeta) resp
329
343
sendCompleted (Just reqId)
330
344
331
- sendRemoteResp reqId resp =
345
+ sendRemoteResp reqId resp meta =
332
346
case J. eitherDecodeStrict (encJToBS resp) of
333
347
Left e -> postExecErr reqId $ invalidGqlErr $ T. pack e
334
- Right res -> sendMsg wsConn $ SMData $ DataMsg opId ( GRRemote res)
348
+ Right res -> sendMsgWithMetadata wsConn ( SMData $ DataMsg opId $ GRRemote res) meta
335
349
336
350
invalidGqlErr err = err500 Unexpected $
337
351
" Failed parsing GraphQL response from remote: " <> err
@@ -357,19 +371,19 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
357
371
358
372
sendStartErr e = do
359
373
let errFn = getErrFn errRespTy
360
- sendMsg wsConn $ SMErr $ ErrorMsg opId $ errFn False $
361
- err400 StartFailed e
374
+ sendMsg wsConn $
375
+ SMErr $ ErrorMsg opId $ errFn False $ err400 StartFailed e
362
376
logOpEv (ODProtoErr e) Nothing
363
377
364
378
sendCompleted reqId = do
365
- sendMsg wsConn $ SMComplete $ CompletionMsg opId
379
+ sendMsg wsConn ( SMComplete $ CompletionMsg opId)
366
380
logOpEv ODCompleted reqId
367
381
368
382
postExecErr reqId qErr = do
369
383
let errFn = getErrFn errRespTy
370
384
logOpEv (ODQueryErr qErr) (Just reqId)
371
- sendMsg wsConn $ SMData $ DataMsg opId $
372
- GRHasura $ GQExecError $ pure $ errFn False qErr
385
+ sendMsg wsConn $ SMData $
386
+ DataMsg opId $ GRHasura $ GQExecError $ pure $ errFn False qErr
373
387
374
388
-- why wouldn't pre exec error use graphql response?
375
389
preExecErr reqId qErr = do
@@ -378,11 +392,11 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
378
392
let err = case errRespTy of
379
393
ERTLegacy -> errFn False qErr
380
394
ERTGraphqlCompliant -> J. object [" errors" J. .= [errFn False qErr]]
381
- sendMsg wsConn $ SMErr $ ErrorMsg opId err
395
+ sendMsg wsConn ( SMErr $ ErrorMsg opId err)
382
396
383
397
sendSuccResp encJson =
384
- sendMsg wsConn $ SMData $ DataMsg opId $
385
- GRHasura $ GQSuccess $ encJToLBS encJson
398
+ sendMsgWithMetadata wsConn
399
+ ( SMData $ DataMsg opId $ GRHasura $ GQSuccess $ encJToLBS encJson)
386
400
387
401
withComplete :: ExceptT () IO () -> ExceptT () IO a
388
402
withComplete action = do
@@ -391,9 +405,12 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
391
405
throwError ()
392
406
393
407
-- on change, send message on the websocket
394
- liveQOnChange resp =
395
- WS. sendMsg wsConn $ encodeServerMsg $ SMData $
396
- DataMsg opId (GRHasura resp)
408
+ liveQOnChange :: LQ. OnChange
409
+ liveQOnChange (GQSuccess (LQ. LiveQueryResponse bs dTime)) =
410
+ sendMsgWithMetadata wsConn (SMData $ DataMsg opId $ GRHasura $ GQSuccess bs) $
411
+ LQ. LiveQueryMetadata dTime
412
+ liveQOnChange resp = sendMsg wsConn $ SMData $ DataMsg opId $ GRHasura $
413
+ LQ. _lqrPayload <$> resp
397
414
398
415
catchAndIgnore :: ExceptT () IO () -> IO ()
399
416
catchAndIgnore m = void $ runExceptT m
0 commit comments