[CalendarServer-changes] [9676] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:45 PDT 2012


Revision: 9676
          http://trac.macosforge.org/projects/calendarserver/changeset/9676
Author:   glyph at apple.com
Date:     2012-08-11 01:55:45 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
Final hookup; connect the connection pool up to the listening control socket in the master.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/calendarserver/tap/caldav.py
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/q/calendarserver/tap/caldav.py	2012-08-11 08:55:44 UTC (rev 9675)
+++ CalendarServer/branches/users/glyph/q/calendarserver/tap/caldav.py	2012-08-11 08:55:45 UTC (rev 9676)
@@ -61,6 +61,7 @@
 from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
 
+from txdav.common.datastore.sql_tables import schema
 from txdav.common.datastore.upgrade.sql.upgrade import (
     UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
 )
@@ -94,6 +95,7 @@
 from calendarserver.controlsocket import ControlSocketConnectingService
 from twisted.protocols.amp import AMP
 from twext.enterprise.queue import WorkerFactory as QueueWorkerFactory
+from twext.enterprise.queue import PeerConnectionPool
 from calendarserver.accesslog import AMPCommonAccessLoggingObserver
 from calendarserver.accesslog import AMPLoggingFactory
 from calendarserver.accesslog import RotatingFileAccessLoggingObserver
@@ -801,7 +803,6 @@
             controlSocketClient.addFactory(_LOG_ROUTE, f)
             from txdav.common.datastore.sql import CommonDataStore as SQLStore
             if isinstance(store, SQLStore):
-                from txdav.common.datastore.sql_tables import schema
                 def queueMasterAvailable(connectionFromMaster):
                     store.queuer = connectionFromMaster
                 queueFactory = QueueWorkerFactory(store.newTransaction, schema,
@@ -1287,12 +1288,22 @@
         # filesystem to the database (if that's necessary, and there is
         # filesystem data in need of upgrading).
         def spawnerSvcCreator(pool, store):
+            from twisted.internet import reactor
+            pool = PeerConnectionPool(reactor, store.newTransaction,
+                                      7654, schema)
+            controlSocket.addFactory(_QUEUE_ROUTE,
+                                     pool.workerListenerFactory())
+            # TODO: now that we have the shared control socket, we should get
+            # rid of the connection dispenser and make a shared / async
+            # connection pool implementation that can dispense transactions
+            # synchronously as the interface requires.
             if pool is not None and config.SharedConnectionPool:
                 self.log_warn("Using Shared Connection Pool")
                 dispenser = ConnectionDispenser(pool)
             else:
                 dispenser = None
             multi = MultiService()
+            pool.setServiceParent(multi)
             spawner = SlaveSpawnerService(
                 self, monitor, dispenser, dispatcher, options["config"],
                 inheritFDs=inheritFDs, inheritSSLFDs=inheritSSLFDs

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:44 UTC (rev 9675)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:45 UTC (rev 9676)
@@ -881,6 +881,16 @@
         self.peers.append(peer)
 
 
+    def workerListenerFactory(self):
+        """
+        Factory that listens for connections from workers.
+        """
+        f = Factory()
+        f.buildProtocol = lambda addr: ConnectionFromWorker(self.schema,
+                                                            self.workerPool)
+        return f
+
+
     def removePeerConnection(self, peer):
         """
         Remove a L{ConnectionFromPeerNode} to the active list of peers.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/0f8c19a0/attachment.html>


More information about the calendarserver-changes mailing list