#
##############################################################################
+#.apidoc title: Utilities: tools.misc
+
"""
Miscelleanous tools used by OpenERP.
"""
+from functools import wraps
import inspect
import subprocess
import logging
import time
import warnings
import zipfile
+from collections import defaultdict
from datetime import datetime
from email.MIMEText import MIMEText
from email.MIMEBase import MIMEBase
from email.MIMEMultipart import MIMEMultipart
from email.Header import Header
from email.Utils import formatdate, COMMASPACE
+from email import Utils
from email import Encoders
from itertools import islice, izip
from lxml import etree
from which import which
-if sys.version_info[:2] < (2, 4):
- from threadinglocal import local
-else:
- from threading import local
+from threading import local
try:
from html2text import html2text
except ImportError:
html2text = None
import openerp.loglevels as loglevels
+import openerp.pooler as pooler
from config import config
-from lru import LRU
+from cache import *
# get_encodings, ustr and exception_to_unicode were originally from tools.misc.
# There are moved to loglevels until we refactor tools.
def file_open(name, mode="r", subdir='addons', pathinfo=False):
"""Open a file from the OpenERP root, using a subdir folder.
+ Example::
+
>>> file_open('hr/report/timesheer.xsl')
>>> file_open('addons/hr/report/timesheet.xsl')
>>> file_open('../../base/report/rml_template.xsl', subdir='addons/hr/report', pathinfo=True)
- @param name: name of the file
- @param mode: file open mode
- @param subdir: subdirectory
- @param pathinfo: if True returns tupple (fileobject, filepath)
+ @param name name of the file
+ @param mode file open mode
+ @param subdir subdirectory
+ @param pathinfo if True returns tupple (fileobject, filepath)
- @return: fileobject if pathinfo is False else (fileobject, filepath)
+ @return fileobject if pathinfo is False else (fileobject, filepath)
"""
import openerp.modules as addons
- adps = addons.ad_paths
+ adps = addons.module.ad_paths
rtp = os.path.normcase(os.path.abspath(config['root_path']))
if name.replace(os.path.sep, '/').startswith('addons/'):
"""Flatten a list of elements into a uniqu list
Author: Christophe Simonis (christophe@tinyerp.com)
- Examples:
+ Examples::
>>> flatten(['a'])
['a']
>>> flatten('b')
def reverse_enumerate(l):
"""Like enumerate but in the other sens
+
+ Usage::
>>> a = ['a', 'b', 'c']
>>> it = reverse_enumerate(a)
>>> it.next()
""", re.VERBOSE)
res_re = re.compile(r"\[([0-9]+)\]", re.UNICODE)
command_re = re.compile("^Set-([a-z]+) *: *(.+)$", re.I + re.UNICODE)
-reference_re = re.compile("<.*-openobject-(\\d+)@(.*)>", re.UNICODE)
-
-priorities = {
- '1': '1 (Highest)',
- '2': '2 (High)',
- '3': '3 (Normal)',
- '4': '4 (Low)',
- '5': '5 (Lowest)',
- }
+reference_re = re.compile("<.*-open(?:object|erp)-(\\d+).*@(.*)>", re.UNICODE)
def html2plaintext(html, body_id=None, encoding='utf-8'):
+ """ From an HTML text, convert the HTML to plain text.
+ If @param body_id is provided then this is the tag where the
+ body (not necessarily <body>) starts.
+ """
## (c) Fry-IT, www.fry-it.com, 2007
## <peter@fry-it.com>
## download here: http://www.peterbe.com/plog/html2plaintext
-
- """ from an HTML text, convert the HTML to plain text.
- If @body_id is provided then this is the tag where the
- body (not necessarily <body>) starts.
- """
-
html = ustr(html)
from lxml.etree import tostring
return html
-def generate_tracking_message_id(openobject_id):
- """Returns a string that can be used in the Message-ID RFC822 header field so we
- can track the replies related to a given object thanks to the "In-Reply-To" or
- "References" fields that Mail User Agents will set.
- """
- return "<%s-openobject-%s@%s>" % (time.time(), openobject_id, socket.gethostname())
-
-def _email_send(smtp_from, smtp_to_list, message, openobject_id=None, ssl=False, debug=False):
- """Low-level method to send directly a Message through the configured smtp server.
- :param smtp_from: RFC-822 envelope FROM (not displayed to recipient)
- :param smtp_to_list: RFC-822 envelope RCPT_TOs (not displayed to recipient)
- :param message: an email.message.Message to send
- :param debug: True if messages should be output to stderr before being sent,
- and smtplib.SMTP put into debug mode.
- :return: True if the mail was delivered successfully to the smtp,
- else False (+ exception logged)
+def generate_tracking_message_id(res_id):
+ """Returns a string that can be used in the Message-ID RFC822 header field
+
+ Used to track the replies related to a given object thanks to the "In-Reply-To"
+ or "References" fields that Mail User Agents will set.
"""
- class WriteToLogger(object):
- def __init__(self):
- self.logger = loglevels.Logger()
+ return "<%s-openerp-%s@%s>" % (time.time(), res_id, socket.gethostname())
- def write(self, s):
- self.logger.notifyChannel('email_send', loglevels.LOG_DEBUG, s)
+def email_send(email_from, email_to, subject, body, email_cc=None, email_bcc=None, reply_to=False,
+ attachments=None, message_id=None, references=None, openobject_id=False, debug=False, subtype='plain', headers=None,
+ smtp_server=None, smtp_port=None, ssl=False, smtp_user=None, smtp_password=None, cr=None, uid=None):
+ """Low-level function for sending an email (deprecated).
+
+ :deprecate: since OpenERP 6.1, please use ir.mail_server.send_email() instead.
+ :param email_from: A string used to fill the `From` header, if falsy,
+ config['email_from'] is used instead. Also used for
+ the `Reply-To` header if `reply_to` is not provided
+ :param email_to: a sequence of addresses to send the mail to.
+ """
- if openobject_id:
- message['Message-Id'] = generate_tracking_message_id(openobject_id)
+ # If not cr, get cr from current thread database
+ if not cr:
+ db_name = getattr(threading.currentThread(), 'dbname', None)
+ if db_name:
+ cr = pooler.get_db_only(db_name).cursor()
+ else:
+ raise Exception("No database cursor found, please pass one explicitly")
+ # Send Email
try:
- smtp_server = config['smtp_server']
-
- if smtp_server.startswith('maildir:/'):
- from mailbox import Maildir
- maildir_path = smtp_server[8:]
- mdir = Maildir(maildir_path,factory=None, create = True)
- mdir.add(message.as_string(True))
- return True
-
- oldstderr = smtplib.stderr
- if not ssl: ssl = config.get('smtp_ssl', False)
- s = smtplib.SMTP()
- try:
- # in case of debug, the messages are printed to stderr.
- if debug:
- smtplib.stderr = WriteToLogger()
-
- s.set_debuglevel(int(bool(debug))) # 0 or 1
- s.connect(smtp_server, config['smtp_port'])
- if ssl:
- s.ehlo()
- s.starttls()
- s.ehlo()
-
- if config['smtp_user'] or config['smtp_password']:
- s.login(config['smtp_user'], config['smtp_password'])
-
- s.sendmail(smtp_from, smtp_to_list, message.as_string())
- finally:
- try:
- s.quit()
- if debug:
- smtplib.stderr = oldstderr
- except Exception:
- # ignored, just a consequence of the previous exception
- pass
-
+ mail_server_pool = pooler.get_pool(cr.dbname).get('ir.mail_server')
+ res = False
+ # Pack Message into MIME Object
+ email_msg = mail_server_pool.build_email(email_from, email_to, subject, body, email_cc, email_bcc, reply_to,
+ attachments, message_id, references, openobject_id, subtype, headers=headers)
+
+ res = mail_server_pool.send_email(cr, uid or 1, email_msg, mail_server_id=None,
+ smtp_server=smtp_server, smtp_port=smtp_port, smtp_user=smtp_user, smtp_password=smtp_password,
+ smtp_encryption=('ssl' if ssl else None), debug=debug)
except Exception:
- _logger.error('could not deliver email', exc_info=True)
+ _log.exception("tools.email_send failed to deliver email")
return False
-
- return True
-
-
-def email_send(email_from, email_to, subject, body, email_cc=None, email_bcc=None, reply_to=False,
- attach=None, openobject_id=False, ssl=False, debug=False, subtype='plain', x_headers=None, priority='3'):
-
- """Send an email.
-
- Arguments:
-
- `email_from`: A string used to fill the `From` header, if falsy,
- config['email_from'] is used instead. Also used for
- the `Reply-To` header if `reply_to` is not provided
-
- `email_to`: a sequence of addresses to send the mail to.
- """
- if x_headers is None:
- x_headers = {}
-
-
- if not (email_from or config['email_from']):
- raise ValueError("Sending an email requires either providing a sender "
- "address or having configured one")
-
- if not email_from: email_from = config.get('email_from', False)
- email_from = ustr(email_from).encode('utf-8')
-
- if not email_cc: email_cc = []
- if not email_bcc: email_bcc = []
- if not body: body = u''
-
- email_body = ustr(body).encode('utf-8')
- email_text = MIMEText(email_body or '',_subtype=subtype,_charset='utf-8')
-
- msg = MIMEMultipart()
-
- msg['Subject'] = Header(ustr(subject), 'utf-8')
- msg['From'] = email_from
- del msg['Reply-To']
- if reply_to:
- msg['Reply-To'] = reply_to
- else:
- msg['Reply-To'] = msg['From']
- msg['To'] = COMMASPACE.join(email_to)
- if email_cc:
- msg['Cc'] = COMMASPACE.join(email_cc)
- if email_bcc:
- msg['Bcc'] = COMMASPACE.join(email_bcc)
- msg['Date'] = formatdate(localtime=True)
-
- msg['X-Priority'] = priorities.get(priority, '3 (Normal)')
-
- # Add dynamic X Header
- for key, value in x_headers.iteritems():
- msg['%s' % key] = str(value)
-
- if html2text and subtype == 'html':
- text = html2text(email_body.decode('utf-8')).encode('utf-8')
- alternative_part = MIMEMultipart(_subtype="alternative")
- alternative_part.attach(MIMEText(text, _charset='utf-8', _subtype='plain'))
- alternative_part.attach(email_text)
- msg.attach(alternative_part)
- else:
- msg.attach(email_text)
-
- if attach:
- for (fname,fcontent) in attach:
- part = MIMEBase('application', "octet-stream")
- part.set_payload( fcontent )
- Encoders.encode_base64(part)
- part.add_header('Content-Disposition', 'attachment; filename="%s"' % (fname,))
- msg.attach(part)
-
- return _email_send(email_from, flatten([email_to, email_cc, email_bcc]), msg, openobject_id=openobject_id, ssl=ssl, debug=debug)
+ finally:
+ cr.close()
+ return res
#----------------------------------------------------------
# SMS
# FIXME: Use the logger if there is an error
return True
-#---------------------------------------------------------
-# Class that stores an updateable string (used in wizards)
-#---------------------------------------------------------
class UpdateableStr(local):
+ """ Class that stores an updateable string (used in wizards)
+ """
def __init__(self, string=''):
self.string = string
class UpdateableDict(local):
- '''Stores an updateable dict to use in wizards'''
+ """Stores an updateable dict to use in wizards
+ """
def __init__(self, dict=None):
if dict is None:
def __ne__(self, y):
return self.dict.__ne__(y)
-
-# Don't use ! Use res.currency.round()
class currency(float):
+ """ Deprecate
+
+ .. warning::
+
+ Don't use ! Use res.currency.round()
+ """
def __init__(self, value, accuracy=2, rounding=None):
if rounding is None:
# display_value = int(self*(10**(-self.accuracy))/self.rounding)*self.rounding/(10**(-self.accuracy))
# return str(display_value)
-
-def is_hashable(h):
- try:
- hash(h)
- return True
- except TypeError:
- return False
-
-class dummy_cache(object):
- """ Cache decorator replacement to actually do no caching.
-
- This can be useful to benchmark and/or track memory leak.
-
- """
-
- def __init__(self, timeout=None, skiparg=2, multi=None, size=8192):
- pass
-
- def clear(self, dbname, *args, **kwargs):
- pass
-
- @classmethod
- def clean_caches_for_db(cls, dbname):
- pass
-
- def __call__(self, fn):
- fn.clear_cache = self.clear
- return fn
-
-class real_cache(object):
- """
- Use it as a decorator of the function you plan to cache
- Timeout: 0 = no timeout, otherwise in seconds
- """
-
- __caches = []
-
- def __init__(self, timeout=None, skiparg=2, multi=None, size=8192):
- assert skiparg >= 2 # at least self and cr
- if timeout is None:
- self.timeout = config['cache_timeout']
- else:
- self.timeout = timeout
- self.skiparg = skiparg
- self.multi = multi
- self.lasttime = time.time()
- self.cache = LRU(size) # TODO take size from config
- self.fun = None
- cache.__caches.append(self)
-
-
- def _generate_keys(self, dbname, kwargs2):
- """
- Generate keys depending of the arguments and the self.mutli value
- """
-
- def to_tuple(d):
- pairs = d.items()
- pairs.sort(key=lambda (k,v): k)
- for i, (k, v) in enumerate(pairs):
- if isinstance(v, dict):
- pairs[i] = (k, to_tuple(v))
- if isinstance(v, (list, set)):
- pairs[i] = (k, tuple(v))
- elif not is_hashable(v):
- pairs[i] = (k, repr(v))
- return tuple(pairs)
-
- if not self.multi:
- key = (('dbname', dbname),) + to_tuple(kwargs2)
- yield key, None
- else:
- multis = kwargs2[self.multi][:]
- for id in multis:
- kwargs2[self.multi] = (id,)
- key = (('dbname', dbname),) + to_tuple(kwargs2)
- yield key, id
-
- def _unify_args(self, *args, **kwargs):
- # Update named arguments with positional argument values (without self and cr)
- kwargs2 = self.fun_default_values.copy()
- kwargs2.update(kwargs)
- kwargs2.update(dict(zip(self.fun_arg_names, args[self.skiparg-2:])))
- return kwargs2
-
- def clear(self, dbname, *args, **kwargs):
- """clear the cache for database dbname
- if *args and **kwargs are both empty, clear all the keys related to this database
- """
- if not args and not kwargs:
- keys_to_del = [key for key in self.cache.keys() if key[0][1] == dbname]
- else:
- kwargs2 = self._unify_args(*args, **kwargs)
- keys_to_del = [key for key, _ in self._generate_keys(dbname, kwargs2) if key in self.cache.keys()]
-
- for key in keys_to_del:
- self.cache.pop(key)
-
- @classmethod
- def clean_caches_for_db(cls, dbname):
- for c in cls.__caches:
- c.clear(dbname)
-
- def __call__(self, fn):
- if self.fun is not None:
- raise Exception("Can not use a cache instance on more than one function")
- self.fun = fn
-
- argspec = inspect.getargspec(fn)
- self.fun_arg_names = argspec[0][self.skiparg:]
- self.fun_default_values = {}
- if argspec[3]:
- self.fun_default_values = dict(zip(self.fun_arg_names[-len(argspec[3]):], argspec[3]))
-
- def cached_result(self2, cr, *args, **kwargs):
- if time.time()-int(self.timeout) > self.lasttime:
- self.lasttime = time.time()
- t = time.time()-int(self.timeout)
- old_keys = [key for key in self.cache.keys() if self.cache[key][1] < t]
- for key in old_keys:
- self.cache.pop(key)
-
- kwargs2 = self._unify_args(*args, **kwargs)
-
- result = {}
- notincache = {}
- for key, id in self._generate_keys(cr.dbname, kwargs2):
- if key in self.cache:
- result[id] = self.cache[key][0]
- else:
- notincache[id] = key
-
- if notincache:
- if self.multi:
- kwargs2[self.multi] = notincache.keys()
-
- result2 = fn(self2, cr, *args[:self.skiparg-2], **kwargs2)
- if not self.multi:
- key = notincache[None]
- self.cache[key] = (result2, time.time())
- result[None] = result2
- else:
- for id in result2:
- key = notincache[id]
- self.cache[key] = (result2[id], time.time())
- result.update(result2)
-
- if not self.multi:
- return result[None]
- return result
-
- cached_result.clear_cache = self.clear
- return cached_result
-
-# TODO make it an option
-cache = real_cache
-
def to_xml(s):
return s.replace('&','&').replace('<','<').replace('>','>')
-# to be compatible with python 2.4
-import __builtin__
-if not hasattr(__builtin__, 'all'):
- def all(iterable):
- for element in iterable:
- if not element:
- return False
- return True
-
- __builtin__.all = all
- del all
-
-if not hasattr(__builtin__, 'any'):
- def any(iterable):
- for element in iterable:
- if element:
- return True
- return False
-
- __builtin__.any = any
- del any
-
def get_iso_codes(lang):
if lang.find('_') != -1:
if lang.split('_')[0] == lang.split('_')[1].lower():
return "%0.2f %s" % (s, units[i])
def logged(f):
- from func import wraps
-
@wraps(f)
def wrapper(*args, **kwargs):
from pprint import pformat
self.fname = fname
def __call__(self, f):
- from func import wraps
-
@wraps(f)
def wrapper(*args, **kwargs):
class profile_wrapper(object):
return wrapper
-def debug(what):
- """
- This method allow you to debug your code without print
- Example:
- >>> def func_foo(bar)
- ... baz = bar
- ... debug(baz)
- ... qnx = (baz, bar)
- ... debug(qnx)
- ...
- >>> func_foo(42)
-
- This will output on the logger:
-
- [Wed Dec 25 00:00:00 2008] DEBUG:func_foo:baz = 42
- [Wed Dec 25 00:00:00 2008] DEBUG:func_foo:qnx = (42, 42)
-
- To view the DEBUG lines in the logger you must start the server with the option
- --log-level=debug
-
- """
- warnings.warn("The tools.debug() method is deprecated, please use logging.",
- DeprecationWarning, stacklevel=2)
- from inspect import stack
- from pprint import pformat
- st = stack()[1]
- param = re.split("debug *\((.+)\)", st[4][0].strip())[1].strip()
- while param.count(')') > param.count('('): param = param[:param.rfind(')')]
- what = pformat(what)
- if param != what:
- what = "%s = %s" % (param, what)
- logging.getLogger(st[3]).debug(what)
-
-
__icons_list = ['STOCK_ABOUT', 'STOCK_ADD', 'STOCK_APPLY', 'STOCK_BOLD',
'STOCK_CANCEL', 'STOCK_CDROM', 'STOCK_CLEAR', 'STOCK_CLOSE', 'STOCK_COLOR_PICKER',
'STOCK_CONNECT', 'STOCK_CONVERT', 'STOCK_COPY', 'STOCK_CUT', 'STOCK_DELETE',
# times.
def get_win32_timezone():
"""Attempt to return the "standard name" of the current timezone on a win32 system.
- @return: the standard name of the current win32 timezone, or False if it cannot be found.
+ @return the standard name of the current win32 timezone, or False if it cannot be found.
"""
res = False
if (sys.platform == "win32"):
def detect_server_timezone():
"""Attempt to detect the timezone to use on the server side.
Defaults to UTC if no working timezone can be found.
- @return: the timezone identifier as expected by pytz.timezone.
+ @return the timezone identifier as expected by pytz.timezone.
"""
try:
import pytz
return 'UTC'
def get_server_timezone():
- # timezone detection is safe in multithread, so lazy init is ok here
- if (not config['timezone']):
- config['timezone'] = detect_server_timezone()
- return config['timezone']
+ return "UTC"
DEFAULT_SERVER_DATE_FORMAT = "%Y-%m-%d"
@param ignore_unparsable_time: if True, return False if src_tstamp_str cannot be parsed
using src_format or formatted using dst_format.
- @return: local/client formatted timestamp, expressed in the local/client timezone if possible
+ @return local/client formatted timestamp, expressed in the local/client timezone if possible
and if tz_offset is true, or src_tstamp_str if timezone offset could not be determined.
"""
if not src_tstamp_str:
a.start()
return True
+def get_and_group_by_field(cr, uid, obj, ids, field, context=None):
+ """ Read the values of ``field´´ for the given ``ids´´ and group ids by value.
+
+ :param string field: name of the field we want to read and group by
+ :return: mapping of field values to the list of ids that have it
+ :rtype: dict
+ """
+ res = {}
+ for record in obj.read(cr, uid, ids, [field], context=context):
+ key = record[field]
+ res.setdefault(key[0] if isinstance(key, tuple) else key, []).append(record['id'])
+ return res
+
+def get_and_group_by_company(cr, uid, obj, ids, context=None):
+ return get_and_group_by_field(cr, uid, obj, ids, field='company_id', context=context)
# port of python 2.6's attrgetter with support for dotted notation
def resolve_attr(obj, attr):
or escaping, keeping the original string untouched. The name come from Lisp's unquote.
One of the uses for this is to preserve or insert bare variable names within dicts during eval()
of a dict's repr(). Use with care.
- Some examples:
+
+ Some examples (notice that there are never quotes surrounding
+ the ``active_id`` name:
+
>>> unquote('active_id')
active_id
- >>> repr(unquote('active_id'))
- active_id
>>> d = {'test': unquote('active_id')}
>>> d
{'test': active_id}
- >>> repr(d)
- "{'test': active_id}"
+ >>> print d
+ {'test': active_id}
"""
def __repr__(self):
return self
+class UnquoteEvalContext(defaultdict):
+ """Defaultdict-based evaluation context that returns
+ an ``unquote`` string for any missing name used during
+ the evaluation.
+ Mostly useful for evaluating OpenERP domains/contexts that
+ may refer to names that are unknown at the time of eval,
+ so that when the context/domain is converted back to a string,
+ the original names are preserved.
+
+ **Warning**: using an ``UnquoteEvalContext`` as context for ``eval()`` or
+ ``safe_eval()`` will shadow the builtins, which may cause other
+ failures, depending on what is evaluated.
+
+ Example (notice that ``section_id`` is preserved in the final
+ result) :
+
+ >>> context_str = "{'default_user_id': uid, 'default_section_id': section_id}"
+ >>> eval(context_str, UnquoteEvalContext(uid=1))
+ {'default_user_id': 1, 'default_section_id': section_id}
+
+ """
+ def __init__(self, *args, **kwargs):
+ super(UnquoteEvalContext, self).__init__(None, *args, **kwargs)
+
+ def __missing__(self, key):
+ return unquote(key)
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: