pm2源码学习
22 December 2018

在使用过程中,本着对pm2 reload为啥可以不停服重启,pm2 update为啥可以解决各种奇怪的问题,为啥pm2有时会出现时序问题引发的异常,譬如pm2 logs自定义位置后有时启动会报log文件夹不存在而启动异常,然后其实log文件夹已在启动时被pm2成功创建了,我翻阅了下pm2的部分源码。记录下学习笔记。

看完发现pm2的官网文档更新的可能不及时,文档里的用法和源码其实是有差距的。所以也会记录一些源码里看到的有用的用法。

源码

pm2本是个cli工具,入口从/bin文件夹看起:

  • bin
    • pm2
    • pm2-dev
    • pm2-docker
    • pm2-runtime
  • pm2-dev: 开发利器啊,本地开发环境我们可以用pm2-dev去启动,这时候pm2不会以后台模式运行,而是在前台,所以一个ctrl+c就能方便退出。特点是能监听文件变更去自动重启,其实就是默认配置了watch:true, autorestart:true,不过我还是指明autorestart: false,不然开发过程中一个错误就导致不断重启,连抛错信息都来不及看清。还是让它抛错,然后改掉错误,再通过watch的功能调用它重启。

  • pm2-docker和pm2-runtime是一模一样的功能。都是以一个常驻进程的模式运行,好让docker容器不因CMD指令里的进程退出而终止运行。一般在docker容器里用pm2启动应用都会使用pm2-docker

再来讲核心的pm2部分。

概况

daemon

pm2的daemon是一个后台常驻进程,相当于一个管理员的角色,pm2 start这类启动命令里都会去检查daemon是否存在,不存在会启动。

daemon的核心是God。God上定义了诸多方法,有自己内部使用的findProcessByIdfindByNamekillProcess等,也有透出给client调用的功能诸如startProcessIdstopProcessIdrestartProcessIddeleteProcessIdreloadProcessIdmonitornotifyByProcessId等等等等。

God上还挂载了一个clusters_db,记录着进程列表和状态,供各处查询使用。

God上的bus则是一个事件机制,拥有addListenerremoveListeneremitonoff等方法:

bus : new EventEmitter2({
    wildcard: true,
    delimiter: ':',
    maxListeners: 1000
})

God的work则是一个轮询所启动的应用进程状态的一个功能,会以setInterval的方式去查询进程状态,如重启次数、占用内存。

client 与 daemon server

daemon里启动了一个server,可能会跟很多个client通信合作,取决于你在多少地方pm2 start了。

client与server的通信是通过rpc的,借助的是axonreq/rep模式。读源码时经常会读到this.client.call('xxx')而终止线索,这时候其实是通过rpc通信告知了server,调用了server上对应的方法。

// client 的定义
this.rep = axon.socket('rep');
this.client =  new rpc.Client(this.rep);
// client 的使用案例, from executeRemote
this.client.call(method, app_conf, fn);

// daemon server 的定义
this.rep = axon.socket('rep');
var server = new rpc.Server(this.rep);
server.expose({
    killMe : that.close.bind(this),
    snapshotPM2 : snapshotPM2,
    profileStart : startProfilingPM2,
    profileStop : stopProfilingPM2,
    prepare : God.prepare,
    getMonitorData : God.getMonitorData,
    ...
})

client

client的功能相对简单一点。

启停daemon(launchDaemon\killDaemon): client的start方法会去判断daemon是否存在,不存在启动daemon。

rpc通信(launchRPC):建立后,后续通过rpc与Daemon server通信:executeRemote -> client.call xxx

还有launchBusdisconnectBus,这个是通过axon的pub-emitter / sub-emitter模式通信。

this.sub = axon.socket('sub-emitter');
this.sub_sock = this.sub.connect(this.pub_socket_file);

查询monitorData:还有一些虽然直接挂到client上的工具方法如getAllProcessgetAllProcessIdgetProcessIdByNamegetProcessByNamegetProcessByNameOrId则都是通过调用executeRemote('getMonitorData')由Daemon去实现的

这里我还没理解为啥既用了RPC模式,又用了PUB模式

核心方法

executeApp:启动应用

executeAppGod启动应用进程的方法。分了cluster模式和fork模式,cluster借助了cluster的npm包来启动,form就是用的node原生的child_process的模式。

executeRemote:client调用daemon方法

Client.prototype.executeRemote = function executeRemote(method, app_conf, fn) {
  var self = this;

  // stop watch on stop | env is the process id
  if (method.indexOf('stop') !== -1) {
    this.stopWatch(method, app_conf);
  }
  // stop watching when process is deleted
  else if (method.indexOf('delete') !== -1) {
    this.stopWatch(method, app_conf);
  }
  // stop everything on kill
  else if (method.indexOf('kill') !== -1) {
    this.stopWatch('deleteAll', app_conf);
  }
  else if (method.indexOf('restartProcessId') !== -1 && process.argv.indexOf('--watch') > -1) {
    delete app_conf.env.current_conf.watch;
    this.toggleWatch(method, app_conf);
  }

  if (!this.client || !this.client.call) {
    this.start(function(error) {
      if (error) {
        if (fn)
          return fn(error);
        console.error(error);
        return process.exit(0);
      }
      if (self.client) {
        return self.client.call(method, app_conf, fn);
      }
    });
    return false;
  }

  debug('Calling daemon method pm2:%s on rpc socket:%s', method, this.rpc_socket_file);
  return this.client.call(method, app_conf, fn);
};

流程一览:pm2 update

pm2 update是个了解pm2工作流程的好入口,毕竟这个方法里包含了停止应用进程、停止daemon、重启daemon、重启daemon,启动、停止的流程都包含了。

节约时间,不处理直接贴我读时记的流程了。。。

update :
    Client.executeRemote(‘notifyKillPM2'
        God.pm2_being_killed = true;
     dump :
        getMonitorData:
            writeFileSync  DUMP_FILE_PATH MonitorData

     - killDaemon:
            _operate deleteProcessId all :
               Client.executeRemote deleteProcessId:
                   Client.stopWatch(deleteProcessId
                   Client.client.call(deleteProcessId
                        God.stopProcessId
                        -delete God.clusters_db[i
                            killProcess:process.kill
                            -God.notify('exit'
                -Client.notifyGod(‘stop'
                    this.executeRemote(‘notifyByProcessId'
                        Client.client.call notifyByProcessId
                            God.bus.emit('process:event’
              
                stopProcessId - 
                    - delete God.clusters_db[id]
            - killAgent
                (KMDaemon = require('@pm2/agent/src/InteractorClient’), KMDaemon.killInteractorDaemon(that._conf :This module is used by PM2 to communicate with PM2.io's servers.:https://github.com/keymetrics/pm2-io-agent/blob/master/src/InteractorClient.js  
                -Client.killDaemon                  
                     this.executeRemote('killMe', {pid : process.pid})
                        God.bus.emit('pm2:kill’
                            that.rpc_socket.close
                            that.pub_socket.close
                            process.kill(parseInt(opts.pid), 'SIGQUIT’)
                            fs.unlinkSync(that.pid_path)
                            process.exit(cst.SUCCESS_EXIT)
     
       -that.Client.launchDaemon({interactor:false}
            require('child_process').spawn node Daemon.js
            -KMDaemon.launchAndInteract L重启Daemon
       -that.Client.launchRPC
            var req = axon.socket('req’);  this.client = new rpc.Client(req);
       -that.resurrect
            readFileSync DUMP_FILE_PATH
            parseDumpFile DUMP_FILE_PATH:to get process list
            that.Client.executeRemote(‘getMonitorData'
            将monitor里拿到的进程列表遍历过滤掉没save到DUMP_FILE_PATH的进程 进行后续prepare处理
            that.Client.executeRemote(‘prepare’ -> self.client.call prepare
                God.executeApp env
                    God.nodeApp:cluster.fork  or God.forkMode:spawn
                    -set God.clusters_db
                    -God.notify('online'

       -that.launchAll
            Modularizer.launchModules(CLI, cb);一系列start pm2 module的活动
       -KMDaemon.launchAndInteract

注:缩进表述函数内部发生的步骤,即调用层级。- fn的格式标识fn是缩进之前的函数的回调函数。pm2里回调层级实在是好多。。。

可以看到先是通过查询getMonitorData将当前运行的进程列表写入了DUMP_FILE_PATH,为之后的重启保存好数据。

然后killDaemon步骤里先是process.kill了所有记录在clusters_db里的应用id, 然后在Client.killDaemon步骤里process.kill(parseInt(opts.pid), 'SIGQUIT’),这个pid也就是pm2 daemon的进程id了。在kill之前还关闭了rpc、pub的socket通信通道的连接

全部kill后,接着就来到重启的步骤了。先在Client.launchDaemon里通过require('child_process').spawn启动了daemon.js的守护进程,然后当前执行命令的client和新启动的daemon建立通信通道,接着在resurrect里读取DUMP_FILE_PATH结合getMonitorData拿到的进程信息,进行prepare操作,这里调用了God.executeApp启动了应用进程。

所以pm2 update是一个daemon和应用进程都restart的过程,真因为如此,才会解决一些宿主环境变更带来的鬼畜的问题。譬如我遇到linux机器上一个应用从fork模式切换到cluster模式后,无论pm2 reload restart还是直接delete后重启,cluster出来的进程都会因为莫名其妙接收到kill的SIGINT 0 信号而不断重启直至超过重启次数而停止。通过pm2 update将daemon也重启了一起就正常了。

reload

pm2 reloadpm2 restart的过程也是值得看一下的。

cluster模式的reload能不停服重启,原因也很简单,因为它是先启动新的进程,待新进程都启动完毕后,再去kill老进程的,自然服务不会中断。

不过真的要想不停服重启,应用必须做好处理,否则接收到kill的信号,应用就退出了,当前正在处理的工作譬如正在处理的一个请求还未返回就被退出了,也就会异常。正如官网所说的:

To be sure that all requests are properly handled in a reload, you need to be sure that your application shutdown, not leaving unanswered requests.

A graceful shutdown makes sure to handle all remaining queries before exiting the application and closes all external connections.

所以我们在接收到SIGINT的kill信号后应该:

  • 通知负载均衡器不要再给当前机器发送请求(如果有负载均衡的话)
  • 处理完正在进行中的请求
  • 释放资源,如数据库连接
  • 退出

eg:

const app = express()
const port = process.env.port || 8000

app.get('/', (req, res) => { res.end('Hello world') })

const server = require('http').createServer(app)
server.listen(port, () => {
  console.log('Express server listening on port ' + server.address().port)
})

process.on('SIGINT', () => {
  console.info('SIGINT signal received.')

  // Stops the server from accepting new connections and finishes existing connections.
  server.close(function(err) {
    // if error, log and exit with error (1 code)
    if (err) {
      console.error(err)
      process.exit(1)
    }

    // close your database connection and exit with success (0 code)
    // for example with mongoose
    mongoose.connection.close(function () {
      console.log('Mongoose connection disconnected')
      process.exit(0)
    })
  })
})

参考

过程中也翻到一篇不错的pm2源码阅读文章:https://github.com/chen2009277025/pm2_sourceread

Graceful Shutdown