Revision: 6497 http://trac.macosforge.org/projects/calendarserver/changeset/6497 Author: glyph@apple.com Date: 2010-11-01 14:13:37 -0700 (Mon, 01 Nov 2010) Log Message: ----------- reorganize some things, spike async threadpool (don't use it yet) Modified Paths: -------------- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py Added Paths: ----------- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py Added: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py (rev 0) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/asyncsqlpool.py 2010-11-01 21:13:37 UTC (rev 6497) @@ -0,0 +1,464 @@ +# -*- test-case-name: txdav.caldav.datastore -*- +## +# Copyright (c) 2010 Apple Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Asynchronous multi-process connection pool. 
+""" + +import sys +from cPickle import dumps, loads +from itertools import count + +from zope.interface import implements + +from twisted.internet.defer import inlineCallbacks +from twisted.internet.defer import returnValue +from txdav.idav import IAsyncTransaction +from twisted.internet.defer import Deferred +from twisted.protocols.amp import Boolean +from twisted.python.failure import Failure +from twisted.protocols.amp import Argument, String, Command, AMP, Integer +from twisted.internet import reactor as _reactor +from twisted.application.service import Service +from txdav.base.datastore.threadutils import ThreadHolder +from txdav.idav import AlreadyFinishedError +from twisted.python import log + + +class BaseSqlTxn(object): + """ + L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the + current process. + """ + implements(IAsyncTransaction) + + def __init__(self, connectionFactory, reactor=_reactor): + """ + @param connectionFactory: A 0-argument callable which returns a DB-API + 2.0 connection. + """ + self._completed = False + self._holder = ThreadHolder(reactor) + self._holder.start() + def initCursor(): + # support threadlevel=1; we can't necessarily cursor() in a + # different thread than we do transactions in. + self._connection = connectionFactory() + self._cursor = self._connection.cursor() + + # Note: no locking necessary here; since this gets submitted first, all + # subsequent submitted work-units will be in line behind it and the + # cursor will already have been initialized. 
+ self._holder.submit(initCursor) + + + def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None): + self._cursor.execute(sql, args) + if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0: + raise raiseOnZeroRowCount() + if self._cursor.description: + return self._cursor.fetchall() + else: + return None + + + noisy = False + + def execSQL(self, *args, **kw): + result = self._holder.submit( + lambda : self._reallyExecSQL(*args, **kw) + ) + if self.noisy: + def reportResult(results): + sys.stdout.write("\n".join([ + "", + "SQL: %r %r" % (args, kw), + "Results: %r" % (results,), + "", + ])) + return results + result.addBoth(reportResult) + return result + + + def commit(self): + if not self._completed: + self._completed = True + def reallyCommit(): + self._connection.commit() + result = self._holder.submit(reallyCommit) + return result + else: + raise AlreadyFinishedError() + + + def abort(self): + if not self._completed: + def reallyAbort(): + self._connection.rollback() + self._completed = True + result = self._holder.submit(reallyAbort) + return result + else: + raise AlreadyFinishedError() + + + def __del__(self): + if not self._completed: + print 'CommonStoreTransaction.__del__: OK' + self.abort() + + + def reset(self): + """ + Call this when placing this transaction back into the pool. + + @raise RuntimeError: if the transaction has not been committed or + aborted. + """ + if not self._completed: + raise RuntimeError("Attempt to re-set active transaction.") + + + def stop(self): + """ + Release the thread and database connection associated with this + transaction. 
+ """ + self._stopped = True + self._holder.submit(self._connection.close) + return self._holder.stop() + + + +class PooledDBAPITransaction(BaseSqlTxn): + + def __init__(self, pool): + self.pool = pool + super(PooledDBAPITransaction, self).__init__( + self.pool.connectionFactory, + self.pool.reactor + ) + + + def commit(self): + return self.repoolAfter(super(PooledDBAPITransaction, self).commit()) + + + def abort(self): + return self.repoolAfter(super(PooledDBAPITransaction, self).abort()) + + + def repoolAfter(self, d): + def repool(result): + self.pool.reclaim(self) + return result + return d.addCallback(repool) + + + +class ConnectionPool(Service, object): + """ + This is a central service that has a threadpool and executes SQL statements + asynchronously, in a pool. + """ + + reactor = _reactor + + def __init__(self, connectionFactory): + super(ConnectionPool, self).__init__() + self.free = [] + self.busy = [] + self.connectionFactory = connectionFactory + + + def startService(self): + """ + No startup necessary. + """ + + + @inlineCallbacks + def stopService(self): + """ + Forcibly abort any outstanding transactions. + """ + for busy in self.busy: + try: + yield busy.abort() + except: + log.err() + + + def connection(self): + if self.free: + txn = self.free.pop(0) + else: + txn = PooledDBAPITransaction(self) + self.busy.append(txn) + return self.txn + + + def reclaim(self, txn): + txn.reset() + self.free.append(txn) + self.busy.remove(txn) + + + +def txnarg(): + return [('transactionID', Integer())] + + + +class Pickle(Argument): + """ + A pickle sent over AMP. This is to serialize the 'args' argument to + execSQL, which is the dynamically-typed 'args' list argument to a DB-API + C{execute} function, as well as its dynamically-typed result ('rows'). + + This should be cleaned up into a nicer structure, but this is not a network + protocol, so we can be a little relaxed about security. 
+ """ + + def toString(self, inObject): + return dumps(inObject) + + def fromString(self, inString): + return loads(inString) + + + + +class StartTxn(Command): + """ + Start a transaction, identified with an ID generated by the client. + """ + arguments = txnarg() + + + +class ExecSQL(Command): + """ + Execute an SQL statement. + """ + arguments = [('sql', String()), + ('queryID', String()), + ('args', Pickle())] + txnarg() + + + +class Row(Command): + """ + A row has been returned. Sent from server to client in response to + L{ExecSQL}. + """ + + arguments = [('queryID', String()), + ('row', Pickle())] + + + +class QueryComplete(Command): + """ + A query issued with ExecSQL is complete. + """ + + arguments = [('queryID', String()), + ('norows', Boolean())] + + + +class Commit(Command): + arguments = txnarg() + + + +class Abort(Command): + arguments = txnarg() + + + +class _NoRows(Exception): + """ + Placeholder exception to report zero rows. + """ + + +class ConnectionPoolConnection(AMP): + """ + A L{ConnectionPoolConnection} is a single connection to a + L{ConnectionPool}. + """ + + def __init__(self, pool): + """ + Initialize a mapping of transaction IDs to transaction objects. 
+ """ + super(ConnectionPoolConnection, self).__init__() + self.pool = pool + + + @StartTxn.responder + def start(self, transactionID): + self._txns[transactionID] = self.pool.connection() + return {} + + + @ExecSQL.responder + @inlineCallbacks + def sendSQL(self, transactionID, queryID, sql, args): + norows = True + try: + rows = yield self._txns[transactionID].execSQL(sql, args) + except _NoRows: + pass + else: + if rows is not None: + for row in rows: + norows = False + self.callRemote(Row, queryID=queryID, row=row) + self.callRemote(QueryComplete, queryID=queryID, norows=norows) + returnValue({}) + + + @inlineCallbacks + def _complete(self, transactionID, thunk): + txn = self._txns.pop(transactionID) + try: + yield thunk(txn) + returnValue({}) + finally: + self.pool.reclaim(txn) + + + @Commit.responder + def commit(self, transactionID): + """ + Successfully complete the given transaction. + """ + return self._complete(transactionID, lambda x: x.commit()) + + + @Abort.responder + def abort(self, transactionID): + """ + Roll back the given transaction. + """ + return self._complete(transactionID, lambda x: x.abort()) + + + +class ConnectionPoolClient(AMP): + """ + A client which can execute SQL. 
+ """ + def __init__(self): + super(ConnectionPoolClient, self).__init__() + self._nextID = count().next + self._txns = {} + self._queries = {} + + + def newTransaction(self): + txnid = str(self._nextID()) + txn = Transaction(client=self, transactionID=txnid) + self._txns[txnid] = txn + self.callRemote(StartTxn, transactionID=txnid) + return txn + + + @Row.responder + def row(self, queryID, row): + self._queries[queryID].row(row) + return {} + + + @QueryComplete.responder + def complete(self, queryID, norows): + self.queries.pop(queryID).done(norows) + return {} + + + +class _Query(object): + def __init__(self, raiseOnZeroRowCount): + self.results = [] + self.deferred = Deferred() + self.raiseOnZeroRowCount = raiseOnZeroRowCount + + + def row(self, row): + """ + A row was received. + """ + self.results.append(row) + + + def done(self, norows): + """ + The query is complete. + + @param norows: A boolean. True if the query returned no rows. + """ + if norows and self.raiseOnZeroRowCount is not None: + exc = self.raiseOnZeroRowCount() + self.deferred.errback(Failure(exc)) + else: + self.deferred.callback(self.results) + + + + +class Transaction(object): + """ + Async transaction implementation. + """ + + implements(IAsyncTransaction) + + def __init__(self, client, transactionID): + """ + Initialize a transaction with a L{ConnectionPoolClient} and a unique + transaction identifier. 
+ """ + self.client = client + self.transactionID = transactionID + + + def execSQL(self, sql, args, raiseOnZeroRowCount=None): + queryID = self.client._nextID() + d = Deferred() + self.client._queries[queryID] = _Query(raiseOnZeroRowCount) + self.client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args) + return d + + + def complete(self, command): + return self.client.callRemote( + command, transactionID=self.transactionID + ).addCallback(lambda x: None) + + + def commit(self): + return self.complete(Commit) + + + def abort(self): + return self.complete(Abort) + + Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:13:12 UTC (rev 6496) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py 2010-11-01 21:13:37 UTC (rev 6497) @@ -18,17 +18,12 @@ """ Run and manage PostgreSQL as a subprocess. """ + import os import pwd -import sys -#import thread -from Queue import Queue - from hashlib import md5 -from zope.interface import implements - from twisted.python.procutils import which from twisted.internet.protocol import ProcessProtocol @@ -40,10 +35,8 @@ from twisted.protocols.basic import LineReceiver from twisted.internet import reactor -from twisted.python.failure import Failure -from txdav.idav import IAsyncTransaction -from txdav.idav import AlreadyFinishedError from twisted.internet.defer import Deferred +from txdav.base.datastore.asyncsqlpool import BaseSqlTxn from twisted.application.service import MultiService @@ -222,6 +215,7 @@ """ self.output.append(data) + def errReceived(self, data): """ Some output was received on stderr. 
@@ -237,182 +231,23 @@ -_DONE = object() - -_STATE_STOPPED = 'STOPPED' -_STATE_RUNNING = 'RUNNING' -_STATE_STOPPING = 'STOPPING' - -class ThreadHolder(object): +class UnpooledSqlTxn(BaseSqlTxn): """ - A queue which will hold a reactor threadpool thread open until all of the - work in that queue is done. + Unpooled variant (releases thread immediately on commit or abort), + currently exclusively for testing. """ - - def __init__(self, reactor): - self._reactor = reactor - self._state = _STATE_STOPPED - self._stopper = None - self._q = None - - - def _run(self): - """ - Worker function which runs in a non-reactor thread. - """ - while True: - work = self._q.get() - if work is _DONE: - def finishStopping(): - self._state = _STATE_STOPPED - self._q = None - s = self._stopper - self._stopper = None - s.callback(None) - self._reactor.callFromThread(finishStopping) - return - self._oneWorkUnit(*work) - - - def _oneWorkUnit(self, deferred, instruction): - try: - result = instruction() - except: - etype, evalue, etb = sys.exc_info() - def relayFailure(): - f = Failure(evalue, etype, etb) - deferred.errback(f) - self._reactor.callFromThread(relayFailure) - else: - self._reactor.callFromThread(deferred.callback, result) - - - def submit(self, work): - """ - Submit some work to be run. - - @param work: a 0-argument callable, which will be run in a thread. - - @return: L{Deferred} that fires with the result of L{work} - """ - d = Deferred() - self._q.put((d, work)) - return d - - - def start(self): - """ - Start this thing, if it's stopped. - """ - if self._state != _STATE_STOPPED: - raise RuntimeError("Not stopped.") - self._state = _STATE_RUNNING - self._q = Queue(0) - self._reactor.callInThread(self._run) - - - def stop(self): - """ - Stop this thing and release its thread, if it's running. 
- """ - if self._state != _STATE_RUNNING: - raise RuntimeError("Not running.") - s = self._stopper = Deferred() - self._state = _STATE_STOPPING - self._q.put(_DONE) - return s - - - -class ThisProcessSqlTxn(object): - """ - L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the - current process. - """ - implements(IAsyncTransaction) - - def __init__(self, connectionFactory): - """ - @param connectionFactory: A 0-argument callable which returns a DB-API - 2.0 connection. - """ - self._completed = False - self._holder = ThreadHolder(reactor) - self._holder.start() - def initCursor(): - # support threadlevel=1; we can't necessarily cursor() in a - # different thread than we do transactions in. - - # FIXME: may need to be pooling ThreadHolders along with - # connections, if threadlevel=1 requires connect() be called in the - # same thread as cursor() et. al. - self._connection = connectionFactory() - self._cursor = self._connection.cursor() - self._holder.submit(initCursor) - - - def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None): - self._cursor.execute(sql, args) - if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0: - raise raiseOnZeroRowCount() - if self._cursor.description: - return self._cursor.fetchall() - else: - return None - - - noisy = False - - def execSQL(self, *args, **kw): - result = self._holder.submit( - lambda : self._reallyExecSQL(*args, **kw) - ) - if self.noisy: - def reportResult(results): - sys.stdout.write("\n".join([ - "", - "SQL: %r %r" % (args, kw), - "Results: %r" % (results,), - "", - ])) - return results - result.addBoth(reportResult) + def commit(self): + result = super(UnpooledSqlTxn, self).commit() + self.stop() return result - - def commit(self): - if not self._completed: - self._completed = True - def reallyCommit(): - self._connection.commit() - self._connection.close() - result = self._holder.submit(reallyCommit) - self._holder.stop() - return result - else: - raise AlreadyFinishedError() - 
- def abort(self): - if not self._completed: - def reallyAbort(): - self._connection.rollback() - self._connection.close() - self._completed = True - result = self._holder.submit(reallyAbort) - self._holder.stop() - return result - else: - raise AlreadyFinishedError() + result = super(UnpooledSqlTxn, self).abort() + self.stop() + return result - def __del__(self): - if not self._completed: - print 'CommonStoreTransaction.__del__: OK' - self.abort() - - class PostgresService(MultiService): def __init__(self, dataStoreDirectory, subServiceFactory, @@ -516,6 +351,7 @@ w = DiagnosticConnectionWrapper(connection, label) c = w.cursor() + # Turn on standard conforming strings. This option is _required_ if # you want to get correct behavior out of parameter-passing with the # pgdb module. If it is not set then the server is potentially @@ -530,6 +366,12 @@ # preferable to see some exceptions while we're in this state than to # have the entire worker process hang. c.execute("set statement_timeout=30000") + + # pgdb (as per DB-API 2.0) automatically puts the connection into a + # 'executing a transaction' state when _any_ statement is executed on + # it (even these not-touching-any-data statements); make sure to commit + # first so that the application sees a fresh transaction, and the + # connection can safely be pooled without executing anything on it. w.commit() c.close() return w @@ -539,7 +381,7 @@ """ Create a L{IAsyncTransaction} based on a thread in the current process. 
""" - return ThisProcessSqlTxn(lambda : self.produceConnection(label)) + return UnpooledSqlTxn(lambda : self.produceConnection(label)) def ready(self): Added: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py =================================================================== --- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py (rev 0) +++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/threadutils.py 2010-11-01 21:13:37 UTC (rev 6497) @@ -0,0 +1,111 @@ +## +# Copyright (c) 2010 Apple Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +import sys +from Queue import Queue + + +from twisted.python.failure import Failure +from twisted.internet.defer import Deferred + + +_DONE = object() + +_STATE_STOPPED = 'STOPPED' +_STATE_RUNNING = 'RUNNING' +_STATE_STOPPING = 'STOPPING' + +class ThreadHolder(object): + """ + A queue which will hold a reactor threadpool thread open until all of the + work in that queue is done. + """ + + def __init__(self, reactor): + self._reactor = reactor + self._state = _STATE_STOPPED + self._stopper = None + self._q = None + + + def _run(self): + """ + Worker function which runs in a non-reactor thread. 
+ """ + while True: + work = self._q.get() + if work is _DONE: + def finishStopping(): + self._state = _STATE_STOPPED + self._q = None + s = self._stopper + self._stopper = None + s.callback(None) + self._reactor.callFromThread(finishStopping) + return + self._oneWorkUnit(*work) + + + def _oneWorkUnit(self, deferred, instruction): + try: + result = instruction() + except: + etype, evalue, etb = sys.exc_info() + def relayFailure(): + f = Failure(evalue, etype, etb) + deferred.errback(f) + self._reactor.callFromThread(relayFailure) + else: + self._reactor.callFromThread(deferred.callback, result) + + + def submit(self, work): + """ + Submit some work to be run. + + @param work: a 0-argument callable, which will be run in a thread. + + @return: L{Deferred} that fires with the result of L{work} + """ + d = Deferred() + self._q.put((d, work)) + return d + + + def start(self): + """ + Start this thing, if it's stopped. + """ + if self._state != _STATE_STOPPED: + raise RuntimeError("Not stopped.") + self._state = _STATE_RUNNING + self._q = Queue(0) + self._reactor.callInThread(self._run) + + + def stop(self): + """ + Stop this thing and release its thread, if it's running. + """ + if self._state != _STATE_RUNNING: + raise RuntimeError("Not running.") + s = self._stopper = Deferred() + self._state = _STATE_STOPPING + self._q.put(_DONE) + return s + + +
participants (1)
-
source_changes@macosforge.org