[CalendarServer-changes] [4481] CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Wed Aug 5 12:34:38 PDT 2009


Revision: 4481
          http://trac.macosforge.org/projects/calendarserver/changeset/4481
Author:   cdaboo at apple.com
Date:     2009-08-05 12:34:35 -0700 (Wed, 05 Aug 2009)
Log Message:
-----------
Twisted ADBAPI implementation for an async (deferred) database, similar to the sqlite DBs we currently
use - i.e. it uses the same schema and upgrade process. sqlite3 is currently special-cased because of
threading issues with that module.
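
A minimal usage sketch, assuming the class is driven the way the unit tests below drive it
(the AddressDB subclass, ADDRESS table and example() helper are hypothetical, not part of
this commit):

    from twisted.internet.defer import inlineCallbacks, returnValue
    from twistedcaldav.database import AbstractADBAPIDatabase

    class AddressDB(AbstractADBAPIDatabase):
        """Hypothetical subclass backed by an sqlite3 file."""

        def __init__(self, path):
            # Non-persistent: the data can be removed and re-created during an upgrade.
            super(AddressDB, self).__init__("address", "sqlite3", (path,), False, cp_min=3, cp_max=3)

        def _db_version(self):
            return "1"

        def _db_type(self):
            return "ADDRESS"

        def _db_init_data_tables(self):
            # Single data table used by this hypothetical subclass.
            return self._db_execute("create table ADDRESS (NAME text unique, EMAIL text)")

        def _db_remove_data_tables(self):
            return self._db_execute("drop table ADDRESS")

    @inlineCallbacks
    def example(path):
        db = AddressDB(path)
        yield db.open()          # creates the CALDAV and ADDRESS tables on first use
        yield db.execute("insert into ADDRESS (NAME, EMAIL) values (:1, :2)", ("daboo", "daboo@example.com"))
        rows = (yield db.execute("select * from ADDRESS"))
        db.close()
        returnValue(rows)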

Added Paths:
-----------
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/database.py
    CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/test/test_database.py

Added: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/database.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/database.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/database.py	2009-08-05 19:34:35 UTC (rev 4481)
@@ -0,0 +1,319 @@
+##
+# Copyright (c) 2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Generic ADBAPI database access object.
+"""
+
+from twistedcaldav.log import Logger
+
+from twisted.enterprise.adbapi import ConnectionPool
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python.threadpool import ThreadPool
+
+import thread
+
+__all__ = [
+    "AbstractADBAPIDatabase",
+]
+
+log = Logger()
+
+class ConnectionClosingThreadPool(ThreadPool):
+    """
+    A ThreadPool whose worker threads close their DB-API connection when they shut down.
+    """
+    
+    def _worker(self):
+        log.debug("Starting ADBAPI thread: %s" % (thread.get_ident(),))
+        ThreadPool._worker(self)
+        self._closeConnection()
+
+    def _closeConnection(self):
+        
+        tid = thread.get_ident()
+        log.debug("Closing ADBAPI thread: %s" % (tid,))
+
+        conn = self.pool.connections.get(tid)
+        if conn is not None:
+            self.pool._close(conn)
+            del self.pool.connections[tid]
+
+class AbstractADBAPIDatabase(object):
+    """
+    A generic SQL database.
+    """
+
+    def __init__(self, dbID, dbapiName, dbapiArgs, persistent, **kwargs):
+        """
+        
+        @param pool: the ADAPI ConnectionPool to use.
+        @type dbpath: L{ConnectionPool}
+        @param persistent: C{True} if the data in the DB must be perserved during upgrades,
+            C{False} if the DB data can be re-created from an external source.
+        @type persistent: bool
+        """
+        self.dbID = dbID
+        self.dbapiName = dbapiName
+        self.dbapiArgs = dbapiArgs
+        self.dbapikwargs = kwargs
+
+        self.persistent = persistent
+        
+        self.initialized = False
+
+    def __repr__(self):
+        return "<%s %r>" % (self.__class__.__name__, self.pool)
+
+    @inlineCallbacks
+    def open(self):
+        """
+        Access the underlying database.
+        @return: a db2 connection object for this index's underlying data store.
+        """
+        if not self.initialized:
+
+            self.pool = ConnectionPool(self.dbapiName, *self.dbapiArgs, **self.dbapikwargs)
+            
+            # sqlite3 is not thread safe, which means we have to close each sqlite3 connection in the same
+            # thread that opened it. We need a special thread pool class whose worker closes its connection
+            # when the thread shuts down.
+            if self.dbapiName == "sqlite3":
+                self.pool.threadpool.stop()
+                self.pool.threadpool = ConnectionClosingThreadPool(1, 1)
+                self.pool.threadpool.start()
+                self.pool.threadpool.pool = self.pool
+
+            #
+            # Set up the schema
+            #
+            # Create CALDAV table if needed
+
+            test = (yield self._test_schema_table())
+            if test:
+                version = (yield self._db_value_for_sql("select VALUE from CALDAV where KEY = 'SCHEMA_VERSION'"))
+                dbtype = (yield self._db_value_for_sql("select VALUE from CALDAV where KEY = 'TYPE'"))
+
+                if (version != self._db_version()) or (dbtype != self._db_type()):
+
+                    if dbtype != self._db_type():
+                        log.err("Database %s has different type (%s vs. %s)"
+                                % (self.dbID, dbtype, self._db_type()))
+
+                        # Delete this database and start over
+                        yield self._db_remove()
+                        yield self._db_init()
+
+                    elif version != self._db_version():
+                        log.err("Database %s has different schema (v.%s vs. v.%s)"
+                                % (self.dbID, version, self._db_version()))
+                        
+                        # Upgrade the DB
+                        yield self._db_upgrade(version)
+
+            else:
+                yield self._db_init()
+            self.initialized = True
+
+    def close(self):
+        
+        if self.initialized:
+            self.pool.close()
+            self.pool = None
+            self.initialized = False
+
+    @inlineCallbacks
+    def execute(self, sql, *query_params):
+        
+        if not self.initialized:
+            yield self.open()
+
+        result = (yield self._db_execute(sql, *query_params))
+        returnValue(result)
+
+    def _db_version(self):
+        """
+        @return: the schema version assigned to this DB.
+        """
+        raise NotImplementedError
+        
+    def _db_type(self):
+        """
+        @return: the collection type assigned to this DB.
+        """
+        raise NotImplementedError
+        
+    @inlineCallbacks
+    def _test_schema_table(self):
+        result = (yield self._db_value_for_sql("""
+        select (1) from SQLITE_MASTER
+         where TYPE = 'table' and NAME = 'CALDAV'
+        """))
+        returnValue(result)
+
+    @inlineCallbacks
+    def _db_init(self):
+        """
+        Initialise the underlying database tables.
+        """
+        log.msg("Initializing database %s" % (self.dbID,))
+
+        # TODO we need an exclusive lock of some kind here to prevent a race condition
+        # in which multiple processes try to create the tables.
+        
+
+        yield self._db_init_schema_table()
+        yield self._db_init_data_tables()
+        yield self._db_recreate()
+
+    @inlineCallbacks
+    def _db_init_schema_table(self):
+        """
+        Initialise the underlying database tables.
+        @param db_filename: the file name of the index database.
+        @param q:           a database cursor to use.
+        """
+
+        #
+        # CALDAV table keeps track of our schema version and type
+        #
+        yield self._db_execute(
+            """
+            create table CALDAV (
+                KEY text unique,
+                VALUE text unique
+            )
+            """
+        )
+        yield self._db_execute(
+            """
+            insert into CALDAV (KEY, VALUE)
+            values ('SCHEMA_VERSION', :1)
+            """, (self._db_version(),)
+        )
+        yield self._db_execute(
+            """
+            insert into CALDAV (KEY, VALUE)
+            values ('TYPE', :1)
+            """, (self._db_type(),)
+        )
+
+    def _db_init_data_tables(self):
+        """
+        Initialise the underlying database tables.
+        """
+        raise NotImplementedError
+
+    def _db_recreate(self):
+        """
+        Recreate the database tables.
+        """
+
+        # Implementations can override this to re-create data
+        pass
+
+    @inlineCallbacks
+    def _db_upgrade(self, old_version):
+        """
+        Upgrade the database tables.
+        """
+        
+        if self.persistent:
+            yield self._db_upgrade_data_tables(old_version)
+            yield self._db_upgrade_schema()
+        else:
+            # Non-persistent DBs can, by default, be removed and re-created on upgrade. However, for simple
+            # schema changes subclasses SHOULD override this method and upgrade in place for better performance.
+            yield self._db_remove()
+            yield self._db_init()
+    
+    def _db_upgrade_data_tables(self, old_version):
+        """
+        Upgrade the data from an older version of the DB.
+        """
+        # Persistent DBs MUST override this method and do a proper upgrade. Their data
+        # cannot be thrown away.
+        raise NotImplementedError("Persistent databases MUST support an upgrade method.")
+
+    @inlineCallbacks
+    def _db_upgrade_schema(self):
+        """
+        Upgrade the stored schema version to the current one.
+        """
+        yield self._db_execute("insert or replace into CALDAV (KEY, VALUE) values ('SCHEMA_VERSION', :1)", (self._db_version(),))
+
+    @inlineCallbacks
+    def _db_remove(self):
+        """
+        Remove all database information (all the tables)
+        """
+        yield self._db_remove_data_tables()
+        yield self._db_remove_schema()
+
+    def _db_remove_data_tables(self):
+        """
+        Remove all the data from an older version of the DB.
+        """
+        raise NotImplementedError("Each database must remove its own tables.")
+
+    @inlineCallbacks
+    def _db_remove_schema(self):
+        """
+        Remove the stored schema version table.
+        """
+        yield self._db_execute("drop table CALDAV")
+
+    @inlineCallbacks
+    def _db_values_for_sql(self, sql, *query_params):
+        """
+        Execute an SQL query and obtain the resulting values.
+        @param sql: the SQL query to execute.
+        @param query_params: parameters to C{sql}.
+        @return: an iterable of values in the first column of each row
+            resulting from executing C{sql} with C{query_params}.
+        @raise AssertionError: if the query yields multiple columns.
+        """
+        
+        results = (yield self.pool.runQuery(sql, *query_params))
+        returnValue(tuple([row[0] for row in results]))
+
+    @inlineCallbacks
+    def _db_value_for_sql(self, sql, *query_params):
+        """
+        Execute an SQL query and obtain a single value.
+        @param sql: the SQL query to execute.
+        @param query_params: parameters to C{sql}.
+        @return: the value resulting from executing C{sql} with
+            C{query_params}.
+        @raise AssertionError: if the query yields multiple rows or columns.
+        """
+        value = None
+        for row in (yield self._db_values_for_sql(sql, *query_params)):
+            assert value is None, "Multiple values in DB for %s %s" % (sql, query_params)
+            value = row
+        returnValue(value)
+
+    @inlineCallbacks
+    def _db_execute(self, sql, *query_params):
+        """
+        Execute an SQL query and obtain the resulting values.
+        @param sql: the SQL query to execute.
+        @param query_params: parameters to C{sql}.
+        @return: an iterable of tuples for each row resulting from executing
+            C{sql} with C{query_params}.
+        """
+        
+        results = (yield self.pool.runQuery(sql, *query_params))
+        returnValue(results)
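
For reference, here is the sqlite3 special case from database.py sketched in isolation
(the "/tmp/example.db" path is illustrative only). The stock ConnectionPool keeps one
DB-API connection per worker thread in pool.connections, keyed by thread id, so a
single-thread pool whose worker closes its own connection on shutdown avoids sqlite3's
restriction on using a connection from a different thread:

    from twisted.enterprise.adbapi import ConnectionPool
    from twistedcaldav.database import ConnectionClosingThreadPool

    pool = ConnectionPool("sqlite3", "/tmp/example.db")
    pool.threadpool.stop()                               # discard the default multi-threaded pool
    pool.threadpool = ConnectionClosingThreadPool(1, 1)  # minthreads=1, maxthreads=1
    pool.threadpool.start()
    pool.threadpool.pool = pool                          # back-reference used by _closeConnection()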

Added: CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/test/test_database.py
===================================================================
--- CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/test/test_database.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/partition-4464/twistedcaldav/test/test_database.py	2009-08-05 19:34:35 UTC (rev 4481)
@@ -0,0 +1,206 @@
+##
+# Copyright (c) 2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twistedcaldav.database import AbstractADBAPIDatabase
+import twistedcaldav.test.util
+
+from twisted.internet.defer import inlineCallbacks
+
+import os
+import time
+
+class Database (twistedcaldav.test.util.TestCase):
+    """
+    Test abstract SQL DB class
+    """
+    
+    class TestDB(AbstractADBAPIDatabase):
+        
+        def __init__(self, path, persistent=False, version="1"):
+            self.version = version
+            self.dbpath = path
+            super(Database.TestDB, self).__init__("sqlite", "sqlite3", (path,), persistent, cp_min=3, cp_max=3)
+
+        def _db_version(self):
+            """
+            @return: the schema version assigned to this index.
+            """
+            return self.version
+            
+        def _db_type(self):
+            """
+            @return: the collection type assigned to this index.
+            """
+            return "TESTTYPE"
+            
+        def _db_init_data_tables(self):
+            """
+            Initialise the underlying database tables.
+            """
+    
+            #
+            # TESTTYPE table
+            #
+            return self._db_execute(
+                """
+                create table TESTTYPE (
+                    KEY         text unique,
+                    VALUE       text
+                )
+                """
+            )
+
+        def _db_remove_data_tables(self):
+            return self._db_execute("drop table TESTTYPE")
+
+    class TestDBRecreateUpgrade(TestDB):
+        
+        class RecreateDBException(Exception):
+            pass
+        class UpgradeDBException(Exception):
+            pass
+
+        def __init__(self, path, persistent=False):
+            super(Database.TestDBRecreateUpgrade, self).__init__(path, persistent, version="2")
+
+        def _db_recreate(self):
+            raise self.RecreateDBException()
+
+    class TestDBCreateIndexOnUpgrade(TestDB):
+        
+        def __init__(self, path, persistent=False):
+            super(Database.TestDBCreateIndexOnUpgrade, self).__init__(path, persistent, version="2")
+
+        def _db_upgrade_data_tables(self, old_version):
+            return self._db_execute(
+                """
+                create index TESTING on TESTTYPE (VALUE)
+                """
+            )
+
+    class TestDBPauseInInit(TestDB):
+        
+        def _db_init(self):
+            
+            time.sleep(1)
+            return super(Database.TestDBPauseInInit, self)._db_init()
+
+    @inlineCallbacks
+    def inlineCallbackRaises(self, exc, f, *args, **kwargs):
+        try:
+            yield f(*args, **kwargs)
+        except exc:
+            pass
+        except Exception, e:
+            self.fail("Wrong exception raised: %s" % (e,))
+        else:
+            self.fail("%s not raised" % (exc,))
+
+    @inlineCallbacks
+    def test_connect(self):
+        """
+        Connect to database and create table
+        """
+        db = Database.TestDB(self.mktemp())
+        self.assertFalse(db.initialized)
+        yield db.open()
+        self.assertTrue(db.initialized)
+
+    @inlineCallbacks
+    def test_readwrite(self):
+        """
+        Add a record, search for it
+        """
+        db = Database.TestDB(self.mktemp())
+        yield db.execute("INSERT into TESTTYPE (KEY, VALUE) values (:1, :2)", ("FOO", "BAR",))
+        items = (yield db.execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [("FOO", "BAR"),])
+
+    @inlineCallbacks
+    def test_close(self):
+        """
+        Close database
+        """
+        db = Database.TestDB(self.mktemp())
+        self.assertFalse(db.initialized)
+        yield db.open()
+        db.close()
+        self.assertFalse(db.initialized)
+        db.close()
+        
+    @inlineCallbacks
+    def test_version_upgrade_nonpersistent(self):
+        """
+        Upgrade a non-persistent database: the old data is thrown away and the tables re-created.
+        """
+        
+        db_file = self.mktemp()
+
+        db = Database.TestDB(db_file)
+        yield db.open()
+        yield db.execute("INSERT into TESTTYPE (KEY, VALUE) values (:1, :2)", ("FOO", "BAR",))
+        items = (yield db.execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [("FOO", "BAR")])
+        db.close()
+        db = None
+
+        db = Database.TestDBRecreateUpgrade(db_file)
+        yield self.inlineCallbackRaises(Database.TestDBRecreateUpgrade.RecreateDBException, db.open)
+        items = (yield db.execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [])
+
+    @inlineCallbacks
+    def test_version_upgrade_persistent(self):
+        """
+        Upgrade a persistent database: the existing data must be preserved and the upgrade method called.
+        """
+        db_file = self.mktemp()
+        db = Database.TestDB(db_file, persistent=True)
+        yield db.open()
+        yield db.execute("INSERT into TESTTYPE (KEY, VALUE) values (:1, :2)", ("FOO", "BAR",))
+        items = (yield db.execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [("FOO", "BAR")])
+        db.close()
+        db = None
+
+        db = Database.TestDBRecreateUpgrade(db_file, persistent=True)
+        yield self.inlineCallbackRaises(NotImplementedError, db.open)
+        self.assertTrue(os.path.exists(db_file))
+        db.close()
+        db = None
+
+        db = Database.TestDB(db_file, persistent=True)
+        yield db.open()
+        items = (yield db.execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [("FOO", "BAR")])
+
+    @inlineCallbacks
+    def test_version_upgrade_persistent_add_index(self):
+        """
+        Upgrade a persistent database where the upgrade simply adds an index to the existing data.
+        """
+        db_file = self.mktemp()
+        db = Database.TestDB(db_file, persistent=True)
+        yield db.open()
+        yield db.execute("INSERT into TESTTYPE (KEY, VALUE) values (:1, :2)", ("FOO", "BAR",))
+        items = (yield db._db_execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [("FOO", "BAR")])
+        db.close()
+        db = None
+
+        db = Database.TestDBCreateIndexOnUpgrade(db_file, persistent=True)
+        yield db.open()
+        items = (yield db._db_execute("SELECT * from TESTTYPE"))
+        self.assertEqual(items, [("FOO", "BAR")])