[CalendarServer-changes] [2538] CalendarServer/branches/memcache-uid-reservation/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Thu Jun 5 12:50:26 PDT 2008


Revision: 2538
          http://trac.macosforge.org/projects/calendarserver/changeset/2538
Author:   dreid at apple.com
Date:     2008-06-05 12:50:25 -0700 (Thu, 05 Jun 2008)

Log Message:
-----------
First draft of UID reservation in memcached.

Modified Paths:
--------------
    CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py
    CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py
    CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py
    CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py

Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py	2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py	2008-06-05 19:50:25 UTC (rev 2538)
@@ -29,6 +29,7 @@
 import datetime
 import os
 import time
+import hashlib
 
 try:
     import sqlite3 as sqlite
@@ -37,12 +38,18 @@
 
 from vobject.icalendar import utc
 
+from twisted.internet.defer import maybeDeferred, succeed
+from twisted.internet.protocol import ClientCreator
+
+from twistedcaldav.memcachepool import CachePoolUserMixIn
+
 from twistedcaldav.ical import Component
 from twistedcaldav.query import calendarquery
 from twistedcaldav.sql import AbstractSQLDatabase
 from twistedcaldav.sql import db_prefix
 from twistedcaldav import caldavxml
-from twistedcaldav.log import Logger
+from twistedcaldav.log import Logger, LoggingMixIn
+from twistedcaldav.config import config
 
 log = Logger()
 
@@ -179,7 +186,7 @@
         for name in self.resourceNamesForUID(uid):
             assert result is None, "More than one resource with UID %s in calendar collection %r" % (uid, self)
             result = name
-            
+
         return result
 
     def resourceUIDForName(self, name):
@@ -221,7 +228,7 @@
         if uid is not None:
             self._delete_from_db(name, uid)
             self._db_commit()
-    
+
     def resourceExists(self, name):
         """
         Determines whether the specified resource name exists in the index.
@@ -230,7 +237,7 @@
         """
         uid = self._db_value_for_sql("select UID from RESOURCE where NAME = :1", name)
         return uid is not None
-    
+
     def resourcesExist(self, names):
         """
         Determines whether the specified resource name exists in the index.
@@ -245,13 +252,13 @@
         statement += ")"
         results = self._db_values_for_sql(statement, *names)
         return results
-    
+
     def searchValid(self, filter):
         if isinstance(filter, caldavxml.Filter):
             qualifiers = calendarquery.sqlcalendarquery(filter)
         else:
             qualifiers = None
-            
+
         return qualifiers is not None
 
     def search(self, filter):
@@ -265,7 +272,7 @@
         """
         # FIXME: Don't forget to use maximum_future_expansion_duration when we
         # start caching...
-        
+
         # Make sure we have a proper Filter element and get the partial SQL statement to use.
         if isinstance(filter, caldavxml.Filter):
             qualifiers = calendarquery.sqlcalendarquery(filter)
@@ -275,7 +282,7 @@
             rowiter = self._db_execute("select DISTINCT RESOURCE.NAME, RESOURCE.UID, RESOURCE.TYPE" + qualifiers[0], *qualifiers[1])
         else:
             rowiter = self._db_execute("select NAME, UID, TYPE from RESOURCE")
-            
+
         for row in rowiter:
             name = row[0]
             if self.resource.getChild(name.encode("utf-8")):
@@ -290,7 +297,7 @@
         @return: the schema version assigned to this index.
         """
         return schema_version
-        
+
     def _add_to_db(self, name, calendar, cursor = None):
         """
         Records the given calendar resource in the index with the given name.
@@ -303,7 +310,7 @@
             contents.
         """
         raise NotImplementedError
-    
+
     def _delete_from_db(self, name, uid):
         """
         Deletes the specified entry from all dbs.
@@ -311,12 +318,12 @@
         @param uid: the uid of the resource to delete.
         """
         raise NotImplementedError
-    
+
 class CalendarIndex (AbstractCalendarIndex):
     """
     Calendar index - abstract class for indexer that indexes calendar objects in a collection.
     """
-    
+
     def __init__(self, resource):
         """
         @param resource: the L{twistedcaldav.static.CalDAVFile} resource to
@@ -426,7 +433,7 @@
             values (:1, :2, :3, :4)
             """, name, uid, calendar.resourceType(), instances.limit
         )
-    
+
     def _delete_from_db(self, name, uid):
         """
         Deletes the specified entry from all dbs.
@@ -435,27 +442,85 @@
         """
         self._db_execute("delete from TIMESPAN where NAME = :1", name)
         self._db_execute("delete from RESOURCE where NAME = :1", name)
-    
-class Index (CalendarIndex):
-    """
-    Calendar collection index - regular collection that enforces CalDAV UID uniqueness requirement.
-    """
-    
-    def __init__(self, resource):
-        """
-        @param resource: the L{twistedcaldav.static.CalDAVFile} resource to
-            index. C{resource} must be a calendar collection (ie.
-            C{resource.isPseudoCalendarCollection()} returns C{True}.)
-        """
-        assert resource.isCalendarCollection(), "non-calendar collection resource %s has no index." % (resource,)
-        super(Index, self).__init__(resource)
 
-    #
-    # A dict of sets. The dict keys are calendar collection paths,
-    # and the sets contains reserved UIDs for each path.
-    #
-    
+
+def wrapInDeferred(f):
+    def _(*args, **kwargs):
+        return maybeDeferred(f, *args, **kwargs)
+
+    return _
+
+
+class MemcachedUIDReserver(CachePoolUserMixIn, LoggingMixIn):
+    def __init__(self, index, cachePool=None):
+        self.index = index
+        self._cachePool = cachePool
+
+    def _key(self, uid):
+        return 'reservation:%s' % (
+            hashlib.md5('%s:%s' % (uid,
+                                   self.index.resource.fp.path)).hexdigest())
+
     def reserveUID(self, uid):
+        uid = uid.encode('utf-8')
+        self.log_debug("Reserving UID %r @ %r" % (
+                uid,
+                self.index.resource.fp.path))
+        def _handleFalse(result):
+            if result is False:
+                raise ReservationError(
+                    "UID %s already reserved for calendar collection %s."
+                    % (uid, self.index.resource)
+                    )
+
+        d = self.getCachePool().add(self._key(uid),
+                                    'reserved',
+                                    expireTime=reservation_timeout_secs)
+        d.addCallback(_handleFalse)
+        return d
+
+
+    def unreserveUID(self, uid):
+        uid = uid.encode('utf-8')
+        self.log_debug("Unreserving UID %r @ %r" % (
+                uid,
+                self.index.resource.fp.path))
+        def _handleFalse(result):
+            if result is False:
+                raise ReservationError(
+                    "UID %s is not reserved for calendar collection %s."
+                    % (uid, self.index.resource)
+                    )
+
+        d =self.getCachePool().delete(self._key(uid))
+        d.addCallback(_handleFalse)
+        return d
+
+
+    def isReservedUID(self, uid):
+        uid = uid.encode('utf-8')
+        self.log_debug("Is reserved UID %r @ %r" % (
+                uid,
+                self.index.resource.fp.path))
+
+        def _checkValue((flags, value)):
+            if value is None:
+                return False
+            else:
+                return True
+
+        d = self.getCachePool().get(self._key(uid))
+        d.addCallback(_checkValue)
+        return d
+
+
+
+class SQLUIDReserver(object):
+    def __init__(self, index):
+        self.index = index
+
+    @wrapInDeferred
+    def reserveUID(self, uid):
         """
         Reserve a UID for this index's resource.
         @param uid: the UID to reserve
