#-----------------------------------------------------------
# Multicorn, multiprocessing inspired by gunicorn
# TODO rename class: Multicorn -> Arbiter ?
#-----------------------------------------------------------
import errno
import fcntl
import logging
import os
import os.path
import psutil
import random
import resource
import select
import signal
import socket
import subprocess
import sys
import time

import werkzeug.serving
try:
    from setproctitle import setproctitle
except ImportError:
    setproctitle = lambda x: None

import openerp
import openerp.tools.config as config
from openerp.tools.misc import stripped_sys_argv

_logger = logging.getLogger(__name__)

class Multicorn(object):
    """ Multiprocessing inspired by (g)unicorn.
    Multicorn currently uses accept(2) as dispatching method between workers,
    but we plan to replace it with a smarter dispatcher that parses the first
    HTTP request line.
    """
    def __init__(self, app):
        # config
        self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        # working vars
        self.beat = 4
        self.app = app
        self.pid = os.getpid()
        self.socket = None
        self.workers_http = {}
        self.workers_cron = {}
        self.workers = {}
        self.generation = 0
        self.queue = []
        self.long_polling_pid = None

    def pipe_new(self):
        pipe = os.pipe()
        for fd in pipe:
            # non_blocking
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            # close_on_exec
            flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
        return pipe

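    # Note on the flags set above: O_NONBLOCK lets pipe_ping() write from a
    # signal handler without ever blocking, while FD_CLOEXEC keeps the pipe
    # fds from leaking into exec()ed children such as the openerp-long-polling
    # process started by long_polling_spawn().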
    def pipe_ping(self, pipe):
        try:
            os.write(pipe[1], '.')
        except OSError, e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise

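    # pipe_ping() is the other half of the self-pipe trick: writing a single
    # byte makes the select() call in sleep() return immediately. Both the
    # signal handler and the workers' watchdog pings rely on it.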
    def signal_handler(self, sig, frame):
        if len(self.queue) < 5 or sig == signal.SIGCHLD:
            self.queue.append(sig)
            self.pipe_ping(self.pipe)
        else:
            _logger.warn("Dropping signal: %s", sig)

    def worker_spawn(self, klass, workers_registry):
        self.generation += 1
        worker = klass(self)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.workers[pid] = worker
            workers_registry[pid] = worker
            return worker
        else:
            worker.run()
            sys.exit(0)

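    # worker_spawn() relies on fork semantics: the parent (pid != 0) registers
    # the child and keeps arbitrating, while the child inherits the shared
    # listen socket, runs the worker loop and exits without ever returning to
    # the caller.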
    def long_polling_spawn(self):
        nargs = stripped_sys_argv('--pidfile')
        cmd = nargs[0]
        cmd = os.path.join(os.path.dirname(cmd), "openerp-long-polling")
        nargs[0] = cmd
        popen = subprocess.Popen(nargs)
        self.long_polling_pid = popen.pid

    def worker_pop(self, pid):
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered", pid)
            try:
                self.workers_http.pop(pid, None)
                self.workers_cron.pop(pid, None)
                u = self.workers.pop(pid)
                u.close()
            except OSError:
                return

    def worker_kill(self, pid, sig):
        try:
            os.kill(pid, sig)
        except OSError, e:
            if e.errno == errno.ESRCH:
                # no such process: the worker is already gone, unregister it
                self.worker_pop(pid)

    def process_signals(self):
        while len(self.queue):
            sig = self.queue.pop(0)
            if sig in [signal.SIGINT, signal.SIGTERM]:
                raise KeyboardInterrupt

    def process_zombie(self):
        # reap dead workers
        while 1:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                if (status >> 8) == 3:
                    msg = "Critical worker error (%s)"
                    _logger.critical(msg, wpid)
                    raise Exception(msg % wpid)
                self.worker_pop(wpid)
            except OSError, e:
                if e.errno == errno.ECHILD:
                    break
                raise

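    # Exit status 3 is treated as fatal: a worker dying with it makes the
    # arbiter raise, which in run() escalates to stop(False) and sys.exit(-1)
    # (see the "should we use 3" question in Worker.run).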
    def process_timeout(self):
        now = time.time()
        for (pid, worker) in self.workers.items():
            if (worker.watchdog_timeout is not None) and \
                (now - worker.watchdog_time >= worker.watchdog_timeout):
                _logger.error("Worker (%s) timeout", pid)
                self.worker_kill(pid, signal.SIGKILL)

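    # Watchdog protocol: each worker pings its watchdog_pipe once per loop
    # iteration (see Worker.run), sleep() below timestamps the ping, and
    # process_timeout() above SIGKILLs any worker whose last ping is older
    # than its watchdog_timeout.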
    def process_spawn(self):
        while len(self.workers_http) < self.population:
            self.worker_spawn(WorkerHTTP, self.workers_http)
        while len(self.workers_cron) < config['max_cron_threads']:
            self.worker_spawn(WorkerCron, self.workers_cron)
        if not self.long_polling_pid:
            self.long_polling_spawn()

    def sleep(self):
        try:
            # map of fd -> worker
            fds = dict((w.watchdog_pipe[0], w) for w in self.workers.values())
            fd_in = fds.keys() + [self.pipe[0]]
            # check for ping or internal wakeups
            ready = select.select(fd_in, [], [], self.beat)
            # update worker watchdogs
            for fd in ready[0]:
                if fd in fds:
                    fds[fd].watchdog_time = time.time()
                try:
                    # empty pipe
                    while os.read(fd, 1):
                        pass
                except OSError, e:
                    if e.errno not in [errno.EAGAIN]:
                        raise
        except select.error, e:
            if e[0] not in [errno.EINTR]:
                raise

    def start(self):
        # wakeup pipe: CPython retries syscalls interrupted by a signal
        # (simulating a pseudo SA_RESTART) instead of raising EINTR, so a
        # plain select() would not return on signals. We write to this pipe
        # from the signal handler to overcome that behaviour.
        self.pipe = self.pipe_new()
        # set signal handlers
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGCHLD, self.signal_handler)
        # listen to socket
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setblocking(0)
        self.socket.bind(self.address)
        self.socket.listen(8 * self.population)

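    # The socket bound above is the single listen socket shared by every
    # forked worker: each WorkerHTTP calls accept(2) on it and the kernel
    # decides which worker gets the connection. The backlog scales with the
    # worker count (8 pending connections per worker).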
    def stop(self, graceful=True):
        if self.long_polling_pid is not None:
            self.worker_kill(self.long_polling_pid, signal.SIGTERM)
            self.long_polling_pid = None
        if graceful:
            _logger.info("Stopping gracefully")
            limit = time.time() + self.timeout
            for pid in self.workers.keys():
                self.worker_kill(pid, signal.SIGTERM)
            while self.workers and time.time() < limit:
                self.process_zombie()
                time.sleep(0.1)
        else:
            _logger.info("Stopping forcefully")
        for pid in self.workers.keys():
            self.worker_kill(pid, signal.SIGTERM)
        self.socket.close()
        openerp.cli.server.quit_signals_received = 1

    def run(self):
        self.start()
        _logger.debug("Multiprocess starting")
        while 1:
            try:
                #_logger.debug("Multiprocess beat (%s)", time.time())
                self.process_signals()
                self.process_zombie()
                self.process_timeout()
                self.process_spawn()
                self.sleep()
            except KeyboardInterrupt:
                _logger.debug("Multiprocess clean stop")
                self.stop()
                break
            except Exception, e:
                _logger.exception(e)
                self.stop(False)
                sys.exit(-1)

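# A minimal usage sketch (illustrative only; the real startup path lives in
# openerp.cli.server, and `application` stands in for whatever WSGI callable
# the server exposes):
#
#   arbiter = Multicorn(application)
#   arbiter.run()  # blocks: forks and supervises HTTP/cron workers
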
class Worker(object):
    """ Workers """
    def __init__(self, multi):
        self.multi = multi
        self.watchdog_time = time.time()
        self.watchdog_pipe = multi.pipe_new()
        # Can be set to None if no watchdog is desired.
        self.watchdog_timeout = multi.timeout
        self.ppid = os.getpid()
        self.pid = None
        self.alive = True
        # TODO: should we rename this to lifetime?
        self.request_max = multi.limit_request
        self.request_count = 0

    def setproctitle(self, title=""):
        setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))

    def close(self):
        os.close(self.watchdog_pipe[0])
        os.close(self.watchdog_pipe[1])

    def signal_handler(self, sig, frame):
        self.alive = False

    def sleep(self):
        try:
            select.select([self.multi.socket], [], [], self.multi.beat)
        except select.error, e:
            if e[0] not in [errno.EINTR]:
                raise

    def process_limit(self):
        # If our parent changed, commit suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
            self.alive = False
        # check for lifetime
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
            self.alive = False
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        rss, vms = psutil.Process(os.getpid()).get_memory_info()
        if vms > config['limit_memory_soft']:
            _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
            self.alive = False # Commit suicide after the request.

        # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))

        # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        def time_expired(n, stack):
            _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
            # We don't suicide in this case; raising aborts the current request instead.
            raise Exception('CPU time limit exceeded.')
        signal.signal(signal.SIGXCPU, time_expired)
        soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))

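    # How the two rlimits set in process_limit() behave: RLIMIT_AS caps the
    # address space, so a worker leaking past limit_memory_hard starts getting
    # MemoryError on allocation. RLIMIT_CPU is set relative to the CPU time
    # already consumed: with 5s consumed and limit_time_cpu=60, the kernel
    # delivers SIGXCPU at ~65s of total CPU time, which time_expired() turns
    # into an exception that aborts the current request.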
    def process_work(self):
        pass

    def start(self):
        self.pid = os.getpid()
        self.setproctitle()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        random.seed()
        # Prevent fd inheritance: set close_on_exec
        flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
        # reset blocking status
        self.multi.socket.setblocking(0)
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def stop(self):
        pass

    def run(self):
        try:
            self.start()
            while self.alive:
                self.process_limit()
                self.multi.pipe_ping(self.watchdog_pipe)
                self.sleep()
                self.process_work()
            _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
            self.stop()
        except Exception:
            _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
            # should we use 3 to abort everything ?
            sys.exit(1)

class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def process_request(self, client, addr):
        client.setblocking(1)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inheritance: set close_on_exec
        flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(client, fcntl.F_SETFD, flags)
        # do request using WorkerBaseWSGIServer monkey patched with socket
        self.server.socket = client
        # tolerate broken pipe when the http client closes the socket before
        # receiving the full reply
        try:
            self.server.process_request(client, addr)
        except IOError, e:
            if e.errno != errno.EPIPE:
                raise
        self.request_count += 1

    def process_work(self):
        try:
            client, addr = self.multi.socket.accept()
            self.process_request(client, addr)
        except socket.error, e:
            if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
                raise

    def start(self):
        Worker.start(self)
        self.server = WorkerBaseWSGIServer(self.multi.app)

class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
    """ werkzeug WSGI Server patched to allow using an external listen socket
    """
    def __init__(self, app):
        werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)

    def server_bind(self):
        # we don't bind because we use the listen socket of Multicorn#socket
        # instead we close the socket
        if self.socket:
            self.socket.close()

    def server_activate(self):
        # don't listen as we use Multicorn#socket
        pass

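# How the patch works: BaseWSGIServer normally binds and listens on the
# host/port passed to __init__, so we hand it dummy values ("1", "1") and
# neutralize server_bind()/server_activate(). WorkerHTTP.process_request()
# then assigns each accepted client socket to self.server.socket before
# delegating to the stock process_request() machinery.
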
class WorkerCron(Worker):
    """ Cron workers """

    def __init__(self, multi):
        super(WorkerCron, self).__init__(multi)
        # process_work() below processes a single database per call. The
        # variable db_index keeps track of the next database to process.
        self.db_index = 0

    def sleep(self):
        # Really sleep once all the databases have been processed.
        if self.db_index == 0:
            interval = 60 + self.pid % 10 # chorus effect
            time.sleep(interval)

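    # The pid-based offset above staggers the cron workers (the "chorus
    # effect"): each worker sleeps 60 + (pid % 10) seconds, so they poll the
    # databases out of phase instead of all waking at once.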
    def process_work(self):
        rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
        rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
        _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
        if config['db_name']:
            db_names = config['db_name'].split(',')
        else:
            db_names = openerp.service.db.exp_list(True)
        if len(db_names):
            self.db_index = (self.db_index + 1) % len(db_names)
            db_name = db_names[self.db_index]
            self.setproctitle(db_name)
            if rpc_request_flag:
                start_time = time.time()
                start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
            while True:
                # acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
                # TODO why isn't openerp.addons.base defined ?
                import openerp.addons.base as base
                acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name)
                if not acquired:
                    openerp.modules.registry.RegistryManager.delete(db_name)
                    break
            # don't keep cursors in multi database mode
            if len(db_names) > 1:
                openerp.sql_db.close_db(db_name)
            if rpc_request_flag:
                end_time = time.time()
                end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
                logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms) / 1024)
                _logger.debug("WorkerCron (%s) %s", self.pid, logline)

            self.request_count += 1
            if self.request_count >= self.request_max and self.request_max < len(db_names):
                _logger.error("There are more databases to process than allowed "
                    "by the `limit_request` configuration variable: %s more.",
                    len(db_names) - self.request_max)
        else:
            self.db_index = 0

    def start(self):
        Worker.start(self)
        self.multi.socket.close()
        openerp.service.start_internal()

# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: