self.sql_log_count = 0
self.__closed = True # avoid the call of close() (by __del__) if an exception
# is raised by any of the following initialisations
- self._pool = pool
+ self.__pool = pool
self.dbname = dbname
# Whether to enable snapshot isolation level for this cursor.
chosen_template = tools.config['db_template']
templates_list = tuple(set(['template0', 'template1', 'postgres', chosen_template]))
keep_in_pool = self.dbname not in templates_list
- self._pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
+ self.__pool.give_back(self._cnx, keep_in_pool=keep_in_pool)
@check
def autocommit(self, on):
def __init__(self, pool, dbname):
self.dbname = dbname
- self._pool = pool
+ self.__pool = pool
def cursor(self, serialized=True):
cursor_type = serialized and 'serialized ' or ''
_logger.debug('create %scursor to %r', cursor_type, self.dbname)
- return Cursor(self._pool, self.dbname, serialized=serialized)
+ return Cursor(self.__pool, self.dbname, serialized=serialized)
# serialized_cursor is deprecated - cursors are serialized by default
serialized_cursor = cursor
'MAKE_FUNCTION', 'SLICE+0', 'SLICE+1', 'SLICE+2', 'SLICE+3',
# New in Python 2.7 - http://bugs.python.org/issue4715 :
'JUMP_IF_FALSE_OR_POP', 'JUMP_IF_TRUE_OR_POP', 'POP_JUMP_IF_FALSE',
- 'POP_JUMP_IF_TRUE', 'SETUP_EXCEPT', 'END_FINALLY'
+ 'POP_JUMP_IF_TRUE', 'SETUP_EXCEPT', 'END_FINALLY', 'LOAD_FAST',
+ 'LOAD_GLOBAL',
] if x in opmap))
_logger = logging.getLogger(__name__)
[100, 100, 23, 100, 100, 102, 103, 83]
"""
i = 0
- opcodes = []
byte_codes = codeobj.co_code
while i < len(byte_codes):
code = ord(byte_codes[i])
- opcodes.append(code)
+ yield code
+
if code >= HAVE_ARGUMENT:
i += 3
else:
i += 1
- return opcodes
+
+def assert_no_dunder_name(code_obj, expr):
+ """ assert_no_dunder_name(code_obj, expr) -> None
+
+ Asserts that the code object does not refer to any "dunder name"
+ (__$name__), so that safe_eval prevents access to any internal-ish Python
+ attribute or method (both are loaded via LOAD_ATTR which uses a name, not a
+ const or a var).
+
+ Checks that no such name exists in the provided code object (co_names).
+
+ :param code_obj: code object to name-validate
+ :type code_obj: CodeType
+ :param str expr: expression corresponding to the code object, for debugging
+ purposes
+ :raises NameError: in case a forbidden name (containing two underscores)
+ is found in ``code_obj``
+
+ .. note:: actually forbids every name containing 2 underscores
+ """
+ for name in code_obj.co_names:
+ if "__" in name:
+ raise NameError('Access to forbidden name %r (%r)' % (name, expr))
+
+def assert_valid_codeobj(allowed_codes, code_obj, expr):
+ """ Asserts that the provided code object validates against the bytecode
+ and name constraints.
+
+ Recursively validates the code objects stored in its co_consts in case
+ lambdas are being created/used (lambdas generate their own separated code
+ objects and don't live in the root one)
+
+ :param allowed_codes: list of permissible bytecode instructions
+ :type allowed_codes: set(int)
+ :param code_obj: code object to name-validate
+ :type code_obj: CodeType
+ :param str expr: expression corresponding to the code object, for debugging
+ purposes
+ :raises ValueError: in case of forbidden bytecode in ``code_obj``
+ :raises NameError: in case a forbidden name (containing two underscores)
+ is found in ``code_obj``
+ """
+ assert_no_dunder_name(code_obj, expr)
+ for opcode in _get_opcodes(code_obj):
+ if opcode not in allowed_codes:
+ raise ValueError(
+ "opcode %s not allowed (%r)" % (opname[opcode], expr))
+ for const in code_obj.co_consts:
+ if isinstance(const, CodeType):
+ assert_valid_codeobj(allowed_codes, const, 'lambda')
def test_expr(expr, allowed_codes, mode="eval"):
"""test_expr(expression, allowed_codes[, mode]) -> code_object
# eval() does not like leading/trailing whitespace
expr = expr.strip()
code_obj = compile(expr, "", mode)
- except (SyntaxError, TypeError):
+ except (SyntaxError, TypeError, ValueError):
_logger.debug('Invalid eval expression', exc_info=True)
raise
except Exception:
_logger.debug('Disallowed or invalid eval expression', exc_info=True)
raise ValueError("%s is not a valid expression" % expr)
- for code in _get_opcodes(code_obj):
- if code not in allowed_codes:
- raise ValueError("opcode %s not allowed (%r)" % (opname[code], expr))
+
+ assert_valid_codeobj(allowed_codes, code_obj, expr)
return code_obj
This can be used to e.g. evaluate
an OpenERP domain expression from an untrusted source.
- Throws TypeError, SyntaxError or ValueError (not allowed) accordingly.
-
- >>> safe_eval("__import__('sys').modules")
- Traceback (most recent call last):
- ...
- ValueError: opcode LOAD_NAME not allowed
-
+ :throws TypeError: If the expression provided is a code object
+ :throws SyntaxError: If the expression provided is not valid Python
+ :throws NameError: If the expression provided accesses forbidden names
+ :throws ValueError: If the expression provided uses forbidden bytecode
"""
if isinstance(expr, CodeType):
- raise ValueError("safe_eval does not allow direct evaluation of code objects.")
-
- if '__subclasses__' in expr:
- raise ValueError('expression not allowed (__subclasses__)')
+ raise TypeError("safe_eval does not allow direct evaluation of code objects.")
if globals_dict is None:
globals_dict = {}