@@ -20,6 +20,7 @@ export interface SubscriberObject<T> {
20
20
error ?: any ;
21
21
complete ?: any ;
22
22
invalidate : ( ) => void ;
23
+ revalidate : ( ) => void ;
23
24
}
24
25
25
26
/**
@@ -110,8 +111,22 @@ const bind = <T>(object: T | null | undefined, fnName: keyof T) => {
110
111
111
112
const toSubscriberObject = < T > ( subscriber : Subscriber < T > ) : SubscriberObject < T > =>
112
113
typeof subscriber === 'function'
113
- ? { next : subscriber . bind ( null ) , invalidate : noop }
114
- : { next : bind ( subscriber , 'next' ) , invalidate : bind ( subscriber , 'invalidate' ) } ;
114
+ ? { next : subscriber . bind ( null ) , invalidate : noop , revalidate : noop }
115
+ : {
116
+ next : bind ( subscriber , 'next' ) ,
117
+ invalidate : bind ( subscriber , 'invalidate' ) ,
118
+ revalidate : bind ( subscriber , 'revalidate' ) ,
119
+ } ;
120
+
121
+ const callAllSubscribers = < T , U extends keyof SubscriberObject < T > > (
122
+ subscribers : Iterable < SubscriberObject < T > > ,
123
+ fn : U ,
124
+ ...args : Parameters < SubscriberObject < T > [ U ] >
125
+ ) => {
126
+ for ( const subscriber of [ ...subscribers ] ) {
127
+ subscriber [ fn ] ( ...args ) ;
128
+ }
129
+ } ;
115
130
116
131
const returnThis = function < T > ( this : T ) : T {
117
132
return this ;
@@ -123,8 +138,9 @@ const asReadable = <T>(store: Store<T>): Readable<T> => ({
123
138
[ symbolObservable ] : returnThis ,
124
139
} ) ;
125
140
141
+ const queueProcess = Symbol ( ) ;
126
142
let willProcessQueue = false ;
127
- const queue = new Map < SubscriberObject < any > , any > ( ) ;
143
+ const queue = new Set < { [ queueProcess ] ( ) : void } > ( ) ;
128
144
129
145
const callUnsubscribe = ( unsubscribe : Unsubscriber ) =>
130
146
typeof unsubscribe === 'function' ? unsubscribe ( ) : unsubscribe . unsubscribe ( ) ;
@@ -172,9 +188,9 @@ export const batch = <T>(fn: () => T): T => {
172
188
return fn ( ) ;
173
189
} finally {
174
190
if ( needsProcessQueue ) {
175
- for ( const [ subscriberObject , value ] of queue ) {
176
- queue . delete ( subscriberObject ) ;
177
- subscriberObject . next ( value ) ;
191
+ for ( const store of queue ) {
192
+ queue . delete ( store ) ;
193
+ store [ queueProcess ] ( ) ;
178
194
}
179
195
willProcessQueue = false ;
180
196
}
@@ -233,12 +249,16 @@ export function get<T>(store: SubscribableStore<T>): T {
233
249
export abstract class Store < T > implements Readable < T > {
234
250
private _subscribers = new Set < SubscriberObject < T > > ( ) ;
235
251
private _cleanupFn : null | Unsubscriber = null ;
252
+ private _notifiedValue : T ;
253
+ private _invalidated = false ;
236
254
237
255
/**
238
256
*
239
257
* @param _value - Initial value of the store
240
258
*/
241
- constructor ( private _value : T ) { }
259
+ constructor ( private _value : T ) {
260
+ this . _notifiedValue = this . _value ;
261
+ }
242
262
243
263
private _start ( ) {
244
264
this . _cleanupFn = this . onUse ( ) || noop ;
@@ -252,28 +272,50 @@ export abstract class Store<T> implements Readable<T> {
252
272
}
253
273
}
254
274
275
+ private [ queueProcess ] ( ) : void {
276
+ const value = this . _value ;
277
+ this . _invalidated = false ;
278
+ if ( notEqual ( this . _notifiedValue , value ) ) {
279
+ this . _notifiedValue = value ;
280
+ callAllSubscribers ( this . _subscribers , 'next' , value ) ;
281
+ } else {
282
+ callAllSubscribers ( this . _subscribers , 'revalidate' ) ;
283
+ }
284
+ }
285
+
286
+ protected invalidate ( ) : void {
287
+ if ( ! this . _invalidated ) {
288
+ this . _invalidated = true ;
289
+ callAllSubscribers ( this . _subscribers , 'invalidate' ) ;
290
+ }
291
+ }
292
+
293
+ protected revalidate ( ) : void {
294
+ if ( this . _invalidated ) {
295
+ batch ( ( ) => {
296
+ queue . add ( this as any ) ;
297
+ } ) ;
298
+ }
299
+ }
300
+
255
301
/**
256
302
* Replaces store's state with the provided value.
257
303
* Equivalent of {@link Writable.set}, but internal to the store.
258
304
*
259
305
* @param value - value to be used as the new state of a store.
260
306
*/
261
307
protected set ( value : T ) : void {
262
- if ( notEqual ( this . _value , value ) ) {
308
+ const different = notEqual ( this . _value , value ) ;
309
+ if ( different ) {
263
310
this . _value = value ;
264
311
if ( ! this . _cleanupFn ) {
265
- // subscriber not yet initialized
312
+ this . _notifiedValue = this . _value ;
266
313
return ;
267
314
}
268
- batch ( ( ) => {
269
- for ( const subscriber of this . _subscribers ) {
270
- const needInvalidate = ! queue . has ( subscriber ) ;
271
- queue . set ( subscriber , value ) ;
272
- if ( needInvalidate ) {
273
- subscriber . invalidate ( ) ;
274
- }
275
- }
276
- } ) ;
315
+ this . invalidate ( ) ;
316
+ }
317
+ if ( this . _invalidated ) {
318
+ this . revalidate ( ) ;
277
319
}
278
320
}
279
321
@@ -323,7 +365,10 @@ export abstract class Store<T> implements Readable<T> {
323
365
if ( this . _subscribers . size == 1 ) {
324
366
this . _start ( ) ;
325
367
}
326
- subscriberObject . next ( this . _value ) ;
368
+ subscriberObject . next ( this . _notifiedValue ) ;
369
+ if ( this . _invalidated ) {
370
+ subscriberObject . invalidate ( ) ;
371
+ }
327
372
328
373
const unsubscribe = ( ) => {
329
374
const removed = this . _subscribers . delete ( subscriberObject ) ;
@@ -491,6 +536,7 @@ export abstract class DerivedStore<
491
536
protected onUse ( ) : Unsubscriber | void {
492
537
let initDone = false ;
493
538
let pending = 0 ;
539
+ let changed = 0 ;
494
540
495
541
const stores = this . _stores ;
496
542
const isArray = Array . isArray ( stores ) ;
@@ -510,7 +556,8 @@ export abstract class DerivedStore<
510
556
} ;
511
557
512
558
const callDerive = ( ) => {
513
- if ( initDone && ! pending ) {
559
+ if ( initDone && ! pending && changed ) {
560
+ changed = 0 ;
514
561
callCleanup ( ) ;
515
562
cleanupFn = this . derive ( isArray ? dependantValues : dependantValues [ 0 ] ) || noop ;
516
563
}
@@ -520,11 +567,17 @@ export abstract class DerivedStore<
520
567
store . subscribe ( {
521
568
next : ( v ) => {
522
569
dependantValues [ idx ] = v ;
570
+ changed |= 1 << idx ;
523
571
pending &= ~ ( 1 << idx ) ;
524
572
callDerive ( ) ;
525
573
} ,
526
574
invalidate : ( ) => {
527
575
pending |= 1 << idx ;
576
+ this . invalidate ( ) ;
577
+ } ,
578
+ revalidate : ( ) => {
579
+ pending &= ~ ( 1 << idx ) ;
580
+ callDerive ( ) ;
528
581
} ,
529
582
} )
530
583
) ;
0 commit comments