[IMP] ir_http: don't handle exception in dev mode but use the werkzeug debugger excep...
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import logging
7 import os
8 import os.path
9 import platform
10 import psutil
11 import random
12 import select
13 import signal
14 import socket
15 import subprocess
16 import sys
17 import threading
18 import time
19 import unittest2
20
21 import werkzeug.serving
22 from werkzeug.debug import DebuggedApplication
23
24 if os.name == 'posix':
25     # Unix only for workers
26     import fcntl
27     import resource
28 else:
29     # Windows shim
30     signal.SIGHUP = -1
31
32 # Optional process names for workers
33 try:
34     from setproctitle import setproctitle
35 except ImportError:
36     setproctitle = lambda x: None
37
38 import openerp
39 from openerp.modules.registry import RegistryManager
40 from openerp.release import nt_service_name
41 import openerp.tools.config as config
42 from openerp.tools.misc import stripped_sys_argv, dumpstacks
43
44 _logger = logging.getLogger(__name__)
45
46 try:
47     import watchdog
48     from watchdog.observers import Observer
49     from watchdog.events import FileCreatedEvent, FileModifiedEvent
50 except ImportError:
51     watchdog = None
52
53 SLEEP_INTERVAL = 60     # 1 min
54
55 #----------------------------------------------------------
56 # Werkzeug WSGI servers patched
57 #----------------------------------------------------------
58 class LoggingBaseWSGIServerMixIn(object):
59     def handle_error(self, request, client_address):
60         t, e, _ = sys.exc_info()
61         if t == socket.error and e.errno == errno.EPIPE:
62             # broken pipe, ignore error
63             return
64         _logger.exception('Exception happened during processing of request from %s', client_address)
65
66 class BaseWSGIServerNoBind(LoggingBaseWSGIServerMixIn, werkzeug.serving.BaseWSGIServer):
67     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
68     use this class, sets the socket and calls the process_request() manually
69     """
70     def __init__(self, app):
71         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
72     def server_bind(self):
73         # we dont bind beause we use the listen socket of PreforkServer#socket
74         # instead we close the socket
75         if self.socket:
76             self.socket.close()
77     def server_activate(self):
78         # dont listen as we use PreforkServer#socket
79         pass
80
81
82 class RequestHandler(werkzeug.serving.WSGIRequestHandler):
83     def setup(self):
84         # flag the current thread as handling a http request
85         super(RequestHandler, self).setup()
86         me = threading.currentThread()
87         me.name = 'openerp.service.http.request.%s' % (me.ident,)
88
89 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
90 # should also work with systemd socket activation. This is currently untested
91 # and not yet used.
92
93 class ThreadedWSGIServerReloadable(LoggingBaseWSGIServerMixIn, werkzeug.serving.ThreadedWSGIServer):
94     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
95     given by the environement, this is used by autoreload to keep the listen
96     socket open when a reload happens.
97     """
98     def __init__(self, host, port, app):
99         super(ThreadedWSGIServerReloadable, self).__init__(host, port, app,
100                                                            handler=RequestHandler)
101
102     def server_bind(self):
103         envfd = os.environ.get('LISTEN_FDS')
104         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
105             self.reload_socket = True
106             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
107             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
108         else:
109             self.reload_socket = False
110             super(ThreadedWSGIServerReloadable, self).server_bind()
111
112     def server_activate(self):
113         if not self.reload_socket:
114             super(ThreadedWSGIServerReloadable, self).server_activate()
115
116 #----------------------------------------------------------
117 # FileSystem Watcher for autoreload and cache invalidation
118 #----------------------------------------------------------
119 class FSWatcher(object):
120     def __init__(self):
121         self.observer = Observer()
122         for path in openerp.modules.module.ad_paths:
123             _logger.info('Watching addons folder %s', path)
124             self.observer.schedule(self, path, recursive=True)
125
126     def dispatch(self, event):
127         if isinstance(event, (FileCreatedEvent, FileModifiedEvent)):
128             if not event.is_directory:
129                 path = event.src_path
130                 if path.endswith('.py'):
131                     try:
132                         source = open(path, 'rb').read() + '\n'
133                         compile(source, path, 'exec')
134                     except SyntaxError:
135                         _logger.error('autoreload: python code change detected, SyntaxError in %s', path)
136                     else:
137                         _logger.info('autoreload: python code updated, autoreload activated')
138                         restart()
139
140     def start(self):
141         self.observer.start()
142         _logger.info('AutoReload watcher running')
143
144     def stop(self):
145         self.observer.stop()
146         self.observer.join()
147
148 #----------------------------------------------------------
149 # Servers: Threaded, Gevented and Prefork
150 #----------------------------------------------------------
151
152 class CommonServer(object):
153     def __init__(self, app):
154         # TODO Change the xmlrpc_* options to http_*
155         self.app = app
156         # config
157         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
158         self.port = config['xmlrpc_port']
159         # runtime
160         self.pid = os.getpid()
161
162     def close_socket(self, sock):
163         """ Closes a socket instance cleanly
164         :param sock: the network socket to close
165         :type sock: socket.socket
166         """
167         try:
168             sock.shutdown(socket.SHUT_RDWR)
169         except socket.error, e:
170             # On OSX, socket shutdowns both sides if any side closes it
171             # causing an error 57 'Socket is not connected' on shutdown
172             # of the other side (or something), see
173             # http://bugs.python.org/issue4397
174             # note: stdlib fixed test, not behavior
175             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
176                 raise
177         sock.close()
178
179 class ThreadedServer(CommonServer):
180     def __init__(self, app):
181         super(ThreadedServer, self).__init__(app)
182         self.main_thread_id = threading.currentThread().ident
183         # Variable keeping track of the number of calls to the signal handler defined
184         # below. This variable is monitored by ``quit_on_signals()``.
185         self.quit_signals_received = 0
186
187         #self.socket = None
188         self.httpd = None
189
190     def signal_handler(self, sig, frame):
191         if sig in [signal.SIGINT, signal.SIGTERM]:
192             # shutdown on kill -INT or -TERM
193             self.quit_signals_received += 1
194             if self.quit_signals_received > 1:
195                 # logging.shutdown was already called at this point.
196                 sys.stderr.write("Forced shutdown.\n")
197                 os._exit(0)
198         elif sig == signal.SIGHUP:
199             # restart on kill -HUP
200             openerp.phoenix = True
201             self.quit_signals_received += 1
202
203     def cron_thread(self, number):
204         while True:
205             time.sleep(SLEEP_INTERVAL + number)     # Steve Reich timing style
206             registries = openerp.modules.registry.RegistryManager.registries
207             _logger.debug('cron%d polling for jobs', number)
208             for db_name, registry in registries.iteritems():
209                 while registry.ready:
210                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
211                     if not acquired:
212                         break
213
214     def cron_spawn(self):
215         """ Start the above runner function in a daemon thread.
216
217         The thread is a typical daemon thread: it will never quit and must be
218         terminated when the main process exits - with no consequence (the processing
219         threads it spawns are not marked daemon).
220
221         """
222         # Force call to strptime just before starting the cron thread
223         # to prevent time.strptime AttributeError within the thread.
224         # See: http://bugs.python.org/issue7980
225         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
226         for i in range(openerp.tools.config['max_cron_threads']):
227             def target():
228                 self.cron_thread(i)
229             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
230             t.setDaemon(True)
231             t.start()
232             _logger.debug("cron%d started!" % i)
233
234     def http_thread(self):
235         def app(e, s):
236             return self.app(e, s)
237         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
238         self.httpd.serve_forever()
239
240     def http_spawn(self):
241         t = threading.Thread(target=self.http_thread, name="openerp.service.httpd")
242         t.setDaemon(True)
243         t.start()
244         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
245
246     def start(self, stop=False):
247         _logger.debug("Setting signal handlers")
248         if os.name == 'posix':
249             signal.signal(signal.SIGINT, self.signal_handler)
250             signal.signal(signal.SIGTERM, self.signal_handler)
251             signal.signal(signal.SIGCHLD, self.signal_handler)
252             signal.signal(signal.SIGHUP, self.signal_handler)
253             signal.signal(signal.SIGQUIT, dumpstacks)
254         elif os.name == 'nt':
255             import win32api
256             win32api.SetConsoleCtrlHandler(lambda sig: self.signal_handler(sig, None), 1)
257
258         test_mode = config['test_enable'] or config['test_file']
259         if not stop or test_mode:
260             # some tests need the http deamon to be available...
261             self.http_spawn()
262
263         if not stop:
264             # only relevant if we are not in "--stop-after-init" mode
265             self.cron_spawn()
266
267     def stop(self):
268         """ Shutdown the WSGI server. Wait for non deamon threads.
269         """
270         _logger.info("Initiating shutdown")
271         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
272
273         if self.httpd:
274             self.httpd.shutdown()
275             self.close_socket(self.httpd.socket)
276
277         # Manually join() all threads before calling sys.exit() to allow a second signal
278         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
279         # threading.Thread.join() should not mask signals (at least in python 2.5).
280         me = threading.currentThread()
281         _logger.debug('current thread: %r', me)
282         for thread in threading.enumerate():
283             _logger.debug('process %r (%r)', thread, thread.isDaemon())
284             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
285                 while thread.isAlive():
286                     _logger.debug('join and sleep')
287                     # Need a busyloop here as thread.join() masks signals
288                     # and would prevent the forced shutdown.
289                     thread.join(0.05)
290                     time.sleep(0.05)
291
292         _logger.debug('--')
293         openerp.modules.registry.RegistryManager.delete_all()
294         logging.shutdown()
295
296     def run(self, preload=None, stop=False):
297         """ Start the http server and the cron thread then wait for a signal.
298
299         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
300         a second one if any will force an immediate exit.
301         """
302         self.start(stop=stop)
303
304         rc = preload_registries(preload)
305
306         if stop:
307             self.stop()
308             return rc
309
310         # Wait for a first signal to be handled. (time.sleep will be interrupted
311         # by the signal handler.) The try/except is for the win32 case.
312         try:
313             while self.quit_signals_received == 0:
314                 time.sleep(60)
315         except KeyboardInterrupt:
316             pass
317
318         self.stop()
319
320     def reload(self):
321         os.kill(self.pid, signal.SIGHUP)
322
323 class GeventServer(CommonServer):
324     def __init__(self, app):
325         super(GeventServer, self).__init__(app)
326         self.port = config['longpolling_port']
327         self.httpd = None
328
329     def watch_parent(self, beat=4):
330         import gevent
331         ppid = os.getppid()
332         while True:
333             if ppid != os.getppid():
334                 pid = os.getpid()
335                 _logger.info("LongPolling (%s) Parent changed", pid)
336                 # suicide !!
337                 os.kill(pid, signal.SIGTERM)
338                 return
339             gevent.sleep(beat)
340
341     def start(self):
342         import gevent
343         from gevent.wsgi import WSGIServer
344
345         if os.name == 'posix':
346             signal.signal(signal.SIGQUIT, dumpstacks)
347
348         gevent.spawn(self.watch_parent)
349         self.httpd = WSGIServer((self.interface, self.port), self.app)
350         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
351         try:
352             self.httpd.serve_forever()
353         except:
354             _logger.exception("Evented Service (longpolling): uncaught error during main loop")
355             raise
356
357     def stop(self):
358         import gevent
359         self.httpd.stop()
360         gevent.shutdown()
361
362     def run(self, preload, stop):
363         self.start()
364         self.stop()
365
366 class PreforkServer(CommonServer):
367     """ Multiprocessing inspired by (g)unicorn.
368     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
369     method between workers but we plan to replace it by a more intelligent
370     dispatcher to will parse the first HTTP request line.
371     """
372     def __init__(self, app):
373         # config
374         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
375         self.population = config['workers']
376         self.timeout = config['limit_time_real']
377         self.limit_request = config['limit_request']
378         # working vars
379         self.beat = 4
380         self.app = app
381         self.pid = os.getpid()
382         self.socket = None
383         self.workers_http = {}
384         self.workers_cron = {}
385         self.workers = {}
386         self.generation = 0
387         self.queue = []
388         self.long_polling_pid = None
389
390     def pipe_new(self):
391         pipe = os.pipe()
392         for fd in pipe:
393             # non_blocking
394             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
395             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
396             # close_on_exec
397             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
398             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
399         return pipe
400
401     def pipe_ping(self, pipe):
402         try:
403             os.write(pipe[1], '.')
404         except IOError, e:
405             if e.errno not in [errno.EAGAIN, errno.EINTR]:
406                 raise
407
408     def signal_handler(self, sig, frame):
409         if len(self.queue) < 5 or sig == signal.SIGCHLD:
410             self.queue.append(sig)
411             self.pipe_ping(self.pipe)
412         else:
413             _logger.warn("Dropping signal: %s", sig)
414
415     def worker_spawn(self, klass, workers_registry):
416         self.generation += 1
417         worker = klass(self)
418         pid = os.fork()
419         if pid != 0:
420             worker.pid = pid
421             self.workers[pid] = worker
422             workers_registry[pid] = worker
423             return worker
424         else:
425             worker.run()
426             sys.exit(0)
427
428     def long_polling_spawn(self):
429         nargs = stripped_sys_argv()
430         cmd = nargs[0]
431         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
432         nargs[0] = cmd
433         popen = subprocess.Popen([sys.executable] + nargs)
434         self.long_polling_pid = popen.pid
435
436     def worker_pop(self, pid):
437         if pid == self.long_polling_pid:
438             self.long_polling_pid = None
439         if pid in self.workers:
440             _logger.debug("Worker (%s) unregistered", pid)
441             try:
442                 self.workers_http.pop(pid, None)
443                 self.workers_cron.pop(pid, None)
444                 u = self.workers.pop(pid)
445                 u.close()
446             except OSError:
447                 return
448
449     def worker_kill(self, pid, sig):
450         try:
451             os.kill(pid, sig)
452         except OSError, e:
453             if e.errno == errno.ESRCH:
454                 self.worker_pop(pid)
455
456     def process_signals(self):
457         while len(self.queue):
458             sig = self.queue.pop(0)
459             if sig in [signal.SIGINT, signal.SIGTERM]:
460                 raise KeyboardInterrupt
461             elif sig == signal.SIGHUP:
462                 # restart on kill -HUP
463                 openerp.phoenix = True
464                 raise KeyboardInterrupt
465             elif sig == signal.SIGQUIT:
466                 # dump stacks on kill -3
467                 self.dumpstacks()
468             elif sig == signal.SIGTTIN:
469                 # increase number of workers
470                 self.population += 1
471             elif sig == signal.SIGTTOU:
472                 # decrease number of workers
473                 self.population -= 1
474
475     def process_zombie(self):
476         # reap dead workers
477         while 1:
478             try:
479                 wpid, status = os.waitpid(-1, os.WNOHANG)
480                 if not wpid:
481                     break
482                 if (status >> 8) == 3:
483                     msg = "Critial worker error (%s)"
484                     _logger.critical(msg, wpid)
485                     raise Exception(msg % wpid)
486                 self.worker_pop(wpid)
487             except OSError, e:
488                 if e.errno == errno.ECHILD:
489                     break
490                 raise
491
492     def process_timeout(self):
493         now = time.time()
494         for (pid, worker) in self.workers.items():
495             if worker.watchdog_timeout is not None and \
496                     (now - worker.watchdog_time) >= worker.watchdog_timeout:
497                 _logger.error("Worker (%s) timeout", pid)
498                 self.worker_kill(pid, signal.SIGKILL)
499
500     def process_spawn(self):
501         while len(self.workers_http) < self.population:
502             self.worker_spawn(WorkerHTTP, self.workers_http)
503         while len(self.workers_cron) < config['max_cron_threads']:
504             self.worker_spawn(WorkerCron, self.workers_cron)
505         if not self.long_polling_pid:
506             self.long_polling_spawn()
507
508     def sleep(self):
509         try:
510             # map of fd -> worker
511             fds = dict([(w.watchdog_pipe[0], w) for k, w in self.workers.items()])
512             fd_in = fds.keys() + [self.pipe[0]]
513             # check for ping or internal wakeups
514             ready = select.select(fd_in, [], [], self.beat)
515             # update worker watchdogs
516             for fd in ready[0]:
517                 if fd in fds:
518                     fds[fd].watchdog_time = time.time()
519                 try:
520                     # empty pipe
521                     while os.read(fd, 1):
522                         pass
523                 except OSError, e:
524                     if e.errno not in [errno.EAGAIN]:
525                         raise
526         except select.error, e:
527             if e[0] not in [errno.EINTR]:
528                 raise
529
530     def start(self):
531         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
532         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
533         # signal handler to overcome this behaviour
534         self.pipe = self.pipe_new()
535         # set signal handlers
536         signal.signal(signal.SIGINT, self.signal_handler)
537         signal.signal(signal.SIGTERM, self.signal_handler)
538         signal.signal(signal.SIGHUP, self.signal_handler)
539         signal.signal(signal.SIGCHLD, self.signal_handler)
540         signal.signal(signal.SIGTTIN, self.signal_handler)
541         signal.signal(signal.SIGTTOU, self.signal_handler)
542         signal.signal(signal.SIGQUIT, dumpstacks)
543
544         # listen to socket
545         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
546         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
547         self.socket.setblocking(0)
548         self.socket.bind(self.address)
549         self.socket.listen(8 * self.population)
550
551     def stop(self, graceful=True):
552         if self.long_polling_pid is not None:
553             # FIXME make longpolling process handle SIGTERM correctly
554             self.worker_kill(self.long_polling_pid, signal.SIGKILL)
555             self.long_polling_pid = None
556         if graceful:
557             _logger.info("Stopping gracefully")
558             limit = time.time() + self.timeout
559             for pid in self.workers.keys():
560                 self.worker_kill(pid, signal.SIGTERM)
561             while self.workers and time.time() < limit:
562                 self.process_zombie()
563                 time.sleep(0.1)
564         else:
565             _logger.info("Stopping forcefully")
566         for pid in self.workers.keys():
567             self.worker_kill(pid, signal.SIGTERM)
568         self.socket.close()
569
570     def run(self, preload, stop):
571         self.start()
572
573         rc = preload_registries(preload)
574
575         if stop:
576             self.stop()
577             return rc
578
579         # Empty the cursor pool, we dont want them to be shared among forked workers.
580         openerp.sql_db.close_all()
581
582         _logger.debug("Multiprocess starting")
583         while 1:
584             try:
585                 #_logger.debug("Multiprocess beat (%s)",time.time())
586                 self.process_signals()
587                 self.process_zombie()
588                 self.process_timeout()
589                 self.process_spawn()
590                 self.sleep()
591             except KeyboardInterrupt:
592                 _logger.debug("Multiprocess clean stop")
593                 self.stop()
594                 break
595             except Exception, e:
596                 _logger.exception(e)
597                 self.stop(False)
598                 return -1
599
600 class Worker(object):
601     """ Workers """
602     def __init__(self, multi):
603         self.multi = multi
604         self.watchdog_time = time.time()
605         self.watchdog_pipe = multi.pipe_new()
606         # Can be set to None if no watchdog is desired.
607         self.watchdog_timeout = multi.timeout
608         self.ppid = os.getpid()
609         self.pid = None
610         self.alive = True
611         # should we rename into lifetime ?
612         self.request_max = multi.limit_request
613         self.request_count = 0
614
615     def setproctitle(self, title=""):
616         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
617
618     def close(self):
619         os.close(self.watchdog_pipe[0])
620         os.close(self.watchdog_pipe[1])
621
622     def signal_handler(self, sig, frame):
623         self.alive = False
624
625     def sleep(self):
626         try:
627             select.select([self.multi.socket], [], [], self.multi.beat)
628         except select.error, e:
629             if e[0] not in [errno.EINTR]:
630                 raise
631
632     def process_limit(self):
633         # If our parent changed sucide
634         if self.ppid != os.getppid():
635             _logger.info("Worker (%s) Parent changed", self.pid)
636             self.alive = False
637         # check for lifetime
638         if self.request_count >= self.request_max:
639             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
640             self.alive = False
641         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
642         rss, vms = psutil.Process(os.getpid()).get_memory_info()
643         if vms > config['limit_memory_soft']:
644             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
645             self.alive = False      # Commit suicide after the request.
646
647         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
648         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
649         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
650
651         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
652         r = resource.getrusage(resource.RUSAGE_SELF)
653         cpu_time = r.ru_utime + r.ru_stime
654         def time_expired(n, stack):
655             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
656             # We dont suicide in such case
657             raise Exception('CPU time limit exceeded.')
658         signal.signal(signal.SIGXCPU, time_expired)
659         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
660         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
661
662     def process_work(self):
663         pass
664
665     def start(self):
666         self.pid = os.getpid()
667         self.setproctitle()
668         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
669         # Reseed the random number generator
670         random.seed()
671         # Prevent fd inherientence close_on_exec
672         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
673         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
674         # reset blocking status
675         self.multi.socket.setblocking(0)
676         signal.signal(signal.SIGINT, self.signal_handler)
677         signal.signal(signal.SIGTERM, signal.SIG_DFL)
678         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
679
680     def stop(self):
681         pass
682
683     def run(self):
684         try:
685             self.start()
686             while self.alive:
687                 self.process_limit()
688                 self.multi.pipe_ping(self.watchdog_pipe)
689                 self.sleep()
690                 self.process_work()
691             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
692             self.stop()
693         except Exception:
694             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
695             # should we use 3 to abort everything ?
696             sys.exit(1)
697
698 class WorkerHTTP(Worker):
699     """ HTTP Request workers """
700     def process_request(self, client, addr):
701         client.setblocking(1)
702         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
703         # Prevent fd inherientence close_on_exec
704         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
705         fcntl.fcntl(client, fcntl.F_SETFD, flags)
706         # do request using BaseWSGIServerNoBind monkey patched with socket
707         self.server.socket = client
708         # tolerate broken pipe when the http client closes the socket before
709         # receiving the full reply
710         try:
711             self.server.process_request(client, addr)
712         except IOError, e:
713             if e.errno != errno.EPIPE:
714                 raise
715         self.request_count += 1
716
717     def process_work(self):
718         try:
719             client, addr = self.multi.socket.accept()
720             self.process_request(client, addr)
721         except socket.error, e:
722             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
723                 raise
724
725     def start(self):
726         Worker.start(self)
727         self.server = BaseWSGIServerNoBind(self.multi.app)
728
729 class WorkerCron(Worker):
730     """ Cron workers """
731
732     def __init__(self, multi):
733         super(WorkerCron, self).__init__(multi)
734         # process_work() below process a single database per call.
735         # The variable db_index is keeping track of the next database to
736         # process.
737         self.db_index = 0
738
739     def sleep(self):
740         # Really sleep once all the databases have been processed.
741         if self.db_index == 0:
742             interval = SLEEP_INTERVAL + self.pid % 10   # chorus effect
743             time.sleep(interval)
744
745     def _db_list(self):
746         if config['db_name']:
747             db_names = config['db_name'].split(',')
748         else:
749             db_names = openerp.service.db.exp_list(True)
750         return db_names
751
752     def process_work(self):
753         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
754         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
755         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
756         db_names = self._db_list()
757         if len(db_names):
758             self.db_index = (self.db_index + 1) % len(db_names)
759             db_name = db_names[self.db_index]
760             self.setproctitle(db_name)
761             if rpc_request_flag:
762                 start_time = time.time()
763                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
764
765             import openerp.addons.base as base
766             base.ir.ir_cron.ir_cron._acquire_job(db_name)
767             openerp.modules.registry.RegistryManager.delete(db_name)
768
769             # dont keep cursors in multi database mode
770             if len(db_names) > 1:
771                 openerp.sql_db.close_db(db_name)
772             if rpc_request_flag:
773                 run_time = time.time() - start_time
774                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
775                 vms_diff = (end_vms - start_vms) / 1024
776                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % \
777                     (db_name, run_time, start_vms / 1024, end_vms / 1024, vms_diff)
778                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
779
780             self.request_count += 1
781             if self.request_count >= self.request_max and self.request_max < len(db_names):
782                 _logger.error("There are more dabatases to process than allowed "
783                               "by the `limit_request` configuration variable: %s more.",
784                               len(db_names) - self.request_max)
785         else:
786             self.db_index = 0
787
788     def start(self):
789         os.nice(10)     # mommy always told me to be nice with others...
790         Worker.start(self)
791         self.multi.socket.close()
792
793 #----------------------------------------------------------
794 # start/stop public api
795 #----------------------------------------------------------
796
797 server = None
798
799 def load_server_wide_modules():
800     for m in openerp.conf.server_wide_modules:
801         try:
802             openerp.modules.module.load_openerp_module(m)
803         except Exception:
804             msg = ''
805             if m == 'web':
806                 msg = """
807 The `web` module is provided by the addons found in the `openerp-web` project.
808 Maybe you forgot to add those addons in your addons_path configuration."""
809             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
810
811 def _reexec(updated_modules=None):
812     """reexecute openerp-server process with (nearly) the same arguments"""
813     if openerp.tools.osutil.is_running_as_nt_service():
814         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
815     exe = os.path.basename(sys.executable)
816     args = stripped_sys_argv()
817     if updated_modules:
818         args += ["-u", ','.join(updated_modules)]
819     if not args or args[0] != exe:
820         args.insert(0, exe)
821     os.execv(sys.executable, args)
822
823 def load_test_file_yml(registry, test_file):
824     with registry.cursor() as cr:
825         openerp.tools.convert_yaml_import(cr, 'base', file(test_file), 'test', {}, 'init')
826         if config['test_commit']:
827             _logger.info('test %s has been commited', test_file)
828             cr.commit()
829         else:
830             _logger.info('test %s has been rollbacked', test_file)
831             cr.rollback()
832
833 def load_test_file_py(registry, test_file):
834     # Locate python module based on its filename and run the tests
835     test_path, _ = os.path.splitext(os.path.abspath(test_file))
836     for mod_name, mod_mod in sys.modules.items():
837         if mod_mod:
838             mod_path, _ = os.path.splitext(getattr(mod_mod, '__file__', ''))
839             if test_path == mod_path:
840                 suite = unittest2.TestSuite()
841                 for t in unittest2.TestLoader().loadTestsFromModule(mod_mod):
842                     suite.addTest(t)
843                 _logger.log(logging.INFO, 'running tests %s.', mod_mod.__name__)
844                 stream = openerp.modules.module.TestStream()
845                 result = unittest2.TextTestRunner(verbosity=2, stream=stream).run(suite)
846                 success = result.wasSuccessful()
847                 if hasattr(registry._assertion_report,'report_result'):
848                     registry._assertion_report.report_result(success)
849                 if not success:
850                     _logger.error('%s: at least one error occurred in a test', test_file)
851
852 def preload_registries(dbnames):
853     """ Preload a registries, possibly run a test file."""
854     # TODO: move all config checks to args dont check tools.config here
855     config = openerp.tools.config
856     test_file = config['test_file']
857     dbnames = dbnames or []
858     rc = 0
859     for dbname in dbnames:
860         try:
861             update_module = config['init'] or config['update']
862             registry = RegistryManager.new(dbname, update_module=update_module)
863             # run test_file if provided
864             if test_file:
865                 _logger.info('loading test file %s', test_file)
866                 with openerp.api.Environment.manage():
867                     if test_file.endswith('yml'):
868                         load_test_file_yml(registry, test_file)
869                     elif test_file.endswith('py'):
870                         load_test_file_py(registry, test_file)
871
872             if registry._assertion_report.failures:
873                 rc += 1
874         except Exception:
875             _logger.critical('Failed to initialize database `%s`.', dbname, exc_info=True)
876             return -1
877     return rc
878
879 def start(preload=None, stop=False):
880     """ Start the openerp http server and cron processor.
881     """
882     global server
883     load_server_wide_modules()
884     if openerp.evented:
885         server = GeventServer(openerp.service.wsgi_server.application)
886     elif config['workers']:
887         server = PreforkServer(openerp.service.wsgi_server.application)
888     else:
889         server = ThreadedServer(openerp.service.wsgi_server.application)
890
891     watcher = None
892     if config['dev_mode']:
893         if watchdog:
894             watcher = FSWatcher()
895             watcher.start()
896         else:
897             _logger.warning("'watchdog' module not installed. Code autoreload feature is disabled")
898         server.app = DebuggedApplication(server.app, evalex=True)
899
900     rc = server.run(preload, stop)
901
902     # like the legend of the phoenix, all ends with beginnings
903     if getattr(openerp, 'phoenix', False):
904         if watcher:
905             watcher.stop()
906         _reexec()
907
908     return rc if rc else 0
909
910 def restart():
911     """ Restart the server
912     """
913     if os.name == 'nt':
914         # run in a thread to let the current thread return response to the caller.
915         threading.Thread(target=_reexec).start()
916     else:
917         os.kill(server.pid, signal.SIGHUP)
918
919 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: