# code taken from the 'http-client.py' script:
# http://git.hellug.gr/?p=xrg/openerp;a=history;f=tests/http-client.py;hb=refs/heads/xrg-60
-import imp
-import sys
-import os
-import glob
-import subprocess
import re
import gzip
import logging
import httplib
-# from xmlrpclib import Transport
+from tools import config
+from xmlrpclib import Transport, ProtocolError
+import StringIO
+import base64
log = logging.getLogger('http-client')
return cert
-def http_request(host, path, user=None, method='GET', hdrs=None, body=None, dbg=2):
+
+class DAVClient(object):
+ """An instance of a WebDAV client, connected to the OpenERP server
+ """
+
+ def __init__(self, user=None, passwd=None, dbg=0, use_ssl=False, useragent=False):
+ if use_ssl:
+ self.host = config.get_misc('httpsd', 'interface', False)
+ self.port = config.get_misc('httpsd', 'port', 8071)
+ if not self.host:
+ self.host = config.get('xmlrpcs_interface')
+ self.port = config.get('xmlrpcs_port')
+ else:
+ self.host = config.get_misc('httpd', 'interface')
+ self.port = config.get_misc('httpd', 'port', 8069)
+ if not self.host:
+ self.host = config.get('xmlrpc_interface')
+ self.port = config.get('xmlrpc_port') or self.port
+ if self.host == '0.0.0.0' or not self.host:
+ self.host = '127.0.0.1'
+ self.port = int(self.port)
+ if not config.get_misc('webdav','enable',True):
+ raise Exception("WebDAV is disabled, cannot continue")
+ self.davpath = '/' + config.get_misc('webdav','vdir','webdav')
+ self.user = user
+ self.passwd = passwd
+ self.dbg = dbg
+ self.hdrs = {}
+ if useragent:
+ self.set_useragent(useragent)
+
+ def get_creds(self, obj, cr, uid):
+ """Read back the user credentials from cr, uid
+
+ @param obj is any orm object, in order to use its pool
+ @param uid is the numeric id, which we will try to reverse resolve
+
+ note: this is a hackish way to get the credentials. It is expected
+ to break if "base_crypt" is used.
+ """
+ ruob = obj.pool.get('res.users')
+ res = ruob.read(cr, 1, [uid,], ['login', 'password'])
+ assert res, "uid %s not found" % uid
+ self.user = res[0]['login']
+ self.passwd = res[0]['password']
+ return True
+
+ def set_useragent(self, uastr):
+ """ Set the user-agent header to something meaningful.
+ Some shorthand names will be replaced by stock strings.
+ """
+ if uastr in ('KDE4', 'Korganizer'):
+ self.hdrs['User-Agent'] = "Mozilla/5.0 (compatible; Konqueror/4.4; Linux) KHTML/4.4.3 (like Gecko)"
+ elif uastr == 'iPhone3':
+ self.hdrs['User-Agent'] = "DAVKit/5.0 (765); iCalendar/5.0 (79); iPhone/4.1 8B117"
+ elif uastr == "MacOS":
+ self.hdrs['User-Agent'] = "WebDAVFS/1.8 (01808000) Darwin/9.8.0 (i386)"
+ else:
+ self.hdrs['User-Agent'] = uastr
+
+ def _http_request(self, path, method='GET', hdrs=None, body=None):
if not hdrs:
hdrs = {}
- passwd=None
- if user:
- import getpass
- passwd = getpass.getpass("Password for %s@%s: " %(user,host))
import base64
- log.debug("Getting %s http://%s/%s", method, host , path)
- conn = httplib.HTTPConnection(host)
+ dbg = self.dbg
+ hdrs.update(self.hdrs)
+ log.debug("Getting %s http://%s:%d/%s", method, self.host, self.port, path)
+ conn = httplib.HTTPConnection(self.host, port=self.port)
conn.set_debuglevel(dbg)
if not path:
path = "/index.html"
r1 = conn.getresponse()
except httplib.BadStatusLine, bsl:
log.warning("Bad status line: %s", bsl.line)
- return
+ raise Exception('Bad status line')
if r1.status == 401: # and r1.headers:
if 'www-authenticate' in r1.msg:
(atype,realm) = r1.msg.getheader('www-authenticate').split(' ',1)
data1 = r1.read()
- if not user:
+ if not self.user:
raise Exception('Must auth, have no user/pass!')
log.debug("Ver: %s, closed: %s, will close: %s", r1.version,r1.isclosed(), r1.will_close)
log.debug("Want to do auth %s for realm %s", atype, realm)
if atype == 'Basic' :
- auths = base64.encodestring(user + ':' + passwd)
+ auths = base64.encodestring(self.user + ':' + self.passwd)
if auths[-1] == "\n":
auths = auths[:-1]
hdrs['Authorization']= 'Basic '+ auths
log.debug("Reponse: %s %s",r1.status, r1.reason)
data1 = r1.read()
- did_print = False
- log.debug("Body:\n%s\nEnd of body\n", data1)
+ log.debug("Body:\n%s\nEnd of body", data1)
try:
ctype = r1.msg.getheader('content-type')
if ctype and ';' in ctype:
if ctype == 'text/xml':
doc = xml.dom.minidom.parseString(data1)
log.debug("XML Body:\n %s", doc.toprettyxml(indent="\t"))
- did_print = True
- except Exception, e:
+ except Exception:
log.warning("could not print xml", exc_info=True)
pass
conn.close()
+ return r1.status, r1.msg, data1
+
+ def _assert_headers(self, expect, msg):
+ """ Assert that the headers in msg contain the expect values
+ """
+ for k, v in expect.items():
+ hval = msg.getheader(k)
+ if not hval:
+ raise AssertionError("Header %s not defined in http response" % k)
+ if isinstance(v, (list, tuple)):
+ delim = ','
+ hits = map(str.strip, hval.split(delim))
+ mvits= []
+ for vit in v:
+ if vit not in hits:
+ mvits.append(vit)
+ if mvits:
+ raise AssertionError("HTTP header \"%s\" is missing: %s" %(k, ', '.join(mvits)))
+ else:
+ if hval.strip() != v.strip():
+ raise AssertionError("HTTP header \"%s: %s\"" % (k, hval))
+
+ def gd_options(self, path='*', expect=None):
+ """ Test the http options functionality
+ If a dictionary is defined in expect, those options are
+ asserted.
+ """
+ if path != '*':
+ path = self.davpath + path
+ hdrs = { 'Content-Length': 0
+ }
+ s, m, d = self._http_request(path, method='OPTIONS', hdrs=hdrs)
+ assert s == 200, "Status: %r" % s
+ assert 'OPTIONS' in m.getheader('Allow')
+ log.debug('Options: %r', m.getheader('Allow'))
+
+ if expect:
+ self._assert_headers(expect, m)
+
+ def _parse_prop_response(self, data):
+ """ Parse a propfind/propname response
+ """
+ def getText(node):
+ rc = []
+ for node in node.childNodes:
+ if node.nodeType == node.TEXT_NODE:
+ rc.append(node.data)
+ return ''.join(rc)
+
+ def getElements(node, namespaces=None, strict=False):
+ for cnod in node.childNodes:
+ if cnod.nodeType != node.ELEMENT_NODE:
+ if strict:
+ log.debug("Found %r inside <%s>", cnod, node.tagName)
+ continue
+ if namespaces and (cnod.namespaceURI not in namespaces):
+ log.debug("Ignoring <%s> in <%s>", cnod.tagName, node.localName)
+ continue
+ yield cnod
+
+ nod = xml.dom.minidom.parseString(data)
+ nod_r = nod.documentElement
+ res = {}
+ assert nod_r.localName == 'multistatus', nod_r.tagName
+ for resp in nod_r.getElementsByTagNameNS('DAV:', 'response'):
+ href = None
+ status = 200
+ res_nss = {}
+ for cno in getElements(resp, namespaces=['DAV:',]):
+ if cno.localName == 'href':
+ assert href is None, "Second href in same response"
+ href = getText(cno)
+ elif cno.localName == 'propstat':
+ for pno in getElements(cno, namespaces=['DAV:',]):
+ rstatus = None
+ if pno.localName == 'prop':
+ for prop in getElements(pno):
+ key = prop.localName
+ tval = getText(prop).strip()
+ val = tval or (True, rstatus or status)
+ if prop.namespaceURI == 'DAV:' and prop.localName == 'resourcetype':
+ val = 'plain'
+ for rte in getElements(prop, namespaces=['DAV:',]):
+ # Note: we only look at DAV:... elements, we
+ # actually expect only one DAV:collection child
+ val = rte.localName
+ res_nss.setdefault(prop.namespaceURI,{})[key] = val
+ elif pno.localName == 'status':
+ rstr = getText(pno)
+ htver, sta, msg = rstr.split(' ', 3)
+ assert htver == 'HTTP/1.1'
+ rstatus = int(sta)
+ else:
+ log.debug("What is <%s> inside a <propstat>?", pno.tagName)
+
+ else:
+ log.debug("Unknown node: %s", cno.tagName)
+
+ res.setdefault(href,[]).append((status, res_nss))
+
+ return res
+
+ def gd_propfind(self, path, props=None, depth=0):
+ if not props:
+ propstr = '<allprop/>'
+ else:
+ propstr = '<prop>'
+ nscount = 0
+ for p in props:
+ ns = None
+ if isinstance(p, tuple):
+ p, ns = p
+ if ns is None or ns == 'DAV:':
+ propstr += '<%s/>' % p
+ else:
+ propstr += '<ns%d:%s xmlns:ns%d="%s" />' %(nscount, p, nscount, ns)
+ nscount += 1
+ propstr += '</prop>'
+
+ body="""<?xml version="1.0" encoding="utf-8"?>
+ <propfind xmlns="DAV:">%s</propfind>""" % propstr
+ hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+ 'Accept': 'text/xml',
+ 'Depth': depth,
+ }
+
+ s, m, d = self._http_request(self.davpath + path, method='PROPFIND',
+ hdrs=hdrs, body=body)
+ assert s == 207, "Bad status: %s" % s
+ ctype = m.getheader('Content-Type').split(';',1)[0]
+ assert ctype == 'text/xml', m.getheader('Content-Type')
+ res = self._parse_prop_response(d)
+ if depth == 0:
+ assert len(res) == 1
+ res = res.values()[0]
+ else:
+ assert len(res) >= 1
+ return res
+
+
+ def gd_propname(self, path, depth=0):
+ body="""<?xml version="1.0" encoding="utf-8"?>
+ <propfind xmlns="DAV:"><propname/></propfind>"""
+ hdrs = { 'Content-Type': 'text/xml; charset=utf-8',
+ 'Accept': 'text/xml',
+ 'Depth': depth
+ }
+ s, m, d = self._http_request(self.davpath + path, method='PROPFIND',
+ hdrs=hdrs, body=body)
+ assert s == 207, "Bad status: %s" % s
+ ctype = m.getheader('Content-Type').split(';',1)[0]
+ assert ctype == 'text/xml', m.getheader('Content-Type')
+ res = self._parse_prop_response(d)
+ if depth == 0:
+ assert len(res) == 1
+ res = res.values()[0]
+ else:
+ assert len(res) >= 1
+ return res
+
+ def gd_getetag(self, path, depth=0):
+ return self.gd_propfind(path, props=['getetag',], depth=depth)
+
+ def gd_lsl(self, path):
+ """ Return a list of 'ls -l' kind of data for a folder
+
+ This is based on propfind.
+ """
+
+ lspairs = [ ('name', 'displayname', 'n/a'), ('size', 'getcontentlength', '0'),
+ ('type', 'resourcetype', '----------'), ('uid', 'owner', 'nobody'),
+ ('gid', 'group', 'nogroup'), ('mtime', 'getlastmodified', 'n/a'),
+ ('mime', 'getcontenttype', 'application/data'), ]
+
+ propnames = [ l[1] for l in lspairs]
+ propres = self.gd_propfind(path, props=propnames, depth=1)
+
+ res = []
+ for href, pr in propres.items():
+ lsline = {}
+ for st, nsdic in pr:
+ davprops = nsdic['DAV:']
+ if st == 200:
+ for lsp in lspairs:
+ if lsp[1] in davprops:
+ if lsp[1] == 'resourcetype':
+ if davprops[lsp[1]] == 'collection':
+ lsline[lsp[0]] = 'dr-xr-x---'
+ else:
+ lsline[lsp[0]] = '-r-xr-x---'
+ else:
+ lsline[lsp[0]] = davprops[lsp[1]]
+ elif st in (404, 403):
+ for lsp in lspairs:
+ if lsp[1] in davprops:
+ lsline[lsp[0]] = lsp[2]
+ else:
+ log.debug("Strange status: %s", st)
+
+ res.append(lsline)
+
+ return res
+
+#eof
\ No newline at end of file