#
##############################################################################
-#.apidoc title: Utilities: tools.misc
"""
Miscellaneous tools used by OpenERP.
import threading
import time
import zipfile
-from collections import defaultdict
+from collections import defaultdict, Mapping
from datetime import datetime
from itertools import islice, izip, groupby
from lxml import etree
from which import which
from threading import local
+import traceback
try:
from html2text import html2text
from config import config
from cache import *
+import openerp
# get_encodings, ustr and exception_to_unicode were originally from tools.misc.
# There are moved to loglevels until we refactor tools.
-from openerp.loglevels import get_encodings, ustr, exception_to_unicode
+from openerp.loglevels import get_encodings, ustr, exception_to_unicode # noqa
_logger = logging.getLogger(__name__)
"""
return izip(xrange(len(l)-1, -1, -1), reversed(l))
-#----------------------------------------------------------
-# SMS
-#----------------------------------------------------------
-# text must be latin-1 encoded
-def sms_send(user, password, api_id, text, to):
- import urllib
- url = "http://api.urlsms.com/SendSMS.aspx"
- #url = "http://196.7.150.220/http/sendmsg"
- params = urllib.urlencode({'UserID': user, 'Password': password, 'SenderID': api_id, 'MsgText': text, 'RecipientMobileNo':to})
- urllib.urlopen(url+"?"+params)
- # FIXME: Use the logger if there is an error
- return True
class UpdateableStr(local):
""" Class that stores an updateable string (used in wizards)
'am_ET': u'Amharic / አምሃርኛ',
'ar_SY': u'Arabic / الْعَرَبيّة',
'bg_BG': u'Bulgarian / български език',
- 'bs_BS': u'Bosnian / bosanski jezik',
+ 'bs_BA': u'Bosnian / bosanski jezik',
'ca_ES': u'Catalan / Català',
'cs_CZ': u'Czech / Čeština',
'da_DK': u'Danish / Dansk',
global __icons_list
return [(x, x) for x in __icons_list ]
-def extract_zip_file(zip_file, outdirectory):
- zf = zipfile.ZipFile(zip_file, 'r')
- out = outdirectory
- for path in zf.namelist():
- tgt = os.path.join(out, path)
- tgtdir = os.path.dirname(tgt)
- if not os.path.exists(tgtdir):
- os.makedirs(tgtdir)
-
- if not tgt.endswith(os.sep):
- fp = open(tgt, 'wb')
- fp.write(zf.read(path))
- fp.close()
- zf.close()
-
def detect_ip_addr():
"""Try a very crude method to figure out a valid external
IP or hostname for the current machine. Don't rely on this
'%Z': '',
}
+POSIX_TO_LDML = {
+ 'a': 'E',
+ 'A': 'EEEE',
+ 'b': 'MMM',
+ 'B': 'MMMM',
+ #'c': '',
+ 'd': 'dd',
+ 'H': 'HH',
+ 'I': 'hh',
+ 'j': 'DDD',
+ 'm': 'MM',
+ 'M': 'mm',
+ 'p': 'a',
+ 'S': 'ss',
+ 'U': 'w',
+ 'w': 'e',
+ 'W': 'w',
+ 'y': 'yy',
+ 'Y': 'yyyy',
+ # see comments above, and babel's format_datetime assumes an UTC timezone
+ # for naive datetime objects
+ #'z': 'Z',
+ #'Z': 'z',
+}
+
+def posix_to_ldml(fmt, locale):
+ """ Converts a posix/strftime pattern into an LDML date format pattern.
+
+ :param fmt: non-extended C89/C90 strftime pattern
+ :param locale: babel locale used for locale-specific conversions (e.g. %x and %X)
+ :return: unicode
+ """
+ buf = []
+ pc = False
+ quoted = []
+
+ for c in fmt:
+ # LDML date format patterns uses letters, so letters must be quoted
+ if not pc and c.isalpha():
+ quoted.append(c if c != "'" else "''")
+ continue
+ if quoted:
+ buf.append("'")
+ buf.append(''.join(quoted))
+ buf.append("'")
+ quoted = []
+
+ if pc:
+ if c == '%': # escaped percent
+ buf.append('%')
+ elif c == 'x': # date format, short seems to match
+ buf.append(locale.date_formats['short'].pattern)
+ elif c == 'X': # time format, seems to include seconds. short does not
+ buf.append(locale.time_formats['medium'].pattern)
+ else: # look up format char in static mapping
+ buf.append(POSIX_TO_LDML[c])
+ pc = False
+ elif c == '%':
+ pc = True
+ else:
+ buf.append(c)
+
+ # flush anything remaining in quoted buffer
+ if quoted:
+ buf.append("'")
+ buf.append(''.join(quoted))
+ buf.append("'")
+
+ return ''.join(buf)
+
def server_to_local_timestamp(src_tstamp_str, src_format, dst_format, dst_tz_name,
tz_offset=True, ignore_unparsable_time=True):
"""
def __enter__(self):
for logger in self.loggers:
+ assert isinstance(logger, basestring),\
+ "A logger name must be a string, got %s" % type(logger)
logging.getLogger(logger).addFilter(self)
def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
return [x for i, x in enumerate(args) if not strip(args, i)]
+
+class ConstantMapping(Mapping):
+ """
+ An immutable mapping returning the provided value for every single key.
+
+ Useful for default value to methods
+ """
+ __slots__ = ['_value']
+ def __init__(self, val):
+ self._value = val
+
+ def __len__(self):
+ """
+ defaultdict updates its length for each individually requested key, is
+ that really useful?
+ """
+ return 0
+
+ def __iter__(self):
+ """
+ same as len, defaultdict udpates its iterable keyset with each key
+ requested, is there a point for this?
+ """
+ return iter([])
+
+ def __getitem__(self, item):
+ return self._value
+
+
+def dumpstacks(sig, frame):
+ """ Signal handler: dump a stack trace for each existing thread."""
+ code = []
+
+ def extract_stack(stack):
+ for filename, lineno, name, line in traceback.extract_stack(stack):
+ yield 'File: "%s", line %d, in %s' % (filename, lineno, name)
+ if line:
+ yield " %s" % (line.strip(),)
+
+ # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
+ # modified for python 2.5 compatibility
+ threads_info = dict([(th.ident, {'name': th.name, 'uid': getattr(th, 'uid', 'n/a')})
+ for th in threading.enumerate()])
+ for threadId, stack in sys._current_frames().items():
+ thread_info = threads_info.get(threadId)
+ code.append("\n# Thread: %s (id:%s) (uid:%s)" %
+ (thread_info and thread_info['name'] or 'n/a',
+ threadId,
+ thread_info and thread_info['uid'] or 'n/a'))
+ for line in extract_stack(stack):
+ code.append(line)
+
+ if openerp.evented:
+ # code from http://stackoverflow.com/questions/12510648/in-gevent-how-can-i-dump-stack-traces-of-all-running-greenlets
+ import gc
+ from greenlet import greenlet
+ for ob in gc.get_objects():
+ if not isinstance(ob, greenlet) or not ob:
+ continue
+ code.append("\n# Greenlet: %r" % (ob,))
+ for line in extract_stack(ob.gr_frame):
+ code.append(line)
+
+ _logger.info("\n".join(code))
+
+
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
import inspect
import locale
import os
-import openerp.pooler as pooler
import openerp.sql_db as sql_db
import re
import logging
from misc import UpdateableStr
from misc import SKIPPED_ELEMENT_TYPES
import osutil
+import openerp
from openerp import SUPERUSER_ID
_logger = logging.getLogger(__name__)
'ar_SA': 'Arabic_Saudi Arabia',
'eu_ES': 'Basque_Spain',
'be_BY': 'Belarusian_Belarus',
- 'bs_BA': 'Serbian (Latin)',
+ 'bs_BA': 'Bosnian_Bosnia and Herzegovina',
'bg_BG': 'Bulgarian_Bulgaria',
'ca_ES': 'Catalan_Spain',
'hr_HR': 'Croatian_Croatia',
(cr, dummy) = self._get_cr(frame, allow_create=False)
uid = self._get_uid(frame)
if pool and cr and uid:
- lang = pool.get('res.users').context_get(cr, uid)['lang']
+ lang = pool['res.users'].context_get(cr, uid)['lang']
return lang
def __call__(self, source):
cr, is_new_cr = self._get_cr(frame)
if cr:
# Try to use ir.translation to benefit from global cache if possible
- pool = pooler.get_pool(cr.dbname)
- res = pool.get('ir.translation')._get_source(cr, SUPERUSER_ID, None, ('code','sql_constraint'), lang, source)
+ registry = openerp.registry(cr.dbname)
+ res = registry['ir.translation']._get_source(cr, SUPERUSER_ID, None, ('code','sql_constraint'), lang, source)
else:
_logger.debug('no context cursor detected, skipping translation for "%r"', source)
else:
def trans_generate(lang, modules, cr):
dbname = cr.dbname
- pool = pooler.get_pool(dbname)
- trans_obj = pool.get('ir.translation')
- model_data_obj = pool.get('ir.model.data')
+ registry = openerp.registry(dbname)
+ trans_obj = registry.get('ir.translation')
+ model_data_obj = registry.get('ir.model.data')
uid = 1
- l = pool.models.items()
+ l = registry.models.items()
l.sort()
query = 'SELECT name, model, res_id, module' \
model = encode(model)
xml_name = "%s.%s" % (module, encode(xml_name))
- if not pool.get(model):
+ if model not in registry:
_logger.error("Unable to find object %r", model)
continue
- exists = pool.get(model).exists(cr, uid, res_id)
+ exists = registry[model].exists(cr, uid, res_id)
if not exists:
_logger.warning("Unable to find object %r with id %d", model, res_id)
continue
- obj = pool.get(model).browse(cr, uid, res_id)
+ obj = registry[model].browse(cr, uid, res_id)
if model=='ir.ui.view':
d = etree.XML(encode(obj.arch))
for t in trans_parse_view(d):
push_translation(module, 'view', encode(obj.model), 0, t)
elif model=='ir.actions.wizard':
- service_name = 'wizard.'+encode(obj.wiz_name)
- import openerp.netsvc as netsvc
- if netsvc.Service._services.get(service_name):
- obj2 = netsvc.Service._services[service_name]
- for state_name, state_def in obj2.states.iteritems():
- if 'result' in state_def:
- result = state_def['result']
- if result['type'] != 'form':
- continue
- name = "%s,%s" % (encode(obj.wiz_name), state_name)
-
- def_params = {
- 'string': ('wizard_field', lambda s: [encode(s)]),
- 'selection': ('selection', lambda s: [encode(e[1]) for e in ((not callable(s)) and s or [])]),
- 'help': ('help', lambda s: [encode(s)]),
- }
-
- # export fields
- if not result.has_key('fields'):
- _logger.warning("res has no fields: %r", result)
- continue
- for field_name, field_def in result['fields'].iteritems():
- res_name = name + ',' + field_name
-
- for fn in def_params:
- if fn in field_def:
- transtype, modifier = def_params[fn]
- for val in modifier(field_def[fn]):
- push_translation(module, transtype, res_name, 0, val)
-
- # export arch
- arch = result['arch']
- if arch and not isinstance(arch, UpdateableStr):
- d = etree.XML(arch)
- for t in trans_parse_view(d):
- push_translation(module, 'wizard_view', name, 0, t)
-
- # export button labels
- for but_args in result['state']:
- button_name = but_args[0]
- button_label = but_args[1]
- res_name = name + ',' + button_name
- push_translation(module, 'wizard_button', res_name, 0, button_label)
+ pass # TODO Can model really be 'ir.actions.wizard' ?
elif model=='ir.model.fields':
try:
except AttributeError, exc:
_logger.error("name error in %s: %s", xml_name, str(exc))
continue
- objmodel = pool.get(obj.model)
+ objmodel = registry.get(obj.model)
if not objmodel or not field_name in objmodel._columns:
continue
field_def = objmodel._columns[field_name]
push_constraint_msg(module, term_type, model._name, constraint[msg_pos])
for (_, model, module) in cr.fetchall():
- model_obj = pool.get(model)
-
- if not model_obj:
+ if model not in registry:
_logger.error("Unable to find object %r", model)
continue
+ model_obj = registry[model]
+
if model_obj._constraints:
push_local_constraints(module, model_obj, 'constraints')
return path.split(os.path.sep)[0]
return 'base' # files that are not in a module are considered as being in 'base' module
- modobj = pool.get('ir.module.module')
+ modobj = registry['ir.module.module']
installed_modids = modobj.search(cr, uid, [('state', '=', 'installed')])
installed_modules = map(lambda m: m['name'], modobj.read(cr, uid, installed_modids, ['name']))
if context is None:
context = {}
db_name = cr.dbname
- pool = pooler.get_pool(db_name)
- lang_obj = pool.get('res.lang')
- trans_obj = pool.get('ir.translation')
+ registry = openerp.registry(db_name)
+ lang_obj = registry.get('res.lang')
+ trans_obj = registry.get('ir.translation')
iso_lang = misc.get_iso_codes(lang)
try:
ids = lang_obj.search(cr, SUPERUSER_ID, [('code','=', lang)])
:param lang: language ISO code with optional _underscore_ and l10n flavor (ex: 'fr', 'fr_BE', but not 'fr-BE')
:type lang: str
"""
- pool = pooler.get_pool(cr.dbname)
- language_installer = pool.get('base.language.install')
- uid = 1
- oid = language_installer.create(cr, uid, {'lang': lang})
- language_installer.lang_install(cr, uid, [oid], context=None)
+ registry = openerp.registry(cr.dbname)
+ language_installer = registry['base.language.install']
+ oid = language_installer.create(cr, SUPERUSER_ID, {'lang': lang})
+ language_installer.lang_install(cr, SUPERUSER_ID, [oid], context=None)
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: