import openerp.modules
import openerp.exceptions
-class edi(netsvc.ExportService):
- def exp_get_edi_document(self, edi_token, db_name):
- res = None
- if db_name:
- cr = pooler.get_db(db_name).cursor()
- else:
- raise Exception("No database cursor found!")
- pool = pooler.get_pool(db_name)
- edi_pool = pool.get('ir.edi.document')
- try:
- res = edi_pool.get_document(cr, 1, edi_token)
- finally:
- cr.close()
- return res
-
- def exp_import_edi_document(self, db, uid, passwd, edi_document, context=None):
- res = None
- cr = pooler.get_db(db).cursor()
- pool = pooler.get_pool(db)
- edi_pool = pool.get('ir.edi.document')
- try:
- res = edi_pool.import_edi(cr, uid, edi_document=edi_document, context=context)
- cr.commit()
- except:
- print traceback.format_exc()
- cr.rollback()
- finally:
- cr.close()
- return res
-
- def exp_import_edi_url(self, db, uid, passwd, edi_url, context=None):
- res = None
- cr = pooler.get_db(db).cursor()
- pool = pooler.get_pool(db)
- edi_pool = pool.get('ir.edi.document')
- try:
- res = edi_pool.import_edi(cr, uid, edi_url=edi_url, context=context)
- cr.commit()
- except:
- print traceback.format_exc()
- cr.rollback()
- finally:
- cr.close()
- return res
-
- def __init__(self, name="edi"):
- netsvc.ExportService.__init__(self, name)
- self.joinGroup("web-services")
-
- def dispatch(self, method, auth, params):
- if method in ['import_edi_document', 'import_edi_url']:
- (db, uid, passwd ) = params[0:3]
- security.check(db, uid, passwd)
- elif method in ['get_edi_document']:
- # params = params
- # No security check for these methods
- pass
- else:
- raise KeyError("Method not found: %s" % method)
- fn = getattr(self, 'exp_'+method)
- return fn(*params)
-
#.apidoc title: Exported Service methods
#.apidoc module-mods: member-order: bysource
procedures to be called. Each method has its own arguments footprint.
"""
+RPC_VERSION_1 = {'server_version': '6.1', 'protocol_version': 1}
+
# This should be moved to openerp.modules.db, along side initialize().
def _initialize_db(serv, id, db_name, demo, lang, user_password):
cr = None
return True
class common(netsvc.ExportService):
+ _logger = logging.getLogger('web-services')
+
def __init__(self,name="common"):
netsvc.ExportService.__init__(self,name)
def dispatch(self, method, params):
- logger = netsvc.Logger()
- if method == 'login':
- res = security.login(params[0], params[1], params[2])
- msg = res and 'successful login' or 'bad login or password'
- # TODO log the client ip address..
- logger.notifyChannel("web-service", netsvc.LOG_INFO, "%s from '%s' using database '%s'" % (msg, params[1], params[0].lower()))
- return res or False
- elif method in ['about', 'timezone_get', 'get_server_environment',
- 'login_message','get_stats', 'check_connectivity',
- 'list_http_services']:
+ if method in ['login', 'about', 'timezone_get', 'get_server_environment',
+ 'login_message','get_stats', 'check_connectivity',
+ 'list_http_services', 'version', 'authenticate']:
pass
elif method in ['get_available_updates', 'get_migration_scripts', 'set_loglevel', 'get_os_time', 'get_sqlcount']:
passwd = params[0]
fn = getattr(self, 'exp_'+method)
return fn(*params)
+ def exp_login(self, db, login, password):
+ # TODO: legacy indirection through 'security', should use directly
+ # the res.users model
+ res = security.login(db, login, password)
+ msg = res and 'successful login' or 'bad login or password'
+ self._logger.info("%s from '%s' using database '%s'", msg, login, db.lower())
+ return res or False
+
+ def exp_authenticate(self, db, login, password, user_agent_env):
+ res_users = pooler.get_pool(db).get('res.users')
+ return res_users.authenticate(db, login, password, user_agent_env)
+
+ def exp_version(self):
+ return RPC_VERSION_1
+
def exp_about(self, extended=False):
"""Return information about the OpenERP Server.
objects_proxy()
wizard()
report_spool()
- edi()
# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: