[macruby-changes] [3982] ControlTower/branches/gcd-ify

source_changes at macosforge.org source_changes at macosforge.org
Fri Apr 30 01:16:35 PDT 2010


Revision: 3982
          http://trac.macosforge.org/projects/ruby/changeset/3982
Author:   joshua.ballanco at apple.com
Date:     2010-04-30 01:16:30 -0700 (Fri, 30 Apr 2010)
Log Message:
-----------
Initial check-in of the GCD work

Modified Paths:
--------------
    ControlTower/branches/gcd-ify/bin/control_tower
    ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb

Modified: ControlTower/branches/gcd-ify/bin/control_tower
===================================================================
--- ControlTower/branches/gcd-ify/bin/control_tower	2010-04-30 05:25:10 UTC (rev 3981)
+++ ControlTower/branches/gcd-ify/bin/control_tower	2010-04-30 08:16:30 UTC (rev 3982)
@@ -9,7 +9,8 @@
 @options = {
   :rackup => './config.ru',
   :port => '8080',
-  :host => 'localhost'
+  :host => 'localhost',
+  :concurrent => false
 }
 
 OptionParser.new do |opts|
@@ -25,7 +26,7 @@
     @options[:host] = host
   end
 
-  opts.on("-c", "--[no]-concurrency", "Handle requests concurrently") do |concurrent|
+  opts.on("-c", "--[no-]concurrency", "Handle requests concurrently") do |concurrent|
     @options[:concurrent] = concurrent
   end
 end.parse!
@@ -35,11 +36,6 @@
   exit 1
 end
 
-unless File.exist? File.expand_path(@options[:rackup])
-  puts "We only know how to deal with Rack-up configs for now"
-  exit 1
-end
-
 # Under construction...everything is development!
 ENV['RACK_ENV'] = 'development'
 

Modified: ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb
===================================================================
--- ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb	2010-04-30 05:25:10 UTC (rev 3981)
+++ ControlTower/branches/gcd-ify/lib/control_tower/rack_socket.rb	2010-04-30 08:16:30 UTC (rev 3982)
@@ -5,52 +5,49 @@
 
 module ControlTower
   class RackSocket
-    READ_SIZE = 16 * 1024
-    RACK_VERSION = 'rack.version'.freeze
     VERSION = [1,0].freeze
 
     def initialize(host, port, server, concurrent)
       @server = server
       @socket = TCPServer.new(host, port)
       @status = :closed # Start closed and give the server time to start
-      prepare_environment
+      @concurrent = concurrent
 
-      #if concurrent
-      #  @env['rack.multithread'] = true
-      #  @request_queue = Dispatch::Queue.concurrent
-      #else
-      #  @env['rack.multithread'] = false
-      #  @request_queue = Dispatch::Queue.new('com.apple.ControlTower.rack_socket_queue')
-      #end
-      #@request_group = Dispatch::Group.new
+      if @concurrent
+        @request_queue = Dispatch::Queue.concurrent
+        $stdout.puts "Caution: Wake turbulance! Heavy landing on parallel runway."
+      else
+        @request_queue = Dispatch::Queue.new('com.apple.ControlTower.rack_socket_queue')
+      end
+      @request_group = Dispatch::Group.new
     end
 
     def open
       @status = :open
       while (@status == :open)
         connection = @socket.accept
+        $stderr.puts "Got a connection: #{connection}(fd:#{connection.to_i})" if ENV['CT_DEBUG']
 
-        # TODO -- Concurrency doesn't quite work yet...
-        #@request_group.dispatch(@request_queue) do
-          req_data = parse!(connection, prepare_environment)
-          data = @server.handle_request(req_data)
-          data.each do |chunk|
-            connection.write chunk
+        @request_queue.async(@request_group) do
+          parse!(connection, prepare_environment) do |env|
+            response = @server.handle_request(env)
+            response.each do |chunk|
+              connection.write chunk
+            end
           end
-          connection.close
-        #end
+        end
       end
     end
 
     def close
-      @status = :close
+      @status = :closed
 
       # You get 30 seconds to empty the request queue and get outa here!
       Dispatch::Source.timer(30, 0, 1, Dispatch::Queue.concurrent) do
         puts "Timed out waiting for connections to close"
         exit 1
       end
-      #@request_group.wait
+      @request_group.wait
       @socket.close
     end
 
@@ -60,43 +57,51 @@
     def prepare_environment
       { 'rack.errors' => $stderr,
         'rack.input' => '',
-        'rack.multiprocess' => false, # No multiprocess, yet...probably never
+        'rack.multiprocess' => false, # No multiprocess, yet...possibly never
         'rack.run_once' => false,
-        RACK_VERSION => VERSION }
+        'rack.multithread' => @concurrent ? true : false,
+        'rack.version' => VERSION }
     end
 
-    def parse!(connection, env)
+    def parse!(connection, env, &block)
+      connection_queue = Dispatch::Queue.new('com.apple.ControlTower.connection_queue')
       parser = ::CTParser.new
       data = ""
-      headers_done = false
+      parsing_headers = true
       content_length = 0
 
-      while (!headers_done || env['rack.input'].bytesize < content_length) do
-        select([connection], nil, nil, 1)
-        if headers_done
-          begin
-            data = connection.readpartial(READ_SIZE)
-            env['rack.input'] << data
-          rescue EOFError
-            break
+      Dispatch::Source.new(Dispatch::Source::READ, connection, 0, connection_queue) do |source|
+        $stderr.puts "#{source.data} bytes incoming" if ENV['CT_DEBUG']
+        begin
+          if parsing_headers
+            $stderr.puts "Parsing headers..." if ENV['CT_DEBUG']
+            data << connection.readpartial(source.data)
+            nread = parser.parseData(data, forEnvironment: env, startingAt: nread)
+            if parser.finished
+              parsing_headers = false
+              $stderr.puts "Headers done! Content-Length: #{env['CONTENT_LENGTH']}" if ENV['CT_DEBUG']
+              content_length = env['CONTENT_LENGTH'].to_i
+            end
+          else
+            $stderr.puts "Reading body" if ENV['CT_DEBUG']
+            env['rack.input'] << connection.readpartial(source.data)
           end
-        else
-          data << connection.readpartial(READ_SIZE)
-          nread = parser.parseData(data, forEnvironment: env, startingAt: nread)
-          if parser.finished
-            headers_done = true
-            content_length = env['CONTENT_LENGTH'].to_i
-          end
+        rescue EOFError
+          content_length = env['rack.input'].bytesize
         end
+
+        $stderr.puts "Input Length: #{env['rack.input'].bytesize}, Content-Length: #{content_length}" if ENV['CT_DEBUG']
+        unless parsing_headers || env['rack.input'].bytesize < content_length
+          # Rack says "Make that a IO!"
+          body = Tempfile.new('control-tower-request-body-')
+          body << env['rack.input']
+          body.rewind
+          env['rack.input'] = body
+          block.call(env)
+          $stderr.puts "All done. Canceling source for #{source.handle}(fd:#{source.handle.to_i})" if ENV['CT_DEBUG']
+          source.cancel!
+        end
       end
-
-      # Rack says "Make that a StringIO!"
-      body = Tempfile.new('control-tower-request-body-')
-      body << env['rack.input']
-      body.rewind
-      env['rack.input'] = body
-      # Returning what we've got...
-      return env
     end
   end
 end
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/macruby-changes/attachments/20100430/c7de7626/attachment.html>


More information about the macruby-changes mailing list