浅谈 Puma 的并发模型与实现

这篇文章已经是整个 Rack 系列文章的第五篇了,在前面的文章中我们见到了多线程模型、多进程模型以及事件驱动的 I/O 模型,对于几种常见的 webserver 已经很了解了,其实无论是 Ruby 还是其他社区对于 webserver 的实现也就是这么几种方式:多线程、多线程和 Reactor。

puma-logo

在这篇文章中要介绍的 Puma 只是混合了两种 I/O 模型,同时使用多进程和多线程来提高应用的并行能力。

文中使用的 Puma 版本是 v3.10.0,如果你使用了不同版本的 Puma,原理上的区别不会太大,只是在一些方法的实现上会有一些细微的不同。

Rack 默认处理器

Puma 是目前 Rack 中优先级最高的默认 webserver,如果直接使用 rackup 命令并且当前机器上安装了 puma,那么 Rack 会自动选择 Puma 作为当前处理 HTTP 请求的服务器:

def self.default
  pick ['puma', 'thin', 'webrick']
end

$ rackup
Puma starting in single mode...
* Version 3.10.0 (ruby 2.3.3-p222), codename: Russell's Teapot
* Min threads: 0, max threads: 16
* Environment: development
* Listening on tcp://localhost:9292
Use Ctrl-C to stop

通过在 Rack::Handler 下创建一个新的 module Puma 再实现类方法 .run,我们就可以直接将启动的过程转交给 Puma::Launcher 处理:

module Rack
  module Handler
    module Puma
      def self.run(app, options = {})
        conf   = self.config(app, options)
        events = options.delete(:Silent) ? ::Puma::Events.strings : ::Puma::Events.stdio
        launcher = ::Puma::Launcher.new(conf, :events => events)

        yield launcher if block_given?
        begin
          launcher.run
        rescue Interrupt
          puts "* Gracefully stopping, waiting for requests to finish"
          launcher.stop
          puts "* Goodbye!"
        end
      end
    end
  end
end

启动器 Launcher

Puma 中的启动器确实没有做太多的工作,大部分的代码其实都是在做配置,从 ENV 和上下文的环境中读取参数,而整个初始化方法中需要注意的地方也只有不同 @runner 的初始化了:

From: lib/puma/launcher.rb @ line 44:
Owner: Puma::Launcher

def initialize(conf, launcher_args={})
  @runner        = nil
  @events        = launcher_args[:events] || Events::DEFAULT
  @argv          = launcher_args[:argv] || []
  @config        = conf
  @config.load

  Dir.chdir(@restart_dir)

  if clustered?
    @events.formatter = Events::PidFormatter.new
    @options[:logger] = @events

    @runner = Cluster.new(self, @events)
  else
    @runner = Single.new(self, @events)
  end

  @status = :run
end

#initialize 方法中,@runner 的初始化是根据当前配置中的 worker 数决定的,如果当前的 worker > 0,那么就会选择 Cluster 作为 @runner,否则就会选择 Single,在初始化结束之后会执行 Launcher#run 方法启动当前的 Puma 进程:

From: lib/puma/launcher.rb @ line 165:
Owner: Puma::Launcher

def run
  previous_env = ENV.to_h

  setup_signals
  @runner.run

  case @status
  when :halt
    log "* Stopping immediately!"
  when :run, :stop
    graceful_stop
  when :restart
    log "* Restarting..."
    ENV.replace(previous_env)
    @runner.before_restart
    restart!
  when :exit
    # nothing
  end
end

在这个简单的 #run 方法中,Puma 通过 #setup_singals 设置了一些信号的响应过程,在这之后执行 Runner#run 启动 Puma 的服务。

启动服务

根据配置文件中不同的配置项,Puma 在启动时有两种不同的选择,一种是当前的 worker 数为 0,这时会通过 Single 启动单机模式的 Puma 进程,另一种情况是 worker 数大于 0,它使用 Cluster 的 runner 启动一组 Puma 进程。

single-cluster

在这一节中文章将会简单介绍不同的 runner 是如何启动 Puma 进程的。

单机模式

Puma 单机模式的启动通过 Single 类来处理,而定义这个类的文件 single.rb 中其实并没有多少代码,我们从中就可以看到单机模式下 Puma 的启动其实并不复杂:

From: lib/puma/single.rb @ line 40:
Owner: Puma::Single

def run
  output_header "single"

  if daemon?
    log "* Daemonizing..."
    Process.daemon(true)
    redirect_io
  end

  load_and_bind
  @launcher.write_state
  @server = server = start_server

  begin
    server.run.join
  rescue Interrupt
    # Swallow it
  end
