doc webdav: more tests in lib, yaml
[odoo/odoo.git] / addons / document_webdav / test_davclient.py
index c06eb0c..0af83f4 100644 (file)
 # code taken from the 'http-client.py' script:
 # http://git.hellug.gr/?p=xrg/openerp;a=history;f=tests/http-client.py;hb=refs/heads/xrg-60
 
-import imp
-import sys
-import os
-import glob
-import subprocess
 import re
 import gzip
 import logging
@@ -44,7 +39,10 @@ import xml.dom.minidom
 
 import httplib
 
-# from xmlrpclib import Transport 
+from tools import config
+from xmlrpclib import Transport, ProtocolError
+import StringIO
+import base64
 
 log = logging.getLogger('http-client')
 
@@ -320,16 +318,74 @@ class HTTPSConnection(httplib.HTTPSConnection):
                 
                 return cert
 
-def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg=2):
+
+class DAVClient(object):
+    """An instance of a WebDAV client, connected to the OpenERP server
+    """
+    
+    def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False, useragent=False):
+        if use_ssl:
+            self.host = config.get_misc('httpsd', 'interface', False)
+            self.port = config.get_misc('httpsd', 'port', 8071)
+            if not self.host:
+                self.host = config.get('xmlrpcs_interface')
+                self.port = config.get('xmlrpcs_port')
+        else:
+            self.host = config.get_misc('httpd', 'interface')
+            self.port = config.get_misc('httpd', 'port', 8069)
+            if not self.host:
+                self.host = config.get('xmlrpc_interface')
+                self.port = config.get('xmlrpc_port') or self.port
+        if self.host == '0.0.0.0' or not self.host:
+            self.host = '127.0.0.1'
+        self.port = int(self.port)
+        if not config.get_misc('webdav','enable',True):
+            raise Exception("WebDAV is disabled, cannot continue")
+        self.davpath = '/' + config.get_misc('webdav','vdir','webdav')
+        self.user = user
+        self.passwd = passwd
+        self.dbg = dbg
+        self.hdrs = {}
+        if useragent:
+            self.set_useragent(useragent)
+
+    def get_creds(self, obj, cr, uid):
+        """Read back the user credentials from cr, uid
+        
+        @param obj is any orm object, in order to use its pool
+        @param uid is the numeric id, which we will try to reverse resolve
+        
+        note: this is a hackish way to get the credentials. It is expected
+        to break if "base_crypt" is used.
+        """
+        ruob = obj.pool.get('res.users')
+        res = ruob.read(cr, 1, [uid,], ['login', 'password'])
+        assert res, "uid %s not found" % uid
+        self.user = res[0]['login']
+        self.passwd = res[0]['password']
+        return True
+
+    def set_useragent(self, uastr):
+        """ Set the user-agent header to something meaningful.
+        Some shorthand names will be replaced by stock strings.
+        """
+        if uastr in ('KDE4', 'Korganizer'):
+            self.hdrs['User-Agent'] = "Mozilla/5.0 (compatible; Konqueror/4.4; Linux) KHTML/4.4.3 (like Gecko)"
+        elif uastr == 'iPhone3':
+            self.hdrs['User-Agent'] = "DAVKit/5.0 (765); iCalendar/5.0 (79); iPhone/4.1 8B117"
+        elif uastr == "MacOS":
+            self.hdrs['User-Agent'] = "WebDAVFS/1.8 (01808000) Darwin/9.8.0 (i386)"
+        else:
+            self.hdrs['User-Agent'] = uastr
+
+    def _http_request(self, path, method='GET', hdrs=None, body=None):
         if not hdrs:
             hdrs = {}
-        passwd=None
-        if user:
-            import getpass
-            passwd = getpass.getpass("Password for %s@%s: " %(user,host))
         import base64
-        log.debug("Getting %s http://%s/%s", method, host , path)
-        conn = httplib.HTTPConnection(host)
+        dbg = self.dbg
+        hdrs.update(self.hdrs)
+        log.debug("Getting %s http://%s:%d/%s", method, self.host, self.port, path)
+        conn = httplib.HTTPConnection(self.host, port=self.port)
         conn.set_debuglevel(dbg)
         if not path:
             path = "/index.html"
@@ -340,17 +396,17 @@ def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg=
                 r1 = conn.getresponse()
         except httplib.BadStatusLine, bsl:
                 log.warning("Bad status line: %s", bsl.line)
