"""
import calendar
+import collections
import copy
import datetime
import itertools
import simplejson
import time
import types
+
+import psycopg2
from lxml import etree
+import warnings
import fields
import openerp
import openerp.netsvc as netsvc
import openerp.tools as tools
from openerp.tools.config import config
+from openerp.tools.misc import CountingStream
from openerp.tools.safe_eval import safe_eval as eval
from openerp.tools.translate import _
from openerp import SUPERUSER_ID
from query import Query
-from openerp import SUPERUSER_ID
_logger = logging.getLogger(__name__)
_schema = logging.getLogger(__name__ + '.schema')
for c in cls.__dict__.get(s, []):
exist = False
for c2 in range(len(new)):
- #For _constraints, we should check field and methods as well
- if new[c2][2]==c[2] and (new[c2][0] == c[0] \
+ #For _constraints, we should check field and methods as well
+ if new[c2][2]==c[2] and (new[c2][0] == c[0] \
or getattr(new[c2][0],'__name__', True) == \
getattr(c[0],'__name__', False)):
# If new class defines a constraint with
return {'datas': datas}
def import_data(self, cr, uid, fields, datas, mode='init', current_module='', noupdate=False, context=None, filename=None):
    """
    .. deprecated:: 7.0
        Use :meth:`~load` instead

    Import given data in given module

    This method is used when importing data via client menu.

    * The last item is currently unused, with no specific semantics

    :param fields: list of fields to import
    :param datas: data to import
    :param mode: 'init' or 'update' for record creation
    :param current_module: module name
    :param noupdate: flag for record creation
    :returns: 4-tuple in the form (return_code, errored_resource, error_message, unused)
    :rtype: (int, dict or 0, str or 0, str or 0)
    """
    # work on a copy so callers never see the private import marker key
    context = dict(context) if context is not None else {}
    context['_import_current_module'] = current_module

    fields = map(fix_import_export_id_paths, fields)
    ir_model_data_obj = self.pool.get('ir.model.data')

    def log(m):
        # legacy semantics: the first error message aborts the whole import;
        # the Exception is caught below, rolls back and returns (-1, ...)
        if m['type'] == 'error':
            raise Exception(m['message'])

    if config.get('import_partial') and filename:
        with open(config.get('import_partial'), 'rb') as partial_import_file:
            data = pickle.load(partial_import_file)
            position = data.get(filename, 0)

    # NOTE(review): the resume position read just above is unconditionally
    # reset here, so partial-import resume appears to be a no-op -- confirm
    # intended behavior before relying on it.
    position = 0
    try:
        for res_id, xml_id, res, info in self._convert_records(cr, uid,
                self._extract_records(cr, uid, fields, datas,
                                      context=context, log=log),
                context=context, log=log):
            ir_model_data_obj._update(cr, uid, self._name,
                 current_module, res, mode=mode, xml_id=xml_id,
                 noupdate=noupdate, res_id=res_id, context=context)
            # info['rows'] spans the source lines consumed for this record
            position = info.get('rows', {}).get('to', 0) + 1
            if config.get('import_partial') and filename and (not (position%100)):
                # checkpoint progress every 100 lines for partial imports
                with open(config.get('import_partial'), 'rb') as partial_import:
                    data = pickle.load(partial_import)
                data[filename] = position
                with open(config.get('import_partial'), 'wb') as partial_import:
                    pickle.dump(data, partial_import)
                if context.get('defer_parent_store_computation'):
                    self._parent_store_compute(cr)
                cr.commit()
    except Exception, e:
        cr.rollback()
        return -1, {}, 'Line %d : %s' % (position + 1, tools.ustr(e)), ''

    if context.get('defer_parent_store_computation'):
        self._parent_store_compute(cr)
    return position, 0, 0, 0
def load(self, cr, uid, fields, data, context=None):
    """
    Attempts to load the data matrix, and returns a list of ids (or
    ``False`` if there was an error and no id could be generated) and a
    list of messages.

    The ids are those of the records created and saved (in database), in
    the same order they were extracted from the file. They can be passed
    directly to :meth:`~read`

    :param cr: cursor for the request
    :param int uid: ID of the user attempting the data import
    :param fields: list of fields to import, at the same index as the corresponding data
    :type fields: list(str)
    :param data: row-major matrix of data to import
    :type data: list(list(str))
    :param dict context:
    :returns: {ids: list(int)|False, messages: [Message]}
    """
    # outer savepoint: the whole import is undone when any error message
    # was collected (see the check at the end)
    cr.execute('SAVEPOINT model_load')
    messages = []

    fields = map(fix_import_export_id_paths, fields)
    ModelData = self.pool['ir.model.data']
    fg = self.fields_get(cr, uid, context=context)

    mode = 'init'
    current_module = ''
    noupdate = False

    ids = []
    for id, xid, record, info in self._convert_records(cr, uid,
            self._extract_records(cr, uid, fields, data,
                                  context=context, log=messages.append),
            context=context, log=messages.append):
        try:
            # per-record savepoint so one failed record does not poison
            # the transaction for the following ones
            cr.execute('SAVEPOINT model_load_save')
        except psycopg2.InternalError, e:
            # broken transaction, exit and hope the source error was
            # already logged
            if not any(message['type'] == 'error' for message in messages):
                messages.append(dict(info, type='error',message=
                    u"Unknown database error: '%s'" % e))
            break
        try:
            ids.append(ModelData._update(cr, uid, self._name,
                  current_module, record, mode=mode, xml_id=xid,
                  noupdate=noupdate, res_id=id, context=context))
            cr.execute('RELEASE SAVEPOINT model_load_save')
        except psycopg2.Warning, e:
            # warnings are non-fatal: record and keep going
            cr.execute('ROLLBACK TO SAVEPOINT model_load_save')
            messages.append(dict(info, type='warning', message=str(e)))
        except psycopg2.Error, e:
            # Failed to write, log to messages, rollback savepoint (to
            # avoid broken transaction) and keep going
            cr.execute('ROLLBACK TO SAVEPOINT model_load_save')
            # PGERROR_TO_OE maps pgcodes to user-readable message dicts
            messages.append(dict(
                info, type='error',
                **PGERROR_TO_OE[e.pgcode](self, fg, info, e)))
    if any(message['type'] == 'error' for message in messages):
        cr.execute('ROLLBACK TO SAVEPOINT model_load')
        ids = False
    return {'ids': ids, 'messages': messages}
def _extract_records(self, cr, uid, fields_, data,
                     context=None, log=lambda a: None):
    """ Generates record dicts from the data sequence.

    The result is a generator of dicts mapping field names to raw
    (unconverted, unvalidated) values.

    For relational fields, if sub-fields were provided the value will be
    a list of sub-records

    The following sub-fields may be set on the record (by key):

    * None is the name_get for the record (to use with name_create/name_search)
    * "id" is the External ID for the record
    * ".id" is the Database ID for the record

    :param fields_: list of field paths (each a list of path components)
    :param data: row-major matrix of raw cell values
    :param log: callable invoked with a message dict on issues (unused
                here directly; forwarded to recursive calls)
    :returns: generator of (record_dict, {'rows': {'from': i, 'to': j}})
    """
    columns = dict((k, v.column) for k, v in self._all_columns.iteritems())
    # Fake columns to avoid special cases in extractor
    columns[None] = fields.char('rec_name')
    columns['id'] = fields.char('External ID')
    columns['.id'] = fields.integer('Database ID')

    # m2o fields can't be on multiple lines so exclude them from the
    # is_relational field rows filter, but special-case it later on to
    # be handled with relational fields (as it can have subfields)
    is_relational = lambda field: columns[field]._type in ('one2many', 'many2many', 'many2one')
    get_o2m_values = itemgetter_tuple(
        [index for index, field in enumerate(fields_)
               if columns[field[0]]._type == 'one2many'])
    get_nono2m_values = itemgetter_tuple(
        [index for index, field in enumerate(fields_)
               if columns[field[0]]._type != 'one2many'])
    # Checks if the provided row has any non-empty non-relational field
    def only_o2m_values(row, f=get_nono2m_values, g=get_o2m_values):
        return any(g(row)) and not any(f(row))

    index = 0
    while True:
        if index >= len(data): return

        row = data[index]
        # copy non-relational fields to record dict
        record = dict((field[0], value)
                      for field, value in itertools.izip(fields_, row)
                      if not is_relational(field[0]))

        # Get all following rows which have relational values attached to
        # the current record (no non-relational values)
        record_span = itertools.takewhile(
            only_o2m_values, itertools.islice(data, index + 1, None))
        # stitch record row back on for relational fields
        record_span = list(itertools.chain([row], record_span))
        for relfield in set(
                field[0] for field in fields_
                         if is_relational(field[0])):
            column = columns[relfield]
            # FIXME: how to not use _obj without relying on fields_get?
            Model = self.pool[column._obj]

            # get only cells for this sub-field, should be strictly
            # non-empty, field path [None] is for name_get column
            indices, subfields = zip(*((index, field[1:] or [None])
                                       for index, field in enumerate(fields_)
                                       if field[0] == relfield))

            # return all rows which have at least one value for the
            # subfields of relfield
            relfield_data = filter(any, map(itemgetter_tuple(indices), record_span))
            # recurse on the comodel to extract the sub-records
            record[relfield] = [subrecord
                for subrecord, _subinfo in Model._extract_records(
                    cr, uid, subfields, relfield_data,
                    context=context, log=log)]

        # 'rows' reports which source lines this record consumed, for
        # error reporting / progress tracking by callers
        yield record, {'rows': {
            'from': index,
            'to': index + len(record_span) - 1
        }}
        index += len(record_span)
def _convert_records(self, cr, uid, records,
                     context=None, log=lambda a: None):
    """ Converts records from the source iterable (recursive dicts of
    strings) into forms which can be written to the database (via
    self.create or (ir.model.data)._update)

    :param records: iterable of (record_dict, extras_dict) pairs, as
                    produced by :meth:`~._extract_records`
    :param log: callable invoked with a message dict for each
                conversion warning or error
    :returns: generator of 4-tuples (dbid, xid, converted_record, extras)
              where ``dbid``/``xid`` are False when not provided
    """
    if context is None: context = {}
    Converter = self.pool['ir.fields.converter']
    columns = dict((k, v.column) for k, v in self._all_columns.iteritems())
    Translation = self.pool['ir.translation']
    # translated field labels, used in user-facing messages below
    field_names = dict(
        (f, (Translation._get_source(cr, uid, self._name + ',' + f, 'field',
                                     context.get('lang', False) or 'en_US')
             or column.string or f))
        for f, column in columns.iteritems())
    # one conversion callable per column, built once for the whole batch
    converters = dict(
        (k, Converter.to_field(cr, uid, self, column, context=context))
        for k, column in columns.iteritems())

    def _log(base, field, exception):
        # Warning subclasses (e.g. ImportWarning) are non-fatal
        type = 'warning' if isinstance(exception, Warning) else 'error'
        record = dict(base, field=field, type=type,
                      message=unicode(exception.args[0]) % base)
        if len(exception.args) > 1 and exception.args[1]:
            record.update(exception.args[1])
        log(record)

    # CountingStream tracks the index of the record being processed
    stream = CountingStream(records)
    for record, extras in stream:
        dbid = False
        xid = False
        converted = {}
        # name_get/name_create
        if None in record: pass
        # xid
        if 'id' in record:
            xid = record['id']
        # dbid
        if '.id' in record:
            try:
                dbid = int(record['.id'])
            except ValueError:
                # in case of overridden id column
                dbid = record['.id']
            if not self.search(cr, uid, [('id', '=', dbid)], context=context):
                log(dict(extras,
                         type='error',
                         record=stream.index,
                         field='.id',
                         message=_(u"Unknown database identifier '%s'") % dbid))
                dbid = False

        for field, strvalue in record.iteritems():
            if field in (None, 'id', '.id'): continue
            if not strvalue:
                # empty cell clears the field
                converted[field] = False
                continue

            # In warnings and error messages, use translated string as
            # field name
            message_base = dict(
                extras, record=stream.index, field=field_names[field])
            try:
                converted[field], ws = converters[field](strvalue)

                for w in ws:
                    if isinstance(w, basestring):
                        # wrap warning string in an ImportWarning for
                        # uniform handling
                        w = ImportWarning(w)
                    _log(message_base, field, w)
            except ValueError, e:
                _log(message_base, field, e)

        yield dbid, xid, converted, dict(extras, record=stream.index)
+
def get_invalid_fields(self, cr, uid):
    """Return the fields currently flagged as invalid, as a list.

    ``cr`` and ``uid`` are unused; they are kept so the signature matches
    the other ORM entry points.
    """
    return [name for name in self._invalids]
def _validate(self, cr, uid, ids, context=None):
context = context or {}
- lng = context.get('lang', False) or 'en_US'
+ lng = context.get('lang')
trans = self.pool.get('ir.translation')
error_msgs = []
for constraint in self._constraints:
else:
translated_msg = tmp_msg
else:
- translated_msg = trans._get_source(cr, uid, self._name, 'constraint', lng, msg) or msg
+ translated_msg = trans._get_source(cr, uid, self._name, 'constraint', lng, msg)
error_msgs.append(
_("Error occurred while validating the field(s) %s: %s") % (','.join(fields), translated_msg)
)
# if val is a many2one, just write the ID
if type(val) == tuple:
val = val[0]
- if (val<>False) or (type(val)<>bool):
+ if val is not False:
cr.execute(update_query, (ss[1](val), key))
def _check_selection_field_value(self, cr, uid, field, value, context=None):
elif val in dict(self._columns[field].selection(self, cr, uid, context=context)):
return
raise except_orm(_('ValidateError'),
- _('The value "%s" for the field "%s.%s" is not in the selection') % (value, self._table, field))
+ _('The value "%s" for the field "%s.%s" is not in the selection') % (value, self._table, field))
def _check_removed_columns(self, cr, log=False):
# iterate on the database columns to drop the NOT NULL constraints
cr.commit()
if f._type == 'text':
# FIXME: for fields.text columns we should try creating GIN indexes instead (seems most suitable for an ERP context)
- msg = "Table '%s': Adding (b-tree) index for text column '%s'."\
+ msg = "Table '%s': Adding (b-tree) index for %s column '%s'."\
"This is probably useless (does not work for fulltext search) and prevents INSERTs of long texts"\
" because there is a length limit for indexable btree values!\n"\
"Use a search view instead if you simply want to make the field searchable."
- _schema.warning(msg, self._table, k, f._type)
+ _schema.warning(msg, self._table, f._type, k)
if res2 and not f.select:
cr.execute('DROP INDEX "%s_%s_index"' % (self._table, k))
cr.commit()
if context is None:
context = {}
- write_access = self.check_access_rights(cr, user, 'write') or self.check_access_rights(cr, user, 'create')
+ write_access = self.check_access_rights(cr, user, 'write', raise_exception=False) \
+ or self.check_access_rights(cr, user, 'create', raise_exception=False)
res = {}
res[f]['readonly'] = True
res[f]['states'] = {}
- if 'string' in res[f]:
- res_trans = translation_obj._get_source(cr, user, self._name + ',' + f, 'field', context.get('lang', False) or 'en_US')
- if res_trans:
- res[f]['string'] = res_trans
- if 'help' in res[f]:
- help_trans = translation_obj._get_source(cr, user, self._name + ',' + f, 'help', context.get('lang', False) or 'en_US')
- if help_trans:
- res[f]['help'] = help_trans
- if 'selection' in res[f]:
- if isinstance(field.selection, (tuple, list)):
- sel = field.selection
- sel2 = []
- for key, val in sel:
- val2 = None
- if val:
- val2 = translation_obj._get_source(cr, user, self._name + ',' + f, 'selection', context.get('lang', False) or 'en_US', val)
- sel2.append((key, val2 or val))
- res[f]['selection'] = sel2
+ if 'lang' in context:
+ if 'string' in res[f]:
+ res_trans = translation_obj._get_source(cr, user, self._name + ',' + f, 'field', context['lang'])
+ if res_trans:
+ res[f]['string'] = res_trans
+ if 'help' in res[f]:
+ help_trans = translation_obj._get_source(cr, user, self._name + ',' + f, 'help', context['lang'])
+ if help_trans:
+ res[f]['help'] = help_trans
+ if 'selection' in res[f]:
+ if isinstance(field.selection, (tuple, list)):
+ sel = field.selection
+ sel2 = []
+ for key, val in sel:
+ val2 = None
+ if val:
+ val2 = translation_obj._get_source(cr, user, self._name + ',' + f, 'selection', context['lang'], val)
+ sel2.append((key, val2 or val))
+ res[f]['selection'] = sel2
return res
for res_id in ids:
getattr(wf_service, trigger)(uid, self._name, res_id, cr)
def _workflow_signal(self, cr, uid, ids, signal, context=None):
    """Send the given workflow signal to every record in ``ids``.

    ``context`` is accepted for API consistency but not used here.
    """
    workflow = netsvc.LocalService("workflow")
    validate = workflow.trg_validate
    for res_id in ids:
        validate(uid, self._name, res_id, signal, cr)
def unlink(self, cr, uid, ids, context=None):
"""
Delete records with given ids
and vals[field]:
self._check_selection_field_value(cr, user, field, vals[field], context=context)
if self._log_access:
- upd0 += ',create_uid,create_date'
- upd1 += ",%s,(now() at time zone 'UTC')"
- upd2.append(user)
+ upd0 += ',create_uid,create_date,write_uid,write_date'
+ upd1 += ",%s,(now() at time zone 'UTC'),%s,(now() at time zone 'UTC')"
+ upd2.extend((user, user))
cr.execute('insert into "'+self._table+'" (id'+upd0+") values ("+str(id_new)+upd1+')', tuple(upd2))
self.check_access_rule(cr, user, [id_new], 'create', context=context)
upd_todo.sort(lambda x, y: self._columns[x].priority-self._columns[y].priority)
def is_transient(self):
""" Return whether the model is transient.
- See TransientModel.
+ See :class:`TransientModel`.
"""
return self._transient
_auto = False # don't create any database backend for AbstractModels
_register = False # not visible in ORM registry, meant to be python-inherited only
def itemgetter_tuple(items):
    """ Fixes itemgetter inconsistency (useful in some cases) of not returning
    a tuple if len(items) == 1: always returns an n-tuple where n = len(items)
    """
    if not items:
        # no indices requested: constant empty tuple, whatever the argument
        return lambda _gettable: ()
    if len(items) == 1:
        # operator.itemgetter would return a bare value here; wrap it
        index = items[0]
        return lambda gettable: (gettable[index],)
    return operator.itemgetter(*items)
class ImportWarning(Warning):
    """Warning category used to bubble non-fatal issues up the stack while
    importing data.

    Note: this deliberately shares its name with (and shadows) the builtin
    ``ImportWarning`` inside this module.
    """
+
def convert_pgerror_23502(model, fields, info, e):
    """ Converts a PostgreSQL not_null_violation (pgcode 23502) into a
    user-readable import message naming the missing required field.

    :param model: model object the import was writing to (unused here)
    :param dict fields: ``fields_get()`` result, keyed by field name
    :param dict info: extras dict of the offending record (row span etc.,
                      unused here)
    :param e: the psycopg2 error instance
    :returns: dict with a ``message`` key, plus a ``field`` key when the
              offending column could be matched to a known field
    """
    # parse the column name out of the standard PostgreSQL error text
    m = re.match(r'^null value in column "(?P<field>\w+)" violates '
                 r'not-null constraint\n',
                 str(e))
    if not m or m.group('field') not in fields:
        # unparseable error or unknown column: fall back to the raw message
        return {'message': unicode(e)}
    field = fields[m.group('field')]
    return {
        'message': _(u"Missing required value for the field '%(field)s'") % {
            'field': field['string']
        },
        'field': m.group('field'),
    }
+
# Maps PostgreSQL error codes (``pgcode``) to converters turning the raw
# psycopg2 error into an import message dict; unknown codes fall back to
# the stringified error.
PGERROR_TO_OE = collections.defaultdict(
    # shape of mapped converters
    lambda: (lambda model, fvg, info, pgerror: {'message': unicode(pgerror)}), {
    # not_null_violation
    '23502': convert_pgerror_23502,
})
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4:
model_name = 'export.integer'
def test_create_with_id(self):
    # importing an explicit database id (.id) that does not exist must be
    # reported as a clean import error message, not raised as an exception
    self.assertEqual(
        self.import_(['.id', 'value'], [['42', '36']]),
        error(1, u"Unknown database identifier '42'"))
