Revision: 3982 http://trac.macosforge.org/projects/ruby/changeset/3982 Author: joshua.ballanco@apple.com Date: 2010-04-30 01:16:30 -0700 (Fri, 30 Apr 2010) Log Message: ----------- Initial check-in of the GCD work Modified Paths: -------------- ControlTower/branches/gcd-ify/bin/control_tower ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb Modified: ControlTower/branches/gcd-ify/bin/control_tower =================================================================== --- ControlTower/branches/gcd-ify/bin/control_tower 2010-04-30 05:25:10 UTC (rev 3981) +++ ControlTower/branches/gcd-ify/bin/control_tower 2010-04-30 08:16:30 UTC (rev 3982) @@ -9,7 +9,8 @@ @options = { :rackup => './config.ru', :port => '8080', - :host => 'localhost' + :host => 'localhost', + :concurrent => false } OptionParser.new do |opts| @@ -25,7 +26,7 @@ @options[:host] = host end - opts.on("-c", "--[no]-concurrency", "Handle requests concurrently") do |concurrent| + opts.on("-c", "--[no-]concurrency", "Handle requests concurrently") do |concurrent| @options[:concurrent] = concurrent end end.parse! @@ -35,11 +36,6 @@ exit 1 end -unless File.exist? File.expand_path(@options[:rackup]) - puts "We only know how to deal with Rack-up configs for now" - exit 1 -end - # Under construction...everything is development! ENV['RACK_ENV'] = 'development' Modified: ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb =================================================================== --- ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb 2010-04-30 05:25:10 UTC (rev 3981) +++ ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb 2010-04-30 08:16:30 UTC (rev 3982) @@ -5,52 +5,49 @@ module ControlTower class RackSocket - READ_SIZE = 16 * 1024 - RACK_VERSION = 'rack.version'.freeze VERSION = [1,0].freeze def initialize(host, port, server, concurrent) @server = server @socket = TCPServer.new(host, port) @status = :closed # Start closed and give the server time to start - prepare_environment + @concurrent = concurrent - #if concurrent - # @env['rack.multithread'] = true - # @request_queue = Dispatch::Queue.concurrent - #else - # @env['rack.multithread'] = false - # @request_queue = Dispatch::Queue.new('com.apple.ControlTower.rack_socket_queue') - #end - #@request_group = Dispatch::Group.new + if @concurrent + @request_queue = Dispatch::Queue.concurrent + $stdout.puts "Caution: Wake turbulance! Heavy landing on parallel runway." + else + @request_queue = Dispatch::Queue.new('com.apple.ControlTower.rack_socket_queue') + end + @request_group = Dispatch::Group.new end def open @status = :open while (@status == :open) connection = @socket.accept + $stderr.puts "Got a connection: #{connection}(fd:#{connection.to_i})" if ENV['CT_DEBUG'] - # TODO -- Concurrency doesn't quite work yet... - #@request_group.dispatch(@request_queue) do - req_data = parse!(connection, prepare_environment) - data = @server.handle_request(req_data) - data.each do |chunk| - connection.write chunk + @request_queue.async(@request_group) do + parse!(connection, prepare_environment) do |env| + response = @server.handle_request(env) + response.each do |chunk| + connection.write chunk + end end - connection.close - #end + end end end def close - @status = :close + @status = :closed # You get 30 seconds to empty the request queue and get outa here! Dispatch::Source.timer(30, 0, 1, Dispatch::Queue.concurrent) do puts "Timed out waiting for connections to close" exit 1 end - #@request_group.wait + @request_group.wait @socket.close end @@ -60,43 +57,51 @@ def prepare_environment { 'rack.errors' => $stderr, 'rack.input' => '', - 'rack.multiprocess' => false, # No multiprocess, yet...probably never + 'rack.multiprocess' => false, # No multiprocess, yet...possibly never 'rack.run_once' => false, - RACK_VERSION => VERSION } + 'rack.multithread' => @concurrent ? true : false, + 'rack.version' => VERSION } end - def parse!(connection, env) + def parse!(connection, env, &block) + connection_queue = Dispatch::Queue.new('com.apple.ControlTower.connection_queue') parser = ::CTParser.new data = "" - headers_done = false + parsing_headers = true content_length = 0 - while (!headers_done || env['rack.input'].bytesize < content_length) do - select([connection], nil, nil, 1) - if headers_done - begin - data = connection.readpartial(READ_SIZE) - env['rack.input'] << data - rescue EOFError - break + Dispatch::Source.new(Dispatch::Source::READ, connection, 0, connection_queue) do |source| + $stderr.puts "#{source.data} bytes incoming" if ENV['CT_DEBUG'] + begin + if parsing_headers + $stderr.puts "Parsing headers..." if ENV['CT_DEBUG'] + data << connection.readpartial(source.data) + nread = parser.parseData(data, forEnvironment: env, startingAt: nread) + if parser.finished + parsing_headers = false + $stderr.puts "Headers done! Content-Length: #{env['CONTENT_LENGTH']}" if ENV['CT_DEBUG'] + content_length = env['CONTENT_LENGTH'].to_i + end + else + $stderr.puts "Reading body" if ENV['CT_DEBUG'] + env['rack.input'] << connection.readpartial(source.data) end - else - data << connection.readpartial(READ_SIZE) - nread = parser.parseData(data, forEnvironment: env, startingAt: nread) - if parser.finished - headers_done = true - content_length = env['CONTENT_LENGTH'].to_i - end + rescue EOFError + content_length = env['rack.input'].bytesize end + + $stderr.puts "Input Length: #{env['rack.input'].bytesize}, Content-Length: #{content_length}" if ENV['CT_DEBUG'] + unless parsing_headers || env['rack.input'].bytesize < content_length + # Rack says "Make that a IO!" + body = Tempfile.new('control-tower-request-body-') + body << env['rack.input'] + body.rewind + env['rack.input'] = body + block.call(env) + $stderr.puts "All done. Canceling source for #{source.handle}(fd:#{source.handle.to_i})" if ENV['CT_DEBUG'] + source.cancel! + end end - - # Rack says "Make that a StringIO!" - body = Tempfile.new('control-tower-request-body-') - body << env['rack.input'] - body.rewind - env['rack.input'] = body - # Returning what we've got... - return env end end end