阅读 3038

现学现卖微信小程序开发(三):引入“Rx”,为小程序插上翅膀

现学现卖微信小程序开发(一)
现学现卖微信小程序开发(二)

引入“Rx”,为小程序插上翅膀

对于我这种“不用Rx会死星人“来说,一个平台如果没有Rx,简直痛苦死了。所以一直在研究怎么把RxJS引入到微信小程序中,这几天终于有了阶段性成果,那“Rx”为什么加引号?嗯,这是个好问题,原因是。。。经过几天的艰苦奋战,我终于还是没有找到把RxJS库正确引入到微信小程序的方法。所以呢,我找了一个替代品:xstream ( github.com/staltz/xstr… )。这个类库呢,和RxJS差不多,更轻量级,因为去掉了好多不常用的和重复的操作符,当然写法上也略有区别,感觉其实没有RxJS爽,但问题不大。

xstream的引入

和网上的其他类库的引入来比较的话,xstream引入的步骤不算太烦:

  1. 找一个目录,使用npm install xstream。
  2. 在小程序工程目录下新建一个libs目录,然后再建一个xstream目录。
  3. 然后在 node_modules/xstream 目录中把 index.js 拷贝到 libs/xstream下。
  4. 由于这个文件依赖 symbol-observable,而 symbol-observable 又依赖 ponyfill,所以我们去 node_modules/symbol-observable/lib中把 index.jsponyfill.js 都拷贝到libs/xstream下。哦,对了,要把 index.js 改名成 symbol-observable.js 要不就和上面重名了。
  5. 这样的话xstream的core就可以正常工作了,但如果你需要一些其他操作符,比如debounce等,可以去 node_modules/xstream/extra 中找,找到后把js文件(比如debounce.js)拷贝到 libs/xstream/extra中。

一些额外的操作符可以去xstream的extra目录中寻找

好了,xstream的引入至此已经完毕,我们看看怎么使用。

引入完毕后的目录结构

先来体验一下什么是流式编程,在 pageParams.onLoad 中加上如下代码,当然别忘了引入xs import xs from '../../libs/xstream/index'

  // 每隔1秒计数加1,
  // 过滤出偶数
  // 将数字转换为其平方
  // 5秒后结束

  let stream = xs.periodic(1000)
    .filter(i => i % 2 === 0)
    .map(i => i * i)
    .endWhen(xs.periodic(5000).take(1))

  // 到目前为止,stream还处于idle状态
  // 从有第一个订阅者开始,它就处于激活状态了

  stream.addListener({
    next: i => console.log(i),
    error: err => console.error(err),
    complete: () => console.log('completed'),
  })复制代码

到Console中看一下,输出结果为0,4,completed

console中的输出

我们来手动复原一下过程,首先 xs.periodic(1000) 是这样一个流

 periodic(1000)
---0---1---2---3---4---...复制代码

第一秒是发射0,0是偶数满足filter条件,进入转换0的平方还是0,结束条件未满足,于是输出0;第二秒时发射1,1为奇数被淘汰;第三秒时发射2,2是偶数满足filter条件,进入转换2的平方是4,结束条件未满足,于是输出4;第四秒时发射3,3为奇数被淘汰;第五秒时输出4,4是偶数满足filter条件,进入转换4的平方是16,但可惜结束条件已满足,输出completed。

这个小例子虽然简单,但是涉及到了多个流式编程的操作符,这种串(chain)起来的感觉真是很爽。

微信小程序中的响应式编程

由于微信小程序的基于回调函数的设计,我们需要对其API进行封装后使其具备响应式编程的能力。那么我们就拿本次的 todos.onLoad 来练手,没用xstream之前是下面的样子。

pageParams.onLoad = function () {
  const that = this
  wx.request({
    url: URL,
    data: JSON.stringify({}),
    header: { 'content-type': 'application/json' },
    method: 'GET',
    success: res => {
      console.log(res.data)
      that.setData({
        todos: res.data
      })
    },
    fail: () => console.error('something is wrong'),
    complete: () => console.log('get req completed')
  })
}复制代码

我们来用xstream改造一下吧

import xs from '../../lib/xstream/index'

pageParams.onLoad = function () {
  const that = this
  const producer = {
    start: listener => {
      start: wx.request({
        url: URL,
        data: JSON.stringify({}),
        header: { 'content-type': 'application/json' },
        method: 'GET',
        success: res => listener.next(res),
        fail: () => listener.error('something is wrong'),
        complete: () => listener.complete()
      })
     },
  stop: () => {}
 }
 let http$ = xs.create(producer)
 http$.subscribe({
   next: res => that.setData({
     todos: res.data
   }),
   error: console.log('http request failed'),
   complete: console.log('http request completed')
 })
}复制代码

我勒个去,这比原来代码还多,搞什么?先别急,我们仔细想想其实前面的一大部分代码是在将传统的函数改造成流式的函数。这些改造工作如果在普通的HTML+Javascript环境中是很好解决的,因为不论是RxJS还是xstream都提供了诸如 fromfromEvent 等转换类操作符可以方便的帮我们进行这种传统到流式的转换。但现在不行啊,这些老外的类库写的时候肯定不会考虑微信的,那怎么办?只好自己写吧。

还是拿这个例子练手,我们创建一个叫 http.js 的文件,在这里面我们对应4种request方法(GET,POST,PUT,DELETE)分别构造了专门的函数用语转换。

import xs from '../lib/xstream/index'

const REQ_METHOD = {
  GET: 'GET',
  POST: 'POST',
  PUT: 'PUT',
  DELETE: 'DELETE'
}

let http =  {}

http.get = (url, data={}, header={'content-type': 'application/json'}) => {
  return http_request(url, REQ_METHOD.GET, data, header) 
}

http.post = (url, data={}, header={'content-type': 'application/json'}) => {
  return http_request(url, REQ_METHOD.POST, data, header)
}

http.put = (url, data={}, header={'content-type': 'application/json'}) => {
  return http_request(url, REQ_METHOD.PUT, data, header)
}

http.delete = (url, data={}, header={'content-type': 'application/json'}) => {
  return http_request(url, REQ_METHOD.DELETE, data, header)
}

function http_request(
  url, 
  method=REQ_METHOD.GET, 
  data={}, 
  header={'content-type': 'application/json'}) {
  const producer = {
    start: listener => {
      wx.request({
        url: url,
        data: JSON.stringify(data),
        header: header,
        method: method, 
        success: res => listener.next(res),
        fail: () => listener.error(`http request failed: ${url} | method: ${method} | data: ${data} | header: ${header}`),
        complete: () => listener.complete()
      })
    },
    stop: () => {}
  }
  return xs.create(producer)
}

module.exports = {
  http: http
}复制代码

这样一个工具类建好之后呢,我们的 onLoad 函数就变得很简单了是不是?

pageParams.onLoad =  function() {
  const that = this
  http.get(URL).subscribe({
    next: res => that.setData({
      todos: res.data
    }),
    error: err => console.error(err),
    complete: () => console.info('Todos--get completed')
  })
}复制代码

你想了一下跟我说,你特么唬我是不是,我不用xstream也可以这样封装,代码也会简洁很多啊。别急别急,我们费这么大劲把它转换成流式函数,不是只是为了简洁,而是我们可用使用响应式编程的很多特性了。比如上面的代码我们加一个需求,在出错后再进行若干次重试,但一切要控制在一个超时:比如10秒内。这个需求其实还是挺常见的,但是常规写法是比较痛苦的。我们看看用响应式编程方式怎么做。

let demo$ = xs.periodic(1000)
  .map(x => {
    const i = Math.floor((Math.random() * 10) + 1);
    if(x > i)
      x.throw(new Error('something is wrong'))
    return x
  })
demo$
  .replaceError((err) => demo$)
  .endWhen(xs.periodic(10000))
  .subscribe({
    next: x => console.log(x),
    error: err => console.warn(err),
    complete: () => console.info('I am completed')
  })复制代码

上面代码中我们每隔一秒( periodic(1000) )输出一个从0开始每次增长1的自然数,然后在转换函数中生成一个1-10的随机数,如果前面数据流发射的数大于这个随机数,我们就手动抛出一个异常,反之原样返回这个数字。定义好这个数据流后,我们按需求进行处理:

