Skip to content

Commit a30ba18

Browse files
committed
Merge socket subscriptions to use live socket
1 parent d2fd855 commit a30ba18

File tree

3 files changed

+243
-7
lines changed

3 files changed

+243
-7
lines changed
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
import * as withAbsintheSocket from '@absinthe/socket';
2+
import { GraphQLClient } from 'graphql-request';
3+
import {
4+
Headers as HttpHeaders,
5+
Options,
6+
} from 'graphql-request/dist/src/types';
7+
import { print } from 'graphql/language/printer';
8+
import { Socket as PhoenixSocket } from 'phoenix';
9+
10+
export { default as gql } from 'graphql-tag';
11+
12+
export interface Query<Result extends any, Payload extends any = void> {
13+
(payload: Payload): Result;
14+
}
15+
16+
type Variable = string | number | boolean | null;
17+
18+
interface NoPayloadSubscription<R> {
19+
(action: (result: R) => void): void;
20+
dispose(): void;
21+
disposeWhere(cb: (variables: { [variables: string]: Variable }) => boolean);
22+
}
23+
24+
interface PayloadSubscription<P, R> {
25+
(payload: P, action: (result: R) => void): void;
26+
dispose(): void;
27+
disposeWhere(cb: (variables: { [variables: string]: Variable }) => boolean);
28+
}
29+
30+
interface Subscription {
31+
variables: { [key: string]: Variable };
32+
dispose: () => void;
33+
}
34+
35+
type Http = {
36+
endpoint: string;
37+
headers?: () => HttpHeaders;
38+
options?: Options;
39+
};
40+
41+
type Ws =
42+
| {
43+
endpoint: string;
44+
params?: () => { [key: string]: string | number | boolean };
45+
}
46+
| PhoenixSocket;
47+
48+
type Queries = {
49+
queries?: {
50+
[key: string]: (payload: any) => any;
51+
};
52+
mutations?: {
53+
[key: string]: (payload: any) => any;
54+
};
55+
subscriptions?: {
56+
[key: string]: (payload: any) => any;
57+
};
58+
};
59+
60+
export type Graphql<T extends Queries> = {
61+
initialize(http: Http, ws?: Ws): void;
62+
} & {
63+
queries: {
64+
[N in keyof T['queries']]: T['queries'][N] extends (
65+
payload: infer P
66+
) => infer R
67+
? P extends void
68+
? () => Promise<R>
69+
: (payload: P) => Promise<R>
70+
: never;
71+
};
72+
mutations: {
73+
[N in keyof T['mutations']]: T['mutations'][N] extends (
74+
payload: infer P
75+
) => infer R
76+
? P extends void
77+
? () => Promise<R>
78+
: (payload: P) => Promise<R>
79+
: never;
80+
};
81+
subscriptions: {
82+
[N in keyof T['subscriptions']]: T['subscriptions'][N] extends (
83+
payload: infer P
84+
) => infer R
85+
? P extends void
86+
? NoPayloadSubscription<R>
87+
: PayloadSubscription<P, R>
88+
: never;
89+
};
90+
};
91+
92+
function createError(message: string) {
93+
throw new Error(`OVERMIND-GRAPHQL: ${message}`);
94+
}
95+
96+
const _clients: { [url: string]: GraphQLClient } = {};
97+
const _subscriptions: {
98+
[query: string]: Subscription[];
99+
} = {};
100+
101+
export const graphql: <T extends Queries>(
102+
queries: T
103+
) => Graphql<T> = queries => {
104+
let _http: Http;
105+
let _ws: Ws;
106+
107+
function getClient(): GraphQLClient | null {
108+
if (_http) {
109+
const headers = // eslint-disable-next-line
110+
typeof _http.headers === 'function'
111+
? _http.headers()
112+
: _http.options && _http.options.headers
113+
? _http.options.headers
114+
: {};
115+
116+
if (_clients[_http.endpoint]) {
117+
_clients[_http.endpoint].setHeaders(headers);
118+
} else {
119+
_clients[_http.endpoint] = new GraphQLClient(_http.endpoint, {
120+
..._http.options,
121+
headers,
122+
});
123+
}
124+
125+
return _clients[_http.endpoint];
126+
}
127+
128+
return null;
129+
}
130+
131+
function getWsClient(): PhoenixSocket | null {
132+
if (_ws) {
133+
return withAbsintheSocket.create(_ws);
134+
}
135+
136+
return null;
137+
}
138+
139+
const evaluatedQueries = {
140+
queries: Object.keys(queries.queries || {}).reduce((aggr, key) => {
141+
aggr[key] = variables => {
142+
const query = queries.queries![key] as any;
143+
const client = getClient();
144+
145+
if (client) {
146+
return client.request(print(query), variables);
147+
}
148+
149+
throw createError(
150+
'You are running a query, though there is no HTTP endpoint configured'
151+
);
152+
};
153+
return aggr;
154+
}, {}),
155+
mutations: Object.keys(queries.mutations || {}).reduce((aggr, key) => {
156+
aggr[key] = variables => {
157+
const query = queries.mutations![key] as any;
158+
const client = getClient();
159+
160+
if (client) {
161+
return client.request(print(query), variables);
162+
}
163+
164+
throw createError(
165+
'You are running a query, though there is no HTTP endpoint configured'
166+
);
167+
};
168+
return aggr;
169+
}, {}),
170+
subscriptions: Object.keys(queries.subscriptions || {}).reduce(
171+
(aggr, key) => {
172+
const query = queries.subscriptions![key] as any;
173+
const queryString = print(query);
174+
175+
if (!_subscriptions[queryString]) {
176+
_subscriptions[queryString] = [];
177+
}
178+
179+
function subscription(arg1, arg2) {
180+
const client = getWsClient();
181+
182+
if (client) {
183+
const variables = arg2 ? arg1 : {};
184+
const action = arg2 || arg1;
185+
const notifier = withAbsintheSocket.send(client, {
186+
operation: queryString,
187+
variables,
188+
});
189+
190+
const observer = withAbsintheSocket.observe(client, notifier, {
191+
onResult: ({ data }) => {
192+
action(data);
193+
},
194+
});
195+
196+
_subscriptions[queryString].push({
197+
variables,
198+
dispose: () =>
199+
withAbsintheSocket.unobserve(client, notifier, observer),
200+
});
201+
} else {
202+
throw createError('There is no ws client available for this query');
203+
}
204+
}
205+
206+
subscription.dispose = () => {
207+
_subscriptions[queryString].forEach(sub => {
208+
sub.dispose();
209+
});
210+
_subscriptions[queryString].length = 0;
211+
};
212+
213+
subscription.disposeWhere = cb => {
214+
_subscriptions[queryString] = _subscriptions[queryString].reduce<
215+
Subscription[]
216+
>((subAggr, sub) => {
217+
if (cb(sub.variables)) {
218+
return subAggr;
219+
}
220+
return subAggr.concat(sub);
221+
}, []);
222+
};
223+
224+
aggr[key] = subscription;
225+
226+
return aggr;
227+
},
228+
{}
229+
),
230+
};
231+
232+
return {
233+
initialize(http: Http, ws?: Ws) {
234+
_http = http;
235+
if (ws) {
236+
_ws = ws;
237+
}
238+
},
239+
...evaluatedQueries,
240+
} as any;
241+
};

packages/app/src/app/overmind/effects/gql/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { graphql } from 'overmind-graphql';
1+
import { graphql } from '../../dependencies/overmind-graphql';
22

33
import * as collaboratorsMutations from './collaborators/mutations';
44
import * as collaboratorsQueries from './collaborators/queries';

packages/app/src/app/overmind/onInitialize.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,7 @@ export const onInitialize: OnInitialize = async (
3333
Authorization: `Bearer ${state.jwt}`,
3434
}),
3535
},
36-
{
37-
endpoint: `${location.origin.replace('http', 'ws')}/graphql-socket`,
38-
params: () => ({
39-
Authorization: `Bearer ${state.jwt}`,
40-
}),
41-
}
36+
effects.live.getSocket()
4237
);
4338

4439
effects.notifications.initialize({

0 commit comments

Comments
 (0)