#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
-# Copyright (C) 2010 OpenERP s.a. (<http://openerp.com>).
+# Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
#
##############################################################################
+
"""
-Miscelleanous tools used by OpenERP.
+Miscellaneous tools used by OpenERP.
"""
-import inspect
+from functools import wraps
+import cProfile
import subprocess
import logging
import os
-import re
-import smtplib
import socket
import sys
import threading
import time
-import warnings
import zipfile
+from collections import defaultdict, Mapping
from datetime import datetime
-from email.MIMEText import MIMEText
-from email.MIMEBase import MIMEBase
-from email.MIMEMultipart import MIMEMultipart
-from email.Header import Header
-from email.Utils import formatdate, COMMASPACE
-from email import Utils
-from email import Encoders
-from itertools import islice, izip
+from itertools import islice, izip, groupby
from lxml import etree
from which import which
-if sys.version_info[:2] < (2, 4):
- from threadinglocal import local
-else:
- from threading import local
+from threading import local
+
try:
from html2text import html2text
except ImportError:
html2text = None
-import openerp.loglevels as loglevels
from config import config
-from lru import LRU
-import openerp.pooler as pooler
+from cache import *
# get_encodings, ustr and exception_to_unicode were originally from tools.misc.
# There are moved to loglevels until we refactor tools.
from openerp.loglevels import get_encodings, ustr, exception_to_unicode
-_logger = logging.getLogger('tools')
+_logger = logging.getLogger(__name__)
# List of etree._Element subclasses that we choose to ignore when parsing XML.
# We include the *Base ones just in case, currently they seem to be subclasses of the _* ones.
SKIPPED_ELEMENT_TYPES = (etree._Comment, etree._ProcessingInstruction, etree.CommentBase, etree.PIBase)
-# initialize a database with base/base.sql
-def init_db(cr):
- import openerp.addons as addons
- f = addons.get_module_resource('base', 'base.sql')
- base_sql_file = file_open(f)
- try:
- cr.execute(base_sql_file.read())
- cr.commit()
- finally:
- base_sql_file.close()
-
- for i in addons.get_modules():
- mod_path = addons.get_module_path(i)
- if not mod_path:
- continue
-
- info = addons.load_information_from_description_file(i)
-
- if not info:
- continue
- categs = info.get('category', 'Uncategorized').split('/')
- p_id = None
- while categs:
- if p_id is not None:
- cr.execute('SELECT id \
- FROM ir_module_category \
- WHERE name=%s AND parent_id=%s', (categs[0], p_id))
- else:
- cr.execute('SELECT id \
- FROM ir_module_category \
- WHERE name=%s AND parent_id IS NULL', (categs[0],))
- c_id = cr.fetchone()
- if not c_id:
- cr.execute('INSERT INTO ir_module_category \
- (name, parent_id) \
- VALUES (%s, %s) RETURNING id', (categs[0], p_id))
- c_id = cr.fetchone()[0]
- else:
- c_id = c_id[0]
- p_id = c_id
- categs = categs[1:]
-
- active = info.get('active', False)
- installable = info.get('installable', True)
- if installable:
- if active:
- state = 'to install'
- else:
- state = 'uninstalled'
- else:
- state = 'uninstallable'
- cr.execute('INSERT INTO ir_module_module \
- (author, website, name, shortdesc, description, \
- category_id, state, certificate, web, license) \
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id', (
- info.get('author', ''),
- info.get('website', ''), i, info.get('name', False),
- info.get('description', ''), p_id, state, info.get('certificate') or None,
- info.get('web') or False,
- info.get('license') or 'AGPL-3'))
- id = cr.fetchone()[0]
- cr.execute('INSERT INTO ir_model_data \
- (name,model,module, res_id, noupdate) VALUES (%s,%s,%s,%s,%s)', (
- 'module_meta_information', 'ir.module.module', i, id, True))
- dependencies = info.get('depends', [])
- for d in dependencies:
- cr.execute('INSERT INTO ir_module_module_dependency \
- (module_id,name) VALUES (%s, %s)', (id, d))
- cr.commit()
-
def find_in_path(name):
try:
return which(name)
pop = subprocess.Popen((prog,) + args, bufsize= -1,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
close_fds=(os.name=="posix"))
- return (pop.stdin, pop.stdout)
+ return pop.stdin, pop.stdout
def exec_command_pipe(name, *args):
    """Spawn the executable ``name`` (resolved through ``find_in_path``)
    with the extra command-line arguments ``args``, piping its I/O.

    @param name basename of the program to execute
    @param args additional command-line arguments
    @return (stdin, stdout) file objects of the spawned process
    """
    prog = find_in_path(name)
    if not prog:
        # Fail with a clear message instead of letting Popen crash on a
        # falsy executable path.
        raise Exception('Couldn\'t find %s' % name)
    pop = subprocess.Popen((prog,) + args, bufsize=-1,
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                           close_fds=(os.name == "posix"))
    return pop.stdin, pop.stdout
#----------------------------------------------------------
# File paths
def file_open(name, mode="r", subdir='addons', pathinfo=False):
    """Open a file from the OpenERP root, using a subdir folder.

    Example::

        >>> file_open('hr/report/timesheet.xsl')
        >>> file_open('addons/hr/report/timesheet.xsl')
        >>> file_open('../../base/report/rml_template.xsl', subdir='addons/hr/report', pathinfo=True)

    @param name name of the file
    @param mode file open mode
    @param subdir subdirectory
    @param pathinfo if True returns tuple (fileobject, filepath)

    @return fileobject if pathinfo is False else (fileobject, filepath)
    """
    import openerp.modules as addons
    adps = addons.module.ad_paths
    rtp = os.path.normcase(os.path.abspath(config['root_path']))

    # Keep the caller-supplied name for error reporting in _fileopen.
    basename = name

    if os.path.isabs(name):
        # It is an absolute path
        # Is it below 'addons_path' or 'root_path'?
        name = os.path.normcase(os.path.normpath(name))
        for root in adps + [rtp]:
            root = os.path.normcase(os.path.normpath(root)) + os.sep
            if name.startswith(root):
                # Re-express the name relative to the matching root so
                # _fileopen can also attempt a zipfile lookup.
                base = root.rstrip(os.sep)
                name = name[len(base) + 1:]
                break
        else:
            # It is outside the OpenERP root: skip zipfile lookup.
            base, name = os.path.split(name)
        return _fileopen(name, mode=mode, basedir=base, pathinfo=pathinfo, basename=basename)

    # Relative path: an explicit 'addons/' prefix (in either separator
    # style) forces the addons lookup; otherwise prepend the requested
    # subdir and re-check for the prefix.
    if name.replace(os.sep, '/').startswith('addons/'):
        subdir = 'addons'
        name2 = name[7:]
    elif subdir:
        name = os.path.join(subdir, name)
        if name.replace(os.sep, '/').startswith('addons/'):
            subdir = 'addons'
            name2 = name[7:]
        else:
            name2 = name

    # First, try to locate in addons_path
    if subdir:
        for adp in adps:
            try:
                return _fileopen(name2, mode=mode, basedir=adp,
                                 pathinfo=pathinfo, basename=basename)
            except IOError:
                # Not under this addons path; try the next one.
                pass

    # Second, try to locate in root_path
    return _fileopen(name, mode=mode, basedir=rtp, pathinfo=pathinfo, basename=basename)
