don't set listen(2) backlog on inherited sockets
[yahns.git] / lib / yahns / queue_kqueue.rb
blob27c68fc84190204e1e63f94c2d9dc0a322a487e6
1 # -*- encoding: binary -*-
2 # Copyright (C) 2013-2016 all contributors <[email protected]>
3 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
4 # frozen_string_literal: true
6 # This is the dangerous, low-level kqueue interface for sleepy_penguin
7 # It is safe as long as you're aware of all potential concurrency
8 # issues given multithreading, GC, and kqueue itself.
9 class Yahns::Queue < SleepyPenguin::Kqueue::IO # :nodoc:
10   include SleepyPenguin
11   attr_accessor :fdmap # Yahns::Fdmap
13   # public
14   QEV_QUIT = nil # Level Trigger for QueueQuitter
15   QEV_RD = EvFilt::READ
16   QEV_WR = EvFilt::WRITE
18   ADD_ONESHOT = Ev::ADD | Ev::ONESHOT # private
20   # for HTTP and HTTPS servers, we rely on the io writing to us, first
21   # flags: QEV_RD/QEV_WR (usually QEV_RD)
22   def queue_add(io, flags)
23     # order is very important here, this thread cannot do anything with
24     # io once we've issued kevent EV_ADD because another thread may use it
25     @fdmap.add(io)
26     fflags = ADD_ONESHOT
27     if flags == QEV_QUIT
28       fflags = Ev::ADD
29       flags = QEV_WR
30     end
31     kevent(Kevent[io.fileno, flags, fflags, 0, 0, io])
32   end
34   # non-blocking listeners are level-trigger
35   def queue_add_acceptor(io)
36     kevent(Kevent[io.fileno, EvFilt::READ, Ev::ADD, 0, 0, io])
37     # no EPOLLEXCLUSIVE analogy, so assume thundering herds :<
38     false
39   end
41   def queue_mod(io, flags)
42     kevent(Kevent[io.fileno, flags, ADD_ONESHOT, 0, 0, io])
43   end
45   def thr_init
46     Thread.current[:yahns_rbuf] = ''.dup
47     Thread.current[:yahns_fdmap] = @fdmap
48     Thread.current[:yahns_queue] = self
49   end
51   # returns an infinitely running thread
52   def worker_thread(logger, max_events)
53     Thread.new do
54       thr_init
55       begin
56         kevent(nil, max_events) do |_,_,_,_,_,io| # don't care for flags for now
57           next if io.closed?
58           # Note: we absolutely must not do anything with io after
59           # we've called kevent(...,EV_ADD) on it, io is exclusive to this
60           # thread only until kevent(...,EV_ADD) is called on it.
61           case rv = io.yahns_step
62           when :wait_readable
63             kevent(Kevent[io.fileno, QEV_RD, ADD_ONESHOT, 0, 0, io])
64           when :wait_writable
65             kevent(Kevent[io.fileno, QEV_WR, ADD_ONESHOT, 0, 0, io])
66           when :ignore # only used by rack.hijack
67             # we cannot EV_DELETE after hijacking, the hijacker
68             # may have already closed it  Likewise, io.fileno is not
69             # expected to work, so we had to erase it from fdmap before hijack
70           when nil, :close
71             # this must be the ONLY place where we call IO#close on
72             # things that got inside the queue AND fdmap
73             @fdmap.sync_close(io)
74           else
75             raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
76           end
77         end
78       rescue StandardError, LoadError, SyntaxError => e
79         break if closed? # can still happen due to shutdown_timeout
80         Yahns::Log.exception(logger, 'queue loop', e)
81       end while true
82     end
83   end
84 end