{singhi}
自己的Observable构思与实现

引言

长期使用 Rxjs 之后,可以明白一句话:

Observables can “return” multiple values over time

它是一个这样的 Function(或者类似的东西),你一旦调用(或者叫订阅)了它,它就可以在时间轴上不间断地“返回”值,每次“返回”,你定义的消费函数(Observer)都会接收到这个值并执行。

按照这个思路:首先,一个Obseravle对象应该有一个当前值字段以保存任意时间点传入的值,姑且把这个时间点叫做赋值时间点(next);然后,消费函数(Observer)必需被缓存起来,这样下一个赋值时间点(next)便可以执行所有的消费函数(Observer)。

Rxjs 中一个普通的Obseravle对象,赋值时间点(next)之后的订阅才是有效的!当然BehaviorObservable会有一个当前值,任何时候的一个订阅都会立即收到当前值、执行消费函数(Observer)。

因项目需要,我实现的这个Obsevable类有点像BehaviorObservable和Subject:

定义类

其中,_observers 用于做消费函数缓存,_val 用于保存当前值

function Observable() {
    this._observers = [];
    this._val = null;
}

定义next方法

Observable.prototype.next = function (val) {
    this._val = val
    var _observer = null
    for(var i in this._observers) {
        _observer = this._observers[i]
        if (!_observer) continue
        Promise.resolve(val).then(_observer)
    }
}

定义subscribe方法

Observable.prototype.subscribe = function (observer) {
    if (typeof observer !== 'function') return
    Promise.resolve(this._val).then(observer)
    this._observers.push(observer)
}

添加 map 方法

Observable.prototype.map = function (project) {
    var obs = new Observable()
    this.subscribe(function(v) {
        obs.next(project(v))
    })
    return obs
}

使用

const Observable = require('./Observable.class')

const obs = new Observable()
obs.next('99')

obs.subscribe((v) => {
    console.log(v)
})

obs.next('100')
obs.next('101')

结果

observable
observable

以上内容纯属随意模仿,并没有看过rxjs源码,如觉不正确,请勿参考。