end

如果我们启动了后台模式,就会通过 Puma 为 Process 模块扩展的方法 .daemon 在后台启动新的 Puma 进程,启动的过程其实和 Unicorn 中的差不多:

From: lib/puma/daemon_ext.rb @ line 12:
Owner: #<Class:Process>

def self.daemon(nochdir=false, noclose=false)
  exit if fork
  Process.setsid
  exit if fork

  Dir.chdir "/" unless nochdir

  if !noclose
    STDIN.reopen File.open("/dev/null", "r")
    null_out = File.open "/dev/null", "w"
    STDOUT.reopen null_out
    STDERR.reopen null_out
  end

  0
end

在 Puma 中通过两次 fork 同时将当前进程从终端中分离出来,最终就可以得到一个独立的 Puma 进程,你可以通过下面的图片简单理解这个过程:

puma-daemonize

当我们在后台启动了一个 Puma 的 master 进程之后就可以开始启动 Puma 的服务器了,也就是 Puma::Server 的实例:

From: lib/puma/runner.rb @ line 151:
Owner: Puma::Runner

def start_server
  min_t = @options[:min_threads]
  max_t = @options[:max_threads]

  server = Puma::Server.new app, @launcher.events, @options
  server.min_threads = min_t
  server.max_threads = max_t
  server.inherit_binder @launcher.binder

  if @options[:mode] == :tcp
    server.tcp_mode!
  end

  unless development?
    server.leak_stack_on_error = false
  end

  server
end

这里有很多不是特别重要的代码,需要注意的是 Server 初始化的过程以及最大、最小线程数的设置,这些信息都是通过命令行或者配置文件传入的,例如 puma -t 8:32 表示当前的最小线程数为 8、最大线程数为 32 个,Puma 会根据当前的流量自动调节同一个进程中的线程个数。

服务在启动时会创建一个线程池 ThreadPool 并传入一个用于处理请求的 block,这个方法的实现其实非常长,这里省略了很多代码;

From: lib/puma/server.rb @ line 255:
Owner: Puma::Server

def run(background=true)
  queue_requests = @queue_requests

  @thread_pool = ThreadPool.new(@min_threads,
                                @max_threads,
                                IOBuffer) do |client, buffer|
    process_now = false

    begin
      if queue_requests
        process_now = client.eagerly_finish
      else
        client.finish
        process_now = true
      end
    rescue MiniSSL::SSLError, HttpParserError => e
      # ...
    rescue ConnectionError
      client.close
    else
      if process_now
        process_client client, buffer
      else
        client.set_timeout @first_data_timeout
        @reactor.add client
      end
    end
  end

  if queue_requests
    @reactor = Reactor.new self, @thread_pool
    @reactor.run_in_thread
  end

  @thread = Thread.new { handle_servers }
  @thread
end

上述代码创建了一个新的 Reactor 对象并在一个新的线程中执行 #handle_servers 接受客户端的请求,文章会在后面介绍请求的处理。

集群模式

如果在启动 puma 进程时使用 -w 参数,例如下面的命令:

$ puma -w 3
[20904] Puma starting in cluster mode...
[20904] * Version 3.10.0 (ruby 2.3.3-p222), codename: Russell's Teapot
[20904] * Min threads: 0, max threads: 16
[20904] * Environment: development
[20904] * Process workers: 3
[20904] * Phased restart available
[20904] * Listening on tcp://0.0.0.0:9292
[20904] Use Ctrl-C to stop
[20904] - Worker 2 (pid: 20907) booted, phase: 0
[20904] - Worker 1 (pid: 20906) booted, phase: 0
[20904] - Worker 0 (pid: 20905) booted, phase: 0

$ ps aux | grep puma
draveness        20909   0.0  0.0  4296440    952 s001  S+   10:23AM   0:00.01 grep --color=auto --exclude-dir=.bzr --exclude-dir=CVS --exclude-dir=.git --exclude-dir=.hg --exclude-dir=.svn puma
draveness        20907   0.0  0.1  4358888  12128 s003  S+   10:23AM   0:00.07 puma: cluster worker 2: 20904 [Desktop]
draveness        20906   0.0  0.1  4358888  12148 s003  S+   10:23AM   0:00.07 puma: cluster worker 1: 20904 [Desktop]
draveness        20905   0.0  0.1  4358888  12196 s003  S+   10:23AM   0:00.07 puma: cluster worker 0: 20904 [Desktop]
draveness        20904   0.0  0.2  4346784  25632 s003  S+   10:23AM   0:00.67 puma 3.10.0 (tcp://0.0.0.0:9292) [Desktop]

上述命令就会启动一个 Puma 的 master 进程和三个 worker 进程,Puma 集群模式就是通过 Puma::Cluster 类来启动的,而启动集群的方法 #run 仍然是一个非常长的方法,在这里仍然省去了很多的代码:

From: lib/puma/cluster.rb @ line 386:
Owner: Puma::Cluster

def run
  @status = :run

  output_header "cluster"
  log "* Process workers: #{@options[:workers]}"

  read, @wakeup = Puma::Util.pipe

  Process.daemon(true)
  spawn_workers

  begin
    while @status == :run
      begin
        res = IO.select([read], nil, nil, WORKER_CHECK_INTERVAL)

        if res
          req = read.read_nonblock(1)
          result = read.gets
          pid = result.to_i

          if w = @workers.find { |x| x.pid == pid }
            case req
            when "b"
              w.boot!
            when "t"
              w.dead!
            when "p"
              w.ping!(result.sub(/^\d+/,'').chomp)
            end
          else
            log "! Out-of-sync worker list, no #{pid} worker"
          end
        end

      rescue Interrupt
        @status = :stop
      end
    end

    stop_workers unless @status == :halt
  ensure
    read.close
    @wakeup.close
  end
end

在使用 #spawn_workers 之后,当前 master 进程就开始通过 Socket 监听所有来自worker 的消息,例如当前的状态以及心跳检查等等。

#spawn_workers 方法会通过 fork 创建当前集群中缺少的 worker 数,在新的进程中执行 #worker 方法并将 worker 保存在 master 的 @workers 数组中:

From: lib/puma/cluster.rb @ line 116:
Owner: Puma::Cluster

def spawn_workers
  diff = @options[:workers] - @workers.size
  return if diff < 1

  master = Process.pid

  diff.times do
    idx = next_worker_index
    pid = fork { worker(idx, master) }

    debug "Spawned worker: #{pid}"
    @workers << Worker.new(idx, pid, @phase, @options)
  end
end

在 fork 出的新进程中,#worker 方法与单机模式中一样都创建了新的 Server 实例,调用 #run#join 方法启动服务:

From: lib/puma/cluster.rb @ line 231:
Owner: Puma::Cluster

def worker(index, master)
  title  = "puma: cluster worker #{index}: #{master}"
  $0 = title

  server = start_server
  server.run.join
end

与 Unicorn 完全相同,Puma 使用了一个 master 进程来管理所有的 worker 进程:

puma-cluster-mode

虽然 Puma 集群中的所有节点也都是由 master 管理的,但是所有的事件和信号会由各个接受信号的进程处理的,只有在特定事件发生时会通知主进程。

处理请求

在 Puma 中所有的请求都是通过 ServerThreadPool 协作来响应的,我们在 #handler_servers 方法中通过 IO.select 监听一组套接字上的读写事件:

From: lib/puma/server.rb @ line 334:
Owner: Puma::Server

def handle_servers
  begin
    sockets = @binder.ios
    pool = @thread_pool

    while @status == :run
      begin
        ios = IO.select sockets
        ios.first.each do |sock|
          begin
            if io = sock.accept_nonblock
              client = Client.new io, @binder.env(sock)
              pool << client
              pool.wait_until_not_full
            end
          rescue Errno::ECONNABORTED
            io.close rescue nil
          end
        rescue Object => e
          @events.unknown_error self, e, "Listen loop"
        end
      end
    rescue Exception => e
      # ...
  end
end

当有读写事件发生时会非阻塞的接受 Socket,创建新的 Client 对象最后加入到线程池中交给线程池来处理接下来的请求。

From: lib/puma/thread_pool.rb @ line 140:
Owner: Puma::ThreadPool

def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end

ThreadPool 覆写了 #<< 方法,在这个方法中它将 Client 对象加入到 @todo 数组中,通过对比几个参数选择是否创建一个新的线程来处理当前队列中的任务。

重新回到 ThreadPool 的初始化方法 #initialize 中,线程池在初始化时就会创建最低数量的线程保证当前的 worker 进程中有足够的工作线程能够处理客户端的请求:

From: lib/puma/thread_pool.rb @ line 21:
Owner: Puma::ThreadPool

def initialize(min, max, *extra, &block)
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @workers = []

  @mutex.synchronize do
    @min.times { spawn_thread }
  end
end

每一个线程都是通过 Thread.new 创建的,我们会在这个线程执行的过程中执行传入的 block:

From: lib/puma/thread_pool.rb @ line 21:
Owner: Puma::ThreadPool

def spawn_thread
  @spawned += 1

  th = Thread.new(@spawned) do |spawned|
    todo  = @todo
    block = @block
    mutex = @mutex

    extra = @extra.map { |i| i.new }

    while true
      work = nil

      continue = true

      mutex.synchronize do
        work = todo.shift
      end

      begin
        block.call(work, *extra)
      rescue Exception => e
        STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
      end
    end

    mutex.synchronize do
      @spawned -= 1
      @workers.delete th
    end
  end

  @workers << th
  th
end

在每一个工作完成之后,也会在一个互斥锁内部使用 #delete 方法将当前线程从数组中删除,在这里执行的 block 中将客户端对象 Client 加入了 Reactor 中等待之后的处理。

@thread_pool = ThreadPool.new(@min_threads,
                              @max_threads,
                              IOBuffer) do |client, buffer|
  begin
    client.finish
  rescue MiniSSL::SSLError => e
    # ...
  else
    process_client client, buffer
  end
end

如过当前任务不需要立即处理,就会向 Reactor 加入任务等待一段时间,否则就会立即由 #process_client 方法进行处理,其中调用了 #handle_request 方法尝试处理当前的网络请求:

From: lib/puma/server.rb @ line 439:
Owner: Puma::Server

def process_client(client, buffer)
  begin
    while true
      case handle_request(client, buffer)
      when false
        return
      when true
        return unless @queue_requests
        buffer.reset
        unless client.reset(@status == :run)
          client.set_timeout @persistent_timeout
          @reactor.add client
          return
        end
      end
    end
  rescue StandardError => e
    # ...
  ensure
    # ...
  end
end

用于处理网络请求的方法 #handle_request 足足有 200 多行,代码中处理非常多的实现细节,在这里实在是不想一行一行代码看过去,也就简单梳理一下这段代码的脉络了:

From: lib/puma/server.rb @ line 574:
Owner: Puma::Server

def handle_request(req, lines)
  env = req.env
  client = req.io

  # ...

  begin
    status, headers, res_body = @app.call(env)

    headers.each do |k, vs|
      # ...
    end

    fast_write client, lines.to_s
    res_body.each do |part|
      fast_write client, part
      client.flush
    end
  ensure
    body.close
  end
end

我们在这里直接将这段代码压缩至 20 行左右,你可以看到与其他的 webserver 完全相同,这里也调用了 Rack 应用的 #call 方法获得了一个三元组,然后通过 #fast_write 将请求写回客户端的 Socket 结束这个 HTTP 请求。

并发模型

到目前为止,我们已经对 Puma 是如何处理 HTTP 请求的有一个比较清晰的认识了,对于每一个 HTTP 请求都会由操作系统选择不同的进程来处理,这部分的负载均衡完全是由 OS 层来做的,当请求被分配给某一个进程时,当前进程会根据持有的线程数选择是否对请求进行处理,在这时可能会创建新的 Thread 对象来处理这个请求,也可能会把当前请求暂时扔到 Reactor 中进行等待。

puma-concurrency-model

Reactor 主要是为了提高 Puma 服务的性能存在的产物,它能够让当前的 worker 接受所有请求并将它们以队列的形式传入处理器中;如果当前的系统中存在慢客户端,那么也会占用处理请求的资源,不过由于 Puma 是多进程多线程模型的,所以影响没有那么严重,但是我们也经常会通过反向代理来解决慢客户端的问题。

总结

相比于多进程单线程的 Unicorn,Puma 提供了更灵活的配置功能,每一个进程的线程数都能在一定范围内进行收缩,目前也是绝大多数的 Ruby 项目使用的 webserver,从不同 webserver 的发展我们其实可以看出混合方式的并发模型虽然实现更加复杂,但是确实能够提供更高的性能和容错。

Puma 项目使用了 Rubocop 来规范项目中的代码风格,相比其他的 webserver 来说确实有更好的阅读体验,只是偶尔出现的长方法会让代码在理解时出现一些问题。

关于图片和转载

知识共享许可协议
本作品采用知识共享署名 4.0 国际许可协议进行许可。 转载时请注明原文链接,图片在使用时请保留图片中的全部内容,可适当缩放并在引用处附上图片所在的文章链接,图片使用 Sketch 进行绘制。

关于评论和留言

如果对本文 浅谈 Puma 的并发模型与实现 的内容有疑问,请在下面的评论系统中留言,谢谢。

原文链接:浅谈 Puma 的并发模型与实现 · 面向信仰编程

Follow: Draveness · GitHub

Draveness

Rails / Elixir / iOS

Beijing, China draveness.me