def _auto_init(self, cr, context=None):
"""Installation hook to create aliases for all jobs and avoid constraint errors."""
- res = self.pool.get('mail.alias').migrate_to_alias(cr, self._name, self._table, super(hr_job, self)._auto_init,
- if context is None:
- context = {}
- alias_context = dict(context, alias_model_name='hr.applicant')
- res = self.pool.get('mail.alias').migrate_to_alias(cr, self._name, self._table, super(hr_job, self)._auto_init,
- self._columns['alias_id'], 'name', alias_prefix='job+', alias_defaults={'job_id': 'id'}, context=alias_context)
- return res
++ return self.pool.get('mail.alias').migrate_to_alias(cr, self._name, self._table, super(hr_job, self)._auto_init,
+ 'hr.applicant', self._columns['alias_id'], 'name', alias_prefix='job+', alias_defaults={'job_id': 'id'}, context=context)
- return res
def create(self, cr, uid, vals, context=None):
- create_context = dict(context, alias_model_name='hr.applicant', alias_parent_model_name=self._name)
- job_id = super(hr_job, self).create(cr, uid, vals, context=create_context)
- mail_alias = self.pool.get('mail.alias')
- if not vals.get('alias_id'):
- vals.pop('alias_name', None) # prevent errors during copy()
- alias_id = mail_alias.create_unique_alias(cr, uid,
- # Using '+' allows using subaddressing for those who don't
- # have a catchall domain setup.
- {'alias_name': 'jobs+'+vals['name']},
- model_name="hr.applicant",
- context=context)
- vals['alias_id'] = alias_id
- res = super(hr_job, self).create(cr, uid, vals, context)
- mail_alias.write(cr, uid, [vals['alias_id']], {"alias_defaults": {'job_id': res}}, context)
- return res
++ alias_context = dict(context, alias_model_name='hr.applicant', alias_parent_model_name=self._name)
++ job_id = super(hr_job, self).create(cr, uid, vals, context=alias_context)
+ job = self.browse(cr, uid, job_id, context=context)
+ self.pool.get('mail.alias').write(cr, uid, [job.alias_id.id], {'alias_parent_thread_id': job_id, "alias_defaults": {'job_id': job_id}}, context)
+ return job_id
def unlink(self, cr, uid, ids, context=None):
# Cascade-delete mail aliases as well, as they should not exist without the job position.
self._message_add_suggested_recipient(cr, uid, result, obj, partner=obj.user_id.partner_id, reason=self._all_columns['user_id'].column.string, context=context)
return result
- def message_get_partner_info_from_emails(self, cr, uid, emails, link_mail=False, context=None, res_id=None):
- """ Wrapper with weird order parameter because of 7.0 fix.
++ def _find_partner_from_emails(self, cr, uid, id, emails, context=None,
++ check_followers=True, only_users=False, take_all=False, full_info=False):
++ """ Utility method to find partners
+
- TDE TODO: remove me in 8.0 """
- return self.message_find_partner_from_emails(cr, uid, res_id, emails, link_mail=link_mail, context=context)
++ :param boolean check_followers: TODO
++ :param boolean only_users: TODO
++ :param boolean take_all: TODO
++ """
++ partner_obj = self.pool['res.partner']
++ partner_ids = []
++ obj = None
++ if id and self._name != 'mail.thread' and check_followers:
++ obj = self.browse(cr, uid, id, context=context)
++ for contact in emails:
++ partner_id = False
++ email_address = tools.email_split(contact)
++ if not email_address:
++ partner_ids.append(partner_id)
++ continue
++ email_address = email_address[0]
++ # first try: check in document's followers
++ if obj:
++ for follower in obj.message_follower_ids:
++ if follower.email == email_address:
++ partner_id = follower.id
++ # second try: check in partners that are also users
++ if not partner_id:
++ ids = partner_obj.search(cr, SUPERUSER_ID, [
++ ('email', 'ilike', email_address),
++ ('user_ids', '!=', False)
++ ], limit=1, context=context)
++ if ids:
++ partner_id = ids[0]
++ # third try: check in partners
++ if not partner_id:
++ ids = partner_obj.search(cr, SUPERUSER_ID, [
++ ('email', 'ilike', email_address)
++ ], limit=1, context=context)
++ if ids:
++ partner_id = ids[0]
++ partner_ids.append(partner_id)
++ return partner_ids
+
- def message_find_partner_from_emails(self, cr, uid, id, emails, link_mail=False, context=None):
++ def message_partner_info_from_emails(self, cr, uid, id, emails, link_mail=False, context=None):
+ """ Convert a list of emails into a list partner_ids and a list
+ new_partner_ids. The return value is non conventional because
+ it is meant to be used by the mail widget.
+
+ :return dict: partner_ids and new_partner_ids
+
+ TDE TODO: merge me with other partner finding methods in 8.0 """
+ mail_message_obj = self.pool.get('mail.message')
- partner_obj = self.pool.get('res.partner')
++ partner_ids = self._find_partner_from_emails(cr, uid, id, emails, context=context)
+ result = list()
- if id and self._name != 'mail.thread':
- obj = self.browse(cr, SUPERUSER_ID, id, context=context)
- else:
- obj = None
- for email in emails:
- partner_info = {'full_name': email, 'partner_id': False}
- m = re.search(r"((.+?)\s*<)?([^<>]+@[^<>]+)>?", email, re.IGNORECASE | re.DOTALL)
- if not m:
- continue
- email_address = m.group(3)
- # first try: check in document's followers
- if obj:
- for follower in obj.message_follower_ids:
- if follower.email == email_address:
- partner_info['partner_id'] = follower.id
- # second try: check in partners
- if not partner_info.get('partner_id'):
- ids = partner_obj.search(cr, SUPERUSER_ID, [('email', 'ilike', email_address), ('user_ids', '!=', False)], limit=1, context=context)
- if not ids:
- ids = partner_obj.search(cr, SUPERUSER_ID, [('email', 'ilike', email_address)], limit=1, context=context)
- if ids:
- partner_info['partner_id'] = ids[0]
++ for idx in range(len(emails)):
++ email_address = emails[idx]
++ partner_id = partner_ids[idx]
++ partner_info = {'full_name': email_address, 'partner_id': partner_id}
+ result.append(partner_info)
+
+ # link mail with this from mail to the new partner id
+ if link_mail and partner_info['partner_id']:
+ message_ids = mail_message_obj.search(cr, SUPERUSER_ID, [
+ '|',
- ('email_from', '=', email),
- ('email_from', 'ilike', '<%s>' % email),
++ ('email_from', '=', email_address),
++ ('email_from', 'ilike', '<%s>' % email_address),
+ ('author_id', '=', False)
+ ], context=context)
+ if message_ids:
+ mail_message_obj.write(cr, SUPERUSER_ID, message_ids, {'author_id': partner_info['partner_id']}, context=context)
+ return result
+
def message_post(self, cr, uid, thread_id, body='', subject=None, type='notification',
subtype=None, parent_id=False, attachments=None, context=None,
content_subtype='html', **kwargs):
def _auto_init(self, cr, context=None):
""" Installation hook: aliases, partner following themselves """
# create aliases for all users and avoid constraint errors
-- res = self.pool.get('mail.alias').migrate_to_alias(cr, self._name, self._table, super(res_users, self)._auto_init,
- self._columns['alias_id'], 'login', alias_force_key='id', context=context)
- return res
++ return self.pool.get('mail.alias').migrate_to_alias(cr, self._name, self._table, super(res_users, self)._auto_init,
+ self._name, self._columns['alias_id'], 'login', alias_force_key='id', context=context)
- return res
def create(self, cr, uid, data, context=None):
- # create default alias same as the login
if not data.get('login', False):
raise osv.except_osv(_('Invalid Action!'), _('You may not create a user. To create new users, you should use the "Settings > Users" menu.'))
+ if context is None:
+ context = {}
- mail_alias = self.pool.get('mail.alias')
- alias_id = mail_alias.create_unique_alias(cr, uid, {'alias_name': data['login']}, model_name=self._name, context=context)
- data['alias_id'] = alias_id
- data.pop('alias_name', None) # prevent errors during copy()
-
- # create user
- user_id = super(res_users, self).create(cr, uid, data, context=context)
+ create_context = dict(context, alias_model_name=self._name, alias_parent_model_name=self._name)
+ user_id = super(res_users, self).create(cr, uid, data, context=create_context)
user = self.browse(cr, uid, user_id, context=context)
- # alias
- mail_alias.write(cr, SUPERUSER_ID, [alias_id], {"alias_force_thread_id": user_id}, context)
+ self.pool.get('mail.alias').write(cr, SUPERUSER_ID, [user.alias_id.id], {"alias_force_thread_id": user_id, "alias_parent_thread_id": user_id}, context)
+
# create a welcome message
self._create_welcome_message(cr, uid, user, context=context)
return user_id
# Do: find partner in group where there is a follower with the email -> should be taken
self.mail_group.message_subscribe(cr, uid, [group_pigs.id], [p_b_id])
- partner_info = self.mail_group.message_find_partner_from_emails(cr, uid, group_pigs.id, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
+ partner_info = self.mail_group.message_partner_info_from_emails(cr, uid, group_pigs.id, ['Maybe Raoul <test@test.fr>'], link_mail=False)[0]
self.assertEqual(partner_info['partner_id'], p_b_id,
- 'mail_thread: message_find_partner_from_emails wrong partner found')
+ 'mail_thread: message_partner_info_from_emails wrong partner found')
+ def test_05_mail_message_mail_mail(self):
+ """ Tests designed for testing email values based on mail.message, aliases, ... """
+ cr, uid = self.cr, self.uid
+
+ # Data: clean catchall domain
+ param_ids = self.registry('ir.config_parameter').search(cr, uid, [('key', '=', 'mail.catchall.domain')])
+ self.registry('ir.config_parameter').unlink(cr, uid, param_ids)
+
+ # Do: create a mail_message with a reply_to, without message-id
+ msg_id = self.mail_message.create(cr, uid, {'subject': 'Subject', 'body': 'Body', 'reply_to': 'custom@example.com'})
+ msg = self.mail_message.browse(cr, uid, msg_id)
+ # Test: message content
+ self.assertIn('reply_to', msg.message_id,
+ 'mail_message: message_id should be specific to a mail_message with a given reply_to')
+ self.assertEqual('custom@example.com', msg.reply_to,
+ 'mail_message: incorrect reply_to')
+ # Do: create a mail_mail with the previous mail_message and specified reply_to
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'reply_to': 'other@example.com', 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, 'other@example.com',
+ 'mail_mail: reply_to should be equal to the one coming from creation values')
+ # Do: create a mail_mail with the previous mail_message
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, msg.reply_to,
+ 'mail_mail: reply_to should be equal to the one coming from the mail_message')
+
+ # Do: create a mail_message without a reply_to
+ msg_id = self.mail_message.create(cr, uid, {'subject': 'Subject', 'body': 'Body', 'model': 'mail.group', 'res_id': self.group_pigs_id, 'email_from': False})
+ msg = self.mail_message.browse(cr, uid, msg_id)
+ # Test: message content
+ self.assertIn('mail.group', msg.message_id,
+ 'mail_message: message_id should contain model')
+ self.assertIn('%s' % self.group_pigs_id, msg.message_id,
+ 'mail_message: message_id should contain res_id')
+ self.assertFalse(msg.reply_to,
+ 'mail_message: should not generate a reply_to address when not specified')
+ # Do: create a mail_mail based on the previous mail_message
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertFalse(mail.reply_to,
+ 'mail_mail: reply_to should not have been guessed')
+ # Update message
+ self.mail_message.write(cr, uid, [msg_id], {'email_from': 'someone@example.com'})
+ msg.refresh()
+ # Do: create a mail_mail based on the previous mail_message
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, msg.email_from,
+ 'mail_mail: reply_to should equal to mail_message.email_from when having no document or default alias')
+
+ # Data: set catchall domain
+ self.registry('ir.config_parameter').set_param(cr, uid, 'mail.catchall.domain', 'schlouby.fr')
+ self.registry('ir.config_parameter').unlink(cr, uid, self.registry('ir.config_parameter').search(cr, uid, [('key', '=', 'mail.catchall.alias')]))
+
+ # Do: create a mail_mail based on the previous mail_message
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, '"Followers of Pigs" <group+pigs@schlouby.fr>',
+ 'mail_mail: reply_to should equal the mail.group alias')
+
+ # Update message
+ self.mail_message.write(cr, uid, [msg_id], {'res_id': False, 'email_from': 'someone@schlouby.fr'})
+ msg.refresh()
+ # Do: create a mail_mail based on the previous mail_message
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, msg.email_from,
+ 'mail_mail: reply_to should equal the mail_message email_from')
+
+ # Data: set catchall alias
+ self.registry('ir.config_parameter').set_param(self.cr, self.uid, 'mail.catchall.alias', 'gateway')
+
+ # Do: create a mail_mail based on the previous mail_message
+ mail_id = self.mail_mail.create(cr, uid, {'mail_message_id': msg_id, 'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, 'gateway@schlouby.fr',
+ 'mail_mail: reply_to should equal the catchall email alias')
+
+ # Do: create a mail_mail
+ mail_id = self.mail_mail.create(cr, uid, {'state': 'cancel'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, 'gateway@schlouby.fr',
+ 'mail_mail: reply_to should equal the catchall email alias')
+
+ # Do: create a mail_mail
+ mail_id = self.mail_mail.create(cr, uid, {'state': 'cancel', 'reply_to': 'someone@example.com'})
+ mail = self.mail_mail.browse(cr, uid, mail_id)
+ # Test: mail_mail content
+ self.assertEqual(mail.reply_to, 'someone@example.com',
+ 'mail_mail: reply_to should equal the rpely_to given to create')
+
@mute_logger('openerp.addons.mail.mail_thread', 'openerp.osv.orm')
def test_10_message_process(self):
""" Testing incoming emails processing. """
task_attachments = attachment.search(cr, uid, [('res_model', '=', 'project.task'), ('res_id', 'in', task_ids)], context=context, count=True)
res[id] = (project_attachments or 0) + (task_attachments or 0)
return res
-
+
def _task_count(self, cr, uid, ids, field_name, arg, context=None):
+ if context is None:
+ context = {}
res = dict.fromkeys(ids, 0)
- task_ids = self.pool.get('project.task').search(cr, uid, [('project_id', 'in', ids)])
+ ctx = context.copy()
+ ctx['active_test'] = False
+ task_ids = self.pool.get('project.task').search(cr, uid, [('project_id', 'in', ids)], context=ctx)
for task in self.pool.get('project.task').browse(cr, uid, task_ids, context):
res[task.project_id.id] += 1
return res
<field name="parent_id" ref="all_projects_account"/>
<field name="name">Website Design Templates</field>
<field name="user_id" ref="base.user_root"/>
+ <field name="privacy_visibility">followers</field>
<field name="alias_model">project.task</field>
+ <field name="privacy_visibility">employees</field>
<field name="members" eval="[(4, ref('base.user_root')), (4,ref('base.user_demo'))]"/>
</record>