70f0e26a557d79cf2f60b1e967102052bd71add4
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import logging
7 import os
8 import os.path
9 import platform
10 import psutil
11 import random
12 if os.name == 'posix':
13     import resource
14 else:
15     resource = None
16 import select
17 import signal
18 import socket
19 import subprocess
20 import sys
21 import threading
22 import time
23 import unittest2
24
25 import werkzeug.serving
26
27 try:
28     import fcntl
29 except ImportError:
30     pass
31 try:
32     from setproctitle import setproctitle
33 except ImportError:
34     setproctitle = lambda x: None
35
36 import openerp
37 from openerp.modules.registry import RegistryManager
38 from openerp.release import nt_service_name
39 import openerp.tools.config as config
40 from openerp.tools.misc import stripped_sys_argv, dumpstacks
41
42 _logger = logging.getLogger(__name__)
43
44 try:
45     import watchdog
46     from watchdog.observers import Observer
47     from watchdog.events import FileCreatedEvent, FileModifiedEvent
48 except ImportError:
49     watchdog = None
50
51 SLEEP_INTERVAL = 60     # 1 min
52
53 #----------------------------------------------------------
54 # Werkzeug WSGI servers patched
55 #----------------------------------------------------------
56 class LoggingBaseWSGIServerMixIn(object):
57     def handle_error(self, request, client_address):
58         t, e, _ = sys.exc_info()
59         if t == socket.error and e.errno == errno.EPIPE:
60             # broken pipe, ignore error
61             return
62         _logger.exception('Exception happened during processing of request from %s', client_address)
63
64 class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
65     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
66     use this class, sets the socket and calls the process_request() manually
67     """
68     def __init__(self, app):
69         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
70     def server_bind(self):
71         # we dont bind beause we use the listen socket of PreforkServer#socket
72         # instead we close the socket
73         if self.socket:
74             self.socket.close()
75     def server_activate(self):
76         # dont listen as we use PreforkServer#socket
77         pass
78
79
80 class RequestHandler(werkzeug.serving.WSGIRequestHandler):
81     def setup(self):
82         # flag the current thread as handling a http request
83         super(RequestHandler, self).setup()
84         me = threading.currentThread()
85         me.name = 'openerp.service.http.request.%s' % (me.ident,)
86
87 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
88 # should also work with systemd socket activation. This is currently untested
89 # and not yet used.
90
91 class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
92     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
93     given by the environement, this is used by autoreload to keep the listen
94     socket open when a reload happens.
95     """
96     def __init__(self, host, port, app):
97         super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
98                                                            handler=RequestHandler)
99
100     def server_bind(self):
101         envfd = os.environ.get('LISTEN_FDS')
102         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
103             self.reload_socket = True
104             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
105             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
106         else:
107             self.reload_socket = False
108             super(ThreadedWSGIServerReloadable, self).server_bind()
109
110     def server_activate(self):
111         if not self.reload_socket:
112             super(ThreadedWSGIServerReloadable, self).server_activate()
113
114 #----------------------------------------------------------
115 # FileSystem Watcher for autoreload and cache invalidation
116 #----------------------------------------------------------
117 class FSWatcher(object):
118     def __init__(self):
119         self.observer = Observer()
120         for path in openerp.modules.module.ad_paths:
121             _logger.info('Watching addons folder %s', path)
122             self.observer.schedule(self, path, recursive=True)
123
124     def dispatch(self, event):
125         if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
126             if not event.is_directory:
127                 path = event.src_path
128                 if path.endswith('.py'):
129                     try:
130                         source = open(path, 'rb').read() + '\n'
131                         compile(source, path, 'exec')
132                     except SyntaxError:
133                         _logger.error('autoreload: python code change detected, SyntaxError in %s', path)
134                     else:
135                         _logger.info('autoreload: python code updated, autoreload activated')
136                         restart()
137
138     def start(self):
139         self.observer.start()
140         _logger.info('AutoReload watcher running')
141
142     def stop(self):
143         self.observer.stop()
144         self.observer.join()
145
146 #----------------------------------------------------------
147 # Servers: Threaded, Gevented and Prefork
148 #----------------------------------------------------------
149
150 class CommonServer(object):
151     def __init__(self, app):
152         # TODO Change the xmlrpc_* options to http_*
153         self.app = app
154         # config
155         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
156         self.port = config['xmlrpc_port']
157         # runtime
158         self.pid = os.getpid()
159
160     def close_socket(self, sock):
161         """ Closes a socket instance cleanly
162         :param sock: the network socket to close
163         :type sock: socket.socket
164         """
165         try:
166             sock.shutdown(socket.SHUT_RDWR)
167         except socket.error, e:
168             # On OSX, socket shutdowns both sides if any side closes it
169             # causing an error 57 'Socket is not connected' on shutdown
170             # of the other side (or something), see
171             # http://bugs.python.org/issue4397
172             # note: stdlib fixed test, not behavior
173             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
174                 raise
175         sock.close()
176
177 class ThreadedServer(CommonServer):
178     def __init__(self, app):
179         super(ThreadedServer, self).__init__(app)
180         self.main_thread_id = threading.currentThread().ident
181         # Variable keeping track of the number of calls to the signal handler defined
182         # below. This variable is monitored by ``quit_on_signals()``.
183         self.quit_signals_received = 0
184
185         #self.socket = None
186         self.httpd = None
187
188     def signal_handler(self, sig, frame):
189         if sig in [signal.SIGINT, signal.SIGTERM]:
190             # shutdown on kill -INT or -TERM
191             self.quit_signals_received += 1
192             if self.quit_signals_received > 1:
193                 # logging.shutdown was already called at this point.
194                 sys.stderr.write("Forced shutdown.\n")
195                 os._exit(0)
196         elif sig == signal.SIGHUP:
197             # restart on kill -HUP
198             openerp.phoenix = True
199             self.quit_signals_received += 1
200
201     def cron_thread(self, number):
202         while True:
203             time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
204             registries = openerp.modules.registry.RegistryManager.registries
205             _logger.debug('cron%d polling for jobs', number)
206             for db_name, registry in registries.iteritems():
207                 while registry.ready:
208                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
209                     if not acquired:
210                         break
211
212     def cron_spawn(self):
213         """ Start the above runner function in a daemon thread.
214
215         The thread is a typical daemon thread: it will never quit and must be
216         terminated when the main process exits - with no consequence (the processing
217         threads it spawns are not marked daemon).
218
219         """
220         # Force call to strptime just before starting the cron thread
221         # to prevent time.strptime AttributeError within the thread.
222         # See: http://bugs.python.org/issue7980
223         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
224         for i in range(openerp.tools.config['max_cron_threads']):
225             def target():
226                 self.cron_thread(i)
227             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
228             t.setDaemon(True)
229             t.start()
230             _logger.debug("cron%d started!" % i)
231
232     def http_thread(self):
233         def app(e, s):
234             return self.app(e, s)
235         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
236         self.httpd.serve_forever()
237
238     def http_spawn(self):
239         t = threading.Thread(target=self.http_thread, name="openerp.service.httpd")
240         t.setDaemon(True)
241         t.start()
242         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
243
244     def start(self, stop=False):
245         _logger.debug("Setting signal handlers")
246         if os.name == 'posix':
247             signal.signal(signal.SIGINT, self.signal_handler)
248             signal.signal(signal.SIGTERM, self.signal_handler)
249             signal.signal(signal.SIGCHLD, self.signal_handler)
250             signal.signal(signal.SIGHUP, self.signal_handler)
251             signal.signal(signal.SIGQUIT, dumpstacks)
252         elif os.name == 'nt':
253             import win32api
254             win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
255
256         test_mode = config['test_enable'] or config['test_file']
257         if not stop or test_mode:
258             # some tests need the http deamon to be available...
259             self.http_spawn()
260
261         if not stop:
262             # only relevant if we are not in "--stop-after-init" mode
263             self.cron_spawn()
264
265     def stop(self):
266         """ Shutdown the WSGI server. Wait for non deamon threads.
267         """
268         _logger.info("Initiating shutdown")
269         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
270
271         if self.httpd:
272             self.httpd.shutdown()
273             self.close_socket(self.httpd.socket)
274
275         # Manually join() all threads before calling sys.exit() to allow a second signal
276         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
277         # threading.Thread.join() should not mask signals (at least in python 2.5).
278         me = threading.currentThread()
279         _logger.debug('current thread: %r', me)
280         for thread in threading.enumerate():
281             _logger.debug('process %r (%r)', thread, thread.isDaemon())
282             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
283                 while thread.isAlive():
284                     _logger.debug('join and sleep')
285                     # Need a busyloop here as thread.join() masks signals
286                     # and would prevent the forced shutdown.
287                     thread.join(0.05)
288                     time.sleep(0.05)
289
290         _logger.debug('--')
291         openerp.modules.registry.RegistryManager.delete_all()
292         logging.shutdown()
293
294     def run(self, preload=None, stop=False):
295         """ Start the http server and the cron thread then wait for a signal.
296
297         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
298         a second one if any will force an immediate exit.
299         """
300         self.start(stop=stop)
301
302         rc = preload_registries(preload)
303
304         if stop:
305             self.stop()
306             return rc
307
308         # Wait for a first signal to be handled. (time.sleep will be interrupted
309         # by the signal handler.) The try/except is for the win32 case.
310         try:
311             while self.quit_signals_received == 0:
312                 time.sleep(60)
313         except KeyboardInterrupt:
314             pass
315
316         self.stop()
317
318     def reload(self):
319         os.kill(self.pid, signal.SIGHUP)
320
321 class GeventServer(CommonServer):
322     def __init__(self, app):
323         super(GeventServer, self).__init__(app)
324         self.port = config['longpolling_port']
325         self.httpd = None
326
327     def watch_parent(self, beat=4):
328         import gevent
329         ppid = os.getppid()
330         while True:
331             if ppid != os.getppid():
332                 pid = os.getpid()
333                 _logger.info("LongPolling (%s) Parent changed", pid)
334                 # suicide !!
335                 os.kill(pid, signal.SIGTERM)
336                 return
337             gevent.sleep(beat)
338
339     def start(self):
340         import gevent
341         from gevent.wsgi import WSGIServer
342
343         if os.name == 'posix':
344             signal.signal(signal.SIGQUIT, dumpstacks)
345
346         gevent.spawn(self.watch_parent)
347         self.httpd = WSGIServer((self.interface, self.port), self.app)
348         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
349         try:
350             self.httpd.serve_forever()
351         except:
352             _logger.exception("Evented Service (longpolling): uncaught error during main loop")
353             raise
354
355     def stop(self):
356         import gevent
357         self.httpd.stop()
358         gevent.shutdown()
359
360     def run(self, preload, stop):
361         self.start()
362         self.stop()
363
364 class PreforkServer(CommonServer):
365     """ Multiprocessing inspired by (g)unicorn.
366     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
367     method between workers but we plan to replace it by a more intelligent
368     dispatcher to will parse the first HTTP request line.
369     """
370     def __init__(self, app):
371         # config
372         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
373         self.population = config['workers']
374         self.timeout = config['limit_time_real']
375         self.limit_request = config['limit_request']
376         # working vars
377         self.beat = 4
378         self.app = app
379         self.pid = os.getpid()
380         self.socket = None
381         self.workers_http = {}
382         self.workers_cron = {}
383         self.workers = {}
384         self.generation = 0
385         self.queue = []
386         self.long_polling_pid = None
387
388     def pipe_new(self):
389         pipe = os.pipe()
390         for fd in pipe:
391             # non_blocking
392             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
393             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
394             # close_on_exec
395             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
396             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
397         return pipe
398
399     def pipe_ping(self, pipe):
400         try:
401             os.write(pipe[1], '.')
402         except IOError, e:
403             if e.errno not in [errno.EAGAIN, errno.EINTR]:
404                 raise
405
406     def signal_handler(self, sig, frame):
407         if len(self.queue) < 5 or sig == signal.SIGCHLD:
408             self.queue.append(sig)
409             self.pipe_ping(self.pipe)
410         else:
411             _logger.warn("Dropping signal: %s", sig)
412
413     def worker_spawn(self, klass, workers_registry):
414         self.generation += 1
415         worker = klass(self)
416         pid = os.fork()
417         if pid != 0:
418             worker.pid = pid
419             self.workers[pid] = worker
420             workers_registry[pid] = worker
421             return worker
422         else:
423             worker.run()
424             sys.exit(0)
425
426     def long_polling_spawn(self):
427         nargs = stripped_sys_argv()
428         cmd = nargs[0]
429         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
430         nargs[0] = cmd
431         popen = subprocess.Popen([sys.executable] + nargs)
432         self.long_polling_pid = popen.pid
433
434     def worker_pop(self, pid):
435         if pid == self.long_polling_pid:
436             self.long_polling_pid = None
437         if pid in self.workers:
438             _logger.debug("Worker (%s) unregistered", pid)
439             try:
440                 self.workers_http.pop(pid, None)
441                 self.workers_cron.pop(pid, None)
442                 u = self.workers.pop(pid)
443                 u.close()
444             except OSError:
445                 return
446
447     def worker_kill(self, pid, sig):
448         try:
449             os.kill(pid, sig)
450         except OSError, e:
451             if e.errno == errno.ESRCH:
452                 self.worker_pop(pid)
453
454     def process_signals(self):
455         while len(self.queue):
456             sig = self.queue.pop(0)
457             if sig in [signal.SIGINT, signal.SIGTERM]:
458                 raise KeyboardInterrupt
459             elif sig == signal.SIGHUP:
460                 # restart on kill -HUP
461                 openerp.phoenix = True
462                 raise KeyboardInterrupt
463             elif sig == signal.SIGQUIT:
464                 # dump stacks on kill -3
465                 self.dumpstacks()
466             elif sig == signal.SIGTTIN:
467                 # increase number of workers
468                 self.population += 1
469             elif sig == signal.SIGTTOU:
470                 # decrease number of workers
471                 self.population -= 1
472
473     def process_zombie(self):
474         # reap dead workers
475         while 1:
476             try:
477                 wpid, status = os.waitpid(-1, os.WNOHANG)
478                 if not wpid:
479                     break
480                 if (status >> 8) == 3:
481                     msg = "Critial worker error (%s)"
482                     _logger.critical(msg, wpid)
483                     raise Exception(msg % wpid)
484                 self.worker_pop(wpid)
485             except OSError, e:
486                 if e.errno == errno.ECHILD:
487                     break
488                 raise
489
490     def process_timeout(self):
491         now = time.time()
492         for (pid, worker) in self.workers.items():
493             if worker.watchdog_timeout is not None and \
494                     (now - worker.watchdog_time) >= worker.watchdog_timeout:
495                 _logger.error("Worker (%s) timeout", pid)
496                 self.worker_kill(pid, signal.SIGKILL)
497
498     def process_spawn(self):
499         while len(self.workers_http) < self.population:
500             self.worker_spawn(WorkerHTTP, self.workers_http)
501         while len(self.workers_cron) < config['max_cron_threads']:
502             self.worker_spawn(WorkerCron, self.workers_cron)
503         if not self.long_polling_pid:
504             self.long_polling_spawn()
505
506     def sleep(self):
507         try:
508             # map of fd -> worker
509             fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
510             fd_in = fds.keys() + [self.pipe[0]]
511             # check for ping or internal wakeups
512             ready = select.select(fd_in, [], [], self.beat)
513             # update worker watchdogs
514             for fd in ready[0]:
515                 if fd in fds:
516                     fds[fd].watchdog_time = time.time()
517                 try:
518                     # empty pipe
519                     while os.read(fd, 1):
520                         pass
521                 except OSError, e:
522                     if e.errno not in [errno.EAGAIN]:
523                         raise
524         except select.error, e:
525             if e[0] not in [errno.EINTR]:
526                 raise
527
528     def start(self):
529         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
530         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
531         # signal handler to overcome this behaviour
532         self.pipe = self.pipe_new()
533         # set signal handlers
534         signal.signal(signal.SIGINT, self.signal_handler)
535         signal.signal(signal.SIGTERM, self.signal_handler)
536         signal.signal(signal.SIGHUP, self.signal_handler)
537         signal.signal(signal.SIGCHLD, self.signal_handler)
538         signal.signal(signal.SIGTTIN, self.signal_handler)
539         signal.signal(signal.SIGTTOU, self.signal_handler)
540         signal.signal(signal.SIGQUIT, dumpstacks)
541
542         # listen to socket
543         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
544         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
545         self.socket.setblocking(0)
546         self.socket.bind(self.address)
547         self.socket.listen(8 * self.population)
548
549     def stop(self, graceful=True):
550         if self.long_polling_pid is not None:
551             # FIXME make longpolling process handle SIGTERM correctly
552             self.worker_kill(self.long_polling_pid, signal.SIGKILL)
553             self.long_polling_pid = None
554         if graceful:
555             _logger.info("Stopping gracefully")
556             limit = time.time() + self.timeout
557             for pid in self.workers.keys():
558                 self.worker_kill(pid, signal.SIGTERM)
559             while self.workers and time.time() < limit:
560                 self.process_zombie()
561                 time.sleep(0.1)
562         else:
563             _logger.info("Stopping forcefully")
564         for pid in self.workers.keys():
565             self.worker_kill(pid, signal.SIGTERM)
566         self.socket.close()
567
568     def run(self, preload, stop):
569         self.start()
570
571         rc = preload_registries(preload)
572
573         if stop:
574             self.stop()
575             return rc
576
577         # Empty the cursor pool, we dont want them to be shared among forked workers.
578         openerp.sql_db.close_all()
579
580         _logger.debug("Multiprocess starting")
581         while 1:
582             try:
583                 #_logger.debug("Multiprocess beat (%s)",time.time())
584                 self.process_signals()
585                 self.process_zombie()
586                 self.process_timeout()
587                 self.process_spawn()
588                 self.sleep()
589             except KeyboardInterrupt:
590                 _logger.debug("Multiprocess clean stop")
591                 self.stop()
592                 break
593             except Exception, e:
594                 _logger.exception(e)
595                 self.stop(False)
596                 return -1
597
598 class Worker(object):
599     """ Workers """
600     def __init__(self, multi):
601         self.multi = multi
602         self.watchdog_time = time.time()
603         self.watchdog_pipe = multi.pipe_new()
604         # Can be set to None if no watchdog is desired.
605         self.watchdog_timeout = multi.timeout
606         self.ppid = os.getpid()
607         self.pid = None
608         self.alive = True
609         # should we rename into lifetime ?
610         self.request_max = multi.limit_request
611         self.request_count = 0
612
613     def setproctitle(self, title=""):
614         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
615
616     def close(self):
617         os.close(self.watchdog_pipe[0])
618         os.close(self.watchdog_pipe[1])
619
620     def signal_handler(self, sig, frame):
621         self.alive = False
622
623     def sleep(self):
624         try:
625             select.select([self.multi.socket], [], [], self.multi.beat)
626         except select.error, e:
627             if e[0] not in [errno.EINTR]:
628                 raise
629
630     def process_limit(self):
631         if resource is None:
632             return
633         # If our parent changed sucide
634         if self.ppid != os.getppid():
635             _logger.info("Worker (%s) Parent changed", self.pid)
636             self.alive = False
637         # check for lifetime
638         if self.request_count >= self.request_max:
639             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
640             self.alive = False
641         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
642         rss, vms = psutil.Process(os.getpid()).get_memory_info()
643         if vms > config['limit_memory_soft']:
644             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
645             self.alive = False      # Commit suicide after the request.
646
647         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
648         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
649         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
650
651         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
652         r = resource.getrusage(resource.RUSAGE_SELF)
653         cpu_time = r.ru_utime + r.ru_stime
654         def time_expired(n, stack):
655             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
656             # We dont suicide in such case
657             raise Exception('CPU time limit exceeded.')
658         signal.signal(signal.SIGXCPU, time_expired)
659         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
660         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
661
662     def process_work(self):
663         pass
664
665     def start(self):
666         self.pid = os.getpid()
667         self.setproctitle()
668         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
669         # Reseed the random number generator
670         random.seed()
671         # Prevent fd inherientence close_on_exec
672         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
673         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
674         # reset blocking status
675         self.multi.socket.setblocking(0)
676         signal.signal(signal.SIGINT, self.signal_handler)
677         signal.signal(signal.SIGTERM, signal.SIG_DFL)
678         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
679
680     def stop(self):
681         pass
682
683     def run(self):
684         try:
685             self.start()
686             while self.alive:
687                 self.process_limit()
688                 self.multi.pipe_ping(self.watchdog_pipe)
689                 self.sleep()
690                 self.process_work()
691             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
692             self.stop()
693         except Exception:
694             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
695             # should we use 3 to abort everything ?
696             sys.exit(1)
697
698 class WorkerHTTP(Worker):
699     """ HTTP Request workers """
700     def process_request(self, client, addr):
701         client.setblocking(1)
702         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
703         # Prevent fd inherientence close_on_exec
704         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
705         fcntl.fcntl(client, fcntl.F_SETFD, flags)
706         # do request using BaseWSGIServerNoBind monkey patched with socket
707         self.server.socket = client
708         # tolerate broken pipe when the http client closes the socket before
709         # receiving the full reply
710         try:
711             self.server.process_request(client, addr)
712         except IOError, e:
713             if e.errno != errno.EPIPE:
714                 raise
715         self.request_count += 1
716
717     def process_work(self):
718         try:
719             client, addr = self.multi.socket.accept()
720             self.process_request(client, addr)
721         except socket.error, e:
722             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
723                 raise
724
725     def start(self):
726         Worker.start(self)
727         self.server = BaseWSGIServerNoBind(self.multi.app)
728
729 class WorkerCron(Worker):
730     """ Cron workers """
731
732     def __init__(self, multi):
733         super(WorkerCron, self).__init__(multi)
734         # process_work() below process a single database per call.
735         # The variable db_index is keeping track of the next database to
736         # process.
737         self.db_index = 0
738
739     def sleep(self):
740         # Really sleep once all the databases have been processed.
741         if self.db_index == 0:
742             interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect
743             time.sleep(interval)
744
745     def _db_list(self):
746         if config['db_name']:
747             db_names = config['db_name'].split(',')
748         else:
749             db_names = openerp.service.db.exp_list(True)
750         return db_names
751
752     def process_work(self):
753         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
754         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
755         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
756         db_names = self._db_list()
757         if len(db_names):
758             self.db_index = (self.db_index + 1) % len(db_names)
759             db_name = db_names[self.db_index]
760             self.setproctitle(db_name)
761             if rpc_request_flag:
762                 start_time = time.time()
763                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
764
765             import openerp.addons.base as base
766             base.ir.ir_cron.ir_cron._acquire_job(db_name)
767             openerp.modules.registry.RegistryManager.delete(db_name)
768
769             # dont keep cursors in multi database mode
770             if len(db_names) > 1:
771                 openerp.sql_db.close_db(db_name)
772             if rpc_request_flag:
773                 run_time = time.time() - start_time
774                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
775                 vms_diff = (end_vms - start_vms) / 1024
776                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
777                     (db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
778                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
779
780             self.request_count += 1
781             if self.request_count >= self.request_max and self.request_max < len(db_names):
782                 _logger.error("There are more dabatases to process than allowed "
783                               "by the `limit_request` configuration variable: %s more.",
784                               len(db_names) - self.request_max)
785         else:
786             self.db_index = 0
787
788     def start(self):
789         os.nice(10)     # mommy always told me to be nice with others...
790         Worker.start(self)
791         self.multi.socket.close()
792
793 #----------------------------------------------------------
794 # start/stop public api
795 #----------------------------------------------------------
796
797 server = None
798
799 def load_server_wide_modules():
800     for m in openerp.conf.server_wide_modules:
801         try:
802             openerp.modules.module.load_openerp_module(m)
803         except Exception:
804             msg = ''
805             if m == 'web':
806                 msg = """
807 The `web` module is provided by the addons found in the `openerp-web` project.
808 Maybe you forgot to add those addons in your addons_path configuration."""
809             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
810
811 def _reexec(updated_modules=None):
812     """reexecute openerp-server process with (nearly) the same arguments"""
813     if openerp.tools.osutil.is_running_as_nt_service():
814         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
815     exe = os.path.basename(sys.executable)
816     args = stripped_sys_argv()
817     if updated_modules:
818         args += ["-u", ','.join(updated_modules)]
819     if not args or args[0] != exe:
820         args.insert(0, exe)
821     os.execv(sys.executable, args)
822
823 def load_test_file_yml(registry, test_file):
824     with registry.cursor() as cr:
825         openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'init')
826         if config['test_commit']:
827             _logger.info('test %s has been commited', test_file)
828             cr.commit()
829         else:
830             _logger.info('test %s has been rollbacked', test_file)
831             cr.rollback()
832
833 def load_test_file_py(registry, test_file):
834     # Locate python module based on its filename and run the tests
835     test_path, _ = os.path.splitext(os.path.abspath(test_file))
836     for mod_name, mod_mod in sys.modules.items():
837         if mod_mod:
838             mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
839             if test_path == mod_path:
840                 suite = unittest2.TestSuite()
841                 for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
842                     suite.addTest(t)
843                 _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
844                 stream = openerp.modules.module.TestStream()
845                 result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
846                 success = result.wasSuccessful()
847                 if hasattr(registry._assertion_report,'report_result'):
848                     registry._assertion_report.report_result(success)
849                 if not success:
850                     _logger.error('%s: at least one error occurred in a test', test_file)
851
852 def preload_registries(dbnames):
853     """ Preload a registries, possibly run a test file."""
854     # TODO: move all config checks to args dont check tools.config here
855     config = openerp.tools.config
856     test_file = config['test_file']
857     dbnames = dbnames or []
858     rc = 0
859     for dbname in dbnames:
860         try:
861             update_module = config['init'] or config['update']
862             registry = RegistryManager.new(dbname, update_module=update_module)
863             # run test_file if provided
864             if test_file:
865                 _logger.info('loading test file %s', test_file)
866                 if test_file.endswith('yml'):
867                     load_test_file_yml(registry, test_file)
868                 elif test_file.endswith('py'):
869                     load_test_file_py(registry, test_file)
870
871             if registry._assertion_report.failures:
872                 rc += 1
873         except Exception:
874             _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
875             return -1
876     return rc
877
878 def start(preload=None, stop=False):
879     """ Start the openerp http server and cron processor.
880     """
881     global server
882     load_server_wide_modules()
883     if openerp.evented:
884         server = GeventServer(openerp.service.wsgi_server.application)
885     elif config['workers']:
886         server = PreforkServer(openerp.service.wsgi_server.application)
887     else:
888         server = ThreadedServer(openerp.service.wsgi_server.application)
889
890     watcher = None
891     if config['dev_mode']:
892         if watchdog:
893             watcher = FSWatcher()
894             watcher.start()
895         else:
896             _logger.warning("'watchdog' module not installed. Code autoreload feature is disabled")
897
898     rc = server.run(preload, stop)
899
900     # like the legend of the phoenix, all ends with beginnings
901     if getattr(openerp, 'phoenix', False):
902         if watcher:
903             watcher.stop()
904         _reexec()
905
906     return rc if rc else 0
907
908 def restart():
909     """ Restart the server
910     """
911     if os.name == 'nt':
912         # run in a thread to let the current thread return response to the caller.
913         threading.Thread(target=_reexec).start()
914     else:
915         os.kill(server.pid, signal.SIGHUP)
916
917 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: