import logging
import operator
import os
-import re
-from socket import gethostname
import time
import openerp
from openerp import SUPERUSER_ID
from openerp import tools
+from openerp import workflow
from openerp.osv import fields, osv
+from openerp.osv.orm import browse_record
import openerp.report.interface
from openerp.report.report_sxw import report_sxw, report_rml
-from openerp.tools.config import config
from openerp.tools.safe_eval import safe_eval as eval
from openerp.tools.translate import _
import openerp.workflow
_name = 'ir.server.object.lines'
_sequence = 'ir_actions_id_seq'
_columns = {
- 'server_id': fields.many2one('ir.actions.server', 'Object Mapping'),
- 'col1': fields.many2one('ir.model.fields', 'Destination', required=True),
+ 'server_id': fields.many2one('ir.actions.server', 'Related Server Action'),
+ 'col1': fields.many2one('ir.model.fields', 'Field', required=True),
'value': fields.text('Value', required=True, help="Expression containing a value specification. \n"
"When Formula type is selected, this field may be a Python expression "
" that can use the same values as for the condition field on the server action.\n"
"If Value type is selected, the value will be used directly without evaluation."),
'type': fields.selection([
- ('value','Value'),
- ('equation','Formula')
- ], 'Type', required=True, size=32, change_default=True),
+ ('value', 'Value'),
+ ('equation', 'Python expression')
+ ], 'Evaluation Type', required=True, change_default=True),
}
_defaults = {
- 'type': 'equation',
+ 'type': 'value',
}
-server_object_lines()
+
##
# Actions that are run on the server side
#
class actions_server(osv.osv):
-
- def _select_signals(self, cr, uid, context=None):
- cr.execute("""SELECT distinct w.osv, t.signal FROM wkf w, wkf_activity a, wkf_transition t
- WHERE w.id = a.wkf_id AND
- (t.act_from = a.id OR t.act_to = a.id) AND
- t.signal IS NOT NULL""")
- result = cr.fetchall() or []
- res = []
- for rs in result:
- if rs[0] is not None and rs[1] is not None:
- line = rs[1], "%s - (%s)" % (rs[1], rs[0])
- res.append(line)
- return res
-
- def _select_objects(self, cr, uid, context=None):
- model_pool = self.pool.get('ir.model')
- ids = model_pool.search(cr, uid, [('name','not ilike','.')])
- res = model_pool.read(cr, uid, ids, ['model', 'name'])
- return [(r['model'], r['name']) for r in res] + [('','')]
-
- def change_object(self, cr, uid, ids, copy_object, state, context=None):
- if state == 'object_copy' and copy_object:
- if context is None:
- context = {}
- model_pool = self.pool.get('ir.model')
- model = copy_object.split(',')[0]
- mid = model_pool.search(cr, uid, [('model','=',model)])
- return {
- 'value': {'srcmodel_id': mid[0]},
- 'context': context
- }
- else:
- return {}
-
_name = 'ir.actions.server'
_table = 'ir_act_server'
_inherit = 'ir.actions.actions'
_sequence = 'ir_actions_id_seq'
_order = 'sequence,name'
+
+ def _select_objects(self, cr, uid, context=None):
+ model_pool = self.pool.get('ir.model')
+ ids = model_pool.search(cr, uid, [('name', 'not ilike', '.')])
+ res = model_pool.read(cr, uid, ids, ['model', 'name'])
+ return [(r['model'], r['name']) for r in res] + [('', '')]
+
+ def _get_states(self, cr, uid, context=None):
+ """ Override me in order to add new states in the server action. Please
+ note that the added key length should not be higher than already-existing
+ ones. """
+ return [('code', 'Execute Python Code'),
+ ('trigger', 'Trigger a Workflow Signal'),
+ ('client_action', 'Run a Client Action'),
+ ('object_create', 'Create or Copy a new Record'),
+ ('object_write', 'Write on a Record'),
+ ('multi', 'Execute several actions')]
+
+ def _get_states_wrapper(self, cr, uid, context=None):
+ return self._get_states(cr, uid, context)
+
_columns = {
'name': fields.char('Action Name', required=True, size=64, translate=True),
- 'condition' : fields.char('Condition', size=256, required=True,
- help="Condition that is tested before the action is executed, "
- "and prevent execution if it is not verified.\n"
- "Example: object.list_price > 5000\n"
- "It is a Python expression that can use the following values:\n"
- " - self: ORM model of the record on which the action is triggered\n"
- " - object or obj: browse_record of the record on which the action is triggered\n"
- " - pool: ORM model pool (i.e. self.pool)\n"
- " - time: Python time module\n"
- " - cr: database cursor\n"
- " - uid: current user id\n"
- " - context: current context"),
- 'state': fields.selection([
- ('client_action','Client Action'),
- ('dummy','Dummy'),
- ('loop','Iteration'),
- ('code','Python Code'),
- ('trigger','Trigger'),
- ('email','Email'),
- ('sms','SMS'),
- ('object_create','Create Object'),
- ('object_copy','Copy Object'),
- ('object_write','Write Object'),
- ('other','Multi Actions'),
- ], 'Action Type', required=True, size=32, help="Type of the Action that is to be executed"),
- 'code':fields.text('Python Code', help="Python code to be executed if condition is met.\n"
- "It is a Python block that can use the same values as for the condition field"),
- 'sequence': fields.integer('Sequence', help="Important when you deal with multiple actions, the execution order will be decided based on this, low number is higher priority."),
- 'model_id': fields.many2one('ir.model', 'Object', required=True, help="Select the object on which the action will work (read, write, create).", ondelete='cascade'),
- 'action_id': fields.many2one('ir.actions.actions', 'Client Action', help="Select the Action Window, Report, Wizard to be executed."),
- 'trigger_name': fields.selection(_select_signals, string='Trigger Signal', size=128, help="The workflow signal to trigger"),
- 'wkf_model_id': fields.many2one('ir.model', 'Target Object', help="The object that should receive the workflow signal (must have an associated workflow)"),
- 'trigger_obj_id': fields.many2one('ir.model.fields','Relation Field', help="The field on the current object that links to the target object record (must be a many2one, or an integer field with the record ID)"),
- 'email': fields.char('Email Address', size=512, help="Expression that returns the email address to send to. Can be based on the same values as for the condition field.\n"
- "Example: object.invoice_address_id.email, or 'me@example.com'"),
- 'subject': fields.char('Subject', size=1024, translate=True, help="Email subject, may contain expressions enclosed in double brackets based on the same values as those "
- "available in the condition field, e.g. `Hello [[ object.partner_id.name ]]`"),
- 'message': fields.text('Message', translate=True, help="Email contents, may contain expressions enclosed in double brackets based on the same values as those "
- "available in the condition field, e.g. `Dear [[ object.partner_id.name ]]`"),
- 'mobile': fields.char('Mobile No', size=512, help="Provides fields that be used to fetch the mobile number, e.g. you select the invoice, then `object.invoice_address_id.mobile` is the field which gives the correct mobile number"),
- 'sms': fields.char('SMS', size=160, translate=True),
- 'child_ids': fields.many2many('ir.actions.server', 'rel_server_actions', 'server_id', 'action_id', 'Other Actions'),
+ 'condition': fields.char('Condition',
+ help="Condition verified before executing the server action. If it "
+ "is not verified, the action will not be executed. The condition is "
+ "a Python expression, like 'object.list_price > 5000'. A void "
+ "condition is considered as always True. Help about pyhon expression "
+ "is given in the help tab."),
+ 'state': fields.selection(_get_states_wrapper, 'Action To Do', required=True,
+ help="Type of server action. The following values are available:\n"
+ "- 'Execute Python Code': a block of python code that will be executed\n"
+ "- 'Trigger a Workflow Signal': send a signal to a workflow\n"
+ "- 'Run a Client Action': choose a client action to launch\n"
+ "- 'Create or Copy a new Record': create a new record with new values, or copy an existing record in your database\n"
+ "- 'Write on a Record': update the values of a record\n"
+ "- 'Execute several actions': define an action that triggers several other sever actions\n"
+ "- 'Send Email': automatically send an email (available in email_template)"),
'usage': fields.char('Action Usage', size=32),
'type': fields.char('Action Type', size=32, required=True),
- 'srcmodel_id': fields.many2one('ir.model', 'Model', help="Object in which you want to create / write the object. If it is empty then refer to the Object field."),
- 'fields_lines': fields.one2many('ir.server.object.lines', 'server_id', 'Field Mappings.'),
- 'record_id':fields.many2one('ir.model.fields', 'Create Id', help="Provide the field name where the record id is stored after the create operations. If it is empty, you can not track the new record."),
- 'write_id':fields.char('Write Id', size=256, help="Provide the field name that the record id refers to for the write operation. If it is empty it will refer to the active id of the object."),
- 'loop_action':fields.many2one('ir.actions.server', 'Loop Action', help="Select the action that will be executed. Loop action will not be avaliable inside loop."),
- 'expression':fields.char('Loop Expression', size=512, help="Enter the field/expression that will return the list. E.g. select the sale order in Object, and you can have loop on the sales order line. Expression = `object.order_line`."),
- 'copy_object': fields.reference('Copy Of', selection=_select_objects, size=256),
+ # Generic
+ 'sequence': fields.integer('Sequence',
+ help="When dealing with multiple actions, the execution order is "
+ "based on the sequence. Low number means high priority."),
+ 'model_id': fields.many2one('ir.model', 'Base Model', required=True, ondelete='cascade',
+ help="Base model on which the server action runs."),
+ 'menu_ir_values_id': fields.many2one('ir.values', 'More Menu entry', readonly=True,
+ help='More menu entry.'),
+ # Client Action
+ 'action_id': fields.many2one('ir.actions.actions', 'Client Action',
+ help="Select the client action that has to be executed."),
+ # Python code
+ 'code': fields.text('Python Code',
+ help="Write Python code that the action will execute. Some variables are "
+ "available for use; help about pyhon expression is given in the help tab."),
+ # Workflow signal
+ 'use_relational_model': fields.selection([('base', 'Use the base model of the action'),
+ ('relational', 'Use a relation field on the base model')],
+ string='Target Model', required=True),
+ 'wkf_transition_id': fields.many2one('workflow.transition', string='Signal to Trigger',
+ help="Select the workflow signal to trigger."),
+ 'wkf_model_id': fields.many2one('ir.model', 'Target Model',
+ help="The model that will receive the workflow signal. Note that it should have a workflow associated with it."),
+ 'wkf_model_name': fields.related('wkf_model_id', 'model', type='char', string='Target Model Name', store=True, readonly=True),
+ 'wkf_field_id': fields.many2one('ir.model.fields', string='Relation Field',
+ oldname='trigger_obj_id',
+ help="The field on the current object that links to the target object record (must be a many2one, or an integer field with the record ID)"),
+ # Multi
+ 'child_ids': fields.many2many('ir.actions.server', 'rel_server_actions',
+ 'server_id', 'action_id',
+ string='Child Actions',
+ help='Child server actions that will be executed. Note that the last return returned action value will be used as global return value.'),
+ # Create/Copy/Write
+ 'use_create': fields.selection([('new', 'Create a new record in the Base Model'),
+ ('new_other', 'Create a new record in another model'),
+ ('copy_current', 'Copy the current record'),
+ ('copy_other', 'Copy another record')],
+ string="Creation Policy", required=True,
+ help=""),
+ 'crud_model_id': fields.many2one('ir.model', 'Target Model',
+ oldname='srcmodel_id',
+ help="Model for record creation / update. Set this field only to specify a different model than the base model."),
+ 'ref_object': fields.reference('Reference record', selection=_select_objects, size=128,
+ oldname='copy_object'),
+ 'link_new_record': fields.boolean('Link to current record',
+ help="Check this if you want to link the newly-created record "
+ "to the current record on which the server action runs."),
+ 'link_field_id': fields.many2one('ir.model.fields', 'Link Field',
+ oldname='record_id',
+ help="Provide the field where the record id is stored after the operations."),
+ 'use_write': fields.selection([('current', 'Update the current record'),
+ ('other', 'Update another record'),
+ ('expression', 'Update according a Python expression')],
+ string='Update Policy', required=True,
+ help=""),
+ 'write_expression': fields.char('Write Record Expression',
+ oldname='write_id',
+ help="Provide the field name that the record id refers to for the write operation. If it is empty it will refer to the active id of the object."),
+ 'fields_lines': fields.one2many('ir.server.object.lines', 'server_id',
+ string='Value Mapping',
+ help=""),
+
+ # Fake fields used to implement the placeholder assistant
+ 'model_object_field': fields.many2one('ir.model.fields', string="Field",
+ help="Select target field from the related document model.\n"
+ "If it is a relationship field you will be able to select "
+ "a target field at the destination of the relationship."),
+ 'sub_object': fields.many2one('ir.model', 'Sub-model', readonly=True,
+ help="When a relationship field is selected as first field, "
+ "this field shows the document model the relationship goes to."),
+ 'sub_model_object_field': fields.many2one('ir.model.fields', 'Sub-field',
+ help="When a relationship field is selected as first field, "
+ "this field lets you select the target field within the "
+ "destination document model (sub-model)."),
+ 'copyvalue': fields.char('Placeholder Expression', help="Final placeholder expression, to be copy-pasted in the desired template field."),
}
+
_defaults = {
- 'state': 'dummy',
+ 'state': 'code',
'condition': 'True',
'type': 'ir.actions.server',
'sequence': 5,
- 'code': """# You can use the following variables:
-# - self: ORM model of the record on which the action is triggered
-# - object: browse_record of the record on which the action is triggered if there is one, otherwise None
-# - pool: ORM model pool (i.e. self.pool)
-# - time: Python time module
-# - cr: database cursor
-# - uid: current user id
-# - context: current context
-# If you plan to return an action, assign: action = {...}
-""",
+ 'use_relational_model': 'base',
+ 'use_create': 'new',
+ 'use_write': 'current',
}
- def get_email(self, cr, uid, action, context):
- obj_pool = self.pool[action.model_id.model]
- id = context.get('active_id')
- obj = obj_pool.browse(cr, uid, id)
+ def _check_expression(self, cr, uid, expression, model_id, context):
+ """ Check python expression (condition, write_expression) """
+ if not model_id:
+ return (False, None, 'Your expression cannot be validated because the Base Model is not set.')
+ # fetch current model
+ current_model_name = self.pool.get('ir.model').browse(cr, uid, model_id, context).model
+ # transform expression into a path that should look like 'object.many2onefield.many2onefield'
+ path = expression.split('.')
+ initial = path.pop(0)
+ if initial not in ['obj', 'object']:
+ return (False, None, 'Your expression should begin with obj or object.\nAn expression builder is available in the help tab.')
+ # analyze path
+ while path:
+ step = path.pop(0)
+ column_info = self.pool[current_model_name]._all_columns.get(step)
+ if not column_info:
+ return (False, None, 'Part of the expression (%s) is not recognized as a column in the model %s.' % (step, current_model_name))
+ column_type = column_info.column._type
+ if column_type not in ['many2one']:
+ return (False, None, 'Part of the expression (%s) is not a valid column type (is %s, should be a many2one)' % (step, column_type))
+ current_model_name = column_info.column._obj
+ return (True, current_model_name, None)
+
+ def _check_write_expression(self, cr, uid, ids, context=None):
+ for record in self.browse(cr, uid, ids, context=context):
+ if record.write_expression and record.model_id:
+ correct, model_name, message = self._check_expression(cr, uid, record.write_expression, record.model_id.id, context=context)
+ if not correct:
+ _logger.warning('Invalid expression: %s' % message)
+ return False
+ return True
+
+ _constraints = [
+ (_check_write_expression,
+ 'Incorrect Write Record Expression',
+ ['write_expression']),
+ ]
- fields = None
+ def on_change_model_id(self, cr, uid, ids, model_id, wkf_model_id, crud_model_id, context=None):
+ """ When changing the action base model, reset workflow and crud config
+ to ease value coherence """
+ values = {
+ 'use_create': 'new',
+ 'use_write': 'current',
+ 'use_relational_model': 'base',
+ 'wkf_model_id': model_id,
+ 'crud_model_id': model_id,
+ }
+ return {'value': values}
- if '/' in action.email.complete_name:
- fields = action.email.complete_name.split('/')
- elif '.' in action.email.complete_name:
- fields = action.email.complete_name.split('.')
+ def on_change_wkf_wonfig(self, cr, uid, ids, use_relational_model, wkf_field_id, wkf_model_id, model_id, context=None):
+ """ Update workflow configuration
+ - update the workflow model (for base (model_id) /relational (field.relation))
+ - update wkf_transition_id to False if workflow model changes, to force the user
+ to choose a new one
+ """
+ values = {}
+ if use_relational_model == 'relational' and wkf_field_id:
+ field = self.pool['ir.model.fields'].browse(cr, uid, wkf_field_id, context=context)
+ new_wkf_model_id = self.pool.get('ir.model').search(cr, uid, [('model', '=', field.relation)], context=context)[0]
+ values['wkf_model_id'] = new_wkf_model_id
+ else:
+ values['wkf_model_id'] = model_id
+ if values.get('wkf_model_id') != wkf_model_id:
+ values['wkf_transition_id'] = False
+ return {'value': values}
+
+ def on_change_wkf_model_id(self, cr, uid, ids, wkf_model_id, context=None):
+ """ When changing the workflow model, update its stored name also """
+ wkf_model_name = False
+ if wkf_model_id:
+ wkf_model_name = self.pool.get('ir.model').browse(cr, uid, wkf_model_id, context).model
+ return {'value': {'wkf_model_name': wkf_model_name}}
+
+ def on_change_crud_config(self, cr, uid, ids, state, use_create, use_write, ref_object, crud_model_id, model_id, context=None):
+ """ TODO """
+ if state == 'object_create':
+ return self.on_change_create_config(cr, uid, ids, use_create, ref_object, crud_model_id, model_id, context=context)
+ elif state == 'object_write':
+ return self.on_change_write_config(cr, uid, ids, use_write, ref_object, crud_model_id, model_id, context=context)
+ else:
+ return {}
+
+ def on_change_create_config(self, cr, uid, ids, use_create, ref_object, crud_model_id, model_id, context=None):
+ """ TODO """
+ values = {}
+ if use_create == 'new':
+ values['crud_model_id'] = model_id
+ elif use_create == 'new_other':
+ pass
+ elif use_create == 'copy_current':
+ values['crud_model_id'] = model_id
+ elif use_create == 'copy_other' and ref_object:
+ ref_model, ref_id = ref_object.split(',')
+ ref_model_id = self.pool['ir.model'].search(cr, uid, [('model', '=', ref_model)], context=context)[0]
+ values['crud_model_id'] = ref_model_id
+
+ if values.get('crud_model_id') != crud_model_id:
+ values['link_field_id'] = False
+ return {'value': values}
+
+ def on_change_write_config(self, cr, uid, ids, use_write, ref_object, crud_model_id, model_id, context=None):
+ """ TODO """
+ values = {}
+ if use_write == 'current':
+ values['crud_model_id'] = model_id
+ elif use_write == 'other' and ref_object:
+ ref_model, ref_id = ref_object.split(',')
+ ref_model_id = self.pool['ir.model'].search(cr, uid, [('model', '=', ref_model)], context=context)[0]
+ values['crud_model_id'] = ref_model_id
+ elif use_write == 'expression':
+ pass
+
+ if values.get('crud_model_id') != crud_model_id:
+ values['link_field_id'] = False
+ return {'value': values}
+
+ def on_change_write_expression(self, cr, uid, ids, write_expression, model_id, context=None):
+ """ Check the write_expression, about fields and models. """
+ values = {}
+ valid, model_name, message = self._check_expression(cr, uid, write_expression, model_id, context=context)
+ if valid:
+ ref_model_id = self.pool['ir.model'].search(cr, uid, [('model', '=', model_name)], context=context)[0]
+ values['crud_model_id'] = ref_model_id
+ return {'value': values}
+ if not message:
+ message = 'Invalid expression'
+ return {
+ 'warning': {
+ 'title': 'Incorrect expression',
+ 'message': message,
+ }
+ }
+
+ def build_expression(self, field_name, sub_field_name):
+ """Returns a placeholder expression for use in a template field,
+ based on the values provided in the placeholder assistant.
+
+ :param field_name: main field name
+ :param sub_field_name: sub field name (M2O)
+ :return: final placeholder expression
+ """
+ expression = ''
+ if field_name:
+ expression = "object." + field_name
+ if sub_field_name:
+ expression += "." + sub_field_name
+ return expression
+
+ def onchange_sub_model_object_value_field(self, cr, uid, ids, model_object_field, sub_model_object_field=False, context=None):
+ result = {
+ 'sub_object': False,
+ 'copyvalue': False,
+ 'sub_model_object_field': False,
+ }
+ if model_object_field:
+ fields_obj = self.pool.get('ir.model.fields')
+ field_value = fields_obj.browse(cr, uid, model_object_field, context)
+ if field_value.ttype in ['many2one', 'one2many', 'many2many']:
+ res_ids = self.pool.get('ir.model').search(cr, uid, [('model', '=', field_value.relation)], context=context)
+ sub_field_value = False
+ if sub_model_object_field:
+ sub_field_value = fields_obj.browse(cr, uid, sub_model_object_field, context)
+ if res_ids:
+ result.update({
+ 'sub_object': res_ids[0],
+ 'copyvalue': self.build_expression(field_value.name, sub_field_value and sub_field_value.name or False),
+ 'sub_model_object_field': sub_model_object_field or False,
+ })
+ else:
+ result.update({
+ 'copyvalue': self.build_expression(field_value.name, False),
+ })
+ return {'value': result}
+
+ def create_action(self, cr, uid, ids, context=None):
+ """ Create a contextual action for each of the server actions. """
+ for action in self.browse(cr, uid, ids, context=context):
+ ir_values_id = self.pool.get('ir.values').create(cr, SUPERUSER_ID, {
+ 'name': _('Run %s') % action.name,
+ 'model': action.model_id.model,
+ 'key2': 'client_action_multi',
+ 'value': "ir.actions.server,%s" % action.id,
+ }, context)
+ action.write({
+ 'menu_ir_values_id': ir_values_id,
+ })
- for field in fields:
- try:
- obj = getattr(obj, field)
- except Exception:
- _logger.exception('Failed to parse: %s', field)
+ return True
- return obj
+ def unlink_action(self, cr, uid, ids, context=None):
+ """ Remove the contextual actions created for the server actions. """
+ for action in self.browse(cr, uid, ids, context=context):
+ if action.menu_ir_values_id:
+ try:
+ self.pool.get('ir.values').unlink(cr, SUPERUSER_ID, action.menu_ir_values_id.id, context)
+ except Exception:
+ raise osv.except_osv(_('Warning'), _('Deletion of the action record failed.'))
+ return True
- def get_mobile(self, cr, uid, action, context):
+ def run_action_client_action(self, cr, uid, action, eval_context=None, context=None):
+ if not action.action_id:
+ raise osv.except_osv(_('Error'), _("Please specify an action to launch!"))
+ return self.pool[action.action_id.type].read(cr, uid, action.action_id.id, context=context)
+
+ def run_action_code(self, cr, uid, action, eval_context=None, context=None):
+ eval(action.code.strip(), eval_context, mode="exec", nocopy=True) # nocopy allows to return 'action'
+ if 'action' in eval_context:
+ return eval_context['action']
+
+ def run_action_trigger(self, cr, uid, action, eval_context=None, context=None):
+ """ Trigger a workflow signal, depending on the use_relational_model:
+ - `base`: base_model_pool.signal_<TRIGGER_NAME>(cr, uid, context.get('active_id'))
+ - `relational`: find the related model and object, using the relational
+ field, then target_model_pool.signal_<TRIGGER_NAME>(cr, uid, target_id)
+ """
obj_pool = self.pool[action.model_id.model]
- id = context.get('active_id')
- obj = obj_pool.browse(cr, uid, id)
+ if action.use_relational_model == 'base':
+ target_id = context.get('active_id')
+ target_pool = obj_pool
+ else:
+ value = getattr(obj_pool.browse(cr, uid, context.get('active_id'), context=context), action.wkf_field_id.name)
+ if action.wkf_field_id.ttype == 'many2one':
+ target_id = value.id
+ else:
+ target_id = value
+ target_pool = self.pool[action.wkf_model_id.model]
- fields = None
+ trigger_name = action.wkf_transition_id.signal
- if '/' in action.mobile.complete_name:
- fields = action.mobile.complete_name.split('/')
- elif '.' in action.mobile.complete_name:
- fields = action.mobile.complete_name.split('.')
+ workflow.trg_validate(uid, target_pool._name, target_id, trigger_name, cr)
- for field in fields:
- try:
- obj = getattr(obj, field)
- except Exception:
- _logger.exception('Failed to parse: %s', field)
+ def run_action_multi(self, cr, uid, action, eval_context=None, context=None):
+ # TDE FIXME: loops are not considered here ^^
+ res = []
+ for act in action.child_ids:
+ # context['active_id'] = context['active_ids'][0]
+ result = self.run(cr, uid, [act.id], context)
+ if result:
+ res.append(result)
+ return res and res[0] or False
+
+ def run_action_object_write(self, cr, uid, action, eval_context=None, context=None):
+ res = {}
+ for exp in action.fields_lines:
+ if exp.type == 'equation':
+ expr = eval(exp.value, eval_context)
+ else:
+ expr = exp.value
+ res[exp.col1.name] = expr
+
+ if action.use_write == 'current':
+ model = action.model_id.model
+ ref_id = context.get('active_id')
+ elif action.use_write == 'other':
+ model = action.crud_model_id.model
+ ref_id = action.ref_object.id
+ elif action.use_write == 'expression':
+ model = action.crud_model_id.model
+ ref = eval(action.write_expression, eval_context)
+ if isinstance(ref, browse_record):
+ ref_id = getattr(ref, 'id')
+ else:
+ ref_id = int(ref)
- return obj
+ obj_pool = self.pool[model]
+ obj_pool.write(cr, uid, [ref_id], res, context=context)
- def merge_message(self, cr, uid, keystr, action, context=None):
- if context is None:
- context = {}
+ def run_action_object_create(self, cr, uid, action, eval_context=None, context=None):
+ res = {}
+ for exp in action.fields_lines:
+ if exp.type == 'equation':
+ expr = eval(exp.value, eval_context)
+ else:
+ expr = exp.value
+ res[exp.col1.name] = expr
+
+ if action.use_create in ['new', 'copy_current']:
+ model = action.model_id.model
+ elif action.use_create in ['new_other', 'copy_other']:
+ model = action.crud_model_id.model
+
+ obj_pool = self.pool[model]
+ if action.use_create == 'copy_current':
+ ref_id = context.get('active_id')
+ res_id = obj_pool.copy(cr, uid, ref_id, res, context=context)
+ elif action.use_create == 'copy_other':
+ ref_id = action.ref_object.id
+ res_id = obj_pool.copy(cr, uid, ref_id, res, context=context)
+ else:
+ res_id = obj_pool.create(cr, uid, res, context=context)
+
+ if action.link_new_record and action.link_field_id:
+ self.pool[action.model_id.model].write(cr, uid, [context.get('active_id')], {action.link_field_id.name: res_id})
- def merge(match):
- obj_pool = self.pool[action.model_id.model]
- id = context.get('active_id')
- obj = obj_pool.browse(cr, uid, id)
- exp = str(match.group()[2:-2]).strip()
- result = eval(exp,
- {
- 'object': obj,
- 'context': dict(context), # copy context to prevent side-effects of eval
- 'time': time,
- })
- if result in (None, False):
- return str("--------")
- return tools.ustr(result)
-
- com = re.compile('(\[\[.+?\]\])')
- message = com.sub(merge, keystr)
-
- return message
-
- # Context should contains:
- # ids : original ids
- # id : current id of the object
- # OUT:
- # False : Finished correctly
- # ACTION_ID : Action to launch
-
- # FIXME: refactor all the eval() calls in run()!
def run(self, cr, uid, ids, context=None):
+ """ Run the server action, by check the condition and then calling
+ run_action_<STATE>, i.e. run_action_code, allowing easy inheritance
+ of the server actions.
+
+ A void (aka False) condition is considered as always valid.
+
+ Note coming from previous implementation: FIXME: refactor all the eval()
+ calls in run()!
+
+ :param dict context: context should contain following keys:
+ - active_id: current id of the object
+ - active_model: current model that should equal the action's model
+ - TDE: ?? ids: original ids
+
+ :return: False: finished correctly or action_id: action to lanch
+ """
if context is None:
context = {}
+ res = False
user = self.pool.get('res.users').browse(cr, uid, uid)
for action in self.browse(cr, uid, ids, context):
obj = None
'pool': self.pool,
'time': time,
'cr': cr,
- 'context': dict(context), # copy context to prevent side-effects of eval
+ 'context': dict(context), # copy context to prevent side-effects of eval
'uid': uid,
'user': user
}
- expr = eval(str(action.condition), cxt)
+ # evaluate the condition, with the specific case that a void (aka False) condition is considered as True
+ condition = action.condition
+ if action.condition is False:
+ condition = True
+ expr = eval(str(condition), cxt)
if not expr:
continue
+ # call the method related to the action: run_action_<STATE>
+ if hasattr(self, 'run_action_%s' % action.state):
+ res = getattr(self, 'run_action_%s' % action.state)(cr, uid, action, eval_context=cxt, context=context)
+ return res
- if action.state=='client_action':
- if not action.action_id:
- raise osv.except_osv(_('Error'), _("Please specify an action to launch!"))
- return self.pool[action.action_id.type].read(cr, uid, action.action_id.id, context=context)
-
- if action.state=='code':
- eval(action.code.strip(), cxt, mode="exec", nocopy=True) # nocopy allows to return 'action'
- if 'action' in cxt:
- return cxt['action']
-
- if action.state == 'email':
- email_from = config['email_from']
- if not email_from:
- _logger.debug('--email-from command line option is not specified, using a fallback value instead.')
- if user.email:
- email_from = user.email
- else:
- email_from = "%s@%s" % (user.login, gethostname())
-
- try:
- address = eval(str(action.email), cxt)
- except Exception:
- address = str(action.email)
-
- if not address:
- _logger.info('No partner email address specified, not sending any email.')
- continue
-
- # handle single and multiple recipient addresses
- addresses = address if isinstance(address, (tuple, list)) else [address]
- subject = self.merge_message(cr, uid, action.subject, action, context)
- body = self.merge_message(cr, uid, action.message, action, context)
-
- ir_mail_server = self.pool.get('ir.mail_server')
- msg = ir_mail_server.build_email(email_from, addresses, subject, body)
- res_email = ir_mail_server.send_email(cr, uid, msg)
- if res_email:
- _logger.info('Email successfully sent to: %s', addresses)
- else:
- _logger.warning('Failed to send email to: %s', addresses)
-
- if action.state == 'trigger':
- model = action.wkf_model_id.model
- m2o_field_name = action.trigger_obj_id.name
- target_id = obj_pool.read(cr, uid, context.get('active_id'), [m2o_field_name])[m2o_field_name]
- target_id = target_id[0] if isinstance(target_id,tuple) else target_id
- openerp.workflow.trg_validate(uid, model, int(target_id), action.trigger_name, cr)
-
- if action.state == 'sms':
- #TODO: set the user and password from the system
- # for the sms gateway user / password
- # USE smsclient module from extra-addons
- _logger.warning('SMS Facility has not been implemented yet. Use smsclient module!')
-
- if action.state == 'other':
- res = []
- for act in action.child_ids:
- context['active_id'] = context['active_ids'][0]
- result = self.run(cr, uid, [act.id], context)
- if result:
- res.append(result)
- return res
-
- if action.state == 'loop':
- expr = eval(str(action.expression), cxt)
- context['object'] = obj
- for i in expr:
- context['active_id'] = i.id
- self.run(cr, uid, [action.loop_action.id], context)
-
- if action.state == 'object_write':
- res = {}
- for exp in action.fields_lines:
- euq = exp.value
- if exp.type == 'equation':
- expr = eval(euq, cxt)
- else:
- expr = exp.value
- res[exp.col1.name] = expr
-
- if not action.write_id:
- if not action.srcmodel_id:
- obj_pool = self.pool[action.model_id.model]
- obj_pool.write(cr, uid, [context.get('active_id')], res)
- else:
- write_id = context.get('active_id')
- obj_pool = self.pool[action.srcmodel_id.model]
- obj_pool.write(cr, uid, [write_id], res)
-
- elif action.write_id:
- obj_pool = self.pool[action.srcmodel_id.model]
- rec = self.pool[action.model_id.model].browse(cr, uid, context.get('active_id'))
- id = eval(action.write_id, {'object': rec})
- try:
- id = int(id)
- except:
- raise osv.except_osv(_('Error'), _("Problem in configuration `Record Id` in Server Action!"))
-
- if type(id) != type(1):
- raise osv.except_osv(_('Error'), _("Problem in configuration `Record Id` in Server Action!"))
- write_id = id
- obj_pool.write(cr, uid, [write_id], res)
-
- if action.state == 'object_create':
- res = {}
- for exp in action.fields_lines:
- euq = exp.value
- if exp.type == 'equation':
- expr = eval(euq, cxt)
- else:
- expr = exp.value
- res[exp.col1.name] = expr
-
- obj_pool = self.pool[action.srcmodel_id.model]
- res_id = obj_pool.create(cr, uid, res)
- if action.record_id:
- self.pool[action.model_id.model].write(cr, uid, [context.get('active_id')], {action.record_id.name:res_id})
-
- if action.state == 'object_copy':
- res = {}
- for exp in action.fields_lines:
- euq = exp.value
- if exp.type == 'equation':
- expr = eval(euq, cxt)
- else:
- expr = exp.value
- res[exp.col1.name] = expr
-
- model = action.copy_object.split(',')[0]
- cid = action.copy_object.split(',')[1]
- obj_pool = self.pool[model]
- obj_pool.copy(cr, uid, int(cid), res)
-
- return False
-
-actions_server()
class act_window_close(osv.osv):
_name = 'ir.actions.act_window_close'
<field name="model">ir.actions.server</field>
<field name="arch" type="xml">
<form string="Server Action" version="7.0">
- <group>
- <group>
- <field name="name"/>
- <field name="model_id"/>
- <field name="state"/>
- </group>
+ <sheet>
+ <div class="oe_title">
+ <label for="name" class="oe_edit_only"/>
+ <h1><field name="name"/></h1>
+ </div>
+ <div class="oe_right oe_button_box" name="buttons">
+ <field name="menu_ir_values_id" invisible="1"/>
+ <button name="create_action" string="Add More menu entry" type="object"
+ attrs="{'invisible':[('menu_ir_values_id','!=',False)]}"
+ help="Display an option on related documents to run this sever action"/>
+ <button name="unlink_action" string="Remove More menu entry" type="object"
+ attrs="{'invisible':[('menu_ir_values_id','=',False)]}"
+ help="Remove the contextual action related to this server action"/>
+ </div>
<group>
- <field name="condition"/>
- <field name="sequence"/>
- </group>
- </group>
- <notebook colspan="4">
- <page string="Python Code" attrs="{'invisible':[('state','!=','code')]}">
- <field name="code"/>
- </page>
- <page string="Trigger" attrs="{'invisible':[('state','!=','trigger')]}">
- <group string="Trigger Configuration" col="4">
- <field name="wkf_model_id" attrs="{'required':[('state','=','trigger')]}"/>
- <field name="trigger_obj_id" context="{'key':''}"
- domain="[('model_id','=',model_id),('ttype','in',['many2one','int'])]"
- attrs="{'required':[('state','=','trigger')]}"/>
- <field name="trigger_name" attrs="{'required':[('state','=','trigger')]}"/>
- </group>
- </page>
- <page string="Action to Launch" attrs="{'invisible':[('state','!=','client_action')]}">
<group>
- <field name="action_id" attrs="{'required':[('state','=','client_action')]}"/>
+ <field name="type" invisible="1"/>
+ <field name="model_id"
+ on_change="on_change_model_id(model_id, wkf_model_id, crud_model_id)"/>
+ <field name="state"/>
</group>
- </page>
- <page string="Email Configuration" attrs="{'invisible':[('state','!=','email')]}">
<group>
- <field name="email" domain="[('model_id','=',model_id)]" attrs="{'required':[('state','=','email')]}"/>
- <field name="subject" attrs="{'required':[('state','=','email')]}"/>
- <field name="message" attrs="{'required':[('state','=','email')]}"/>
- <newline/>
- <label colspan="2" string="Access all the fields related to the current object using expressions, i.e. object.partner_id.name " align="0.0"/>
+ <field name="condition"/>
+ <field name="sequence"/>
</group>
- </page>
- <page string="SMS Configuration" attrs="{'invisible':[('state','!=','sms')]}">
- <group>
- <field name="mobile" domain="[('model_id','=',model_id)]" attrs="{'required':[('state','=','sms')]}"/>
- <field name="sms" attrs="{'required':[('state','=','sms')]}"/>
- </group>
- <label string="Access all the fields related to the current object using expressions, i.e. object.partner_id.name " align="0.0"/>
- </page>
- <page string="Create / Write / Copy" attrs="{'invisible':[('state','!=','object_create'), ('state','!=','object_write'), ('state','!=','object_copy')]}">
- <group col="4" string="Fields Mapping">
- <field name="srcmodel_id" attrs="{'required':[('state','!=','dummy'), ('state','!=','sms'), ('state','!=','code'), ('state','!=','loop'), ('state','!=','trigger'), ('state','!=','object_copy'), ('state','!=','client_action'), ('state','!=','email'), ('state','!=','sms'), ('state','!=','other')]}"/>
- <field name="copy_object" on_change="change_object(copy_object, state)" attrs="{'required':[('state','!=','dummy'), ('state','!=','sms'), ('state','!=','code'), ('state','!=','loop'), ('state','!=','trigger'), ('state','!=','object_write'), ('state','!=','object_create'), ('state','!=','client_action'), ('state','!=','email'), ('state','!=','sms'), ('state','!=','other')]}"/>
- <field name="fields_lines" nolabel="1" colspan="2">
- <tree string="Field Mappings" editable="top">
- <field name="col1" domain="[('model_id','=',parent.srcmodel_id or parent.model_id)]"/>
- <field name="type"/>
- <field name="value" colspan="4"/>
- </tree>
- <form string="Field Mapping" version="7.0">
- <group col="4">
- <field name="col1" domain="[('model_id','=',parent.srcmodel_id or parent.model_id)]"/>
+ </group>
+ <notebook colspan="4">
+ <page string="Python Code" name='code' autofocus="autofocus"
+ attrs="{'invisible': [('state', '!=', 'code')]}">
+ <field name="code" placeholder="Enter Python code here. Help about Python expression is available in the help tab of this document."/>
+ </page>
+
+ <page string="Worflow Signal" autofocus="autofocus"
+ attrs="{'invisible': [('state', '!=', 'trigger')]}">
+ <p attrs="{'invisible': [('model_id', '!=', False)]}">
+ Please set the Base Model before setting the action details.
+ </p>
+ <group attrs="{'invisible': [('model_id', '=', False)]}">
+ <field name="use_relational_model" widget="radio"
+ on_change="on_change_wkf_wonfig(use_relational_model, wkf_field_id, wkf_model_id, model_id)"
+ attrs="{'readonly': [('model_id', '=', False)]}"/>
+ <field name="wkf_field_id" context="{'key': ''}"
+ on_change="on_change_wkf_wonfig(use_relational_model, wkf_field_id, wkf_model_id, model_id)"
+ attrs="{'required': [('state', '=', 'trigger'), ('use_relational_model', '=', 'relational')],
+ 'invisible': [('use_relational_model', '=', 'base')]}"
+ domain="[('model_id', '=', model_id), ('ttype', 'in', ['many2one'])]"/>
+ <field name="wkf_model_id" readonly="1"
+ on_change="on_change_wkf_model_id(wkf_model_id)"/> <!-- set me invisible -->
+ <field name="wkf_model_name" readonly="1"/> <!-- set me invisible -->
+ <field name="wkf_transition_id" attrs="{'required': [('state', '=', 'trigger')]}"
+ domain="[('wkf_id.osv', '=', wkf_model_name)]"/>
+ </group>
+ </page>
+
+ <page string="Client" autofocus="autofocus"
+ attrs="{'invisible': [('state', '!=', 'client_action')]}">
+ <group>
+ <field name="action_id" attrs="{'required':[('state', '=', 'client_action')]}"/>
+ </group>
+ </page>
+
+
+ <page string="Create / Write / Copy" autofocus="autofocus"
+ attrs="{'invisible':[('state', 'not in', ['object_create', 'object_write'])]}">
+ <p attrs="{'invisible': [('model_id', '!=', False)]}">
+ Please set the Base Model before setting the action details.
+ </p>
+ <group attrs="{'invisible': [('model_id', '=', False)]}">
+ <field name="use_create" widget="radio"
+ on_change="on_change_crud_config(state, use_create, use_write, ref_object, crud_model_id, model_id)"
+ attrs="{'invisible': [('state', '!=', 'object_create')]}"/>
+
+ <field name="use_write" widget="radio"
+ on_change="on_change_crud_config(state, use_create, use_write, ref_object, crud_model_id, model_id)"
+ attrs="{'invisible': [('state', '!=', 'object_write')]}"/>
+
+ <field name="crud_model_id"
+ attrs="{'readonly': ['|', ('state', '!=', 'object_create'), ('use_create', '!=', 'new_other')]}"/> <!-- set me part invisible -->
+
+ <field name="ref_object"
+ on_change="on_change_crud_config(state, use_create, use_write, ref_object, crud_model_id, model_id)"
+ attrs="{'invisible': [('use_write', '!=', 'other'), ('use_create', '!=', 'copy_other')]}"/>
+
+ <field name="link_new_record" attrs="{'invisible': [('state', '!=', 'object_create')]}"/>
+ <field name="link_field_id"
+ domain="[('model_id', '=', crud_model_id), ('ttype', 'in', ['many2one'])]"
+ attrs="{'readonly': [('state', '!=', 'object_create')],
+ 'invisible': ['|', ('state', '!=', 'object_create'), ('link_new_record', '=', False)]}"/>
+
+ <field name="write_expression"
+ on_change="on_change_write_expression(write_expression, model_id)"
+ attrs="{'invisible': ['|', ('state', '!=', 'object_write'), ('use_write', '!=', 'expression')],
+ 'required': [('state', '=', 'object_write'), ('use_write', '=', 'expression')]}"/>
+
+ <field name="fields_lines">
+ <tree string="Field Mappings" editable="top">
+ <field name="col1" domain="[('model_id', '=', parent.crud_model_id)]"/>
<field name="type"/>
- <field name="value" colspan="4"/>
- </group>
- </form>
- </field>
- <field name="record_id" attrs="{'readonly':[('state','!=','object_create')]}" domain="[('model_id','in',[model_id])]"/>
- <field name="write_id" attrs="{'readonly':[('state','!=','object_write')]}"/>
- </group>
- <label string="If you use a formula type, use a python expression using the variable 'object'." align="0.0"/>
- </page>
- <page string="Iteration Actions" attrs="{'invisible':[('state','!=','loop')]}">
- <group col="4">
- <field name="expression" attrs="{'required':[('state','=','loop')]}"/>
- <field name="loop_action" domain="[('state','!=','loop')]" attrs="{'required':[('state','=','loop')]}"/>
- </group>
- </page>
- <page string="Multi Actions" attrs="{'invisible':[('state','!=','other')]}">
- <field name="child_ids"/>
- <label string="Only one client action will be executed, last client action will be considered in case of multiple client actions." align="0.0"/>
- </page>
- </notebook>
- <field name="type" readonly="1"/>
+ <field name="value"/>
+ </tree>
+ <form string="Field Mapping" version="7.0">
+ <group >
+ <field name="col1" domain="[('model_id', '=', parent.crud_model_id)]"/>
+ <field name="type"/>
+ <field name="value"/>
+ </group>
+ </form>
+ </field>
+ </group>
+ </page>
+
+ <page string="Execute several actions" autofocus="autofocus"
+ attrs="{'invisible':[('state','!=','other')]}">
+ <p>If you use client actions in your multiple actions, only the last client action will be executed. Other client actions will be discarded.</p>
+ <field name="child_ids"/>
+ </page>
+
+ <page string="Help" class="oe_edit_only">
+ <group>
+ <div style="margin-top: 4px;">
+ <h3>Help with Python expressions.</h3>
+ <p>Various fields may use Python code or Python expressions. The following variables can be used:</p>
+ <ul>
+ <li>self: ORM model of the record on which the action is triggered</li>
+ <li>object or obj: browse_record of the record on which the action is triggered</li>
+ <li>pool: ORM model pool (i.e. self.pool)</li>
+ <li>time: Python time module</li>
+ <li>cr: database cursor</li>
+ <li>uid: current user id</li>
+ <li>context: current context</li>
+ </ul>
+ <div>
+ <p>Hints for using python in the condition</p>
+ <ul>
+ <li>condition: True</li>
+ <li>condition: object.list_price > 5000</li>
+ </ul>
+ </div>
+ <div attrs="{'invisible': [('state', '!=', 'code')]}">
+ <p>Hints for using python for a code action</p>
+ <ul>
+ <li>if you plan to return an action, assign action = {...}</li>
+ </ul>
+ </div>
+ <div attrs="{'invisible': [('state', '!=', 'email')]}">
+ <p>Hints for using python in email fields</p>
+ <ul>
+ <li>Email address example: object.invoice_address_id.email</li>
+ <li>Subject example: Hello [[object.partner_id.name ]]</li>
+ <li>BOdy example: Dear [[ object.partner_id.name ]]</li>
+ </ul>
+ </div>
+ </div>
+ <group>
+ <h3 colspan="2">Dynamic expression builder</h3>
+ <p colspan="2" attrs="{'invisible': [('model_id', '!=', False)]}">
+ Please set the Base Model of the action to enable the dynamic expression buidler.
+ </p>
+ <field name="model_object_field"
+ attrs="{'invisible': [('model_id', '=', False)]}"
+ domain="[('model_id', '=', model_id), ('ttype', '!=', 'one2many'), ('ttype', '!=', 'many2many')]"
+ on_change="onchange_sub_model_object_value_field(model_object_field)"/>
+ <field name="sub_object" readonly="1" attrs="{'invisible': [('model_id', '=', False)]}"/>
+ <field name="sub_model_object_field"
+ domain="[('model_id', '=', sub_object), ('ttype', '!=', 'one2many'), ('ttype', '!=', 'many2many')]"
+ attrs="{'readonly':[('sub_object','=',False)],
+ 'required':[('sub_object','!=',False)],
+ 'invisible': [('model_id', '=', False)]}"
+ on_change="onchange_sub_model_object_value_field(model_object_field,sub_model_object_field)"/>
+ <field name="copyvalue" attrs="{'invisible': [('model_id', '=', False)]}"/>
+ </group>
+ </group>
+ </page>
+ </notebook>
+ </sheet>
</form>
</field>
</record>
--- /dev/null
+import unittest2
+
+import openerp.tests.common as common
+
+
+class TestServerActionsBase(common.TransactionCase):
+
+ def setUp(self):
+ super(TestServerActionsBase, self).setUp()
+ cr, uid = self.cr, self.uid
+
+ # Models
+ self.ir_actions_server = self.registry('ir.actions.server')
+ self.ir_actions_client = self.registry('ir.actions.client')
+ self.ir_values = self.registry('ir.values')
+ self.ir_model = self.registry('ir.model')
+ self.ir_model_fields = self.registry('ir.model.fields')
+ self.res_partner = self.registry('res.partner')
+ self.res_country = self.registry('res.country')
+
+ # Data on which we will run the server action
+ self.test_country_id = self.res_country.create(cr, uid, {
+ 'name': 'TestingCountry',
+ 'code': 'TY',
+ 'address_format': 'SuperFormat',
+ })
+ self.test_country = self.res_country.browse(cr, uid, self.test_country_id)
+ self.test_partner_id = self.res_partner.create(cr, uid, {
+ 'name': 'TestingPartner',
+ 'city': 'OrigCity',
+ 'country_id': self.test_country_id,
+ })
+ self.test_partner = self.res_partner.browse(cr, uid, self.test_partner_id)
+ self.context = {
+ 'active_id': self.test_partner_id,
+ 'active_model': 'res.partner',
+ }
+
+ # Model data
+ self.res_partner_model_id = self.ir_model.search(cr, uid, [('model', '=', 'res.partner')])[0]
+ self.res_partner_name_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'name')])[0]
+ self.res_partner_city_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'city')])[0]
+ self.res_partner_country_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'country_id')])[0]
+ self.res_partner_parent_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.partner'), ('name', '=', 'parent_id')])[0]
+ self.res_country_model_id = self.ir_model.search(cr, uid, [('model', '=', 'res.country')])[0]
+ self.res_country_name_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.country'), ('name', '=', 'name')])[0]
+ self.res_country_code_field_id = self.ir_model_fields.search(cr, uid, [('model', '=', 'res.country'), ('name', '=', 'code')])[0]
+
+ # create server action to
+ self.act_id = self.ir_actions_server.create(cr, uid, {
+ 'name': 'TestAction',
+ 'condition': 'True',
+ 'model_id': self.res_partner_model_id,
+ 'state': 'code',
+ 'code': 'obj.write({"comment": "MyComment"})',
+ })
+
+
+class TestServerActions(TestServerActionsBase):
+
+ def test_00_action(self):
+ cr, uid = self.cr, self.uid
+
+ # Do: eval 'True' condition
+ self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
+ self.test_partner.refresh()
+ self.assertEqual(self.test_partner.comment, 'MyComment', 'ir_actions_server: invalid condition check')
+ self.test_partner.write({'comment': False})
+
+ # Do: eval False condition, that should be considered as True (void = True)
+ self.ir_actions_server.write(cr, uid, [self.act_id], {'condition': False})
+ self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
+ self.test_partner.refresh()
+ self.assertEqual(self.test_partner.comment, 'MyComment', 'ir_actions_server: invalid condition check')
+
+ # Do: create contextual action
+ self.ir_actions_server.create_action(cr, uid, [self.act_id])
+
+ # Test: ir_values created
+ ir_values_ids = self.ir_values.search(cr, uid, [('name', '=', 'Run TestAction')])
+ self.assertEqual(len(ir_values_ids), 1, 'ir_actions_server: create_action should have created an entry in ir_values')
+ ir_value = self.ir_values.browse(cr, uid, ir_values_ids[0])
+ self.assertEqual(ir_value.value, 'ir.actions.server,%s' % self.act_id, 'ir_actions_server: created ir_values should reference the server action')
+ self.assertEqual(ir_value.model, 'res.partner', 'ir_actions_server: created ir_values should be linked to the action base model')
+
+ # Do: remove contextual action
+ self.ir_actions_server.unlink_action(cr, uid, [self.act_id])
+
+ # Test: ir_values removed
+ ir_values_ids = self.ir_values.search(cr, uid, [('name', '=', 'Run TestAction')])
+ self.assertEqual(len(ir_values_ids), 0, 'ir_actions_server: unlink_action should remove the ir_values record')
+
+ def test_10_code(self):
+ cr, uid = self.cr, self.uid
+ self.ir_actions_server.write(cr, uid, self.act_id, {
+ 'state': 'code',
+ 'code': """partner_name = obj.name + '_code'
+self.pool["res.partner"].create(cr, uid, {"name": partner_name}, context=context)"""
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
+ self.assertFalse(run_res, 'ir_actions_server: code server action correctly finished should return False')
+
+ pids = self.res_partner.search(cr, uid, [('name', 'ilike', 'TestingPartner_code')])
+ self.assertEqual(len(pids), 1, 'ir_actions_server: 1 new partner should have been created')
+
+ def test_20_trigger(self):
+ cr, uid = self.cr, self.uid
+
+ # Data: code server action (at this point code-based actions should work)
+ act_id2 = self.ir_actions_server.create(cr, uid, {
+ 'name': 'TestAction2',
+ 'type': 'ir.actions.server',
+ 'condition': 'True',
+ 'model_id': self.res_partner_model_id,
+ 'state': 'code',
+ 'code': 'obj.write({"comment": "MyComment"})',
+ })
+ act_id3 = self.ir_actions_server.create(cr, uid, {
+ 'name': 'TestAction3',
+ 'type': 'ir.actions.server',
+ 'condition': 'True',
+ 'model_id': self.res_country_model_id,
+ 'state': 'code',
+ 'code': 'obj.write({"code": "ZZ"})',
+ })
+
+ # Data: create workflows
+ partner_wf_id = self.registry('workflow').create(cr, uid, {
+ 'name': 'TestWorkflow',
+ 'osv': 'res.partner',
+ 'on_create': True,
+ })
+ partner_act1_id = self.registry('workflow.activity').create(cr, uid, {
+ 'name': 'PartnerStart',
+ 'wkf_id': partner_wf_id,
+ 'flow_start': True
+ })
+ partner_act2_id = self.registry('workflow.activity').create(cr, uid, {
+ 'name': 'PartnerTwo',
+ 'wkf_id': partner_wf_id,
+ 'kind': 'function',
+ 'action': 'True',
+ 'action_id': act_id2,
+ })
+ partner_trs1_id = self.registry('workflow.transition').create(cr, uid, {
+ 'signal': 'partner_trans',
+ 'act_from': partner_act1_id,
+ 'act_to': partner_act2_id
+ })
+ country_wf_id = self.registry('workflow').create(cr, uid, {
+ 'name': 'TestWorkflow',
+ 'osv': 'res.country',
+ 'on_create': True,
+ })
+ country_act1_id = self.registry('workflow.activity').create(cr, uid, {
+ 'name': 'CountryStart',
+ 'wkf_id': country_wf_id,
+ 'flow_start': True
+ })
+ country_act2_id = self.registry('workflow.activity').create(cr, uid, {
+ 'name': 'CountryTwo',
+ 'wkf_id': country_wf_id,
+ 'kind': 'function',
+ 'action': 'True',
+ 'action_id': act_id3,
+ })
+ country_trs1_id = self.registry('workflow.transition').create(cr, uid, {
+ 'signal': 'country_trans',
+ 'act_from': country_act1_id,
+ 'act_to': country_act2_id
+ })
+
+ # Data: re-create country and partner to benefit from the workflows
+ self.test_country_id = self.res_country.create(cr, uid, {
+ 'name': 'TestingCountry2',
+ 'code': 'T2',
+ })
+ self.test_country = self.res_country.browse(cr, uid, self.test_country_id)
+ self.test_partner_id = self.res_partner.create(cr, uid, {
+ 'name': 'TestingPartner2',
+ 'country_id': self.test_country_id,
+ })
+ self.test_partner = self.res_partner.browse(cr, uid, self.test_partner_id)
+ self.context = {
+ 'active_id': self.test_partner_id,
+ 'active_model': 'res.partner',
+ }
+
+ # Run the action on partner object itself ('base')
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'trigger',
+ 'use_relational_model': 'base',
+ 'wkf_model_id': self.res_partner_model_id,
+ 'wkf_model_name': 'res.partner',
+ 'wkf_transition_id': partner_trs1_id,
+ })
+ self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
+ self.test_partner.refresh()
+ self.assertEqual(self.test_partner.comment, 'MyComment', 'ir_actions_server: incorrect signal trigger')
+
+ # Run the action on related country object ('relational')
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'use_relational_model': 'relational',
+ 'wkf_model_id': self.res_country_model_id,
+ 'wkf_model_name': 'res.country',
+ 'wkf_field_id': self.res_partner_country_field_id,
+ 'wkf_transition_id': country_trs1_id,
+ })
+ self.ir_actions_server.run(cr, uid, [self.act_id], self.context)
+ self.test_country.refresh()
+ self.assertEqual(self.test_country.code, 'ZZ', 'ir_actions_server: incorrect signal trigger')
+
+ # Clear workflow cache, otherwise openerp will try to create workflows even if it has been deleted
+ from openerp.workflow import clear_cache
+ clear_cache(cr, uid)
+
+ def test_30_client(self):
+ cr, uid = self.cr, self.uid
+ client_action_id = self.registry('ir.actions.client').create(cr, uid, {
+ 'name': 'TestAction2',
+ 'tag': 'Test',
+ })
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'client_action',
+ 'action_id': client_action_id,
+ })
+ res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertEqual(res['name'], 'TestAction2', 'ir_actions_server: incorrect return result for a client action')
+
+ def test_40_crud_create(self):
+ cr, uid = self.cr, self.uid
+ _city = 'TestCity'
+ _name = 'TestNew'
+
+ # Do: create a new record in the same model and link it
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'object_create',
+ 'use_create': 'new',
+ 'link_new_record': True,
+ 'link_field_id': self.res_partner_parent_field_id,
+ 'fields_lines': [(0, 0, {'col1': self.res_partner_name_field_id, 'value': _name}),
+ (0, 0, {'col1': self.res_partner_city_field_id, 'value': _city})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new partner created
+ pids = self.res_partner.search(cr, uid, [('name', 'ilike', _name)])
+ self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
+ partner = self.res_partner.browse(cr, uid, pids[0])
+ self.assertEqual(partner.city, _city, 'ir_actions_server: TODO')
+ # Test: new partner linked
+ self.test_partner.refresh()
+ self.assertEqual(self.test_partner.parent_id.id, pids[0], 'ir_actions_server: TODO')
+
+ # Do: copy current record
+ self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'object_create',
+ 'use_create': 'copy_current',
+ 'link_new_record': False,
+ 'fields_lines': [(0, 0, {'col1': self.res_partner_name_field_id, 'value': 'TestCopyCurrent'}),
+ (0, 0, {'col1': self.res_partner_city_field_id, 'value': 'TestCity'})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new partner created
+ pids = self.res_partner.search(cr, uid, [('name', 'ilike', 'TestingPartner (copy)')]) # currently res_partner overrides default['name'] whatever its value
+ self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
+ partner = self.res_partner.browse(cr, uid, pids[0])
+ self.assertEqual(partner.city, 'TestCity', 'ir_actions_server: TODO')
+ self.assertEqual(partner.country_id.id, self.test_partner.country_id.id, 'ir_actions_server: TODO')
+
+ # Do: create a new record in another model
+ self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'object_create',
+ 'use_create': 'new_other',
+ 'crud_model_id': self.res_country_model_id,
+ 'link_new_record': False,
+ 'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'obj.name', 'type': 'equation'}),
+ (0, 0, {'col1': self.res_country_code_field_id, 'value': 'obj.name[0:2]', 'type': 'equation'})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new country created
+ cids = self.res_country.search(cr, uid, [('name', 'ilike', 'TestingPartner')])
+ self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
+ country = self.res_country.browse(cr, uid, cids[0])
+ self.assertEqual(country.code, 'TE', 'ir_actions_server: TODO')
+
+ # Do: copy a record in another model
+ self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'object_create',
+ 'use_create': 'copy_other',
+ 'crud_model_id': self.res_country_model_id,
+ 'link_new_record': False,
+ 'ref_object': 'res.country,%s' % self.test_country_id,
+ 'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'NewCountry', 'type': 'value'}),
+ (0, 0, {'col1': self.res_country_code_field_id, 'value': 'NY', 'type': 'value'})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new country created
+ cids = self.res_country.search(cr, uid, [('name', 'ilike', 'NewCountry')])
+ self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
+ country = self.res_country.browse(cr, uid, cids[0])
+ self.assertEqual(country.code, 'NY', 'ir_actions_server: TODO')
+ self.assertEqual(country.address_format, 'SuperFormat', 'ir_actions_server: TODO')
+
+ def test_50_crud_write(self):
+ cr, uid = self.cr, self.uid
+ _name = 'TestNew'
+
+ # Do: create a new record in the same model and link it
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'object_write',
+ 'use_write': 'current',
+ 'fields_lines': [(0, 0, {'col1': self.res_partner_name_field_id, 'value': _name})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new partner created
+ pids = self.res_partner.search(cr, uid, [('name', 'ilike', _name)])
+ self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
+ partner = self.res_partner.browse(cr, uid, pids[0])
+ self.assertEqual(partner.city, 'OrigCity', 'ir_actions_server: TODO')
+
+ # Do: copy current record
+ self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'use_write': 'other',
+ 'crud_model_id': self.res_country_model_id,
+ 'ref_object': 'res.country,%s' % self.test_country_id,
+ 'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'obj.name', 'type': 'equation'})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new country created
+ cids = self.res_country.search(cr, uid, [('name', 'ilike', 'TestNew')])
+ self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
+
+ # Do: copy a record in another model
+ self.ir_actions_server.write(cr, uid, [self.act_id], {'fields_lines': [[5]]})
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'use_write': 'expression',
+ 'crud_model_id': self.res_country_model_id,
+ 'write_expression': 'object.country_id',
+ 'fields_lines': [(0, 0, {'col1': self.res_country_name_field_id, 'value': 'NewCountry', 'type': 'value'})],
+ })
+ run_res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+ self.assertFalse(run_res, 'ir_actions_server: create record action correctly finished should return False')
+ # Test: new country created
+ cids = self.res_country.search(cr, uid, [('name', 'ilike', 'NewCountry')])
+ self.assertEqual(len(cids), 1, 'ir_actions_server: TODO')
+
+ def test_60_multi(self):
+ cr, uid = self.cr, self.uid
+
+ # Data: 2 server actions that will be nested
+ act1_id = self.ir_actions_server.create(cr, uid, {
+ 'name': 'Subaction1',
+ 'model_id': self.res_partner_model_id,
+ 'state': 'code',
+ 'code': 'action = {"type": "ir.actions.act_window"}',
+ })
+ # Do: create a new record in the same model and link it
+ act2_id = self.ir_actions_server.create(cr, uid, {
+ 'name': 'Subaction2',
+ 'model_id': self.res_partner_model_id,
+ 'state': 'object_create',
+ 'use_create': 'copy_current',
+ })
+ self.ir_actions_server.write(cr, uid, [self.act_id], {
+ 'state': 'multi',
+ 'child_ids': [(6, 0, [act1_id, act2_id])],
+ })
+
+ # Do: run the action
+ res = self.ir_actions_server.run(cr, uid, [self.act_id], context=self.context)
+
+ # Test: new partner created
+ pids = self.res_partner.search(cr, uid, [('name', 'ilike', 'TestingPartner (copy)')]) # currently res_partner overrides default['name'] whatever its value
+ self.assertEqual(len(pids), 1, 'ir_actions_server: TODO')
+ # Test: action returned
+ self.assertEqual(res.get('type'), 'ir.actions.act_window', '')
+
+
+if __name__ == '__main__':
+ unittest2.main()