[CalendarServer-changes] [6497] CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore
source_changes at macosforge.org
source_changes at macosforge.org
Mon Nov 1 14:13:40 PDT 2010
Revision: 6497
http://trac.macosforge.org/projects/calendarserver/changeset/6497
Author: glyph at apple.com
Date: 2010-11-01 14:13:37 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
reorganize some things, spike async threadpool (don't use it yet)
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
Added Paths:
-----------
CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py
Added: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py (rev 0)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py 2010-11-01 21:13:37 UTC (rev 6497)
@@ -0,0 +1,464 @@
+# -*- test-case-name: txdav.caldav.datastore -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Asynchronous multi-process connection pool.
+"""
+
+import sys
+from cPickle import dumps, loads
+from itertools import count
+
+from zope.interface import implements
+
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import returnValue
+from txdav.idav import IAsyncTransaction
+from twisted.internet.defer import Deferred
+from twisted.protocols.amp import Boolean
+from twisted.python.failure import Failure
+from twisted.protocols.amp import Argument, String, Command, AMP, Integer
+from twisted.internet import reactor as _reactor
+from twisted.application.service import Service
+from txdav.base.datastore.threadutils import ThreadHolder
+from txdav.idav import AlreadyFinishedError
+from twisted.python import log
+
+
+class BaseSqlTxn(object):
+    """
+    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+    current process.
+
+    Every DB-API call is submitted to one L{ThreadHolder}, so the calls run
+    one at a time, in submission order, on the same worker thread that
+    created the connection and its cursor.
+    """
+    implements(IAsyncTransaction)
+
+    # When true, execSQL writes each statement and its outcome to stdout.
+    noisy = False
+
+    def __init__(self, connectionFactory, reactor=_reactor):
+        """
+        @param connectionFactory: A 0-argument callable which returns a DB-API
+            2.0 connection.
+
+        @param reactor: the reactor passed to the L{ThreadHolder}.
+        """
+        self._completed = False
+        self._stopped = False
+        self._holder = ThreadHolder(reactor)
+        self._holder.start()
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+            self._connection = connectionFactory()
+            self._cursor = self._connection.cursor()
+
+        # Note: no locking necessary here; since this gets submitted first, all
+        # subsequent submitted work-units will be in line behind it and the
+        # cursor will already have been initialized.
+        self._holder.submit(initCursor)
+
+
+    def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        """
+        Execute one statement on the cursor.  Runs in the holder's thread.
+
+        @param sql: the SQL text handed to the cursor's C{execute}.
+
+        @param args: the parameter sequence for C{execute}; defaults to an
+            empty list.
+
+        @param raiseOnZeroRowCount: C{None}, or a 0-argument callable
+            returning an exception to raise when the cursor reports a
+            C{rowcount} of 0.
+
+        @return: the result of C{fetchall()} if the cursor has a
+            C{description}, otherwise C{None}.
+        """
+        if args is None:
+            # Avoid sharing one mutable default list between calls.
+            args = []
+        self._cursor.execute(sql, args)
+        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+            raise raiseOnZeroRowCount()
+        if self._cursor.description:
+            return self._cursor.fetchall()
+        else:
+            return None
+
+
+    def execSQL(self, *args, **kw):
+        """
+        Queue a statement for execution in the holder's thread.
+
+        Arguments are passed through to L{_reallyExecSQL}.
+
+        @return: the L{Deferred} returned by the holder's C{submit}.
+        """
+        result = self._holder.submit(
+            lambda : self._reallyExecSQL(*args, **kw)
+        )
+        if self.noisy:
+            def reportResult(results):
+                # Added with addBoth, so 'results' may also be a Failure.
+                sys.stdout.write("\n".join([
+                    "",
+                    "SQL: %r %r" % (args, kw),
+                    "Results: %r" % (results,),
+                    "",
+                ]))
+                return results
+            result.addBoth(reportResult)
+        return result
+
+
+    def commit(self):
+        """
+        Queue a commit of the underlying connection.
+
+        @return: the L{Deferred} for the queued commit.
+
+        @raise AlreadyFinishedError: if commit or abort was already called.
+        """
+        if not self._completed:
+            self._completed = True
+            def reallyCommit():
+                self._connection.commit()
+            result = self._holder.submit(reallyCommit)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def abort(self):
+        """
+        Queue a rollback of the underlying connection.
+
+        @return: the L{Deferred} for the queued rollback.
+
+        @raise AlreadyFinishedError: if commit or abort was already called.
+        """
+        if not self._completed:
+            # Mark completion right away, as commit() does.  Doing it only
+            # inside the worker thread would let a second abort()/commit()
+            # made before the rollback runs slip through unnoticed.
+            self._completed = True
+            def reallyAbort():
+                self._connection.rollback()
+            result = self._holder.submit(reallyAbort)
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def __del__(self):
+        # Roll back a transaction that is collected without being finished.
+        if not self._completed:
+            print 'CommonStoreTransaction.__del__: OK'
+            self.abort()
+
+
+    def reset(self):
+        """
+        Call this when placing this transaction back into the pool.
+
+        @raise RuntimeError: if the transaction has not been committed or
+            aborted.
+        """
+        if not self._completed:
+            raise RuntimeError("Attempt to re-set active transaction.")
+
+
+    def stop(self):
+        """
+        Release the thread and database connection associated with this
+        transaction.
+
+        @return: the L{Deferred} returned by the holder's C{stop}.
+        """
+        self._stopped = True
+        def closeConnection():
+            # Look up the connection in the worker thread: it is assigned
+            # there by initCursor, which may not have run yet when stop()
+            # is called.
+            self._connection.close()
+        self._holder.submit(closeConnection)
+        return self._holder.stop()
+
+
+
+class PooledDBAPITransaction(BaseSqlTxn):
+    """
+    A L{BaseSqlTxn} owned by a pool: once a commit or abort succeeds, the
+    transaction hands itself back to that pool.
+    """
+
+    def __init__(self, pool):
+        """
+        @param pool: the owning pool; its C{connectionFactory} and
+            C{reactor} attributes are used to set up this transaction.
+        """
+        self.pool = pool
+        super(PooledDBAPITransaction, self).__init__(
+            pool.connectionFactory, pool.reactor
+        )
+
+
+    def commit(self):
+        """
+        Commit, then return this transaction to the pool.
+        """
+        committing = super(PooledDBAPITransaction, self).commit()
+        return self.repoolAfter(committing)
+
+
+    def abort(self):
+        """
+        Abort, then return this transaction to the pool.
+        """
+        aborting = super(PooledDBAPITransaction, self).abort()
+        return self.repoolAfter(aborting)
+
+
+    def repoolAfter(self, d):
+        """
+        Arrange for this transaction to be reclaimed by the pool when C{d}
+        fires successfully; the result of C{d} is passed through unchanged.
+
+        @param d: a L{Deferred}.
+
+        @return: C{d}, with the reclaiming callback added.
+        """
+        def repool(outcome):
+            self.pool.reclaim(self)
+            return outcome
+        d.addCallback(repool)
+        return d
+
+
+
+class ConnectionPool(Service, object):
+    """
+    This is a central service that has a threadpool and executes SQL statements
+    asynchronously, in a pool.
+
+    @ivar free: transactions which are idle and may be handed out again.
+
+    @ivar busy: transactions which have been handed out and not reclaimed.
+
+    @ivar connectionFactory: 0-argument callable returning a DB-API 2.0
+        connection; used by each new L{PooledDBAPITransaction}.
+    """
+
+    reactor = _reactor
+
+    def __init__(self, connectionFactory):
+        super(ConnectionPool, self).__init__()
+        self.free = []
+        self.busy = []
+        self.connectionFactory = connectionFactory
+
+
+    def startService(self):
+        """
+        No startup necessary.
+        """
+
+
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Forcibly abort any outstanding transactions.
+        """
+        # Iterate over a snapshot: a successful abort() reclaims the
+        # transaction, which removes it from self.busy.
+        for txn in list(self.busy):
+            try:
+                yield txn.abort()
+            except:
+                # Best effort; log the failure and keep going.
+                log.err()
+
+
+    def connection(self):
+        """
+        Hand out a transaction, reusing an idle one when available.
+
+        @return: a L{PooledDBAPITransaction}, now recorded as busy.
+        """
+        if self.free:
+            txn = self.free.pop(0)
+        else:
+            txn = PooledDBAPITransaction(self)
+        self.busy.append(txn)
+        return txn
+
+
+    def reclaim(self, txn):
+        """
+        Move a finished transaction from the busy list to the free list.
+
+        Reclaiming a transaction which is not currently busy does nothing,
+        so a second reclaim of the same transaction is harmless.
+
+        @raise RuntimeError: (from C{txn.reset()}) if the transaction has not
+            been committed or aborted.
+        """
+        if txn not in self.busy:
+            return
+        txn.reset()
+        self.busy.remove(txn)
+        self.free.append(txn)
+
+
+
+def txnarg():
+    """
+    Build the argument list shared by the commands which name a transaction.
+
+    @return: a new list holding the single C{transactionID} L{Integer}
+        argument.
+    """
+    transactionID = ('transactionID', Integer())
+    return [transactionID]
+
+
+
+class Pickle(Argument):
+    """
+    An AMP argument whose wire format is a pickle.  It carries the
+    dynamically-typed 'args' list given to a DB-API C{execute} call, and the
+    dynamically-typed rows which come back.
+
+    A tidier structure would be preferable, but since this is not a network
+    protocol, security can be treated somewhat loosely here.
+    """
+
+    def toString(self, inObject):
+        """
+        Pickle C{inObject} for transmission.
+        """
+        pickled = dumps(inObject)
+        return pickled
+
+    def fromString(self, inString):
+        """
+        Unpickle a received string.
+        """
+        unpickled = loads(inString)
+        return unpickled
+
+
+
+
+class StartTxn(Command):
+    """
+    Begin a transaction.  The sender picks the identifier and passes it as
+    C{transactionID}.
+    """
+
+    arguments = txnarg()
+
+
+
+class ExecSQL(Command):
+    """
+    Run one SQL statement within the transaction named by C{transactionID}.
+    C{queryID} labels the statement; C{args} is pickled.
+    """
+
+    arguments = [
+        ('sql', String()),
+        ('queryID', String()),
+        ('args', Pickle()),
+    ] + txnarg()
+
+
+
+class Row(Command):
+    """
+    One result row.  The server sends this to the client while answering an
+    L{ExecSQL}; C{queryID} says which query the pickled C{row} belongs to.
+    """
+
+    arguments = [
+        ('queryID', String()),
+        ('row', Pickle()),
+    ]
+
+
+
+class QueryComplete(Command):
+    """
+    Signals that a query started with L{ExecSQL} has finished.  C{queryID}
+    names the query and C{norows} is a boolean flag about its rows.
+    """
+
+    arguments = [
+        ('queryID', String()),
+        ('norows', Boolean()),
+    ]
+
+
+
+class Commit(Command):
+    """
+    Commit the transaction named by C{transactionID}.
+    """
+
+    arguments = txnarg()
+
+
+
+class Abort(Command):
+    """
+    Roll back the transaction named by C{transactionID}.
+    """
+
+    arguments = txnarg()
+
+
+
+class _NoRows(Exception):
+    """
+    Stand-in exception used to signal that a statement had zero rows.
+    """
+
+
+class ConnectionPoolConnection(AMP):
+    """
+    A L{ConnectionPoolConnection} is a single connection to a
+    L{ConnectionPool}.
+
+    @ivar pool: the pool which supplies transactions.
+
+    @ivar _txns: mapping of transaction ID to the transaction object that
+        was started for it on this connection.
+    """
+
+    def __init__(self, pool):
+        """
+        Initialize a mapping of transaction IDs to transaction objects.
+        """
+        super(ConnectionPoolConnection, self).__init__()
+        self.pool = pool
+        self._txns = {}
+
+
+    @StartTxn.responder
+    def start(self, transactionID):
+        """
+        Take a transaction from the pool and remember it under the given ID.
+        """
+        self._txns[transactionID] = self.pool.connection()
+        return {}
+
+
+    @ExecSQL.responder
+    @inlineCallbacks
+    def sendSQL(self, transactionID, queryID, sql, args):
+        """
+        Execute a statement, send each result row back with L{Row}, then
+        send L{QueryComplete}.
+
+        C{norows} is true when the statement's row count was zero.
+        """
+        txn = self._txns[transactionID]
+        try:
+            # Ask the transaction to raise _NoRows on a zero row count, so
+            # a statement that affects rows but returns none (such as an
+            # UPDATE) is not reported as having no rows.
+            rows = yield txn.execSQL(sql, args, raiseOnZeroRowCount=_NoRows)
+        except _NoRows:
+            norows = True
+        else:
+            norows = False
+            if rows is not None:
+                for row in rows:
+                    self.callRemote(Row, queryID=queryID, row=row)
+        self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+        returnValue({})
+
+
+    @inlineCallbacks
+    def _complete(self, transactionID, thunk):
+        """
+        Finish a transaction and make sure it goes back to the pool.
+
+        @param thunk: 1-argument callable which commits or aborts the
+            transaction passed to it.
+        """
+        txn = self._txns.pop(transactionID)
+        try:
+            yield thunk(txn)
+            returnValue({})
+        finally:
+            # A pooled transaction reclaims itself after a successful
+            # commit/abort; only reclaim here if it is still marked busy.
+            if txn in self.pool.busy:
+                self.pool.reclaim(txn)
+
+
+    @Commit.responder
+    def commit(self, transactionID):
+        """
+        Successfully complete the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.commit())
+
+
+    @Abort.responder
+    def abort(self, transactionID):
+        """
+        Roll back the given transaction.
+        """
+        return self._complete(transactionID, lambda x: x.abort())
+
+
+
+class ConnectionPoolClient(AMP):
+    """
+    A client which can execute SQL.
+
+    @ivar _nextID: 0-argument callable returning the next integer from a
+        counter; used for transaction IDs here and read by L{Transaction}
+        for query IDs.
+
+    @ivar _txns: mapping of transaction ID to L{Transaction}.
+
+    @ivar _queries: mapping of query ID to the in-progress L{_Query}.
+    """
+    def __init__(self):
+        super(ConnectionPoolClient, self).__init__()
+        self._nextID = count().next
+        self._txns = {}
+        self._queries = {}
+
+
+    def newTransaction(self):
+        """
+        Create a L{Transaction} with a fresh ID and tell the server to
+        start it.
+
+        @return: the new L{Transaction}.
+        """
+        txnid = str(self._nextID())
+        txn = Transaction(client=self, transactionID=txnid)
+        self._txns[txnid] = txn
+        self.callRemote(StartTxn, transactionID=txnid)
+        return txn
+
+
+    @Row.responder
+    def row(self, queryID, row):
+        """
+        Deliver a received row to the query it belongs to.
+        """
+        self._queries[queryID].row(row)
+        return {}
+
+
+    @QueryComplete.responder
+    def complete(self, queryID, norows):
+        """
+        Forget the finished query and tell it that it is done.
+        """
+        # The mapping is '_queries'; 'queries' does not exist.
+        self._queries.pop(queryID).done(norows)
+        return {}
+
+
+
+class _Query(object):
+    """
+    Collects the rows of one in-progress query and fires a L{Deferred} once
+    the query is finished.
+    """
+
+    def __init__(self, raiseOnZeroRowCount):
+        """
+        @param raiseOnZeroRowCount: C{None}, or a 0-argument callable
+            producing the exception to fail with when there were no rows.
+        """
+        self.raiseOnZeroRowCount = raiseOnZeroRowCount
+        self.deferred = Deferred()
+        self.results = []
+
+
+    def row(self, row):
+        """
+        Record one received row.
+        """
+        self.results.append(row)
+
+
+    def done(self, norows):
+        """
+        Finish the query.
+
+        @param norows: A boolean.  True if there were no rows.
+        """
+        errorFactory = self.raiseOnZeroRowCount
+        if not norows or errorFactory is None:
+            self.deferred.callback(self.results)
+            return
+        failure = Failure(errorFactory())
+        self.deferred.errback(failure)
+
+
+
+
+class Transaction(object):
+    """
+    Async transaction implementation.
+
+    Each operation is relayed as an AMP command through a
+    L{ConnectionPoolClient}.
+    """
+
+    implements(IAsyncTransaction)
+
+    def __init__(self, client, transactionID):
+        """
+        Initialize a transaction with a L{ConnectionPoolClient} and a unique
+        transaction identifier.
+        """
+        self.client = client
+        self.transactionID = transactionID
+
+
+    def execSQL(self, sql, args, raiseOnZeroRowCount=None):
+        """
+        Send a statement to the server for execution in this transaction.
+
+        @param sql: the SQL text.
+
+        @param args: the statement's parameters; sent pickled.
+
+        @param raiseOnZeroRowCount: C{None}, or a 0-argument callable
+            producing the exception to fail with if there were no rows.
+
+        @return: the L{Deferred} of the registered L{_Query}; it fires with
+            the list of received rows once the query completes.
+        """
+        # queryID is a String on the wire and comes back as a string in Row
+        # and QueryComplete, so both send and register it in string form.
+        queryID = str(self.client._nextID())
+        query = _Query(raiseOnZeroRowCount)
+        self.client._queries[queryID] = query
+        self.client.callRemote(
+            ExecSQL, queryID=queryID, sql=sql, args=args,
+            transactionID=self.transactionID
+        )
+        # Return the query's own Deferred: it is the one fired by
+        # _Query.done().
+        return query.deferred
+
+
+    def complete(self, command):
+        """
+        Send a transaction-ending command for this transaction.
+
+        @param command: the AMP command class to send (L{Commit} or
+            L{Abort}).
+
+        @return: a L{Deferred} firing with C{None} once the server answers.
+        """
+        return self.client.callRemote(
+            command, transactionID=self.transactionID
+        ).addCallback(lambda x: None)
+
+
+    def commit(self):
+        """
+        Ask the server to commit this transaction.
+        """
+        return self.complete(Commit)
+
+
+    def abort(self):
+        """
+        Ask the server to roll back this transaction.
+        """
+        return self.complete(Abort)
+
+
Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:13:12 UTC (rev 6496)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:13:37 UTC (rev 6497)
@@ -18,17 +18,12 @@
"""
Run and manage PostgreSQL as a subprocess.
"""
+
import os
import pwd
-import sys
-#import thread
-from Queue import Queue
-
from hashlib import md5
-from zope.interface import implements
-
from twisted.python.procutils import which
from twisted.internet.protocol import ProcessProtocol
@@ -40,10 +35,8 @@
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor
-from twisted.python.failure import Failure
-from txdav.idav import IAsyncTransaction
-from txdav.idav import AlreadyFinishedError
from twisted.internet.defer import Deferred
+from txdav.base.datastore.asyncsqlpool import BaseSqlTxn
from twisted.application.service import MultiService
@@ -222,6 +215,7 @@
"""
self.output.append(data)
+
def errReceived(self, data):
"""
Some output was received on stderr.
@@ -237,182 +231,23 @@
-_DONE = object()
-
-_STATE_STOPPED = 'STOPPED'
-_STATE_RUNNING = 'RUNNING'
-_STATE_STOPPING = 'STOPPING'
-
-class ThreadHolder(object):
+class UnpooledSqlTxn(BaseSqlTxn):
"""
- A queue which will hold a reactor threadpool thread open until all of the
- work in that queue is done.
+ Unpooled variant (releases thread immediately on commit or abort),
+ currently exclusively for testing.
"""
-
- def __init__(self, reactor):
- self._reactor = reactor
- self._state = _STATE_STOPPED
- self._stopper = None
- self._q = None
-
-
- def _run(self):
- """
- Worker function which runs in a non-reactor thread.
- """
- while True:
- work = self._q.get()
- if work is _DONE:
- def finishStopping():
- self._state = _STATE_STOPPED
- self._q = None
- s = self._stopper
- self._stopper = None
- s.callback(None)
- self._reactor.callFromThread(finishStopping)
- return
- self._oneWorkUnit(*work)
-
-
- def _oneWorkUnit(self, deferred, instruction):
- try:
- result = instruction()
- except:
- etype, evalue, etb = sys.exc_info()
- def relayFailure():
- f = Failure(evalue, etype, etb)
- deferred.errback(f)
- self._reactor.callFromThread(relayFailure)
- else:
- self._reactor.callFromThread(deferred.callback, result)
-
-
- def submit(self, work):
- """
- Submit some work to be run.
-
- @param work: a 0-argument callable, which will be run in a thread.
-
- @return: L{Deferred} that fires with the result of L{work}
- """
- d = Deferred()
- self._q.put((d, work))
- return d
-
-
- def start(self):
- """
- Start this thing, if it's stopped.
- """
- if self._state != _STATE_STOPPED:
- raise RuntimeError("Not stopped.")
- self._state = _STATE_RUNNING
- self._q = Queue(0)
- self._reactor.callInThread(self._run)
-
-
- def stop(self):
- """
- Stop this thing and release its thread, if it's running.
- """
- if self._state != _STATE_RUNNING:
- raise RuntimeError("Not running.")
- s = self._stopper = Deferred()
- self._state = _STATE_STOPPING
- self._q.put(_DONE)
- return s
-
-
-
-class ThisProcessSqlTxn(object):
- """
- L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
- current process.
- """
- implements(IAsyncTransaction)
-
- def __init__(self, connectionFactory):
- """
- @param connectionFactory: A 0-argument callable which returns a DB-API
- 2.0 connection.
- """
- self._completed = False
- self._holder = ThreadHolder(reactor)
- self._holder.start()
- def initCursor():
- # support threadlevel=1; we can't necessarily cursor() in a
- # different thread than we do transactions in.
-
- # FIXME: may need to be pooling ThreadHolders along with
- # connections, if threadlevel=1 requires connect() be called in the
- # same thread as cursor() et. al.
- self._connection = connectionFactory()
- self._cursor = self._connection.cursor()
- self._holder.submit(initCursor)
-
-
- def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
- self._cursor.execute(sql, args)
- if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
- raise raiseOnZeroRowCount()
- if self._cursor.description:
- return self._cursor.fetchall()
- else:
- return None
-
-
- noisy = False
-
- def execSQL(self, *args, **kw):
- result = self._holder.submit(
- lambda : self._reallyExecSQL(*args, **kw)
- )
- if self.noisy:
- def reportResult(results):
- sys.stdout.write("\n".join([
- "",
- "SQL: %r %r" % (args, kw),
- "Results: %r" % (results,),
- "",
- ]))
- return results
- result.addBoth(reportResult)
+ def commit(self):
+ result = super(UnpooledSqlTxn, self).commit()
+ self.stop()
return result
-
- def commit(self):
- if not self._completed:
- self._completed = True
- def reallyCommit():
- self._connection.commit()
- self._connection.close()
- result = self._holder.submit(reallyCommit)
- self._holder.stop()
- return result
- else:
- raise AlreadyFinishedError()
-
-
def abort(self):
- if not self._completed:
- def reallyAbort():
- self._connection.rollback()
- self._connection.close()
- self._completed = True
- result = self._holder.submit(reallyAbort)
- self._holder.stop()
- return result
- else:
- raise AlreadyFinishedError()
+ result = super(UnpooledSqlTxn, self).abort()
+ self.stop()
+ return result
- def __del__(self):
- if not self._completed:
- print 'CommonStoreTransaction.__del__: OK'
- self.abort()
-
-
class PostgresService(MultiService):
def __init__(self, dataStoreDirectory, subServiceFactory,
@@ -516,6 +351,7 @@
w = DiagnosticConnectionWrapper(connection, label)
c = w.cursor()
+
# Turn on standard conforming strings. This option is _required_ if
# you want to get correct behavior out of parameter-passing with the
# pgdb module. If it is not set then the server is potentially
@@ -530,6 +366,12 @@
# preferable to see some exceptions while we're in this state than to
# have the entire worker process hang.
c.execute("set statement_timeout=30000")
+
+ # pgdb (as per DB-API 2.0) automatically puts the connection into a
+ # 'executing a transaction' state when _any_ statement is executed on
+ # it (even these not-touching-any-data statements); make sure to commit
+ # first so that the application sees a fresh transaction, and the
+ # connection can safely be pooled without executing anything on it.
w.commit()
c.close()
return w
@@ -539,7 +381,7 @@
"""
Create a L{IAsyncTransaction} based on a thread in the current process.
"""
- return ThisProcessSqlTxn(lambda : self.produceConnection(label))
+ return UnpooledSqlTxn(lambda : self.produceConnection(label))
def ready(self):
Added: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py (rev 0)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py 2010-11-01 21:13:37 UTC (rev 6497)
@@ -0,0 +1,111 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+from Queue import Queue
+
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred
+
+
+# Sentinel put on a ThreadHolder's queue to tell its worker loop to exit.
+_DONE = object()
+
+# Lifecycle states of a ThreadHolder.
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+    """
+    Keeps one reactor threadpool thread occupied with a queue of work, and
+    releases the thread only when told to stop.
+    """
+
+    def __init__(self, reactor):
+        """
+        @param reactor: provides C{callInThread} to run the worker loop and
+            C{callFromThread} to deliver results.
+        """
+        self._q = None
+        self._stopper = None
+        self._state = _STATE_STOPPED
+        self._reactor = reactor
+
+
+    def _finishStopping(self):
+        """
+        Scheduled via C{callFromThread} once the worker loop has seen the
+        stop sentinel: reset state and fire the L{Deferred} made by L{stop}.
+        """
+        self._state = _STATE_STOPPED
+        self._q = None
+        stopped, self._stopper = self._stopper, None
+        stopped.callback(None)
+
+
+    def _run(self):
+        """
+        Worker loop; executes in a non-reactor thread.
+        """
+        while True:
+            item = self._q.get()
+            if item is not _DONE:
+                self._oneWorkUnit(*item)
+                continue
+            self._reactor.callFromThread(self._finishStopping)
+            return
+
+
+    def _oneWorkUnit(self, deferred, instruction):
+        """
+        Call C{instruction} and relay its result, or the exception it
+        raised, to C{deferred} by way of C{callFromThread}.
+        """
+        try:
+            result = instruction()
+        except:
+            excInfo = sys.exc_info()
+            def relayFailure():
+                etype, evalue, etb = excInfo
+                deferred.errback(Failure(evalue, etype, etb))
+            self._reactor.callFromThread(relayFailure)
+            return
+        self._reactor.callFromThread(deferred.callback, result)
+
+
+    def submit(self, work):
+        """
+        Queue some work.
+
+        @param work: a 0-argument callable, which will be run in a thread.
+
+        @return: L{Deferred} that fires with the result of L{work}
+        """
+        resultDeferred = Deferred()
+        self._q.put((resultDeferred, work))
+        return resultDeferred
+
+
+    def start(self):
+        """
+        Begin running the worker loop in a reactor threadpool thread.
+
+        @raise RuntimeError: if this holder is not currently stopped.
+        """
+        if self._state != _STATE_STOPPED:
+            raise RuntimeError("Not stopped.")
+        self._q = Queue(0)
+        self._state = _STATE_RUNNING
+        self._reactor.callInThread(self._run)
+
+
+    def stop(self):
+        """
+        Queue the stop sentinel so the worker loop exits and frees its
+        thread.
+
+        @return: L{Deferred} fired with C{None} once stopping has finished.
+
+        @raise RuntimeError: if this holder is not currently running.
+        """
+        if self._state != _STATE_RUNNING:
+            raise RuntimeError("Not running.")
+        self._state = _STATE_STOPPING
+        stopped = Deferred()
+        self._stopper = stopped
+        self._q.put(_DONE)
+        return stopped
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/49dd0931/attachment-0001.html>
More information about the calendarserver-changes
mailing list