fix throttle bug

This commit is contained in:
Val Erastov 2022-10-01 18:36:56 -07:00
parent 1362c2e816
commit 26f327fd6b
2 changed files with 27 additions and 19 deletions

View file

@ -1,3 +1,4 @@
import {ThrottleStream} from "lstream/throttle";
interface Observable<T> {
attach(callback: (value: T) => any): () => void
@ -17,7 +18,7 @@ interface Stream<T> extends Observable<T> {
distinct(): Stream<T>;
throttle(delay?: number, accumulator?: any): Stream<T>;
throttle(delay?: number, accumulator?: any): ThrottleStream<T>;
pipe(otherStream): () => void;
}

View file

@ -1,25 +1,32 @@
import {StreamBase} from './base';
import {Emitter} from "lstream/emitter";
export class ThrottleStream extends StreamBase {
export class ThrottleStream extends Emitter {
constructor(stream, delay = 0, accumulator = v => v) {
constructor(stream, delay = 0, accumulator = (v, accum) => v) {
super();
this.stream = stream;
this.delay = delay;
this.accumulator = accumulator;
this._value = undefined;
this.scheduled = false;
this.timeoutID = null;
stream.attach(val => {
this._value = accumulator(val, this._value);
if (!this.scheduled) {
this.scheduled = true;
this.timeoutID = setTimeout(this.wakeUp, delay);
}
});
}
attach(observer) {
let scheduled = false;
let value = undefined;
return this.stream.attach(val => {
value = this.accumulator(val);
if (!scheduled) {
setTimeout(() => {
scheduled = false;
observer(value);
});
}
}, this.delay)
wakeUp = () => {
this.scheduled = false;
this.next(this._value);
this._value = undefined;
}
thrust() {
if (this.scheduled) {
clearTimeout(this.timeoutID);
this.wakeUp();
}
}
}