温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

nodejs流基类怎么实现

发布时间:2021-12-17 09:29:35 来源:亿速云 阅读:103 作者:iii 栏目:大数据

这篇文章主要讲解了“nodejs流基类怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“nodejs流基类怎么实现”吧!

流是对数据生产,消费的一种抽象,今天先分析一下流基类的实现

const EE = require('events');
const util = require('util');
// 流的基类
function Stream() {
  EE.call(this);
}
// 继承事件订阅分发的能力
util.inherits(Stream, EE);
 

流的基类只提供了一个函数就是pipe。用于实现管道化。这个方法代码比较多,分开说。

 

1 处理数据事件

 function ondata(chunk) {
    // 源流有数据到达,并且目的流可写
    if (dest.writable) {
      // 目的流过载并且源流实现了pause方法,那就暂停可读流的读取操作,等待目的流触发drain事件
      if (false === dest.write(chunk) && source.pause) {
        source.pause();
      }
    }
 }
  // 监听data事件,可读流有数据的时候,会触发data事件
  source.on('data', ondata);

  function ondrain() {
    // 目的流可写了,并且可读流可读,切换成自动读取模式
    if (source.readable && source.resume) {
      source.resume();
    }
  }
  // 监听drain事件,目的流可以消费数据了就会触发该事件
  dest.on('drain', ondrain);
 

这是管道化时流控实现的地方,主要是利用了write返回值和drain事件。

 

流关闭/结束处理

 // 目的流不是标准输出或标准错误,并且end不等于false
  if (!dest._isStdio && (!options || options.end !== false)) {
    // 源流没有数据可读了,执行end回调,告诉目的流,没有数据可读了
    source.on('end', onend);
    // 源流关闭了,执行close回调
    source.on('close', onclose);
  }
  // 两个函数只会执行一次,也只会执行一个
  var didOnEnd = false;
  function onend() {
    if (didOnEnd) return;
    didOnEnd = true;
    // 执行目的流的end函数,说明写数据完毕
    dest.end();
  }

  function onclose() {
    if (didOnEnd) return;
    didOnEnd = true;
    // 销毁目的流
    if (typeof dest.destroy === 'function') dest.destroy();
  }
 

这里是处理源流结束和关闭后,通知目的流的逻辑。

 

错误处理和事件清除

  // remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  function onerror(er) {
    // 出错了,清除注册的事件,包括正在执行的onerror函数
    cleanup();
    // 如果用户没有监听流的error事件,则抛出错误,所以我们业务代码需要监听error事件
    if (EE.listenerCount(this, 'error') === 0) {
      throw er; // Unhandled stream error in pipe.
    }
  }
  // 监听流的error事件
  source.on('error', onerror);
  dest.on('error', onerror);
  // 源流关闭或者没有数据可读时,清除注册的事件
  source.on('end', cleanup);
  source.on('close', cleanup);
  // 目的流关闭了也清除他注册的事件
  dest.on('close', cleanup);
 

这里主要是处理了error事件和流关闭/结束/出错时清除订阅的事件。这就是流基类的所有逻辑。

感谢各位的阅读,以上就是“nodejs流基类怎么实现”的内容了,经过本文的学习后,相信大家对nodejs流基类怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI