fields_pre2 = map(convert_field, fields_pre)
order_by = self._parent_order or self._order
- select_fields = ','.join(fields_pre2 + [self._table + '.id'])
+ select_fields = ','.join(fields_pre2 + ['%s.id' % self._table])
query = 'SELECT %s FROM %s WHERE %s.id IN %%s' % (select_fields, ','.join(tables), self._table)
if rule_clause:
query += " AND " + (' OR '.join(rule_clause))
query += " ORDER BY " + order_by
for sub_ids in cr.split_for_in_conditions(ids):
- if rule_clause:
- cr.execute(query, [tuple(sub_ids)] + rule_params)
- self._check_record_rules_result_count(cr, user, sub_ids, 'read', context=context)
- else:
- cr.execute(query, (tuple(sub_ids),))
- res.extend(cr.dictfetchall())
+ cr.execute(query, [tuple(sub_ids)] + rule_params)
+ results = cr.dictfetchall()
+ result_ids = [x['id'] for x in results]
+ self._check_record_rules_result_count(cr, user, sub_ids, result_ids, 'read', context=context)
+ res.extend(results)
else:
res = map(lambda x: {'id': x}, ids)
# mention the first one only to keep the error message readable
raise except_orm('ConcurrencyException', _('A document was modified since you last viewed it (%s:%d)') % (self._description, res[0]))
- def _check_record_rules_result_count(self, cr, uid, ids, operation, context=None):
- """Verify that number of returned rows after applying record rules matches
+ def _check_record_rules_result_count(self, cr, uid, ids, result_ids, operation, context=None):
+ """Verify the returned rows after applying record rules matches
the length of `ids`, and raise an appropriate exception if it does not.
"""
- if cr.rowcount != len(ids):
+ ids, result_ids = set(ids), set(result_ids)
+ missing_ids = ids - result_ids
+ if missing_ids:
# Attempt to distinguish record rule restriction vs deleted records,
- # to provide a more specific error message
- cr.execute('SELECT id FROM ' + self._table + ' WHERE id IN %s', (tuple(ids),))
- if cr.rowcount != len(ids):
- if operation == 'unlink':
- # no need to warn about deleting an already deleted record!
+ # to provide a more specific error message - check if the missinf
+ cr.execute('SELECT id FROM ' + self._table + ' WHERE id IN %s', (tuple(missing_ids),))
+ if cr.rowcount:
+ # the missing ids are (at least partially) hidden by access rules
+ _logger.warning('Access Denied by record rules for operation: %s, uid: %s, model: %s', operation, uid, self._name)
+ raise except_orm(_('Access Denied'),
+ _('The requested operation cannot be completed due to security restrictions. Please contact your system administrator.\n\n(Document type: %s, Operation: %s)') % \
+ (self._description, operation))
+ else:
+ # If we get here, the missing_ids are not in the database
+ if operation in ('read','unlink'):
+ # No need to warn about deleting an already deleted record.
+ # And no error when reading a record that was deleted, to prevent spurious
+ # errors for non-transactional search/read sequences coming from clients
return
_logger.warning('Failed operation on deleted record(s): %s, uid: %s, model: %s', operation, uid, self._name)
raise except_orm(_('Missing document(s)'),
_('One of the documents you are trying to access has been deleted, please try again after refreshing.'))
- _logger.warning('Access Denied by record rules for operation: %s, uid: %s, model: %s', operation, uid, self._name)
- raise except_orm(_('Access Denied'),
- _('The requested operation cannot be completed due to security restrictions. Please contact your system administrator.\n\n(Document type: %s, Operation: %s)') % \
- (self._description, operation))
+
def check_access_rights(self, cr, uid, operation, raise_exception=True): # no context on purpose.
"""Verifies that the operation given by ``operation`` is allowed for the user
cr.execute('SELECT ' + self._table + '.id FROM ' + ','.join(tables) +
' WHERE ' + self._table + '.id IN %s' + where_clause,
[sub_ids] + where_params)
- self._check_record_rules_result_count(cr, uid, sub_ids, operation, context=context)
+ returned_ids = [x['id'] for x in cr.dictfetchall()]
+ self._check_record_rules_result_count(cr, uid, sub_ids, returned_ids, operation, context=context)
def _workflow_trigger(self, cr, uid, ids, trigger, context=None):
"""Call given workflow trigger as a result of a CRUD operation"""
-import unittest2
-
-import openerp
+from openerp import exceptions
+from openerp.tools import mute_logger
import common
UID = common.ADMIN_USER_ID
DB = common.DB
+
+class TestORM(common.TransactionCase):
+ """ test special behaviors of ORM CRUD functions
+
+ TODO: use real Exceptions types instead of Exception """
+
+ def setUp(self):
+ super(TestORM, self).setUp()
+ cr, uid = self.cr, self.uid
+ self.partner = self.registry('res.partner')
+ self.users = self.registry('res.users')
+ self.p1 = self.partner.name_create(cr, uid, 'W')[0]
+ self.p2 = self.partner.name_create(cr, uid, 'Y')[0]
+ self.ir_rule = self.registry('ir.rule')
+
+ # sample unprivileged user
+ employee_gid = self.ref('base.group_user')
+ self.uid2 = self.users.create(cr, uid, {'name': 'test user', 'login': 'test', 'groups_id': [4,employee_gid]})
+
+ @mute_logger('openerp.osv.orm')
+ def testAccessDeletedRecords(self):
+ """ Verify that accessing deleted records works as expected """
+ cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
+ self.partner.unlink(cr, uid, [p1])
+
+ # read() is expected to skip deleted records because our API is not
+ # transactional for a sequence of search()->read() performed from the
+ # client-side... a concurrent deletion could therefore cause spurious
+ # exceptions even when simply opening a list view!
+ # /!\ Using unprileged user to detect former side effects of ir.rules!
+ self.assertEqual([{'id': p2, 'name': 'Y'}], self.partner.read(cr, uid2, [p1,p2], ['name']), "read() should skip deleted records")
+ self.assertEqual([], self.partner.read(cr, uid2, [p1], ['name']), "read() should skip deleted records")
+
+ # Deleting an already deleted record should be simply ignored
+ self.assertTrue(self.partner.unlink(cr, uid, [p1]), "Re-deleting should be a no-op")
+
+ # Updating an already deleted record should raise, even as admin
+ with self.assertRaises(Exception):
+ self.partner.write(cr, uid, [p1], {'name': 'foo'})
+
+ @mute_logger('openerp.osv.orm')
+ def testAccessFilteredRecords(self):
+ """ Verify that accessing filtered records works as expected for non-admin user """
+ cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
+ partner_model = self.registry('ir.model').search(cr, uid, [('model','=','res.partner')])[0]
+ self.ir_rule.create(cr, uid, {'name': 'Y is invisible',
+ 'domain_force': [('id', '!=', p1)],
+ 'model_id': partner_model})
+ # search as unprivileged user
+ partners = self.partner.search(cr, uid2, [])
+ self.assertFalse(p1 in partners, "W should not be visible...")
+ self.assertTrue(p2 in partners, "... but Y should be visible")
+
+ # read as unprivileged user
+ with self.assertRaises(Exception):
+ self.partner.read(cr, uid2, [p1], ['name'])
+ # write as unprivileged user
+ with self.assertRaises(Exception):
+ self.partner.write(cr, uid2, [p1], {'name': 'foo'})
+ # unlink as unprivileged user
+ with self.assertRaises(Exception):
+ self.partner.unlink(cr, uid2, [p1])
+
+ # Prepare mixed case
+ self.partner.unlink(cr, uid, [p2])
+ # read mixed records: some deleted and some filtered
+ with self.assertRaises(Exception):
+ self.partner.read(cr, uid2, [p1,p2], ['name'])
+ # delete mixed records: some deleted and some filtered
+ with self.assertRaises(Exception):
+ self.partner.unlink(cr, uid2, [p1,p2])
+
+
class TestInherits(common.TransactionCase):
""" test the behavior of the orm for models that use _inherits;
specifically: res.users, that inherits from res.partner