[REF] services: somewhat clean use of openerp exceptions.
[odoo/odoo.git] / openerp / service / netrpc_server.py
1 # -*- coding: utf-8 -*-
2
3 #
4 # Copyright P. Christeas <p_christ@hol.gr> 2008,2009
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.     
19 #
20 ##############################################################################
21
22 #.apidoc title: NET-RPC Server
23
24 """ This file contains instance of the net-rpc server
25 """
26 import logging
27 import select
28 import socket
29 import sys
30 import threading
31 import traceback
32 import openerp
33 import openerp.service.netrpc_socket
34 import openerp.netsvc as netsvc
35 import openerp.tools as tools
36
37 _logger = logging.getLogger(__name__)
38
39 class Server:
40     """ Generic interface for all servers with an event loop etc.
41         Override this to impement http, net-rpc etc. servers.
42
43         Servers here must have threaded behaviour. start() must not block,
44         there is no run().
45     """
46     __is_started = False
47     __servers = []
48     __starter_threads = []
49
50     # we don't want blocking server calls (think select()) to
51     # wait forever and possibly prevent exiting the process,
52     # but instead we want a form of polling/busy_wait pattern, where
53     # _server_timeout should be used as the default timeout for
54     # all I/O blocking operations
55     _busywait_timeout = 0.5
56
57     def __init__(self):
58         Server.__servers.append(self)
59         if Server.__is_started:
60             # raise Exception('All instances of servers must be inited before the startAll()')
61             # Since the startAll() won't be called again, allow this server to
62             # init and then start it after 1sec (hopefully). Register that
63             # timer thread in a list, so that we can abort the start if quitAll
64             # is called in the meantime
65             t = threading.Timer(1.0, self._late_start)
66             t.name = 'Late start timer for %s' % str(self.__class__)
67             Server.__starter_threads.append(t)
68             t.start()
69
70     def start(self):
71         _logger.debug("called stub Server.start")
72
73     def _late_start(self):
74         self.start()
75         for thr in Server.__starter_threads:
76             if thr.finished.is_set():
77                 Server.__starter_threads.remove(thr)
78
79     def stop(self):
80         _logger.debug("called stub Server.stop")
81
82     def stats(self):
83         """ This function should return statistics about the server """
84         return "%s: No statistics" % str(self.__class__)
85
86     @classmethod
87     def startAll(cls):
88         if cls.__is_started:
89             return
90         _logger.info("Starting %d services" % len(cls.__servers))
91         for srv in cls.__servers:
92             srv.start()
93         cls.__is_started = True
94
95     @classmethod
96     def quitAll(cls):
97         if not cls.__is_started:
98             return
99         _logger.info("Stopping %d services" % len(cls.__servers))
100         for thr in cls.__starter_threads:
101             if not thr.finished.is_set():
102                 thr.cancel()
103             cls.__starter_threads.remove(thr)
104
105         for srv in cls.__servers:
106             srv.stop()
107         cls.__is_started = False
108
109     @classmethod
110     def allStats(cls):
111         res = ["Servers %s" % ('stopped', 'started')[cls.__is_started]]
112         res.extend(srv.stats() for srv in cls.__servers)
113         return '\n'.join(res)
114
115     def _close_socket(self):
116         netsvc.close_socket(self.socket)
117
118 class TinySocketClientThread(threading.Thread):
119     def __init__(self, sock, threads):
120         spn = sock and sock.getpeername()
121         spn = 'netrpc-client-%s:%s' % spn[0:2]
122         threading.Thread.__init__(self, name=spn)
123         self.sock = sock
124         # Only at the server side, use a big timeout: close the
125         # clients connection when they're idle for 20min.
126         self.sock.settimeout(1200)
127         self.threads = threads
128
129     def run(self):
130         self.running = True
131         try:
132             ts = openerp.server.netrpc_socket.mysocket(self.sock)
133         except Exception:
134             self.threads.remove(self)
135             self.running = False
136             return False
137
138         while self.running:
139             try:
140                 msg = ts.myreceive()
141                 result = netsvc.dispatch_rpc(msg[0], msg[1], msg[2:])
142                 ts.mysend(result)
143             except socket.timeout:
144                 #terminate this channel because other endpoint is gone
145                 break
146             except Exception, e:
147                 try:
148                     valid_exception = Exception(netrpc_handle_exception_legacy(e)) 
149                     valid_traceback = getattr(e, 'traceback', sys.exc_info())
150                     formatted_traceback = "".join(traceback.format_exception(*valid_traceback))
151                     _logger.debug("netrpc: communication-level exception", exc_info=True)
152                     ts.mysend(valid_exception, exception=True, traceback=formatted_traceback)
153                     break
154                 except Exception, ex:
155                     #terminate this channel if we can't properly send back the error
156                     _logger.exception("netrpc: cannot deliver exception message to client")
157                     break
158
159         netsvc.close_socket(self.sock)
160         self.sock = None
161         self.threads.remove(self)
162         self.running = False
163         return True
164
165     def stop(self):
166         self.running = False
167         
168 def netrpc_handle_exception_legacy(e):
169     if isinstance(e, openerp.osv.orm.except_orm):
170         return 'warning -- ' + e.name + '\n\n' + e.value
171     if isinstance(e, openerp.exceptions.Warning):
172         return 'warning -- Warning\n\n' + str(e)
173     if isinstance(e, openerp.exceptions.AccessError):
174         return 'warning -- AccessError\n\n' + str(e)
175     if isinstance(e, openerp.exceptions.AccessDenied):
176         return 'AccessDenied ' + str(e)
177     return openerp.tools.exception_to_unicode(e)
178
179 class TinySocketServerThread(threading.Thread,Server):
180     def __init__(self, interface, port, secure=False):
181         threading.Thread.__init__(self, name="NetRPCDaemon-%d"%port)
182         Server.__init__(self)
183         self.__port = port
184         self.__interface = interface
185         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
186         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
187         self.socket.bind((self.__interface, self.__port))
188         self.socket.listen(5)
189         self.threads = []
190         _logger.info("starting NET-RPC service on %s:%s", interface or '0.0.0.0', port)
191
192     def run(self):
193         try:
194             self.running = True
195             while self.running:
196                 fd_sets = select.select([self.socket], [], [], self._busywait_timeout)
197                 if not fd_sets[0]:
198                     continue
199                 (clientsocket, address) = self.socket.accept()
200                 ct = TinySocketClientThread(clientsocket, self.threads)
201                 clientsocket = None
202                 self.threads.append(ct)
203                 ct.start()
204                 lt = len(self.threads)
205                 if (lt > 10) and (lt % 10 == 0):
206                     # Not many threads should be serving at the same time, so log
207                     # their abuse.
208                     _logger.debug("Netrpc: %d threads", len(self.threads))
209             self.socket.close()
210         except Exception, e:
211             _logger.warning("Netrpc: closing because of exception %s", e)
212             self.socket.close()
213             return False
214
215     def stop(self):
216         self.running = False
217         for t in self.threads:
218             t.stop()
219         self._close_socket()
220
221     def stats(self):
222         res = "Net-RPC: " + ( (self.running and "running") or  "stopped")
223         i = 0
224         for t in self.threads:
225             i += 1
226             res += "\nNet-RPC #%d: %s " % (i, t.name)
227             if t.isAlive():
228                 res += "running"
229             else:
230                 res += "finished"
231             if t.sock:
232                 res += ", socket"
233         return res
234
235 netrpcd = None
236
237 def start_service():
238     global netrpcd
239     if tools.config.get('netrpc', False):
240         netrpcd = TinySocketServerThread(tools.config.get('netrpc_interface', ''), int(tools.config.get('netrpc_port', 8070)))
241
242 def stop_service():
243     Server.quitAll()
244
245 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: