[CalendarServer-changes] [10575] CalendarServer/branches/users/glyph/unshare-when-access-revoked

source_changes at macosforge.org source_changes at macosforge.org
Mon Jan 28 19:28:56 PST 2013


Revision: 10575
          http://trac.calendarserver.org/changeset/10575
Author:   glyph at apple.com
Date:     2013-01-28 19:28:56 -0800 (Mon, 28 Jan 2013)
Log Message:
-----------
Improve factoring of the test and its fakes

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/unshare-when-access-revoked/twistedcaldav/test/test_sharing.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/unshare-when-access-revoked/

Modified: CalendarServer/branches/users/glyph/unshare-when-access-revoked/twistedcaldav/test/test_sharing.py
===================================================================
--- CalendarServer/branches/users/glyph/unshare-when-access-revoked/twistedcaldav/test/test_sharing.py	2013-01-29 03:28:55 UTC (rev 10574)
+++ CalendarServer/branches/users/glyph/unshare-when-access-revoked/twistedcaldav/test/test_sharing.py	2013-01-29 03:28:56 UTC (rev 10575)
@@ -41,6 +41,7 @@
 from twistedcaldav.directory.aggregate import AggregateDirectoryService
 from twistedcaldav.directory.principal import DirectoryPrincipalProvisioningResource
 from twistedcaldav.directory.calendar import DirectoryCalendarHomeProvisioningResource
+from twistedcaldav.test.test_cache import StubResponseCacheResource
 
 
 sharedOwnerType = davxml.ResourceType.sharedownercalendar #@UndefinedVariable
@@ -138,61 +139,69 @@
 
 
 
-class SharingTests(HomeTestCase):
+class FakeHome(object):
+    def removeShareByUID(self, request, uid):
+        pass
 
-    class FakePrincipal(object):
 
-        class FakeRecord(object):
 
-            def __init__(self, name, cuaddr):
-                self.fullName = name
-                self.guid = name
-                self.calendarUserAddresses = set((cuaddr,))
+class FakeRecord(object):
 
-        def __init__(self, cuaddr, test):
-            if cuaddr.startswith("mailto:"):
-                name = cuaddr[7:].split('@')[0]
-            elif cuaddr.startswith("urn:uuid:"):
-                name = cuaddr[9:]
-            else:
-                name = cuaddr
+    def __init__(self, name, cuaddr):
+        self.fullName = name
+        self.guid = name
+        self.calendarUserAddresses = set((cuaddr,))
 
-            self.path = "/principals/__uids__/%s" % (name,)
-            self.homepath = "/calendars/__uids__/%s" % (name,)
-            self.displayname = name.upper()
-            self.record = self.FakeRecord(name, cuaddr)
-            self._test = test
-            self._name = name
 
 
-        @inlineCallbacks
-        def calendarHome(self, request):
-            a, seg = yield self._test.homeProvisioner.locateChild(request,
-                                                                  ["__uids__"])
-            b, seg = yield a.locateChild(request, [self._name])
-            if b is None:
-                # XXX all tests except test_noWikiAccess currently rely on the
-                # fake thing here.
-                class FakeHome(object):
-                    def removeShareByUID(self, request, uid):
-                        pass
-                returnValue(FakeHome())
-            returnValue(b)
+class FakePrincipal(object):
 
+    def __init__(self, cuaddr, test):
+        if cuaddr.startswith("mailto:"):
+            name = cuaddr[7:].split('@')[0]
+        elif cuaddr.startswith("urn:uuid:"):
+            name = cuaddr[9:]
+        else:
+            name = cuaddr
 
-        def principalURL(self):
-            return self.path
+        self.path = "/principals/__uids__/%s" % (name,)
+        self.homepath = "/calendars/__uids__/%s" % (name,)
+        self.displayname = name.upper()
+        self.record = FakeRecord(name, cuaddr)
+        self._test = test
+        self._name = name
 
-        def principalUID(self):
-            return self.record.guid
 
-        def displayName(self):
-            return self.displayname
+    @inlineCallbacks
+    def calendarHome(self, request):
+        a, seg = yield self._test.homeProvisioner.locateChild(request,
+                                                              ["__uids__"])
+        b, seg = yield a.locateChild(request, [self._name])
+        if b is None:
+            # XXX all tests except test_noWikiAccess currently rely on the
+            # fake thing here.
+            returnValue(FakeHome())
+        returnValue(b)
 
 
+    def principalURL(self):
+        return self.path
+
+
+    def principalUID(self):
+        return self.record.guid
+
+
+    def displayName(self):
+        return self.displayname
+
+
+
+class SharingTests(HomeTestCase):
+
     def configure(self):
         """
-        Turn on sharing.
+        Override configuration hook to turn on sharing.
         """
         super(SharingTests, self).configure()
         self.patch(config.Sharing, "Enabled", True)