def test_create_with_xid(self):
self.assertEqual(
self.import_(['id', 'value'], [['somexmlid', '42']]),
self.assertEqual(
self.import_(['value'], [
[u'0'],
- [u'off'],
+ [u'no'],
[u'false'],
[u'FALSE'],
- [u'OFF'],
[u''],
]),
- ok(6))
+ ok(5))
self.assertEqual([
False,
False,
False,
False,
False,
- False,
],
values(self.read()))
def test_trues(self):
self.assertEqual(
self.import_(['value'], [
- ['no'],
+ ['off'],
['None'],
['nil'],
['()'],
['#f'],
# Problem: OpenOffice (and probably excel) output localized booleans
['VRAI'],
+ [u'OFF'],
]),
- ok(7))
+ ok(8))
self.assertEqual(
- [True] * 7,
+ [True] * 8,
values(self.read()))
class test_integer_field(ImporterCase):
def test_out_of_range(self):
self.assertEqual(
self.import_(['value'], [[str(2**31)]]),
- error(1, "integer out of range\n", value=2**31))
+ error(1, "integer out of range\n"))
# auto-rollbacks if error is in process_liness, but not during
# ir.model.data write. Can differentiate because former ends lines
# error lines with "!"
self.cr.rollback()
self.assertEqual(
self.import_(['value'], [[str(-2**32)]]),
- error(1, "integer out of range\n", value=-2**32))
+ error(1, "integer out of range\n"))
def test_nonsense(self):
    # non-numeric input for an integer field yields an error message
    # instead of an uncaught ValueError
    self.assertEqual(
        self.import_(['value'], [['zorglub']]),
        error(1, u"'zorglub' does not seem to be an integer for field 'unknown'"))
class test_float_field(ImporterCase):
model_name = 'export.float'
], values(self.read()))
def test_nonsense(self):
    # non-numeric input for a float field yields an error message instead
    # of an uncaught ValueError
    self.assertEqual(
        self.import_(['value'], [['foobar']]),
        error(1, u"'foobar' does not seem to be a number for field 'unknown'"))
class test_string_field(ImporterCase):
model_name = 'export.string.bounded'
'translatable': True,
'date_format': '%d.%m.%Y',
'decimal_point': ',',
- 'thousand_sep': ' ',
+ 'thousands_sep': ' ',
})
Translations = self.registry('ir.translation')
for source, value in self.translations_fr:
'value': value
})
- # FIXME: can't import an exported selection field label if lang != en_US
- # (see test_export.test_selection.test_localized_export)
self.assertEqual(
self.import_(['value'], [
['toto'],
self.assertEqual([3, 1, 2], values(self.read()))
self.assertEqual(
self.import_(['value'], [['Foo']], context={'lang': 'fr_FR'}),
- error(1, "Key/value 'Foo' not found in selection field 'value'",
- value=False))
+ ok(1))
def test_invalid(self):
self.assertEqual(
self.import_(['value'], [['Baz']]),
- error(1, "Key/value 'Baz' not found in selection field 'value'",
- # what the fuck?
- value=False))
+ error(1, u"Value 'Baz' not found in selection field 'unknown'"))
self.cr.rollback()
self.assertEqual(
self.import_(['value'], [[42]]),
- error(1, "Key/value '42' not found in selection field 'value'",
- value=False))
+ error(1, u"Value '42' not found in selection field 'unknown'"))
class test_selection_function(ImporterCase):
model_name = 'export.selection.function'
translations_fr = [
("Corge", "toto"),
("Grault", "titi"),
- ("Whee", "tete"),
+ ("Wheee", "tete"),
("Moog", "tutu"),
]
'translatable': True,
'date_format': '%d.%m.%Y',
'decimal_point': ',',
- 'thousand_sep': ' ',
+ 'thousands_sep': ' ',
})
Translations = self.registry('ir.translation')
for source, value in self.translations_fr:
['toto'],
['tete'],
], context={'lang': 'fr_FR'}),
- error(1, "Key/value 'toto' not found in selection field 'value'",
- value=False))
+ ok(2))
self.assertEqual(
self.import_(['value'], [['Wheee']], context={'lang': 'fr_FR'}),
ok(1))
self.assertEqual(
self.import_(['value'], [[name2]]),
ok(1))
- # FIXME: is it really normal import does not care for name_search collisions?
self.assertEqual([
(integer_id1, name1)
], values(self.read()))
integer_id2 = self.registry('export.integer').create(
self.cr, openerp.SUPERUSER_ID, {'value': 36})
- self.assertRaises(
- ValueError, # Because name_search all the things. Fallback schmallback
- self.import_, ['value'], [
+ self.assertEqual(
+ self.import_(['value'], [
# import by id, without specifying it
[integer_id1],
[integer_id2],
[integer_id1],
- ])
+ ]),
+ error(1, u"No matching record found for name '%s' in field 'unknown'" % integer_id1))
def test_sub_field(self):
""" Does not implicitly create the record, does not warn that you can't
import m2o subfields (at all)...
"""
- self.assertRaises(
- ValueError, # No record found for 42, name_searches the bloody thing
- self.import_, ['value/value'], [['42']])
+ self.assertEqual(
+ self.import_(['value/value'], [['42']]),
+ error(1, u"Can not create Many-To-One records indirectly, import the field separately"))
def test_fail_noids(self):
- self.assertRaises(
- ValueError,
- self.import_, ['value'], [['nameisnoexist:3']])
+ self.assertEqual(
+ self.import_(['value'], [['nameisnoexist:3']]),
+ error(1, u"No matching record found for name 'nameisnoexist:3' in field 'unknown'"))
self.cr.rollback()
- self.assertRaises(
- ValueError,
- self.import_, ['value/id'], [['noxidhere']]),
+ self.assertEqual(
+ self.import_(['value/id'], [['noxidhere']]),
+ error(1, u"No matching record found for external id 'noxidhere' in field 'unknown'"))
self.cr.rollback()
- self.assertRaises(
- Exception, # FIXME: Why can't you be a ValueError like everybody else?
- self.import_, ['value/.id'], [[66]])
+ self.assertEqual(
+ self.import_(['value/.id'], [[66]]),
+ error(1, u"No matching record found for database id '66' in field 'unknown'"))
class test_m2m(ImporterCase):
model_name = 'export.many2many'
self.assertEqual(values(b[2].value), [3, 44, 84])
def test_noids(self):
    # unknown database id on an m2m subfield: expect a clean error message
    # rather than a bare Exception
    self.assertEqual(
        self.import_(['value/.id'], [['42']]),
        error(1, u"No matching record found for database id '42' in field 'unknown'"))
def test_xids(self):
M2O_o = self.registry('export.many2many.other')
self.assertEqual(values(b[0].value), [3, 44])
self.assertEqual(values(b[2].value), [44, 84])
def test_noxids(self):
- self.assertRaises(
- ValueError,
- self.import_, ['value/id'], [['noxidforthat']])
+ self.assertEqual(
+ self.import_(['value/id'], [['noxidforthat']]),
+ error(1, u"No matching record found for external id 'noxidforthat' in field 'unknown'"))
def test_names(self):
M2O_o = self.registry('export.many2many.other')
self.assertEqual(values(b[2].value), [3, 9])
def test_nonames(self):
- self.assertRaises(
- ValueError,
- self.import_, ['value'], [['wherethem2mhavenonames']])
+ self.assertEqual(
+ self.import_(['value'], [['wherethem2mhavenonames']]),
+ error(1, u"No matching record found for name 'wherethem2mhavenonames' in field 'unknown'"))
def test_import_to_existing(self):
M2O_o = self.registry('export.many2many.other')
model_name = 'export.one2many'
def test_name_get(self):
- # FIXME: bloody hell why can't this just name_create the record?
- self.assertRaises(
- IndexError,
- self.import_,
- ['const', 'value'],
- [['5', u'Java is a DSL for taking large XML files'
- u' and converting them to stack traces']])
+ s = u'Java is a DSL for taking large XML files and converting them to' \
+ u' stack traces'
+ self.assertEqual(
+ self.import_(
+ ['const', 'value'],
+ [['5', s]]),
+ error(1, u"No matching record found for name '%s' in field 'unknown'" % s))
def test_single(self):
self.assertEqual(
]),
ok(2))
- # No record values alongside id => o2m resolution skipped altogether,
- # creates 2 records => remove/don't import columns sideshow columns,
- # get completely different semantics
- b, b1 = self.browse()
+ [b] = self.browse()
self.assertEqual(b.const, 42)
- self.assertEqual(values(b.value), [])
- self.assertEqual(b1.const, 4)
- self.assertEqual(values(b1.value), [])
+ # automatically forces link between core record and o2ms
+ self.assertEqual(values(b.value), [109, 262])
+ self.assertEqual(values(b.value, field='parent_id'), [b, b])
def test_link_2(self):
O2M_c = self.registry('export.one2many.child')
]),
ok(2))
- (b,) = self.browse()
- # if an id (db or xid) is provided, expectations that objects are
- # *already* linked and emits UPDATE (1, id, {}).
- # Noid => CREATE (0, ?, {})
- # TODO: xid ignored aside from getting corresponding db id?
+ [b] = self.browse()
self.assertEqual(b.const, 42)
- self.assertEqual(values(b.value), [])
-
- # FIXME: updates somebody else's records?
- self.assertEqual(
- O2M_c.read(self.cr, openerp.SUPERUSER_ID, id1),
- {'id': id1, 'str': 'Bf', 'value': 1, 'parent_id': False})
- self.assertEqual(
- O2M_c.read(self.cr, openerp.SUPERUSER_ID, id2),
- {'id': id2, 'str': 'Me', 'value': 2, 'parent_id': False})
+ self.assertEqual(values(b.value), [1, 2])
+ self.assertEqual(values(b.value, field='parent_id'), [b, b])
class test_o2m_multiple(ImporterCase):
model_name = 'export.one2many.multiple'
['', '14', ''],
]),
ok(4))
- # Oh yeah, that's the stuff
- (b, b1, b2) = self.browse()
- self.assertEqual(values(b.child1), [11])
- self.assertEqual(values(b.child2), [21])
- self.assertEqual(values(b1.child1), [12])
- self.assertEqual(values(b1.child2), [22])
-
- self.assertEqual(values(b2.child1), [13, 14])
- self.assertEqual(values(b2.child2), [23])
+ [b] = self.browse()
+ self.assertEqual(values(b.child1), [11, 12, 13, 14])
+ self.assertEqual(values(b.child2), [21, 22, 23])
def test_multi(self):
self.assertEqual(
['', '', '23'],
]),
ok(6))
- # What the actual fuck?
- (b, b1) = self.browse()
+
+ [b] = self.browse()
self.assertEqual(values(b.child1), [11, 12, 13, 14])
- self.assertEqual(values(b.child2), [21])
- self.assertEqual(values(b1.child2), [22, 23])
+ self.assertEqual(values(b.child2), [21, 22, 23])
def test_multi_fullsplit(self):
self.assertEqual(
['', '', '23'],
]),
ok(7))
- # oh wow
- (b, b1) = self.browse()
+
+ [b] = self.browse()
self.assertEqual(b.const, 5)
self.assertEqual(values(b.child1), [11, 12, 13, 14])
- self.assertEqual(b1.const, 36)
- self.assertEqual(values(b1.child2), [21, 22, 23])
+ self.assertEqual(values(b.child2), [21, 22, 23])
# function, related, reference: written to db as-is...
# => function uses @type for value coercion/conversion