From 872f6be3408fb4b7f86dd443f0a28194d83554fb Mon Sep 17 00:00:00 2001 From: "P. Christeas" Date: Wed, 3 Nov 2010 13:25:34 +0200 Subject: [PATCH] doc webdav: more tests in lib, yaml bzr revid: p_christ@hol.gr-20101103112534-7vdnxjxr9wb3736t --- addons/document_webdav/test/webdav_test1.yml | 26 ++++ addons/document_webdav/test_davclient.py | 181 +++++++++++++++++++++++++- 2 files changed, 206 insertions(+), 1 deletion(-) diff --git a/addons/document_webdav/test/webdav_test1.yml b/addons/document_webdav/test/webdav_test1.yml index 30d4ad3..c9384a5 100644 --- a/addons/document_webdav/test/webdav_test1.yml +++ b/addons/document_webdav/test/webdav_test1.yml @@ -11,3 +11,29 @@ dc.get_creds(self, cr, uid) dc.gd_options(path=cr.dbname, expect={'DAV': ['1',]}) - + I will test the propnames at the document root +- + !python {model: ir.attachment}: | + from document_webdav import test_davclient as te + dc = te.DAVClient() + dc.get_creds(self, cr, uid) + dc.gd_propname(path=cr.dbname+'/Documents/') +- + I will test the ETags of the document root +- + !python {model: ir.attachment}: | + from document_webdav import test_davclient as te + dc = te.DAVClient() + dc.get_creds(self, cr, uid) + dc.gd_getetag(path=cr.dbname+'/Documents/') + +- + I will now ls -l the document root. +- + !python {model: ir.attachment}: | + from document_webdav import test_davclient as te + dc = te.DAVClient() + dc.get_creds(self, cr, uid) + res = dc.gd_lsl(path=cr.dbname+'/Documents/') + for lin in res: + print "%(type)s\t%(uid)s\t%(gid)s\t%(size)s\t%(mtime)s\t%(name)s" % lin diff --git a/addons/document_webdav/test_davclient.py b/addons/document_webdav/test_davclient.py index c961fac..0af83f4 100644 --- a/addons/document_webdav/test_davclient.py +++ b/addons/document_webdav/test_davclient.py @@ -323,7 +323,7 @@ class DAVClient(object): """An instance of a WebDAV client, connected to the OpenERP server """ - def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False): + def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False, useragent=False): if use_ssl: self.host = config.get_misc('httpsd', 'interface', False) self.port = config.get_misc('httpsd', 'port', 8071) @@ -345,6 +345,9 @@ class DAVClient(object): self.user = user self.passwd = passwd self.dbg = dbg + self.hdrs = {} + if useragent: + self.set_useragent(useragent) def get_creds(self, obj, cr, uid): """Read back the user credentials from cr, uid @@ -362,11 +365,25 @@ class DAVClient(object): self.passwd = res[0]['password'] return True + def set_useragent(self, uastr): + """ Set the user-agent header to something meaningful. + Some shorthand names will be replaced by stock strings. + """ + if uastr in ('KDE4', 'Korganizer'): + self.hdrs['User-Agent'] = "Mozilla/5.0 (compatible; Konqueror/4.4; Linux) KHTML/4.4.3 (like Gecko)" + elif uastr == 'iPhone3': + self.hdrs['User-Agent'] = "DAVKit/5.0 (765); iCalendar/5.0 (79); iPhone/4.1 8B117" + elif uastr == "MacOS": + self.hdrs['User-Agent'] = "WebDAVFS/1.8 (01808000) Darwin/9.8.0 (i386)" + else: + self.hdrs['User-Agent'] = uastr + def _http_request(self, path, method='GET', hdrs=None, body=None): if not hdrs: hdrs = {} import base64 dbg = self.dbg + hdrs.update(self.hdrs) log.debug("Getting %s http://%s:%d/%s", method, self.host, self.port, path) conn = httplib.HTTPConnection(self.host, port=self.port) conn.set_debuglevel(dbg) @@ -454,7 +471,169 @@ class DAVClient(object): if expect: self._assert_headers(expect, m) + + def _parse_prop_response(self, data): + """ Parse a propfind/propname response + """ + def getText(node): + rc = [] + for node in node.childNodes: + if node.nodeType == node.TEXT_NODE: + rc.append(node.data) + return ''.join(rc) + def getElements(node, namespaces=None, strict=False): + for cnod in node.childNodes: + if cnod.nodeType != node.ELEMENT_NODE: + if strict: + log.debug("Found %r inside <%s>", cnod, node.tagName) + continue + if namespaces and (cnod.namespaceURI not in namespaces): + log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName) + continue + yield cnod + + nod = xml.dom.minidom.parseString(data) + nod_r = nod.documentElement + res = {} + assert nod_r.localName == 'multistatus', nod_r.tagName + for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'): + href = None + status = 200 + res_nss = {} + for cno in getElements(resp, namespaces=['DAV:',]): + if cno.localName == 'href': + assert href is None, "Second href in same response" + href = getText(cno) + elif cno.localName == 'propstat': + for pno in getElements(cno, namespaces=['DAV:',]): + rstatus = None + if pno.localName == 'prop': + for prop in getElements(pno): + key = prop.localName + tval = getText(prop).strip() + val = tval or (True, rstatus or status) + if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype': + val = 'plain' + for rte in getElements(prop, namespaces=['DAV:',]): + # Note: we only look at DAV:... elements, we + # actually expect only one DAV:collection child + val = rte.localName + res_nss.setdefault(prop.namespaceURI,{})[key] = val + elif pno.localName == 'status': + rstr = getText(pno) + htver, sta, msg = rstr.split(' ', 3) + assert htver == 'HTTP/1.1' + rstatus = int(sta) + else: + log.debug("What is <%s> inside a ?", pno.tagName) + + else: + log.debug("Unknown node: %s", cno.tagName) + + res.setdefault(href,[]).append((status, res_nss)) + + return res + + def gd_propfind(self, path, props=None, depth=0): + if not props: + propstr = '' + else: + propstr = '' + nscount = 0 + for p in props: + ns = None + if isinstance(p, tuple): + p, ns = p + if ns is None or ns == 'DAV:': + propstr += '<%s/>' % p + else: + propstr += '' %(nscount, p, nscount, ns) + nscount += 1 + propstr += '' + + body=""" + %s""" % propstr + hdrs = { 'Content-Type': 'text/xml; charset=utf-8', + 'Accept': 'text/xml', + 'Depth': depth, + } + s, m, d = self._http_request(self.davpath + path, method='PROPFIND', + hdrs=hdrs, body=body) + assert s == 207, "Bad status: %s" % s + ctype = m.getheader('Content-Type').split(';',1)[0] + assert ctype == 'text/xml', m.getheader('Content-Type') + res = self._parse_prop_response(d) + if depth == 0: + assert len(res) == 1 + res = res.values()[0] + else: + assert len(res) >= 1 + return res + + + def gd_propname(self, path, depth=0): + body=""" + """ + hdrs = { 'Content-Type': 'text/xml; charset=utf-8', + 'Accept': 'text/xml', + 'Depth': depth + } + s, m, d = self._http_request(self.davpath + path, method='PROPFIND', + hdrs=hdrs, body=body) + assert s == 207, "Bad status: %s" % s + ctype = m.getheader('Content-Type').split(';',1)[0] + assert ctype == 'text/xml', m.getheader('Content-Type') + res = self._parse_prop_response(d) + if depth == 0: + assert len(res) == 1 + res = res.values()[0] + else: + assert len(res) >= 1 + return res + + def gd_getetag(self, path, depth=0): + return self.gd_propfind(path, props=['getetag',], depth=depth) + + def gd_lsl(self, path): + """ Return a list of 'ls -l' kind of data for a folder + + This is based on propfind. + """ + + lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'), + ('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'), + ('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'), + ('mime', 'getcontenttype', 'application/data'), ] + + propnames = [ l[1] for l in lspairs] + propres = self.gd_propfind(path, props=propnames, depth=1) + + res = [] + for href, pr in propres.items(): + lsline = {} + for st, nsdic in pr: + davprops = nsdic['DAV:'] + if st == 200: + for lsp in lspairs: + if lsp[1] in davprops: + if lsp[1] == 'resourcetype': + if davprops[lsp[1]] == 'collection': + lsline[lsp[0]] = 'dr-xr-x---' + else: + lsline[lsp[0]] = '-r-xr-x---' + else: + lsline[lsp[0]] = davprops[lsp[1]] + elif st in (404, 403): + for lsp in lspairs: + if lsp[1] in davprops: + lsline[lsp[0]] = lsp[2] + else: + log.debug("Strange status: %s", st) + + res.append(lsline) + + return res #eof \ No newline at end of file -- 1.7.10.4