@@ -463,48 +528,56 @@
         """
 
         try:
-            self._db_execute("insert into RESERVED (UID, TIME) values (:1, :2)", uid, datetime.datetime.now())
-            self._db_commit()
+            self.index._db_execute("insert into RESERVED (UID, TIME) values (:1, :2)", uid, datetime.datetime.now())
+            self.index._db_commit()
         except sqlite.IntegrityError:
-            self._db_rollback()
+            self.index._db_rollback()
             raise ReservationError(
                 "UID %s already reserved for calendar collection %s."
-                % (uid, self.resource)
+                % (uid, self.index.resource)
             )
         except sqlite.Error, e:
             log.err("Unable to reserve UID: %s", (e,))
-            self._db_rollback()
+            self.index._db_rollback()
             raise
-    
+
     def unreserveUID(self, uid):
         """
         Unreserve a UID for this index's resource.
         @param uid: the UID to reserve
         @raise ReservationError: if C{uid} is not reserved
         """
-        
-        if not self.isReservedUID(uid):
-            raise ReservationError(
-                "UID %s is not reserved for calendar collection %s."
-                % (uid, self.resource)
-            )
 
-        try:
-            self._db_execute("delete from RESERVED where UID = :1", uid)
-            self._db_commit()
-        except sqlite.Error, e:
-            log.err("Unable to unreserve UID: %s", (e,))
-            self._db_rollback()
-            raise
-    
+        def _cb(result):
+            if result == False:
+                raise ReservationError(
+                    "UID %s is not reserved for calendar collection %s."
+                    % (uid, self.index.resource)
+                    )
+            else:
+                try:
+                    self.index._db_execute(
+                        "delete from RESERVED where UID = :1", uid)
+                    self.index._db_commit()
+                except sqlite.Error, e:
+                    log.err("Unable to unreserve UID: %s", (e,))
+                    self.index._db_rollback()
+                    raise
+
+        d = self.isReservedUID(uid)
+        d.addCallback(_cb)
+        return d
+
+
+    @wrapInDeferred
     def isReservedUID(self, uid):
         """
         Check to see whether a UID is reserved.
         @param uid: the UID to check
         @return: True if C{uid} is reserved, False otherwise.
         """
-        
-        rowiter = self._db_execute("select UID, TIME from RESERVED where UID = :1", uid)
+
+        rowiter = self.index._db_execute("select UID, TIME from RESERVED where UID = :1", uid)
         for uid, attime in rowiter:
             # Double check that the time is within a reasonable period of now
             # otherwise we probably have a stale reservation
@@ -512,18 +585,57 @@
             dt = datetime.datetime(year=tm.tm_year, month=tm.tm_mon, day=tm.tm_mday, hour=tm.tm_hour, minute=tm.tm_min, second = tm.tm_sec)
             if datetime.datetime.now() - dt > datetime.timedelta(seconds=reservation_timeout_secs):
                 try:
-                    self._db_execute("delete from RESERVED where UID = :1", uid)
-                    self._db_commit()
+                    self.index._db_execute("delete from RESERVED where UID = :1", uid)
+                    self.index._db_commit()
                 except sqlite.Error, e:
                     log.err("Unable to unreserve UID: %s", (e,))
-                    self._db_rollback()
+                    self.index._db_rollback()
                     raise
                 return False
             else:
                 return True
 
         return False
-        
+
+
+
+class Index (CalendarIndex):
+    """
+    Calendar collection index - regular collection that enforces CalDAV UID uniqueness requirement.
+    """
+
+    def __init__(self, resource):
+        """
+        @param resource: the L{twistedcaldav.static.CalDAVFile} resource to
+            index. C{resource} must be a calendar collection (ie.
+            C{resource.isPseudoCalendarCollection()} returns C{True}.)
+        """
+        assert resource.isCalendarCollection(), "non-calendar collection resource %s has no index." % (resource,)
+        super(Index, self).__init__(resource)
+
+        if config.Memcached['ClientEnabled']:
+            self.reserver = MemcachedUIDReserver(self)
+
+        else:
+            self.reserver = SQLUIDReserver(self)
+
+    #
+    # A dict of sets. The dict keys are calendar collection paths,
+    # and the sets contains reserved UIDs for each path.
+    #
+
+    def reserveUID(self, uid):
+        return self.reserver.reserveUID(uid)
+
+
+    def unreserveUID(self, uid):
+        return self.reserver.unreserveUID(uid)
+
+
+    def isReservedUID(self, uid):
+        return self.reserver.isReservedUID(uid)
+
+
     def isAllowedUID(self, uid, *names):
         """
         Checks to see whether to allow an operation which would add the
@@ -537,19 +649,19 @@
         """
         rname = self.resourceNameForUID(uid)
         return (rname is None or rname in names)
- 
+
     def _db_type(self):
         """
         @return: the collection type assigned to this index.
         """
         return collection_types["Calendar"]
-        
+
     def _db_init_data_tables(self, q):
         """
         Initialise the underlying database tables.
         @param q:           a database cursor to use.
         """
-        
+
         # Create database where the RESOURCE table has unique UID column.
         self._db_init_data_tables_base(q, True)
 
@@ -557,7 +669,7 @@
         """
         Re-create the database tables from existing calendar data.
         """
-        
+
         #
         # Populate the DB with data from already existing resources.
         # This allows for index recovery if the DB file gets
@@ -586,7 +698,7 @@
                     self.addResource(name, calendar, True)
             finally:
                 stream.close()
-        
+
         # Do commit outside of the loop for better performance
         self._db_commit()
 
@@ -609,30 +721,30 @@
         @param uid: the UID to reserve
         @raise ReservationError: if C{uid} is already reserved
         """
-        
+
         # iTIP does not require unique UIDs
         pass
-    
+
     def unreserveUID(self, uid): #@UnusedVariable
         """
         Unreserve a UID for this index's resource.
         @param uid: the UID to reserve
         @raise ReservationError: if C{uid} is not reserved
         """
-        
+
         # iTIP does not require unique UIDs
         pass
-    
+
     def isReservedUID(self, uid): #@UnusedVariable
         """
         Check to see whether a UID is reserved.
         @param uid: the UID to check
         @return: True if C{uid} is reserved, False otherwise.
         """
-        
+
         # iTIP does not require unique UIDs
         return False
-        
+
     def isAllowedUID(self, uid, *names): #@UnusedVariable
         """
         Checks to see whether to allow an operation with adds the the specified
@@ -645,22 +757,22 @@
         @return: True if the UID is not in the index and is not reserved,
             False otherwise.
         """
-        
+
         # iTIP does not require unique UIDs
         return True
- 
+
     def _db_type(self):
         """
         @return: the collection type assigned to this index.
         """
         return collection_types["iTIP"]
-        
+
     def _db_init_data_tables(self, q):
         """
         Initialise the underlying database tables.
         @param q:           a database cursor to use.
         """
-        
+
         # Create database where the RESOURCE table has a UID column that is not unique.
         self._db_init_data_tables_base(q, False)
 
@@ -668,7 +780,7 @@
         """
         Re-create the database tables from existing calendar data.
         """
-        
+
         #
         # Populate the DB with data from already existing resources.
         # This allows for index recovery if the DB file gets

Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py	2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py	2008-06-05 19:50:25 UTC (rev 2538)
@@ -417,7 +417,9 @@
             failure_count = 0
             while(failure_count < 10):
                 try:
-                    destination_index.reserveUID(uid)
+                    d = waitForDeferred(destination_index.reserveUID(uid))
+                    yield d
+                    d.getResult()
                     reserved = True
                     break
                 except ReservationError:
@@ -663,18 +665,22 @@
         rollback.Commit()
 
         if reserved:
-            destination_index.unreserveUID(uid)
+            d = waitForDeferred(destination_index.unreserveUID(uid))
+            yield d
+            d.getResult()
             reserved = False
 
         yield response
         return
 
-    except:
+    except Exception, err:
         if reserved:
-            destination_index.unreserveUID(uid)
+            d = waitForDeferred(destination_index.unreserveUID(uid))
+            yield d
+            d.getResult()
             reserved = False
 
         # Roll back changes to original server state. Note this may do nothing
         # if the rollback has already ocurred or changes already committed.
         rollback.Rollback()
-        raise
+        raise err

Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py	2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py	2008-06-05 19:50:25 UTC (rev 2538)
@@ -15,51 +15,89 @@
 ##
 
 from twistedcaldav.index import Index
+from twistedcaldav import index
 
 import twistedcaldav.test.util
-from twistedcaldav.index import ReservationError
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
+from twistedcaldav.index import ReservationError, MemcachedUIDReserver
+from twisted.web2.test.test_http import deferLater
 import time
 
-class TestIndex (twistedcaldav.test.util.TestCase):
+class SQLIndexTests (twistedcaldav.test.util.TestCase):
     """
     Test abstract SQL DB class
     """
-    
+
     def setUp(self):
-        super(TestIndex, self).setUp()
+        super(SQLIndexTests, self).setUp()
         self.site.resource.isCalendarCollection = lambda: True
+        self.db = Index(self.site.resource)
 
+
     def test_reserve_uid_ok(self):
         uid = "test-test-test"
-        db = Index(self.site.resource)
-        self.assertFalse(db.isReservedUID(uid))
-        db.reserveUID(uid)
-        self.assertTrue(db.isReservedUID(uid))
-        db.unreserveUID(uid)
-        self.assertFalse(db.isReservedUID(uid))
+        d = self.db.isReservedUID(uid)
+        d.addCallback(self.assertFalse)
+        d.addCallback(lambda _: self.db.reserveUID(uid))
+        d.addCallback(lambda _: self.db.isReservedUID(uid))
+        d.addCallback(self.assertTrue)
+        d.addCallback(lambda _: self.db.unreserveUID(uid))
+        d.addCallback(lambda _: self.db.isReservedUID(uid))
+        d.addCallback(self.assertFalse)
 
+        return d
+
+
     def test_reserve_uid_twice(self):
         uid = "test-test-test"
-        db = Index(self.site.resource)
-        db.reserveUID(uid)
-        self.assertTrue(db.isReservedUID(uid))
-        self.assertRaises(ReservationError, db.reserveUID, uid)
+        d = self.db.reserveUID(uid)
+        d.addCallback(lambda _: self.db.isReservedUID(uid))
+        d.addCallback(self.assertTrue)
+        d.addCallback(lambda _:
+                      self.assertFailure(self.db.reserveUID(uid),
+                                         ReservationError))
+        return d
 
+
     def test_unreserve_unreserved(self):
         uid = "test-test-test"
-        db = Index(self.site.resource)
-        self.assertRaises(ReservationError, db.unreserveUID, uid)
+        return self.assertFailure(self.db.unreserveUID(uid),
+                                  ReservationError)
 
+
     def test_reserve_uid_timeout(self):
+        # WARNING: This test is fundamentally flawed and will fail
+        # intermittently because it uses the real clock.
         uid = "test-test-test"
         old_timeout = twistedcaldav.index.reservation_timeout_secs
-        twistedcaldav.index.reservation_timeout_secs = 2
-        try:
-            db = Index(self.site.resource)
-            self.assertFalse(db.isReservedUID(uid))
-            db.reserveUID(uid)
-            self.assertTrue(db.isReservedUID(uid))
-            time.sleep(3)
-            self.assertFalse(db.isReservedUID(uid))
-        finally:
+        twistedcaldav.index.reservation_timeout_secs = 1
+
+        def _finally(result):
             twistedcaldav.index.reservation_timeout_secs = old_timeout
+            return result
+
+        d = self.db.isReservedUID(uid)
+        d.addCallback(self.assertFalse)
+        d.addCallback(lambda _: self.db.reserveUID(uid))
+        d.addCallback(lambda _: self.db.isReservedUID(uid))
+        d.addCallback(self.assertTrue)
+        d.addCallback(lambda _: deferLater(2))
+        d.addCallback(lambda _: self.db.isReservedUID(uid))
+        d.addCallback(self.assertFalse)
+        d.addBoth(_finally)
+
+        return d
+
+
+
+class MemcacheTests(SQLIndexTests):
+    def setUp(self):
+        super(MemcacheTests, self).setUp()
+        self.memcache = InMemoryMemcacheProtocol()
+        self.db.reserver = MemcachedUIDReserver(self.db, self.memcache)
+
+
+    def tearDown(self):
+        for k, v in self.memcache._timeouts.iteritems():
+            if v.active():
+                v.cancel()

Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py	2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py	2008-06-05 19:50:25 UTC (rev 2538)
@@ -79,3 +79,69 @@
     def changed(self):
         self.changedCount += 1
         return succeed(True)
+
+
+
+class InMemoryMemcacheProtocol(object):
+    def __init__(self, reactor=None):
+        self._cache = {}
+
+        if reactor is None:
+            from twisted.internet import reactor
+
+        self._reactor = reactor
+
+        self._timeouts = {}
+
+    def get(self, key):
+        if key not in self._cache:
+            return succeed((0, None))
+
+        return succeed(self._cache[key])
+
+
+    def _timeoutKey(self, expireTime, key):
+        def _removeKey():
+            del self._cache[key]
+
+        if expireTime > 0:
+            if key in self._timeouts:
+                self._timeouts[key].cancel()
+
+            from twisted.internet.base import DelayedCall
+            DelayedCall.debug = True
+
+            self._timeouts[key] = self._reactor.callLater(
+                expireTime,
+                _removeKey)
+
+
+    def set(self, key, value, flags=0, expireTime=0):
+        try:
+            self._cache[key] = (flags, value)
+
+            self._timeoutKey(expireTime, key)
+
+            return succeed(True)
+
+        except Exception, err:
+            return fail(Failure())
+
+
+    def add(self, key, value, flags=0, expireTime=0):
+        if key in self._cache:
+            return succeed(False)
+
+        return self.set(key, value, flags=flags, expireTime=expireTime)
+
+
+    def delete(self, key):
+        try:
+            del self._cache[key]
+            if key in self._timeouts:
+                self._timeouts[key].cancel()
+            return succeed(True)
+
+        except:
+            return succeed(False)
+

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080605/208cf025/attachment-0001.htm 


More information about the calendarserver-changes mailing list