doc webdav: more tests in lib, yaml
authorP. Christeas <p_christ@hol.gr>
Wed, 3 Nov 2010 11:25:34 +0000 (13:25 +0200)
committerP. Christeas <p_christ@hol.gr>
Wed, 3 Nov 2010 11:25:34 +0000 (13:25 +0200)
bzr revid: p_christ@hol.gr-20101103112534-7vdnxjxr9wb3736t

addons/document_webdav/test/webdav_test1.yml
addons/document_webdav/test_davclient.py

index 30d4ad3..c9384a5 100644 (file)
         dc.get_creds(self, cr, uid)
         dc.gd_options(path=cr.dbname, expect={'DAV': ['1',]})
 -
+    I will test the propnames at the document root
+-
+    !python {model: ir.attachment}: |
+        from document_webdav import test_davclient as te
+        dc = te.DAVClient()
+        dc.get_creds(self, cr, uid)
+        dc.gd_propname(path=cr.dbname+'/Documents/')
+-
+    I will test the ETags of the document root
+-
+    !python {model: ir.attachment}: |
+        from document_webdav import test_davclient as te
+        dc = te.DAVClient()
+        dc.get_creds(self, cr, uid)
+        dc.gd_getetag(path=cr.dbname+'/Documents/')
+
+-
+    I will now ls -l the document root.
+-
+    !python {model: ir.attachment}: |
+        from document_webdav import test_davclient as te
+        dc = te.DAVClient()
+        dc.get_creds(self, cr, uid)
+        res = dc.gd_lsl(path=cr.dbname+'/Documents/')
+        for lin in res:
+            print "%(type)s\t%(uid)s\t%(gid)s\t%(size)s\t%(mtime)s\t%(name)s" % lin
index c961fac..0af83f4 100644 (file)
@@ -323,7 +323,7 @@ class DAVClient(object):
     """An instance of a WebDAV client, connected to the OpenERP server
     """
     
-    def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False):
+    def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False, useragent=False):
         if use_ssl:
             self.host = config.get_misc('httpsd', 'interface', False)
             self.port = config.get_misc('httpsd', 'port', 8071)
@@ -345,6 +345,9 @@ class DAVClient(object):
         self.user = user
         self.passwd = passwd
         self.dbg = dbg
+        self.hdrs = {}
+        if useragent:
+            self.set_useragent(useragent)
 
     def get_creds(self, obj, cr, uid):
         """Read back the user credentials from cr, uid
@@ -362,11 +365,25 @@ class DAVClient(object):
         self.passwd = res[0]['password']
         return True
 
+    def set_useragent(self, uastr):
+        """ Set the user-agent header to something meaningful.
+        Some shorthand names will be replaced by stock strings.
+        """
+        if uastr in ('KDE4', 'Korganizer'):
+            self.hdrs['User-Agent'] = "Mozilla/5.0 (compatible; Konqueror/4.4; Linux) KHTML/4.4.3 (like Gecko)"
+        elif uastr == 'iPhone3':
+            self.hdrs['User-Agent'] = "DAVKit/5.0 (765); iCalendar/5.0 (79); iPhone/4.1 8B117"
+        elif uastr == "MacOS":
+            self.hdrs['User-Agent'] = "WebDAVFS/1.8 (01808000) Darwin/9.8.0 (i386)"
+        else:
+            self.hdrs['User-Agent'] = uastr
+
     def _http_request(self, path, method='GET', hdrs=None, body=None):
         if not hdrs:
             hdrs = {}
         import base64
         dbg = self.dbg
+        hdrs.update(self.hdrs)
         log.debug("Getting %s http://%s:%d/%s", method, self.host, self.port, path)
         conn = httplib.HTTPConnection(self.host, port=self.port)
         conn.set_debuglevel(dbg)
@@ -454,7 +471,169 @@ class DAVClient(object):
         
         if expect:
             self._assert_headers(expect, m)
+    
+    def _parse_prop_response(self, data):
+        """ Parse a propfind/propname response
+        """
+        def getText(node):
+            rc = []
+            for node in node.childNodes:
+                if node.nodeType == node.TEXT_NODE:
+                    rc.append(node.data)
+            return ''.join(rc)
         
+        def getElements(node, namespaces=None, strict=False):
+            for cnod in node.childNodes:
+                if cnod.nodeType != node.ELEMENT_NODE:
+                    if strict:
+                        log.debug("Found %r inside <%s>", cnod, node.tagName)
+                    continue
+                if namespaces and (cnod.namespaceURI not in namespaces):
+                    log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName)
+                    continue
+                yield cnod
+
+        nod = xml.dom.minidom.parseString(data)
+        nod_r = nod.documentElement
+        res = {}
+        assert nod_r.localName == 'multistatus', nod_r.tagName
+        for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'):
+            href = None
+            status = 200
+            res_nss = {}
+            for cno in getElements(resp, namespaces=['DAV:',]):
+                if cno.localName == 'href':
+                    assert href is None, "Second href in same response"
+                    href = getText(cno)
+                elif cno.localName == 'propstat':
+                    for pno in getElements(cno, namespaces=['DAV:',]):
+                        rstatus = None
+                        if pno.localName == 'prop':
+                            for prop in getElements(pno):
+                                key = prop.localName
+                                tval = getText(prop).strip()
+                                val = tval or (True, rstatus or status)
+                                if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype':
+                                    val = 'plain'
+                                    for rte in getElements(prop, namespaces=['DAV:',]):
+                                        # Note: we only look at DAV:... elements, we
+                                        # actually expect only one DAV:collection child
+                                        val = rte.localName
+                                res_nss.setdefault(prop.namespaceURI,{})[key] = val
+                        elif pno.localName == 'status':
+                            rstr = getText(pno)
+                            htver, sta, msg = rstr.split(' ', 3)
+                            assert htver == 'HTTP/1.1'
+                            rstatus = int(sta)
+                        else:
+                            log.debug("What is <%s> inside a <propstat>?", pno.tagName)
+                    
+                else:
+                    log.debug("Unknown node: %s", cno.tagName)
+                
+            res.setdefault(href,[]).append((status, res_nss))
+
+        return res
+
+    def gd_propfind(self, path, props=None, depth=0):
+        if not props:
+            propstr = '<allprop/>'
+        else:
+            propstr = '<prop>'
+            nscount = 0
+            for p in props:
+                ns = None
+                if isinstance(p, tuple):
+                    p, ns = p
+                if ns is None or ns == 'DAV:':
+                    propstr += '<%s/>' % p
+                else:
+                    propstr += '<ns%d:%s xmlns:ns%d="%s" />' %(nscount, p, nscount, ns)
+                    nscount += 1
+            propstr += '</prop>'
+                
+        body="""<?xml version="1.0" encoding="utf-8"?>
+            <propfind xmlns="DAV:">%s</propfind>""" % propstr
+        hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+                'Accept': 'text/xml',
+                'Depth': depth,
+                }
 
+        s, m, d = self._http_request(self.davpath + path, method='PROPFIND', 
+                                    hdrs=hdrs, body=body)
+        assert s == 207, "Bad status: %s" % s
+        ctype = m.getheader('Content-Type').split(';',1)[0]
+        assert ctype == 'text/xml', m.getheader('Content-Type')
+        res = self._parse_prop_response(d)
+        if depth == 0:
+            assert len(res) == 1
+            res = res.values()[0]
+        else:
+            assert len(res) >= 1
+        return res
+        
+
+    def gd_propname(self, path, depth=0):
+        body="""<?xml version="1.0" encoding="utf-8"?>
+            <propfind xmlns="DAV:"><propname/></propfind>"""
+        hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+                'Accept': 'text/xml',
+                'Depth': depth
+                }
+        s, m, d = self._http_request(self.davpath + path, method='PROPFIND', 
+                                    hdrs=hdrs, body=body)
+        assert s == 207, "Bad status: %s" % s
+        ctype = m.getheader('Content-Type').split(';',1)[0]
+        assert ctype == 'text/xml', m.getheader('Content-Type')
+        res = self._parse_prop_response(d)
+        if depth == 0:
+            assert len(res) == 1
+            res = res.values()[0]
+        else:
+            assert len(res) >= 1
+        return res
+
+    def gd_getetag(self, path, depth=0):
+        return self.gd_propfind(path, props=['getetag',], depth=depth)
+
+    def gd_lsl(self, path):
+        """ Return a list of 'ls -l' kind of data for a folder
+        
+            This is based on propfind.
+        """
+
+        lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'),
+                ('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'),
+                ('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'),
+                ('mime', 'getcontenttype', 'application/data'), ]
+
+        propnames = [ l[1] for l in lspairs]
+        propres = self.gd_propfind(path, props=propnames, depth=1)
+        
+        res = []
+        for href, pr in propres.items():
+            lsline = {}
+            for st, nsdic in pr:
+                davprops = nsdic['DAV:']
+                if st == 200:
+                    for lsp in lspairs:
+                        if lsp[1] in davprops:
+                            if lsp[1] == 'resourcetype':
+                                if davprops[lsp[1]] == 'collection':
+                                    lsline[lsp[0]] = 'dr-xr-x---'
+                                else:
+                                    lsline[lsp[0]] = '-r-xr-x---'
+                            else:
+                                lsline[lsp[0]] = davprops[lsp[1]]
+                elif st in (404, 403):
+                    for lsp in lspairs:
+                        if lsp[1] in davprops:
+                            lsline[lsp[0]] = lsp[2]
+                else:
+                    log.debug("Strange status: %s", st)
+            
+            res.append(lsline)
+            
+        return res
 
 #eof
\ No newline at end of file