-                return
+                raise Exception('Bad status line')
         if r1.status == 401: # and r1.headers:
                 if 'www-authenticate' in r1.msg:
                         (atype,realm) = r1.msg.getheader('www-authenticate').split(' ',1)
                         data1 = r1.read()
-                        if not user:
+                        if not self.user:
                                 raise Exception('Must auth, have no user/pass!')
                         log.debug("Ver: %s, closed: %s, will close: %s", r1.version,r1.isclosed(), r1.will_close)
                         log.debug("Want to do auth %s for realm %s", atype, realm)
                         if atype == 'Basic' :
-                                auths = base64.encodestring(user + ':' + passwd)
+                                auths = base64.encodestring(self.user + ':' + self.passwd)
                                 if auths[-1] == "\n":
                                         auths = auths[:-1]
                                 hdrs['Authorization']= 'Basic '+ auths 
@@ -365,8 +421,7 @@ def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg=
 
         log.debug("Reponse: %s %s",r1.status, r1.reason)
         data1 = r1.read()
-        did_print = False
-        log.debug("Body:\n%s\nEnd of body\n", data1)
+        log.debug("Body:\n%s\nEnd of body", data1)
         try:
             ctype = r1.msg.getheader('content-type')
             if ctype and ';' in ctype:
@@ -374,8 +429,211 @@ def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg=
             if ctype == 'text/xml':
                 doc = xml.dom.minidom.parseString(data1)
                 log.debug("XML Body:\n %s", doc.toprettyxml(indent="\t"))
-                did_print = True
-        except Exception, e:
+        except Exception:
             log.warning("could not print xml", exc_info=True)
             pass
         conn.close()
+        return r1.status, r1.msg, data1
+
+    def _assert_headers(self, expect, msg):
+        """ Assert that the headers in msg contain the expect values
+        """
+        for k, v in expect.items():
+            hval = msg.getheader(k)
+            if not hval:
+                raise AssertionError("Header %s not defined in http response" % k)
+            if isinstance(v, (list, tuple)):
+                delim = ','
+                hits = map(str.strip, hval.split(delim))
+                mvits= []
+                for vit in v:
+                    if vit not in hits:
+                        mvits.append(vit)
+                if mvits:
+                    raise AssertionError("HTTP header \"%s\" is missing: %s" %(k, ', '.join(mvits)))
+            else:
+                if hval.strip() != v.strip():
+                    raise AssertionError("HTTP header \"%s: %s\"" % (k, hval))
+
+    def gd_options(self, path='*', expect=None):
+        """ Test the http options functionality
+            If a dictionary is defined in expect, those options are
+            asserted.
+        """
+        if path != '*':
+            path = self.davpath + path
+        hdrs = { 'Content-Length': 0
+                }
+        s, m, d = self._http_request(path, method='OPTIONS', hdrs=hdrs)
+        assert s == 200, "Status: %r" % s
+        assert 'OPTIONS' in m.getheader('Allow')
+        log.debug('Options: %r', m.getheader('Allow'))
+        
+        if expect:
+            self._assert_headers(expect, m)
+    
+    def _parse_prop_response(self, data):
+        """ Parse a propfind/propname response
+        """
+        def getText(node):
+            rc = []
+            for node in node.childNodes:
+                if node.nodeType == node.TEXT_NODE:
+                    rc.append(node.data)
+            return ''.join(rc)
+        
+        def getElements(node, namespaces=None, strict=False):
+            for cnod in node.childNodes:
+                if cnod.nodeType != node.ELEMENT_NODE:
+                    if strict:
+                        log.debug("Found %r inside <%s>", cnod, node.tagName)
+                    continue
+                if namespaces and (cnod.namespaceURI not in namespaces):
+                    log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName)
+                    continue
+                yield cnod
+
+        nod = xml.dom.minidom.parseString(data)
+        nod_r = nod.documentElement
+        res = {}
+        assert nod_r.localName == 'multistatus', nod_r.tagName
+        for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'):
+            href = None
+            status = 200
+            res_nss = {}
+            for cno in getElements(resp, namespaces=['DAV:',]):
+                if cno.localName == 'href':
+                    assert href is None, "Second href in same response"
+                    href = getText(cno)
+                elif cno.localName == 'propstat':
+                    for pno in getElements(cno, namespaces=['DAV:',]):
+                        rstatus = None
+                        if pno.localName == 'prop':
+                            for prop in getElements(pno):
+                                key = prop.localName
+                                tval = getText(prop).strip()
+                                val = tval or (True, rstatus or status)
+                                if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype':
+                                    val = 'plain'
+                                    for rte in getElements(prop, namespaces=['DAV:',]):
+                                        # Note: we only look at DAV:... elements, we
+                                        # actually expect only one DAV:collection child
+                                        val = rte.localName
+                                res_nss.setdefault(prop.namespaceURI,{})[key] = val
+                        elif pno.localName == 'status':
+                            rstr = getText(pno)
+                            htver, sta, msg = rstr.split(' ', 3)
+                            assert htver == 'HTTP/1.1'
+                            rstatus = int(sta)
+                        else:
+                            log.debug("What is <%s> inside a <propstat>?", pno.tagName)
+                    
+                else:
+                    log.debug("Unknown node: %s", cno.tagName)
+                
+            res.setdefault(href,[]).append((status, res_nss))
+
+        return res
+
+    def gd_propfind(self, path, props=None, depth=0):
+        if not props:
+            propstr = '<allprop/>'
+        else:
+            propstr = '<prop>'
+            nscount = 0
+            for p in props:
+                ns = None
+                if isinstance(p, tuple):
+                    p, ns = p
+                if ns is None or ns == 'DAV:':
+                    propstr += '<%s/>' % p
+                else:
+                    propstr += '<ns%d:%s xmlns:ns%d="%s" />' %(nscount, p, nscount, ns)
+                    nscount += 1
+            propstr += '</prop>'
+                
+        body="""<?xml version="1.0" encoding="utf-8"?>
+            <propfind xmlns="DAV:">%s</propfind>""" % propstr
+        hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+                'Accept': 'text/xml',
+                'Depth': depth,
+                }
+
+        s, m, d = self._http_request(self.davpath + path, method='PROPFIND', 
+                                    hdrs=hdrs, body=body)
+        assert s == 207, "Bad status: %s" % s
+        ctype = m.getheader('Content-Type').split(';',1)[0]
+        assert ctype == 'text/xml', m.getheader('Content-Type')
+        res = self._parse_prop_response(d)
+        if depth == 0:
+            assert len(res) == 1
+            res = res.values()[0]
+        else:
+            assert len(res) >= 1
+        return res
+        
+
+    def gd_propname(self, path, depth=0):
+        body="""<?xml version="1.0" encoding="utf-8"?>
+            <propfind xmlns="DAV:"><propname/></propfind>"""
+        hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+                'Accept': 'text/xml',
+                'Depth': depth
+                }
+        s, m, d = self._http_request(self.davpath + path, method='PROPFIND', 
+                                    hdrs=hdrs, body=body)
+        assert s == 207, "Bad status: %s" % s
+        ctype = m.getheader('Content-Type').split(';',1)[0]
+        assert ctype == 'text/xml', m.getheader('Content-Type')
+        res = self._parse_prop_response(d)
+        if depth == 0:
+            assert len(res) == 1
+            res = res.values()[0]
+        else:
+            assert len(res) >= 1
+        return res
+
+    def gd_getetag(self, path, depth=0):
+        return self.gd_propfind(path, props=['getetag',], depth=depth)
+
+    def gd_lsl(self, path):
+        """ Return a list of 'ls -l' kind of data for a folder
+        
+            This is based on propfind.
+        """
+
+        lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'),
+                ('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'),
+                ('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'),
+                ('mime', 'getcontenttype', 'application/data'), ]
+
+        propnames = [ l[1] for l in lspairs]
+        propres = self.gd_propfind(path, props=propnames, depth=1)
+        
+        res = []
+        for href, pr in propres.items():
+            lsline = {}
+            for st, nsdic in pr:
+                davprops = nsdic['DAV:']
+                if st == 200:
+                    for lsp in lspairs:
+                        if lsp[1] in davprops:
+                            if lsp[1] == 'resourcetype':
+                                if davprops[lsp[1]] == 'collection':
+                                    lsline[lsp[0]] = 'dr-xr-x---'
+                                else:
+                                    lsline[lsp[0]] = '-r-xr-x---'
+                            else:
+                                lsline[lsp[0]] = davprops[lsp[1]]
+                elif st in (404, 403):
+                    for lsp in lspairs:
+                        if lsp[1] in davprops:
+                            lsline[lsp[0]] = lsp[2]
+                else:
+                    log.debug("Strange status: %s", st)
+            
+            res.append(lsline)
+            
+        return res
+
+#eof
\ No newline at end of file