X-Git-Url: http://git.inspyration.org/?a=blobdiff_plain;ds=sidebyside;f=addons%2Fdocument_webdav%2Ftest_davclient.py;h=0af83f48f0ed6d6abfebd5da622fb6e6ec02856e;hb=872f6be3408fb4b7f86dd443f0a28194d83554fb;hp=c06eb0cec6827c6d4c52557cfcc5c601581d8af2;hpb=4cc8404d364911c3cf754c335c7cacceea39abbb;p=odoo%2Fodoo.git diff --git a/addons/document_webdav/test_davclient.py b/addons/document_webdav/test_davclient.py index c06eb0c..0af83f4 100644 --- a/addons/document_webdav/test_davclient.py +++ b/addons/document_webdav/test_davclient.py @@ -32,11 +32,6 @@ # code taken from the 'http-client.py' script: # http://git.hellug.gr/?p=xrg/openerp;a=history;f=tests/http-client.py;hb=refs/heads/xrg-60 -import imp -import sys -import os -import glob -import subprocess import re import gzip import logging @@ -44,7 +39,10 @@ import xml.dom.minidom import httplib -# from xmlrpclib import Transport +from tools import config +from xmlrpclib import Transport, ProtocolError +import StringIO +import base64 log = logging.getLogger('http-client') @@ -320,16 +318,74 @@ class HTTPSConnection(httplib.HTTPSConnection): return cert -def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg=2): + +class DAVClient(object): + """An instance of a WebDAV client, connected to the OpenERP server + """ + + def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False, useragent=False): + if use_ssl: + self.host = config.get_misc('httpsd', 'interface', False) + self.port = config.get_misc('httpsd', 'port', 8071) + if not self.host: + self.host = config.get('xmlrpcs_interface') + self.port = config.get('xmlrpcs_port') + else: + self.host = config.get_misc('httpd', 'interface') + self.port = config.get_misc('httpd', 'port', 8069) + if not self.host: + self.host = config.get('xmlrpc_interface') + self.port = config.get('xmlrpc_port') or self.port + if self.host == '0.0.0.0' or not self.host: + self.host = '127.0.0.1' + self.port = int(self.port) + if not config.get_misc('webdav','enable',True): + raise Exception("WebDAV is disabled, cannot continue") + self.davpath = '/' + config.get_misc('webdav','vdir','webdav') + self.user = user + self.passwd = passwd + self.dbg = dbg + self.hdrs = {} + if useragent: + self.set_useragent(useragent) + + def get_creds(self, obj, cr, uid): + """Read back the user credentials from cr, uid + + @param obj is any orm object, in order to use its pool + @param uid is the numeric id, which we will try to reverse resolve + + note: this is a hackish way to get the credentials. It is expected + to break if "base_crypt" is used. + """ + ruob = obj.pool.get('res.users') + res = ruob.read(cr, 1, [uid,], ['login', 'password']) + assert res, "uid %s not found" % uid + self.user = res[0]['login'] + self.passwd = res[0]['password'] + return True + + def set_useragent(self, uastr): + """ Set the user-agent header to something meaningful. + Some shorthand names will be replaced by stock strings. + """ + if uastr in ('KDE4', 'Korganizer'): + self.hdrs['User-Agent'] = "Mozilla/5.0 (compatible; Konqueror/4.4; Linux) KHTML/4.4.3 (like Gecko)" + elif uastr == 'iPhone3': + self.hdrs['User-Agent'] = "DAVKit/5.0 (765); iCalendar/5.0 (79); iPhone/4.1 8B117" + elif uastr == "MacOS": + self.hdrs['User-Agent'] = "WebDAVFS/1.8 (01808000) Darwin/9.8.0 (i386)" + else: + self.hdrs['User-Agent'] = uastr + + def _http_request(self, path, method='GET', hdrs=None, body=None): if not hdrs: hdrs = {} - passwd=None - if user: - import getpass - passwd = getpass.getpass("Password for %s@%s: " %(user,host)) import base64 - log.debug("Getting %s http://%s/%s", method, host , path) - conn = httplib.HTTPConnection(host) + dbg = self.dbg + hdrs.update(self.hdrs) + log.debug("Getting %s http://%s:%d/%s", method, self.host, self.port, path) + conn = httplib.HTTPConnection(self.host, port=self.port) conn.set_debuglevel(dbg) if not path: path = "/index.html" @@ -340,17 +396,17 @@ def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg= r1 = conn.getresponse() except httplib.BadStatusLine, bsl: log.warning("Bad status line: %s", bsl.line) - return + raise Exception('Bad status line') if r1.status == 401: # and r1.headers: if 'www-authenticate' in r1.msg: (atype,realm) = r1.msg.getheader('www-authenticate').split(' ',1) data1 = r1.read() - if not user: + if not self.user: raise Exception('Must auth, have no user/pass!') log.debug("Ver: %s, closed: %s, will close: %s", r1.version,r1.isclosed(), r1.will_close) log.debug("Want to do auth %s for realm %s", atype, realm) if atype == 'Basic' : - auths = base64.encodestring(user + ':' + passwd) + auths = base64.encodestring(self.user + ':' + self.passwd) if auths[-1] == "\n": auths = auths[:-1] hdrs['Authorization']= 'Basic '+ auths @@ -365,8 +421,7 @@ def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg= log.debug("Reponse: %s %s",r1.status, r1.reason) data1 = r1.read() - did_print = False - log.debug("Body:\n%s\nEnd of body\n", data1) + log.debug("Body:\n%s\nEnd of body", data1) try: ctype = r1.msg.getheader('content-type') if ctype and ';' in ctype: @@ -374,8 +429,211 @@ def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg= if ctype == 'text/xml': doc = xml.dom.minidom.parseString(data1) log.debug("XML Body:\n %s", doc.toprettyxml(indent="\t")) - did_print = True - except Exception, e: + except Exception: log.warning("could not print xml", exc_info=True) pass conn.close() + return r1.status, r1.msg, data1 + + def _assert_headers(self, expect, msg): + """ Assert that the headers in msg contain the expect values + """ + for k, v in expect.items(): + hval = msg.getheader(k) + if not hval: + raise AssertionError("Header %s not defined in http response" % k) + if isinstance(v, (list, tuple)): + delim = ',' + hits = map(str.strip, hval.split(delim)) + mvits= [] + for vit in v: + if vit not in hits: + mvits.append(vit) + if mvits: + raise AssertionError("HTTP header \"%s\" is missing: %s" %(k, ', '.join(mvits))) + else: + if hval.strip() != v.strip(): + raise AssertionError("HTTP header \"%s: %s\"" % (k, hval)) + + def gd_options(self, path='*', expect=None): + """ Test the http options functionality + If a dictionary is defined in expect, those options are + asserted. + """ + if path != '*': + path = self.davpath + path + hdrs = { 'Content-Length': 0 + } + s, m, d = self._http_request(path, method='OPTIONS', hdrs=hdrs) + assert s == 200, "Status: %r" % s + assert 'OPTIONS' in m.getheader('Allow') + log.debug('Options: %r', m.getheader('Allow')) + + if expect: + self._assert_headers(expect, m) + + def _parse_prop_response(self, data): + """ Parse a propfind/propname response + """ + def getText(node): + rc = [] + for node in node.childNodes: + if node.nodeType == node.TEXT_NODE: + rc.append(node.data) + return ''.join(rc) + + def getElements(node, namespaces=None, strict=False): + for cnod in node.childNodes: + if cnod.nodeType != node.ELEMENT_NODE: + if strict: + log.debug("Found %r inside <%s>", cnod, node.tagName) + continue + if namespaces and (cnod.namespaceURI not in namespaces): + log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName) + continue + yield cnod + + nod = xml.dom.minidom.parseString(data) + nod_r = nod.documentElement + res = {} + assert nod_r.localName == 'multistatus', nod_r.tagName + for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'): + href = None + status = 200 + res_nss = {} + for cno in getElements(resp, namespaces=['DAV:',]): + if cno.localName == 'href': + assert href is None, "Second href in same response" + href = getText(cno) + elif cno.localName == 'propstat': + for pno in getElements(cno, namespaces=['DAV:',]): + rstatus = None + if pno.localName == 'prop': + for prop in getElements(pno): + key = prop.localName + tval = getText(prop).strip() + val = tval or (True, rstatus or status) + if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype': + val = 'plain' + for rte in getElements(prop, namespaces=['DAV:',]): + # Note: we only look at DAV:... elements, we + # actually expect only one DAV:collection child + val = rte.localName + res_nss.setdefault(prop.namespaceURI,{})[key] = val + elif pno.localName == 'status': + rstr = getText(pno) + htver, sta, msg = rstr.split(' ', 3) + assert htver == 'HTTP/1.1' + rstatus = int(sta) + else: + log.debug("What is <%s> inside a ?", pno.tagName) + + else: + log.debug("Unknown node: %s", cno.tagName) + + res.setdefault(href,[]).append((status, res_nss)) + + return res + + def gd_propfind(self, path, props=None, depth=0): + if not props: + propstr = '' + else: + propstr = '' + nscount = 0 + for p in props: + ns = None + if isinstance(p, tuple): + p, ns = p + if ns is None or ns == 'DAV:': + propstr += '<%s/>' % p + else: + propstr += '' %(nscount, p, nscount, ns) + nscount += 1 + propstr += '' + + body=""" + %s""" % propstr + hdrs = { 'Content-Type': 'text/xml; charset=utf-8', + 'Accept': 'text/xml', + 'Depth': depth, + } + + s, m, d = self._http_request(self.davpath + path, method='PROPFIND', + hdrs=hdrs, body=body) + assert s == 207, "Bad status: %s" % s + ctype = m.getheader('Content-Type').split(';',1)[0] + assert ctype == 'text/xml', m.getheader('Content-Type') + res = self._parse_prop_response(d) + if depth == 0: + assert len(res) == 1 + res = res.values()[0] + else: + assert len(res) >= 1 + return res + + + def gd_propname(self, path, depth=0): + body=""" + """ + hdrs = { 'Content-Type': 'text/xml; charset=utf-8', + 'Accept': 'text/xml', + 'Depth': depth + } + s, m, d = self._http_request(self.davpath + path, method='PROPFIND', + hdrs=hdrs, body=body) + assert s == 207, "Bad status: %s" % s + ctype = m.getheader('Content-Type').split(';',1)[0] + assert ctype == 'text/xml', m.getheader('Content-Type') + res = self._parse_prop_response(d) + if depth == 0: + assert len(res) == 1 + res = res.values()[0] + else: + assert len(res) >= 1 + return res + + def gd_getetag(self, path, depth=0): + return self.gd_propfind(path, props=['getetag',], depth=depth) + + def gd_lsl(self, path): + """ Return a list of 'ls -l' kind of data for a folder + + This is based on propfind. + """ + + lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'), + ('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'), + ('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'), + ('mime', 'getcontenttype', 'application/data'), ] + + propnames = [ l[1] for l in lspairs] + propres = self.gd_propfind(path, props=propnames, depth=1) + + res = [] + for href, pr in propres.items(): + lsline = {} + for st, nsdic in pr: + davprops = nsdic['DAV:'] + if st == 200: + for lsp in lspairs: + if lsp[1] in davprops: + if lsp[1] == 'resourcetype': + if davprops[lsp[1]] == 'collection': + lsline[lsp[0]] = 'dr-xr-x---' + else: + lsline[lsp[0]] = '-r-xr-x---' + else: + lsline[lsp[0]] = davprops[lsp[1]] + elif st in (404, 403): + for lsp in lspairs: + if lsp[1] in davprops: + lsline[lsp[0]] = lsp[2] + else: + log.debug("Strange status: %s", st) + + res.append(lsline) + + return res + +#eof \ No newline at end of file