#
# OpenERP, Open Source Management Solution
# Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
-# Copyright (C) 2010-2012 OpenERP s.a. (<http://openerp.com>).
+# Copyright (C) 2010-2013 OpenERP s.a. (<http://openerp.com>).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
#
##############################################################################
-#.apidoc title: Utilities: tools.misc
"""
Miscellaneous tools used by OpenERP.
"""
from functools import wraps
+import cProfile
import subprocess
import logging
import os
-import random
-import re
import socket
import sys
import threading
import zipfile
from collections import defaultdict
from datetime import datetime
-from itertools import islice, izip
+from itertools import islice, izip, groupby
from lxml import etree
from which import which
from threading import local
+
try:
from html2text import html2text
except ImportError:
html2text = None
-import openerp.loglevels as loglevels
-import openerp.pooler as pooler
from config import config
from cache import *
pop = subprocess.Popen((prog,) + args, bufsize= -1,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
close_fds=(os.name=="posix"))
- return (pop.stdin, pop.stdout)
+ return pop.stdin, pop.stdout
def exec_command_pipe(name, *args):
prog = find_in_path(name)
pop = subprocess.Popen((prog,) + args, bufsize= -1,
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
close_fds=(os.name=="posix"))
- return (pop.stdin, pop.stdout)
+ return pop.stdin, pop.stdout
#----------------------------------------------------------
# File paths
# Is it below 'addons_path' or 'root_path'?
name = os.path.normcase(os.path.normpath(name))
for root in adps + [rtp]:
+ root = os.path.normcase(os.path.normpath(root)) + os.sep
if name.startswith(root):
base = root.rstrip(os.sep)
name = name[len(base) + 1:]
if os.path.isfile(name):
fo = open(name, mode)
if pathinfo:
- return (fo, name)
+ return fo, name
return fo
# Support for loading modules in zipped form.
os.sep, '/')))
fo.seek(0)
if pathinfo:
- return (fo, name)
+ return fo, name
return fo
except Exception:
pass
"""
return izip(xrange(len(l)-1, -1, -1), reversed(l))
-#----------------------------------------------------------
-# Emails
-#----------------------------------------------------------
-email_re = re.compile(r"""
- ([a-zA-Z][\w\.-]*[a-zA-Z0-9] # username part
- @ # mandatory @ sign
- [a-zA-Z0-9][\w\.-]* # domain must start with a letter ... Ged> why do we include a 0-9 then?
- \.
- [a-z]{2,3} # TLD
- )
- """, re.VERBOSE)
-res_re = re.compile(r"\[([0-9]+)\]", re.UNICODE)
-command_re = re.compile("^Set-([a-z]+) *: *(.+)$", re.I + re.UNICODE)
-
-# Updated in 7.0 to match the model name as well
-# Typical form of references is <timestamp-openerp-record_id-model_name@domain>
-# group(1) = the record ID ; group(2) = the model (if any) ; group(3) = the domain
-reference_re = re.compile("<.*-open(?:object|erp)-(\\d+)(?:-([\w.]+))?.*@(.*)>", re.UNICODE)
-
-def html2plaintext(html, body_id=None, encoding='utf-8'):
- """ From an HTML text, convert the HTML to plain text.
- If @param body_id is provided then this is the tag where the
- body (not necessarily <body>) starts.
- """
- ## (c) Fry-IT, www.fry-it.com, 2007
- ## <peter@fry-it.com>
- ## download here: http://www.peterbe.com/plog/html2plaintext
-
- html = ustr(html)
-
- from lxml.etree import tostring, fromstring, HTMLParser
- tree = fromstring(html, parser=HTMLParser())
-
- if body_id is not None:
- source = tree.xpath('//*[@id=%s]'%(body_id,))
- else:
- source = tree.xpath('//body')
- if len(source):
- tree = source[0]
-
- url_index = []
- i = 0
- for link in tree.findall('.//a'):
- url = link.get('href')
- if url:
- i += 1
- link.tag = 'span'
- link.text = '%s [%s]' % (link.text, i)
- url_index.append(url)
-
- html = ustr(tostring(tree, encoding=encoding))
-
- html = html.replace('<strong>','*').replace('</strong>','*')
- html = html.replace('<b>','*').replace('</b>','*')
- html = html.replace('<h3>','*').replace('</h3>','*')
- html = html.replace('<h2>','**').replace('</h2>','**')
- html = html.replace('<h1>','**').replace('</h1>','**')
- html = html.replace('<em>','/').replace('</em>','/')
- html = html.replace('<tr>', '\n')
- html = html.replace('</p>', '\n')
- html = re.sub('<br\s*/?>', '\n', html)
- html = re.sub('<.*?>', ' ', html)
- html = html.replace(' ' * 2, ' ')
-
- # strip all lines
- html = '\n'.join([x.strip() for x in html.splitlines()])
- html = html.replace('\n' * 2, '\n')
-
- for i, url in enumerate(url_index):
- if i == 0:
- html += '\n\n'
- html += ustr('[%s] %s\n') % (i+1, url)
-
- return html
-
-def generate_tracking_message_id(res_id):
- """Returns a string that can be used in the Message-ID RFC822 header field
-
- Used to track the replies related to a given object thanks to the "In-Reply-To"
- or "References" fields that Mail User Agents will set.
- """
- try:
- rnd = random.SystemRandom().random()
- except NotImplementedError:
- rnd = random.random()
- rndstr = ("%.15f" % rnd)[2:]
- return "<%.15f.%s-openerp-%s@%s>" % (time.time(), rndstr, res_id, socket.gethostname())
-
-def email_send(email_from, email_to, subject, body, email_cc=None, email_bcc=None, reply_to=False,
- attachments=None, message_id=None, references=None, openobject_id=False, debug=False, subtype='plain', headers=None,
- smtp_server=None, smtp_port=None, ssl=False, smtp_user=None, smtp_password=None, cr=None, uid=None):
- """Low-level function for sending an email (deprecated).
-
- :deprecate: since OpenERP 6.1, please use ir.mail_server.send_email() instead.
- :param email_from: A string used to fill the `From` header, if falsy,
- config['email_from'] is used instead. Also used for
- the `Reply-To` header if `reply_to` is not provided
- :param email_to: a sequence of addresses to send the mail to.
- """
-
- # If not cr, get cr from current thread database
- if not cr:
- db_name = getattr(threading.currentThread(), 'dbname', None)
- if db_name:
- cr = pooler.get_db_only(db_name).cursor()
- else:
- raise Exception("No database cursor found, please pass one explicitly")
-
- # Send Email
- try:
- mail_server_pool = pooler.get_pool(cr.dbname).get('ir.mail_server')
- res = False
- # Pack Message into MIME Object
- email_msg = mail_server_pool.build_email(email_from, email_to, subject, body, email_cc, email_bcc, reply_to,
- attachments, message_id, references, openobject_id, subtype, headers=headers)
-
- res = mail_server_pool.send_email(cr, uid or 1, email_msg, mail_server_id=None,
- smtp_server=smtp_server, smtp_port=smtp_port, smtp_user=smtp_user, smtp_password=smtp_password,
- smtp_encryption=('ssl' if ssl else None), smtp_debug=debug)
- except Exception:
- _logger.exception("tools.email_send failed to deliver email")
- return False
- finally:
- cr.close()
- return res
-
-def email_split(text):
- """ Return a list of the email addresses found in ``text`` """
- if not text: return []
- return re.findall(r'([^ ,<@]+@[^> ,]+)', text)
-
-def append_content_to_html(html, content, plaintext=True):
- """Append extra content at the end of an HTML snippet, trying
- to locate the end of the HTML document (</body>, </html>, or
- EOF), and wrapping the provided content in a <pre/> block
- unless ``plaintext`` is False. A side-effect of this
- method is to coerce all HTML tags to lowercase in ``html``,
- and strip enclosing <html> or <body> tags in content if
- ``plaintext`` is False.
-
- :param str html: html tagsoup (doesn't have to be XHTML)
- :param str content: extra content to append
- :param bool plaintext: whether content is plaintext and should
- be wrapped in a <pre/> tag.
- """
- html = ustr(html)
- if plaintext:
- content = u'\n<pre>%s</pre>\n' % ustr(content)
- else:
- content = re.sub(r'(?i)(</?html.*>|</?body.*>|<!\W*DOCTYPE.*>)', '', content)
- content = u'\n%s\n'% ustr(content)
- # Force all tags to lowercase
- html = re.sub(r'(</?)\W*(\w+)([ >])',
- lambda m: '%s%s%s' % (m.group(1),m.group(2).lower(),m.group(3)), html)
- insert_location = html.find('</body>')
- if insert_location == -1:
- insert_location = html.find('</html>')
- if insert_location == -1:
- return '%s%s' % (html, content)
- return '%s%s%s' % (html[:insert_location], content, html[insert_location:])
-
-
-#----------------------------------------------------------
-# SMS
-#----------------------------------------------------------
-# text must be latin-1 encoded
-def sms_send(user, password, api_id, text, to):
- import urllib
- url = "http://api.urlsms.com/SendSMS.aspx"
- #url = "http://196.7.150.220/http/sendmsg"
- params = urllib.urlencode({'UserID': user, 'Password': password, 'SenderID': api_id, 'MsgText': text, 'RecipientMobileNo':to})
- urllib.urlopen(url+"?"+params)
- # FIXME: Use the logger if there is an error
- return True
class UpdateableStr(local):
""" Class that stores an updateable string (used in wizards)
ALL_LANGUAGES = {
'ab_RU': u'Abkhazian / аҧсуа',
- 'ar_AR': u'Arabic / الْعَرَبيّة',
+ 'am_ET': u'Amharic / አምሃርኛ',
+ 'ar_SY': u'Arabic / الْعَرَبيّة',
'bg_BG': u'Bulgarian / български език',
'bs_BS': u'Bosnian / bosanski jezik',
'ca_ES': u'Catalan / Català',
sz=len(sz)
s, i = float(sz), 0
while s >= 1024 and i < len(units)-1:
- s = s / 1024
- i = i + 1
+ s /= 1024
+ i += 1
return "%0.2f %s" % (s, units[i])
def logged(f):
def __call__(self, f):
@wraps(f)
def wrapper(*args, **kwargs):
- class profile_wrapper(object):
- def __init__(self):
- self.result = None
- def __call__(self):
- self.result = f(*args, **kwargs)
- pw = profile_wrapper()
- import cProfile
- fname = self.fname or ("%s.cprof" % (f.func_name,))
- cProfile.runctx('pw()', globals(), locals(), filename=fname)
- return pw.result
+ profile = cProfile.Profile()
+ result = profile.runcall(f, *args, **kwargs)
+ profile.dump_stats(self.fname or ("%s.cprof" % (f.func_name,)))
+ return result
return wrapper
global __icons_list
return [(x, x) for x in __icons_list ]
-def extract_zip_file(zip_file, outdirectory):
- zf = zipfile.ZipFile(zip_file, 'r')
- out = outdirectory
- for path in zf.namelist():
- tgt = os.path.join(out, path)
- tgtdir = os.path.dirname(tgt)
- if not os.path.exists(tgtdir):
- os.makedirs(tgtdir)
-
- if not tgt.endswith(os.sep):
- fp = open(tgt, 'wb')
- fp.write(zf.read(path))
- fp.close()
- zf.close()
-
def detect_ip_addr():
"""Try a very crude method to figure out a valid external
IP or hostname for the current machine. Don't rely on this
@return the standard name of the current win32 timezone, or False if it cannot be found.
"""
res = False
- if (sys.platform == "win32"):
+ if sys.platform == "win32":
try:
import _winreg
hklm = _winreg.ConnectRegistry(None,_winreg.HKEY_LOCAL_MACHINE)
(time.tzname[0], 'time.tzname'),
(os.environ.get('TZ',False),'TZ environment variable'), ]
# Option 4: OS-specific: /etc/timezone on Unix
- if (os.path.exists("/etc/timezone")):
+ if os.path.exists("/etc/timezone"):
tz_value = False
try:
f = open("/etc/timezone")
f.close()
sources.append((tz_value,"/etc/timezone file"))
# Option 5: timezone info from registry on Win32
- if (sys.platform == "win32"):
+ if sys.platform == "win32":
# Timezone info is stored in windows registry.
# However this is not likely to work very well as the standard name
# of timezones in windows is rarely something that is known to pytz.
raise StopIteration()
return val
+def stripped_sys_argv(*strip_args):
+ """Return sys.argv with some arguments stripped, suitable for reexecution or subprocesses"""
+ strip_args = sorted(set(strip_args) | set(['-s', '--save', '-d', '--database', '-u', '--update', '-i', '--init']))
+ assert all(config.parser.has_option(s) for s in strip_args)
+ takes_value = dict((s, config.parser.get_option(s).takes_value()) for s in strip_args)
+
+ longs, shorts = list(tuple(y) for _, y in groupby(strip_args, lambda x: x.startswith('--')))
+ longs_eq = tuple(l + '=' for l in longs if takes_value[l])
+
+ args = sys.argv[:]
+
+ def strip(args, i):
+ return args[i].startswith(shorts) \
+ or args[i].startswith(longs_eq) or (args[i] in longs) \
+ or (i >= 1 and (args[i - 1] in strip_args) and takes_value[args[i - 1]])
+
+ return [x for i, x in enumerate(args) if not strip(args, i)]
+
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: