[IMP] orm: make error handling more consistent when accessing deleted/filtered records
authorOlivier Dony <odo@openerp.com>
Fri, 9 Nov 2012 17:14:51 +0000 (18:14 +0100)
committerOlivier Dony <odo@openerp.com>
Fri, 9 Nov 2012 17:14:51 +0000 (18:14 +0100)
Previous behavior was unspecified and untested - leading to random results
when performing operations on a mix of deleted and ir.rule-filtered records.

The behavior is now clarified and explicitly tested.
One suprising case remains: read() on a deleted record
returns an empty result instead of raising an error,
in order to avoid spurious errors when a client
performs a sequence of search(), read() while
another user is deleting records.

bzr revid: odo@openerp.com-20121109171451-z2m6oqs910103lcz

openerp/osv/orm.py
openerp/tests/test_orm.py

index e7ad9ca..c09d406 100644 (file)
@@ -3601,18 +3601,17 @@ class BaseModel(object):
 
             fields_pre2 = map(convert_field, fields_pre)
             order_by = self._parent_order or self._order
-            select_fields = ','.join(fields_pre2 + [self._table + '.id'])
+            select_fields = ','.join(fields_pre2 + ['%s.id' % self._table])
             query = 'SELECT %s FROM %s WHERE %s.id IN %%s' % (select_fields, ','.join(tables), self._table)
             if rule_clause:
                 query += " AND " + (' OR '.join(rule_clause))
             query += " ORDER BY " + order_by
             for sub_ids in cr.split_for_in_conditions(ids):
-                if rule_clause:
-                    cr.execute(query, [tuple(sub_ids)] + rule_params)
-                    self._check_record_rules_result_count(cr, user, sub_ids, 'read', context=context)
-                else:
-                    cr.execute(query, (tuple(sub_ids),))
-                res.extend(cr.dictfetchall())
+                cr.execute(query, [tuple(sub_ids)] + rule_params)
+                results = cr.dictfetchall()
+                result_ids = [x['id'] for x in results]
+                self._check_record_rules_result_count(cr, user, sub_ids, result_ids, 'read', context=context)
+                res.extend(results)
         else:
             res = map(lambda x: {'id': x}, ids)
 
@@ -3796,25 +3795,33 @@ class BaseModel(object):
                 # mention the first one only to keep the error message readable
                 raise except_orm('ConcurrencyException', _('A document was modified since you last viewed it (%s:%d)') % (self._description, res[0]))
 
-    def _check_record_rules_result_count(self, cr, uid, ids, operation, context=None):
-        """Verify that number of returned rows after applying record rules matches
+    def _check_record_rules_result_count(self, cr, uid, ids, result_ids, operation, context=None):
+        """Verify the returned rows after applying record rules matches
            the length of `ids`, and raise an appropriate exception if it does not.
         """
-        if cr.rowcount != len(ids):
+        ids, result_ids = set(ids), set(result_ids)
+        missing_ids = ids - result_ids
+        if missing_ids:
             # Attempt to distinguish record rule restriction vs deleted records,
-            # to provide a more specific error message
-            cr.execute('SELECT id FROM ' + self._table + ' WHERE id IN %s', (tuple(ids),))
-            if cr.rowcount != len(ids):
-                if operation == 'unlink':
-                    # no need to warn about deleting an already deleted record!
+            # to provide a more specific error message - check if the missinf
+            cr.execute('SELECT id FROM ' + self._table + ' WHERE id IN %s', (tuple(missing_ids),))
+            if cr.rowcount:
+                # the missing ids are (at least partially) hidden by access rules
+                _logger.warning('Access Denied by record rules for operation: %s, uid: %s, model: %s', operation, uid, self._name)
+                raise except_orm(_('Access Denied'),
+                                 _('The requested operation cannot be completed due to security restrictions. Please contact your system administrator.\n\n(Document type: %s, Operation: %s)') % \
+                                    (self._description, operation))
+            else:
+                # If we get here, the missing_ids are not in the database
+                if operation in ('read','unlink'):
+                    # No need to warn about deleting an already deleted record.
+                    # And no error when reading a record that was deleted, to prevent spurious
+                    # errors for non-transactional search/read sequences coming from clients 
                     return
                 _logger.warning('Failed operation on deleted record(s): %s, uid: %s, model: %s', operation, uid, self._name)
                 raise except_orm(_('Missing document(s)'),
                                  _('One of the documents you are trying to access has been deleted, please try again after refreshing.'))
-            _logger.warning('Access Denied by record rules for operation: %s, uid: %s, model: %s', operation, uid, self._name)
-            raise except_orm(_('Access Denied'),
-                             _('The requested operation cannot be completed due to security restrictions. Please contact your system administrator.\n\n(Document type: %s, Operation: %s)') % \
-                                (self._description, operation))
+
 
     def check_access_rights(self, cr, uid, operation, raise_exception=True): # no context on purpose.
         """Verifies that the operation given by ``operation`` is allowed for the user