+
+
+def _fileopen(path, mode, basedir, pathinfo, basename=None):
+ name = os.path.normpath(os.path.join(basedir, path))
- name = os.path.normpath(name)
+ if basename is None:
+ basename = name
+ # Give higher priority to module directories, which is
+ # a more common case than zipped modules.
+ if os.path.isfile(name):
+ fo = open(name, mode)
+ if pathinfo:
+ return fo, name
+ return fo
- # Check for a zipfile in the path
- head = name
+ # Support for loading modules in zipped form.
+ # This will not work for zipped modules that are sitting
+ # outside of known addons paths.
+ head = os.path.normpath(path)
zipname = False
- name2 = False
- while True:
+ while os.sep in head:
head, tail = os.path.split(head)
if not tail:
break
zipname = os.path.join(tail, zipname)
else:
zipname = tail
- if zipfile.is_zipfile(head+'.zip'):
+ zpath = os.path.join(basedir, head + '.zip')
+ if zipfile.is_zipfile(zpath):
from cStringIO import StringIO
- zfile = zipfile.ZipFile(head+'.zip')
+ zfile = zipfile.ZipFile(zpath)
try:
fo = StringIO()
fo.write(zfile.read(os.path.join(
return fo, name
return fo
except Exception:
- name2 = os.path.normpath(os.path.join(head + '.zip', zipname))
pass
- for i in (name2, name):
- if i and os.path.isfile(i):
- fo = file(i, mode)
- if pathinfo:
- return fo, i
- return fo
- if os.path.splitext(name)[1] == '.rml':
- raise IOError, 'Report %s doesn\'t exist or deleted : ' %str(name)
- raise IOError, 'File not found : %s' % name
+ # Not found
+ if name.endswith('.rml'):
+ raise IOError('Report %r doesn\'t exist or deleted' % basename)
+ raise IOError('File not found: %s' % basename)
#----------------------------------------------------------
"""Flatten a list of elements into a uniqu list
Author: Christophe Simonis (christophe@tinyerp.com)
- Examples:
+ Examples::
>>> flatten(['a'])
['a']
>>> flatten('b')
def reverse_enumerate(l):
    """Like enumerate, but iterating from the last element to the first.

    Usage::
    >>> a = ['a', 'b', 'c']
    >>> it = reverse_enumerate(a)
    >>> it.next()
    """
    last_index = len(l) - 1
    return izip(xrange(last_index, -1, -1), reversed(l))
-#----------------------------------------------------------
-# Emails
-#----------------------------------------------------------
-email_re = re.compile(r"""
- ([a-zA-Z][\w\.-]*[a-zA-Z0-9] # username part
- @ # mandatory @ sign
- [a-zA-Z0-9][\w\.-]* # domain must start with a letter ... Ged> why do we include a 0-9 then?
- \.
- [a-z]{2,3} # TLD
- )
- """, re.VERBOSE)
-res_re = re.compile(r"\[([0-9]+)\]", re.UNICODE)
-command_re = re.compile("^Set-([a-z]+) *: *(.+)$", re.I + re.UNICODE)
-reference_re = re.compile("<.*-openobject-(\\d+)@(.*)>", re.UNICODE)
-
-def html2plaintext(html, body_id=None, encoding='utf-8'):
- ## (c) Fry-IT, www.fry-it.com, 2007
- ## <peter@fry-it.com>
- ## download here: http://www.peterbe.com/plog/html2plaintext
-
-
- """ from an HTML text, convert the HTML to plain text.
- If @body_id is provided then this is the tag where the
- body (not necessarily <body>) starts.
- """
-
- html = ustr(html)
- from lxml.etree import tostring
- try:
- from lxml.html.soupparser import fromstring
- kwargs = {}
- except ImportError:
- _logger.debug('tools.misc.html2plaintext: cannot use BeautifulSoup, fallback to lxml.etree.HTMLParser')
- from lxml.etree import fromstring, HTMLParser
- kwargs = dict(parser=HTMLParser())
-
- tree = fromstring(html, **kwargs)
-
- if body_id is not None:
- source = tree.xpath('//*[@id=%s]'%(body_id,))
- else:
- source = tree.xpath('//body')
- if len(source):
- tree = source[0]
-
- url_index = []
- i = 0
- for link in tree.findall('.//a'):
- url = link.get('href')
- if url:
- i += 1
- link.tag = 'span'
- link.text = '%s [%s]' % (link.text, i)
- url_index.append(url)
-
- html = ustr(tostring(tree, encoding=encoding))
-
- html = html.replace('<strong>','*').replace('</strong>','*')
- html = html.replace('<b>','*').replace('</b>','*')
- html = html.replace('<h3>','*').replace('</h3>','*')
- html = html.replace('<h2>','**').replace('</h2>','**')
- html = html.replace('<h1>','**').replace('</h1>','**')
- html = html.replace('<em>','/').replace('</em>','/')
- html = html.replace('<tr>', '\n')
- html = html.replace('</p>', '\n')
- html = re.sub('<br\s*/?>', '\n', html)
- html = re.sub('<.*?>', ' ', html)
- html = html.replace(' ' * 2, ' ')
-
- # strip all lines
- html = '\n'.join([x.strip() for x in html.splitlines()])
- html = html.replace('\n' * 2, '\n')
-
- for i, url in enumerate(url_index):
- if i == 0:
- html += '\n\n'
- html += ustr('[%s] %s\n') % (i+1, url)
-
- return html
-
-def email_send(smtp_from, smtp_to_list, message, ssl=False, debug=False, smtp_server=None, smtp_port=None,
- smtp_user=None, smtp_password=None, cr=None, uid=None):
- if not cr:
- db_name = getattr(threading.currentThread(), 'dbname', None)
- if db_name:
- cr = pooler.get_db_only(db_name).cursor()
- else:
- raise Exception("No database cursor found!")
- if not uid:
- uid = 1
- try:
- server_pool = pooler.get_pool(cr.dbname).get('ir.mail_server')
- server_pool.send_email(cr, uid, smtp_from, smtp_to_list, message,
- ssl=ssl,debug=debug, smtp_server=smtp_server, smtp_port=smtp_port,
- smtp_user=smtp_user, smtp_password=smtp_password)
- except Exception:
- return False
- finally:
- cr.close()
- return True
-
-#----------------------------------------------------------
-# SMS
-#----------------------------------------------------------
-# text must be latin-1 encoded
-def sms_send(user, password, api_id, text, to):
- import urllib
- url = "http://api.urlsms.com/SendSMS.aspx"
- #url = "http://196.7.150.220/http/sendmsg"
- params = urllib.urlencode({'UserID': user, 'Password': password, 'SenderID': api_id, 'MsgText': text, 'RecipientMobileNo':to})
- urllib.urlopen(url+"?"+params)
- # FIXME: Use the logger if there is an error
- return True
-
-#---------------------------------------------------------
-# Class that stores an updateable string (used in wizards)
-#---------------------------------------------------------
class UpdateableStr(local):
    """Thread-local container for a string value that wizards can update
    in place.
    """

    def __init__(self, string=''):
        self.string = string
class UpdateableDict(local):
- '''Stores an updateable dict to use in wizards'''
+ """Stores an updateable dict to use in wizards
+ """
def __init__(self, dict=None):
if dict is None:
def __ne__(self, y):
return self.dict.__ne__(y)
-
-# Don't use ! Use res.currency.round()
class currency(float):
+ """ Deprecate
+
+ .. warning::
+
+ Don't use ! Use res.currency.round()
+ """
def __init__(self, value, accuracy=2, rounding=None):
if rounding is None:
# display_value = int(self*(10**(-self.accuracy))/self.rounding)*self.rounding/(10**(-self.accuracy))
# return str(display_value)
-
-def is_hashable(h):
- try:
- hash(h)
- return True
- except TypeError:
- return False
-
-class cache(object):
- """
- Use it as a decorator of the function you plan to cache
- Timeout: 0 = no timeout, otherwise in seconds
- """
-
- __caches = []
-
- def __init__(self, timeout=None, skiparg=2, multi=None, size=8192):
- assert skiparg >= 2 # at least self and cr
- if timeout is None:
- self.timeout = config['cache_timeout']
- else:
- self.timeout = timeout
- self.skiparg = skiparg
- self.multi = multi
- self.lasttime = time.time()
- self.cache = LRU(size) # TODO take size from config
- self.fun = None
- cache.__caches.append(self)
-
-
- def _generate_keys(self, dbname, kwargs2):
- """
- Generate keys depending of the arguments and the self.mutli value
- """
-
- def to_tuple(d):
- pairs = d.items()
- pairs.sort(key=lambda (k,v): k)
- for i, (k, v) in enumerate(pairs):
- if isinstance(v, dict):
- pairs[i] = (k, to_tuple(v))
- if isinstance(v, (list, set)):
- pairs[i] = (k, tuple(v))
- elif not is_hashable(v):
- pairs[i] = (k, repr(v))
- return tuple(pairs)
-
- if not self.multi:
- key = (('dbname', dbname),) + to_tuple(kwargs2)
- yield key, None
- else:
- multis = kwargs2[self.multi][:]
- for id in multis:
- kwargs2[self.multi] = (id,)
- key = (('dbname', dbname),) + to_tuple(kwargs2)
- yield key, id
-
- def _unify_args(self, *args, **kwargs):
- # Update named arguments with positional argument values (without self and cr)
- kwargs2 = self.fun_default_values.copy()
- kwargs2.update(kwargs)
- kwargs2.update(dict(zip(self.fun_arg_names, args[self.skiparg-2:])))
- return kwargs2
-
- def clear(self, dbname, *args, **kwargs):
- """clear the cache for database dbname
- if *args and **kwargs are both empty, clear all the keys related to this database
- """
- if not args and not kwargs:
- keys_to_del = [key for key in self.cache.keys() if key[0][1] == dbname]
- else:
- kwargs2 = self._unify_args(*args, **kwargs)
- keys_to_del = [key for key, _ in self._generate_keys(dbname, kwargs2) if key in self.cache.keys()]
-
- for key in keys_to_del:
- self.cache.pop(key)
-
- @classmethod
- def clean_caches_for_db(cls, dbname):
- for c in cls.__caches:
- c.clear(dbname)
-
- def __call__(self, fn):
- if self.fun is not None:
- raise Exception("Can not use a cache instance on more than one function")
- self.fun = fn
-
- argspec = inspect.getargspec(fn)
- self.fun_arg_names = argspec[0][self.skiparg:]
- self.fun_default_values = {}
- if argspec[3]:
- self.fun_default_values = dict(zip(self.fun_arg_names[-len(argspec[3]):], argspec[3]))
-
- def cached_result(self2, cr, *args, **kwargs):
- if time.time()-int(self.timeout) > self.lasttime:
- self.lasttime = time.time()
- t = time.time()-int(self.timeout)
- old_keys = [key for key in self.cache.keys() if self.cache[key][1] < t]
- for key in old_keys:
- self.cache.pop(key)
-
- kwargs2 = self._unify_args(*args, **kwargs)
-
- result = {}
- notincache = {}
- for key, id in self._generate_keys(cr.dbname, kwargs2):
- if key in self.cache:
- result[id] = self.cache[key][0]
- else:
- notincache[id] = key
-
- if notincache:
- if self.multi:
- kwargs2[self.multi] = notincache.keys()
-
- result2 = fn(self2, cr, *args[:self.skiparg-2], **kwargs2)
- if not self.multi:
- key = notincache[None]
- self.cache[key] = (result2, time.time())
- result[None] = result2
- else:
- for id in result2:
- key = notincache[id]
- self.cache[key] = (result2[id], time.time())
- result.update(result2)
-
- if not self.multi:
- return result[None]
- return result
-
- cached_result.clear_cache = self.clear
- return cached_result
-
def to_xml(s):
    """Escape the XML markup characters in *s* so the result can be
    embedded in XML text content. ``&`` is replaced first so the other
    substitutions do not get double-escaped.
    """
    return s.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
-# to be compatible with python 2.4
-import __builtin__
-if not hasattr(__builtin__, 'all'):
- def all(iterable):
- for element in iterable:
- if not element:
- return False
- return True
-
- __builtin__.all = all
- del all
-
-if not hasattr(__builtin__, 'any'):
- def any(iterable):
- for element in iterable:
- if element:
- return True
- return False
-
- __builtin__.any = any
- del any
-
def get_iso_codes(lang):
    """Collapse a locale code to its bare language part when the
    territory merely repeats it: ``pt_PT`` becomes ``pt``, while
    ``pt_BR`` and ``fr`` are returned unchanged.
    """
    parts = lang.split('_')
    if len(parts) > 1 and parts[0] == parts[1].lower():
        return parts[0]
    return lang
-def get_languages():
- # The codes below are those from Launchpad's Rosetta, with the exception
- # of some trivial codes where the Launchpad code is xx and we have xx_XX.
- languages={
+ALL_LANGUAGES = {
'ab_RU': u'Abkhazian / аҧсуа',
- 'ar_AR': u'Arabic / الْعَرَبيّة',
+ 'am_ET': u'Amharic / አምሃርኛ',
+ 'ar_SY': u'Arabic / الْعَرَبيّة',
'bg_BG': u'Bulgarian / български език',
'bs_BS': u'Bosnian / bosanski jezik',
'ca_ES': u'Catalan / Català',
'nl_BE': u'Flemish (BE) / Vlaams (BE)',
'oc_FR': u'Occitan (FR, post 1500) / Occitan',
'pl_PL': u'Polish / Język polski',
- 'pt_BR': u'Portugese (BR) / Português (BR)',
- 'pt_PT': u'Portugese / Português',
+ 'pt_BR': u'Portuguese (BR) / Português (BR)',
+ 'pt_PT': u'Portuguese / Português',
'ro_RO': u'Romanian / română',
'ru_RU': u'Russian / русский язык',
'si_LK': u'Sinhalese / සිංහල',
'th_TH': u'Thai / ภาษาไทย',
'tlh_TLH': u'Klingon',
}
- return languages
def scan_languages():
    """ Returns all languages supported by OpenERP for translation

    :returns: a list of (lang_code, lang_name) pairs
    :rtype: [(str, unicode)]
    """
    by_display_name = lambda pair: pair[1]
    return sorted(ALL_LANGUAGES.iteritems(), key=by_display_name)
def get_user_companies(cr, user):
def _get_company_children(cr, ids):
sz=len(sz)
s, i = float(sz), 0
while s >= 1024 and i < len(units)-1:
- s = s / 1024
- i = i + 1
+ s /= 1024
+ i += 1
return "%0.2f %s" % (s, units[i])
def logged(f):
- from func import wraps
-
@wraps(f)
def wrapper(*args, **kwargs):
from pprint import pformat
vector.append(' result: %s' % pformat(res))
vector.append(' time delta: %s' % (time.time() - timeb4))
- loglevels.Logger().notifyChannel('logged', loglevels.LOG_DEBUG, '\n'.join(vector))
+ _logger.debug('\n'.join(vector))
return res
return wrapper
self.fname = fname
def __call__(self, f):
    """Decorator: profile every call of ``f`` with cProfile and dump the
    collected statistics to ``self.fname`` (or ``<funcname>.cprof`` when
    no filename was configured).
    """
    @wraps(f)
    def profiled(*args, **kwargs):
        prof = cProfile.Profile()
        result = prof.runcall(f, *args, **kwargs)
        # Default dump target is derived from the wrapped function name.
        prof.dump_stats(self.fname or ("%s.cprof" % (f.func_name,)))
        return result
    return profiled
-def debug(what):
- """
- This method allow you to debug your code without print
- Example:
- >>> def func_foo(bar)
- ... baz = bar
- ... debug(baz)
- ... qnx = (baz, bar)
- ... debug(qnx)
- ...
- >>> func_foo(42)
-
- This will output on the logger:
-
- [Wed Dec 25 00:00:00 2008] DEBUG:func_foo:baz = 42
- [Wed Dec 25 00:00:00 2008] DEBUG:func_foo:qnx = (42, 42)
-
- To view the DEBUG lines in the logger you must start the server with the option
- --log-level=debug
-
- """
- warnings.warn("The tools.debug() method is deprecated, please use logging.",
- DeprecationWarning, stacklevel=2)
- from inspect import stack
- from pprint import pformat
- st = stack()[1]
- param = re.split("debug *\((.+)\)", st[4][0].strip())[1].strip()
- while param.count(')') > param.count('('): param = param[:param.rfind(')')]
- what = pformat(what)
- if param != what:
- what = "%s = %s" % (param, what)
- logging.getLogger(st[3]).debug(what)
-
-
__icons_list = ['STOCK_ABOUT', 'STOCK_ADD', 'STOCK_APPLY', 'STOCK_BOLD',
'STOCK_CANCEL', 'STOCK_CDROM', 'STOCK_CLEAR', 'STOCK_CLOSE', 'STOCK_COLOR_PICKER',
'STOCK_CONNECT', 'STOCK_CONVERT', 'STOCK_COPY', 'STOCK_CUT', 'STOCK_DELETE',
global __icons_list
return [(x, x) for x in __icons_list ]
-def extract_zip_file(zip_file, outdirectory):
- zf = zipfile.ZipFile(zip_file, 'r')
- out = outdirectory
- for path in zf.namelist():
- tgt = os.path.join(out, path)
- tgtdir = os.path.dirname(tgt)
- if not os.path.exists(tgtdir):
- os.makedirs(tgtdir)
-
- if not tgt.endswith(os.sep):
- fp = open(tgt, 'wb')
- fp.write(zf.read(path))
- fp.close()
- zf.close()
-
def detect_ip_addr():
"""Try a very crude method to figure out a valid external
IP or hostname for the current machine. Don't rely on this
# times.
def get_win32_timezone():
"""Attempt to return the "standard name" of the current timezone on a win32 system.
- @return: the standard name of the current win32 timezone, or False if it cannot be found.
+ @return the standard name of the current win32 timezone, or False if it cannot be found.
"""
res = False
- if (sys.platform == "win32"):
+ if sys.platform == "win32":
try:
import _winreg
hklm = _winreg.ConnectRegistry(None,_winreg.HKEY_LOCAL_MACHINE)
def detect_server_timezone():
"""Attempt to detect the timezone to use on the server side.
Defaults to UTC if no working timezone can be found.
- @return: the timezone identifier as expected by pytz.timezone.
+ @return the timezone identifier as expected by pytz.timezone.
"""
try:
import pytz
except Exception:
- loglevels.Logger().notifyChannel("detect_server_timezone", loglevels.LOG_WARNING,
- "Python pytz module is not available. Timezone will be set to UTC by default.")
+ _logger.warning("Python pytz module is not available. "
+ "Timezone will be set to UTC by default.")
return 'UTC'
# Option 1: the configuration option (did not exist before, so no backwards compatibility issue)
(time.tzname[0], 'time.tzname'),
(os.environ.get('TZ',False),'TZ environment variable'), ]
# Option 4: OS-specific: /etc/timezone on Unix
- if (os.path.exists("/etc/timezone")):
+ if os.path.exists("/etc/timezone"):
tz_value = False
try:
f = open("/etc/timezone")
f.close()
sources.append((tz_value,"/etc/timezone file"))
# Option 5: timezone info from registry on Win32
- if (sys.platform == "win32"):
+ if sys.platform == "win32":
# Timezone info is stored in windows registry.
# However this is not likely to work very well as the standard name
# of timezones in windows is rarely something that is known to pytz.
if value:
try:
tz = pytz.timezone(value)
- loglevels.Logger().notifyChannel("detect_server_timezone", loglevels.LOG_INFO,
- "Using timezone %s obtained from %s." % (tz.zone,source))
+ _logger.info("Using timezone %s obtained from %s.", tz.zone, source)
return value
except pytz.UnknownTimeZoneError:
- loglevels.Logger().notifyChannel("detect_server_timezone", loglevels.LOG_WARNING,
- "The timezone specified in %s (%s) is invalid, ignoring it." % (source,value))
+ _logger.warning("The timezone specified in %s (%s) is invalid, ignoring it.", source, value)
- loglevels.Logger().notifyChannel("detect_server_timezone", loglevels.LOG_WARNING,
- "No valid timezone could be detected, using default UTC timezone. You can specify it explicitly with option 'timezone' in the server configuration.")
+ _logger.warning("No valid timezone could be detected, using default UTC "
+ "timezone. You can specify it explicitly with option 'timezone' in "
+ "the server configuration.")
return 'UTC'
def get_server_timezone():
    """Return the timezone used on the server side, now hardcoded to UTC."""
    return "UTC"
DEFAULT_SERVER_DATE_FORMAT = "%Y-%m-%d"
@param ignore_unparsable_time: if True, return False if src_tstamp_str cannot be parsed
using src_format or formatted using dst_format.
- @return: local/client formatted timestamp, expressed in the local/client timezone if possible
+ @return local/client formatted timestamp, expressed in the local/client timezone if possible
and if tz_offset is true, or src_tstamp_str if timezone offset could not be determined.
"""
if not src_tstamp_str:
a.start()
return True
def get_and_group_by_field(cr, uid, obj, ids, field, context=None):
    """ Read the values of ``field`` for the given ``ids`` and group ids by value.

    :param string field: name of the field we want to read and group by
    :return: mapping of field values to the list of ids that have it
    :rtype: dict
    """
    res = defaultdict(list)
    for record in obj.read(cr, uid, ids, [field], context=context):
        key = record[field]
        # many2one fields are read as (id, display_name) pairs: group on
        # the bare database id.
        res[key[0] if isinstance(key, tuple) else key].append(record['id'])
    return dict(res)
+
def get_and_group_by_company(cr, uid, obj, ids, context=None):
    """Shortcut for :func:`get_and_group_by_field` on the ``company_id``
    field: maps each company value to the list of ids belonging to it.
    """
    return get_and_group_by_field(cr, uid, obj, ids, field='company_id', context=context)
# port of python 2.6's attrgetter with support for dotted notation
def resolve_attr(obj, attr):
return tuple(resolve_attr(obj, attr) for attr in items)
return g
class unquote(str):
    """A str subclass whose repr() is the raw string itself, with no
    surrounding quotation marks and no escaping (the name is borrowed
    from Lisp's unquote). One use is preserving or inserting bare
    variable names inside dicts that go through repr() and a later
    eval(). Use with care.

    Some examples (notice that there are never quotes surrounding
    the ``active_id`` name):

    >>> unquote('active_id')
    active_id
    >>> d = {'test': unquote('active_id')}
    >>> d
    {'test': active_id}
    >>> print d
    {'test': active_id}
    """
    def __repr__(self):
        # The instance itself is already the desired representation.
        return self

class UnquoteEvalContext(defaultdict):
    """Evaluation context that resolves every missing name to an
    ``unquote`` of that name.

    Mostly useful for evaluating OpenERP domains/contexts that may refer
    to names unknown at eval time: converting the result back to a
    string preserves the original names.

    **Warning**: using an ``UnquoteEvalContext`` as context for ``eval()`` or
    ``safe_eval()`` will shadow the builtins, which may cause other
    failures, depending on what is evaluated.

    Example (notice that ``section_id`` is preserved in the final
    result):

    >>> context_str = "{'default_user_id': uid, 'default_section_id': section_id}"
    >>> eval(context_str, UnquoteEvalContext(uid=1))
    {'default_user_id': 1, 'default_section_id': section_id}

    """
    def __init__(self, *args, **kwargs):
        # default_factory stays None so lookups reach __missing__.
        super(UnquoteEvalContext, self).__init__(None, *args, **kwargs)

    def __missing__(self, key):
        return unquote(key)
-# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
class mute_logger(object):
    """Temporary suppress the logging.
    Can be used as context manager or decorator.

        @mute_logger('openerp.plic.ploc')
        def do_stuff():
            blahblah()

        with mute_logger('openerp.foo.bar'):
            do_suff()

    """
    def __init__(self, *loggers):
        self.loggers = loggers

    def filter(self, record):
        # Installed as a logging filter on each target logger: reject
        # every record while active.
        return 0

    def __enter__(self):
        for name in self.loggers:
            logging.getLogger(name).addFilter(self)

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        for name in self.loggers:
            logging.getLogger(name).removeFilter(self)

    def __call__(self, func):
        @wraps(func)
        def muted(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return muted
+
# Sentinel distinguishing "stream exhausted" from any yielded value.
_ph = object()
class CountingStream(object):
    """ Stream wrapper counting the number of element it has yielded. Similar
    role to ``enumerate``, but for use when the iteration process of the stream
    isn't fully under caller control (the stream can be iterated from multiple
    points including within a library)

    ``start`` allows overriding the starting index (the index before the first
    item is returned).

    On each iteration (call to :meth:`~.next`), increases its :attr:`~.index`
    by one.

    .. attribute:: index

        ``int``, index of the last yielded element in the stream. If the stream
        has ended, will give an index 1-past the stream
    """
    def __init__(self, stream, start=-1):
        self.stream = iter(stream)
        self.index = start
        self.stopped = False
    def __iter__(self):
        return self
    def next(self):
        if self.stopped:
            raise StopIteration()
        self.index += 1
        val = next(self.stream, _ph)
        if val is _ph:
            self.stopped = True
            raise StopIteration()
        return val
    # Alias so the wrapper also satisfies the Python 3 iterator protocol.
    __next__ = next
+
def stripped_sys_argv(*strip_args):
    """Return sys.argv with some arguments stripped, suitable for reexecution or subprocesses"""
    # Always strip the options that only make sense for the initial run.
    strip_args = sorted(set(strip_args) | set(['-s', '--save', '-d', '--database', '-u', '--update', '-i', '--init']))
    assert all(config.parser.has_option(s) for s in strip_args)
    takes_value = dict((s, config.parser.get_option(s).takes_value()) for s in strip_args)

    # Sorted order puts every '--long' option before every '-short' one
    # ('--' < '-x' in ASCII), so groupby yields exactly two runs.
    longs, shorts = list(tuple(y) for _, y in groupby(strip_args, lambda x: x.startswith('--')))
    longs_eq = tuple(l + '=' for l in longs if takes_value[l])

    args = sys.argv[:]

    def strip(args, i):
        # Drop '-xVALUE', '--long=value', bare '-x'/'--long', and the
        # standalone value following an option that takes one.
        return args[i].startswith(shorts) \
            or args[i].startswith(longs_eq) or (args[i] in longs) \
            or (i >= 1 and (args[i - 1] in strip_args) and takes_value[args[i - 1]])

    return [x for i, x in enumerate(args) if not strip(args, i)]
+
class ConstantMapping(Mapping):
    """
    An immutable mapping returning the provided value for every single key.

    Useful for default value to methods
    """
    __slots__ = ['_value']

    def __init__(self, val):
        self._value = val

    def __len__(self):
        # Deliberately report an empty mapping: a defaultdict would grow
        # its length for each individually requested key, which is not
        # useful here.
        return 0

    def __iter__(self):
        # Consistent with __len__: expose no keys, even though any key
        # resolves through __getitem__.
        return iter(())

    def __getitem__(self, item):
        # Every key maps to the single constant value.
        return self._value
+
+
+# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: