b63eec4c39c7b34b5394f55fc1dedea0a7aa44b6
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import logging
7 import os
8 import os.path
9 import platform
10 import psutil
11 import random
12 if os.name == 'posix':
13     import resource
14 else:
15     resource = None
16 import select
17 import signal
18 import socket
19 import subprocess
20 import sys
21 import threading
22 import time
23 import unittest2
24
25 import werkzeug.serving
26
27 try:
28     import fcntl
29 except ImportError:
30     pass
31 try:
32     from setproctitle import setproctitle
33 except ImportError:
34     setproctitle = lambda x: None
35
36 import openerp
37 from openerp.modules.registry import RegistryManager
38 from openerp.release import nt_service_name
39 import openerp.tools.config as config
40 from openerp.tools.misc import stripped_sys_argv, dumpstacks
41
42 _logger = logging.getLogger(__name__)
43
44 SLEEP_INTERVAL = 60     # 1 min
45
46 #----------------------------------------------------------
47 # Werkzeug WSGI servers patched
48 #----------------------------------------------------------
49 class LoggingBaseWSGIServerMixIn(object):
50     def handle_error(self, request, client_address):
51         t, e, _ = sys.exc_info()
52         if t == socket.error and e.errno == errno.EPIPE:
53             # broken pipe, ignore error
54             return
55         _logger.exception('Exception happened during processing of request from %s', client_address)
56
57 class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
58     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
59     use this class, sets the socket and calls the process_request() manually
60     """
61     def __init__(self, app):
62         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
63     def server_bind(self):
64         # we dont bind beause we use the listen socket of PreforkServer#socket
65         # instead we close the socket
66         if self.socket:
67             self.socket.close()
68     def server_activate(self):
69         # dont listen as we use PreforkServer#socket
70         pass
71
72
73 class RequestHandler(werkzeug.serving.WSGIRequestHandler):
74     def setup(self):
75         # flag the current thread as handling a http request
76         super(RequestHandler, self).setup()
77         me = threading.currentThread()
78         me.name = 'openerp.service.http.request.%s' % (me.ident,)
79
80 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
81 # should also work with systemd socket activation. This is currently untested
82 # and not yet used.
83
84 class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
85     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
86     given by the environement, this is used by autoreload to keep the listen
87     socket open when a reload happens.
88     """
89     def __init__(self, host, port, app):
90         super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
91                                                            handler=RequestHandler)
92
93     def server_bind(self):
94         envfd = os.environ.get('LISTEN_FDS')
95         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
96             self.reload_socket = True
97             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
98             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
99         else:
100             self.reload_socket = False
101             super(ThreadedWSGIServerReloadable, self).server_bind()
102
103     def server_activate(self):
104         if not self.reload_socket:
105             super(ThreadedWSGIServerReloadable, self).server_activate()
106
107 #----------------------------------------------------------
108 # FileSystem Watcher for autoreload and orm cache void
109 #----------------------------------------------------------
110 class FSWatcher(object):
111     def __init__(self):
112         # TODO: check if debian package is available
113         from watchdog.observers import Observer
114         self.observer = Observer()
115         for path in openerp.modules.module.ad_paths:
116             _logger.info('Watching addons folder %s', path)
117             self.observer.schedule(self, path, recursive=True)
118
119     def dispatch(self, event):
120         # TODO: check if debian package is available
121         from watchdog.events import FileCreatedEvent, FileModifiedEvent
122         if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
123             if not event.is_directory:
124                 path = event.src_path
125                 if path.endswith('.xml'):
126                     _logger.info('autoreload: xml change detected, voiding orm cache')
127                     # TODO: void orm caches
128                 elif config['auto_reload'] and path.endswith('.py'):
129                     try:
130                         source = open(path, 'rb').read() + '\n'
131                         compile(source, path, 'exec')
132                     except SyntaxError:
133                         _logger.info('autoreload: python code change detected, SyntaxError in %s', path)
134                     else:
135                         _logger.info('autoreload: python code updated, autoreload activated')
136                         restart()
137
138     def run(self):
139         self.observer.start()
140         _logger.info('AutoReload watcher running')
141
142     def stop(self):
143         # TODO: properly stop observer
144         self.observer.stop()
145         self.observer.join()
146
147 #----------------------------------------------------------
148 # Servers: Threaded, Gevented and Prefork
149 #----------------------------------------------------------
150
151 class CommonServer(object):
152     def __init__(self, app):
153         # TODO Change the xmlrpc_* options to http_*
154         self.app = app
155         # config
156         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
157         self.port = config['xmlrpc_port']
158         # runtime
159         self.pid = os.getpid()
160
161     def close_socket(self, sock):
162         """ Closes a socket instance cleanly
163         :param sock: the network socket to close
164         :type sock: socket.socket
165         """
166         try:
167             sock.shutdown(socket.SHUT_RDWR)
168         except socket.error, e:
169             # On OSX, socket shutdowns both sides if any side closes it
170             # causing an error 57 'Socket is not connected' on shutdown
171             # of the other side (or something), see
172             # http://bugs.python.org/issue4397
173             # note: stdlib fixed test, not behavior
174             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
175                 raise
176         sock.close()
177
178 class ThreadedServer(CommonServer):
179     def __init__(self, app):
180         super(ThreadedServer, self).__init__(app)
181         self.main_thread_id = threading.currentThread().ident
182         # Variable keeping track of the number of calls to the signal handler defined
183         # below. This variable is monitored by ``quit_on_signals()``.
184         self.quit_signals_received = 0
185
186         #self.socket = None
187         self.httpd = None
188
189     def signal_handler(self, sig, frame):
190         if sig in [signal.SIGINT, signal.SIGTERM]:
191             # shutdown on kill -INT or -TERM
192             self.quit_signals_received += 1
193             if self.quit_signals_received > 1:
194                 # logging.shutdown was already called at this point.
195                 sys.stderr.write("Forced shutdown.\n")
196                 os._exit(0)
197         elif sig == signal.SIGHUP:
198             # restart on kill -HUP
199             openerp.phoenix = True
200             self.quit_signals_received += 1
201
202     def cron_thread(self, number):
203         while True:
204             time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
205             registries = openerp.modules.registry.RegistryManager.registries
206             _logger.debug('cron%d polling for jobs', number)
207             for db_name, registry in registries.items():
208                 while True and registry.ready:
209                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
210                     if not acquired:
211                         break
212
213     def cron_spawn(self):
214         """ Start the above runner function in a daemon thread.
215
216         The thread is a typical daemon thread: it will never quit and must be
217         terminated when the main process exits - with no consequence (the processing
218         threads it spawns are not marked daemon).
219
220         """
221         # Force call to strptime just before starting the cron thread
222         # to prevent time.strptime AttributeError within the thread.
223         # See: http://bugs.python.org/issue7980
224         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
225         for i in range(openerp.tools.config['max_cron_threads']):
226             def target():
227                 self.cron_thread(i)
228             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
229             t.setDaemon(True)
230             t.start()
231             _logger.debug("cron%d started!" % i)
232
233     def http_thread(self):
234         def app(e, s):
235             return self.app(e, s)
236         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
237         self.httpd.serve_forever()
238
239     def http_spawn(self):
240         t = threading.Thread(target=self.http_thread, name="openerp.service.httpd")
241         t.setDaemon(True)
242         t.start()
243         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
244
245     def start(self, stop=False):
246         _logger.debug("Setting signal handlers")
247         if os.name == 'posix':
248             signal.signal(signal.SIGINT, self.signal_handler)
249             signal.signal(signal.SIGTERM, self.signal_handler)
250             signal.signal(signal.SIGCHLD, self.signal_handler)
251             signal.signal(signal.SIGHUP, self.signal_handler)
252             signal.signal(signal.SIGQUIT, dumpstacks)
253         elif os.name == 'nt':
254             import win32api
255             win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
256
257         test_mode = config['test_enable'] or config['test_file']
258         if not stop or test_mode:
259             # some tests need the http deamon to be available...
260             self.http_spawn()
261
262         if not stop:
263             # only relevant if we are not in "--stop-after-init" mode
264             self.cron_spawn()
265
266     def stop(self):
267         """ Shutdown the WSGI server. Wait for non deamon threads.
268         """
269         _logger.info("Initiating shutdown")
270         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
271
272         if self.httpd:
273             self.httpd.shutdown()
274             self.close_socket(self.httpd.socket)
275
276         # Manually join() all threads before calling sys.exit() to allow a second signal
277         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
278         # threading.Thread.join() should not mask signals (at least in python 2.5).
279         me = threading.currentThread()
280         _logger.debug('current thread: %r', me)
281         for thread in threading.enumerate():
282             _logger.debug('process %r (%r)', thread, thread.isDaemon())
283             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
284                 while thread.isAlive():
285                     _logger.debug('join and sleep')
286                     # Need a busyloop here as thread.join() masks signals
287                     # and would prevent the forced shutdown.
288                     thread.join(0.05)
289                     time.sleep(0.05)
290
291         _logger.debug('--')
292         openerp.modules.registry.RegistryManager.delete_all()
293         logging.shutdown()
294
295     def run(self, preload=None, stop=False):
296         """ Start the http server and the cron thread then wait for a signal.
297
298         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
299         a second one if any will force an immediate exit.
300         """
301         self.start(stop=stop)
302
303         rc = preload_registries(preload)
304
305         if stop:
306             self.stop()
307             return rc
308
309         # Wait for a first signal to be handled. (time.sleep will be interrupted
310         # by the signal handler.) The try/except is for the win32 case.
311         try:
312             while self.quit_signals_received == 0:
313                 time.sleep(60)
314         except KeyboardInterrupt:
315             pass
316
317         self.stop()
318
319     def reload(self):
320         os.kill(self.pid, signal.SIGHUP)
321
322 class GeventServer(CommonServer):
323     def __init__(self, app):
324         super(GeventServer, self).__init__(app)
325         self.port = config['longpolling_port']
326         self.httpd = None
327
328     def watch_parent(self, beat=4):
329         import gevent
330         ppid = os.getppid()
331         while True:
332             if ppid != os.getppid():
333                 pid = os.getpid()
334                 _logger.info("LongPolling (%s) Parent changed", pid)
335                 # suicide !!
336                 os.kill(pid, signal.SIGTERM)
337                 return
338             gevent.sleep(beat)
339
340     def start(self):
341         import gevent
342         from gevent.wsgi import WSGIServer
343
344         if os.name == 'posix':
345             signal.signal(signal.SIGQUIT, dumpstacks)
346
347         gevent.spawn(self.watch_parent)
348         self.httpd = WSGIServer((self.interface, self.port), self.app)
349         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
350         self.httpd.serve_forever()
351
352     def stop(self):
353         import gevent
354         self.httpd.stop()
355         gevent.shutdown()
356
357     def run(self, preload, stop):
358         self.start()
359         self.stop()
360
361 class PreforkServer(CommonServer):
362     """ Multiprocessing inspired by (g)unicorn.
363     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
364     method between workers but we plan to replace it by a more intelligent
365     dispatcher to will parse the first HTTP request line.
366     """
367     def __init__(self, app):
368         # config
369         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
370         self.population = config['workers']
371         self.timeout = config['limit_time_real']
372         self.limit_request = config['limit_request']
373         # working vars
374         self.beat = 4
375         self.app = app
376         self.pid = os.getpid()
377         self.socket = None
378         self.workers_http = {}
379         self.workers_cron = {}
380         self.workers = {}
381         self.generation = 0
382         self.queue = []
383         self.long_polling_pid = None
384
385     def pipe_new(self):
386         pipe = os.pipe()
387         for fd in pipe:
388             # non_blocking
389             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
390             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
391             # close_on_exec
392             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
393             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
394         return pipe
395
396     def pipe_ping(self, pipe):
397         try:
398             os.write(pipe[1], '.')
399         except IOError, e:
400             if e.errno not in [errno.EAGAIN, errno.EINTR]:
401                 raise
402
403     def signal_handler(self, sig, frame):
404         if len(self.queue) < 5 or sig == signal.SIGCHLD:
405             self.queue.append(sig)
406             self.pipe_ping(self.pipe)
407         else:
408             _logger.warn("Dropping signal: %s", sig)
409
410     def worker_spawn(self, klass, workers_registry):
411         self.generation += 1
412         worker = klass(self)
413         pid = os.fork()
414         if pid != 0:
415             worker.pid = pid
416             self.workers[pid] = worker
417             workers_registry[pid] = worker
418             return worker
419         else:
420             worker.run()
421             sys.exit(0)
422
423     def long_polling_spawn(self):
424         nargs = stripped_sys_argv()
425         cmd = nargs[0]
426         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
427         nargs[0] = cmd
428         popen = subprocess.Popen([sys.executable] + nargs)
429         self.long_polling_pid = popen.pid
430
431     def worker_pop(self, pid):
432         if pid in self.workers:
433             _logger.debug("Worker (%s) unregistered", pid)
434             try:
435                 self.workers_http.pop(pid, None)
436                 self.workers_cron.pop(pid, None)
437                 u = self.workers.pop(pid)
438                 u.close()
439             except OSError:
440                 return
441
442     def worker_kill(self, pid, sig):
443         try:
444             os.kill(pid, sig)
445         except OSError, e:
446             if e.errno == errno.ESRCH:
447                 self.worker_pop(pid)
448
449     def process_signals(self):
450         while len(self.queue):
451             sig = self.queue.pop(0)
452             if sig in [signal.SIGINT, signal.SIGTERM]:
453                 raise KeyboardInterrupt
454             elif sig == signal.SIGHUP:
455                 # restart on kill -HUP
456                 openerp.phoenix = True
457                 raise KeyboardInterrupt
458             elif sig == signal.SIGQUIT:
459                 # dump stacks on kill -3
460                 self.dumpstacks()
461             elif sig == signal.SIGTTIN:
462                 # increase number of workers
463                 self.population += 1
464             elif sig == signal.SIGTTOU:
465                 # decrease number of workers
466                 self.population -= 1
467
468     def process_zombie(self):
469         # reap dead workers
470         while 1:
471             try:
472                 wpid, status = os.waitpid(-1, os.WNOHANG)
473                 if not wpid:
474                     break
475                 if (status >> 8) == 3:
476                     msg = "Critial worker error (%s)"
477                     _logger.critical(msg, wpid)
478                     raise Exception(msg % wpid)
479                 self.worker_pop(wpid)
480             except OSError, e:
481                 if e.errno == errno.ECHILD:
482                     break
483                 raise
484
485     def process_timeout(self):
486         now = time.time()
487         for (pid, worker) in self.workers.items():
488             if worker.watchdog_timeout is not None and \
489                     (now - worker.watchdog_time) >= worker.watchdog_timeout:
490                 _logger.error("Worker (%s) timeout", pid)
491                 self.worker_kill(pid, signal.SIGKILL)
492
493     def process_spawn(self):
494         while len(self.workers_http) < self.population:
495             self.worker_spawn(WorkerHTTP, self.workers_http)
496         while len(self.workers_cron) < config['max_cron_threads']:
497             self.worker_spawn(WorkerCron, self.workers_cron)
498         if not self.long_polling_pid:
499             self.long_polling_spawn()
500
501     def sleep(self):
502         try:
503             # map of fd -> worker
504             fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
505             fd_in = fds.keys() + [self.pipe[0]]
506             # check for ping or internal wakeups
507             ready = select.select(fd_in, [], [], self.beat)
508             # update worker watchdogs
509             for fd in ready[0]:
510                 if fd in fds:
511                     fds[fd].watchdog_time = time.time()
512                 try:
513                     # empty pipe
514                     while os.read(fd, 1):
515                         pass
516                 except OSError, e:
517                     if e.errno not in [errno.EAGAIN]:
518                         raise
519         except select.error, e:
520             if e[0] not in [errno.EINTR]:
521                 raise
522
523     def start(self):
524         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
525         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
526         # signal handler to overcome this behaviour
527         self.pipe = self.pipe_new()
528         # set signal handlers
529         signal.signal(signal.SIGINT, self.signal_handler)
530         signal.signal(signal.SIGTERM, self.signal_handler)
531         signal.signal(signal.SIGHUP, self.signal_handler)
532         signal.signal(signal.SIGCHLD, self.signal_handler)
533         signal.signal(signal.SIGTTIN, self.signal_handler)
534         signal.signal(signal.SIGTTOU, self.signal_handler)
535         signal.signal(signal.SIGQUIT, dumpstacks)
536
537         # listen to socket
538         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
539         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
540         self.socket.setblocking(0)
541         self.socket.bind(self.address)
542         self.socket.listen(8 * self.population)
543
544     def stop(self, graceful=True):
545         if self.long_polling_pid is not None:
546             # FIXME make longpolling process handle SIGTERM correctly
547             self.worker_kill(self.long_polling_pid, signal.SIGKILL)
548             self.long_polling_pid = None
549         if graceful:
550             _logger.info("Stopping gracefully")
551             limit = time.time() + self.timeout
552             for pid in self.workers.keys():
553                 self.worker_kill(pid, signal.SIGTERM)
554             while self.workers and time.time() < limit:
555                 self.process_zombie()
556                 time.sleep(0.1)
557         else:
558             _logger.info("Stopping forcefully")
559         for pid in self.workers.keys():
560             self.worker_kill(pid, signal.SIGTERM)
561         self.socket.close()
562
563     def run(self, preload, stop):
564         self.start()
565
566         rc = preload_registries(preload)
567
568         if stop:
569             self.stop()
570             return rc
571
572         # Empty the cursor pool, we dont want them to be shared among forked workers.
573         openerp.sql_db.close_all()
574
575         _logger.debug("Multiprocess starting")
576         while 1:
577             try:
578                 #_logger.debug("Multiprocess beat (%s)",time.time())
579                 self.process_signals()
580                 self.process_zombie()
581                 self.process_timeout()
582                 self.process_spawn()
583                 self.sleep()
584             except KeyboardInterrupt:
585                 _logger.debug("Multiprocess clean stop")
586                 self.stop()
587                 break
588             except Exception, e:
589                 _logger.exception(e)
590                 self.stop(False)
591                 return -1
592
593 class Worker(object):
594     """ Workers """
595     def __init__(self, multi):
596         self.multi = multi
597         self.watchdog_time = time.time()
598         self.watchdog_pipe = multi.pipe_new()
599         # Can be set to None if no watchdog is desired.
600         self.watchdog_timeout = multi.timeout
601         self.ppid = os.getpid()
602         self.pid = None
603         self.alive = True
604         # should we rename into lifetime ?
605         self.request_max = multi.limit_request
606         self.request_count = 0
607
608     def setproctitle(self, title=""):
609         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
610
611     def close(self):
612         os.close(self.watchdog_pipe[0])
613         os.close(self.watchdog_pipe[1])
614
615     def signal_handler(self, sig, frame):
616         self.alive = False
617
618     def sleep(self):
619         try:
620             select.select([self.multi.socket], [], [], self.multi.beat)
621         except select.error, e:
622             if e[0] not in [errno.EINTR]:
623                 raise
624
625     def process_limit(self):
626         if resource is None:
627             return
628         # If our parent changed sucide
629         if self.ppid != os.getppid():
630             _logger.info("Worker (%s) Parent changed", self.pid)
631             self.alive = False
632         # check for lifetime
633         if self.request_count >= self.request_max:
634             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
635             self.alive = False
636         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
637         rss, vms = psutil.Process(os.getpid()).get_memory_info()
638         if vms > config['limit_memory_soft']:
639             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
640             self.alive = False      # Commit suicide after the request.
641
642         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
643         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
644         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
645
646         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
647         r = resource.getrusage(resource.RUSAGE_SELF)
648         cpu_time = r.ru_utime + r.ru_stime
649         def time_expired(n, stack):
650             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
651             # We dont suicide in such case
652             raise Exception('CPU time limit exceeded.')
653         signal.signal(signal.SIGXCPU, time_expired)
654         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
655         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
656
657     def process_work(self):
658         pass
659
660     def start(self):
661         self.pid = os.getpid()
662         self.setproctitle()
663         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
664         # Reseed the random number generator
665         random.seed()
666         # Prevent fd inherientence close_on_exec
667         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
668         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
669         # reset blocking status
670         self.multi.socket.setblocking(0)
671         signal.signal(signal.SIGINT, self.signal_handler)
672         signal.signal(signal.SIGTERM, signal.SIG_DFL)
673         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
674
675     def stop(self):
676         pass
677
678     def run(self):
679         try:
680             self.start()
681             while self.alive:
682                 self.process_limit()
683                 self.multi.pipe_ping(self.watchdog_pipe)
684                 self.sleep()
685                 self.process_work()
686             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
687             self.stop()
688         except Exception:
689             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
690             # should we use 3 to abort everything ?
691             sys.exit(1)
692
693 class WorkerHTTP(Worker):
694     """ HTTP Request workers """
695     def process_request(self, client, addr):
696         client.setblocking(1)
697         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
698         # Prevent fd inherientence close_on_exec
699         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
700         fcntl.fcntl(client, fcntl.F_SETFD, flags)
701         # do request using BaseWSGIServerNoBind monkey patched with socket
702         self.server.socket = client
703         # tolerate broken pipe when the http client closes the socket before
704         # receiving the full reply
705         try:
706             self.server.process_request(client, addr)
707         except IOError, e:
708             if e.errno != errno.EPIPE:
709                 raise
710         self.request_count += 1
711
712     def process_work(self):
713         try:
714             client, addr = self.multi.socket.accept()
715             self.process_request(client, addr)
716         except socket.error, e:
717             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
718                 raise
719
720     def start(self):
721         Worker.start(self)
722         self.server = BaseWSGIServerNoBind(self.multi.app)
723
724 class WorkerCron(Worker):
725     """ Cron workers """
726
727     def __init__(self, multi):
728         super(WorkerCron, self).__init__(multi)
729         # process_work() below process a single database per call.
730         # The variable db_index is keeping track of the next database to
731         # process.
732         self.db_index = 0
733
734     def sleep(self):
735         # Really sleep once all the databases have been processed.
736         if self.db_index == 0:
737             interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect
738             time.sleep(interval)
739
740     def _db_list(self):
741         if config['db_name']:
742             db_names = config['db_name'].split(',')
743         else:
744             db_names = openerp.service.db.exp_list(True)
745         return db_names
746
747     def process_work(self):
748         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
749         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
750         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
751         db_names = self._db_list()
752         if len(db_names):
753             self.db_index = (self.db_index + 1) % len(db_names)
754             db_name = db_names[self.db_index]
755             self.setproctitle(db_name)
756             if rpc_request_flag:
757                 start_time = time.time()
758                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
759
760             import openerp.addons.base as base
761             base.ir.ir_cron.ir_cron._acquire_job(db_name)
762             openerp.modules.registry.RegistryManager.delete(db_name)
763
764             # dont keep cursors in multi database mode
765             if len(db_names) > 1:
766                 openerp.sql_db.close_db(db_name)
767             if rpc_request_flag:
768                 run_time = time.time() - start_time
769                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
770                 vms_diff = (end_vms - start_vms) / 1024
771                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
772                     (db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
773                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
774
775             self.request_count += 1
776             if self.request_count >= self.request_max and self.request_max < len(db_names):
777                 _logger.error("There are more dabatases to process than allowed "
778                               "by the `limit_request` configuration variable: %s more.",
779                               len(db_names) - self.request_max)
780         else:
781             self.db_index = 0
782
783     def start(self):
784         os.nice(10)     # mommy always told me to be nice with others...
785         Worker.start(self)
786         self.multi.socket.close()
787
788 #----------------------------------------------------------
789 # start/stop public api
790 #----------------------------------------------------------
791
792 server = None
793
794 def load_server_wide_modules():
795     for m in openerp.conf.server_wide_modules:
796         try:
797             openerp.modules.module.load_openerp_module(m)
798         except Exception:
799             msg = ''
800             if m == 'web':
801                 msg = """
802 The `web` module is provided by the addons found in the `openerp-web` project.
803 Maybe you forgot to add those addons in your addons_path configuration."""
804             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
805
806 def _reexec(updated_modules=None):
807     """reexecute openerp-server process with (nearly) the same arguments"""
808     if openerp.tools.osutil.is_running_as_nt_service():
809         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
810     exe = os.path.basename(sys.executable)
811     args = stripped_sys_argv()
812     args += ["-u", ','.join(updated_modules)]
813     if not args or args[0] != exe:
814         args.insert(0, exe)
815     os.execv(sys.executable, args)
816
817 def load_test_file_yml(registry, test_file):
818     with registry.cursor() as cr:
819         openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'init')
820         if config['test_commit']:
821             _logger.info('test %s has been commited', test_file)
822             cr.commit()
823         else:
824             _logger.info('test %s has been rollbacked', test_file)
825             cr.rollback()
826
827 def load_test_file_py(registry, test_file):
828     # Locate python module based on its filename and run the tests
829     test_path, _ = os.path.splitext(os.path.abspath(test_file))
830     for mod_name, mod_mod in sys.modules.items():
831         if mod_mod:
832             mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
833             if test_path == mod_path:
834                 suite = unittest2.TestSuite()
835                 for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
836                     suite.addTest(t)
837                 _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
838                 stream = openerp.modules.module.TestStream()
839                 result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
840                 success = result.wasSuccessful()
841                 if hasattr(registry._assertion_report,'report_result'):
842                     registry._assertion_report.report_result(success)
843                 if not success:
844                     _logger.error('%s: at least one error occurred in a test', test_file)
845
846 def preload_registries(dbnames):
847     """ Preload a registries, possibly run a test file."""
848     # TODO: move all config checks to args dont check tools.config here
849     config = openerp.tools.config
850     test_file = config['test_file']
851     dbnames = dbnames or []
852     rc = 0
853     for dbname in dbnames:
854         try:
855             update_module = config['init'] or config['update']
856             registry = RegistryManager.new(dbname, update_module=update_module)
857             # run test_file if provided
858             if test_file:
859                 _logger.info('loading test file %s', test_file)
860                 if test_file.endswith('yml'):
861                     load_test_file_yml(registry, test_file)
862                 elif test_file.endswith('py'):
863                     load_test_file_py(registry, test_file)
864
865             if registry._assertion_report.failures:
866                 rc += 1
867         except Exception:
868             _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
869             return -1
870     return rc
871
872 def start(preload=None, stop=False):
873     """ Start the openerp http server and cron processor.
874     """
875     global server
876     load_server_wide_modules()
877     if openerp.evented:
878         server = GeventServer(openerp.service.wsgi_server.application)
879     elif config['workers']:
880         server = PreforkServer(openerp.service.wsgi_server.application)
881     else:
882         server = ThreadedServer(openerp.service.wsgi_server.application)
883
884     watcher = FSWatcher()
885     watcher.run()
886
887     rc = server.run(preload, stop)
888
889     # like the legend of the phoenix, all ends with beginnings
890     if getattr(openerp, 'phoenix', False):
891         modules = []
892         if config['auto_reload']:
893             # WIP: make it work
894             # modules = autoreload.modules.keys()
895             pass
896         _reexec(modules)
897
898     return rc if rc else 0
899
900 def restart():
901     """ Restart the server
902     """
903     if os.name == 'nt':
904         # run in a thread to let the current thread return response to the caller.
905         threading.Thread(target=_reexec).start()
906     else:
907         os.kill(server.pid, signal.SIGHUP)
908
909 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: