Skip to content

Commit a269ee3

Browse files
hoebbelsBedbzn
authored andcommitted
feat(state): introduce configurable scheduler
1 parent 3ee6fda commit a269ee3

File tree

6 files changed

+95
-42
lines changed

6 files changed

+95
-42
lines changed

libs/state/selections/src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export {
22
createAccumulationObservable,
3-
RX_ACCUMULATOR_FN,
3+
defaultAccumulator,
44
} from './lib/accumulation-observable';
55
export { CompareFn, KeyCompareMap, PickSlice } from './lib/interfaces/index';
66
export { AccumulationFn, Accumulator } from './lib/model';
@@ -9,7 +9,7 @@ export {
99
select,
1010
selectSlice,
1111
stateful,
12-
} from './lib/operators/index';
12+
} from './lib/operators';
1313
export { createSideEffectObservable } from './lib/side-effect-observable';
1414
export {
1515
isDefined,

libs/state/selections/src/lib/accumulation-observable.ts

Lines changed: 13 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import { inject, InjectionToken } from '@angular/core';
21
import {
32
BehaviorSubject,
43
ConnectableObservable,
54
EMPTY,
65
merge,
6+
MonoTypeOperatorFunction,
77
Observable,
88
queueScheduler,
9+
SchedulerLike,
910
Subject,
1011
Subscription,
1112
} from 'rxjs';
@@ -22,44 +23,26 @@ import {
2223
} from 'rxjs/operators';
2324
import { AccumulationFn, Accumulator } from './model';
2425

25-
const defaultAccumulator: AccumulationFn = <T>(st: T, sl: Partial<T>): T => {
26+
export const defaultAccumulator: AccumulationFn = <T>(
27+
st: T,
28+
sl: Partial<T>,
29+
): T => {
2630
return { ...st, ...sl };
2731
};
2832

29-
/**
30-
* Injection token for the default accumulator function.
31-
*
32-
* @example
33-
* providers: [
34-
* {
35-
* provide: RX_ACCUMULATOR_FN,
36-
* useValue: (state, slice) => ({ ...state, ...slice })
37-
* }
38-
* ]
39-
*/
40-
export const RX_ACCUMULATOR_FN = new InjectionToken<AccumulationFn>(
41-
'RX_ACCUMULATOR_FN',
42-
{
43-
providedIn: 'root',
44-
factory: () => defaultAccumulator,
45-
},
46-
);
47-
4833
export function createAccumulationObservable<T extends object>(
4934
stateObservables = new Subject<Observable<Partial<T>>>(),
5035
stateSlices = new Subject<Partial<T>>(),
36+
accumulatorObservable = new BehaviorSubject(defaultAccumulator),
37+
scheduler: SchedulerLike | null = queueScheduler,
5138
): Accumulator<T> {
52-
const accumulatorFn = inject(RX_ACCUMULATOR_FN);
53-
const accumulatorObservable = new BehaviorSubject(accumulatorFn);
39+
const observeStateOn = <R>(): MonoTypeOperatorFunction<R> =>
40+
scheduler ? observeOn(scheduler) : (o$) => o$;
5441
const signal$ = merge(
55-
stateObservables.pipe(
56-
distinctUntilChanged(),
57-
mergeAll(),
58-
observeOn(queueScheduler),
59-
),
60-
stateSlices.pipe(observeOn(queueScheduler)),
42+
stateObservables.pipe(distinctUntilChanged(), mergeAll(), observeStateOn()),
43+
stateSlices.pipe(observeStateOn()),
6144
).pipe(
62-
withLatestFrom(accumulatorObservable.pipe(observeOn(queueScheduler))),
45+
withLatestFrom(accumulatorObservable.pipe(observeStateOn())),
6346
scan(
6447
(state, [slice, stateAccumulator]) => stateAccumulator(state, slice),
6548
{} as T,

libs/state/selections/src/lib/side-effect-observable.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
1-
import { merge, Observable, queueScheduler, Subject, Subscribable, Subscription } from 'rxjs';
1+
import {
2+
merge,
3+
Observable,
4+
queueScheduler,
5+
SchedulerLike,
6+
Subject,
7+
Subscribable,
8+
Subscription,
9+
} from 'rxjs';
210
import { mergeAll, observeOn } from 'rxjs/operators';
311

412
export function createSideEffectObservable<T>(
5-
stateObservables = new Subject<Observable<T>>()
13+
stateObservables = new Subject<Observable<T>>(),
14+
scheduler: SchedulerLike | null = queueScheduler,
615
): {
716
effects$: Observable<T>;
817
nextEffectObservable: (effect$: Observable<T>) => void;
918
subscribe: () => Subscription;
1019
} & Subscribable<T> {
1120
const effects$: Observable<T> = merge(
12-
stateObservables.pipe(mergeAll(), observeOn(queueScheduler))
21+
stateObservables.pipe(
22+
mergeAll(),
23+
scheduler ? observeOn(scheduler) : (o$) => o$,
24+
),
1325
);
1426

1527
function nextEffectObservable(effect$: Observable<T>): void {
@@ -23,6 +35,6 @@ export function createSideEffectObservable<T>(
2335
return {
2436
effects$,
2537
nextEffectObservable,
26-
subscribe
38+
subscribe,
2739
};
2840
}

libs/state/src/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
export {
2+
provideRxStateConfig,
3+
RX_ACCUMULATOR_FN,
4+
RX_STATE_SCHEDULER,
5+
withAccumulatorFn,
6+
withScheduler,
7+
withSyncScheduler,
8+
} from './lib/provide-rx-state-config';
19
export { rxState, RxStateSetupFn } from './lib/rx-state';
210
export {
311
ProjectStateFn,

libs/state/src/lib/provide-rx-state-config.ts

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import { InjectionToken, Provider } from '@angular/core';
2-
import { AccumulationFn, RX_ACCUMULATOR_FN } from '../../selections/src';
2+
import {
3+
AccumulationFn,
4+
defaultAccumulator,
5+
} from '@rx-angular/state/selections';
6+
import { queueScheduler, SchedulerLike } from 'rxjs';
37

48
export const enum RX_STATE_CONFIGS {
59
Scheduler,
@@ -11,6 +15,25 @@ interface RxStateConfigFn {
1115
providers: Provider[];
1216
}
1317

18+
/**
19+
* Injection token for the default accumulator function.
20+
*
21+
* @example
22+
* providers: [
23+
* {
24+
* provide: RX_ACCUMULATOR_FN,
25+
* useValue: (state, slice) => ({ ...state, ...slice })
26+
* }
27+
* ]
28+
*/
29+
export const RX_ACCUMULATOR_FN = new InjectionToken<AccumulationFn>(
30+
'RX_ACCUMULATOR_FN',
31+
{
32+
providedIn: 'root',
33+
factory: () => defaultAccumulator,
34+
},
35+
);
36+
1437
/**
1538
* Injection token for the default accumulator function.
1639
* @param fn
@@ -22,11 +45,11 @@ export function withAccumulatorFn(fn: AccumulationFn): RxStateConfigFn {
2245
};
2346
}
2447

25-
export const RX_STATE_SCHEDULER = new InjectionToken<any>(
48+
export const RX_STATE_SCHEDULER = new InjectionToken<SchedulerLike | 'sync'>(
2649
'RX_STATE_SCHEDULER',
2750
{
2851
providedIn: 'root',
29-
factory: () => undefined,
52+
factory: () => queueScheduler,
3053
},
3154
);
3255

@@ -35,14 +58,25 @@ export const RX_STATE_SCHEDULER = new InjectionToken<any>(
3558
* @param fn
3659
*/
3760
export function withScheduler(
38-
scheduler: any /* TODO: add type here*/,
61+
scheduler: SchedulerLike | 'sync',
3962
): RxStateConfigFn {
4063
return {
4164
kind: RX_STATE_CONFIGS.Scheduler,
4265
providers: [{ provide: RX_STATE_SCHEDULER, useValue: scheduler }],
4366
};
4467
}
4568

69+
/**
70+
* Injection token for the default scheduler for rxState.
71+
* @param fn
72+
*/
73+
export function withSyncScheduler(): RxStateConfigFn {
74+
return {
75+
kind: RX_STATE_CONFIGS.Scheduler,
76+
providers: [{ provide: RX_STATE_SCHEDULER, useValue: 'sync' }],
77+
};
78+
}
79+
4680
/**
4781
* This function is used to provide the configuration for the rxState function.
4882
*

libs/state/src/lib/rx-state.service.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,21 @@ import {
1919
select,
2020
} from '@rx-angular/state/selections';
2121
import {
22+
BehaviorSubject,
2223
EMPTY,
2324
isObservable,
2425
Observable,
2526
OperatorFunction,
27+
Subject,
2628
Subscribable,
2729
Subscription,
2830
Unsubscribable,
2931
} from 'rxjs';
3032
import { catchError, map, tap } from 'rxjs/operators';
33+
import {
34+
RX_ACCUMULATOR_FN,
35+
RX_STATE_SCHEDULER,
36+
} from './provide-rx-state-config';
3137
import { createSignalStateProxy, SignalStateProxy } from './signal-state-proxy';
3238

3339
export type ProjectStateFn<Type> = (oldState: Type) => Partial<Type>;
@@ -73,8 +79,18 @@ export class RxState<State extends object>
7379
{
7480
private subscription = new Subscription();
7581

76-
private accumulator = createAccumulationObservable<State>();
77-
private effectObservable = createSideEffectObservable();
82+
protected scheduler = inject(RX_STATE_SCHEDULER, { optional: true });
83+
84+
private accumulator = createAccumulationObservable<State>(
85+
new Subject<Observable<Partial<State>>>(),
86+
new Subject<Partial<State>>(),
87+
new BehaviorSubject(inject(RX_ACCUMULATOR_FN)),
88+
this.scheduler === 'sync' ? null : this.scheduler,
89+
);
90+
private effectObservable = createSideEffectObservable(
91+
new Subject<Observable<State>>(),
92+
this.scheduler === 'sync' ? null : this.scheduler,
93+
);
7894

7995
private readonly injector = inject(Injector);
8096

0 commit comments

Comments
 (0)