##############################################################################
import sys
-import logging
-import warnings
LOG_NOTSET = 'notset'
-LOG_DEBUG_SQL = 'debug_sql'
-LOG_DEBUG_RPC_ANSWER = 'debug_rpc_answer'
-LOG_DEBUG_RPC = 'debug_rpc'
LOG_DEBUG = 'debug'
-LOG_TEST = 'test'
LOG_INFO = 'info'
LOG_WARNING = 'warn'
LOG_ERROR = 'error'
LOG_CRITICAL = 'critical'
-logging.DEBUG_RPC_ANSWER = logging.DEBUG - 4
-logging.addLevelName(logging.DEBUG_RPC_ANSWER, 'DEBUG_RPC_ANSWER')
-logging.DEBUG_RPC = logging.DEBUG - 2
-logging.addLevelName(logging.DEBUG_RPC, 'DEBUG_RPC')
-logging.DEBUG_SQL = logging.DEBUG_RPC - 3
-logging.addLevelName(logging.DEBUG_SQL, 'DEBUG_SQL')
-
-logging.TEST = logging.INFO - 5
-logging.addLevelName(logging.TEST, 'TEST')
-
-class Logger(object):
- def __init__(self):
- warnings.warn("The netsvc.Logger API shouldn't be used anymore, please "
- "use the standard `logging.getLogger` API instead",
- PendingDeprecationWarning, stacklevel=2)
- super(Logger, self).__init__()
-
- def notifyChannel(self, name, level, msg):
- warnings.warn("notifyChannel API shouldn't be used anymore, please use "
- "the standard `logging` module instead",
- PendingDeprecationWarning, stacklevel=2)
- from service.web_services import common
-
- log = logging.getLogger(ustr(name))
-
- if level in [LOG_DEBUG_RPC, LOG_TEST] and not hasattr(log, level):
- fct = lambda msg, *args, **kwargs: log.log(getattr(logging, level.upper()), msg, *args, **kwargs)
- setattr(log, level, fct)
-
-
- level_method = getattr(log, level)
-
- if isinstance(msg, Exception):
- msg = exception_to_unicode(msg)
-
- try:
- msg = ustr(msg).strip()
- if level in (LOG_ERROR, LOG_CRITICAL): # and tools.config.get_misc('debug','env_info',False):
- msg = common().exp_get_server_environment() + "\n" + msg
-
- result = msg.split('\n')
- except UnicodeDecodeError:
- result = msg.strip().split('\n')
- try:
- if len(result)>1:
- for idx, s in enumerate(result):
- level_method('[%02d]: %s' % (idx+1, s,))
- elif result:
- level_method(result[0])
- except IOError:
- # TODO: perhaps reset the logger streams?
- #if logrotate closes our files, we end up here..
- pass
- except Exception:
- # better ignore the exception and carry on..
- pass
-
- def set_loglevel(self, level, logger=None):
- if logger is not None:
- log = logging.getLogger(str(logger))
- else:
- log = logging.getLogger()
- log.setLevel(logging.INFO) # make sure next msg is printed
- log.info("Log level changed to %s" % logging.getLevelName(level))
- log.setLevel(level)
-
- def shutdown(self):
- logging.shutdown()
-
# TODO get_encodings, ustr and exception_to_unicode were originally from tools.misc.
# There are here until we refactor tools so that this module doesn't depends on tools.
def get_encodings(hint_encoding='utf-8'):
fallbacks = {
- 'latin1': 'latin9',
- 'iso-8859-1': 'iso8859-15',
- 'cp1252': '1252',
+ 'latin1': 'latin9',
+ 'iso-8859-1': 'iso8859-15',
+ 'cp1252': '1252',
}
if hint_encoding:
- yield hint_encoding
- if hint_encoding.lower() in fallbacks:
- yield fallbacks[hint_encoding.lower()]
+ yield hint_encoding
+ if hint_encoding.lower() in fallbacks:
+ yield fallbacks[hint_encoding.lower()]
# some defaults (also taking care of pure ASCII)
for charset in ['utf8','latin1']:
- if not (hint_encoding) or (charset.lower() != hint_encoding.lower()):
- yield charset
+ if not hint_encoding or (charset.lower() != hint_encoding.lower()):
+ yield charset
from locale import getpreferredencoding
prefenc = getpreferredencoding()
if prefenc and prefenc.lower() != 'utf-8':
- yield prefenc
- prefenc = fallbacks.get(prefenc.lower())
- if prefenc:
- yield prefenc
-
-def ustr(value, hint_encoding='utf-8'):
- """This method is similar to the builtin `str` method, except
- it will return unicode() string.
-
- @param value: the value to convert
- @param hint_encoding: an optional encoding that was detected
- upstream and should be tried first to
- decode ``value``.
-
- @rtype: unicode
- @return: unicode string
+ yield prefenc
+ prefenc = fallbacks.get(prefenc.lower())
+ if prefenc:
+ yield prefenc
+
+def ustr(value, hint_encoding='utf-8', errors='strict'):
+ """This method is similar to the builtin `unicode`, except
+ that it may try multiple encodings to find one that works
+ for decoding `value`, and defaults to 'utf-8' first.
+
+ :param: value: the value to convert
+ :param: hint_encoding: an optional encoding that was detecte
+ upstream and should be tried first to decode ``value``.
+ :param str errors: optional `errors` flag to pass to the unicode
+ built-in to indicate how illegal character values should be
+ treated when converting a string: 'strict', 'ignore' or 'replace'
+ (see ``unicode()`` constructor).
+ Passing anything other than 'strict' means that the first
+ encoding tried will be used, even if it's not the correct
+ one to use, so be careful! Ignored if value is not a string/unicode.
+ :raise: UnicodeError if value cannot be coerced to unicode
+ :return: unicode string representing the given value
"""
if isinstance(value, Exception):
- return exception_to_unicode(value)
+ return exception_to_unicode(value)
if isinstance(value, unicode):
- return value
+ return value
if not isinstance(value, basestring):
- try:
- return unicode(value)
- except Exception:
- raise UnicodeError('unable to convert %r' % (value,))
+ try:
+ return unicode(value)
+ except Exception:
+ raise UnicodeError('unable to convert %r' % (value,))
for ln in get_encodings(hint_encoding):
- try:
- return unicode(value, ln)
- except Exception:
- pass
+ try:
+ return unicode(value, ln, errors=errors)
+ except Exception:
+ pass
raise UnicodeError('unable to convert %r' % (value,))
def exception_to_unicode(e):
if (sys.version_info[:2] < (2,6)) and hasattr(e, 'message'):
- return ustr(e.message)
+ return ustr(e.message)
if hasattr(e, 'args'):
- return "\n".join((ustr(a) for a in e.args))
+ return "\n".join((ustr(a) for a in e.args))
try:
- return ustr(e)
+ return unicode(e)
except Exception:
- return u"Unknown message"
+ return u"Unknown message"
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: