[MERGE] forward port of branch 8.0 up to 2e092ac
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import logging
7 import os
8 import os.path
9 import platform
10 import psutil
11 import random
12 import select
13 import signal
14 import socket
15 import subprocess
16 import sys
17 import threading
18 import time
19 import unittest2
20
21 import werkzeug.serving
22
23 if os.name == 'posix':
24     # Unix only for workers
25     import fcntl
26     import resource
27 else:
28     # Windows shim
29     signal.SIGHUP = -1
30
31 # Optional process names for workers
32 try:
33     from setproctitle import setproctitle
34 except ImportError:
35     setproctitle = lambda x: None
36
37 import openerp
38 from openerp.modules.registry import RegistryManager
39 from openerp.release import nt_service_name
40 import openerp.tools.config as config
41 from openerp.tools.misc import stripped_sys_argv, dumpstacks
42
43 _logger = logging.getLogger(__name__)
44
45 try:
46     import watchdog
47     from watchdog.observers import Observer
48     from watchdog.events import FileCreatedEvent, FileModifiedEvent
49 except ImportError:
50     watchdog = None
51
52 SLEEP_INTERVAL = 60     # 1 min
53
54 #----------------------------------------------------------
55 # Werkzeug WSGI servers patched
56 #----------------------------------------------------------
57 class LoggingBaseWSGIServerMixIn(object):
58     def handle_error(self, request, client_address):
59         t, e, _ = sys.exc_info()
60         if t == socket.error and e.errno == errno.EPIPE:
61             # broken pipe, ignore error
62             return
63         _logger.exception('Exception happened during processing of request from %s', client_address)
64
65 class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
66     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
67     use this class, sets the socket and calls the process_request() manually
68     """
69     def __init__(self, app):
70         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
71     def server_bind(self):
72         # we dont bind beause we use the listen socket of PreforkServer#socket
73         # instead we close the socket
74         if self.socket:
75             self.socket.close()
76     def server_activate(self):
77         # dont listen as we use PreforkServer#socket
78         pass
79
80
81 class RequestHandler(werkzeug.serving.WSGIRequestHandler):
82     def setup(self):
83         # flag the current thread as handling a http request
84         super(RequestHandler, self).setup()
85         me = threading.currentThread()
86         me.name = 'openerp.service.http.request.%s' % (me.ident,)
87
88 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
89 # should also work with systemd socket activation. This is currently untested
90 # and not yet used.
91
92 class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
93     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
94     given by the environement, this is used by autoreload to keep the listen
95     socket open when a reload happens.
96     """
97     def __init__(self, host, port, app):
98         super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
99                                                            handler=RequestHandler)
100
101     def server_bind(self):
102         envfd = os.environ.get('LISTEN_FDS')
103         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
104             self.reload_socket = True
105             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
106             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
107         else:
108             self.reload_socket = False
109             super(ThreadedWSGIServerReloadable, self).server_bind()
110
111     def server_activate(self):
112         if not self.reload_socket:
113             super(ThreadedWSGIServerReloadable, self).server_activate()
114
115 #----------------------------------------------------------
116 # FileSystem Watcher for autoreload and cache invalidation
117 #----------------------------------------------------------
118 class FSWatcher(object):
119     def __init__(self):
120         self.observer = Observer()
121         for path in openerp.modules.module.ad_paths:
122             _logger.info('Watching addons folder %s', path)
123             self.observer.schedule(self, path, recursive=True)
124
125     def dispatch(self, event):
126         if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
127             if not event.is_directory:
128                 path = event.src_path
129                 if path.endswith('.py'):
130                     try:
131                         source = open(path, 'rb').read() + '\n'
132                         compile(source, path, 'exec')
133                     except SyntaxError:
134                         _logger.error('autoreload: python code change detected, SyntaxError in %s', path)
135                     else:
136                         _logger.info('autoreload: python code updated, autoreload activated')
137                         restart()
138
139     def start(self):
140         self.observer.start()
141         _logger.info('AutoReload watcher running')
142
143     def stop(self):
144         self.observer.stop()
145         self.observer.join()
146
147 #----------------------------------------------------------
148 # Servers: Threaded, Gevented and Prefork
149 #----------------------------------------------------------
150
151 class CommonServer(object):
152     def __init__(self, app):
153         # TODO Change the xmlrpc_* options to http_*
154         self.app = app
155         # config
156         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
157         self.port = config['xmlrpc_port']
158         # runtime
159         self.pid = os.getpid()
160
161     def close_socket(self, sock):
162         """ Closes a socket instance cleanly
163         :param sock: the network socket to close
164         :type sock: socket.socket
165         """
166         try:
167             sock.shutdown(socket.SHUT_RDWR)
168         except socket.error, e:
169             # On OSX, socket shutdowns both sides if any side closes it
170             # causing an error 57 'Socket is not connected' on shutdown
171             # of the other side (or something), see
172             # http://bugs.python.org/issue4397
173             # note: stdlib fixed test, not behavior
174             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
175                 raise
176         sock.close()
177
178 class ThreadedServer(CommonServer):
179     def __init__(self, app):
180         super(ThreadedServer, self).__init__(app)
181         self.main_thread_id = threading.currentThread().ident
182         # Variable keeping track of the number of calls to the signal handler defined
183         # below. This variable is monitored by ``quit_on_signals()``.
184         self.quit_signals_received = 0
185
186         #self.socket = None
187         self.httpd = None
188
189     def signal_handler(self, sig, frame):
190         if sig in [signal.SIGINT, signal.SIGTERM]:
191             # shutdown on kill -INT or -TERM
192             self.quit_signals_received += 1
193             if self.quit_signals_received > 1:
194                 # logging.shutdown was already called at this point.
195                 sys.stderr.write("Forced shutdown.\n")
196                 os._exit(0)
197         elif sig == signal.SIGHUP:
198             # restart on kill -HUP
199             openerp.phoenix = True
200             self.quit_signals_received += 1
201
202     def cron_thread(self, number):
203         while True:
204             time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
205             registries = openerp.modules.registry.RegistryManager.registries
206             _logger.debug('cron%d polling for jobs', number)
207             for db_name, registry in registries.iteritems():
208                 while registry.ready:
209                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
210                     if not acquired:
211                         break
212
213     def cron_spawn(self):
214         """ Start the above runner function in a daemon thread.
215
216         The thread is a typical daemon thread: it will never quit and must be
217         terminated when the main process exits - with no consequence (the processing
218         threads it spawns are not marked daemon).
219
220         """
221         # Force call to strptime just before starting the cron thread
222         # to prevent time.strptime AttributeError within the thread.
223         # See: http://bugs.python.org/issue7980
224         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
225         for i in range(openerp.tools.config['max_cron_threads']):
226             def target():
227                 self.cron_thread(i)
228             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
229             t.setDaemon(True)
230             t.start()
231             _logger.debug("cron%d started!" % i)
232
233     def http_thread(self):
234         def app(e, s):
235             return self.app(e, s)
236         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
237         self.httpd.serve_forever()
238
239     def http_spawn(self):
240         t = threading.Thread(target=self.http_thread, name="openerp.service.httpd")
241         t.setDaemon(True)
242         t.start()
243         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
244
245     def start(self, stop=False):
246         _logger.debug("Setting signal handlers")
247         if os.name == 'posix':
248             signal.signal(signal.SIGINT, self.signal_handler)
249             signal.signal(signal.SIGTERM, self.signal_handler)
250             signal.signal(signal.SIGCHLD, self.signal_handler)
251             signal.signal(signal.SIGHUP, self.signal_handler)
252             signal.signal(signal.SIGQUIT, dumpstacks)
253         elif os.name == 'nt':
254             import win32api
255             win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
256
257         test_mode = config['test_enable'] or config['test_file']
258         if not stop or test_mode:
259             # some tests need the http deamon to be available...
260             self.http_spawn()
261
262         if not stop:
263             # only relevant if we are not in "--stop-after-init" mode
264             self.cron_spawn()
265
266     def stop(self):
267         """ Shutdown the WSGI server. Wait for non deamon threads.
268         """
269         _logger.info("Initiating shutdown")
270         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
271
272         if self.httpd:
273             self.httpd.shutdown()
274             self.close_socket(self.httpd.socket)
275
276         # Manually join() all threads before calling sys.exit() to allow a second signal
277         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
278         # threading.Thread.join() should not mask signals (at least in python 2.5).
279         me = threading.currentThread()
280         _logger.debug('current thread: %r', me)
281         for thread in threading.enumerate():
282             _logger.debug('process %r (%r)', thread, thread.isDaemon())
283             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
284                 while thread.isAlive():
285                     _logger.debug('join and sleep')
286                     # Need a busyloop here as thread.join() masks signals
287                     # and would prevent the forced shutdown.
288                     thread.join(0.05)
289                     time.sleep(0.05)
290
291         _logger.debug('--')
292         openerp.modules.registry.RegistryManager.delete_all()
293         logging.shutdown()
294
295     def run(self, preload=None, stop=False):
296         """ Start the http server and the cron thread then wait for a signal.
297
298         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
299         a second one if any will force an immediate exit.
300         """
301         self.start(stop=stop)
302
303         rc = preload_registries(preload)
304
305         if stop:
306             self.stop()
307             return rc
308
309         # Wait for a first signal to be handled. (time.sleep will be interrupted
310         # by the signal handler.) The try/except is for the win32 case.
311         try:
312             while self.quit_signals_received == 0:
313                 time.sleep(60)
314         except KeyboardInterrupt:
315             pass
316
317         self.stop()
318
319     def reload(self):
320         os.kill(self.pid, signal.SIGHUP)
321
322 class GeventServer(CommonServer):
323     def __init__(self, app):
324         super(GeventServer, self).__init__(app)
325         self.port = config['longpolling_port']
326         self.httpd = None
327
328     def watch_parent(self, beat=4):
329         import gevent
330         ppid = os.getppid()
331         while True:
332             if ppid != os.getppid():
333                 pid = os.getpid()
334                 _logger.info("LongPolling (%s) Parent changed", pid)
335                 # suicide !!
336                 os.kill(pid, signal.SIGTERM)
337                 return
338             gevent.sleep(beat)
339
340     def start(self):
341         import gevent
342         from gevent.wsgi import WSGIServer
343
344         if os.name == 'posix':
345             signal.signal(signal.SIGQUIT, dumpstacks)
346
347         gevent.spawn(self.watch_parent)
348         self.httpd = WSGIServer((self.interface, self.port), self.app)
349         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
350         try:
351             self.httpd.serve_forever()
352         except:
353             _logger.exception("Evented Service (longpolling): uncaught error during main loop")
354             raise
355
356     def stop(self):
357         import gevent
358         self.httpd.stop()
359         gevent.shutdown()
360
361     def run(self, preload, stop):
362         self.start()
363         self.stop()
364
365 class PreforkServer(CommonServer):
366     """ Multiprocessing inspired by (g)unicorn.
367     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
368     method between workers but we plan to replace it by a more intelligent
369     dispatcher to will parse the first HTTP request line.
370     """
371     def __init__(self, app):
372         # config
373         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
374         self.population = config['workers']
375         self.timeout = config['limit_time_real']
376         self.limit_request = config['limit_request']
377         # working vars
378         self.beat = 4
379         self.app = app
380         self.pid = os.getpid()
381         self.socket = None
382         self.workers_http = {}
383         self.workers_cron = {}
384         self.workers = {}
385         self.generation = 0
386         self.queue = []
387         self.long_polling_pid = None
388
389     def pipe_new(self):
390         pipe = os.pipe()
391         for fd in pipe:
392             # non_blocking
393             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
394             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
395             # close_on_exec
396             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
397             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
398         return pipe
399
400     def pipe_ping(self, pipe):
401         try:
402             os.write(pipe[1], '.')
403         except IOError, e:
404             if e.errno not in [errno.EAGAIN, errno.EINTR]:
405                 raise
406
407     def signal_handler(self, sig, frame):
408         if len(self.queue) < 5 or sig == signal.SIGCHLD:
409             self.queue.append(sig)
410             self.pipe_ping(self.pipe)
411         else:
412             _logger.warn("Dropping signal: %s", sig)
413
414     def worker_spawn(self, klass, workers_registry):
415         self.generation += 1
416         worker = klass(self)
417         pid = os.fork()
418         if pid != 0:
419             worker.pid = pid
420             self.workers[pid] = worker
421             workers_registry[pid] = worker
422             return worker
423         else:
424             worker.run()
425             sys.exit(0)
426
427     def long_polling_spawn(self):
428         nargs = stripped_sys_argv()
429         cmd = nargs[0]
430         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
431         nargs[0] = cmd
432         popen = subprocess.Popen([sys.executable] + nargs)
433         self.long_polling_pid = popen.pid
434
435     def worker_pop(self, pid):
436         if pid == self.long_polling_pid:
437             self.long_polling_pid = None
438         if pid in self.workers:
439             _logger.debug("Worker (%s) unregistered", pid)
440             try:
441                 self.workers_http.pop(pid, None)
442                 self.workers_cron.pop(pid, None)
443                 u = self.workers.pop(pid)
444                 u.close()
445             except OSError:
446                 return
447
448     def worker_kill(self, pid, sig):
449         try:
450             os.kill(pid, sig)
451         except OSError, e:
452             if e.errno == errno.ESRCH:
453                 self.worker_pop(pid)
454
455     def process_signals(self):
456         while len(self.queue):
457             sig = self.queue.pop(0)
458             if sig in [signal.SIGINT, signal.SIGTERM]:
459                 raise KeyboardInterrupt
460             elif sig == signal.SIGHUP:
461                 # restart on kill -HUP
462                 openerp.phoenix = True
463                 raise KeyboardInterrupt
464             elif sig == signal.SIGQUIT:
465                 # dump stacks on kill -3
466                 self.dumpstacks()
467             elif sig == signal.SIGTTIN:
468                 # increase number of workers
469                 self.population += 1
470             elif sig == signal.SIGTTOU:
471                 # decrease number of workers
472                 self.population -= 1
473
474     def process_zombie(self):
475         # reap dead workers
476         while 1:
477             try:
478                 wpid, status = os.waitpid(-1, os.WNOHANG)
479                 if not wpid:
480                     break
481                 if (status >> 8) == 3:
482                     msg = "Critial worker error (%s)"
483                     _logger.critical(msg, wpid)
484                     raise Exception(msg % wpid)
485                 self.worker_pop(wpid)
486             except OSError, e:
487                 if e.errno == errno.ECHILD:
488                     break
489                 raise
490
491     def process_timeout(self):
492         now = time.time()
493         for (pid, worker) in self.workers.items():
494             if worker.watchdog_timeout is not None and \
495                     (now - worker.watchdog_time) >= worker.watchdog_timeout:
496                 _logger.error("Worker (%s) timeout", pid)
497                 self.worker_kill(pid, signal.SIGKILL)
498
499     def process_spawn(self):
500         while len(self.workers_http) < self.population:
501             self.worker_spawn(WorkerHTTP, self.workers_http)
502         while len(self.workers_cron) < config['max_cron_threads']:
503             self.worker_spawn(WorkerCron, self.workers_cron)
504         if not self.long_polling_pid:
505             self.long_polling_spawn()
506
507     def sleep(self):
508         try:
509             # map of fd -> worker
510             fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
511             fd_in = fds.keys() + [self.pipe[0]]
512             # check for ping or internal wakeups
513             ready = select.select(fd_in, [], [], self.beat)
514             # update worker watchdogs
515             for fd in ready[0]:
516                 if fd in fds:
517                     fds[fd].watchdog_time = time.time()
518                 try:
519                     # empty pipe
520                     while os.read(fd, 1):
521                         pass
522                 except OSError, e:
523                     if e.errno not in [errno.EAGAIN]:
524                         raise
525         except select.error, e:
526             if e[0] not in [errno.EINTR]:
527                 raise
528
529     def start(self):
530         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
531         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
532         # signal handler to overcome this behaviour
533         self.pipe = self.pipe_new()
534         # set signal handlers
535         signal.signal(signal.SIGINT, self.signal_handler)
536         signal.signal(signal.SIGTERM, self.signal_handler)
537         signal.signal(signal.SIGHUP, self.signal_handler)
538         signal.signal(signal.SIGCHLD, self.signal_handler)
539         signal.signal(signal.SIGTTIN, self.signal_handler)
540         signal.signal(signal.SIGTTOU, self.signal_handler)
541         signal.signal(signal.SIGQUIT, dumpstacks)
542
543         # listen to socket
544         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
545         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
546         self.socket.setblocking(0)
547         self.socket.bind(self.address)
548         self.socket.listen(8 * self.population)
549
550     def stop(self, graceful=True):
551         if self.long_polling_pid is not None:
552             # FIXME make longpolling process handle SIGTERM correctly
553             self.worker_kill(self.long_polling_pid, signal.SIGKILL)
554             self.long_polling_pid = None
555         if graceful:
556             _logger.info("Stopping gracefully")
557             limit = time.time() + self.timeout
558             for pid in self.workers.keys():
559                 self.worker_kill(pid, signal.SIGTERM)
560             while self.workers and time.time() < limit:
561                 self.process_zombie()
562                 time.sleep(0.1)
563         else:
564             _logger.info("Stopping forcefully")
565         for pid in self.workers.keys():
566             self.worker_kill(pid, signal.SIGTERM)
567         self.socket.close()
568
569     def run(self, preload, stop):
570         self.start()
571
572         rc = preload_registries(preload)
573
574         if stop:
575             self.stop()
576             return rc
577
578         # Empty the cursor pool, we dont want them to be shared among forked workers.
579         openerp.sql_db.close_all()
580
581         _logger.debug("Multiprocess starting")
582         while 1:
583             try:
584                 #_logger.debug("Multiprocess beat (%s)",time.time())
585                 self.process_signals()
586                 self.process_zombie()
587                 self.process_timeout()
588                 self.process_spawn()
589                 self.sleep()
590             except KeyboardInterrupt:
591                 _logger.debug("Multiprocess clean stop")
592                 self.stop()
593                 break
594             except Exception, e:
595                 _logger.exception(e)
596                 self.stop(False)
597                 return -1
598
599 class Worker(object):
600     """ Workers """
601     def __init__(self, multi):
602         self.multi = multi
603         self.watchdog_time = time.time()
604         self.watchdog_pipe = multi.pipe_new()
605         # Can be set to None if no watchdog is desired.
606         self.watchdog_timeout = multi.timeout
607         self.ppid = os.getpid()
608         self.pid = None
609         self.alive = True
610         # should we rename into lifetime ?
611         self.request_max = multi.limit_request
612         self.request_count = 0
613
614     def setproctitle(self, title=""):
615         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
616
617     def close(self):
618         os.close(self.watchdog_pipe[0])
619         os.close(self.watchdog_pipe[1])
620
621     def signal_handler(self, sig, frame):
622         self.alive = False
623
624     def sleep(self):
625         try:
626             select.select([self.multi.socket], [], [], self.multi.beat)
627         except select.error, e:
628             if e[0] not in [errno.EINTR]:
629                 raise
630
631     def process_limit(self):
632         # If our parent changed sucide
633         if self.ppid != os.getppid():
634             _logger.info("Worker (%s) Parent changed", self.pid)
635             self.alive = False
636         # check for lifetime
637         if self.request_count >= self.request_max:
638             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
639             self.alive = False
640         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
641         rss, vms = psutil.Process(os.getpid()).get_memory_info()
642         if vms > config['limit_memory_soft']:
643             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
644             self.alive = False      # Commit suicide after the request.
645
646         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
647         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
648         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
649
650         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
651         r = resource.getrusage(resource.RUSAGE_SELF)
652         cpu_time = r.ru_utime + r.ru_stime
653         def time_expired(n, stack):
654             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
655             # We dont suicide in such case
656             raise Exception('CPU time limit exceeded.')
657         signal.signal(signal.SIGXCPU, time_expired)
658         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
659         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
660
661     def process_work(self):
662         pass
663
664     def start(self):
665         self.pid = os.getpid()
666         self.setproctitle()
667         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
668         # Reseed the random number generator
669         random.seed()
670         # Prevent fd inherientence close_on_exec
671         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
672         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
673         # reset blocking status
674         self.multi.socket.setblocking(0)
675         signal.signal(signal.SIGINT, self.signal_handler)
676         signal.signal(signal.SIGTERM, signal.SIG_DFL)
677         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
678
679     def stop(self):
680         pass
681
682     def run(self):
683         try:
684             self.start()
685             while self.alive:
686                 self.process_limit()
687                 self.multi.pipe_ping(self.watchdog_pipe)
688                 self.sleep()
689                 self.process_work()
690             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
691             self.stop()
692         except Exception:
693             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
694             # should we use 3 to abort everything ?
695             sys.exit(1)
696
697 class WorkerHTTP(Worker):
698     """ HTTP Request workers """
699     def process_request(self, client, addr):
700         client.setblocking(1)
701         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
702         # Prevent fd inherientence close_on_exec
703         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
704         fcntl.fcntl(client, fcntl.F_SETFD, flags)
705         # do request using BaseWSGIServerNoBind monkey patched with socket
706         self.server.socket = client
707         # tolerate broken pipe when the http client closes the socket before
708         # receiving the full reply
709         try:
710             self.server.process_request(client, addr)
711         except IOError, e:
712             if e.errno != errno.EPIPE:
713                 raise
714         self.request_count += 1
715
716     def process_work(self):
717         try:
718             client, addr = self.multi.socket.accept()
719             self.process_request(client, addr)
720         except socket.error, e:
721             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
722                 raise
723
724     def start(self):
725         Worker.start(self)
726         self.server = BaseWSGIServerNoBind(self.multi.app)
727
728 class WorkerCron(Worker):
729     """ Cron workers """
730
731     def __init__(self, multi):
732         super(WorkerCron, self).__init__(multi)
733         # process_work() below process a single database per call.
734         # The variable db_index is keeping track of the next database to
735         # process.
736         self.db_index = 0
737
738     def sleep(self):
739         # Really sleep once all the databases have been processed.
740         if self.db_index == 0:
741             interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect
742             time.sleep(interval)
743
744     def _db_list(self):
745         if config['db_name']:
746             db_names = config['db_name'].split(',')
747         else:
748             db_names = openerp.service.db.exp_list(True)
749         return db_names
750
751     def process_work(self):
752         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
753         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
754         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
755         db_names = self._db_list()
756         if len(db_names):
757             self.db_index = (self.db_index + 1) % len(db_names)
758             db_name = db_names[self.db_index]
759             self.setproctitle(db_name)
760             if rpc_request_flag:
761                 start_time = time.time()
762                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
763
764             import openerp.addons.base as base
765             base.ir.ir_cron.ir_cron._acquire_job(db_name)
766             openerp.modules.registry.RegistryManager.delete(db_name)
767
768             # dont keep cursors in multi database mode
769             if len(db_names) > 1:
770                 openerp.sql_db.close_db(db_name)
771             if rpc_request_flag:
772                 run_time = time.time() - start_time
773                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
774                 vms_diff = (end_vms - start_vms) / 1024
775                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
776                     (db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
777                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
778
779             self.request_count += 1
780             if self.request_count >= self.request_max and self.request_max < len(db_names):
781                 _logger.error("There are more dabatases to process than allowed "
782                               "by the `limit_request` configuration variable: %s more.",
783                               len(db_names) - self.request_max)
784         else:
785             self.db_index = 0
786
787     def start(self):
788         os.nice(10)     # mommy always told me to be nice with others...
789         Worker.start(self)
790         self.multi.socket.close()
791
792 #----------------------------------------------------------
793 # start/stop public api
794 #----------------------------------------------------------
795
796 server = None
797
798 def load_server_wide_modules():
799     for m in openerp.conf.server_wide_modules:
800         try:
801             openerp.modules.module.load_openerp_module(m)
802         except Exception:
803             msg = ''
804             if m == 'web':
805                 msg = """
806 The `web` module is provided by the addons found in the `openerp-web` project.
807 Maybe you forgot to add those addons in your addons_path configuration."""
808             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
809
810 def _reexec(updated_modules=None):
811     """reexecute openerp-server process with (nearly) the same arguments"""
812     if openerp.tools.osutil.is_running_as_nt_service():
813         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
814     exe = os.path.basename(sys.executable)
815     args = stripped_sys_argv()
816     if updated_modules:
817         args += ["-u", ','.join(updated_modules)]
818     if not args or args[0] != exe:
819         args.insert(0, exe)
820     os.execv(sys.executable, args)
821
822 def load_test_file_yml(registry, test_file):
823     with registry.cursor() as cr:
824         openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'init')
825         if config['test_commit']:
826             _logger.info('test %s has been commited', test_file)
827             cr.commit()
828         else:
829             _logger.info('test %s has been rollbacked', test_file)
830             cr.rollback()
831
832 def load_test_file_py(registry, test_file):
833     # Locate python module based on its filename and run the tests
834     test_path, _ = os.path.splitext(os.path.abspath(test_file))
835     for mod_name, mod_mod in sys.modules.items():
836         if mod_mod:
837             mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
838             if test_path == mod_path:
839                 suite = unittest2.TestSuite()
840                 for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
841                     suite.addTest(t)
842                 _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
843                 stream = openerp.modules.module.TestStream()
844                 result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
845                 success = result.wasSuccessful()
846                 if hasattr(registry._assertion_report,'report_result'):
847                     registry._assertion_report.report_result(success)
848                 if not success:
849                     _logger.error('%s: at least one error occurred in a test', test_file)
850
851 def preload_registries(dbnames):
852     """ Preload a registries, possibly run a test file."""
853     # TODO: move all config checks to args dont check tools.config here
854     config = openerp.tools.config
855     test_file = config['test_file']
856     dbnames = dbnames or []
857     rc = 0
858     for dbname in dbnames:
859         try:
860             update_module = config['init'] or config['update']
861             registry = RegistryManager.new(dbname, update_module=update_module)
862             # run test_file if provided
863             if test_file:
864                 _logger.info('loading test file %s', test_file)
865                 with openerp.api.Environment.manage():
866                     if test_file.endswith('yml'):
867                         load_test_file_yml(registry, test_file)
868                     elif test_file.endswith('py'):
869                         load_test_file_py(registry, test_file)
870
871             if registry._assertion_report.failures:
872                 rc += 1
873         except Exception:
874             _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
875             return -1
876     return rc
877
878 def start(preload=None, stop=False):
879     """ Start the openerp http server and cron processor.
880     """
881     global server
882     load_server_wide_modules()
883     if openerp.evented:
884         server = GeventServer(openerp.service.wsgi_server.application)
885     elif config['workers']:
886         server = PreforkServer(openerp.service.wsgi_server.application)
887     else:
888         server = ThreadedServer(openerp.service.wsgi_server.application)
889
890     watcher = None
891     if config['dev_mode']:
892         if watchdog:
893             watcher = FSWatcher()
894             watcher.start()
895         else:
896             _logger.warning("'watchdog' module not installed. Code autoreload feature is disabled")
897
898     rc = server.run(preload, stop)
899
900     # like the legend of the phoenix, all ends with beginnings
901     if getattr(openerp, 'phoenix', False):
902         if watcher:
903             watcher.stop()
904         _reexec()
905
906     return rc if rc else 0
907
908 def restart():
909     """ Restart the server
910     """
911     if os.name == 'nt':
912         # run in a thread to let the current thread return response to the caller.
913         threading.Thread(target=_reexec).start()
914     else:
915         os.kill(server.pid, signal.SIGHUP)
916
917 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: