+ return r1.status, r1.msg, data1
+
+ def _assert_headers(self, expect, msg):
+ """ Assert that the headers in msg contain the expect values
+ """
+ for k, v in expect.items():
+ hval = msg.getheader(k)
+ if not hval:
+ raise AssertionError("Header %s not defined in http response" % k)
+ if isinstance(v, (list, tuple)):
+ delim = ','
+ hits = map(str.strip, hval.split(delim))
+ mvits= []
+ for vit in v:
+ if vit not in hits:
+ mvits.append(vit)
+ if mvits:
+ raise AssertionError("HTTP header \"%s\" is missing: %s" %(k, ', '.join(mvits)))
+ else:
+ if hval.strip() != v.strip():
+ raise AssertionError("HTTP header \"%s: %s\"" % (k, hval))
+
+ def gd_options(self, path='*', expect=None):
+ """ Test the http options functionality
+ If a dictionary is defined in expect, those options are
+ asserted.
+ """
+ if path != '*':
+ path = self.davpath + path
+ hdrs = { 'Content-Length': 0
+ }
+ s, m, d = self._http_request(path, method='OPTIONS', hdrs=hdrs)
+ assert s == 200, "Status: %r" % s
+ assert 'OPTIONS' in m.getheader('Allow')
+ log.debug('Options: %r', m.getheader('Allow'))
+
+ if expect:
+ self._assert_headers(expect, m)
+
+ def _parse_prop_response(self, data):
+ """ Parse a propfind/propname response
+ """
+ def getText(node):
+ rc = []
+ for node in node.childNodes:
+ if node.nodeType == node.TEXT_NODE:
+ rc.append(node.data)
+ return ''.join(rc)
+
+ def getElements(node, namespaces=None, strict=False):
+ for cnod in node.childNodes:
+ if cnod.nodeType != node.ELEMENT_NODE:
+ if strict:
+ log.debug("Found %r inside <%s>", cnod, node.tagName)
+ continue
+ if namespaces and (cnod.namespaceURI not in namespaces):
+ log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName)
+ continue
+ yield cnod
+
+ nod = xml.dom.minidom.parseString(data)
+ nod_r = nod.documentElement
+ res = {}
+ assert nod_r.localName == 'multistatus', nod_r.tagName
+ for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'):
+ href = None
+ status = 200
+ res_nss = {}
+ for cno in getElements(resp, namespaces=['DAV:',]):
+ if cno.localName == 'href':
+ assert href is None, "Second href in same response"
+ href = getText(cno)
+ elif cno.localName == 'propstat':
+ for pno in getElements(cno, namespaces=['DAV:',]):
+ rstatus = None
+ if pno.localName == 'prop':
+ for prop in getElements(pno):
+ key = prop.localName
+ tval = getText(prop).strip()
+ val = tval or (True, rstatus or status)
+ if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype':
+ val = 'plain'
+ for rte in getElements(prop, namespaces=['DAV:',]):
+ # Note: we only look at DAV:... elements, we
+ # actually expect only one DAV:collection child
+ val = rte.localName
+ res_nss.setdefault(prop.namespaceURI,{})[key] = val
+ elif pno.localName == 'status':
+ rstr = getText(pno)
+ htver, sta, msg = rstr.split(' ', 3)
+ assert htver == 'HTTP/1.1'
+ rstatus = int(sta)
+ else:
+ log.debug("What is <%s> inside a <propstat>?", pno.tagName)
+
+ else:
+ log.debug("Unknown node: %s", cno.tagName)
+
+ res.setdefault(href,[]).append((status, res_nss))
+
+ return res
+
+ def gd_propfind(self, path, props=None, depth=0):
+ if not props:
+ propstr = '<allprop/>'
+ else:
+ propstr = '<prop>'
+ nscount = 0
+ for p in props:
+ ns = None
+ if isinstance(p, tuple):
+ p, ns = p
+ if ns is None or ns == 'DAV:':
+ propstr += '<%s/>' % p
+ else:
+ propstr += '<ns%d:%s xmlns:ns%d="%s" />' %(nscount, p, nscount, ns)
+ nscount += 1
+ propstr += '</prop>'
+
+ body="""<?xml version="1.0" encoding="utf-8"?>
+ <propfind xmlns="DAV:">%s</propfind>""" % propstr
+ hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+ 'Accept': 'text/xml',
+ 'Depth': depth,
+ }
+
+ s, m, d = self._http_request(self.davpath + path, method='PROPFIND',
+ hdrs=hdrs, body=body)
+ assert s == 207, "Bad status: %s" % s
+ ctype = m.getheader('Content-Type').split(';',1)[0]
+ assert ctype == 'text/xml', m.getheader('Content-Type')
+ res = self._parse_prop_response(d)
+ if depth == 0:
+ assert len(res) == 1
+ res = res.values()[0]
+ else:
+ assert len(res) >= 1
+ return res
+
+
+ def gd_propname(self, path, depth=0):
+ body="""<?xml version="1.0" encoding="utf-8"?>
+ <propfind xmlns="DAV:"><propname/></propfind>"""
+ hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+ 'Accept': 'text/xml',
+ 'Depth': depth
+ }
+ s, m, d = self._http_request(self.davpath + path, method='PROPFIND',
+ hdrs=hdrs, body=body)
+ assert s == 207, "Bad status: %s" % s
+ ctype = m.getheader('Content-Type').split(';',1)[0]
+ assert ctype == 'text/xml', m.getheader('Content-Type')
+ res = self._parse_prop_response(d)
+ if depth == 0:
+ assert len(res) == 1
+ res = res.values()[0]
+ else:
+ assert len(res) >= 1
+ return res
+
+ def gd_getetag(self, path, depth=0):
+ return self.gd_propfind(path, props=['getetag',], depth=depth)
+
+ def gd_lsl(self, path):
+ """ Return a list of 'ls -l' kind of data for a folder
+
+ This is based on propfind.
+ """
+
+ lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'),
+ ('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'),
+ ('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'),
+ ('mime', 'getcontenttype', 'application/data'), ]
+
+ propnames = [ l[1] for l in lspairs]
+ propres = self.gd_propfind(path, props=propnames, depth=1)
+
+ res = []
+ for href, pr in propres.items():
+ lsline = {}
+ for st, nsdic in pr:
+ davprops = nsdic['DAV:']
+ if st == 200:
+ for lsp in lspairs:
+ if lsp[1] in davprops:
+ if lsp[1] == 'resourcetype':
+ if davprops[lsp[1]] == 'collection':
+ lsline[lsp[0]] = 'dr-xr-x---'
+ else:
+ lsline[lsp[0]] = '-r-xr-x---'
+ else:
+ lsline[lsp[0]] = davprops[lsp[1]]
+ elif st in (404, 403):
+ for lsp in lspairs:
+ if lsp[1] in davprops:
+ lsline[lsp[0]] = lsp[2]
+ else:
+ log.debug("Strange status: %s", st)
+
+ res.append(lsline)
+
+ return res
+
+#eof
\ No newline at end of file