[IMP] adds a test for the new eager/lazy functionality of read_group
[odoo/odoo.git] / openerp / addons / base / tests / test_orm.py
1 from collections import defaultdict
2 from openerp.tools import mute_logger
3 from openerp.tests import common
4
5 UID = common.ADMIN_USER_ID
6 DB = common.DB
7
8
9 class TestORM(common.TransactionCase):
10     """ test special behaviors of ORM CRUD functions
11     
12         TODO: use real Exceptions types instead of Exception """
13
14     def setUp(self):
15         super(TestORM, self).setUp()
16         cr, uid = self.cr, self.uid
17         self.partner = self.registry('res.partner')
18         self.users = self.registry('res.users')
19         self.p1 = self.partner.name_create(cr, uid, 'W')[0]
20         self.p2 = self.partner.name_create(cr, uid, 'Y')[0]
21         self.ir_rule = self.registry('ir.rule')
22
23         # sample unprivileged user
24         employee_gid = self.ref('base.group_user')
25         self.uid2 = self.users.create(cr, uid, {'name': 'test user', 'login': 'test', 'groups_id': [4,employee_gid]})
26
27     @mute_logger('openerp.osv.orm')
28     def testAccessDeletedRecords(self):
29         """ Verify that accessing deleted records works as expected """
30         cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
31         self.partner.unlink(cr, uid, [p1])
32
33         # read() is expected to skip deleted records because our API is not
34         # transactional for a sequence of search()->read() performed from the
35         # client-side... a concurrent deletion could therefore cause spurious
36         # exceptions even when simply opening a list view!
37         # /!\ Using unprileged user to detect former side effects of ir.rules!
38         self.assertEqual([{'id': p2, 'name': 'Y'}], self.partner.read(cr, uid2, [p1,p2], ['name']), "read() should skip deleted records")
39         self.assertEqual([], self.partner.read(cr, uid2, [p1], ['name']), "read() should skip deleted records")
40
41         # Deleting an already deleted record should be simply ignored
42         self.assertTrue(self.partner.unlink(cr, uid, [p1]), "Re-deleting should be a no-op")
43
44         # Updating an already deleted record should raise, even as admin
45         with self.assertRaises(Exception):
46             self.partner.write(cr, uid, [p1], {'name': 'foo'})
47
48     @mute_logger('openerp.osv.orm')
49     def testAccessFilteredRecords(self):
50         """ Verify that accessing filtered records works as expected for non-admin user """
51         cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
52         partner_model = self.registry('ir.model').search(cr, uid, [('model','=','res.partner')])[0]
53         self.ir_rule.create(cr, uid, {'name': 'Y is invisible',
54                                       'domain_force': [('id', '!=', p1)],
55                                       'model_id': partner_model})
56         # search as unprivileged user
57         partners = self.partner.search(cr, uid2, [])
58         self.assertFalse(p1 in partners, "W should not be visible...")
59         self.assertTrue(p2 in partners, "... but Y should be visible")
60
61         # read as unprivileged user
62         with self.assertRaises(Exception):
63             self.partner.read(cr, uid2, [p1], ['name'])
64         # write as unprivileged user
65         with self.assertRaises(Exception):
66             self.partner.write(cr, uid2, [p1], {'name': 'foo'})
67         # unlink as unprivileged user
68         with self.assertRaises(Exception):
69             self.partner.unlink(cr, uid2, [p1])
70
71         # Prepare mixed case 
72         self.partner.unlink(cr, uid, [p2])
73         # read mixed records: some deleted and some filtered
74         with self.assertRaises(Exception):
75             self.partner.read(cr, uid2, [p1,p2], ['name'])
76         # delete mixed records: some deleted and some filtered
77         with self.assertRaises(Exception):
78             self.partner.unlink(cr, uid2, [p1,p2])
79
80     @mute_logger('openerp.osv.orm')
81     def test_search_read(self):
82         # simple search_read
83         self.partner.create(self.cr, UID, {'name': 'MyPartner1'})
84         found = self.partner.search_read(self.cr, UID, [['name', '=', 'MyPartner1']], ['name'])
85         self.assertEqual(len(found), 1)
86         self.assertEqual(found[0]['name'], 'MyPartner1')
87         self.assertTrue('id' in found[0])
88
89         # search_read correct order
90         self.partner.create(self.cr, UID, {'name': 'MyPartner2'})
91         found = self.partner.search_read(self.cr, UID, [['name', 'like', 'MyPartner']], ['name'], order="name")
92         self.assertEqual(len(found), 2)
93         self.assertEqual(found[0]['name'], 'MyPartner1')
94         self.assertEqual(found[1]['name'], 'MyPartner2')
95         found = self.partner.search_read(self.cr, UID, [['name', 'like', 'MyPartner']], ['name'], order="name desc")
96         self.assertEqual(len(found), 2)
97         self.assertEqual(found[0]['name'], 'MyPartner2')
98         self.assertEqual(found[1]['name'], 'MyPartner1')
99
100         # search_read that finds nothing
101         found = self.partner.search_read(self.cr, UID, [['name', '=', 'Does not exists']], ['name'])
102         self.assertEqual(len(found), 0)
103
104     def test_groupby_date(self):
105         partners = dict(
106             A='2012-11-19',
107             B='2012-12-17',
108             C='2012-12-31',
109             D='2013-01-07',
110             E='2013-01-14',
111             F='2013-01-28',
112             G='2013-02-11',
113         )
114
115         all_partners = []
116         partners_by_day = defaultdict(set)
117         partners_by_month = defaultdict(set)
118         partners_by_year = defaultdict(set)
119
120         for name, date in partners.items():
121             p = self.partner.create(self.cr, UID, dict(name=name, date=date))
122             all_partners.append(p)
123             partners_by_day[date].add(p)
124             partners_by_month[date.rsplit('-', 1)[0]].add(p)
125             partners_by_year[date.split('-', 1)[0]].add(p)
126
127         def read_group(interval, domain=None):
128             main_domain = [('id', 'in', all_partners)]
129             if domain:
130                 domain = ['&'] + main_domain + domain
131             else:
132                 domain = main_domain
133
134             rg = self.partner.read_group(self.cr, self.uid, domain, ['date'], 'date' + ':' + interval)
135             result = {}
136             for r in rg:
137                 result[r['date:' + interval]] = set(self.partner.search(self.cr, self.uid, r['__domain']))
138             return result
139
140         self.assertEqual(len(read_group('day')), len(partners_by_day))
141         self.assertEqual(len(read_group('month')), len(partners_by_month))
142         self.assertEqual(len(read_group('year')), len(partners_by_year))
143
144         rg = self.partner.read_group(self.cr, self.uid, [('id', 'in', all_partners)], 
145                         ['date'], ['date:month', 'date:day'], lazy=False)
146         self.assertEqual(len(rg), len(all_partners))
147
148
149 class TestInherits(common.TransactionCase):
150     """ test the behavior of the orm for models that use _inherits;
151         specifically: res.users, that inherits from res.partner
152     """
153
154     def setUp(self):
155         super(TestInherits, self).setUp()
156         self.partner = self.registry('res.partner')
157         self.user = self.registry('res.users')
158
159     def test_create(self):
160         """ creating a user should automatically create a new partner """
161         partners_before = self.partner.search(self.cr, UID, [])
162         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
163         foo = self.user.browse(self.cr, UID, foo_id)
164
165         self.assertNotIn(foo.partner_id.id, partners_before)
166
167     def test_create_with_ancestor(self):
168         """ creating a user with a specific 'partner_id' should not create a new partner """
169         par_id = self.partner.create(self.cr, UID, {'name': 'Foo'})
170         partners_before = self.partner.search(self.cr, UID, [])
171         foo_id = self.user.create(self.cr, UID, {'partner_id': par_id, 'login': 'foo', 'password': 'foo'})
172         partners_after = self.partner.search(self.cr, UID, [])
173
174         self.assertEqual(set(partners_before), set(partners_after))
175
176         foo = self.user.browse(self.cr, UID, foo_id)
177         self.assertEqual(foo.name, 'Foo')
178         self.assertEqual(foo.partner_id.id, par_id)
179
180     @mute_logger('openerp.osv.orm')
181     def test_read(self):
182         """ inherited fields should be read without any indirection """
183         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
184         foo_values, = self.user.read(self.cr, UID, [foo_id])
185         partner_id = foo_values['partner_id'][0]
186         partner_values, = self.partner.read(self.cr, UID, [partner_id])
187         self.assertEqual(foo_values['name'], partner_values['name'])
188
189         foo = self.user.browse(self.cr, UID, foo_id)
190         self.assertEqual(foo.name, foo.partner_id.name)
191
192     @mute_logger('openerp.osv.orm')
193     def test_copy(self):
194         """ copying a user should automatically copy its partner, too """
195         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
196         foo_before, = self.user.read(self.cr, UID, [foo_id])
197         bar_id = self.user.copy(self.cr, UID, foo_id, {'login': 'bar', 'password': 'bar'})
198         foo_after, = self.user.read(self.cr, UID, [foo_id])
199
200         self.assertEqual(foo_before, foo_after)
201
202         foo, bar = self.user.browse(self.cr, UID, [foo_id, bar_id])
203         self.assertEqual(bar.login, 'bar')
204         self.assertNotEqual(foo.id, bar.id)
205         self.assertNotEqual(foo.partner_id.id, bar.partner_id.id)
206
207     @mute_logger('openerp.osv.orm')
208     def test_copy_with_ancestor(self):
209         """ copying a user with 'parent_id' in defaults should not duplicate the partner """
210         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
211         par_id = self.partner.create(self.cr, UID, {'name': 'Bar'})
212
213         foo_before, = self.user.read(self.cr, UID, [foo_id])
214         partners_before = self.partner.search(self.cr, UID, [])
215         bar_id = self.user.copy(self.cr, UID, foo_id, {'partner_id': par_id, 'login': 'bar'})
216         foo_after, = self.user.read(self.cr, UID, [foo_id])
217         partners_after = self.partner.search(self.cr, UID, [])
218
219         self.assertEqual(foo_before, foo_after)
220         self.assertEqual(set(partners_before), set(partners_after))
221
222         foo, bar = self.user.browse(self.cr, UID, [foo_id, bar_id])
223         self.assertNotEqual(foo.id, bar.id)
224         self.assertEqual(bar.partner_id.id, par_id)
225         self.assertEqual(bar.login, 'bar', "login is given from copy parameters")
226         self.assertEqual(bar.password, foo.password, "password is given from original record")
227         self.assertEqual(bar.name, 'Bar', "name is given from specific partner")
228
229
230
231 CREATE = lambda values: (0, False, values)
232 UPDATE = lambda id, values: (1, id, values)
233 DELETE = lambda id: (2, id, False)
234 FORGET = lambda id: (3, id, False)
235 LINK_TO = lambda id: (4, id, False)
236 DELETE_ALL = lambda: (5, False, False)
237 REPLACE_WITH = lambda ids: (6, False, ids)
238
239 def sorted_by_id(list_of_dicts):
240     "sort dictionaries by their 'id' field; useful for comparisons"
241     return sorted(list_of_dicts, key=lambda d: d.get('id'))
242
243 class TestO2MSerialization(common.TransactionCase):
244     """ test the orm method 'write' on one2many fields """
245
246     def setUp(self):
247         super(TestO2MSerialization, self).setUp()
248         self.partner = self.registry('res.partner')
249
250     def test_no_command(self):
251         " empty list of commands yields an empty list of records "
252         results = self.partner.resolve_2many_commands(
253             self.cr, UID, 'child_ids', [])
254
255         self.assertEqual(results, [])
256
257     def test_CREATE_commands(self):
258         " returns the VALUES dict as-is "
259         values = [{'foo': 'bar'}, {'foo': 'baz'}, {'foo': 'baq'}]
260         results = self.partner.resolve_2many_commands(
261             self.cr, UID, 'child_ids', map(CREATE, values))
262
263         self.assertEqual(results, values)
264
265     def test_LINK_TO_command(self):
266         " reads the records from the database, records are returned with their ids. "
267         ids = [
268             self.partner.create(self.cr, UID, {'name': 'foo'}),
269             self.partner.create(self.cr, UID, {'name': 'bar'}),
270             self.partner.create(self.cr, UID, {'name': 'baz'})
271         ]
272         commands = map(LINK_TO, ids)
273
274         results = self.partner.resolve_2many_commands(
275             self.cr, UID, 'child_ids', commands, ['name'])
276
277         self.assertEqual(sorted_by_id(results), sorted_by_id([
278             {'id': ids[0], 'name': 'foo'},
279             {'id': ids[1], 'name': 'bar'},
280             {'id': ids[2], 'name': 'baz'}
281         ]))
282
283     def test_bare_ids_command(self):
284         " same as the equivalent LINK_TO commands "
285         ids = [
286             self.partner.create(self.cr, UID, {'name': 'foo'}),
287             self.partner.create(self.cr, UID, {'name': 'bar'}),
288             self.partner.create(self.cr, UID, {'name': 'baz'})
289         ]
290
291         results = self.partner.resolve_2many_commands(
292             self.cr, UID, 'child_ids', ids, ['name'])
293
294         self.assertEqual(sorted_by_id(results), sorted_by_id([
295             {'id': ids[0], 'name': 'foo'},
296             {'id': ids[1], 'name': 'bar'},
297             {'id': ids[2], 'name': 'baz'}
298         ]))
299
300     def test_UPDATE_command(self):
301         " take the in-db records and merge the provided information in "
302         id_foo = self.partner.create(self.cr, UID, {'name': 'foo'})
303         id_bar = self.partner.create(self.cr, UID, {'name': 'bar'})
304         id_baz = self.partner.create(self.cr, UID, {'name': 'baz', 'city': 'tag'})
305
306         results = self.partner.resolve_2many_commands(
307             self.cr, UID, 'child_ids', [
308                 LINK_TO(id_foo),
309                 UPDATE(id_bar, {'name': 'qux', 'city': 'tagtag'}),
310                 UPDATE(id_baz, {'name': 'quux'})
311             ], ['name', 'city'])
312
313         self.assertEqual(sorted_by_id(results), sorted_by_id([
314             {'id': id_foo, 'name': 'foo', 'city': False},
315             {'id': id_bar, 'name': 'qux', 'city': 'tagtag'},
316             {'id': id_baz, 'name': 'quux', 'city': 'tag'}
317         ]))
318
319     def test_DELETE_command(self):
320         " deleted records are not returned at all. "
321         ids = [
322             self.partner.create(self.cr, UID, {'name': 'foo'}),
323             self.partner.create(self.cr, UID, {'name': 'bar'}),
324             self.partner.create(self.cr, UID, {'name': 'baz'})
325         ]
326         commands = [DELETE(ids[0]), DELETE(ids[1]), DELETE(ids[2])]
327
328         results = self.partner.resolve_2many_commands(
329             self.cr, UID, 'child_ids', commands, ['name'])
330
331         self.assertEqual(results, [])
332
333     def test_mixed_commands(self):
334         ids = [
335             self.partner.create(self.cr, UID, {'name': name})
336             for name in ['NObar', 'baz', 'qux', 'NOquux', 'NOcorge', 'garply']
337         ]
338
339         results = self.partner.resolve_2many_commands(
340             self.cr, UID, 'child_ids', [
341                 CREATE({'name': 'foo'}),
342                 UPDATE(ids[0], {'name': 'bar'}),
343                 LINK_TO(ids[1]),
344                 DELETE(ids[2]),
345                 UPDATE(ids[3], {'name': 'quux',}),
346                 UPDATE(ids[4], {'name': 'corge'}),
347                 CREATE({'name': 'grault'}),
348                 LINK_TO(ids[5])
349             ], ['name'])
350
351         self.assertEqual(sorted_by_id(results), sorted_by_id([
352             {'name': 'foo'},
353             {'id': ids[0], 'name': 'bar'},
354             {'id': ids[1], 'name': 'baz'},
355             {'id': ids[3], 'name': 'quux'},
356             {'id': ids[4], 'name': 'corge'},
357             {'name': 'grault'},
358             {'id': ids[5], 'name': 'garply'}
359         ]))
360
361     def test_LINK_TO_pairs(self):
362         "LINK_TO commands can be written as pairs, instead of triplets"
363         ids = [
364             self.partner.create(self.cr, UID, {'name': 'foo'}),
365             self.partner.create(self.cr, UID, {'name': 'bar'}),
366             self.partner.create(self.cr, UID, {'name': 'baz'})
367         ]
368         commands = map(lambda id: (4, id), ids)
369
370         results = self.partner.resolve_2many_commands(
371             self.cr, UID, 'child_ids', commands, ['name'])
372
373         self.assertEqual(sorted_by_id(results), sorted_by_id([
374             {'id': ids[0], 'name': 'foo'},
375             {'id': ids[1], 'name': 'bar'},
376             {'id': ids[2], 'name': 'baz'}
377         ]))
378
379     def test_singleton_commands(self):
380         "DELETE_ALL can appear as a singleton"
381         results = self.partner.resolve_2many_commands(
382             self.cr, UID, 'child_ids', [DELETE_ALL()], ['name'])
383
384         self.assertEqual(results, [])
385
386 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: