#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
-# Copyright (C) 2010-2012 OpenERP s.a. (<http://openerp.com>).
+# Copyright (C) 2010-2014 OpenERP s.a. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
#
##############################################################################
-#.apidoc title: Utilities: tools.misc
"""
Miscellaneous tools used by OpenERP.
from functools import wraps
import cProfile
+from contextlib import contextmanager
import subprocess
import logging
import os
import sys
import threading
import time
+import werkzeug.utils
import zipfile
-from collections import defaultdict
+from collections import defaultdict, Mapping
from datetime import datetime
-from itertools import islice, izip
+from itertools import islice, izip, groupby
from lxml import etree
from which import which
from threading import local
+import traceback
try:
from html2text import html2text
from config import config
from cache import *
+from .parse_version import parse_version
+import openerp
# get_encodings, ustr and exception_to_unicode were originally from tools.misc.
# There are moved to loglevels until we refactor tools.
-from openerp.loglevels import get_encodings, ustr, exception_to_unicode
+from openerp.loglevels import get_encodings, ustr, exception_to_unicode # noqa
_logger = logging.getLogger(__name__)
SKIPPED_ELEMENT_TYPES = (etree._Comment, etree._ProcessingInstruction, etree.CommentBase, etree.PIBase)
def find_in_path(name):
+ path = os.environ.get('PATH', os.defpath).split(os.pathsep)
+ if config.get('bin_path') and config['bin_path'] != 'None':
+ path.append(config['bin_path'])
try:
- return which(name)
+ return which(name, path=os.pathsep.join(path))
except IOError:
return None
raise Exception('Couldn\'t find %s' % name)
args2 = (prog,) + args
- return subprocess.call(args2)
+ with open(os.devnull) as dn:
+ return subprocess.call(args2, stdout=dn, stderr=subprocess.STDOUT)
def exec_pg_command_pipe(name, *args):
prog = find_pg_tool(name)
# Is it below 'addons_path' or 'root_path'?
name = os.path.normcase(os.path.normpath(name))
for root in adps + [rtp]:
+ root = os.path.normcase(os.path.normpath(root)) + os.sep
if name.startswith(root):
base = root.rstrip(os.sep)
name = name[len(base) + 1:]
"""
return izip(xrange(len(l)-1, -1, -1), reversed(l))
-#----------------------------------------------------------
-# SMS
-#----------------------------------------------------------
-# text must be latin-1 encoded
-def sms_send(user, password, api_id, text, to):
- import urllib
- url = "http://api.urlsms.com/SendSMS.aspx"
- #url = "http://196.7.150.220/http/sendmsg"
- params = urllib.urlencode({'UserID': user, 'Password': password, 'SenderID': api_id, 'MsgText': text, 'RecipientMobileNo':to})
- urllib.urlopen(url+"?"+params)
- # FIXME: Use the logger if there is an error
- return True
+def topological_sort(elems):
+ """ Return a list of elements sorted so that their dependencies are listed
+ before them in the result.
+
+ :param elems: specifies the elements to sort with their dependencies; it is
+ a dictionary like `{element: dependencies}` where `dependencies` is a
+ collection of elements that must appear before `element`. The elements
+ of `dependencies` are not required to appear in `elems`; they will
+ simply not appear in the result.
+
+ :returns: a list with the keys of `elems` sorted according to their
+ specification.
+ """
+ # the algorithm is inspired by [Tarjan 1976],
+ # http://en.wikipedia.org/wiki/Topological_sorting#Algorithms
+ result = []
+ visited = set()
+
+ def visit(n):
+ if n not in visited:
+ visited.add(n)
+ if n in elems:
+ # first visit all dependencies of n, then append n to result
+ map(visit, elems[n])
+ result.append(n)
+
+ map(visit, elems)
+
+ return result
+
class UpdateableStr(local):
""" Class that stores an updateable string (used in wizards)
ALL_LANGUAGES = {
'ab_RU': u'Abkhazian / аҧсуа',
+ 'am_ET': u'Amharic / አምሃርኛ',
'ar_SY': u'Arabic / الْعَرَبيّة',
'bg_BG': u'Bulgarian / български език',
- 'bs_BS': u'Bosnian / bosanski jezik',
+ 'bs_BA': u'Bosnian / bosanski jezik',
'ca_ES': u'Catalan / Català',
'cs_CZ': u'Czech / Čeština',
'da_DK': u'Danish / Dansk',
'fa_IR': u'Persian / فارس',
'fi_FI': u'Finnish / Suomi',
'fr_BE': u'French (BE) / Français (BE)',
+ 'fr_CA': u'French (CA) / Français (CA)',
'fr_CH': u'French (CH) / Français (CH)',
'fr_FR': u'French / Français',
'gl_ES': u'Galician / Galego',
'ja_JP': u'Japanese / 日本語',
'ko_KP': u'Korean (KP) / 한국어 (KP)',
'ko_KR': u'Korean (KR) / 한국어 (KR)',
+ 'lo_LA': u'Lao / ພາສາລາວ',
'lt_LT': u'Lithuanian / Lietuvių kalba',
'lv_LV': u'Latvian / latviešu valoda',
+ 'mk_MK': u'Macedonian / македонски јазик',
'ml_IN': u'Malayalam / മലയാളം',
'mn_MN': u'Mongolian / монгол',
'nb_NO': u'Norwegian Bokmål / Norsk bokmål',
global __icons_list
return [(x, x) for x in __icons_list ]
-def extract_zip_file(zip_file, outdirectory):
- zf = zipfile.ZipFile(zip_file, 'r')
- out = outdirectory
- for path in zf.namelist():
- tgt = os.path.join(out, path)
- tgtdir = os.path.dirname(tgt)
- if not os.path.exists(tgtdir):
- os.makedirs(tgtdir)
-
- if not tgt.endswith(os.sep):
- fp = open(tgt, 'wb')
- fp.write(zf.read(path))
- fp.close()
- zf.close()
-
def detect_ip_addr():
"""Try a very crude method to figure out a valid external
IP or hostname for the current machine. Don't rely on this
'%Z': '',
}
+POSIX_TO_LDML = {
+ 'a': 'E',
+ 'A': 'EEEE',
+ 'b': 'MMM',
+ 'B': 'MMMM',
+ #'c': '',
+ 'd': 'dd',
+ 'H': 'HH',
+ 'I': 'hh',
+ 'j': 'DDD',
+ 'm': 'MM',
+ 'M': 'mm',
+ 'p': 'a',
+ 'S': 'ss',
+ 'U': 'w',
+ 'w': 'e',
+ 'W': 'w',
+ 'y': 'yy',
+ 'Y': 'yyyy',
+ # see comments above, and babel's format_datetime assumes an UTC timezone
+ # for naive datetime objects
+ #'z': 'Z',
+ #'Z': 'z',
+}
+
+def posix_to_ldml(fmt, locale):
+ """ Converts a posix/strftime pattern into an LDML date format pattern.
+
+ :param fmt: non-extended C89/C90 strftime pattern
+ :param locale: babel locale used for locale-specific conversions (e.g. %x and %X)
+ :return: unicode
+ """
+ buf = []
+ pc = False
+ quoted = []
+
+ for c in fmt:
+ # LDML date format patterns uses letters, so letters must be quoted
+ if not pc and c.isalpha():
+ quoted.append(c if c != "'" else "''")
+ continue
+ if quoted:
+ buf.append("'")
+ buf.append(''.join(quoted))
+ buf.append("'")
+ quoted = []
+
+ if pc:
+ if c == '%': # escaped percent
+ buf.append('%')
+ elif c == 'x': # date format, short seems to match
+ buf.append(locale.date_formats['short'].pattern)
+ elif c == 'X': # time format, seems to include seconds. short does not
+ buf.append(locale.time_formats['medium'].pattern)
+ else: # look up format char in static mapping
+ buf.append(POSIX_TO_LDML[c])
+ pc = False
+ elif c == '%':
+ pc = True
+ else:
+ buf.append(c)
+
+ # flush anything remaining in quoted buffer
+ if quoted:
+ buf.append("'")
+ buf.append(''.join(quoted))
+ buf.append("'")
+
+ return ''.join(buf)
+
def server_to_local_timestamp(src_tstamp_str, src_format, dst_format, dst_tz_name,
tz_offset=True, ignore_unparsable_time=True):
"""
def __enter__(self):
for logger in self.loggers:
+ assert isinstance(logger, basestring),\
+ "A logger name must be a string, got %s" % type(logger)
logging.getLogger(logger).addFilter(self)
def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
raise StopIteration()
return val
+def stripped_sys_argv(*strip_args):
+ """Return sys.argv with some arguments stripped, suitable for reexecution or subprocesses"""
+ strip_args = sorted(set(strip_args) | set(['-s', '--save', '-u', '--update', '-i', '--init']))
+ assert all(config.parser.has_option(s) for s in strip_args)
+ takes_value = dict((s, config.parser.get_option(s).takes_value()) for s in strip_args)
+
+ longs, shorts = list(tuple(y) for _, y in groupby(strip_args, lambda x: x.startswith('--')))
+ longs_eq = tuple(l + '=' for l in longs if takes_value[l])
+
+ args = sys.argv[:]
+
+ def strip(args, i):
+ return args[i].startswith(shorts) \
+ or args[i].startswith(longs_eq) or (args[i] in longs) \
+ or (i >= 1 and (args[i - 1] in strip_args) and takes_value[args[i - 1]])
+
+ return [x for i, x in enumerate(args) if not strip(args, i)]
+
+
+class ConstantMapping(Mapping):
+ """
+ An immutable mapping returning the provided value for every single key.
+
+ Useful for default value to methods
+ """
+ __slots__ = ['_value']
+ def __init__(self, val):
+ self._value = val
+
+ def __len__(self):
+ """
+ defaultdict updates its length for each individually requested key, is
+ that really useful?
+ """
+ return 0
+
+ def __iter__(self):
+ """
+ same as len, defaultdict udpates its iterable keyset with each key
+ requested, is there a point for this?
+ """
+ return iter([])
+
+ def __getitem__(self, item):
+ return self._value
+
+
+def dumpstacks(sig=None, frame=None):
+ """ Signal handler: dump a stack trace for each existing thread."""
+ code = []
+
+ def extract_stack(stack):
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ yield 'File: "%s", line %d, in %s' % (filename, lineno, name)
+ if line:
+ yield " %s" % (line.strip(),)
+
+ # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
+ # modified for python 2.5 compatibility
+ threads_info = dict([(th.ident, {'name': th.name, 'uid': getattr(th, 'uid', 'n/a')})
+ for th in threading.enumerate()])
+ for threadId, stack in sys._current_frames().items():
+ thread_info = threads_info.get(threadId)
+ code.append("\n# Thread: %s (id:%s) (uid:%s)" %
+ (thread_info and thread_info['name'] or 'n/a',
+ threadId,
+ thread_info and thread_info['uid'] or 'n/a'))
+ for line in extract_stack(stack):
+ code.append(line)
+
+ if openerp.evented:
+ # code from http://stackoverflow.com/questions/12510648/in-gevent-how-can-i-dump-stack-traces-of-all-running-greenlets
+ import gc
+ from greenlet import greenlet
+ for ob in gc.get_objects():
+ if not isinstance(ob, greenlet) or not ob:
+ continue
+ code.append("\n# Greenlet: %r" % (ob,))
+ for line in extract_stack(ob.gr_frame):
+ code.append(line)
+
+ _logger.info("\n".join(code))
+
+class frozendict(dict):
+ """ An implementation of an immutable dictionary. """
+ def __delitem__(self, key):
+ raise NotImplementedError("'__delitem__' not supported on frozendict")
+ def __setitem__(self, key, val):
+ raise NotImplementedError("'__setitem__' not supported on frozendict")
+ def clear(self):
+ raise NotImplementedError("'clear' not supported on frozendict")
+ def pop(self, key, default=None):
+ raise NotImplementedError("'pop' not supported on frozendict")
+ def popitem(self):
+ raise NotImplementedError("'popitem' not supported on frozendict")
+ def setdefault(self, key, default=None):
+ raise NotImplementedError("'setdefault' not supported on frozendict")
+ def update(self, *args, **kwargs):
+ raise NotImplementedError("'update' not supported on frozendict")
+
+@contextmanager
+def ignore(*exc):
+ try:
+ yield
+ except exc:
+ pass
+
+# Avoid DeprecationWarning while still remaining compatible with werkzeug pre-0.9
+if parse_version(getattr(werkzeug, '__version__', '0.0')) < parse_version('0.9.0'):
+ def html_escape(text):
+ return werkzeug.utils.escape(text, quote=True)
+else:
+ def html_escape(text):
+ return werkzeug.utils.escape(text)
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: