[REF] Refactoring according to the review of CHS
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import logging
7 import os
8 import os.path
9 import platform
10 import psutil
11 import random
12 import resource
13 import select
14 import signal
15 import socket
16 import subprocess
17 import sys
18 import threading
19 import time
20 import traceback
21
22 import werkzeug.serving
23
24 try:
25     import fcntl
26 except ImportError:
27     pass
28 try:
29     from setproctitle import setproctitle
30 except ImportError:
31     setproctitle = lambda x: None
32
33 import openerp
34 import openerp.tools.config as config
35 from openerp.release import nt_service_name
36 from openerp.tools.misc import stripped_sys_argv
37
38 import wsgi_server
39
40 _logger = logging.getLogger(__name__)
41
42 SLEEP_INTERVAL = 60 # 1 min
43
44 #----------------------------------------------------------
45 # Werkzeug WSGI servers patched
46 #----------------------------------------------------------
47
48 class BaseWSGIServerNoBind(werkzeug.serving.BaseWSGIServer):
49     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
50     use this class, sets the socket and calls the process_request() manually
51     """
52     def __init__(self, app):
53         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
54     def server_bind(self):
55         # we dont bind beause we use the listen socket of PreforkServer#socket
56         # instead we close the socket
57         if self.socket:
58             self.socket.close()
59     def server_activate(self):
60         # dont listen as we use PreforkServer#socket
61         pass
62
63 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
64 # should also work with systemd socket activation. This is currently untested
65 # and not yet used.
66
67 class ThreadedWSGIServerReloadable(werkzeug.serving.ThreadedWSGIServer):
68     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
69     given by the environement, this is used by autoreload to keep the listen
70     socket open when a reload happens.
71     """
72     def server_bind(self):
73         envfd = os.environ.get('LISTEN_FDS')
74         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
75             self.reload_socket = True
76             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
77             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
78         else:
79             self.reload_socket = False
80             super(ThreadedWSGIServerReloadable, self).server_bind()
81
82     def server_activate(self):
83         if not self.reload_socket:
84             super(ThreadedWSGIServerReloadable, self).server_activate()
85
86 #----------------------------------------------------------
87 # AutoReload watcher
88 #----------------------------------------------------------
89
90 class AutoReload(object):
91     def __init__(self, server):
92         self.server = server
93         self.files = {}
94         self.modules = {}
95         import pyinotify
96         class EventHandler(pyinotify.ProcessEvent):
97             def __init__(self, autoreload):
98                 self.autoreload = autoreload
99
100             def process_IN_CREATE(self, event):
101                 _logger.debug('File created: %s', event.pathname)
102                 self.autoreload.files[event.pathname] = 1
103
104             def process_IN_MODIFY(self, event):
105                 _logger.debug('File modified: %s', event.pathname)
106                 self.autoreload.files[event.pathname] = 1
107
108         self.wm = pyinotify.WatchManager()
109         self.handler = EventHandler(self)
110         self.notifier = pyinotify.Notifier(self.wm, self.handler, timeout=0)
111         mask = pyinotify.IN_MODIFY | pyinotify.IN_CREATE  # IN_MOVED_FROM, IN_MOVED_TO ?
112         for path in openerp.tools.config.options["addons_path"].split(','):
113             _logger.info('Watching addons folder %s', path)
114             self.wm.add_watch(path, mask, rec=True)
115
116     def process_data(self, files):
117         xml_files = [i for i in files if i.endswith('.xml')]
118         addons_path = openerp.tools.config.options["addons_path"].split(',')
119         for i in xml_files:
120             for path in addons_path:
121                 if i.startswith(path):
122                     # find out wich addons path the file belongs to
123                     # and extract it's module name
124                     right = i[len(path) + 1:].split('/')
125                     if len(right) < 2:
126                         continue
127                     module = right[0]
128                     self.modules[module]=1
129         if self.modules:
130             _logger.info('autoreload: xml change detected, autoreload activated')
131             restart()
132
133     def process_python(self, files):
134         # process python changes
135         py_files = [i for i in files if i.endswith('.py')]
136         py_errors = []
137         # TODO keep python errors until they are ok
138         if py_files:
139             for i in py_files:
140                 try:
141                     source = open(i, 'rb').read() + '\n'
142                     compile(source, i, 'exec')
143                 except SyntaxError:
144                     py_errors.append(i)
145             if py_errors:
146                 _logger.info('autoreload: python code change detected, errors found')
147                 for i in py_errors:
148                     _logger.info('autoreload: SyntaxError %s',i)
149             else:
150                 _logger.info('autoreload: python code updated, autoreload activated')
151                 restart()
152
153     def check_thread(self):
154         # Check if some files have been touched in the addons path.
155         # If true, check if the touched file belongs to an installed module
156         # in any of the database used in the registry manager.
157         while 1:
158             while self.notifier.check_events(1000):
159                 self.notifier.read_events()
160                 self.notifier.process_events()
161             l = self.files.keys()
162             self.files.clear()
163             self.process_data(l)
164             self.process_python(l)
165
166     def run(self):
167         t = threading.Thread(target=self.check_thread)
168         t.setDaemon(True)
169         t.start()
170         _logger.info('AutoReload watcher running')
171
172 #----------------------------------------------------------
173 # Servers: Threaded, Gevented and Prefork
174 #----------------------------------------------------------
175
176 class CommonServer(object):
177     def __init__(self, app):
178         # TODO Change the xmlrpc_* options to http_*
179         self.app = app
180         # config
181         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
182         self.port = config['xmlrpc_port']
183         # runtime
184         self.pid = os.getpid()
185
186     def dumpstacks(self):
187         """ Signal handler: dump a stack trace for each existing thread."""
188         # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
189         # modified for python 2.5 compatibility
190         threads_info = dict([(th.ident, {'name': th.name,
191                                         'uid': getattr(th,'uid','n/a')})
192                                     for th in threading.enumerate()])
193         code = []
194         for threadId, stack in sys._current_frames().items():
195             thread_info = threads_info.get(threadId)
196             code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
197                         (thread_info and thread_info['name'] or 'n/a',
198                          threadId,
199                          thread_info and thread_info['uid'] or 'n/a'))
200             for filename, lineno, name, line in traceback.extract_stack(stack):
201                 code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
202                 if line:
203                     code.append("  %s" % (line.strip()))
204         _logger.info("\n".join(code))
205
206     def close_socket(self, sock):
207         """ Closes a socket instance cleanly
208         :param sock: the network socket to close
209         :type sock: socket.socket
210         """
211         try:
212             sock.shutdown(socket.SHUT_RDWR)
213         except socket.error, e:
214             # On OSX, socket shutdowns both sides if any side closes it
215             # causing an error 57 'Socket is not connected' on shutdown
216             # of the other side (or something), see
217             # http://bugs.python.org/issue4397
218             # note: stdlib fixed test, not behavior
219             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
220                 raise
221         sock.close()
222
223 class ThreadedServer(CommonServer):
224     def __init__(self, app):
225         super(ThreadedServer, self).__init__(app)
226         self.main_thread_id = threading.currentThread().ident
227         # Variable keeping track of the number of calls to the signal handler defined
228         # below. This variable is monitored by ``quit_on_signals()``.
229         self.quit_signals_received = 0
230
231         #self.socket = None
232         self.httpd = None
233
234     def signal_handler(self, sig, frame):
235         if sig in [signal.SIGINT,signal.SIGTERM]:
236             # shutdown on kill -INT or -TERM
237             self.quit_signals_received += 1
238             if self.quit_signals_received > 1:
239                 # logging.shutdown was already called at this point.
240                 sys.stderr.write("Forced shutdown.\n")
241                 os._exit(0)
242         elif sig == signal.SIGHUP:
243             # restart on kill -HUP
244             openerp.phoenix = True
245             self.quit_signals_received += 1
246         elif sig == signal.SIGQUIT:
247             # dump stacks on kill -3
248             self.dumpstacks()
249
250     def cron_thread(self, number):
251         while True:
252             time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
253             registries = openerp.modules.registry.RegistryManager.registries
254             _logger.debug('cron%d polling for jobs', number)
255             for db_name, registry in registries.items():
256                 while True and registry.ready:
257                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
258                     if not acquired:
259                         break
260
261     def cron_spawn(self):
262         """ Start the above runner function in a daemon thread.
263
264         The thread is a typical daemon thread: it will never quit and must be
265         terminated when the main process exits - with no consequence (the processing
266         threads it spawns are not marked daemon).
267
268         """
269         # Force call to strptime just before starting the cron thread
270         # to prevent time.strptime AttributeError within the thread.
271         # See: http://bugs.python.org/issue7980
272         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
273         for i in range(openerp.tools.config['max_cron_threads']):
274             def target():
275                 self.cron_thread(i)
276             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
277             t.setDaemon(True)
278             t.start()
279             _logger.debug("cron%d started!" % i)
280
281     def http_thread(self):
282         def app(e,s):
283             return self.app(e,s)
284         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
285         self.httpd.serve_forever()
286
287     def http_spawn(self):
288         threading.Thread(target=self.http_thread).start()
289         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
290
291     def start(self):
292         _logger.debug("Setting signal handlers")
293         if os.name == 'posix':
294             signal.signal(signal.SIGINT, self.signal_handler)
295             signal.signal(signal.SIGTERM, self.signal_handler)
296             signal.signal(signal.SIGCHLD, self.signal_handler)
297             signal.signal(signal.SIGHUP, self.signal_handler)
298             signal.signal(signal.SIGQUIT, self.signal_handler)
299         elif os.name == 'nt':
300             import win32api
301             win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
302         self.cron_spawn()
303         self.http_spawn()
304
305     def stop(self):
306         """ Shutdown the WSGI server. Wait for non deamon threads.
307         """
308         _logger.info("Initiating shutdown")
309         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
310
311         self.httpd.shutdown()
312         self.close_socket(self.httpd.socket)
313
314         # Manually join() all threads before calling sys.exit() to allow a second signal
315         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
316         # threading.Thread.join() should not mask signals (at least in python 2.5).
317         me = threading.currentThread()
318         _logger.debug('current thread: %r', me)
319         for thread in threading.enumerate():
320             _logger.debug('process %r (%r)', thread, thread.isDaemon())
321             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
322                 while thread.isAlive():
323                     _logger.debug('join and sleep')
324                     # Need a busyloop here as thread.join() masks signals
325                     # and would prevent the forced shutdown.
326                     thread.join(0.05)
327                     time.sleep(0.05)
328
329         _logger.debug('--')
330         openerp.modules.registry.RegistryManager.delete_all()
331         logging.shutdown()
332
333     def run(self):
334         """ Start the http server and the cron thread then wait for a signal.
335
336         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
337         a second one if any will force an immediate exit.
338         """
339         self.start()
340
341         # Wait for a first signal to be handled. (time.sleep will be interrupted
342         # by the signal handler.) The try/except is for the win32 case.
343         try:
344             while self.quit_signals_received == 0:
345                 time.sleep(60)
346         except KeyboardInterrupt:
347             pass
348
349         self.stop()
350
351     def reload(self):
352         os.kill(self.pid, signal.SIGHUP)
353
354 class GeventServer(CommonServer):
355     def __init__(self, app):
356         super(GeventServer, self).__init__(app)
357         self.port = config['longpolling_port']
358         self.httpd = None
359
360     def watch_parent(self, beat=4):
361         import gevent
362         ppid = os.getppid()
363         while True:
364             if ppid != os.getppid():
365                 pid = os.getpid()
366                 _logger.info("LongPolling (%s) Parent changed", pid)
367                 # suicide !!
368                 os.kill(pid, signal.SIGTERM)
369                 return
370             gevent.sleep(beat)
371
372     def start(self):
373         import gevent
374         from gevent.wsgi import WSGIServer
375         gevent.spawn(self.watch_parent)
376         self.httpd = WSGIServer((self.interface, self.port), self.app)
377         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
378         self.httpd.serve_forever()
379
380     def stop(self):
381         import gevent
382         self.httpd.stop()
383         gevent.shutdown()
384
385     def run(self):
386         self.start()
387         self.stop()
388
389 class PreforkServer(CommonServer):
390     """ Multiprocessing inspired by (g)unicorn.
391     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
392     method between workers but we plan to replace it by a more intelligent
393     dispatcher to will parse the first HTTP request line.
394     """
395     def __init__(self, app):
396         # config
397         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
398         self.population = config['workers']
399         self.timeout = config['limit_time_real']
400         self.limit_request = config['limit_request']
401         # working vars
402         self.beat = 4
403         self.app = app
404         self.pid = os.getpid()
405         self.socket = None
406         self.workers_http = {}
407         self.workers_cron = {}
408         self.workers = {}
409         self.generation = 0
410         self.queue = []
411         self.long_polling_pid = None
412
413     def pipe_new(self):
414         pipe = os.pipe()
415         for fd in pipe:
416             # non_blocking
417             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
418             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
419             # close_on_exec
420             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
421             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
422         return pipe
423
424     def pipe_ping(self, pipe):
425         try:
426             os.write(pipe[1], '.')
427         except IOError, e:
428             if e.errno not in [errno.EAGAIN, errno.EINTR]:
429                 raise
430
431     def signal_handler(self, sig, frame):
432         if len(self.queue) < 5 or sig == signal.SIGCHLD:
433             self.queue.append(sig)
434             self.pipe_ping(self.pipe)
435         else:
436             _logger.warn("Dropping signal: %s", sig)
437
438     def worker_spawn(self, klass, workers_registry):
439         self.generation += 1
440         worker = klass(self)
441         pid = os.fork()
442         if pid != 0:
443             worker.pid = pid
444             self.workers[pid] = worker
445             workers_registry[pid] = worker
446             return worker
447         else:
448             worker.run()
449             sys.exit(0)
450
451     def long_polling_spawn(self):
452         nargs = stripped_sys_argv('--pidfile','--workers')
453         cmd = nargs[0]
454         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
455         nargs[0] = cmd
456         popen = subprocess.Popen(nargs)
457         self.long_polling_pid = popen.pid
458
459     def worker_pop(self, pid):
460         if pid in self.workers:
461             _logger.debug("Worker (%s) unregistered",pid)
462             try:
463                 self.workers_http.pop(pid,None)
464                 self.workers_cron.pop(pid,None)
465                 u = self.workers.pop(pid)
466                 u.close()
467             except OSError:
468                 return
469
470     def worker_kill(self, pid, sig):
471         try:
472             os.kill(pid, sig)
473         except OSError, e:
474             if e.errno == errno.ESRCH:
475                 self.worker_pop(pid)
476
477     def process_signals(self):
478         while len(self.queue):
479             sig = self.queue.pop(0)
480             if sig in [signal.SIGINT,signal.SIGTERM]:
481                 raise KeyboardInterrupt
482             elif sig == signal.SIGHUP:
483                 # restart on kill -HUP
484                 openerp.phoenix = True
485                 raise KeyboardInterrupt
486             elif sig == signal.SIGQUIT:
487                 # dump stacks on kill -3
488                 self.dumpstacks()
489             elif sig == signal.SIGTTIN:
490                 # increase number of workers
491                 self.population += 1
492             elif sig == signal.SIGTTOU:
493                 # decrease number of workers
494                 self.population -= 1
495
496     def process_zombie(self):
497         # reap dead workers
498         while 1:
499             try:
500                 wpid, status = os.waitpid(-1, os.WNOHANG)
501                 if not wpid:
502                     break
503                 if (status >> 8) == 3:
504                     msg = "Critial worker error (%s)"
505                     _logger.critical(msg, wpid)
506                     raise Exception(msg % wpid)
507                 self.worker_pop(wpid)
508             except OSError, e:
509                 if e.errno == errno.ECHILD:
510                     break
511                 raise
512
513     def process_timeout(self):
514         now = time.time()
515         for (pid, worker) in self.workers.items():
516             if (worker.watchdog_timeout is not None) and \
517                 (now - worker.watchdog_time >= worker.watchdog_timeout):
518                 _logger.error("Worker (%s) timeout", pid)
519                 self.worker_kill(pid, signal.SIGKILL)
520
521     def process_spawn(self):
522         while len(self.workers_http) < self.population:
523             self.worker_spawn(WorkerHTTP, self.workers_http)
524         while len(self.workers_cron) < config['max_cron_threads']:
525             self.worker_spawn(WorkerCron, self.workers_cron)
526         if not self.long_polling_pid:
527             self.long_polling_spawn()
528
529     def sleep(self):
530         try:
531             # map of fd -> worker
532             fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
533             fd_in = fds.keys() + [self.pipe[0]]
534             # check for ping or internal wakeups
535             ready = select.select(fd_in, [], [], self.beat)
536             # update worker watchdogs
537             for fd in ready[0]:
538                 if fd in fds:
539                     fds[fd].watchdog_time = time.time()
540                 try:
541                     # empty pipe
542                     while os.read(fd, 1):
543                         pass
544                 except OSError, e:
545                     if e.errno not in [errno.EAGAIN]:
546                         raise
547         except select.error, e:
548             if e[0] not in [errno.EINTR]:
549                 raise
550
551     def start(self):
552         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
553         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
554         # signal handler to overcome this behaviour
555         self.pipe = self.pipe_new()
556         # set signal handlers
557         signal.signal(signal.SIGINT, self.signal_handler)
558         signal.signal(signal.SIGTERM, self.signal_handler)
559         signal.signal(signal.SIGHUP, self.signal_handler)
560         signal.signal(signal.SIGCHLD, self.signal_handler)
561         signal.signal(signal.SIGQUIT, self.signal_handler)
562         signal.signal(signal.SIGTTIN, self.signal_handler)
563         signal.signal(signal.SIGTTOU, self.signal_handler)
564         # listen to socket
565         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
566         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
567         self.socket.setblocking(0)
568         self.socket.bind(self.address)
569         self.socket.listen(8*self.population)
570
571     def stop(self, graceful=True):
572         if self.long_polling_pid is not None:
573             self.worker_kill(self.long_polling_pid, signal.SIGKILL)     # FIXME make longpolling process handle SIGTERM correctly
574             self.long_polling_pid = None
575         if graceful:
576             _logger.info("Stopping gracefully")
577             limit = time.time() + self.timeout
578             for pid in self.workers.keys():
579                 self.worker_kill(pid, signal.SIGTERM)
580             while self.workers and time.time() < limit:
581                 self.process_zombie()
582                 time.sleep(0.1)
583         else:
584             _logger.info("Stopping forcefully")
585         for pid in self.workers.keys():
586             self.worker_kill(pid, signal.SIGTERM)
587         self.socket.close()
588
589     def run(self):
590         self.start()
591         _logger.debug("Multiprocess starting")
592         while 1:
593             try:
594                 #_logger.debug("Multiprocess beat (%s)",time.time())
595                 self.process_signals()
596                 self.process_zombie()
597                 self.process_timeout()
598                 self.process_spawn()
599                 self.sleep()
600             except KeyboardInterrupt:
601                 _logger.debug("Multiprocess clean stop")
602                 self.stop()
603                 break
604             except Exception,e:
605                 _logger.exception(e)
606                 self.stop(False)
607                 sys.exit(-1)
608
609 class Worker(object):
610     """ Workers """
611     def __init__(self, multi):
612         self.multi = multi
613         self.watchdog_time = time.time()
614         self.watchdog_pipe = multi.pipe_new()
615         # Can be set to None if no watchdog is desired.
616         self.watchdog_timeout = multi.timeout
617         self.ppid = os.getpid()
618         self.pid = None
619         self.alive = True
620         # should we rename into lifetime ?
621         self.request_max = multi.limit_request
622         self.request_count = 0
623
624     def setproctitle(self, title=""):
625         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
626
627     def close(self):
628         os.close(self.watchdog_pipe[0])
629         os.close(self.watchdog_pipe[1])
630
631     def signal_handler(self, sig, frame):
632         self.alive = False
633
634     def sleep(self):
635         try:
636             ret = select.select([self.multi.socket], [], [], self.multi.beat)
637         except select.error, e:
638             if e[0] not in [errno.EINTR]:
639                 raise
640
641     def process_limit(self):
642         # If our parent changed sucide
643         if self.ppid != os.getppid():
644             _logger.info("Worker (%s) Parent changed", self.pid)
645             self.alive = False
646         # check for lifetime
647         if self.request_count >= self.request_max:
648             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
649             self.alive = False
650         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
651         rss, vms = psutil.Process(os.getpid()).get_memory_info()
652         if vms > config['limit_memory_soft']:
653             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
654             self.alive = False # Commit suicide after the request.
655
656         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
657         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
658         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
659
660         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
661         r = resource.getrusage(resource.RUSAGE_SELF)
662         cpu_time = r.ru_utime + r.ru_stime
663         def time_expired(n, stack):
664             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
665             # We dont suicide in such case
666             raise Exception('CPU time limit exceeded.')
667         signal.signal(signal.SIGXCPU, time_expired)
668         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
669         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
670
671     def process_work(self):
672         pass
673
674     def start(self):
675         self.pid = os.getpid()
676         self.setproctitle()
677         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
678         # Reseed the random number generator
679         random.seed()
680         # Prevent fd inherientence close_on_exec
681         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
682         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
683         # reset blocking status
684         self.multi.socket.setblocking(0)
685         signal.signal(signal.SIGINT, self.signal_handler)
686         signal.signal(signal.SIGTERM, signal.SIG_DFL)
687         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
688
689     def stop(self):
690         pass
691
692     def run(self):
693         try:
694             self.start()
695             while self.alive:
696                 self.process_limit()
697                 self.multi.pipe_ping(self.watchdog_pipe)
698                 self.sleep()
699                 self.process_work()
700             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
701             self.stop()
702         except Exception,e:
703             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
704             # should we use 3 to abort everything ?
705             sys.exit(1)
706
707 class WorkerHTTP(Worker):
708     """ HTTP Request workers """
709     def process_request(self, client, addr):
710         client.setblocking(1)
711         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
712         # Prevent fd inherientence close_on_exec
713         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
714         fcntl.fcntl(client, fcntl.F_SETFD, flags)
715         # do request using BaseWSGIServerNoBind monkey patched with socket
716         self.server.socket = client
717         # tolerate broken pipe when the http client closes the socket before
718         # receiving the full reply
719         try:
720             self.server.process_request(client,addr)
721         except IOError, e:
722             if e.errno != errno.EPIPE:
723                 raise
724         self.request_count += 1
725
726     def process_work(self):
727         try:
728             client, addr = self.multi.socket.accept()
729             self.process_request(client, addr)
730         except socket.error, e:
731             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
732                 raise
733
734     def start(self):
735         Worker.start(self)
736         self.server = BaseWSGIServerNoBind(self.multi.app)
737
738 class WorkerCron(Worker):
739     """ Cron workers """
740
741     def __init__(self, multi):
742         super(WorkerCron, self).__init__(multi)
743         # process_work() below process a single database per call.
744         # The variable db_index is keeping track of the next database to
745         # process.
746         self.db_index = 0
747
748     def sleep(self):
749         # Really sleep once all the databases have been processed.
750         if self.db_index == 0:
751             interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
752             time.sleep(interval)
753
754     def _db_list(self):
755         if config['db_name']:
756             db_names = config['db_name'].split(',')
757         else:
758             db_names = openerp.service.db.exp_list(True)
759         return db_names
760
761     def process_work(self):
762         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
763         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
764         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
765         db_names = self._db_list()
766         if len(db_names):
767             self.db_index = (self.db_index + 1) % len(db_names)
768             db_name = db_names[self.db_index]
769             self.setproctitle(db_name)
770             if rpc_request_flag:
771                 start_time = time.time()
772                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
773             
774             import openerp.addons.base as base
775             base.ir.ir_cron.ir_cron._acquire_job(db_name)
776             openerp.modules.registry.RegistryManager.delete(db_name)
777
778             # dont keep cursors in multi database mode
779             if len(db_names) > 1:
780                 openerp.sql_db.close_db(db_name)
781             if rpc_request_flag:
782                 end_time = time.time()
783                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
784                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
785                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
786
787             self.request_count += 1
788             if self.request_count >= self.request_max and self.request_max < len(db_names):
789                 _logger.error("There are more dabatases to process than allowed "
790                     "by the `limit_request` configuration variable: %s more.",
791                     len(db_names) - self.request_max)
792         else:
793             self.db_index = 0
794
795     def start(self):
796         os.nice(10)     # mommy always told me to be nice with others...
797         Worker.start(self)
798         self.multi.socket.close()
799
800 #----------------------------------------------------------
801 # start/stop public api
802 #----------------------------------------------------------
803
804 server = None
805
806 def load_server_wide_modules():
807     for m in openerp.conf.server_wide_modules:
808         try:
809             openerp.modules.module.load_openerp_module(m)
810         except Exception:
811             msg = ''
812             if m == 'web':
813                 msg = """
814 The `web` module is provided by the addons found in the `openerp-web` project.
815 Maybe you forgot to add those addons in your addons_path configuration."""
816             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
817
818 def _reexec(updated_modules=None):
819     """reexecute openerp-server process with (nearly) the same arguments"""
820     if openerp.tools.osutil.is_running_as_nt_service():
821         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
822     exe = os.path.basename(sys.executable)
823     args = stripped_sys_argv()
824     args +=  ["-u", ','.join(updated_modules)]
825     if not args or args[0] != exe:
826         args.insert(0, exe)
827     os.execv(sys.executable, args)
828
829 def start():
830     """ Start the openerp http server and cron processor.
831     """
832     global server
833     load_server_wide_modules()
834     if config['workers']:
835         server = PreforkServer(openerp.service.wsgi_server.application)
836     elif openerp.evented:
837         server = GeventServer(openerp.service.wsgi_server.application)
838     else:
839         server = ThreadedServer(openerp.service.wsgi_server.application)
840
841     if config['auto_reload']:
842         autoreload = AutoReload(server)
843         autoreload.run()
844
845     server.run()
846
847     # like the legend of the phoenix, all ends with beginnings
848     if getattr(openerp, 'phoenix', False):
849         modules = []
850         if config['auto_reload']:
851             modules = autoreload.modules.keys()
852         _reexec(modules)
853     sys.exit(0)
854
855 def restart():
856     """ Restart the server
857     """
858     if os.name == 'nt':
859         # run in a thread to let the current thread return response to the caller.
860         threading.Thread(target=_reexec).start()
861     else:
862         os.kill(server.pid, signal.SIGHUP)
863
864 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: