don't set listen(2) backlog on inherited sockets
[yahns.git] / lib / yahns / proxy_http_response.rb
blobdb9c4b75ee9135b85ef24533129e40d165a1fecd
1 # -*- encoding: binary -*-
2 # Copyright (C) 2015-2016 all contributors <[email protected]>
3 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
4 # frozen_string_literal: true
6 require_relative 'wbuf_lite'
8 # loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for
9 # constants.
10 module Yahns::HttpResponse # :nodoc:
12   # switch and yield
13   def proxy_unbuffer(wbuf, nxt = :ignore)
14     @state = wbuf
15     wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=)
16     proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD :
17                     Yahns::Queue::QEV_WR)
18     nxt
19   end
21   def wbuf_alloc(req_res)
22     if req_res.proxy_pass.proxy_buffering
23       Yahns::Wbuf.new(nil, req_res.alive)
24     else
25       Yahns::WbufLite.new(req_res)
26     end
27   end
29   # write everything in buf to our client socket (or wbuf, if it exists)
30   # it may return a newly-created wbuf or nil
31   def proxy_write(wbuf, buf, req_res)
32     unless wbuf
33       # no write buffer, try to write directly to the client socket
34       case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
35       when nil then return # done writing buf, likely
36       when String, Array # partial write, hope the skb grows
37         buf = rv
38       when :wait_writable, :wait_readable
39         wbuf = req_res.resbuf ||= wbuf_alloc(req_res)
40         break
41       end while true
42     end
44     wbuf.wbuf_write(self, buf)
45     wbuf.busy ? wbuf : nil
46   end
48   def proxy_err_response(code, req_res, exc)
49     logger = self.class.logger # Yahns::HttpContext#logger
50     case exc
51     when nil
52       logger.error('premature upstream EOF')
53     when Kcar::ParserError
54       logger.error("upstream response error: #{exc.message}")
55     when String
56       logger.error(exc)
57     else
58       Yahns::Log.exception(logger, 'upstream error', exc)
59     end
60     # try to write something, but don't care if we fail
61     Integer === code and
62       kgio_trywrite("HTTP/1.1 #{code} #{
63                      Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n") rescue nil
65     shutdown rescue nil
66     @input = @input.close if @input
68     # this is safe ONLY because we are in an :ignore state after
69     # Fdmap#forget when we got hijacked:
70     close
72     nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
73   ensure
74     wbuf = req_res.resbuf
75     wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort)
76   end
78   def wait_on_upstream(req_res)
79     req_res.resbuf ||= wbuf_alloc(req_res)
80     :wait_readable # self remains in :ignore, wait on upstream
81   end
83   def proxy_res_headers(res, req_res)
84     status, headers = res
85     code = status.to_i
86     msg = Rack::Utils::HTTP_STATUS_CODES[code]
87     env = @hs.env
88     have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code) &&
89                 env['REQUEST_METHOD'] != 'HEAD'.freeze
90     flags = MSG_DONTWAIT
91     alive = @hs.next? && self.class.persistent_connections
92     term = false
93     response_headers = req_res.proxy_pass.response_headers
95     res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup
96     headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
97       case key
98       when /\A(?:Connection|Keep-Alive)\z/i
99         next # do not let some upstream headers leak through
100       when %r{\AContent-Length\z}i
101         term = true
102         flags |= MSG_MORE if have_body && value.to_i > 0
103       when %r{\ATransfer-Encoding\z}i
104         term = true if value =~ /\bchunked\b/i
105       end
107       # response header mapping
108       case val = response_headers[key]
109       when :ignore
110         next
111       when String
112         value = val
113       end
115       res << "#{key}: #{value}\r\n"
116     end
118     # For now, do not add a Date: header, assume upstream already did it
119     # but do not care if they did not
121     # chunk the response ourselves if the client supports it,
122     # but the backend does not terminate properly
123     if alive && ! term && have_body
124       if env['HTTP_VERSION'] == 'HTTP/1.1'.freeze
125         res << "Transfer-Encoding: chunked\r\n".freeze
126       else # we can't persist HTTP/1.0 and HTTP/0.9 w/o Content-Length
127         alive = false
128       end
129     end
130     res << (alive ? "Connection: keep-alive\r\n\r\n".freeze
131                   : "Connection: close\r\n\r\n".freeze)
133     # send the headers
134     case rv = kgio_syssend(res, flags)
135     when nil # all done, likely
136       res.clear
137       break
138     when String # partial write, highly unlikely
139       flags = MSG_DONTWAIT
140       res = rv # hope the skb grows
141     when :wait_writable, :wait_readable # highly unlikely in real apps
142       proxy_write(nil, res, req_res)
143       break # keep buffering body...
144     end while true
145     req_res.alive = alive
146     have_body
147   end
149   def read_len(len)
150     max = 16384
151     max = len if len && len < max
152     max
153   end
155   def proxy_read_body(tip, kcar, req_res)
156     chunk = ''.dup if kcar.chunked?
157     len = kcar.body_bytes_left
158     rbuf = Thread.current[:yahns_rbuf]
159     alive = req_res.alive
160     wbuf = req_res.resbuf
162     case tmp = tip.shift || req_res.kgio_tryread(read_len(len), rbuf)
163     when String
164       if len
165         kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
166       elsif chunk
167         kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
168         next if chunk.empty? # call req_res.kgio_tryread for more
169         tmp = chunk_out(chunk)
170       elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
171         tmp = chunk_out(tmp)
172       # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
173       end
174       wbuf = proxy_write(wbuf, tmp, req_res)
175       chunk.clear if chunk
176       if Yahns::WbufLite === wbuf
177         req_res.proxy_trailers = [ rbuf.dup, tip ] if chunk && kcar.body_eof?
178         return proxy_unbuffer(wbuf)
179       end
180     when nil # EOF
181       # HTTP/1.1 upstream, unexpected premature EOF:
182       msg = "upstream EOF (#{len} bytes left)" if len
183       msg = 'upstream EOF (chunk)' if chunk
184       return proxy_err_response(nil, req_res, msg) if msg
186       # HTTP/1.0 upstream:
187       wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
188       req_res.shutdown
189       return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf
190       return proxy_busy_mod(wbuf, req_res)
191     when :wait_readable
192       return wait_on_upstream(req_res)
193     end until kcar.body_eof?
195     if chunk
196       # tip is an empty array and becomes trailer storage
197       req_res.proxy_trailers = [ rbuf.dup, tip ]
198       return proxy_read_trailers(kcar, req_res)
199     end
200     proxy_busy_mod(wbuf, req_res)
201   end
203   def proxy_read_trailers(kcar, req_res)
204     chunk, tlr = req_res.proxy_trailers
205     rbuf = Thread.current[:yahns_rbuf]
206     wbuf = req_res.resbuf
208     until kcar.trailers(tlr, chunk)
209       case rv = req_res.kgio_tryread(16384, rbuf)
210       when String
211         chunk << rv
212       when :wait_readable
213         return wait_on_upstream(req_res)
214       when nil # premature EOF
215         return proxy_err_response(nil, req_res, 'upstream EOF (trailers)')
216       end # no loop here
217     end
218     wbuf = proxy_write(wbuf, trailer_out(tlr), req_res)
219     return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
220     proxy_busy_mod(wbuf, req_res)
221   end
223   # start streaming the response once upstream is done sending headers to us.
224   # returns :wait_readable if we need to read more from req_res
225   # returns :ignore if we yield control to the client(self)
226   # returns nil if completely done
227   def proxy_response_start(res, tip, kcar, req_res)
228     have_body = proxy_res_headers(res, req_res)
229     tip = tip.empty? ? [] : [ tip ]
231     if have_body
232       req_res.proxy_trailers = nil # define to avoid uninitialized warnings
233       return proxy_read_body(tip, kcar, req_res)
234     end
236     # unlikely
237     wbuf = req_res.resbuf
238     return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
240     # all done reading response from upstream, req_res will be discarded
241     # when we return nil:
242     proxy_busy_mod(wbuf, req_res)
243   end
245   def proxy_response_finish(kcar, req_res)
246     req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res)
247                            : proxy_read_body([], kcar, req_res)
248   end
250   def proxy_wait_next(qflags)
251     Thread.current[:yahns_fdmap].remember(self)
252     # We must allocate a new, empty request object here to avoid a TOCTTOU
253     # in the following timeline
254     #
255     # original thread:                                 | another thread
256     # HttpClient#yahns_step                            |
257     # r = k.app.call(env = @hs.env)  # socket hijacked into epoll queue
258     # <thread is scheduled away>                       | epoll_wait readiness
259     #                                                  | ReqRes#yahns_step
260     #                                                  | proxy dispatch ...
261     #                                                  | proxy_busy_mod
262     # ************************** DANGER BELOW ********************************
263     #                                                  | HttpClient#yahns_step
264     #                                                  | # clears env
265     # sees empty env:                                  |
266     # return :ignore if env.include?('rack.hijack_io') |
267     #
268     # In other words, we cannot touch the original env seen by the
269     # original thread since it must see the 'rack.hijack_io' value
270     # because both are operating in the same Yahns::HttpClient object.
271     # This will happen regardless of GVL existence
272     hs = Unicorn::HttpRequest.new
273     hs.buf.replace(@hs.buf)
274     @hs = hs
276     # n.b. we may not touch anything in this object once we call queue_mod,
277     # another thread is likely to take it!
278     Thread.current[:yahns_queue].queue_mod(self, qflags)
279   end
281   def proxy_busy_mod(wbuf, req_res)
282     if wbuf
283       # we are completely done reading and buffering the upstream response,
284       # but have not completely written the response to the client,
285       # yield control to the client socket:
286       @state = wbuf
287       proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD :
288                       Yahns::Queue::QEV_WR)
289       # no touching self after proxy_wait_next, we may be running
290       # HttpClient#yahns_step in a different thread at this point
291     else
292       case http_response_done(req_res.alive)
293       when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
294       when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
295       when :close then close
296       end
297     end
298     nil # signal close for ReqRes#yahns_step
299   end
301   # n.b.: we can use String#size for optimized dispatch under YARV instead
302   # of String#bytesize because all the IO read methods return a binary
303   # string when given a maximum read length
304   def chunk_out(buf)
305     [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
306   end
308   def trailer_out(tlr)
309     "0\r\n#{tlr.map! do |k,v| "#{k}: #{v}\r\n" end.join}\r\n"
310   end