@@ -249,7 +258,7 @@
 
         @patched
         def principalForUID(resourceSelf, principalUID):
-            return SharingTests.FakePrincipal("urn:uuid:" + principalUID, self)
+            return FakePrincipal("urn:uuid:" + principalUID, self)
 
 
     def createDataStore(self):
@@ -269,7 +278,7 @@
 
     @inlineCallbacks
     def do(self, method, path="/", body="", mimetype="text", subtype="xml",
-           resultcode=responsecode.OK):
+           resultcode=responsecode.OK, headers=()):
         """
         Do a simple request.
 
@@ -291,13 +300,20 @@
 
         @param resultcode: The expected result code for the response to the
             request.
+        @type resultcode: L{int}
 
+        @param headers: An iterable of 2-tuples of C{(header, value)}; headers
+            to set on the outgoing request.
+
         @return: a L{Deferred} which fires with an L{IResponse} if the request
             was successfully processed and fails with an L{HTTPError} if not;
             or, if the resultcode does not match the response's code, fails
             with L{FailTest}.
         """
         request = SimpleRequest(self.site, method, path)
+        if headers is not None:
+            for k, v in headers:
+                request.headers.setHeader(k, v)
         request.headers.setHeader("content-type", MimeType(mimetype, subtype))
         request.stream = MemoryStream(body)
 
@@ -850,31 +866,44 @@
             "/principals/", self.directoryService
         )
 
+        origRefreshRoot = self._refreshRoot
+        @inlineCallbacks
+        def _newRefreshRoot(request=None):
+            yield origRefreshRoot(request)
+            self.site.resource.responseCache = StubResponseCacheResource()
+            self.site.resource.putChild("calendars", self.homeProvisioner)
+            # if request is not None:
+            #     request._rememberResource(self.site.resource, "/")
+        self._refreshRoot = _newRefreshRoot
 
-        yield self._refreshRoot()
-        # Allow the site to see its other resources.
-        self.site.resource.putChild("calendars", self.homeProvisioner)
-        txn = self.site.resource._associatedTransaction
-        sharee = self.site.resource._newStoreHome
-        sharer = yield txn.calendarHomeWithUID("wiki-testing")
-        cal = yield sharer.calendarWithName("calendar")
+        @inlineCallbacks
+        def getSharedName():
+            """
+            Share a resource from a wiki; get its name.  Put this in its own
+            function so it doesn't leak any soon-to-expire variables to the
+            outer test.
+            """
+            txn = self.site.resource._associatedTransaction
+            sharee = self.site.resource._newStoreHome
+
+            sharer = yield txn.calendarHomeWithUID("wiki-testing")
+            cal = yield sharer.calendarWithName("calendar")
+            sharedName = yield cal.shareWith(sharee, BIND_DIRECT)
+
+            yield self._refreshRoot()
+            returnValue(sharedName)
+
+        sharedName = yield getSharedName()
         access = "write"
         def stubWikiAccessMethod(userID, wikiID):
             return access
         from twistedcaldav import sharing
         self.patch(sharing, "getWikiAccess", stubWikiAccessMethod)
-        sharedName = yield cal.shareWith(sharee, BIND_DIRECT)
         @inlineCallbacks
         def listChildrenViaPropfind():
-            # response = yield self.do("PROPFIND", "/",
-            #                          resultcode=responsecode.MULTI_STATUS)
-            req = SimpleRequest(self.site, "PROPFIND", "/")
-            req._rememberResource(self.site.resource, "/")
-            req.headers.setHeader("Depth", "1")
-            # Invoke http_PROPFIND directly rather than using HTTP machinery so
-            # we can simply do everything in one transaction rather than
-            # repeatedly spinning up new ones.
-            response = yield self.site.resource.http_PROPFIND(req)
+            response = yield self.do("PROPFIND", "/",
+                                     resultcode=responsecode.MULTI_STATUS,
+                                     headers=[('Depth', '1')])
             data = yield allDataFromStream(response.stream)
             tree = XML(data)
             seq = [e.text for e in tree.findall("{DAV:}response/{DAV:}href")]
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130128/bb516baa/attachment.html>


More information about the calendarserver-changes mailing list