5ea05df811272dfca3400cda78c683b3bf8b95a4
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import logging
7 import os
8 import os.path
9 import platform
10 import psutil
11 import random
12 import select
13 import signal
14 import socket
15 import subprocess
16 import sys
17 import threading
18 import time
19 import unittest2
20
21 import werkzeug.serving
22
23 if os.name == 'posix':
24     # Unix only for workers
25     import fcntl
26     import resource
27 else:
28     # Windows shim
29     signal.SIGHUP = -1
30
31 # Optional process names for workers
32 try:
33     from setproctitle import setproctitle
34 except ImportError:
35     setproctitle = lambda x: None
36
37 import openerp
38 from openerp.modules.registry import RegistryManager
39 from openerp.release import nt_service_name
40 import openerp.tools.config as config
41 from openerp.tools.misc import stripped_sys_argv, dumpstacks
42
43 _logger = logging.getLogger(__name__)
44
45 SLEEP_INTERVAL = 60     # 1 min
46
47 #----------------------------------------------------------
48 # Werkzeug WSGI servers patched
49 #----------------------------------------------------------
50 class LoggingBaseWSGIServerMixIn(object):
51     def handle_error(self, request, client_address):
52         t, e, _ = sys.exc_info()
53         if t == socket.error and e.errno == errno.EPIPE:
54             # broken pipe, ignore error
55             return
56         _logger.exception('Exception happened during processing of request from %s', client_address)
57
58 class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
59     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
60     use this class, sets the socket and calls the process_request() manually
61     """
62     def __init__(self, app):
63         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
64     def server_bind(self):
65         # we dont bind beause we use the listen socket of PreforkServer#socket
66         # instead we close the socket
67         if self.socket:
68             self.socket.close()
69     def server_activate(self):
70         # dont listen as we use PreforkServer#socket
71         pass
72
73
74 class RequestHandler(werkzeug.serving.WSGIRequestHandler):
75     def setup(self):
76         # flag the current thread as handling a http request
77         super(RequestHandler, self).setup()
78         me = threading.currentThread()
79         me.name = 'openerp.service.http.request.%s' % (me.ident,)
80
81 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
82 # should also work with systemd socket activation. This is currently untested
83 # and not yet used.
84
85 class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
86     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
87     given by the environement, this is used by autoreload to keep the listen
88     socket open when a reload happens.
89     """
90     def __init__(self, host, port, app):
91         super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
92                                                            handler=RequestHandler)
93
94     def server_bind(self):
95         envfd = os.environ.get('LISTEN_FDS')
96         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
97             self.reload_socket = True
98             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
99             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
100         else:
101             self.reload_socket = False
102             super(ThreadedWSGIServerReloadable, self).server_bind()
103
104     def server_activate(self):
105         if not self.reload_socket:
106             super(ThreadedWSGIServerReloadable, self).server_activate()
107
108 #----------------------------------------------------------
109 # AutoReload watcher
110 #----------------------------------------------------------
111
112 class AutoReload(object):
113     def __init__(self, server):
114         self.server = server
115         self.files = {}
116         self.modules = {}
117         import pyinotify
118         class EventHandler(pyinotify.ProcessEvent):
119             def __init__(self, autoreload):
120                 self.autoreload = autoreload
121
122             def process_IN_CREATE(self, event):
123                 _logger.debug('File created: %s', event.pathname)
124                 self.autoreload.files[event.pathname] = 1
125
126             def process_IN_MODIFY(self, event):
127                 _logger.debug('File modified: %s', event.pathname)
128                 self.autoreload.files[event.pathname] = 1
129
130         self.wm = pyinotify.WatchManager()
131         self.handler = EventHandler(self)
132         self.notifier = pyinotify.Notifier(self.wm, self.handler, timeout=0)
133         mask = pyinotify.IN_MODIFY | pyinotify.IN_CREATE  # IN_MOVED_FROM, IN_MOVED_TO ?
134         for path in openerp.modules.module.ad_paths:
135             _logger.info('Watching addons folder %s', path)
136             self.wm.add_watch(path, mask, rec=True)
137
138     def process_data(self, files):
139         xml_files = [i for i in files if i.endswith('.xml')]
140         for i in xml_files:
141             for path in openerp.modules.module.ad_paths:
142                 if i.startswith(path):
143                     # find out wich addons path the file belongs to
144                     # and extract it's module name
145                     right = i[len(path) + 1:].split('/')
146                     if len(right) < 2:
147                         continue
148                     module = right[0]
149                     self.modules[module] = 1
150         if self.modules:
151             _logger.info('autoreload: xml change detected, autoreload activated')
152             restart()
153
154     def process_python(self, files):
155         # process python changes
156         py_files = [i for i in files if i.endswith('.py')]
157         py_errors = []
158         # TODO keep python errors until they are ok
159         if py_files:
160             for i in py_files:
161                 try:
162                     source = open(i, 'rb').read() + '\n'
163                     compile(source, i, 'exec')
164                 except SyntaxError:
165                     py_errors.append(i)
166             if py_errors:
167                 _logger.info('autoreload: python code change detected, errors found')
168                 for i in py_errors:
169                     _logger.info('autoreload: SyntaxError %s', i)
170             else:
171                 _logger.info('autoreload: python code updated, autoreload activated')
172                 restart()
173
174     def check_thread(self):
175         # Check if some files have been touched in the addons path.
176         # If true, check if the touched file belongs to an installed module
177         # in any of the database used in the registry manager.
178         while 1:
179             while self.notifier.check_events(1000):
180                 self.notifier.read_events()
181                 self.notifier.process_events()
182             l = self.files.keys()
183             self.files.clear()
184             self.process_data(l)
185             self.process_python(l)
186
187     def run(self):
188         t = threading.Thread(target=self.check_thread)
189         t.setDaemon(True)
190         t.start()
191         _logger.info('AutoReload watcher running')
192
193 #----------------------------------------------------------
194 # Servers: Threaded, Gevented and Prefork
195 #----------------------------------------------------------
196
197 class CommonServer(object):
198     def __init__(self, app):
199         # TODO Change the xmlrpc_* options to http_*
200         self.app = app
201         # config
202         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
203         self.port = config['xmlrpc_port']
204         # runtime
205         self.pid = os.getpid()
206
207     def close_socket(self, sock):
208         """ Closes a socket instance cleanly
209         :param sock: the network socket to close
210         :type sock: socket.socket
211         """
212         try:
213             sock.shutdown(socket.SHUT_RDWR)
214         except socket.error, e:
215             # On OSX, socket shutdowns both sides if any side closes it
216             # causing an error 57 'Socket is not connected' on shutdown
217             # of the other side (or something), see
218             # http://bugs.python.org/issue4397
219             # note: stdlib fixed test, not behavior
220             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
221                 raise
222         sock.close()
223
224 class ThreadedServer(CommonServer):
225     def __init__(self, app):
226         super(ThreadedServer, self).__init__(app)
227         self.main_thread_id = threading.currentThread().ident
228         # Variable keeping track of the number of calls to the signal handler defined
229         # below. This variable is monitored by ``quit_on_signals()``.
230         self.quit_signals_received = 0
231
232         #self.socket = None
233         self.httpd = None
234
235     def signal_handler(self, sig, frame):
236         if sig in [signal.SIGINT, signal.SIGTERM]:
237             # shutdown on kill -INT or -TERM
238             self.quit_signals_received += 1
239             if self.quit_signals_received > 1:
240                 # logging.shutdown was already called at this point.
241                 sys.stderr.write("Forced shutdown.\n")
242                 os._exit(0)
243         elif sig == signal.SIGHUP:
244             # restart on kill -HUP
245             openerp.phoenix = True
246             self.quit_signals_received += 1
247
248     def cron_thread(self, number):
249         while True:
250             time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
251             registries = openerp.modules.registry.RegistryManager.registries
252             _logger.debug('cron%d polling for jobs', number)
253             for db_name, registry in registries.iteritems():
254                 while registry.ready:
255                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
256                     if not acquired:
257                         break
258
259     def cron_spawn(self):
260         """ Start the above runner function in a daemon thread.
261
262         The thread is a typical daemon thread: it will never quit and must be
263         terminated when the main process exits - with no consequence (the processing
264         threads it spawns are not marked daemon).
265
266         """
267         # Force call to strptime just before starting the cron thread
268         # to prevent time.strptime AttributeError within the thread.
269         # See: http://bugs.python.org/issue7980
270         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
271         for i in range(openerp.tools.config['max_cron_threads']):
272             def target():
273                 self.cron_thread(i)
274             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
275             t.setDaemon(True)
276             t.start()
277             _logger.debug("cron%d started!" % i)
278
279     def http_thread(self):
280         def app(e, s):
281             return self.app(e, s)
282         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
283         self.httpd.serve_forever()
284
285     def http_spawn(self):
286         t = threading.Thread(target=self.http_thread, name="openerp.service.httpd")
287         t.setDaemon(True)
288         t.start()
289         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
290
291     def start(self, stop=False):
292         _logger.debug("Setting signal handlers")
293         if os.name == 'posix':
294             signal.signal(signal.SIGINT, self.signal_handler)
295             signal.signal(signal.SIGTERM, self.signal_handler)
296             signal.signal(signal.SIGCHLD, self.signal_handler)
297             signal.signal(signal.SIGHUP, self.signal_handler)
298             signal.signal(signal.SIGQUIT, dumpstacks)
299         elif os.name == 'nt':
300             import win32api
301             win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
302
303         test_mode = config['test_enable'] or config['test_file']
304         if not stop or test_mode:
305             # some tests need the http deamon to be available...
306             self.http_spawn()
307
308         if not stop:
309             # only relevant if we are not in "--stop-after-init" mode
310             self.cron_spawn()
311
312     def stop(self):
313         """ Shutdown the WSGI server. Wait for non deamon threads.
314         """
315         _logger.info("Initiating shutdown")
316         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
317
318         if self.httpd:
319             self.httpd.shutdown()
320             self.close_socket(self.httpd.socket)
321
322         # Manually join() all threads before calling sys.exit() to allow a second signal
323         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
324         # threading.Thread.join() should not mask signals (at least in python 2.5).
325         me = threading.currentThread()
326         _logger.debug('current thread: %r', me)
327         for thread in threading.enumerate():
328             _logger.debug('process %r (%r)', thread, thread.isDaemon())
329             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
330                 while thread.isAlive():
331                     _logger.debug('join and sleep')
332                     # Need a busyloop here as thread.join() masks signals
333                     # and would prevent the forced shutdown.
334                     thread.join(0.05)
335                     time.sleep(0.05)
336
337         _logger.debug('--')
338         openerp.modules.registry.RegistryManager.delete_all()
339         logging.shutdown()
340
341     def run(self, preload=None, stop=False):
342         """ Start the http server and the cron thread then wait for a signal.
343
344         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
345         a second one if any will force an immediate exit.
346         """
347         self.start(stop=stop)
348
349         rc = preload_registries(preload)
350
351         if stop:
352             self.stop()
353             return rc
354
355         # Wait for a first signal to be handled. (time.sleep will be interrupted
356         # by the signal handler.) The try/except is for the win32 case.
357         try:
358             while self.quit_signals_received == 0:
359                 time.sleep(60)
360         except KeyboardInterrupt:
361             pass
362
363         self.stop()
364
365     def reload(self):
366         os.kill(self.pid, signal.SIGHUP)
367
368 class GeventServer(CommonServer):
369     def __init__(self, app):
370         super(GeventServer, self).__init__(app)
371         self.port = config['longpolling_port']
372         self.httpd = None
373
374     def watch_parent(self, beat=4):
375         import gevent
376         ppid = os.getppid()
377         while True:
378             if ppid != os.getppid():
379                 pid = os.getpid()
380                 _logger.info("LongPolling (%s) Parent changed", pid)
381                 # suicide !!
382                 os.kill(pid, signal.SIGTERM)
383                 return
384             gevent.sleep(beat)
385
386     def start(self):
387         import gevent
388         from gevent.wsgi import WSGIServer
389
390         if os.name == 'posix':
391             signal.signal(signal.SIGQUIT, dumpstacks)
392
393         gevent.spawn(self.watch_parent)
394         self.httpd = WSGIServer((self.interface, self.port), self.app)
395         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
396         try:
397             self.httpd.serve_forever()
398         except:
399             _logger.exception("Evented Service (longpolling): uncaught error during main loop")
400             raise
401
402     def stop(self):
403         import gevent
404         self.httpd.stop()
405         gevent.shutdown()
406
407     def run(self, preload, stop):
408         self.start()
409         self.stop()
410
411 class PreforkServer(CommonServer):
412     """ Multiprocessing inspired by (g)unicorn.
413     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
414     method between workers but we plan to replace it by a more intelligent
415     dispatcher to will parse the first HTTP request line.
416     """
417     def __init__(self, app):
418         # config
419         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
420         self.population = config['workers']
421         self.timeout = config['limit_time_real']
422         self.limit_request = config['limit_request']
423         # working vars
424         self.beat = 4
425         self.app = app
426         self.pid = os.getpid()
427         self.socket = None
428         self.workers_http = {}
429         self.workers_cron = {}
430         self.workers = {}
431         self.generation = 0
432         self.queue = []
433         self.long_polling_pid = None
434
435     def pipe_new(self):
436         pipe = os.pipe()
437         for fd in pipe:
438             # non_blocking
439             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
440             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
441             # close_on_exec
442             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
443             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
444         return pipe
445
446     def pipe_ping(self, pipe):
447         try:
448             os.write(pipe[1], '.')
449         except IOError, e:
450             if e.errno not in [errno.EAGAIN, errno.EINTR]:
451                 raise
452
453     def signal_handler(self, sig, frame):
454         if len(self.queue) < 5 or sig == signal.SIGCHLD:
455             self.queue.append(sig)
456             self.pipe_ping(self.pipe)
457         else:
458             _logger.warn("Dropping signal: %s", sig)
459
460     def worker_spawn(self, klass, workers_registry):
461         self.generation += 1
462         worker = klass(self)
463         pid = os.fork()
464         if pid != 0:
465             worker.pid = pid
466             self.workers[pid] = worker
467             workers_registry[pid] = worker
468             return worker
469         else:
470             worker.run()
471             sys.exit(0)
472
473     def long_polling_spawn(self):
474         nargs = stripped_sys_argv()
475         cmd = nargs[0]
476         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
477         nargs[0] = cmd
478         popen = subprocess.Popen([sys.executable] + nargs)
479         self.long_polling_pid = popen.pid
480
481     def worker_pop(self, pid):
482         if pid == self.long_polling_pid:
483             self.long_polling_pid = None
484         if pid in self.workers:
485             _logger.debug("Worker (%s) unregistered", pid)
486             try:
487                 self.workers_http.pop(pid, None)
488                 self.workers_cron.pop(pid, None)
489                 u = self.workers.pop(pid)
490                 u.close()
491             except OSError:
492                 return
493
494     def worker_kill(self, pid, sig):
495         try:
496             os.kill(pid, sig)
497         except OSError, e:
498             if e.errno == errno.ESRCH:
499                 self.worker_pop(pid)
500
501     def process_signals(self):
502         while len(self.queue):
503             sig = self.queue.pop(0)
504             if sig in [signal.SIGINT, signal.SIGTERM]:
505                 raise KeyboardInterrupt
506             elif sig == signal.SIGHUP:
507                 # restart on kill -HUP
508                 openerp.phoenix = True
509                 raise KeyboardInterrupt
510             elif sig == signal.SIGQUIT:
511                 # dump stacks on kill -3
512                 self.dumpstacks()
513             elif sig == signal.SIGTTIN:
514                 # increase number of workers
515                 self.population += 1
516             elif sig == signal.SIGTTOU:
517                 # decrease number of workers
518                 self.population -= 1
519
520     def process_zombie(self):
521         # reap dead workers
522         while 1:
523             try:
524                 wpid, status = os.waitpid(-1, os.WNOHANG)
525                 if not wpid:
526                     break
527                 if (status >> 8) == 3:
528                     msg = "Critial worker error (%s)"
529                     _logger.critical(msg, wpid)
530                     raise Exception(msg % wpid)
531                 self.worker_pop(wpid)
532             except OSError, e:
533                 if e.errno == errno.ECHILD:
534                     break
535                 raise
536
537     def process_timeout(self):
538         now = time.time()
539         for (pid, worker) in self.workers.items():
540             if worker.watchdog_timeout is not None and \
541                     (now - worker.watchdog_time) >= worker.watchdog_timeout:
542                 _logger.error("Worker (%s) timeout", pid)
543                 self.worker_kill(pid, signal.SIGKILL)
544
545     def process_spawn(self):
546         while len(self.workers_http) < self.population:
547             self.worker_spawn(WorkerHTTP, self.workers_http)
548         while len(self.workers_cron) < config['max_cron_threads']:
549             self.worker_spawn(WorkerCron, self.workers_cron)
550         if not self.long_polling_pid:
551             self.long_polling_spawn()
552
553     def sleep(self):
554         try:
555             # map of fd -> worker
556             fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
557             fd_in = fds.keys() + [self.pipe[0]]
558             # check for ping or internal wakeups
559             ready = select.select(fd_in, [], [], self.beat)
560             # update worker watchdogs
561             for fd in ready[0]:
562                 if fd in fds:
563                     fds[fd].watchdog_time = time.time()
564                 try:
565                     # empty pipe
566                     while os.read(fd, 1):
567                         pass
568                 except OSError, e:
569                     if e.errno not in [errno.EAGAIN]:
570                         raise
571         except select.error, e:
572             if e[0] not in [errno.EINTR]:
573                 raise
574
575     def start(self):
576         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
577         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
578         # signal handler to overcome this behaviour
579         self.pipe = self.pipe_new()
580         # set signal handlers
581         signal.signal(signal.SIGINT, self.signal_handler)
582         signal.signal(signal.SIGTERM, self.signal_handler)
583         signal.signal(signal.SIGHUP, self.signal_handler)
584         signal.signal(signal.SIGCHLD, self.signal_handler)
585         signal.signal(signal.SIGTTIN, self.signal_handler)
586         signal.signal(signal.SIGTTOU, self.signal_handler)
587         signal.signal(signal.SIGQUIT, dumpstacks)
588
589         # listen to socket
590         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
591         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
592         self.socket.setblocking(0)
593         self.socket.bind(self.address)
594         self.socket.listen(8 * self.population)
595
596     def stop(self, graceful=True):
597         if self.long_polling_pid is not None:
598             # FIXME make longpolling process handle SIGTERM correctly
599             self.worker_kill(self.long_polling_pid, signal.SIGKILL)
600             self.long_polling_pid = None
601         if graceful:
602             _logger.info("Stopping gracefully")
603             limit = time.time() + self.timeout
604             for pid in self.workers.keys():
605                 self.worker_kill(pid, signal.SIGTERM)
606             while self.workers and time.time() < limit:
607                 self.process_zombie()
608                 time.sleep(0.1)
609         else:
610             _logger.info("Stopping forcefully")
611         for pid in self.workers.keys():
612             self.worker_kill(pid, signal.SIGTERM)
613         self.socket.close()
614
615     def run(self, preload, stop):
616         self.start()
617
618         rc = preload_registries(preload)
619
620         if stop:
621             self.stop()
622             return rc
623
624         # Empty the cursor pool, we dont want them to be shared among forked workers.
625         openerp.sql_db.close_all()
626
627         _logger.debug("Multiprocess starting")
628         while 1:
629             try:
630                 #_logger.debug("Multiprocess beat (%s)",time.time())
631                 self.process_signals()
632                 self.process_zombie()
633                 self.process_timeout()
634                 self.process_spawn()
635                 self.sleep()
636             except KeyboardInterrupt:
637                 _logger.debug("Multiprocess clean stop")
638                 self.stop()
639                 break
640             except Exception, e:
641                 _logger.exception(e)
642                 self.stop(False)
643                 return -1
644
645 class Worker(object):
646     """ Workers """
647     def __init__(self, multi):
648         self.multi = multi
649         self.watchdog_time = time.time()
650         self.watchdog_pipe = multi.pipe_new()
651         # Can be set to None if no watchdog is desired.
652         self.watchdog_timeout = multi.timeout
653         self.ppid = os.getpid()
654         self.pid = None
655         self.alive = True
656         # should we rename into lifetime ?
657         self.request_max = multi.limit_request
658         self.request_count = 0
659
660     def setproctitle(self, title=""):
661         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
662
663     def close(self):
664         os.close(self.watchdog_pipe[0])
665         os.close(self.watchdog_pipe[1])
666
667     def signal_handler(self, sig, frame):
668         self.alive = False
669
670     def sleep(self):
671         try:
672             select.select([self.multi.socket], [], [], self.multi.beat)
673         except select.error, e:
674             if e[0] not in [errno.EINTR]:
675                 raise
676
677     def process_limit(self):
678         # If our parent changed sucide
679         if self.ppid != os.getppid():
680             _logger.info("Worker (%s) Parent changed", self.pid)
681             self.alive = False
682         # check for lifetime
683         if self.request_count >= self.request_max:
684             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
685             self.alive = False
686         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
687         rss, vms = psutil.Process(os.getpid()).get_memory_info()
688         if vms > config['limit_memory_soft']:
689             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
690             self.alive = False      # Commit suicide after the request.
691
692         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
693         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
694         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
695
696         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
697         r = resource.getrusage(resource.RUSAGE_SELF)
698         cpu_time = r.ru_utime + r.ru_stime
699         def time_expired(n, stack):
700             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
701             # We dont suicide in such case
702             raise Exception('CPU time limit exceeded.')
703         signal.signal(signal.SIGXCPU, time_expired)
704         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
705         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
706
707     def process_work(self):
708         pass
709
710     def start(self):
711         self.pid = os.getpid()
712         self.setproctitle()
713         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
714         # Reseed the random number generator
715         random.seed()
716         # Prevent fd inherientence close_on_exec
717         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
718         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
719         # reset blocking status
720         self.multi.socket.setblocking(0)
721         signal.signal(signal.SIGINT, self.signal_handler)
722         signal.signal(signal.SIGTERM, signal.SIG_DFL)
723         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
724
725     def stop(self):
726         pass
727
728     def run(self):
729         try:
730             self.start()
731             while self.alive:
732                 self.process_limit()
733                 self.multi.pipe_ping(self.watchdog_pipe)
734                 self.sleep()
735                 self.process_work()
736             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
737             self.stop()
738         except Exception:
739             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
740             # should we use 3 to abort everything ?
741             sys.exit(1)
742
743 class WorkerHTTP(Worker):
744     """ HTTP Request workers """
745     def process_request(self, client, addr):
746         client.setblocking(1)
747         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
748         # Prevent fd inherientence close_on_exec
749         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
750         fcntl.fcntl(client, fcntl.F_SETFD, flags)
751         # do request using BaseWSGIServerNoBind monkey patched with socket
752         self.server.socket = client
753         # tolerate broken pipe when the http client closes the socket before
754         # receiving the full reply
755         try:
756             self.server.process_request(client, addr)
757         except IOError, e:
758             if e.errno != errno.EPIPE:
759                 raise
760         self.request_count += 1
761
762     def process_work(self):
763         try:
764             client, addr = self.multi.socket.accept()
765             self.process_request(client, addr)
766         except socket.error, e:
767             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
768                 raise
769
770     def start(self):
771         Worker.start(self)
772         self.server = BaseWSGIServerNoBind(self.multi.app)
773
774 class WorkerCron(Worker):
775     """ Cron workers """
776
777     def __init__(self, multi):
778         super(WorkerCron, self).__init__(multi)
779         # process_work() below process a single database per call.
780         # The variable db_index is keeping track of the next database to
781         # process.
782         self.db_index = 0
783
784     def sleep(self):
785         # Really sleep once all the databases have been processed.
786         if self.db_index == 0:
787             interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect
788             time.sleep(interval)
789
790     def _db_list(self):
791         if config['db_name']:
792             db_names = config['db_name'].split(',')
793         else:
794             db_names = openerp.service.db.exp_list(True)
795         return db_names
796
797     def process_work(self):
798         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
799         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
800         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
801         db_names = self._db_list()
802         if len(db_names):
803             self.db_index = (self.db_index + 1) % len(db_names)
804             db_name = db_names[self.db_index]
805             self.setproctitle(db_name)
806             if rpc_request_flag:
807                 start_time = time.time()
808                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
809
810             import openerp.addons.base as base
811             base.ir.ir_cron.ir_cron._acquire_job(db_name)
812             openerp.modules.registry.RegistryManager.delete(db_name)
813
814             # dont keep cursors in multi database mode
815             if len(db_names) > 1:
816                 openerp.sql_db.close_db(db_name)
817             if rpc_request_flag:
818                 run_time = time.time() - start_time
819                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
820                 vms_diff = (end_vms - start_vms) / 1024
821                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
822                     (db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
823                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
824
825             self.request_count += 1
826             if self.request_count >= self.request_max and self.request_max < len(db_names):
827                 _logger.error("There are more dabatases to process than allowed "
828                               "by the `limit_request` configuration variable: %s more.",
829                               len(db_names) - self.request_max)
830         else:
831             self.db_index = 0
832
833     def start(self):
834         os.nice(10)     # mommy always told me to be nice with others...
835         Worker.start(self)
836         self.multi.socket.close()
837
838 #----------------------------------------------------------
839 # start/stop public api
840 #----------------------------------------------------------
841
842 server = None
843
844 def load_server_wide_modules():
845     for m in openerp.conf.server_wide_modules:
846         try:
847             openerp.modules.module.load_openerp_module(m)
848         except Exception:
849             msg = ''
850             if m == 'web':
851                 msg = """
852 The `web` module is provided by the addons found in the `openerp-web` project.
853 Maybe you forgot to add those addons in your addons_path configuration."""
854             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
855
856 def _reexec(updated_modules=None):
857     """reexecute openerp-server process with (nearly) the same arguments"""
858     if openerp.tools.osutil.is_running_as_nt_service():
859         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
860     exe = os.path.basename(sys.executable)
861     args = stripped_sys_argv()
862     args += ["-u", ','.join(updated_modules)]
863     if not args or args[0] != exe:
864         args.insert(0, exe)
865     os.execv(sys.executable, args)
866
867 def load_test_file_yml(registry, test_file):
868     with registry.cursor() as cr:
869         openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'init')
870         if config['test_commit']:
871             _logger.info('test %s has been commited', test_file)
872             cr.commit()
873         else:
874             _logger.info('test %s has been rollbacked', test_file)
875             cr.rollback()
876
877 def load_test_file_py(registry, test_file):
878     # Locate python module based on its filename and run the tests
879     test_path, _ = os.path.splitext(os.path.abspath(test_file))
880     for mod_name, mod_mod in sys.modules.items():
881         if mod_mod:
882             mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
883             if test_path == mod_path:
884                 suite = unittest2.TestSuite()
885                 for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
886                     suite.addTest(t)
887                 _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
888                 stream = openerp.modules.module.TestStream()
889                 result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
890                 success = result.wasSuccessful()
891                 if hasattr(registry._assertion_report,'report_result'):
892                     registry._assertion_report.report_result(success)
893                 if not success:
894                     _logger.error('%s: at least one error occurred in a test', test_file)
895
896 def preload_registries(dbnames):
897     """ Preload a registries, possibly run a test file."""
898     # TODO: move all config checks to args dont check tools.config here
899     config = openerp.tools.config
900     test_file = config['test_file']
901     dbnames = dbnames or []
902     rc = 0
903     for dbname in dbnames:
904         try:
905             update_module = config['init'] or config['update']
906             registry = RegistryManager.new(dbname, update_module=update_module)
907             # run test_file if provided
908             if test_file:
909                 _logger.info('loading test file %s', test_file)
910                 if test_file.endswith('yml'):
911                     load_test_file_yml(registry, test_file)
912                 elif test_file.endswith('py'):
913                     load_test_file_py(registry, test_file)
914
915             if registry._assertion_report.failures:
916                 rc += 1
917         except Exception:
918             _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
919             return -1
920     return rc
921
922 def start(preload=None, stop=False):
923     """ Start the openerp http server and cron processor.
924     """
925     global server
926     load_server_wide_modules()
927     if openerp.evented:
928         server = GeventServer(openerp.service.wsgi_server.application)
929     elif config['workers']:
930         server = PreforkServer(openerp.service.wsgi_server.application)
931     else:
932         server = ThreadedServer(openerp.service.wsgi_server.application)
933
934     if config['auto_reload']:
935         autoreload = AutoReload(server)
936         autoreload.run()
937
938     rc = server.run(preload, stop)
939
940     # like the legend of the phoenix, all ends with beginnings
941     if getattr(openerp, 'phoenix', False):
942         modules = []
943         if config['auto_reload']:
944             modules = autoreload.modules.keys()
945         _reexec(modules)
946
947     return rc if rc else 0
948
949 def restart():
950     """ Restart the server
951     """
952     if os.name == 'nt':
953         # run in a thread to let the current thread return response to the caller.
954         threading.Thread(target=_reexec).start()
955     else:
956         os.kill(server.pid, signal.SIGHUP)
957
958 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: