[CalendarServer-changes] [6497] CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore

source_changes at macosforge.org source_changes at macosforge.org
Mon Nov 1 14:13:40 PDT 2010


Revision: 6497
          http://trac.macosforge.org/projects/calendarserver/changeset/6497
Author:   glyph at apple.com
Date:     2010-11-01 14:13:37 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
reorganize some things, spike async threadpool (don't use it yet)
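
For orientation, here is a minimal, hedged sketch (not part of this commit) of driving the core class that the new asyncsqlpool.py introduces; sqlite3 and the file name are purely illustrative stand-ins for the DB-API 2.0 connection factory the class expects:

    import sqlite3
    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks
    from txdav.base.datastore.asyncsqlpool import BaseSqlTxn

    @inlineCallbacks
    def demo():
        # All SQL for this transaction runs in a single dedicated thread.
        txn = BaseSqlTxn(lambda : sqlite3.connect("demo.sqlite"), reactor)
        rows = yield txn.execSQL("select 1", [])
        print rows                # [(1,)]
        yield txn.commit()        # commit runs in the same thread
        yield txn.stop()          # close the connection, release the thread
        reactor.stop()

    reactor.callWhenRunning(demo)
    reactor.run()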

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py

Added Paths:
-----------
    CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
    CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py

Added: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py	2010-11-01 21:13:37 UTC (rev 6497)
@@ -0,0 +1,464 @@
+# -*- test-case-name: txdav.caldav.datastore -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Asynchronous multi-process connection pool.
+"""
+
+import sys
+from cPickle import dumps, loads
+from itertools import count
+
+from zope.interface import implements
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import returnValue
+from txdav.idav import IAsyncTransaction
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import Boolean
+from twisted.python.failure import Failure
+from twisted.protocols.amp import Argument, String, Command, AMP, Integer
+from twisted.internet import reactor as _reactor
+from twisted.application.service import Service
+from txdav.base.datastore.threadutils import ThreadHolder
+from txdav.idav import AlreadyFinishedError
+from twisted.python import log
+
+
+class BaseSqlTxn(object):
+    """
+    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+    current process.
+    """
+    implements(IAsyncTransaction)
+
+    def __init__(self, connectionFactory, reactor=_reactor):
+        """
+        @param connectionFactory: A 0-argument callable which returns a DB-API
+            2.0 connection.
+        """
+        self._completed = False
+        self._holder = ThreadHolder(reactor)
+        self._holder.start()
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+            self._connection = connectionFactory()
+            self._cursor = self._connection.cursor()
+
+        # Note: no locking necessary here; since this gets submitted first, all
+        # subsequent submitted work-units will be in line behind it and the
+        # cursor will already have been initialized.
+        self._holder.submit(initCursor)
+
+
+    def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
+        self._cursor.execute(sql, args)
+        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+            raise raiseOnZeroRowCount()
+        if self._cursor.description:
+            return self._cursor.fetchall()
+        else:
+            return None
+
+
+    noisy = False
+
+    def execSQL(self, *args, **kw):
+        result = self._holder.submit(
+            lambda : self._reallyExecSQL(*args, **kw)
+        )
+        if self.noisy:
+            def reportResult(results):
+                sys.stdout.write("\n".join([
+                    "",
+                    "SQL: %r %r" % (args, kw),
+                    "Results: %r" % (results,),
+                    "",
+                    ]))
+                return results
+            result.addBoth(reportResult)
+        return result
+
+
+    def commit(self):
+        if not self._completed:
+            self._completed = True
+            def reallyCommit():
+                self._connection.commit()
+            result = self._holder.submit(reallyCommit)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def abort(self):
+        if not self._completed:
+            def reallyAbort():
+                self._connection.rollback()
+            self._completed = True
+            result = self._holder.submit(reallyAbort)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def __del__(self):
+        if not self._completed:
+            print 'BaseSqlTxn.__del__: OK'
+            self.abort()
+
+
+    def reset(self):
+        """
+        Call this when placing this transaction back into the pool.
+
+        @raise RuntimeError: if the transaction has not been committed or
+            aborted.
+        """
+        if not self._completed:
+            raise RuntimeError("Attempt to re-set active transaction.")
+
+
+    def stop(self):
+        """
+        Release the thread and database connection associated with this
+        transaction.
+        """
+        self._stopped = True
+        self._holder.submit(self._connection.close)
+        return self._holder.stop()
+
+
+
+class PooledDBAPITransaction(BaseSqlTxn):
+
+    def __init__(self, pool):
+        self.pool = pool
+        super(PooledDBAPITransaction, self).__init__(
+            self.pool.connectionFactory,
+            self.pool.reactor
+        )
+
+
+    def commit(self):
+        return self.repoolAfter(super(PooledDBAPITransaction, self).commit())
+
+
+    def abort(self):
+        return self.repoolAfter(super(PooledDBAPITransaction, self).abort())
+
+
+    def repoolAfter(self, d):
+        def repool(result):
+            self.pool.reclaim(self)
+            return result
+        return d.addCallback(repool)
+
+
+
+class ConnectionPool(Service, object):
+    """
+    This is a central service that manages a pool of database connections, each
+    backed by its own thread, and executes SQL statements on them asynchronously.
+    """
+
+    reactor = _reactor
+
+    def __init__(self, connectionFactory):
+        super(ConnectionPool, self).__init__()
+        self.free = []
+        self.busy = []
+        self.connectionFactory = connectionFactory
+
+
+    def startService(self):
+        """
+        No startup necessary.
+        """
+
+
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Forcibly abort any outstanding transactions.
+        """
+        for busy in self.busy[:]:  # copy: abort() repools, mutating self.busy
+            try:
+                yield busy.abort()
+            except:
+                log.err()
+
+
+    def connection(self):
+        if self.free:
+            txn = self.free.pop(0)
+        else:
+            txn = PooledDBAPITransaction(self)
+        self.busy.append(txn)
+        return txn
+
+
+    def reclaim(self, txn):
+        txn.reset()
+        self.free.append(txn)
+        self.busy.remove(txn)
+
+
+
+def txnarg():
+    return [('transactionID', Integer())]
+
+
+
+class Pickle(Argument):
+    """
+    A pickle sent over AMP.  This is to serialize the 'args' argument to
+    execSQL, which is the dynamically-typed 'args' list argument to a DB-API
+    C{execute} function, as well as its dynamically-typed result ('rows').
+
+    This should be cleaned up into a nicer structure, but this is only spoken
+    between trusted local processes, so we can be a little relaxed about security.
+    """
+
+    def toString(self, inObject):
+        return dumps(inObject)
+
+    def fromString(self, inString):
+        return loads(inString)
+
+
+
+
+class StartTxn(Command):
+    """
+    Start a transaction, identified with an ID generated by the client.
+    """
+    arguments = txnarg()
+
+
+
+class ExecSQL(Command):
+    """
+    Execute an SQL statement.
+    """
+    arguments = [('sql', String()),
+                 ('queryID', String()),
+                 ('args', Pickle())] + txnarg()
+
+
+
+class Row(Command):
+    """
+    A row has been returned.  Sent from server to client in response to
+    L{ExecSQL}.
+    """
+
+    arguments = [('queryID', String()),
+                 ('row', Pickle())]
+
+
+
+class QueryComplete(Command):
+    """
+    A query issued with ExecSQL is complete.
+    """
+
+    arguments = [('queryID', String()),
+                 ('norows', Boolean())]
+
+
+
+class Commit(Command):
+    arguments = txnarg()
+
+
+
+class Abort(Command):
+    arguments = txnarg()
+
+
+
+class _NoRows(Exception):
+    """
+    Placeholder exception to report zero rows.
+    """
+
+
+class ConnectionPoolConnection(AMP):
+    """
+    A L{ConnectionPoolConnection} is a single connection to a
+    L{ConnectionPool}.
+    """
+
+    def __init__(self, pool):
+        """
+        Initialize a mapping of transaction IDs to transaction objects.
+        """
+        super(ConnectionPoolConnection, self).__init__()
+        self.pool = pool
+        self._txns = {}
+
+
+    @StartTxn.responder
+    def start(self, transactionID):
+        self._txns[transactionID] = self.pool.connection()
+        return {}
+
+
+    @ExecSQL.responder
+    @inlineCallbacks
+    def sendSQL(self, transactionID, queryID, sql, args):
+        norows = True
+        try:
+            rows = yield self._txns[transactionID].execSQL(sql, args)
+        except _NoRows:
+            pass
+        else:
+            if rows is not None:
+                for row in rows:
+                    norows = False
+                    self.callRemote(Row, queryID=queryID, row=row)
+        self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+        returnValue({})
+
+
+    @inlineCallbacks
+    def _complete(self, transactionID, thunk):
+        txn = self._txns.pop(transactionID)
+        # Note: the pooled transaction repools itself via repoolAfter when it
+        # commits or aborts, so no explicit reclaim is needed here.
+        yield thunk(txn)
+        returnValue({})
+
+
+    @Commit.responder
+    def commit(self, transactionID):
+        """
+        Successfully complete the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.commit())
+
+
+    @Abort.responder
+    def abort(self, transactionID):
+        """
+        Roll back the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.abort())
+
+
+
+class ConnectionPoolClient(AMP):
+    """
+    A client which can execute SQL.
+    """
+    def __init__(self):
+        super(ConnectionPoolClient, self).__init__()
+        self._nextID = count().next
+        self._txns = {}
+        self._queries = {}
+
+
+    def newTransaction(self):
+        txnid = str(self._nextID())
+        txn = Transaction(client=self, transactionID=txnid)
+        self._txns[txnid] = txn
+        self.callRemote(StartTxn, transactionID=txnid)
+        return txn
+
+
+    @Row.responder
+    def row(self, queryID, row):
+        self._queries[queryID].row(row)
+        return {}
+
+
+    @QueryComplete.responder
+    def complete(self, queryID, norows):
+        self._queries.pop(queryID).done(norows)
+        return {}
+
+
+
+class _Query(object):
+    def __init__(self, raiseOnZeroRowCount):
+        self.results = []
+        self.deferred = Deferred()
+        self.raiseOnZeroRowCount = raiseOnZeroRowCount
+
+
+    def row(self, row):
+        """
+        A row was received.
+        """
+        self.results.append(row)
+
+
+    def done(self, norows):
+        """
+        The query is complete.
+
+        @param norows: A boolean.  True if there were no rows.
+        """
+        if norows and self.raiseOnZeroRowCount is not None:
+            exc = self.raiseOnZeroRowCount()
+            self.deferred.errback(Failure(exc))
+        else:
+            self.deferred.callback(self.results)
+
+
+
+
+class Transaction(object):
+    """
+    Async transaction implementation.
+    """
+
+    implements(IAsyncTransaction)
+
+    def __init__(self, client, transactionID):
+        """
+        Initialize a transaction with a L{ConnectionPoolClient} and a unique
+        transaction identifier.
+        """
+        self.client = client
+        self.transactionID = transactionID
+
+
+    def execSQL(self, sql, args, raiseOnZeroRowCount=None):
+        queryID = str(self.client._nextID())
+        query = self.client._queries[queryID] = _Query(raiseOnZeroRowCount)
+        self.client.callRemote(ExecSQL, transactionID=self.transactionID,
+                               queryID=queryID, sql=sql, args=args)
+        return query.deferred
+
+
+    def complete(self, command):
+        return self.client.callRemote(
+            command, transactionID=self.transactionID
+            ).addCallback(lambda x: None)
+
+
+    def commit(self):
+        return self.complete(Commit)
+
+
+    def abort(self):
+        return self.complete(Abort)
+
+
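
The new module above splits the pool across an AMP client and an AMP server (the log message notes it is not actually used yet). As a rough, hedged sketch of how the two halves appear intended to fit together, with loopbackAsync and sqlite3 as illustrative stand-ins for the eventual inter-process transport and the real database:

    import sqlite3
    from twisted.protocols.loopback import loopbackAsync
    from txdav.base.datastore.asyncsqlpool import (
        ConnectionPool, ConnectionPoolConnection, ConnectionPoolClient)

    # Assumes a running reactor.
    pool = ConnectionPool(lambda : sqlite3.connect("demo.sqlite"))
    server = ConnectionPoolConnection(pool)    # server half: owns real connections
    client = ConnectionPoolClient()            # client half: speaks StartTxn/ExecSQL
    loopbackAsync(server, client)              # wire the two protocols together

    txn = client.newTransaction()              # sends StartTxn
    d = txn.execSQL("select 1", [])            # sends ExecSQL; Row/QueryComplete stream back
    d.addCallback(lambda rows: txn.commit())   # sends Commit, repooling the server-side txn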

Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py	2010-11-01 21:13:12 UTC (rev 6496)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py	2010-11-01 21:13:37 UTC (rev 6497)
@@ -18,17 +18,12 @@
 """
 Run and manage PostgreSQL as a subprocess.
 """
+
 import os
 import pwd
-import sys
-#import thread
 
-from Queue import Queue
-
 from hashlib import md5
 
-from zope.interface import implements
-
 from twisted.python.procutils import which
 from twisted.internet.protocol import ProcessProtocol
 
@@ -40,10 +35,8 @@
 
 from twisted.protocols.basic import LineReceiver
 from twisted.internet import reactor
-from twisted.python.failure import Failure
-from txdav.idav import IAsyncTransaction
-from txdav.idav import AlreadyFinishedError
 from twisted.internet.defer import Deferred
+from txdav.base.datastore.asyncsqlpool import BaseSqlTxn
 
 from twisted.application.service import MultiService
 
@@ -222,6 +215,7 @@
         """
         self.output.append(data)
 
+
     def errReceived(self, data):
         """
         Some output was received on stderr.
@@ -237,182 +231,23 @@
 
 
 
-_DONE = object()
-
-_STATE_STOPPED = 'STOPPED'
-_STATE_RUNNING = 'RUNNING'
-_STATE_STOPPING = 'STOPPING'
-
-class ThreadHolder(object):
+class UnpooledSqlTxn(BaseSqlTxn):
     """
-    A queue which will hold a reactor threadpool thread open until all of the
-    work in that queue is done.
+    Unpooled variant (releases thread immediately on commit or abort),
+    currently exclusively for testing.
     """
-
-    def __init__(self, reactor):
-        self._reactor = reactor
-        self._state = _STATE_STOPPED
-        self._stopper = None
-        self._q = None
-
-
-    def _run(self):
-        """
-        Worker function which runs in a non-reactor thread.
-        """
-        while True:
-            work = self._q.get()
-            if work is _DONE:
-                def finishStopping():
-                    self._state = _STATE_STOPPED
-                    self._q = None
-                    s = self._stopper
-                    self._stopper = None
-                    s.callback(None)
-                self._reactor.callFromThread(finishStopping)
-                return
-            self._oneWorkUnit(*work)
-
-
-    def _oneWorkUnit(self, deferred, instruction):
-        try: 
-            result = instruction()
-        except:
-            etype, evalue, etb = sys.exc_info()
-            def relayFailure():
-                f = Failure(evalue, etype, etb)
-                deferred.errback(f)
-            self._reactor.callFromThread(relayFailure)
-        else:
-            self._reactor.callFromThread(deferred.callback, result)
-
-
-    def submit(self, work):
-        """
-        Submit some work to be run.
-
-        @param work: a 0-argument callable, which will be run in a thread.
-
-        @return: L{Deferred} that fires with the result of L{work}
-        """
-        d = Deferred()
-        self._q.put((d, work))
-        return d
-
-
-    def start(self):
-        """
-        Start this thing, if it's stopped.
-        """
-        if self._state != _STATE_STOPPED:
-            raise RuntimeError("Not stopped.")
-        self._state = _STATE_RUNNING
-        self._q = Queue(0)
-        self._reactor.callInThread(self._run)
-
-
-    def stop(self):
-        """
-        Stop this thing and release its thread, if it's running.
-        """
-        if self._state != _STATE_RUNNING:
-            raise RuntimeError("Not running.")
-        s = self._stopper = Deferred()
-        self._state = _STATE_STOPPING
-        self._q.put(_DONE)
-        return s
-
-
-
-class ThisProcessSqlTxn(object):
-    """
-    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
-    current process.
-    """
-    implements(IAsyncTransaction)
-
-    def __init__(self, connectionFactory):
-        """
-        @param connectionFactory: A 0-argument callable which returns a DB-API
-            2.0 connection.
-        """
-        self._completed = False
-        self._holder = ThreadHolder(reactor)
-        self._holder.start()
-        def initCursor():
-            # support threadlevel=1; we can't necessarily cursor() in a
-            # different thread than we do transactions in.
-
-            # FIXME: may need to be pooling ThreadHolders along with
-            # connections, if threadlevel=1 requires connect() be called in the
-            # same thread as cursor() et. al.
-            self._connection = connectionFactory()
-            self._cursor = self._connection.cursor()
-        self._holder.submit(initCursor)
-
-
-    def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
-        self._cursor.execute(sql, args)
-        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
-            raise raiseOnZeroRowCount()
-        if self._cursor.description:
-            return self._cursor.fetchall()
-        else:
-            return None
-
-
-    noisy = False
-
-    def execSQL(self, *args, **kw):
-        result = self._holder.submit(
-            lambda : self._reallyExecSQL(*args, **kw)
-        )
-        if self.noisy:
-            def reportResult(results):
-                sys.stdout.write("\n".join([
-                    "",
-                    "SQL: %r %r" % (args, kw),
-                    "Results: %r" % (results,),
-                    "",
-                    ]))
-                return results
-            result.addBoth(reportResult)
+    def commit(self):
+        result = super(UnpooledSqlTxn, self).commit()
+        self.stop()
         return result
 
-
-    def commit(self):
-        if not self._completed:
-            self._completed = True
-            def reallyCommit():
-                self._connection.commit()
-                self._connection.close()
-            result = self._holder.submit(reallyCommit)
-            self._holder.stop()
-            return result
-        else:
-            raise AlreadyFinishedError()
-
-
     def abort(self):
-        if not self._completed:
-            def reallyAbort():
-                self._connection.rollback()
-                self._connection.close()
-            self._completed = True
-            result = self._holder.submit(reallyAbort)
-            self._holder.stop()
-            return result
-        else:
-            raise AlreadyFinishedError()
+        result = super(UnpooledSqlTxn, self).abort()
+        self.stop()
+        return result
 
 
-    def __del__(self):
-        if not self._completed:
-            print 'CommonStoreTransaction.__del__: OK'
-            self.abort()
 
-
-
 class PostgresService(MultiService):
 
     def __init__(self, dataStoreDirectory, subServiceFactory,
@@ -516,6 +351,7 @@
 
         w = DiagnosticConnectionWrapper(connection, label)
         c = w.cursor()
+
         # Turn on standard conforming strings.  This option is _required_ if
         # you want to get correct behavior out of parameter-passing with the
         # pgdb module.  If it is not set then the server is potentially
@@ -530,6 +366,12 @@
         # preferable to see some exceptions while we're in this state than to
         # have the entire worker process hang.
         c.execute("set statement_timeout=30000")
+
+        # pgdb (as per DB-API 2.0) automatically puts the connection into a
+        # 'executing a transaction' state when _any_ statement is executed on
+        # it (even these not-touching-any-data statements); make sure to commit
+        # first so that the application sees a fresh transaction, and the
+        # connection can safely be pooled without executing anything on it.
         w.commit()
         c.close()
         return w
@@ -539,7 +381,7 @@
         """
         Create a L{IAsyncTransaction} based on a thread in the current process.
         """
-        return ThisProcessSqlTxn(lambda : self.produceConnection(label))
+        return UnpooledSqlTxn(lambda : self.produceConnection(label))
 
 
     def ready(self):
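
The comment added to produceConnection above describes a general DB-API 2.0 behavior; as a condensed, hedged illustration (pgdb and the connect arguments are only examples, not taken from this change):

    import pgdb  # any DB-API 2.0 module behaves the same way

    connection = pgdb.connect(database="example")    # illustrative arguments
    cursor = connection.cursor()
    # Even configuration-only statements implicitly open a transaction...
    cursor.execute("set standard_conforming_strings=on")
    cursor.execute("set statement_timeout=30000")
    # ...so commit before handing the connection out or pooling it, leaving it idle.
    connection.commit()
    cursor.close()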

Added: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py	2010-11-01 21:13:37 UTC (rev 6497)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+from Queue import Queue
+
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred
+
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+    """
+    A queue which will hold a reactor threadpool thread open until all of the
+    work in that queue is done.
+    """
+
+    def __init__(self, reactor):
+        self._reactor = reactor
+        self._state = _STATE_STOPPED
+        self._stopper = None
+        self._q = None
+
+
+    def _run(self):
+        """
+        Worker function which runs in a non-reactor thread.
+        """
+        while True:
+            work = self._q.get()
+            if work is _DONE:
+                def finishStopping():
+                    self._state = _STATE_STOPPED
+                    self._q = None
+                    s = self._stopper
+                    self._stopper = None
+                    s.callback(None)
+                self._reactor.callFromThread(finishStopping)
+                return
+            self._oneWorkUnit(*work)
+
+
+    def _oneWorkUnit(self, deferred, instruction):
+        try: 
+            result = instruction()
+        except:
+            etype, evalue, etb = sys.exc_info()
+            def relayFailure():
+                f = Failure(evalue, etype, etb)
+                deferred.errback(f)
+            self._reactor.callFromThread(relayFailure)
+        else:
+            self._reactor.callFromThread(deferred.callback, result)
+
+
+    def submit(self, work):
+        """
+        Submit some work to be run.
+
+        @param work: a 0-argument callable, which will be run in a thread.
+
+        @return: L{Deferred} that fires with the result of L{work}
+        """
+        d = Deferred()
+        self._q.put((d, work))
+        return d
+
+
+    def start(self):
+        """
+        Start this thing, if it's stopped.
+        """
+        if self._state != _STATE_STOPPED:
+            raise RuntimeError("Not stopped.")
+        self._state = _STATE_RUNNING
+        self._q = Queue(0)
+        self._reactor.callInThread(self._run)
+
+
+    def stop(self):
+        """
+        Stop this thing and release its thread, if it's running.
+        """
+        if self._state != _STATE_RUNNING:
+            raise RuntimeError("Not running.")
+        s = self._stopper = Deferred()
+        self._state = _STATE_STOPPING
+        self._q.put(_DONE)
+        return s
+
+
+
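
Since ThreadHolder now lives in its own utility module, a brief hedged sketch (not part of this changeset) of how it is driven; blockingWork is just an illustrative stand-in for any blocking call:

    from twisted.internet import reactor
    from txdav.base.datastore.threadutils import ThreadHolder

    def blockingWork():
        # Runs in the holder's dedicated thread, off the reactor thread.
        return 6 * 7

    holder = ThreadHolder(reactor)
    holder.start()                     # claim a reactor threadpool thread
    d = holder.submit(blockingWork)    # Deferred fires with 42 back in the reactor thread
    d.addCallback(lambda result: holder.stop())  # release the thread when done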

