[REF] Refactoring according to the review of CHS
[odoo/odoo.git] / openerp / tests / test_orm.py
1 from collections import defaultdict
2 from openerp.tools import mute_logger
3 import common
4
5 UID = common.ADMIN_USER_ID
6 DB = common.DB
7
8
9 class TestORM(common.TransactionCase):
10     """ test special behaviors of ORM CRUD functions
11     
12         TODO: use real Exceptions types instead of Exception """
13
14     def setUp(self):
15         super(TestORM, self).setUp()
16         cr, uid = self.cr, self.uid
17         self.partner = self.registry('res.partner')
18         self.users = self.registry('res.users')
19         self.p1 = self.partner.name_create(cr, uid, 'W')[0]
20         self.p2 = self.partner.name_create(cr, uid, 'Y')[0]
21         self.ir_rule = self.registry('ir.rule')
22
23         # sample unprivileged user
24         employee_gid = self.ref('base.group_user')
25         self.uid2 = self.users.create(cr, uid, {'name': 'test user', 'login': 'test', 'groups_id': [4,employee_gid]})
26
27     @mute_logger('openerp.osv.orm')
28     def testAccessDeletedRecords(self):
29         """ Verify that accessing deleted records works as expected """
30         cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
31         self.partner.unlink(cr, uid, [p1])
32
33         # read() is expected to skip deleted records because our API is not
34         # transactional for a sequence of search()->read() performed from the
35         # client-side... a concurrent deletion could therefore cause spurious
36         # exceptions even when simply opening a list view!
37         # /!\ Using unprileged user to detect former side effects of ir.rules!
38         self.assertEqual([{'id': p2, 'name': 'Y'}], self.partner.read(cr, uid2, [p1,p2], ['name']), "read() should skip deleted records")
39         self.assertEqual([], self.partner.read(cr, uid2, [p1], ['name']), "read() should skip deleted records")
40
41         # Deleting an already deleted record should be simply ignored
42         self.assertTrue(self.partner.unlink(cr, uid, [p1]), "Re-deleting should be a no-op")
43
44         # Updating an already deleted record should raise, even as admin
45         with self.assertRaises(Exception):
46             self.partner.write(cr, uid, [p1], {'name': 'foo'})
47
48     @mute_logger('openerp.osv.orm')
49     def testAccessFilteredRecords(self):
50         """ Verify that accessing filtered records works as expected for non-admin user """
51         cr, uid, uid2, p1, p2 = self.cr, self.uid, self.uid2, self.p1, self.p2
52         partner_model = self.registry('ir.model').search(cr, uid, [('model','=','res.partner')])[0]
53         self.ir_rule.create(cr, uid, {'name': 'Y is invisible',
54                                       'domain_force': [('id', '!=', p1)],
55                                       'model_id': partner_model})
56         # search as unprivileged user
57         partners = self.partner.search(cr, uid2, [])
58         self.assertFalse(p1 in partners, "W should not be visible...")
59         self.assertTrue(p2 in partners, "... but Y should be visible")
60
61         # read as unprivileged user
62         with self.assertRaises(Exception):
63             self.partner.read(cr, uid2, [p1], ['name'])
64         # write as unprivileged user
65         with self.assertRaises(Exception):
66             self.partner.write(cr, uid2, [p1], {'name': 'foo'})
67         # unlink as unprivileged user
68         with self.assertRaises(Exception):
69             self.partner.unlink(cr, uid2, [p1])
70
71         # Prepare mixed case 
72         self.partner.unlink(cr, uid, [p2])
73         # read mixed records: some deleted and some filtered
74         with self.assertRaises(Exception):
75             self.partner.read(cr, uid2, [p1,p2], ['name'])
76         # delete mixed records: some deleted and some filtered
77         with self.assertRaises(Exception):
78             self.partner.unlink(cr, uid2, [p1,p2])
79
80     @mute_logger('openerp.osv.orm')
81     def test_search_read(self):
82         # simple search_read
83         self.partner.create(self.cr, UID, {'name': 'MyPartner1'})
84         found = self.partner.search_read(self.cr, UID, [['name', '=', 'MyPartner1']], ['name'])
85         self.assertEqual(len(found), 1)
86         self.assertEqual(found[0]['name'], 'MyPartner1')
87         self.assertTrue('id' in found[0])
88
89         # search_read correct order
90         self.partner.create(self.cr, UID, {'name': 'MyPartner2'})
91         found = self.partner.search_read(self.cr, UID, [['name', 'like', 'MyPartner']], ['name'], order="name")
92         self.assertEqual(len(found), 2)
93         self.assertEqual(found[0]['name'], 'MyPartner1')
94         self.assertEqual(found[1]['name'], 'MyPartner2')
95         found = self.partner.search_read(self.cr, UID, [['name', 'like', 'MyPartner']], ['name'], order="name desc")
96         self.assertEqual(len(found), 2)
97         self.assertEqual(found[0]['name'], 'MyPartner2')
98         self.assertEqual(found[1]['name'], 'MyPartner1')
99
100         # search_read that finds nothing
101         found = self.partner.search_read(self.cr, UID, [['name', '=', 'Does not exists']], ['name'])
102         self.assertEqual(len(found), 0)
103
104     def test_groupby_date(self):
105         partners = dict(
106             A='2012-11-19',
107             B='2012-12-17',
108             C='2012-12-31',
109             D='2013-01-07',
110             E='2013-01-14',
111             F='2013-01-28',
112             G='2013-02-11',
113         )
114
115         all_partners = []
116         partners_by_day = defaultdict(set)
117         partners_by_month = defaultdict(set)
118         partners_by_year = defaultdict(set)
119
120         for name, date in partners.items():
121             p = self.partner.create(self.cr, UID, dict(name=name, date=date))
122             all_partners.append(p)
123             partners_by_day[date].add(p)
124             partners_by_month[date.rsplit('-', 1)[0]].add(p)
125             partners_by_year[date.split('-', 1)[0]].add(p)
126
127         def read_group(interval, domain=None):
128             main_domain = [('id', 'in', all_partners)]
129             if domain:
130                 domain = ['&'] + main_domain + domain
131             else:
132                 domain = main_domain
133
134             display_format, groupby_format = {
135                 'year': ('YYYY', 'yyyy'),
136                 'month': ('YYYY-MM', 'yyyy-mm'),
137                 'day': ('yyyy-MM-dd', 'yyyy-mm-dd'),
138             }[interval]
139
140             datetime_format = {
141                 'date': dict(interval=interval, display_format=display_format, groupby_format=groupby_format)
142             }
143             context = dict(datetime_format=datetime_format)
144
145             rg = self.partner.read_group(self.cr, self.uid, domain, ['date'], 'date', context=context)
146             result = {}
147             for r in rg:
148                 result[r['date']] = set(self.partner.search(self.cr, self.uid, r['__domain']))
149             return result
150
151         self.assertDictEqual(read_group('day'), partners_by_day)
152         self.assertDictEqual(read_group('month'), partners_by_month)
153         self.assertDictEqual(read_group('year'), partners_by_year)
154
155
156 class TestInherits(common.TransactionCase):
157     """ test the behavior of the orm for models that use _inherits;
158         specifically: res.users, that inherits from res.partner
159     """
160
161     def setUp(self):
162         super(TestInherits, self).setUp()
163         self.partner = self.registry('res.partner')
164         self.user = self.registry('res.users')
165
166     def test_create(self):
167         """ creating a user should automatically create a new partner """
168         partners_before = self.partner.search(self.cr, UID, [])
169         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
170         foo = self.user.browse(self.cr, UID, foo_id)
171
172         self.assertNotIn(foo.partner_id.id, partners_before)
173
174     def test_create_with_ancestor(self):
175         """ creating a user with a specific 'partner_id' should not create a new partner """
176         par_id = self.partner.create(self.cr, UID, {'name': 'Foo'})
177         partners_before = self.partner.search(self.cr, UID, [])
178         foo_id = self.user.create(self.cr, UID, {'partner_id': par_id, 'login': 'foo', 'password': 'foo'})
179         partners_after = self.partner.search(self.cr, UID, [])
180
181         self.assertEqual(set(partners_before), set(partners_after))
182
183         foo = self.user.browse(self.cr, UID, foo_id)
184         self.assertEqual(foo.name, 'Foo')
185         self.assertEqual(foo.partner_id.id, par_id)
186
187     @mute_logger('openerp.osv.orm')
188     def test_read(self):
189         """ inherited fields should be read without any indirection """
190         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
191         foo_values, = self.user.read(self.cr, UID, [foo_id])
192         partner_id = foo_values['partner_id'][0]
193         partner_values, = self.partner.read(self.cr, UID, [partner_id])
194         self.assertEqual(foo_values['name'], partner_values['name'])
195
196         foo = self.user.browse(self.cr, UID, foo_id)
197         self.assertEqual(foo.name, foo.partner_id.name)
198
199     @mute_logger('openerp.osv.orm')
200     def test_copy(self):
201         """ copying a user should automatically copy its partner, too """
202         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
203         foo_before, = self.user.read(self.cr, UID, [foo_id])
204         bar_id = self.user.copy(self.cr, UID, foo_id, {'login': 'bar', 'password': 'bar'})
205         foo_after, = self.user.read(self.cr, UID, [foo_id])
206
207         self.assertEqual(foo_before, foo_after)
208
209         foo, bar = self.user.browse(self.cr, UID, [foo_id, bar_id])
210         self.assertEqual(bar.login, 'bar')
211         self.assertNotEqual(foo.id, bar.id)
212         self.assertNotEqual(foo.partner_id.id, bar.partner_id.id)
213
214     @mute_logger('openerp.osv.orm')
215     def test_copy_with_ancestor(self):
216         """ copying a user with 'parent_id' in defaults should not duplicate the partner """
217         foo_id = self.user.create(self.cr, UID, {'name': 'Foo', 'login': 'foo', 'password': 'foo'})
218         par_id = self.partner.create(self.cr, UID, {'name': 'Bar'})
219
220         foo_before, = self.user.read(self.cr, UID, [foo_id])
221         partners_before = self.partner.search(self.cr, UID, [])
222         bar_id = self.user.copy(self.cr, UID, foo_id, {'partner_id': par_id, 'login': 'bar'})
223         foo_after, = self.user.read(self.cr, UID, [foo_id])
224         partners_after = self.partner.search(self.cr, UID, [])
225
226         self.assertEqual(foo_before, foo_after)
227         self.assertEqual(set(partners_before), set(partners_after))
228
229         foo, bar = self.user.browse(self.cr, UID, [foo_id, bar_id])
230         self.assertNotEqual(foo.id, bar.id)
231         self.assertEqual(bar.partner_id.id, par_id)
232         self.assertEqual(bar.login, 'bar', "login is given from copy parameters")
233         self.assertEqual(bar.password, foo.password, "password is given from original record")
234         self.assertEqual(bar.name, 'Bar', "name is given from specific partner")
235
236
237
238 CREATE = lambda values: (0, False, values)
239 UPDATE = lambda id, values: (1, id, values)
240 DELETE = lambda id: (2, id, False)
241 FORGET = lambda id: (3, id, False)
242 LINK_TO = lambda id: (4, id, False)
243 DELETE_ALL = lambda: (5, False, False)
244 REPLACE_WITH = lambda ids: (6, False, ids)
245
246 def sorted_by_id(list_of_dicts):
247     "sort dictionaries by their 'id' field; useful for comparisons"
248     return sorted(list_of_dicts, key=lambda d: d.get('id'))
249
250 class TestO2MSerialization(common.TransactionCase):
251     """ test the orm method 'write' on one2many fields """
252
253     def setUp(self):
254         super(TestO2MSerialization, self).setUp()
255         self.partner = self.registry('res.partner')
256
257     def test_no_command(self):
258         " empty list of commands yields an empty list of records "
259         results = self.partner.resolve_2many_commands(
260             self.cr, UID, 'child_ids', [])
261
262         self.assertEqual(results, [])
263
264     def test_CREATE_commands(self):
265         " returns the VALUES dict as-is "
266         values = [{'foo': 'bar'}, {'foo': 'baz'}, {'foo': 'baq'}]
267         results = self.partner.resolve_2many_commands(
268             self.cr, UID, 'child_ids', map(CREATE, values))
269
270         self.assertEqual(results, values)
271
272     def test_LINK_TO_command(self):
273         " reads the records from the database, records are returned with their ids. "
274         ids = [
275             self.partner.create(self.cr, UID, {'name': 'foo'}),
276             self.partner.create(self.cr, UID, {'name': 'bar'}),
277             self.partner.create(self.cr, UID, {'name': 'baz'})
278         ]
279         commands = map(LINK_TO, ids)
280
281         results = self.partner.resolve_2many_commands(
282             self.cr, UID, 'child_ids', commands, ['name'])
283
284         self.assertEqual(sorted_by_id(results), sorted_by_id([
285             {'id': ids[0], 'name': 'foo'},
286             {'id': ids[1], 'name': 'bar'},
287             {'id': ids[2], 'name': 'baz'}
288         ]))
289
290     def test_bare_ids_command(self):
291         " same as the equivalent LINK_TO commands "
292         ids = [
293             self.partner.create(self.cr, UID, {'name': 'foo'}),
294             self.partner.create(self.cr, UID, {'name': 'bar'}),
295             self.partner.create(self.cr, UID, {'name': 'baz'})
296         ]
297
298         results = self.partner.resolve_2many_commands(
299             self.cr, UID, 'child_ids', ids, ['name'])
300
301         self.assertEqual(sorted_by_id(results), sorted_by_id([
302             {'id': ids[0], 'name': 'foo'},
303             {'id': ids[1], 'name': 'bar'},
304             {'id': ids[2], 'name': 'baz'}
305         ]))
306
307     def test_UPDATE_command(self):
308         " take the in-db records and merge the provided information in "
309         id_foo = self.partner.create(self.cr, UID, {'name': 'foo'})
310         id_bar = self.partner.create(self.cr, UID, {'name': 'bar'})
311         id_baz = self.partner.create(self.cr, UID, {'name': 'baz', 'city': 'tag'})
312
313         results = self.partner.resolve_2many_commands(
314             self.cr, UID, 'child_ids', [
315                 LINK_TO(id_foo),
316                 UPDATE(id_bar, {'name': 'qux', 'city': 'tagtag'}),
317                 UPDATE(id_baz, {'name': 'quux'})
318             ], ['name', 'city'])
319
320         self.assertEqual(sorted_by_id(results), sorted_by_id([
321             {'id': id_foo, 'name': 'foo', 'city': False},
322             {'id': id_bar, 'name': 'qux', 'city': 'tagtag'},
323             {'id': id_baz, 'name': 'quux', 'city': 'tag'}
324         ]))
325
326     def test_DELETE_command(self):
327         " deleted records are not returned at all. "
328         ids = [
329             self.partner.create(self.cr, UID, {'name': 'foo'}),
330             self.partner.create(self.cr, UID, {'name': 'bar'}),
331             self.partner.create(self.cr, UID, {'name': 'baz'})
332         ]
333         commands = [DELETE(ids[0]), DELETE(ids[1]), DELETE(ids[2])]
334
335         results = self.partner.resolve_2many_commands(
336             self.cr, UID, 'child_ids', commands, ['name'])
337
338         self.assertEqual(results, [])
339
340     def test_mixed_commands(self):
341         ids = [
342             self.partner.create(self.cr, UID, {'name': name})
343             for name in ['NObar', 'baz', 'qux', 'NOquux', 'NOcorge', 'garply']
344         ]
345
346         results = self.partner.resolve_2many_commands(
347             self.cr, UID, 'child_ids', [
348                 CREATE({'name': 'foo'}),
349                 UPDATE(ids[0], {'name': 'bar'}),
350                 LINK_TO(ids[1]),
351                 DELETE(ids[2]),
352                 UPDATE(ids[3], {'name': 'quux',}),
353                 UPDATE(ids[4], {'name': 'corge'}),
354                 CREATE({'name': 'grault'}),
355                 LINK_TO(ids[5])
356             ], ['name'])
357
358         self.assertEqual(sorted_by_id(results), sorted_by_id([
359             {'name': 'foo'},
360             {'id': ids[0], 'name': 'bar'},
361             {'id': ids[1], 'name': 'baz'},
362             {'id': ids[3], 'name': 'quux'},
363             {'id': ids[4], 'name': 'corge'},
364             {'name': 'grault'},
365             {'id': ids[5], 'name': 'garply'}
366         ]))
367
368     def test_LINK_TO_pairs(self):
369         "LINK_TO commands can be written as pairs, instead of triplets"
370         ids = [
371             self.partner.create(self.cr, UID, {'name': 'foo'}),
372             self.partner.create(self.cr, UID, {'name': 'bar'}),
373             self.partner.create(self.cr, UID, {'name': 'baz'})
374         ]
375         commands = map(lambda id: (4, id), ids)
376
377         results = self.partner.resolve_2many_commands(
378             self.cr, UID, 'child_ids', commands, ['name'])
379
380         self.assertEqual(sorted_by_id(results), sorted_by_id([
381             {'id': ids[0], 'name': 'foo'},
382             {'id': ids[1], 'name': 'bar'},
383             {'id': ids[2], 'name': 'baz'}
384         ]))
385
386     def test_singleton_commands(self):
387         "DELETE_ALL can appear as a singleton"
388         results = self.partner.resolve_2many_commands(
389             self.cr, UID, 'child_ids', [DELETE_ALL()], ['name'])
390
391         self.assertEqual(results, [])
392
393 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: