[MERGE] multi-process/multi-threaded ir.cron implementation
[odoo/odoo.git] / openerp / service / netrpc_server.py
1 # -*- coding: utf-8 -*-
2
3 #
4 # Copyright P. Christeas <p_christ@hol.gr> 2008,2009
5 #    Copyright (C) 2004-2009 Tiny SPRL (<http://tiny.be>).
6 #
7 #    This program is free software: you can redistribute it and/or modify
8 #    it under the terms of the GNU Affero General Public License as
9 #    published by the Free Software Foundation, either version 3 of the
10 #    License, or (at your option) any later version.
11 #
12 #    This program is distributed in the hope that it will be useful,
13 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
14 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 #    GNU Affero General Public License for more details.
16 #
17 #    You should have received a copy of the GNU Affero General Public License
18 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.     
19 #
20 ##############################################################################
21
22 #.apidoc title: NET-RPC Server
23
24 """ This file contains the NET-RPC server implementation and its module-level instance.
25
26     
27 """
28 import logging
29 import select
30 import socket
31 import sys
32 import threading
33 import traceback
34
35 import openerp.netsvc as netsvc
36 import openerp.tiny_socket as tiny_socket
37 import openerp.tools as tools
38
class TinySocketClientThread(threading.Thread):
    """Thread serving the requests of one connected NET-RPC client.

    Requests are read from the client socket, handed to
    ``netsvc.dispatch_rpc`` and the result is written back on the same
    socket. When the thread ends it closes the socket and removes itself
    from the ``threads`` list it was given.
    """

    def __init__(self, sock, threads):
        """Prepare the worker for an accepted client socket.

        :param sock: connected client socket (must support ``getpeername()``
                     and ``settimeout()``)
        :param threads: list of client threads shared with the creator; this
                        thread removes itself from it when it terminates
        """
        peer = sock.getpeername()
        threading.Thread.__init__(self, name='netrpc-client-%s:%s' % peer[0:2])
        self.sock = sock
        # Only at the server side, use a big timeout: close the
        # clients connection when they're idle for 20min.
        self.sock.settimeout(1200)
        self.threads = threads
        # Defined here so the attribute exists before run() has started.
        self.running = False

    def run(self):
        """Serve requests until a timeout, an error or a stop() request.

        The connection is closed after the first error, whether or not the
        error could be reported back to the client.

        :return: False if the socket could not be wrapped by
                 ``tiny_socket.mysocket``, True otherwise
        """
        self.running = True
        try:
            try:
                ts = tiny_socket.mysocket(self.sock)
            except Exception:
                return False

            while self.running:
                try:
                    msg = ts.myreceive()
                    # 'auth_provider' is optional: it is only used when
                    # something set it on this thread object.
                    auth = getattr(self, 'auth_provider', None)
                    result = netsvc.dispatch_rpc(msg[0], msg[1], msg[2:], auth)
                    ts.mysend(result)
                except socket.timeout:
                    # terminate this channel because other endpoint is gone
                    break
                except Exception as e:
                    try:
                        # Send a plain Exception to avoid pickling problems.
                        new_e = Exception(tools.exception_to_unicode(e))
                        # Prefer a traceback attached to the exception, if any.
                        tb = getattr(e, 'traceback', sys.exc_info())
                        tb_s = "".join(traceback.format_exception(*tb))
                        logging.getLogger('web-services').debug("netrpc: communication-level exception", exc_info=True)
                        ts.mysend(new_e, exception=True, traceback=tb_s)
                    except Exception:
                        # terminate this channel if we can't properly send back the error
                        logging.getLogger('web-services').exception("netrpc: cannot deliver exception message to client")
                    break
            return True
        finally:
            # Runs on every exit path, including the early failure above,
            # so the client socket is never left open.
            self._release()

    def _release(self):
        """Close the client socket and unregister this thread."""
        try:
            netsvc.close_socket(self.sock)
        finally:
            self.sock = None
            try:
                self.threads.remove(self)
            except ValueError:
                # Not (or no longer) registered in the list: nothing to undo.
                pass
            self.running = False

    def stop(self):
        """Ask the serving loop to exit after the request in progress."""
        self.running = False
89
90
class TinySocketServerThread(threading.Thread,netsvc.Server):
    """Listener thread accepting NET-RPC connections.

    Every accepted connection is handed to its own
    ``TinySocketClientThread``; the live client threads are tracked in
    ``self.threads``.
    """

    def __init__(self, interface, port, secure=False):
        """Bind and listen on ``interface``:``port``.

        :param interface: address to bind to ('' binds to all interfaces)
        :param port: TCP port number (also used in the thread name)
        :param secure: accepted for interface compatibility; not used here
        """
        threading.Thread.__init__(self, name="NetRPCDaemon-%d"%port)
        netsvc.Server.__init__(self)
        self.__port = port
        self.__interface = interface
        # Defined here so stats() works before the thread is started.
        self.running = False
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind((self.__interface, self.__port))
            self.socket.listen(5)
        except Exception:
            # Do not leak the descriptor when the port cannot be bound.
            self.socket.close()
            raise
        self.threads = []
        netsvc.Logger().notifyChannel("web-services", netsvc.LOG_INFO, 
                         "starting NET-RPC service on %s:%s" % (interface or '0.0.0.0', port,))

    def run(self):
        """Accept connections until stop() is called or an error occurs.

        :return: False when the loop was aborted by an exception, None
                 otherwise
        """
        self.running = True
        try:
            while self.running:
                # Wake up regularly so that a stop() request is noticed.
                # NOTE(review): _busywait_timeout is not defined in this file;
                # presumably inherited from netsvc.Server -- confirm.
                readable = select.select([self.socket], [], [], self._busywait_timeout)[0]
                if not readable:
                    continue
                clientsocket, _address = self.socket.accept()
                try:
                    ct = TinySocketClientThread(clientsocket, self.threads)
                except Exception:
                    # A single broken connection (e.g. the peer is already
                    # gone when its address is queried) must not bring the
                    # whole listener down.
                    logging.getLogger('web-services').debug(
                        "Netrpc: could not set up client connection", exc_info=True)
                    clientsocket.close()
                    continue
                clientsocket = None
                self.threads.append(ct)
                ct.start()
                lt = len(self.threads)
                if (lt > 10) and (lt % 10 == 0):
                    # Not many threads should be serving at the same time, so log
                    # their abuse.
                    netsvc.Logger().notifyChannel("web-services", netsvc.LOG_DEBUG,
                        "Netrpc: %d threads" % lt)
        except Exception as e:
            logging.getLogger('web-services').warning("Netrpc: closing because of exception %s", e)
            return False
        finally:
            # Whatever the reason for leaving the loop, report the service
            # as stopped and release the listening socket.
            self.running = False
            self.socket.close()

    def stop(self):
        """Request the accept loop and all client threads to stop."""
        self.running = False
        # Iterate over a snapshot: client threads remove themselves from
        # self.threads concurrently when they terminate.
        for t in list(self.threads):
            t.stop()
        # NOTE(review): _close_socket is not defined in this file;
        # presumably provided by netsvc.Server -- confirm.
        self._close_socket()

    def stats(self):
        """Return a multi-line, human-readable status of the service.

        The first line tells whether the listener is running; one more line
        per tracked client thread gives its name, whether it is alive and
        whether it still holds a socket.
        """
        lines = ["Net-RPC: " + ("running" if self.running else "stopped")]
        # Snapshot for the same reason as in stop().
        for i, t in enumerate(list(self.threads), 1):
            line = "Net-RPC #%d: %s " % (i, t.name)
            line += "running" if t.is_alive() else "finished"
            if t.sock:
                line += ", socket"
            lines.append(line)
        return "\n".join(lines)
148
# Module-level handle on the NET-RPC listener thread. It stays None until
# init_servers() creates the thread, and when NET-RPC is disabled.
netrpcd = None

def init_servers():
    """Create the NET-RPC listener thread if the configuration enables it.

    Reads ``netrpc``, ``netrpc_interface`` and ``netrpc_port`` from
    ``tools.config`` and stores the new ``TinySocketServerThread`` in the
    module-level ``netrpcd``. Nothing happens when ``netrpc`` is falsy.
    The thread is only constructed here, not started.
    """
    global netrpcd
    if not tools.config.get('netrpc', False):
        return
    interface = tools.config.get('netrpc_interface', '')
    port = int(tools.config.get('netrpc_port', 8070))
    netrpcd = TinySocketServerThread(interface, port)