今天和大家聊一聊node中的stream的背压机制。


为什么要有流

在编写服务时,经常会需要涉及到文件或者数据压缩的问题。

使用合适的压缩算法能够有效减少请求文件的大小,从而减少网络中的数据传输量,提升响应速度。

假设我们采用最普通的方式处理一个文件的获取,压缩,发送这个过程,就会是如下所示的图形表示:

image-20200608104848227

我们看到整个执行流程是串行的,所有耗时都会累加,导致整个过程耗时很长。

流的出现就是为了解决这个串行处理的问题。

如下图所示,我们将整个文件分成一个个小块,利用生产者消费者模式,上一个阶段的操作有一小部分完成后,

下一个阶段的操作就可以开始执行

image-20200608111829945

这样从宏观上看,整个处理流程就可以并行执行,从而大大减少处理耗时。

image-20200608105448750

背压问题

背压问题来源于生产者消费者模式中,消费者处理速度过慢。

比如说,我们下载过程,处理速度为3Mb/s,而压缩过程,处理速度为1Mb/s,这样的话,很快缓冲区队列就会形成堆积。

要么导致整个过程内存消耗增加,要么导致整个缓冲区慢,部分数据丢失。

什么是背压处理

背压处理可以理解为一个向上”喊话”的过程。

当压缩处理发现自己的缓冲区数据挤压超过阈值的时候,就对下载处理“喊话”,我忙不过来了,不要再发了。

下载处理收到消息就暂停向下发送数据。

image-20200608115814994

而当缓存区处理至空时,又会重新通知下载处理,继续发送数据。

这样就能够实现,整个流的处理始终以保持以消费者速度进行消耗,不会引起重大积压。

pipe的生命周期

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

从图中我们可以看到pipe对流的背压处理:

  • 将数据按照chunk进行划分,写入
  • 当chunk过大,或者队列忙碌时,暂停读取
  • 当队列为空时,继续读取数据。

参考文档:


本文会经常更新,请阅读原文: https://xinyuehtx.github.io/post/%E7%90%86%E8%A7%A3%E6%B5%81%E7%9A%84%E8%83%8C%E5%8E%8B%E6%9C%BA%E5%88%B6.html ,以避免陈旧错误知识的误导,同时有更好的阅读体验。

知识共享许可协议 本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。欢迎转载、使用、重新发布,但务必保留文章署名黄腾霄(包含链接: https://xinyuehtx.github.io ),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请 与我联系