##############################################################################
import base64
-from collections import defaultdict
import dateutil
import email
-from functools import partial
import logging
import pytz
import time
_name = 'mail.thread'
_description = 'Email Thread'
_mail_flat_thread = True
+
_TRACK_TEMPLATE = """
- <span>${updated_fields}</span>
+ %if message_description:
+ <span>${message_description}</span>
+ %endif
<ul>
- %for chg in changes:
- <li><span>${chg[0]}</span>: ${chg[1]} -> ${chg[2]}</li>
+ %for name, change in tracked_values.items():
+ <li><span>${name}</span>:
+ %if change.get('old_value'):
+ ${change.get('old_value')} ->
+ %endif
+ ${change.get('new_value')}</li>
%endfor
</ul>
"""
}
#------------------------------------------------------
- # Automatic subscription when creating
+ # CRUD overrides for automatic subscription and logging
#------------------------------------------------------
- def create(self, cr, uid, vals, context=None):
+ def create(self, cr, uid, values, context=None):
""" Override to subscribe the current user. """
if context is None:
context = {}
- thread_id = super(mail_thread, self).create(cr, uid, vals, context=context)
+ thread_id = super(mail_thread, self).create(cr, uid, values, context=context)
+
+ # subscribe uid unless asked not to
if not context.get('mail_nosubscribe'):
self.message_subscribe_users(cr, uid, [thread_id], [uid], context=context)
+
+ # automatic logging
+ # self.message_post(cr, uid, thread_id, body='Document <b>created</b>.', context=context)
+
return thread_id
+ def write(self, cr, uid, ids, values, context=None):
+ tracked_fields = self._get_tracked_fields(cr, uid, values.keys(), context=context)
+ to_log = [name for name in values.keys() if name in tracked_fields]
+ if to_log:
+ initial = self.read(cr, uid, ids, [name for name, info in tracked_fields.items()], context=context)
+ initial_values = dict((item['id'], item) for item in initial)
+
+ result = super(mail_thread, self).write(cr, uid, ids, values, context=context)
+
+ if to_log:
+ self.message_track(cr, uid, ids, values.keys(), initial_values, context=context)
+ return result
+
def unlink(self, cr, uid, ids, context=None):
""" Override unlink to delete messages and followers. This cannot be
cascaded, because link is done through (res_model, res_id). """
# Automatically log tracked fields
#------------------------------------------------------
- def write(self, cr, uid, ids, values, context=None):
-
- if context is None:
- context = {}
- #import pudb;pudb.set_trace()
-
- def false_value(f):
- if f._type == 'boolean':
- return False
- return f._symbol_set[1](False)
-
- def convert_for_comparison(v, f):
- # It will convert value for comparison between current and new.
- if not v:
- return false_value(f)
- if isinstance(v, browse_record):
- return v.id
- return v
-
- tracked = dict((n, f) for n, f in self._all_columns.items() if getattr(f.column, 'tracked', False))
- to_log = [k for k in values if k in tracked]
-
- from_ = None
- changes = defaultdict(list)
- if to_log:
- for record in self.browse(cr, uid, ids, context):
- for tl in to_log:
- column = tracked[tl].column
- current = convert_for_comparison(record[tl], column)
- new = convert_for_comparison(values[tl], column)
- if new != current:
- changes[record].append(tl)
- from_ = record[tl]
-
- result = super(mail_thread, self).write(cr, uid, ids, values, context=context)
-
- updated_fields = _('Updated Fields:')
-
- Trans = self.pool['ir.translation']
- def _t(c):
- # translate field
- model = c.parent_model or self._name
- lang = context.get('lang')
- return Trans._get_source(cr, uid, '{0},{1}'.format(model, c.name), 'field', lang, ci.column.string)
-
- def get_subtype(model, record):
- # it will return subtype name(xml_id) for stage.
- record_model = self.pool[model].browse(cr, SUPERUSER_ID, record)
- if record_model.__hasattr__('subtype'):
- return record_model.subtype
- return False
-
- for record, changed_fields in changes.items():
- # TODO tpl changed_fields
- chg = []
- subtype = False
- for f in changed_fields:
- to = self.browse(cr, uid, ids[0], context)[f]
- ci = tracked[f]
- if ci.column._type == "many2one":
- if to:
- to = to.name_get()[0][1]
- else:
- to = "Removed"
- if isinstance(from_, browse_record):
- from_ = from_.name_get()[0][1]
-
- subtype = get_subtype(ci.column._obj,values[f])
- chg.append((_t(ci), from_, to))
+ def _get_tracked_fields(self, cr, uid, updated_fields, context=None):
+ """ Return a structure of tracked fields for the current model.
+ :param list updated_fields: modified field names
+ :return list: a list of (field_name, column_info obj), containing
+ always tracked fields and modified on_change fields
+ """
+ return dict((name, column_info)
+ for name, column_info in self._all_columns.items()
+ if getattr(column_info.column, '_track_visibility', False) == 2
+ or (getattr(column_info.column, '_track_visibility', False) == 1 and name in updated_fields))
- message = MakoTemplate(self._TRACK_TEMPLATE).render_unicode(updated_fields=updated_fields,
- changes=chg)
+ def message_track(self, cr, uid, ids, updated_fields, initial_values, log_message='', context=None):
+ """
+ :param list updated_fields: modified field names
+ """
+ translation_obj = self.pool.get('ir.translation')
- record.message_post(message,subtype=subtype)
+ def format_false_value(field_obj):
+ if field_obj._type == 'boolean':
+ return False
+ return field_obj._symbol_set[1](False)
+
+ def convert_for_comparison(value, field_obj):
+ if not value:
+ return format_false_value(field_obj)
+ if isinstance(value, browse_record): # compare browse record on id only
+ return value.id
+ if isinstance(value, tuple) and len(value) == 2: # name_get result
+ return value[0]
+ return value
+
+ def convert_for_display(value, field_obj):
+ if not value:
+ return format_false_value(field_obj)
+ if field_obj._type == 'many2one':
+ if isinstance(value, tuple) and len(value) == 2: # already name_get result
+ return value[1]
+ if not isinstance(value, browse_record): # value should be an ID
+ value = self.pool.get(field_obj._obj).browse(cr, SUPERUSER_ID, value, context=None)
+ return value.name_get()[0][1]
+ if field_obj._type == 'selection': # CHS/TDE TODO: translated value ?
+ select_value = filter(lambda item: item[0] == value, field_obj.selection)
+ return select_value[0][1]
+ return value
+
+ def translate_field(column_info):
+ model = column_info.parent_model or self._name
+ return translation_obj._get_source(cr, uid, '{0},{1}'.format(model, column_info.name), 'field', context.get('lang'), column_info.column.string)
+
+ tracked_fields = self._get_tracked_fields(cr, uid, updated_fields, context=context)
+ to_log = [name for name in updated_fields if name in tracked_fields]
+ if not to_log:
+ return True
+
+ # browse with SUPERUSER_ID to avoid rights issues (i.e. tracking res.partner relational field -> name_get result should always be visible)
+ for record in self.browse(cr, SUPERUSER_ID, ids, context=context):
+ tracked_values = {}
+ default_log = True
+ changes_found = False
+ initial = initial_values[record.id]
+
+ # generate tracked_values data structure: {'col_name': {col_info, new_value, old_value}}
+ for col_name, col_info in tracked_fields.items():
+ old_value = convert_for_comparison(initial[col_name], col_info.column)
+ new_value = convert_for_comparison(record[col_name], col_info.column)
+ if old_value == new_value and col_info.column._track_visibility == 2:
+ tracked_values[col_name] = dict(col_info=col_info, new_value=convert_for_display(record[col_name], col_info.column))
+ elif old_value != new_value:
+ tracked_values[col_name] = dict(col_info=col_info, old_value=convert_for_display(initial[col_name], col_info.column), new_value=convert_for_display(record[col_name], col_info.column))
+ changes_found = True
+ if not changes_found:
+ continue
+
+ # find subtypes and post messages or log if no subtype found
+ subtypes = set([subtype for field, track_info in self._track.items() if field in to_log
+ for subtype, method in track_info.items() if method(self, cr, uid, record, context)])
+ for subtype in subtypes:
+ subtype_data = subtype.split('.')
+ subtype_ref = self.pool.get('ir.model.data').get_object_reference(cr, uid, subtype_data[0], subtype_data[1])
+ if not subtype_ref:
+ continue
+ subtype_rec = self.pool.get('mail.message.subtype').browse(cr, uid, subtype_ref[1], context=context)
+ message = MakoTemplate(self._TRACK_TEMPLATE).render_unicode(message_description=subtype_rec.description, tracked_values=tracked_values)
+ self.message_post(cr, uid, record.id, body=message, subtype=subtype, context=context)
+ default_log = False
+ if default_log:
+ message = MakoTemplate(self._TRACK_TEMPLATE).render_unicode(message_description=log_message, tracked_values=tracked_values)
+ self.message_post(cr, uid, record.id, body=message, context=context)
- return result
+ return True
#------------------------------------------------------
# mail.message wrappers and tools
na_demo_group = self.mail_message._needaction_count(cr, user_raoul.id, domain=[('model', '=', 'mail.group'), ('res_id', '=', self.group_pigs_id)])
self.assertEqual(na_demo, na_demo_base + 0, 'Demo should have 0 new needaction')
self.assertEqual(na_demo_group, 0, 'Demo should have 0 needaction related to Pigs')
-
- def test_40_track_field(self):
- """ Test designed for tracking field. """
-
- cr, uid, user_admin, user_raoul, group_pigs = self.cr, self.uid, self.user_admin, self.user_raoul, self.group_pigs
- partner_user_id = self.res_partner._columns.get('user_id')
-
- # Set tracked attribute as True for tracked field.
- partner_user_id.tracked = True
- p_d_id = self.res_partner.create(cr, uid, {'name': 'Osbert Armour', 'notification_email_send': 'all'})
-
- #Set Raoul as sales person.
- self.res_partner.write(cr, uid, [p_d_id], {'user_id' : user_raoul.id})
- mail_ids = self.mail_message.search(cr, uid, [('model', '=', 'res.partner'),('res_id','=',p_d_id)])
- #Test: tracked record logged in openchatter
- self.assertEqual(1,len(mail_ids), ' After change in field logged in openchatter.')
-
- # Set tracked attribute as False for tracked field.
- partner_user_id.tracked = False
- p_a_id = self.res_partner.create(cr, uid, {'name': 'Timmy Simons', 'notification_email_send': 'all'})
+ def test_40_track_field(self):
+ """ Testing auto tracking of fields. """
+ def _strip_string_spaces(body):
+ return body.replace(' ', '').replace('\n', '')
- #Set Admin as sales person.
- self.res_partner.write(cr, uid, [p_a_id], {'user_id' : user_admin.id})
- mail_a_ids = self.mail_message.search(cr, uid, [('model', '=', 'res.partner'),('res_id','=',p_a_id)])
+ cr, uid = self.cr, self.uid
- #Test: tracked record logged in openchatter.
- self.assertEqual(len(mail_a_ids),0,' No more logged openchatter.')
+ # Data: subscribe Raoul to Pigs, because he will change the public attribute and may loose access to the record
+ self.mail_group.message_subscribe_users(cr, uid, [self.group_pigs_id], [self.user_raoul_id])
+
+ # Data: res.users.group, to test group_public_id automatic logging
+ group_system_ref = self.registry('ir.model.data').get_object_reference(cr, uid, 'base', 'group_system')
+ group_system_id = group_system_ref and group_system_ref[1] or False
+
+ # Data: custom subtypes
+ mt_private_id = self.mail_message_subtype.create(cr, uid, {'name': 'private', 'description': 'Private public'})
+ self.ir_model_data.create(cr, uid, {'name': 'mt_private', 'model': 'mail.group', 'module': 'mail', 'res_id': mt_private_id})
+ mt_name_supername_id = self.mail_message_subtype.create(cr, uid, {'name': 'name_supername', 'description': 'Supername name'})
+ self.ir_model_data.create(cr, uid, {'name': 'mt_name_supername', 'model': 'mail.group', 'module': 'mail', 'res_id': mt_name_supername_id})
+ mt_group_public_id = self.mail_message_subtype.create(cr, uid, {'name': 'group_public', 'description': 'Group changed'})
+ self.ir_model_data.create(cr, uid, {'name': 'mt_group_public', 'model': 'mail.group', 'module': 'mail', 'res_id': mt_group_public_id})
+
+ # Data: alter mail_group model for testing purposes (test on classic, selection and many2one fields)
+ self.mail_group._track = {
+ 'public': {
+ 'mail.mt_private': lambda self, cr, uid, obj, ctx=None: obj.public == 'private',
+ },
+ 'name': {
+ 'mail.mt_name_supername': lambda self, cr, uid, obj, ctx=None: obj.name == 'supername',
+ },
+ 'group_public_id': {
+ 'mail.mt_group_public': lambda self, cr, uid, obj, ctx=None: True,
+ },
+ }
+ public_col = self.mail_group._columns.get('public')
+ name_col = self.mail_group._columns.get('name')
+ group_public_col = self.mail_group._columns.get('group_public_id')
+ public_col._track_visibility = 1
+ name_col._track_visibility = 2
+ group_public_col._track_visibility = 1
+
+ # Test: change name -> always tracked, not related to a subtype
+ self.mail_group.write(cr, self.user_raoul_id, [self.group_pigs_id], {'public': 'public'})
+ self.group_pigs.refresh()
+ self.assertEqual(len(self.group_pigs.message_ids), 1, 'tracked: a message should have been produced')
+ # Test: first produced message: no subtype, name change tracked
+ last_msg = self.group_pigs.message_ids[-1]
+ self.assertFalse(last_msg.subtype_id, 'tracked: message should not have been linked to a subtype')
+ self.assertIn('SelectedGroupOnly->Public', _strip_string_spaces(last_msg.body), 'tracked: message body incorrect')
+ self.assertIn('Pigs', _strip_string_spaces(last_msg.body), 'tracked: message body does not hold always tracked field')
+
+ # Test: change name as supername, public as private -> 2 subtypes
+ self.mail_group.write(cr, self.user_raoul_id, [self.group_pigs_id], {'name': 'supername', 'public': 'private'})
+ self.group_pigs.refresh()
+ self.assertEqual(len(self.group_pigs.message_ids), 3, 'tracked: two messages should have been produced')
+ # Test: first produced message: mt_name_supername
+ last_msg = self.group_pigs.message_ids[-2]
+ self.assertEqual(last_msg.subtype_id.id, mt_private_id, 'tracked: message should be linked to mt_private subtype')
+ self.assertIn('Private public', last_msg.body, 'tracked: message body does not hold the subtype description')
+ self.assertIn('Pigs->supername', _strip_string_spaces(last_msg.body), 'tracked: message body incorrect')
+ # Test: second produced message: mt_name_supername
+ last_msg = self.group_pigs.message_ids[-3]
+ self.assertEqual(last_msg.subtype_id.id, mt_name_supername_id, 'tracked: message should be linked to mt_name_supername subtype')
+ self.assertIn('Supername name', last_msg.body, 'tracked: message body does not hold the subtype description')
+ self.assertIn('Public->Private', _strip_string_spaces(last_msg.body), 'tracked: message body incorrect')
+ self.assertIn('Pigs->supername', _strip_string_spaces(last_msg.body), 'tracked feature: message body does not hold always tracked field')
+
+ # Test: change public as public, group_public_id -> 1 subtype, name always tracked
+ self.mail_group.write(cr, self.user_raoul_id, [self.group_pigs_id], {'public': 'public', 'group_public_id': group_system_id})
+ self.group_pigs.refresh()
+ self.assertEqual(len(self.group_pigs.message_ids), 4, 'tracked: one message should have been produced')
+ # Test: first produced message: mt_group_public_id, with name always tracked, public tracked on change
+ last_msg = self.group_pigs.message_ids[-4]
+ self.assertEqual(last_msg.subtype_id.id, mt_group_public_id, 'tracked: message should not be linked to any subtype')
+ self.assertIn('Group changed', last_msg.body, 'tracked: message body does not hold the subtype description')
+ self.assertIn('Private->Public', _strip_string_spaces(last_msg.body), 'tracked: message body does not hold changed tracked field')
+ self.assertIn('HumanResources/Employee->Administration/Settings', _strip_string_spaces(last_msg.body), 'tracked: message body does not hold always tracked field')
+
+ # Test: change not tracked field, no tracking message
+ self.mail_group.write(cr, self.user_raoul_id, [self.group_pigs_id], {'description': 'Dummy'})
+ self.group_pigs.refresh()
+ self.assertEqual(len(self.group_pigs.message_ids), 4, 'tracked: No message should have been produced')
+
+ # Data: removed changes
+ public_col._track_visibility = False
+ name_col._track_visibility = False
+ group_public_col._track_visibility = False
+ self.mail_group._track = {}