第一个需求:遇到异常应该重试,那我们使用 replaceError((err) => demo$),每次遇到异常,我们都再执行一遍前面的数据流。
第二个需求:我们应该控制超时时间10秒,所以使用 .endWhen(xs.periodic(10000))

这样就轻松的解决了这个问题,我们来看看输出,一开始从0-3是比较正常,然后程序抛出了异常。我们的 replaceError((err) => demo$) 捕获到这个异常并且用 demo$ 替换错误,也就是说再次执行。慢着,那不是死循环了吗?没事,我们后面有个退出条件就是10秒结束该流。

Console中demo$的输出

当然需要注意一点,在xstream中所有的流默认都是Hot Observable。怎么理解这个概念呢,想象一下我们在看电视直播,我们所有的人不管你是什么时候打开的电视,我们开的内容、进度都是一样的。但Cold Observable并不一样,相当于是网络视频,你看到第20分钟,但这个时候我打开还是从头开始看。这个内在含义我们举一个小例子,下面是用RxJS写的一个每隔1秒生成一个增长1的自然数流,第一个订阅者立即开始,另一个订阅者2秒之后开始订阅,我们会看到下面的景象

RxJS实现的Cold Obervable的效果

但同样逻辑用xstream实现的下面代码,出来的是另一番景象。

  let demo$ = xs.periodic(1000)

  demo$.addListener({
    next: x => console.log(x)
  })

  setTimeout(()=>{
    demo$.addListener({
      next: x => console.log(x)
    })
  }, 2000)复制代码

用xstream生成的Hot Observable的效果

当然在很多场景中,这种差别不会带来本质的变化,比如http请求,本身就是一次性的请求,所以hot和cold的结果是一样的。当然RxJS作为大而全的类库是既支持Hot Observable又支持Cold Observable的。xstream的作者其实也是RxJS的contributor,但他认为在web前端领域hot的应用频率远比cold要强,所以做了这个精简版的响应式类库。

事件的处理

上述方法用于普通API的封装一点问题也没有,但是在做输入事件时,我遇到了一些小麻烦。当然获取输入事件并不是很困难,微信小程序对于输入事件的绑定也是在 wxml 中的 <input> 控件中用 bindinput 来指定一个eventHandler,这里我们起了个名叫 addTodo

<input bindinput="addTodo" placeholder="What do you want to do today?"/>复制代码

标准的微信小程序可以这样来写事件处理。

pageParams.addTodo = function(event) {
  ...
}复制代码

如果要把事件截获并以数据流输出的话,我们需要在onLoad中进行事件处理函数的定义,比如下面的代码可以让我们实现对于输入事件的定义,在其定义中我们其实使用了流数据的发射作为其函数体。

pageParams.onLoad =  function() {
  ...
  const evProducer = {
    start: listener => {
      this.addTodo = ev => {
        listener.next(ev.detail.value)
      }
    },
    stop: () => { }
  }
  const input$ = xs.create(evProducer)
}复制代码

这样封装后,我们可以使用一些操作符来实现,比如滤波器等功能,下面的代码片段就是滤掉快速输入时(小于400毫秒)的事件。

  input$.compose(debounce(400)).subscribe({
    next: val => console.log(val)
  })复制代码

但上面形式的封装有个问题就是我们要把这个封装提取出来作为一个单独函数时,由于this.addTodo仍为初始化,无法作为参数传递,而且这个 addTodo 我也不想写死。怎么办好呢?我试了几种方案后采用了使用 Object.defineProperty 的形式去动态定义pageParams对象的命名属性,当然还是有一些问题,仍然需要给这些方法一个初始值(有同学如果有更好的建议请指教)。下面就是目前实现的抽象封装代码,在下面的代码中,由于我们对外发射的是事件(event),所以其实它不光可以用于输入事件,理论上任意事件都可以。也就是说我们自己实现了类似 Rx.Observable.fromEvent 的功能。

import xs from '../lib/xstream/index'

let event = {}

event.fromEvent = (srcObj, propertyName) => {
  const evProducer = {
    start: (listener) => {
      Object.defineProperty(
        srcObj, 
        propertyName, 
        {value: ev => listener.next(ev)})
    },
    stop: () => {}
  }
  return xs.create(evProducer)
}

module.exports = {
  event: event
}复制代码

Todo的完善

按着我们上面的封装,现在的 todos.js 看起来是这个样子。我们可以看到和Web API交互的部分都不在Page中了,虽然微信中不支持service,但我们完全可以另写一个文件存储Web API的交互。

import { xs, http, event, debounce } from '../../wxstream/index'
const URL = 'http://localhost:3000/todos'

// 非常遗憾的是目前仍需要初始化event handler,否则会出现undefined错误
let pageParams = {
  data: { todos: [] },
  addTodo: () => {},
  changeText: () => {},
  removeTodo: () => {},
  toggleTodo: () => {}
}

// 获得所有todos的流
const todos$ = http.get(URL).map(res => res.data)

// 输入框的文本变化事件流
const input$ = event
  .fromEvent(pageParams, 'changeText')
  .compose(debounce(500))
  .map(ev => ev.detail.value)

// 添加按钮的点触事件流
const addTodo$ = event
  .fromEvent(pageParams, 'addTodo')
  .mapTo(null) //null because we do NOT care about the value

// 根据按钮的点击和输入合并成一个新的流,提交服务器产生新的Todo
const newTodo$ = xs.combine(input$, addTodo$)
  .map(([input, click]) => {
    const todoToBeAdded = {
      desc: input,
      completed: false
    }
    return http.post(URL, todoToBeAdded)
  })
  .flatten()

// 监视toggleTodo事件,该事件发生后提交服务器更新该Todo
const toggleTodo$ = event
  .fromEvent(pageParams, 'toggleTodo')
  .map(ev => ev.target.dataset.todo)
  .map(todo => {
    const url = `${URL}/${todo.id}`
    const updatedTodo = Object.assign({}, todo, { completed: !todo.completed })
    return http.put(url, updatedTodo).mapTo(updatedTodo)
  })
  .flatten()

// 监视removeTodo事件,该事件发生后提交服务器更新该Todo
const removeTodo$ = event
  .fromEvent(pageParams, 'removeTodo')
  .map(ev => ev.target.dataset.todo)
  .map(todo => {
    const url = `${URL}/${todo.id}`;
    return http.delete(url, todo).mapTo(todo)
  })
  .flatten()

let sub_todos, sub_new, sub_toggle, sub_remove 

// 现在页面逻辑中没有服务端API的交互了,只有对成员数组的控制
pageParams.onLoad = function() {
  const that = this
  sub_todos = todos$.subscribe({
    next: todos => that.setData({
      todos: todos
    }),
    error: err => {console.log(err)},
    complete: () => {console.log('Todos--get completed')}
  })

  sub_new = newTodo$.subscribe({
    next: res => that.setData({
      todos: [
        ...that.data.todos,
        res.data
      ]
    })
  })

  sub_toggle = toggleTodo$.subscribe({
    next: value => that.setData({
      todos: that.data.todos.map(todo => {
        if (todo.id === value.id) {
          return value
        }
        return todo
      })
    }),
    error: err => console.error(err),
    complete: () => console.info('Todos--toggle completed')
  })

  sub_remove = removeTodo$.subscribe({
    next: value => that.setData({
      todos: that.data.todos.filter(todo => todo.id !== value.id)
    }),
    error: err => console.error(err),
    complete: () => console.info('Todos--toggle completed')
  })
}

// 取消订阅,释放内存
pageParams.onUnload = () => {
  if(sub_todos !== null)
    sub_todos.unsubscribe()
  if(sub_new !== null)
    sub_new.unsubscribe()
  if(sub_toggle !== null)
    sub_toggle.unsubscribe()
  if(sub_remove !== null)
    sub_remove.unsubscribe()
}

Page(pageParams)复制代码

wxstream(微信小程序的流式封装)

我为了这件事,建了一个Github项目叫 wxstreamgithub.com/wpcfan/wxst… ) 就是“微信stream”的缩写。只要把这个项目拉下来,拷贝到微信小程序目录,就立即可用了,包括xstream的支持都在里面了。目前还没什么文档,接口也大部分都没测过呢,实在汗颜。后续我逐渐添加文档和进行测试吧,现在只是个骨架,大家也帮忙测一下吧;-)。我看看过几天有时间再研究一些redux怎么在微信小程序中使用,到时候再写yi pian。

关注下面的标签,发现更多相似文章
评论