@@ -3853,7 +3860,8 @@ class BaseModel(object):
                     cr.execute('SELECT ' + self._table + '.id FROM ' + ','.join(tables) +
                                ' WHERE ' + self._table + '.id IN %s' + where_clause,
                                [sub_ids] + where_params)
-                    self._check_record_rules_result_count(cr, uid, sub_ids, operation, context=context)
+                    returned_ids = [x['id'] for x in cr.dictfetchall()]
+                    self._check_record_rules_result_count(cr, uid, sub_ids, returned_ids, operation, context=context)
 
     def _workflow_trigger(self, cr, uid, ids, trigger, context=None):
         """Call given workflow trigger as a result of a CRUD operation"""
index ef8c0cf..c114064 100644 (file)
@@ -1,11 +1,83 @@
-import unittest2
-
-import openerp
+from openerp import exceptions
+from openerp.tools import mute_logger
 import common
 
 UID = common.ADMIN_USER_ID
 DB = common.DB
 
+
+class TestORM(common.TransactionCase):
+    """ test special behaviors of ORM CRUD functions
+    
+        TODO: use real Exceptions types instead of Exception """
+
+    def setUp(self):
+        super(TestORM, self).setUp()
+        cr, uid = self.cr, self.uid
+        self.partner = self.registry('res.partner')
+        self.users = self.registry('res.users')
+        self.p1 = self.partner.name_create(cr, uid, 'W')[0]
+        self.p2 = self.partner.name_create(cr, uid, 'Y')[0]
+        self.ir_rule = self.registry('ir.rule')
+
+        # sample unprivileged user
+        employee_gid = self.ref('base.group_user')
+        self.uid2 = self.users.create(cr, uid, {'name': 'test user', 'login': 'test', 'groups_id': [4,employee_gid]})
+
+    @mute_logger('openerp.osv.orm')
+    def testAccessDeletedRecords(self):
+        """ Verify that accessing deleted records works as expected """
+        cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
+        self.partner.unlink(cr, uid, [p1])
+
+        # read() is expected to skip deleted records because our API is not
+        # transactional for a sequence of search()->read() performed from the
+        # client-side... a concurrent deletion could therefore cause spurious
+        # exceptions even when simply opening a list view!
+        # /!\ Using unprileged user to detect former side effects of ir.rules!
+        self.assertEqual([{'id': p2, 'name': 'Y'}], self.partner.read(cr, uid2, [p1,p2], ['name']), "read() should skip deleted records")
+        self.assertEqual([], self.partner.read(cr, uid2, [p1], ['name']), "read() should skip deleted records")
+
+        # Deleting an already deleted record should be simply ignored
+        self.assertTrue(self.partner.unlink(cr, uid, [p1]), "Re-deleting should be a no-op")
+
+        # Updating an already deleted record should raise, even as admin
+        with self.assertRaises(Exception):
+            self.partner.write(cr, uid, [p1], {'name': 'foo'})
+
+    @mute_logger('openerp.osv.orm')
+    def testAccessFilteredRecords(self):
+        """ Verify that accessing filtered records works as expected for non-admin user """
+        cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
+        partner_model = self.registry('ir.model').search(cr, uid, [('model','=','res.partner')])[0]
+        self.ir_rule.create(cr, uid, {'name': 'Y is invisible',
+                                      'domain_force': [('id', '!=', p1)],
+                                      'model_id': partner_model})
+        # search as unprivileged user
+        partners = self.partner.search(cr, uid2, [])
+        self.assertFalse(p1 in partners, "W should not be visible...")
+        self.assertTrue(p2 in partners, "... but Y should be visible")
+
+        # read as unprivileged user
+        with self.assertRaises(Exception):
+            self.partner.read(cr, uid2, [p1], ['name'])
+        # write as unprivileged user
+        with self.assertRaises(Exception):
+            self.partner.write(cr, uid2, [p1], {'name': 'foo'})
+        # unlink as unprivileged user
+        with self.assertRaises(Exception):
+            self.partner.unlink(cr, uid2, [p1])
+
+        # Prepare mixed case 
+        self.partner.unlink(cr, uid, [p2])
+        # read mixed records: some deleted and some filtered
+        with self.assertRaises(Exception):
+            self.partner.read(cr, uid2, [p1,p2], ['name'])
+        # delete mixed records: some deleted and some filtered
+        with self.assertRaises(Exception):
+            self.partner.unlink(cr, uid2, [p1,p2])
+
+
 class TestInherits(common.TransactionCase):
     """ test the behavior of the orm for models that use _inherits;
         specifically: res.users, that inherits from res.partner