[CalendarServer-changes] [2538] CalendarServer/branches/memcache-uid-reservation/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Thu Jun 5 12:50:26 PDT 2008
Revision: 2538
http://trac.macosforge.org/projects/calendarserver/changeset/2538
Author: dreid at apple.com
Date: 2008-06-05 12:50:25 -0700 (Thu, 05 Jun 2008)
Log Message:
-----------
First draft of UID reservation in memcached.
Modified Paths:
--------------
CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py
CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py
CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py
CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py
Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py 2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/index.py 2008-06-05 19:50:25 UTC (rev 2538)
@@ -29,6 +29,7 @@
import datetime
import os
import time
+import hashlib
try:
import sqlite3 as sqlite
@@ -37,12 +38,18 @@
from vobject.icalendar import utc
+from twisted.internet.defer import maybeDeferred, succeed
+from twisted.internet.protocol import ClientCreator
+
+from twistedcaldav.memcachepool import CachePoolUserMixIn
+
from twistedcaldav.ical import Component
from twistedcaldav.query import calendarquery
from twistedcaldav.sql import AbstractSQLDatabase
from twistedcaldav.sql import db_prefix
from twistedcaldav import caldavxml
-from twistedcaldav.log import Logger
+from twistedcaldav.log import Logger, LoggingMixIn
+from twistedcaldav.config import config
log = Logger()
@@ -179,7 +186,7 @@
for name in self.resourceNamesForUID(uid):
assert result is None, "More than one resource with UID %s in calendar collection %r" % (uid, self)
result = name
-
+
return result
def resourceUIDForName(self, name):
@@ -221,7 +228,7 @@
if uid is not None:
self._delete_from_db(name, uid)
self._db_commit()
-
+
def resourceExists(self, name):
"""
Determines whether the specified resource name exists in the index.
@@ -230,7 +237,7 @@
"""
uid = self._db_value_for_sql("select UID from RESOURCE where NAME = :1", name)
return uid is not None
-
+
def resourcesExist(self, names):
"""
Determines whether the specified resource name exists in the index.
@@ -245,13 +252,13 @@
statement += ")"
results = self._db_values_for_sql(statement, *names)
return results
-
+
def searchValid(self, filter):
if isinstance(filter, caldavxml.Filter):
qualifiers = calendarquery.sqlcalendarquery(filter)
else:
qualifiers = None
-
+
return qualifiers is not None
def search(self, filter):
@@ -265,7 +272,7 @@
"""
# FIXME: Don't forget to use maximum_future_expansion_duration when we
# start caching...
-
+
# Make sure we have a proper Filter element and get the partial SQL statement to use.
if isinstance(filter, caldavxml.Filter):
qualifiers = calendarquery.sqlcalendarquery(filter)
@@ -275,7 +282,7 @@
rowiter = self._db_execute("select DISTINCT RESOURCE.NAME, RESOURCE.UID, RESOURCE.TYPE" + qualifiers[0], *qualifiers[1])
else:
rowiter = self._db_execute("select NAME, UID, TYPE from RESOURCE")
-
+
for row in rowiter:
name = row[0]
if self.resource.getChild(name.encode("utf-8")):
@@ -290,7 +297,7 @@
@return: the schema version assigned to this index.
"""
return schema_version
-
+
def _add_to_db(self, name, calendar, cursor = None):
"""
Records the given calendar resource in the index with the given name.
@@ -303,7 +310,7 @@
contents.
"""
raise NotImplementedError
-
+
def _delete_from_db(self, name, uid):
"""
Deletes the specified entry from all dbs.
@@ -311,12 +318,12 @@
@param uid: the uid of the resource to delete.
"""
raise NotImplementedError
-
+
class CalendarIndex (AbstractCalendarIndex):
"""
Calendar index - abstract class for indexer that indexes calendar objects in a collection.
"""
-
+
def __init__(self, resource):
"""
@param resource: the L{twistedcaldav.static.CalDAVFile} resource to
@@ -426,7 +433,7 @@
values (:1, :2, :3, :4)
""", name, uid, calendar.resourceType(), instances.limit
)
-
+
def _delete_from_db(self, name, uid):
"""
Deletes the specified entry from all dbs.
@@ -435,27 +442,85 @@
"""
self._db_execute("delete from TIMESPAN where NAME = :1", name)
self._db_execute("delete from RESOURCE where NAME = :1", name)
-
-class Index (CalendarIndex):
- """
- Calendar collection index - regular collection that enforces CalDAV UID uniqueness requirement.
- """
-
- def __init__(self, resource):
- """
- @param resource: the L{twistedcaldav.static.CalDAVFile} resource to
- index. C{resource} must be a calendar collection (ie.
- C{resource.isPseudoCalendarCollection()} returns C{True}.)
- """
- assert resource.isCalendarCollection(), "non-calendar collection resource %s has no index." % (resource,)
- super(Index, self).__init__(resource)
- #
- # A dict of sets. The dict keys are calendar collection paths,
- # and the sets contains reserved UIDs for each path.
- #
-
+
+def wrapInDeferred(f):
+ def _(*args, **kwargs):
+ return maybeDeferred(f, *args, **kwargs)
+
+ return _
+
+
+class MemcachedUIDReserver(CachePoolUserMixIn, LoggingMixIn):
+ def __init__(self, index, cachePool=None):
+ self.index = index
+ self._cachePool = cachePool
+
+ def _key(self, uid):
+ return 'reservation:%s' % (
+ hashlib.md5('%s:%s' % (uid,
+ self.index.resource.fp.path)).hexdigest())
+
def reserveUID(self, uid):
+ uid = uid.encode('utf-8')
+ self.log_debug("Reserving UID %r @ %r" % (
+ uid,
+ self.index.resource.fp.path))
+ def _handleFalse(result):
+ if result is False:
+ raise ReservationError(
+ "UID %s already reserved for calendar collection %s."
+ % (uid, self.index.resource)
+ )
+
+ d = self.getCachePool().add(self._key(uid),
+ 'reserved',
+ expireTime=reservation_timeout_secs)
+ d.addCallback(_handleFalse)
+ return d
+
+
+ def unreserveUID(self, uid):
+ uid = uid.encode('utf-8')
+ self.log_debug("Unreserving UID %r @ %r" % (
+ uid,
+ self.index.resource.fp.path))
+ def _handleFalse(result):
+ if result is False:
+ raise ReservationError(
+ "UID %s is not reserved for calendar collection %s."
+ % (uid, self.index.resource)
+ )
+
+ d =self.getCachePool().delete(self._key(uid))
+ d.addCallback(_handleFalse)
+ return d
+
+
+ def isReservedUID(self, uid):
+ uid = uid.encode('utf-8')
+ self.log_debug("Is reserved UID %r @ %r" % (
+ uid,
+ self.index.resource.fp.path))
+
+ def _checkValue((flags, value)):
+ if value is None:
+ return False
+ else:
+ return True
+
+ d = self.getCachePool().get(self._key(uid))
+ d.addCallback(_checkValue)
+ return d
+
+
+
+class SQLUIDReserver(object):
+ def __init__(self, index):
+ self.index = index
+
+ @wrapInDeferred
+ def reserveUID(self, uid):
"""
Reserve a UID for this index's resource.
@param uid: the UID to reserve
@@ -463,48 +528,56 @@
"""
try:
- self._db_execute("insert into RESERVED (UID, TIME) values (:1, :2)", uid, datetime.datetime.now())
- self._db_commit()
+ self.index._db_execute("insert into RESERVED (UID, TIME) values (:1, :2)", uid, datetime.datetime.now())
+ self.index._db_commit()
except sqlite.IntegrityError:
- self._db_rollback()
+ self.index._db_rollback()
raise ReservationError(
"UID %s already reserved for calendar collection %s."
- % (uid, self.resource)
+ % (uid, self.index.resource)
)
except sqlite.Error, e:
log.err("Unable to reserve UID: %s", (e,))
- self._db_rollback()
+ self.index._db_rollback()
raise
-
+
def unreserveUID(self, uid):
"""
Unreserve a UID for this index's resource.
@param uid: the UID to unreserve
@raise ReservationError: if C{uid} is not reserved
"""
-
- if not self.isReservedUID(uid):
- raise ReservationError(
- "UID %s is not reserved for calendar collection %s."
- % (uid, self.resource)
- )
- try:
- self._db_execute("delete from RESERVED where UID = :1", uid)
- self._db_commit()
- except sqlite.Error, e:
- log.err("Unable to unreserve UID: %s", (e,))
- self._db_rollback()
- raise
-
+ def _cb(result):
+ if result == False:
+ raise ReservationError(
+ "UID %s is not reserved for calendar collection %s."
+ % (uid, self.index.resource)
+ )
+ else:
+ try:
+ self.index._db_execute(
+ "delete from RESERVED where UID = :1", uid)
+ self.index._db_commit()
+ except sqlite.Error, e:
+ log.err("Unable to unreserve UID: %s", (e,))
+ self.index._db_rollback()
+ raise
+
+ d = self.isReservedUID(uid)
+ d.addCallback(_cb)
+ return d
+
+
+ @wrapInDeferred
def isReservedUID(self, uid):
"""
Check to see whether a UID is reserved.
@param uid: the UID to check
@return: True if C{uid} is reserved, False otherwise.
"""
-
- rowiter = self._db_execute("select UID, TIME from RESERVED where UID = :1", uid)
+
+ rowiter = self.index._db_execute("select UID, TIME from RESERVED where UID = :1", uid)
for uid, attime in rowiter:
# Double check that the time is within a reasonable period of now
# otherwise we probably have a stale reservation
@@ -512,18 +585,57 @@
dt = datetime.datetime(year=tm.tm_year, month=tm.tm_mon, day=tm.tm_mday, hour=tm.tm_hour, minute=tm.tm_min, second = tm.tm_sec)
if datetime.datetime.now() - dt > datetime.timedelta(seconds=reservation_timeout_secs):
try:
- self._db_execute("delete from RESERVED where UID = :1", uid)
- self._db_commit()
+ self.index._db_execute("delete from RESERVED where UID = :1", uid)
+ self.index._db_commit()
except sqlite.Error, e:
log.err("Unable to unreserve UID: %s", (e,))
- self._db_rollback()
+ self.index._db_rollback()
raise
return False
else:
return True
return False
-
+
+
+
+class Index (CalendarIndex):
+ """
+ Calendar collection index - regular collection that enforces CalDAV UID uniqueness requirement.
+ """
+
+ def __init__(self, resource):
+ """
+ @param resource: the L{twistedcaldav.static.CalDAVFile} resource to
+ index. C{resource} must be a calendar collection (ie.
+ C{resource.isPseudoCalendarCollection()} returns C{True}.)
+ """
+ assert resource.isCalendarCollection(), "non-calendar collection resource %s has no index." % (resource,)
+ super(Index, self).__init__(resource)
+
+ if config.Memcached['ClientEnabled']:
+ self.reserver = MemcachedUIDReserver(self)
+
+ else:
+ self.reserver = SQLUIDReserver(self)
+
+ #
+ # A dict of sets. The dict keys are calendar collection paths,
+ # and the sets contains reserved UIDs for each path.
+ #
+
+ def reserveUID(self, uid):
+ return self.reserver.reserveUID(uid)
+
+
+ def unreserveUID(self, uid):
+ return self.reserver.unreserveUID(uid)
+
+
+ def isReservedUID(self, uid):
+ return self.reserver.isReservedUID(uid)
+
+
def isAllowedUID(self, uid, *names):
"""
Checks to see whether to allow an operation which would add the
@@ -537,19 +649,19 @@
"""
rname = self.resourceNameForUID(uid)
return (rname is None or rname in names)
-
+
def _db_type(self):
"""
@return: the collection type assigned to this index.
"""
return collection_types["Calendar"]
-
+
def _db_init_data_tables(self, q):
"""
Initialise the underlying database tables.
@param q: a database cursor to use.
"""
-
+
# Create database where the RESOURCE table has unique UID column.
self._db_init_data_tables_base(q, True)
@@ -557,7 +669,7 @@
"""
Re-create the database tables from existing calendar data.
"""
-
+
#
# Populate the DB with data from already existing resources.
# This allows for index recovery if the DB file gets
@@ -586,7 +698,7 @@
self.addResource(name, calendar, True)
finally:
stream.close()
-
+
# Do commit outside of the loop for better performance
self._db_commit()
@@ -609,30 +721,30 @@
@param uid: the UID to reserve
@raise ReservationError: if C{uid} is already reserved
"""
-
+
# iTIP does not require unique UIDs
pass
-
+
def unreserveUID(self, uid): #@UnusedVariable
"""
Unreserve a UID for this index's resource.
@param uid: the UID to unreserve
@raise ReservationError: if C{uid} is not reserved
"""
-
+
# iTIP does not require unique UIDs
pass
-
+
def isReservedUID(self, uid): #@UnusedVariable
"""
Check to see whether a UID is reserved.
@param uid: the UID to check
@return: True if C{uid} is reserved, False otherwise.
"""
-
+
# iTIP does not require unique UIDs
return False
-
+
def isAllowedUID(self, uid, *names): #@UnusedVariable
"""
Checks to see whether to allow an operation which adds the specified
@@ -645,22 +757,22 @@
@return: True if the UID is not in the index and is not reserved,
False otherwise.
"""
-
+
# iTIP does not require unique UIDs
return True
-
+
def _db_type(self):
"""
@return: the collection type assigned to this index.
"""
return collection_types["iTIP"]
-
+
def _db_init_data_tables(self, q):
"""
Initialise the underlying database tables.
@param q: a database cursor to use.
"""
-
+
# Create database where the RESOURCE table has a UID column that is not unique.
self._db_init_data_tables_base(q, False)
@@ -668,7 +780,7 @@
"""
Re-create the database tables from existing calendar data.
"""
-
+
#
# Populate the DB with data from already existing resources.
# This allows for index recovery if the DB file gets
Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py 2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/method/put_common.py 2008-06-05 19:50:25 UTC (rev 2538)
@@ -417,7 +417,9 @@
failure_count = 0
while(failure_count < 10):
try:
- destination_index.reserveUID(uid)
+ d = waitForDeferred(destination_index.reserveUID(uid))
+ yield d
+ d.getResult()
reserved = True
break
except ReservationError:
@@ -663,18 +665,22 @@
rollback.Commit()
if reserved:
- destination_index.unreserveUID(uid)
+ d = waitForDeferred(destination_index.unreserveUID(uid))
+ yield d
+ d.getResult()
reserved = False
yield response
return
- except:
+ except Exception, err:
if reserved:
- destination_index.unreserveUID(uid)
+ d = waitForDeferred(destination_index.unreserveUID(uid))
+ yield d
+ d.getResult()
reserved = False
# Roll back changes to original server state. Note this may do nothing
# if the rollback has already occurred or changes already committed.
rollback.Rollback()
- raise
+ raise err
Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py 2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/test_index.py 2008-06-05 19:50:25 UTC (rev 2538)
@@ -15,51 +15,89 @@
##
from twistedcaldav.index import Index
+from twistedcaldav import index
import twistedcaldav.test.util
-from twistedcaldav.index import ReservationError
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
+from twistedcaldav.index import ReservationError, MemcachedUIDReserver
+from twisted.web2.test.test_http import deferLater
import time
-class TestIndex (twistedcaldav.test.util.TestCase):
+class SQLIndexTests (twistedcaldav.test.util.TestCase):
"""
Test abstract SQL DB class
"""
-
+
def setUp(self):
- super(TestIndex, self).setUp()
+ super(SQLIndexTests, self).setUp()
self.site.resource.isCalendarCollection = lambda: True
+ self.db = Index(self.site.resource)
+
def test_reserve_uid_ok(self):
uid = "test-test-test"
- db = Index(self.site.resource)
- self.assertFalse(db.isReservedUID(uid))
- db.reserveUID(uid)
- self.assertTrue(db.isReservedUID(uid))
- db.unreserveUID(uid)
- self.assertFalse(db.isReservedUID(uid))
+ d = self.db.isReservedUID(uid)
+ d.addCallback(self.assertFalse)
+ d.addCallback(lambda _: self.db.reserveUID(uid))
+ d.addCallback(lambda _: self.db.isReservedUID(uid))
+ d.addCallback(self.assertTrue)
+ d.addCallback(lambda _: self.db.unreserveUID(uid))
+ d.addCallback(lambda _: self.db.isReservedUID(uid))
+ d.addCallback(self.assertFalse)
+ return d
+
+
def test_reserve_uid_twice(self):
uid = "test-test-test"
- db = Index(self.site.resource)
- db.reserveUID(uid)
- self.assertTrue(db.isReservedUID(uid))
- self.assertRaises(ReservationError, db.reserveUID, uid)
+ d = self.db.reserveUID(uid)
+ d.addCallback(lambda _: self.db.isReservedUID(uid))
+ d.addCallback(self.assertTrue)
+ d.addCallback(lambda _:
+ self.assertFailure(self.db.reserveUID(uid),
+ ReservationError))
+ return d
+
def test_unreserve_unreserved(self):
uid = "test-test-test"
- db = Index(self.site.resource)
- self.assertRaises(ReservationError, db.unreserveUID, uid)
+ return self.assertFailure(self.db.unreserveUID(uid),
+ ReservationError)
+
def test_reserve_uid_timeout(self):
+ # WARNING: This test is fundamentally flawed and will fail
+ # intermittently because it uses the real clock.
uid = "test-test-test"
old_timeout = twistedcaldav.index.reservation_timeout_secs
- twistedcaldav.index.reservation_timeout_secs = 2
- try:
- db = Index(self.site.resource)
- self.assertFalse(db.isReservedUID(uid))
- db.reserveUID(uid)
- self.assertTrue(db.isReservedUID(uid))
- time.sleep(3)
- self.assertFalse(db.isReservedUID(uid))
- finally:
+ twistedcaldav.index.reservation_timeout_secs = 1
+
+ def _finally(result):
twistedcaldav.index.reservation_timeout_secs = old_timeout
+ return result
+
+ d = self.db.isReservedUID(uid)
+ d.addCallback(self.assertFalse)
+ d.addCallback(lambda _: self.db.reserveUID(uid))
+ d.addCallback(lambda _: self.db.isReservedUID(uid))
+ d.addCallback(self.assertTrue)
+ d.addCallback(lambda _: deferLater(2))
+ d.addCallback(lambda _: self.db.isReservedUID(uid))
+ d.addCallback(self.assertFalse)
+ d.addBoth(_finally)
+
+ return d
+
+
+
+class MemcacheTests(SQLIndexTests):
+ def setUp(self):
+ super(MemcacheTests, self).setUp()
+ self.memcache = InMemoryMemcacheProtocol()
+ self.db.reserver = MemcachedUIDReserver(self.db, self.memcache)
+
+
+ def tearDown(self):
+ for k, v in self.memcache._timeouts.iteritems():
+ if v.active():
+ v.cancel()
Modified: CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py 2008-06-05 19:49:59 UTC (rev 2537)
+++ CalendarServer/branches/memcache-uid-reservation/twistedcaldav/test/util.py 2008-06-05 19:50:25 UTC (rev 2538)
@@ -79,3 +79,69 @@
def changed(self):
self.changedCount += 1
return succeed(True)
+
+
+
+class InMemoryMemcacheProtocol(object):
+ def __init__(self, reactor=None):
+ self._cache = {}
+
+ if reactor is None:
+ from twisted.internet import reactor
+
+ self._reactor = reactor
+
+ self._timeouts = {}
+
+ def get(self, key):
+ if key not in self._cache:
+ return succeed((0, None))
+
+ return succeed(self._cache[key])
+
+
+ def _timeoutKey(self, expireTime, key):
+ def _removeKey():
+ del self._cache[key]
+
+ if expireTime > 0:
+ if key in self._timeouts:
+ self._timeouts[key].cancel()
+
+ from twisted.internet.base import DelayedCall
+ DelayedCall.debug = True
+
+ self._timeouts[key] = self._reactor.callLater(
+ expireTime,
+ _removeKey)
+
+
+ def set(self, key, value, flags=0, expireTime=0):
+ try:
+ self._cache[key] = (flags, value)
+
+ self._timeoutKey(expireTime, key)
+
+ return succeed(True)
+
+ except Exception, err:
+ return fail(Failure())
+
+
+ def add(self, key, value, flags=0, expireTime=0):
+ if key in self._cache:
+ return succeed(False)
+
+ return self.set(key, value, flags=flags, expireTime=expireTime)
+
+
+ def delete(self, key):
+ try:
+ del self._cache[key]
+ if key in self._timeouts:
+ self._timeouts[key].cancel()
+ return succeed(True)
+
+ except:
+ return succeed(False)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080605/208cf025/attachment-0001.htm
More information about the calendarserver-changes
mailing list