Miscelleanous tools used by OpenERP.
"""
-import os, time, sys
import inspect
-from datetime import datetime
-
-from config import config
-
-import zipfile
-import release
-import socket
+import subprocess
import logging
+import os
import re
-from itertools import islice
-import threading
-from which import which
-
import smtplib
+import socket
+import sys
+import threading
+import time
+import warnings
+import zipfile
+from datetime import datetime
from email.MIMEText import MIMEText
from email.MIMEBase import MIMEBase
from email.MIMEMultipart import MIMEMultipart
from email.Header import Header
from email.Utils import formatdate, COMMASPACE
-from email.Utils import formatdate, COMMASPACE
from email import Encoders
-import netsvc
-
+from itertools import islice, izip
+from lxml import etree
+from which import which
if sys.version_info[:2] < (2, 4):
from threadinglocal import local
else:
from threading import local
+try:
+ from html2text import html2text
+except ImportError:
+ html2text = None
-from itertools import izip
+import netsvc
+from config import config
+from lru import LRU
_logger = logging.getLogger('tools')
+# List of etree._Element subclasses that we choose to ignore when parsing XML.
+# We include the *Base ones just in case, currently they seem to be subclasses of the _* ones.
+SKIPPED_ELEMENT_TYPES = (etree._Comment, etree._ProcessingInstruction, etree.CommentBase, etree.PIBase)
+
# initialize a database with base/base.sql
def init_db(cr):
import addons
f = addons.get_module_resource('base', 'base.sql')
- cr.execute(file_open(f).read())
- cr.commit()
+ base_sql_file = file_open(f)
+ try:
+ cr.execute(base_sql_file.read())
+ cr.commit()
+ finally:
+ base_sql_file.close()
for i in addons.get_modules():
mod_path = addons.get_module_path(i)
p_id = None
while categs:
if p_id is not None:
- cr.execute('select id \
- from ir_module_category \
- where name=%s and parent_id=%s', (categs[0], p_id))
+ cr.execute('SELECT id \
+ FROM ir_module_category \
+ WHERE name=%s AND parent_id=%s', (categs[0], p_id))
else:
- cr.execute('select id \
- from ir_module_category \
- where name=%s and parent_id is NULL', (categs[0],))
+ cr.execute('SELECT id \
+ FROM ir_module_category \
+ WHERE name=%s AND parent_id IS NULL', (categs[0],))
c_id = cr.fetchone()
if not c_id:
- cr.execute('select nextval(\'ir_module_category_id_seq\')')
+ cr.execute('INSERT INTO ir_module_category \
+ (name, parent_id) \
+ VALUES (%s, %s) RETURNING id', (categs[0], p_id))
c_id = cr.fetchone()[0]
- cr.execute('insert into ir_module_category \
- (id, name, parent_id) \
- values (%s, %s, %s)', (c_id, categs[0], p_id))
else:
c_id = c_id[0]
p_id = c_id
state = 'uninstalled'
else:
state = 'uninstallable'
- cr.execute('select nextval(\'ir_module_module_id_seq\')')
- id = cr.fetchone()[0]
- cr.execute('insert into ir_module_module \
- (id, author, website, name, shortdesc, description, \
- category_id, state, certificate, web) \
- values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)', (
- id, info.get('author', ''),
+ cr.execute('INSERT INTO ir_module_module \
+ (author, website, name, shortdesc, description, \
+ category_id, state, certificate, web, license) \
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id', (
+ info.get('author', ''),
info.get('website', ''), i, info.get('name', False),
info.get('description', ''), p_id, state, info.get('certificate') or None,
- info.get('web') or False))
- cr.execute('insert into ir_model_data \
- (name,model,module, res_id, noupdate) values (%s,%s,%s,%s,%s)', (
+ info.get('web') or False,
+ info.get('license') or 'AGPL-3'))
+ id = cr.fetchone()[0]
+ cr.execute('INSERT INTO ir_model_data \
+ (name,model,module, res_id, noupdate) VALUES (%s,%s,%s,%s,%s)', (
'module_meta_information', 'ir.module.module', i, id, True))
dependencies = info.get('depends', [])
for d in dependencies:
- cr.execute('insert into ir_module_module_dependency \
- (module_id,name) values (%s, %s)', (id, d))
+ cr.execute('INSERT INTO ir_module_module_dependency \
+ (module_id,name) VALUES (%s, %s)', (id, d))
cr.commit()
def find_in_path(name):
prog = find_pg_tool(name)
if not prog:
raise Exception('Couldn\'t find %s' % name)
- args2 = (os.path.basename(prog),) + args
- return os.spawnv(os.P_WAIT, prog, args2)
+ args2 = (prog,) + args
+
+ return subprocess.call(args2)
def exec_pg_command_pipe(name, *args):
prog = find_pg_tool(name)
if not prog:
raise Exception('Couldn\'t find %s' % name)
- if os.name == "nt":
- cmd = '"' + prog + '" ' + ' '.join(args)
- else:
- cmd = prog + ' ' + ' '.join(args)
- return os.popen2(cmd, 'b')
+ # on win32, passing close_fds=True is not compatible
+ # with redirecting std[in/err/out]
+ pop = subprocess.Popen((prog,) + args, bufsize= -1,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ close_fds=(os.name=="posix"))
+ return (pop.stdin, pop.stdout)
def exec_command_pipe(name, *args):
prog = find_in_path(name)
if not prog:
raise Exception('Couldn\'t find %s' % name)
- if os.name == "nt":
- cmd = '"'+prog+'" '+' '.join(args)
- else:
- cmd = prog+' '+' '.join(args)
- return os.popen2(cmd, 'b')
+ # on win32, passing close_fds=True is not compatible
+ # with redirecting std[in/err/out]
+ pop = subprocess.Popen((prog,) + args, bufsize= -1,
+ stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ close_fds=(os.name=="posix"))
+ return (pop.stdin, pop.stdout)
#----------------------------------------------------------
# File paths
subdir2 = (subdir2 != 'addons' or None) and subdir2
for adp in adps:
- try:
- if subdir2:
- fn = os.path.join(adp, subdir2, name)
- else:
- fn = os.path.join(adp, name)
- fn = os.path.normpath(fn)
- fo = file_open(fn, mode=mode, subdir=None, pathinfo=pathinfo)
- if pathinfo:
- return fo, fn
- return fo
- except IOError, e:
- pass
+ try:
+ if subdir2:
+ fn = os.path.join(adp, subdir2, name)
+ else:
+ fn = os.path.join(adp, name)
+ fn = os.path.normpath(fn)
+ fo = file_open(fn, mode=mode, subdir=None, pathinfo=pathinfo)
+ if pathinfo:
+ return fo, fn
+ return fo
+ except IOError:
+ pass
if subdir:
name = os.path.join(rtp, subdir, name)
if pathinfo:
return fo, name
return fo
- except:
+ except Exception:
name2 = os.path.normpath(os.path.join(head + '.zip', zipname))
pass
for i in (name2, name):
return fo
if os.path.splitext(name)[1] == '.rml':
raise IOError, 'Report %s doesn\'t exist or deleted : ' %str(name)
- raise IOError, 'File not found : '+str(name)
+ raise IOError, 'File not found : %s' % name
#----------------------------------------------------------
html = ustr(html)
- from lxml.etree import Element, tostring
+ from lxml.etree import tostring
try:
from lxml.html.soupparser import fromstring
kwargs = {}
url_index = []
i = 0
for link in tree.findall('.//a'):
- title = link.text
url = link.get('href')
if url:
i += 1
from mailbox import Maildir
maildir_path = smtp_server[8:]
mdir = Maildir(maildir_path,factory=None, create = True)
- mdir.add(msg.as_string(True))
+ mdir.add(message.as_string(True))
return True
oldstderr = smtplib.stderr
s.quit()
if debug:
smtplib.stderr = oldstderr
- except:
+ except Exception:
# ignored, just a consequence of the previous exception
pass
- except Exception, e:
+ except Exception:
_logger.error('could not deliver email', exc_info=True)
return False
"address or having configured one")
if not email_from: email_from = config.get('email_from', False)
+ email_from = ustr(email_from).encode('utf-8')
if not email_cc: email_cc = []
if not email_bcc: email_bcc = []
if not body: body = u''
- try: email_body = body.encode('utf-8')
- except (UnicodeEncodeError, UnicodeDecodeError):
- email_body = body
- try:
- email_text = MIMEText(email_body.encode('utf8') or '',_subtype=subtype,_charset='utf-8')
- except:
- email_text = MIMEText(email_body or '',_subtype=subtype,_charset='utf-8')
+ email_body = ustr(body).encode('utf-8')
+ email_text = MIMEText(email_body or '',_subtype=subtype,_charset='utf-8')
- if attach: msg = MIMEMultipart()
- else: msg = email_text
+ msg = MIMEMultipart()
msg['Subject'] = Header(ustr(subject), 'utf-8')
msg['From'] = email_from
msg['To'] = COMMASPACE.join(email_to)
if email_cc:
msg['Cc'] = COMMASPACE.join(email_cc)
- if email_bcc:
- msg['Bcc'] = COMMASPACE.join(email_bcc)
msg['Date'] = formatdate(localtime=True)
msg['X-Priority'] = priorities.get(priority, '3 (Normal)')
for key, value in x_headers.iteritems():
msg['%s' % key] = str(value)
- if attach:
+ if html2text and subtype == 'html':
+ text = html2text(email_body.decode('utf-8')).encode('utf-8')
+ alternative_part = MIMEMultipart(_subtype="alternative")
+ alternative_part.attach(MIMEText(text, _charset='utf-8', _subtype='plain'))
+ alternative_part.attach(email_text)
+ msg.attach(alternative_part)
+ else:
msg.attach(email_text)
+
+ if attach:
for (fname,fcontent) in attach:
part = MIMEBase('application', "octet-stream")
part.set_payload( fcontent )
url = "http://api.urlsms.com/SendSMS.aspx"
#url = "http://196.7.150.220/http/sendmsg"
params = urllib.urlencode({'UserID': user, 'Password': password, 'SenderID': api_id, 'MsgText': text, 'RecipientMobileNo':to})
- f = urllib.urlopen(url+"?"+params)
+ urllib.urlopen(url+"?"+params)
# FIXME: Use the logger if there is an error
return True
__caches = []
- def __init__(self, timeout=None, skiparg=2, multi=None):
+ def __init__(self, timeout=None, skiparg=2, multi=None, size=8192):
assert skiparg >= 2 # at least self and cr
if timeout is None:
self.timeout = config['cache_timeout']
self.skiparg = skiparg
self.multi = multi
self.lasttime = time.time()
- self.cache = {}
+ self.cache = LRU(size) # TODO take size from config
self.fun = None
cache.__caches.append(self)
try:
return unicode(value)
except Exception:
- raise UnicodeError('unable de to convert %r' % (value,))
+ raise UnicodeError('unable to convert %r' % (value,))
for ln in get_encodings(hint_encoding):
try:
return unicode(value, ln)
except Exception:
pass
- raise UnicodeError('unable de to convert %r' % (value,))
+ raise UnicodeError('unable to convert %r' % (value,))
def exception_to_unicode(e):
return "\n".join((ustr(a) for a in e.args))
try:
return ustr(e)
- except:
+ except Exception:
return u"Unknown message"
return lang
def get_languages():
+ # The codes below are those from Launchpad's Rosetta, with the exception
+ # of some trivial codes where the Launchpad code is xx and we have xx_XX.
languages={
'ab_RU': u'Abkhazian / аҧсуа',
'ar_AR': u'Arabic / الْعَرَبيّة',
'fr_FR': u'French / Français',
'gl_ES': u'Galician / Galego',
'gu_IN': u'Gujarati / ગુજરાતી',
+ 'he_IL': u'Hebrew / עִבְרִי',
'hi_IN': u'Hindi / हिंदी',
'hr_HR': u'Croatian / hrvatski jezik',
'hu_HU': u'Hungarian / Magyar',
'sl_SI': u'Slovenian / slovenščina',
'sk_SK': u'Slovak / Slovenský jazyk',
'sq_AL': u'Albanian / Shqip',
- 'sr_RS': u'Serbian / српски језик',
+ 'sr_RS': u'Serbian (Cyrillic) / српски',
+ 'sr@latin': u'Serbian (Latin) / srpski',
'sv_SE': u'Swedish / svenska',
'te_IN': u'Telugu / తెలుగు',
'tr_TR': u'Turkish / Türkçe',
return languages
def scan_languages():
-# import glob
-# file_list = [os.path.splitext(os.path.basename(f))[0] for f in glob.glob(os.path.join(config['root_path'],'addons', 'base', 'i18n', '*.po'))]
-# ret = [(lang, lang_dict.get(lang, lang)) for lang in file_list]
# Now it will take all languages from get languages function without filter it with base module languages
lang_dict = get_languages()
ret = [(lang, lang_dict.get(lang, lang)) for lang in list(lang_dict)]
@wraps(f)
def wrapper(*args, **kwargs):
- import netsvc
from pprint import pformat
vector = ['Call -> function: %r' % f]
--log-level=debug
"""
- import netsvc
+ warnings.warn("The tools.debug() method is deprecated, please use logging.",
+ DeprecationWarning, stacklevel=2)
from inspect import stack
- import re
from pprint import pformat
st = stack()[1]
param = re.split("debug *\((.+)\)", st[4][0].strip())[1].strip()
what = pformat(what)
if param != what:
what = "%s = %s" % (param, what)
- netsvc.Logger().notifyChannel(st[3], netsvc.LOG_DEBUG, what)
+ logging.getLogger(st[3]).debug(what)
-icons = map(lambda x: (x,x), ['STOCK_ABOUT', 'STOCK_ADD', 'STOCK_APPLY', 'STOCK_BOLD',
+__icons_list = ['STOCK_ABOUT', 'STOCK_ADD', 'STOCK_APPLY', 'STOCK_BOLD',
'STOCK_CANCEL', 'STOCK_CDROM', 'STOCK_CLEAR', 'STOCK_CLOSE', 'STOCK_COLOR_PICKER',
'STOCK_CONNECT', 'STOCK_CONVERT', 'STOCK_COPY', 'STOCK_CUT', 'STOCK_DELETE',
'STOCK_DIALOG_AUTHENTICATION', 'STOCK_DIALOG_ERROR', 'STOCK_DIALOG_INFO',
'terp-gdu-smart-failing','terp-go-week','terp-gtk-select-all','terp-locked','terp-mail-forward',
'terp-mail-message-new','terp-mail-replied','terp-rating-rated','terp-stage','terp-stock_format-scientific',
'terp-dolar_ok!','terp-idea','terp-stock_format-default','terp-mail-','terp-mail_delete'
-])
+]
-def extract_zip_file(zip_file, outdirectory):
- import zipfile
- import os
+def icons(*a, **kw):
+ global __icons_list
+ return [(x, x) for x in __icons_list ]
+def extract_zip_file(zip_file, outdirectory):
zf = zipfile.ZipFile(zip_file, 'r')
out = outdirectory
for path in zf.namelist():
"""
def _detect_ip_addr():
from array import array
- import socket
from struct import pack, unpack
try:
try:
ip_addr = _detect_ip_addr()
- except:
+ except Exception:
ip_addr = 'localhost'
return ip_addr
Defaults to UTC if no working timezone can be found.
@return: the timezone identifier as expected by pytz.timezone.
"""
- import time
- import netsvc
try:
import pytz
- except:
+ except Exception:
netsvc.Logger().notifyChannel("detect_server_timezone", netsvc.LOG_WARNING,
"Python pytz module is not available. Timezone will be set to UTC by default.")
return 'UTC'
try:
f = open("/etc/timezone")
tz_value = f.read(128).strip()
- except:
+ except Exception:
pass
finally:
f.close()
DEFAULT_SERVER_DATE_FORMAT,
DEFAULT_SERVER_TIME_FORMAT)
+# Python's strftime supports only the format directives
+# that are available on the platform's libc, so in order to
+# be cross-platform we map to the directives required by
+# the C standard (1989 version), always available on platforms
+# with a C standard implementation.
+DATETIME_FORMATS_MAP = {
+ '%C': '', # century
+ '%D': '%m/%d/%Y', # modified %y->%Y
+ '%e': '%d',
+ '%E': '', # special modifier
+ '%F': '%Y-%m-%d',
+ '%g': '%Y', # modified %y->%Y
+ '%G': '%Y',
+ '%h': '%b',
+ '%k': '%H',
+ '%l': '%I',
+ '%n': '\n',
+ '%O': '', # special modifier
+ '%P': '%p',
+ '%R': '%H:%M',
+ '%r': '%I:%M:%S %p',
+ '%s': '', #num of seconds since epoch
+ '%T': '%H:%M:%S',
+ '%t': ' ', # tab
+ '%u': ' %w',
+ '%V': '%W',
+ '%y': '%Y', # Even if %y works, it's ambiguous, so we should use %Y
+ '%+': '%Y-%m-%d %H:%M:%S',
+
+ # %Z is a special case that causes 2 problems at least:
+ # - the timezone names we use (in res_user.context_tz) come
+ # from pytz, but not all these names are recognized by
+ # strptime(), so we cannot convert in both directions
+ # when such a timezone is selected and %Z is in the format
+ # - %Z is replaced by an empty string in strftime() when
+ # there is not tzinfo in a datetime value (e.g when the user
+ # did not pick a context_tz). The resulting string does not
+ # parse back if the format requires %Z.
+ # As a consequence, we strip it completely from format strings.
+ # The user can always have a look at the context_tz in
+ # preferences to check the timezone.
+ '%z': '',
+ '%Z': '',
+}
+
def server_to_local_timestamp(src_tstamp_str, src_format, dst_format, dst_tz_name,
tz_offset=True, ignore_unparsable_time=True):
"""
fp = urllib.urlopen('http://www.openerp.com/scripts/survey.php', args)
fp.read()
fp.close()
- except:
+ except Exception:
pass
def upload_data(email, data, type='SURVEY'):
return True
-# port of python 2.6's attrgetter with support for dotted notation
+# port of python 2.6's attrgetter with support for dotted notation
def resolve_attr(obj, attr):
for name in attr.split("."):
obj = getattr(obj, name)