[CalendarServer-changes] [8287] CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/ twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Mon Nov 14 14:18:26 PST 2011
Revision: 8287
http://trac.macosforge.org/projects/calendarserver/changeset/8287
Author: cdaboo at apple.com
Date: 2011-11-14 14:18:26 -0800 (Mon, 14 Nov 2011)
Log Message:
-----------
Remove memcachefifo prior to merge with trunk.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py
Removed Paths:
-------------
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py
Deleted: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py 2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py 2011-11-14 22:18:26 UTC (rev 8287)
@@ -1,201 +0,0 @@
-##
-# Copyright (c) 2008-2009 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-from twistedcaldav.memcacher import Memcacher
-from twisted.internet.defer import inlineCallbacks, Deferred, returnValue,\
- succeed
-from twisted.internet import reactor
-import time
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
-import cPickle
-
-class MemcacheFIFOLock(Memcacher):
- """
- Implements a shared lock with a queue such that lock requests are honored in the order
- in which they are received. This differs from MemcacheLock which does not honor ordering
- of lock requests.
-
- The implementation here uses two memcached entries. One represents a ticket counter - a
- counter incremented and returned each time a lock is requested. The other is the
- "next in line queue" - a sorted list of pending counters. To get a lock the caller
- is given the next ticket value, then polls waiting for their number to come up as
- "next in line". When it does they are given the lock, and upon releasing it their
- "next in line" value is popped off the front of the queue, giving the next a chance
- to get the lock.
-
- One complication here: making sure that we don't get the queue stuck waiting on a
- "next in line" that has gone away - so we need to be careful about maintaining the
- next in line queue appropriately.
-
- TODO: have a timeout for the next in line process to grab the lock. That way if that
- process has died, at least the others will eventually get a lock.
-
- Note that this is really a temporary solution meant to address the need to order scheduling
- requests in order to be "fair" to clients. Ultimately we want a persistent scheduling queue
- implementation that would effectively manage the next in line work loads and would not depend
- on the presence of any one working process.
- """
-
- def __init__(self, namespace, locktoken, timeout=60.0, retry_interval=0.01, expire_time=0):
- """
-
- @param namespace: a unique namespace for this lock's tokens
- @type namespace: C{str}
- @param locktoken: the name of the locktoken
- @type locktoken: C{str}
- @param timeout: the maximum time in seconds that the lock should block
- @type timeout: C{float}
- @param retry_interval: the interval to retry acquiring the lock
- @type retry_interval: C{float}
- @param expire_time: the time in seconds for the lock to expire. Zero: no expiration.
- @type expire_time: C{float}
- """
-
- super(MemcacheFIFOLock, self).__init__(namespace)
- self._locktoken = locktoken
- self._ticket_token = "%s-ticket" % (self._locktoken,)
- self._nextinline_token = "%s-next" % (self._locktoken,)
- self._queue_token = "%s-queue" % (self._locktoken,)
- self._timeout = timeout
- self._retry_interval = retry_interval
- self._expire_time = expire_time
- self._hasLock = False
- self._ticket = None
-
- def _getMemcacheProtocol(self):
-
- result = super(MemcacheFIFOLock, self)._getMemcacheProtocol()
-
- if isinstance(result, Memcacher.nullCacher):
- raise AssertionError("No implementation of shared locking without memcached")
-
- return result
-
- @inlineCallbacks
- def acquire(self):
-
- assert not self._hasLock, "Lock already acquired."
-
- # First make sure the ticket and queue keys exist
- yield self.add(self._ticket_token, "1", self._expire_time)
- yield self.add(self._nextinline_token, "0", self._expire_time)
-
- # Get the next ticket
- self._ticket = (yield self.incr(self._ticket_token)) - 1
-
- # Add ourselves to the pending queue
- yield self._addToQueue()
-
- timeout_at = time.time() + self._timeout
- waiting = False
- while True:
-
- # Check next in line value
- result = int((yield self.get(self._nextinline_token)))
- if result == self._ticket:
- self._hasLock = True
- if waiting:
- self.log_debug("Got lock after waiting on %s" % (self._locktoken,))
- break
-
- if self._timeout and time.time() < timeout_at:
- waiting = True
- self.log_debug("Waiting for lock on %s" % (self._locktoken,))
- pause = Deferred()
- def _timedDeferred():
- pause.callback(True)
- reactor.callLater(self._retry_interval, _timedDeferred)
- yield pause
- else:
- # Must remove our active ticket value otherwise the next in line will stall on
- # this lock which will never happen
- yield self._removeFromQueue()
-
- self.log_debug("Timed out lock after waiting on %s" % (self._locktoken,))
- raise MemcacheLockTimeoutError()
-
- returnValue(True)
-
- @inlineCallbacks
- def release(self):
-
- assert self._hasLock, "Lock not acquired."
- self._hasLock = False
-
- # Remove active ticket value - this will bump the next in line value
- yield self._removeFromQueue()
-
-
- def clean(self):
-
- if self._hasLock:
- return self.release()
- else:
- return succeed(True)
-
- def locked(self):
- """
- Test if the lock is currently being held.
- """
-
- return succeed(self._hasLock)
-
- @inlineCallbacks
- def _addToQueue(self):
- """
- Add our ticket to the queue. If it is now the first pending ticket, set the next in line
- value to that.
- """
-
- # We need a shared lock to protect access to the queue
- lock = MemcacheLock(self._namespace, self._locktoken)
- yield lock.acquire()
-
- try:
- queued_value = (yield self.get(self._queue_token))
- queued_items = cPickle.loads(queued_value) if queued_value is not None else []
- queued_items.append(self._ticket)
- queued_items.sort()
- if len(queued_items) == 1:
- yield self.set(self._nextinline_token, str(queued_items[0]))
- yield self.set(self._queue_token, cPickle.dumps(queued_items))
- finally:
- yield lock.release()
-
- @inlineCallbacks
- def _removeFromQueue(self):
- """
- Remove our ticket from the queue. If it was the first next in line value, then bump
- next in line to the new head of the queue value or reset it if the queue is empty.
- """
-
- # We need a shared lock to protect access to the queue
- lock = MemcacheLock(self._namespace, self._locktoken)
- yield lock.acquire()
-
- try:
- queued_value = (yield self.get(self._queue_token))
- queued_items = cPickle.loads(queued_value) if queued_value is not None else []
-
- if queued_items[0] == self._ticket:
- del queued_items[0]
- yield self.set(self._nextinline_token, str(queued_items[0] if queued_items else 0))
- else:
- queued_items.remove(self._ticket)
- queued_items.sort()
- yield self.set(self._queue_token, cPickle.dumps(queued_items))
- finally:
- yield lock.release()
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py 2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py 2011-11-14 22:18:26 UTC (rev 8287)
@@ -53,8 +53,7 @@
from twistedcaldav.ical import Component, Property
from twistedcaldav.instance import TooManyInstancesError,\
InvalidOverriddenInstanceError
-from twistedcaldav.memcachelock import MemcacheLockTimeoutError
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
+from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
from twistedcaldav.scheduling.implicit import ImplicitScheduler
log = Logger()
@@ -67,7 +66,7 @@
if internal_request:
self.lock = None
else:
- self.lock = MemcacheFIFOLock("ImplicitUIDLock", uid, timeout=60.0, retry_interval=0.01, expire_time=5*60)
+ self.lock = MemcacheLock("ImplicitUIDLock", uid, timeout=60.0, expire_time=5*60)
self.reserved = False
self.index = index
self.uid = uid
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py 2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py 2011-11-14 22:18:26 UTC (rev 8287)
@@ -38,7 +38,6 @@
from pycalendar.duration import PyCalendarDuration
from pycalendar.datetime import PyCalendarDateTime
from pycalendar.timezone import PyCalendarTimezone
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
__all__ = [
"ImplicitProcessor",
@@ -257,7 +256,7 @@
# We need to get the UID lock for implicit processing whilst we send the auto-reply
# as the Organizer processing will attempt to write out data to other attendees to
# refresh them. To prevent a race we need a lock.
- uidlock = MemcacheFIFOLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
+ uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
try:
yield uidlock.acquire()
@@ -515,7 +514,7 @@
# We need to get the UID lock for implicit processing whilst we send the auto-reply
# as the Organizer processing will attempt to write out data to other attendees to
# refresh them. To prevent a race we need a lock.
- lock = MemcacheFIFOLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0, expire_time=5*60)
+ lock = MemcacheLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0, expire_time=5*60)
# Note that this lock also protects the request, as this request is
# being re-used by potentially multiple transactions and should not be
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py 2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py 2011-11-14 22:18:26 UTC (rev 8287)
@@ -44,8 +44,7 @@
from twistedcaldav.config import config
from twistedcaldav.ical import Component as VCalendar, Property as VProperty,\
InvalidICalendarDataError, iCalendarProductID
-from twistedcaldav.memcachelock import MemcacheLockTimeoutError
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
+from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
from twistedcaldav.method.put_addressbook_common import StoreAddressObjectResource
from twistedcaldav.method.put_common import StoreCalendarObjectResource
from twistedcaldav.notifications import (
@@ -1876,7 +1875,7 @@
)
)
if do_implicit_action:
- lock = MemcacheFIFOLock(
+ lock = MemcacheLock(
"ImplicitUIDLock", calendar.resourceUID(), timeout=60.0, expire_time=5*60
)
Deleted: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py 2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py 2011-11-14 22:18:26 UTC (rev 8287)
@@ -1,249 +0,0 @@
-##
-# Copyright (c) 2011 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-from twisted.internet.defer import inlineCallbacks, Deferred
-
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
-
-from twistedcaldav.test.util import TestCase
-import cPickle
-from twistedcaldav.memcachelock import MemcacheLockTimeoutError
-from twistedcaldav.memcacher import Memcacher
-from twisted.internet import reactor
-
-class MemCacheTestCase(TestCase):
- """
- Test client protocol class L{MemCacheProtocol}.
- """
-
- @inlineCallbacks
- def test_one_lock(self):
-
- lock = MemcacheFIFOLock("lock", "test")
- lock.allowTestCache = True
-
- yield lock.acquire()
- self.assertTrue((yield lock.locked()))
-
- yield lock.release()
- self.assertFalse((yield lock.locked()))
-
- self.assertEqual((yield lock.get(lock._ticket_token)), "2")
- self.assertEqual((yield lock.get(lock._nextinline_token)), "0")
- self.assertEqual(cPickle.loads((yield lock.get(lock._queue_token))), [])
-
- @inlineCallbacks
- def test_two_locks_with_timeout(self):
-
- lock1 = MemcacheFIFOLock("lock", "test")
- lock1.allowTestCache = True
-
- lock2 = MemcacheFIFOLock("lock", "test", timeout=0.01)
- lock2.allowTestCache = True
-
- lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
-
- yield lock1.acquire()
- self.assertTrue((yield lock1.locked()))
-
- try:
- yield lock2.acquire()
- except MemcacheLockTimeoutError:
- pass
- else:
- self.fail("Did not timeout lock")
-
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
-
- self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
- self.assertEqual((yield lock1.get(lock1._nextinline_token)), "1")
- self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [1,])
-
- yield lock1.release()
- self.assertFalse((yield lock1.locked()))
-
- self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
- self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
- self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
- @inlineCallbacks
- def test_two_locks_in_sequence(self):
-
- lock1 = MemcacheFIFOLock("lock", "test")
- lock1.allowTestCache = True
-
- lock2 = MemcacheFIFOLock("lock", "test")
- lock2.allowTestCache = True
-
- lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
-
- yield lock1.acquire()
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
-
- yield lock1.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
-
- yield lock2.acquire()
- self.assertFalse((yield lock1.locked()))
- self.assertTrue((yield lock2.locked()))
-
- yield lock2.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
-
- self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
- self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
- self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
- @inlineCallbacks
- def test_two_locks_in_parallel(self):
-
- lock1 = MemcacheFIFOLock("lock", "test", timeout=1.0)
- lock1.allowTestCache = True
-
- lock2 = MemcacheFIFOLock("lock", "test", timeout=1.0)
- lock2.allowTestCache = True
-
- lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
-
- yield lock1.acquire()
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
-
- @inlineCallbacks
- def _release1():
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- yield lock1.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- reactor.callLater(0.1, _release1)
-
- yield lock2.acquire()
- self.assertFalse((yield lock1.locked()))
- self.assertTrue((yield lock2.locked()))
-
- yield lock2.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
-
- self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
- self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
- self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
-
- @inlineCallbacks
- def test_three_in_order(self):
- """
- This test overlaps a lock on #1 with two attempts on #2 and #3. #3 has
- a much shorter polling interval than #2 so when #1 releases, #3 will
- poll first. We want to make sure that #3 is not given the lock until after
- #2 has polled, acquired and released it, i.e., this tests the FIFO behavior.
- """
-
- lock1 = MemcacheFIFOLock("lock", "test", timeout=2.0)
- lock1.allowTestCache = True
-
- lock2 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.5)
- lock2.allowTestCache = True
-
- lock3 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.01) # retry a lot faster than #2
- lock3.allowTestCache = True
-
- lock1._memcacheProtocol = \
- lock2._memcacheProtocol = \
- lock3._memcacheProtocol = Memcacher.memoryCacher()
-
- d = Deferred()
-
- yield lock1.acquire()
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- @inlineCallbacks
- def _release1():
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
- yield lock1.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
- self.assertEqual((yield lock1.get(lock1._nextinline_token)), "2")
- self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [2, 3])
- reactor.callLater(0.1, _release1)
-
- @inlineCallbacks
- def _acquire2():
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- @inlineCallbacks
- def _release2():
- self.assertFalse((yield lock1.locked()))
- self.assertTrue((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
- yield lock2.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- yield lock2.acquire()
- reactor.callLater(0.1, _release2)
- self.assertFalse((yield lock1.locked()))
- self.assertTrue((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- reactor.callLater(0.01, _acquire2)
-
- @inlineCallbacks
- def _acquire3():
- self.assertTrue((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- @inlineCallbacks
- def _release3():
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertTrue((yield lock3.locked()))
- yield lock3.release()
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertFalse((yield lock3.locked()))
-
- self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
- self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
- self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
- d.callback(True)
-
- yield lock3.acquire()
- reactor.callLater(0.1, _release3)
- self.assertFalse((yield lock1.locked()))
- self.assertFalse((yield lock2.locked()))
- self.assertTrue((yield lock3.locked()))
-
- reactor.callLater(0.02, _acquire3)
-
- yield d
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111114/6bbeaed2/attachment-0001.html>
More information about the calendarserver-changes
mailing list