[odoo/odoo.git] openerp/service/workers.py
#-----------------------------------------------------------
# Multicorn, multiprocessing inspired by gunicorn
# TODO rename class: Multicorn -> Arbiter ?
#-----------------------------------------------------------
import errno
import fcntl
import logging
import os
import psutil
import random
import resource
import select
import signal
import socket
import sys
import time

import werkzeug.serving
try:
    from setproctitle import setproctitle
except ImportError:
    setproctitle = lambda x: None

import openerp
import openerp.tools.config as config

_logger = logging.getLogger(__name__)

class Multicorn(object):
    """ Multiprocessing inspired by (g)unicorn.
    Multicorn currently uses accept(2) as the dispatching method between
    workers, but we plan to replace it with a more intelligent dispatcher
    that will parse the first HTTP request line.
    """
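    # A minimal usage sketch (assuming `app` is a WSGI application
    # callable, as passed in by the server startup code):
    #
    #   arbiter = Multicorn(app)
    #   arbiter.run()  # blocks: forks the workers and supervises them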
    def __init__(self, app):
        # config
        self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        # working vars
        self.beat = 4
        self.app = app
        self.pid = os.getpid()
        self.socket = None
        self.workers_http = {}
        self.workers_cron = {}
        self.workers = {}
        self.generation = 0
        self.queue = []

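    # pipe_new() is used both for the arbiter wakeup pipe (the self-pipe
    # trick, see start() below) and for the per-worker watchdog pipes. Both
    # ends are made non-blocking so pipe_ping() can never stall in a signal
    # handler, and close-on-exec so forked children do not leak the
    # descriptors into exec'd programs.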
    def pipe_new(self):
        pipe = os.pipe()
        for fd in pipe:
            # non-blocking
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            # close-on-exec
            flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
            fcntl.fcntl(fd, fcntl.F_SETFD, flags)
        return pipe

    def pipe_ping(self, pipe):
        try:
            os.write(pipe[1], '.')
        except IOError, e:
            if e.errno not in [errno.EAGAIN, errno.EINTR]:
                raise

    def signal_handler(self, sig, frame):
        # always queue SIGCHLD so dead children are reaped; cap the queue
        # for other signals to avoid flooding when they arrive faster than
        # the main loop consumes them
        if len(self.queue) < 5 or sig == signal.SIGCHLD:
            self.queue.append(sig)
            self.pipe_ping(self.pipe)
        else:
            _logger.warn("Dropping signal: %s", sig)

    def worker_spawn(self, klass, workers_registry):
        self.generation += 1
        worker = klass(self)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.workers[pid] = worker
            workers_registry[pid] = worker
            return worker
        else:
            worker.run()
            sys.exit(0)

    def worker_pop(self, pid):
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered", pid)
            try:
                self.workers_http.pop(pid, None)
                self.workers_cron.pop(pid, None)
                u = self.workers.pop(pid)
                u.close()
            except OSError:
                return

    def worker_kill(self, pid, sig):
        try:
            os.kill(pid, sig)
        except OSError, e:
            if e.errno == errno.ESRCH:
                self.worker_pop(pid)

    def process_signals(self):
        while len(self.queue):
            sig = self.queue.pop(0)
            if sig in [signal.SIGINT, signal.SIGTERM]:
                raise KeyboardInterrupt

    def process_zombie(self):
        # reap dead workers
        while 1:
            try:
                wpid, status = os.waitpid(-1, os.WNOHANG)
                if not wpid:
                    break
                if (status >> 8) == 3:
                    msg = "Critical worker error (%s)"
                    _logger.critical(msg, wpid)
                    raise Exception(msg % wpid)
                self.worker_pop(wpid)
            except OSError, e:
                if e.errno == errno.ECHILD:
                    break
                raise

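    # Watchdog: every worker pings its watchdog_pipe on each loop iteration
    # (see Worker.run below), and sleep() refreshes watchdog_time when the
    # ping is seen. A worker that has not pinged within watchdog_timeout is
    # assumed stuck and is killed outright with SIGKILL.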
    def process_timeout(self):
        now = time.time()
        for (pid, worker) in self.workers.items():
            if now - worker.watchdog_time >= worker.watchdog_timeout:
                _logger.error("Worker (%s) timeout", pid)
                self.worker_kill(pid, signal.SIGKILL)

    def process_spawn(self):
        while len(self.workers_http) < self.population:
            self.worker_spawn(WorkerHTTP, self.workers_http)
        while len(self.workers_cron) < config['max_cron_threads']:
            self.worker_spawn(WorkerCron, self.workers_cron)

    def sleep(self):
        try:
            # map of fd -> worker
            fds = dict((w.watchdog_pipe[0], w) for w in self.workers.values())
            fd_in = fds.keys() + [self.pipe[0]]
            # check for ping or internal wakeups
            ready = select.select(fd_in, [], [], self.beat)
            # update worker watchdogs
            for fd in ready[0]:
                if fd in fds:
                    fds[fd].watchdog_time = time.time()
                try:
                    # empty pipe
                    while os.read(fd, 1):
                        pass
                except OSError, e:
                    if e.errno not in [errno.EAGAIN]:
                        raise
        except select.error, e:
            if e[0] not in [errno.EINTR]:
                raise

    def start(self):
        # wakeup pipe: Python doesn't surface EINTR when a syscall is
        # interrupted by a signal, simulating a pseudo SA_RESTART. We write
        # to a pipe in the signal handler to overcome this behaviour.
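        # This is the classic self-pipe trick: signal_handler() writes one
        # byte to self.pipe[1], so the select() in sleep(), which also
        # watches self.pipe[0], returns immediately instead of waiting for
        # the full beat interval.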
        self.pipe = self.pipe_new()
        # set signal
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGCHLD, self.signal_handler)
        # listen to socket
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setblocking(0)
        self.socket.bind(self.address)
        self.socket.listen(8 * self.population)

    def stop(self, graceful=True):
        if graceful:
            _logger.info("Stopping gracefully")
            limit = time.time() + self.timeout
            for pid in self.workers.keys():
                self.worker_kill(pid, signal.SIGTERM)
            while self.workers and time.time() < limit:
                self.process_zombie()
                time.sleep(0.1)
        else:
            _logger.info("Stopping forcefully")
        # any worker still alive here ignored SIGTERM (or we are stopping
        # forcefully), so kill it outright
        for pid in self.workers.keys():
            self.worker_kill(pid, signal.SIGKILL)
        self.socket.close()
        openerp.cli.server.quit_signals_received = 1

    def run(self):
        self.start()
        _logger.debug("Multiprocess starting")
        while 1:
            try:
                #_logger.debug("Multiprocess beat (%s)", time.time())
                self.process_signals()
                self.process_zombie()
                self.process_timeout()
                self.process_spawn()
                self.sleep()
            except KeyboardInterrupt:
                _logger.debug("Multiprocess clean stop")
                self.stop()
                break
            except Exception, e:
                _logger.exception(e)
                self.stop(False)
                sys.exit(-1)

class Worker(object):
    """ Workers """
    def __init__(self, multi):
        self.multi = multi
        self.watchdog_time = time.time()
        self.watchdog_pipe = multi.pipe_new()
        self.watchdog_timeout = multi.timeout
        self.ppid = os.getpid()
        self.pid = None
        self.alive = True
        # should we rename into lifetime ?
        self.request_max = multi.limit_request
        self.request_count = 0

    def setproctitle(self, title=""):
        setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))

    def close(self):
        os.close(self.watchdog_pipe[0])
        os.close(self.watchdog_pipe[1])

    def signal_handler(self, sig, frame):
        self.alive = False

    def sleep(self):
        try:
            ret = select.select([self.multi.socket], [], [], self.multi.beat)
        except select.error, e:
            if e[0] not in [errno.EINTR]:
                raise

    def process_limit(self):
        # if our parent changed, commit suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
            self.alive = False
        # check for lifetime
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
            self.alive = False
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        rss, vms = psutil.Process(os.getpid()).get_memory_info()
        if vms > config['limit_memory_soft']:
            _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
            self.alive = False # commit suicide after the request

        # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))

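        # Note: process_limit() runs on every worker loop iteration, so the
        # CPU rlimit below is re-armed each time to the worker's current
        # CPU usage plus limit_time_cpu. In effect, limit_time_cpu is a
        # budget for the work done between two iterations (roughly one
        # request), not for the whole worker lifetime.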
        # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        def time_expired(n, stack):
            _logger.info('Worker (%d) CPU time limit (%s) reached.', self.pid, config['limit_time_cpu'])
            # We don't suicide in such a case; the exception aborts the request.
            raise Exception('CPU time limit exceeded.')
        signal.signal(signal.SIGXCPU, time_expired)
        soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))

    def process_work(self):
        pass

    def start(self):
        self.pid = os.getpid()
        self.setproctitle()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        random.seed()
        # Prevent fd inheritance: close_on_exec
        flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
        # reset blocking status
        self.multi.socket.setblocking(0)
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def stop(self):
        pass

    def run(self):
        try:
            self.start()
            while self.alive:
                self.process_limit()
                self.multi.pipe_ping(self.watchdog_pipe)
                self.sleep()
                self.process_work()
            _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
            self.stop()
        except Exception, e:
            _logger.exception("Worker (%s) Exception occurred, exiting...", self.pid)
            # should we use 3 to abort everything ?
            sys.exit(1)

class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def process_request(self, client, addr):
        client.setblocking(1)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inheritance: close_on_exec
        flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(client, fcntl.F_SETFD, flags)
        # handle the request using WorkerBaseWSGIServer monkey-patched with
        # the accepted client socket
        self.server.socket = client
        # tolerate broken pipe when the http client closes the socket before
        # receiving the full reply
        try:
            self.server.process_request(client, addr)
        except IOError, e:
            if e.errno != errno.EPIPE:
                raise
        self.request_count += 1

    def process_work(self):
        try:
            client, addr = self.multi.socket.accept()
            self.process_request(client, addr)
        except socket.error, e:
            if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
                raise

    def start(self):
        Worker.start(self)
        self.server = WorkerBaseWSGIServer(self.multi.app)

class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
    """ werkzeug WSGI Server patched to allow using an external listen socket
    """
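    # A sketch of the flow: WorkerHTTP.process_request() assigns each
    # accepted client socket to self.server.socket, then calls
    # self.server.process_request(), so the werkzeug machinery serves the
    # connection without ever having bound or listened itself;
    # server_bind() and server_activate() below are neutralized
    # accordingly.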
    def __init__(self, app):
        # host and port are dummies; server_bind() below ignores them
        werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
    def server_bind(self):
        # we don't bind because we use the listen socket of Multicorn#socket
        # instead we close the socket
        if self.socket:
            self.socket.close()
    def server_activate(self):
        # don't listen as we use Multicorn#socket
        pass

class WorkerCron(Worker):
    """ Cron workers """

    def __init__(self, multi):
        super(WorkerCron, self).__init__(multi)
        # process_work() below processes a single database per call. The
        # variable db_index keeps track of the next database to process.
        self.db_index = 0

    def sleep(self):
        # Really sleep once all the databases have been processed.
        if self.db_index == 0:
            interval = 60 + self.pid % 10 # chorus effect
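            # the pid-dependent offset spreads the cron workers' wakeups
            # over a 60..69s window so they do not all poll the databases
            # at the same instant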
            time.sleep(interval)

    def _db_list(self):
        if config['db_name']:
            db_names = config['db_name'].split(',')
        else:
            db_names = openerp.netsvc.ExportService._services['db'].exp_list(True)
        return db_names

    def process_work(self):
        rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
        rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
        _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
        db_names = self._db_list()
        if len(db_names):
            self.db_index = (self.db_index + 1) % len(db_names)
            db_name = db_names[self.db_index]
            self.setproctitle(db_name)
            if rpc_request_flag:
                start_time = time.time()
                start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
            # run jobs until none is left to acquire on this database
            while True:
                # acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
                # TODO why isn't openerp.addons.base defined ?
                import base
                acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name)
                if not acquired:
                    openerp.modules.registry.RegistryManager.delete(db_name)
                    break
            # don't keep cursors in multi database mode
            if len(db_names) > 1:
                openerp.sql_db.close_db(db_name)
            if rpc_request_flag:
                end_time = time.time()
                end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
                logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms) / 1024)
                _logger.debug("WorkerCron (%s) %s", self.pid, logline)

            self.request_count += 1
            if self.request_count >= self.request_max and self.request_max < len(db_names):
                _logger.error("There are more databases to process than allowed "
                    "by the `limit_request` configuration variable: %s more.",
                    len(db_names) - self.request_max)
        else:
            self.db_index = 0

    def start(self):
        os.nice(10)     # mommy always told me to be nice with others...
        Worker.start(self)
        openerp.service.start_internal()

        # chorus effect: make sure cron workers do not all start at the
        # first database
        mct = config['max_cron_threads']
        p = float(self.pid % mct) / mct
        self.db_index = int(len(self._db_list()) * p)
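        # e.g. (illustrative numbers): with max_cron_threads=2 and ten
        # databases, p is 0.0 for one worker and 0.5 for the other, so they
        # start at db_index 0 and 5 and their round-robins stay half a
        # cycle apart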


# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: