fix res_group rename, move files to clean up res
[odoo/odoo.git] / openerp / service / server.py
1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
4 import datetime
5 import errno
6 import fcntl
7 import logging
8 import os
9 import os.path
10 import platform
11 import psutil
12 import random
13 import resource
14 import select
15 import signal
16 import socket
17 import subprocess
18 import sys
19 import threading
20 import time
21 import traceback
22
23 import werkzeug.serving
24 try:
25     from setproctitle import setproctitle
26 except ImportError:
27     setproctitle = lambda x: None
28
29 import openerp
30 import openerp.tools.config as config
31 from openerp.release import nt_service_name
32 from openerp.tools.misc import stripped_sys_argv
33
34 import wsgi_server
35
36 _logger = logging.getLogger(__name__)
37
38 SLEEP_INTERVAL = 60 # 1 min
39
40 #----------------------------------------------------------
41 # Werkzeug WSGI servers patched
42 #----------------------------------------------------------
43
44 class BaseWSGIServerNoBind(werkzeug.serving.BaseWSGIServer):
45     """ werkzeug Base WSGI Server patched to skip socket binding. PreforkServer
46     use this class, sets the socket and calls the process_request() manually
47     """
48     def __init__(self, app):
49         werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
50     def server_bind(self):
51         # we dont bind beause we use the listen socket of PreforkServer#socket
52         # instead we close the socket
53         if self.socket:
54             self.socket.close()
55     def server_activate(self):
56         # dont listen as we use PreforkServer#socket
57         pass
58
59 # _reexec() should set LISTEN_* to avoid connection refused during reload time. It
60 # should also work with systemd socket activation. This is currently untested
61 # and not yet used.
62
63 class ThreadedWSGIServerReloadable(werkzeug.serving.ThreadedWSGIServer):
64     """ werkzeug Threaded WSGI Server patched to allow reusing a listen socket
65     given by the environement, this is used by autoreload to keep the listen
66     socket open when a reload happens.
67     """
68     def server_bind(self):
69         envfd = os.environ.get('LISTEN_FDS')
70         if envfd and os.environ.get('LISTEN_PID') == str(os.getpid()):
71             self.reload_socket = True
72             self.socket = socket.fromfd(int(envfd), socket.AF_INET, socket.SOCK_STREAM)
73             # should we os.close(int(envfd)) ? it seem python duplicate the fd.
74         else:
75             self.reload_socket = False
76             super(ThreadedWSGIServerReloadable, self).server_bind()
77
78     def server_activate(self):
79         if not self.reload_socket:
80             super(ThreadedWSGIServerReloadable, self).server_activate()
81
82 #----------------------------------------------------------
83 # AutoReload watcher
84 #----------------------------------------------------------
85
86 class AutoReload(object):
87     def __init__(self, server):
88         self.server = server
89         self.files = {}
90         self.modules = {}
91         import pyinotify
92         class EventHandler(pyinotify.ProcessEvent):
93             def __init__(self, autoreload):
94                 self.autoreload = autoreload
95
96             def process_IN_CREATE(self, event):
97                 _logger.debug('File created: %s', event.pathname)
98                 self.autoreload.files[event.pathname] = 1
99
100             def process_IN_MODIFY(self, event):
101                 _logger.debug('File modified: %s', event.pathname)
102                 self.autoreload.files[event.pathname] = 1
103
104         self.wm = pyinotify.WatchManager()
105         self.handler = EventHandler(self)
106         self.notifier = pyinotify.Notifier(self.wm, self.handler, timeout=0)
107         mask = pyinotify.IN_MODIFY | pyinotify.IN_CREATE  # IN_MOVED_FROM, IN_MOVED_TO ?
108         for path in openerp.tools.config.options["addons_path"].split(','):
109             _logger.info('Watching addons folder %s', path)
110             self.wm.add_watch(path, mask, rec=True)
111
112     def process_data(self, files):
113         xml_files = [i for i in files if i.endswith('.xml')]
114         addons_path = openerp.tools.config.options["addons_path"].split(',')
115         for i in xml_files:
116             for path in addons_path:
117                 if i.startswith(path):
118                     # find out wich addons path the file belongs to
119                     # and extract it's module name
120                     right = i[len(path) + 1:].split('/')
121                     if len(right) < 2:
122                         continue
123                     module = right[0]
124                     self.modules[module]=1
125         if self.modules:
126             _logger.info('autoreload: xml change detected, autoreload activated')
127             restart()
128
129     def process_python(self, files):
130         # process python changes
131         py_files = [i for i in files if i.endswith('.py')]
132         py_errors = []
133         # TODO keep python errors until they are ok
134         if py_files:
135             for i in py_files:
136                 try:
137                     source = open(i, 'rb').read() + '\n'
138                     compile(source, i, 'exec')
139                 except SyntaxError:
140                     py_errors.append(i)
141             if py_errors:
142                 _logger.info('autoreload: python code change detected, errors found')
143                 for i in py_errors:
144                     _logger.info('autoreload: SyntaxError %s',i)
145             else:
146                 _logger.info('autoreload: python code updated, autoreload activated')
147                 restart()
148
149     def check_thread(self):
150         # Check if some files have been touched in the addons path.
151         # If true, check if the touched file belongs to an installed module
152         # in any of the database used in the registry manager.
153         while 1:
154             while self.notifier.check_events(1000):
155                 self.notifier.read_events()
156                 self.notifier.process_events()
157             l = self.files.keys()
158             self.files.clear()
159             self.process_data(l)
160             self.process_python(l)
161
162     def run(self):
163         t = threading.Thread(target=self.check_thread)
164         t.setDaemon(True)
165         t.start()
166         _logger.info('AutoReload watcher running')
167
168 #----------------------------------------------------------
169 # Servers: Threaded, Gevented and Prefork
170 #----------------------------------------------------------
171
172 class CommonServer(object):
173     def __init__(self, app):
174         # TODO Change the xmlrpc_* options to http_*
175         self.app = app
176         # config
177         self.interface = config['xmlrpc_interface'] or '0.0.0.0'
178         self.port = config['xmlrpc_port']
179         # runtime
180         self.pid = os.getpid()
181
182     def dumpstacks(self):
183         """ Signal handler: dump a stack trace for each existing thread."""
184         # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
185         # modified for python 2.5 compatibility
186         threads_info = dict([(th.ident, {'name': th.name,
187                                         'uid': getattr(th,'uid','n/a')})
188                                     for th in threading.enumerate()])
189         code = []
190         for threadId, stack in sys._current_frames().items():
191             thread_info = threads_info.get(threadId)
192             code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
193                         (thread_info and thread_info['name'] or 'n/a',
194                          threadId,
195                          thread_info and thread_info['uid'] or 'n/a'))
196             for filename, lineno, name, line in traceback.extract_stack(stack):
197                 code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
198                 if line:
199                     code.append("  %s" % (line.strip()))
200         _logger.info("\n".join(code))
201
202     def close_socket(self, sock):
203         """ Closes a socket instance cleanly
204         :param sock: the network socket to close
205         :type sock: socket.socket
206         """
207         try:
208             sock.shutdown(socket.SHUT_RDWR)
209         except socket.error, e:
210             # On OSX, socket shutdowns both sides if any side closes it
211             # causing an error 57 'Socket is not connected' on shutdown
212             # of the other side (or something), see
213             # http://bugs.python.org/issue4397
214             # note: stdlib fixed test, not behavior
215             if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
216                 raise
217         sock.close()
218
219 class ThreadedServer(CommonServer):
220     def __init__(self, app):
221         super(ThreadedServer, self).__init__(app)
222         self.main_thread_id = threading.currentThread().ident
223         # Variable keeping track of the number of calls to the signal handler defined
224         # below. This variable is monitored by ``quit_on_signals()``.
225         self.quit_signals_received = 0
226
227         #self.socket = None
228         self.httpd = None
229
230     def signal_handler(self, sig, frame):
231         if sig in [signal.SIGINT,signal.SIGTERM]:
232             # shutdown on kill -INT or -TERM
233             self.quit_signals_received += 1
234             if self.quit_signals_received > 1:
235                 # logging.shutdown was already called at this point.
236                 sys.stderr.write("Forced shutdown.\n")
237                 os._exit(0)
238         elif sig == signal.SIGHUP:
239             # restart on kill -HUP
240             openerp.phoenix = True
241             self.quit_signals_received += 1
242         elif sig == signal.SIGQUIT:
243             # dump stacks on kill -3
244             self.dumpstacks()
245
246     def cron_thread(self, number):
247         while True:
248             time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
249             registries = openerp.modules.registry.RegistryManager.registries
250             _logger.debug('cron%d polling for jobs', number)
251             for db_name, registry in registries.items():
252                 while True and registry.ready:
253                     acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
254                     if not acquired:
255                         break
256
257     def cron_spawn(self):
258         """ Start the above runner function in a daemon thread.
259
260         The thread is a typical daemon thread: it will never quit and must be
261         terminated when the main process exits - with no consequence (the processing
262         threads it spawns are not marked daemon).
263
264         """
265         # Force call to strptime just before starting the cron thread
266         # to prevent time.strptime AttributeError within the thread.
267         # See: http://bugs.python.org/issue7980
268         datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
269         for i in range(openerp.tools.config['max_cron_threads']):
270             def target():
271                 self.cron_thread(i)
272             t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
273             t.setDaemon(True)
274             t.start()
275             _logger.debug("cron%d started!" % i)
276
277     def http_thread(self):
278         def app(e,s):
279             return self.app(e,s)
280         self.httpd = ThreadedWSGIServerReloadable(self.interface, self.port, app)
281         self.httpd.serve_forever()
282
283     def http_spawn(self):
284         threading.Thread(target=self.http_thread).start()
285         _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
286
287     def start(self):
288         _logger.debug("Setting signal handlers")
289         if os.name == 'posix':
290             signal.signal(signal.SIGINT, self.signal_handler)
291             signal.signal(signal.SIGTERM, self.signal_handler)
292             signal.signal(signal.SIGCHLD, self.signal_handler)
293             signal.signal(signal.SIGHUP, self.signal_handler)
294             signal.signal(signal.SIGQUIT, self.signal_handler)
295         elif os.name == 'nt':
296             import win32api
297             win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
298         self.cron_spawn()
299         self.http_spawn()
300
301     def stop(self):
302         """ Shutdown the WSGI server. Wait for non deamon threads.
303         """
304         _logger.info("Initiating shutdown")
305         _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
306
307         self.httpd.shutdown()
308         self.close_socket(self.httpd.socket)
309
310         # Manually join() all threads before calling sys.exit() to allow a second signal
311         # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
312         # threading.Thread.join() should not mask signals (at least in python 2.5).
313         me = threading.currentThread()
314         _logger.debug('current thread: %r', me)
315         for thread in threading.enumerate():
316             _logger.debug('process %r (%r)', thread, thread.isDaemon())
317             if thread != me and not thread.isDaemon() and thread.ident != self.main_thread_id:
318                 while thread.isAlive():
319                     _logger.debug('join and sleep')
320                     # Need a busyloop here as thread.join() masks signals
321                     # and would prevent the forced shutdown.
322                     thread.join(0.05)
323                     time.sleep(0.05)
324
325         _logger.debug('--')
326         openerp.modules.registry.RegistryManager.delete_all()
327         logging.shutdown()
328
329     def run(self):
330         """ Start the http server and the cron thread then wait for a signal.
331
332         The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
333         a second one if any will force an immediate exit.
334         """
335         self.start()
336
337         # Wait for a first signal to be handled. (time.sleep will be interrupted
338         # by the signal handler.) The try/except is for the win32 case.
339         try:
340             while self.quit_signals_received == 0:
341                 time.sleep(60)
342         except KeyboardInterrupt:
343             pass
344
345         self.stop()
346
347     def reload(self):
348         os.kill(self.pid, signal.SIGHUP)
349
350 class GeventServer(CommonServer):
351     def __init__(self, app):
352         super(GeventServer, self).__init__(app)
353         self.port = config['longpolling_port']
354         self.httpd = None
355
356     def watch_parent(self, beat=4):
357         import gevent
358         ppid = os.getppid()
359         while True:
360             if ppid != os.getppid():
361                 pid = os.getpid()
362                 _logger.info("LongPolling (%s) Parent changed", pid)
363                 # suicide !!
364                 os.kill(pid, signal.SIGTERM)
365                 return
366             gevent.sleep(beat)
367
368     def start(self):
369         import gevent
370         from gevent.wsgi import WSGIServer
371         gevent.spawn(self.watch_parent)
372         self.httpd = WSGIServer((self.interface, self.port), self.app)
373         _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
374         self.httpd.serve_forever()
375
376     def stop(self):
377         import gevent
378         self.httpd.stop()
379         gevent.shutdown()
380
381     def run(self):
382         self.start()
383         self.stop()
384
385 class PreforkServer(CommonServer):
386     """ Multiprocessing inspired by (g)unicorn.
387     PreforkServer (aka Multicorn) currently uses accept(2) as dispatching
388     method between workers but we plan to replace it by a more intelligent
389     dispatcher to will parse the first HTTP request line.
390     """
391     def __init__(self, app):
392         # config
393         self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
394         self.population = config['workers']
395         self.timeout = config['limit_time_real']
396         self.limit_request = config['limit_request']
397         # working vars
398         self.beat = 4
399         self.app = app
400         self.pid = os.getpid()
401         self.socket = None
402         self.workers_http = {}
403         self.workers_cron = {}
404         self.workers = {}
405         self.generation = 0
406         self.queue = []
407         self.long_polling_pid = None
408
409     def pipe_new(self):
410         pipe = os.pipe()
411         for fd in pipe:
412             # non_blocking
413             flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
414             fcntl.fcntl(fd, fcntl.F_SETFL, flags)
415             # close_on_exec
416             flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
417             fcntl.fcntl(fd, fcntl.F_SETFD, flags)
418         return pipe
419
420     def pipe_ping(self, pipe):
421         try:
422             os.write(pipe[1], '.')
423         except IOError, e:
424             if e.errno not in [errno.EAGAIN, errno.EINTR]:
425                 raise
426
427     def signal_handler(self, sig, frame):
428         if len(self.queue) < 5 or sig == signal.SIGCHLD:
429             self.queue.append(sig)
430             self.pipe_ping(self.pipe)
431         else:
432             _logger.warn("Dropping signal: %s", sig)
433
434     def worker_spawn(self, klass, workers_registry):
435         self.generation += 1
436         worker = klass(self)
437         pid = os.fork()
438         if pid != 0:
439             worker.pid = pid
440             self.workers[pid] = worker
441             workers_registry[pid] = worker
442             return worker
443         else:
444             worker.run()
445             sys.exit(0)
446
447     def long_polling_spawn(self):
448         nargs = stripped_sys_argv('--pidfile','--workers')
449         cmd = nargs[0]
450         cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
451         nargs[0] = cmd
452         popen = subprocess.Popen(nargs)
453         self.long_polling_pid = popen.pid
454
455     def worker_pop(self, pid):
456         if pid in self.workers:
457             _logger.debug("Worker (%s) unregistered",pid)
458             try:
459                 self.workers_http.pop(pid,None)
460                 self.workers_cron.pop(pid,None)
461                 u = self.workers.pop(pid)
462                 u.close()
463             except OSError:
464                 return
465
466     def worker_kill(self, pid, sig):
467         try:
468             os.kill(pid, sig)
469         except OSError, e:
470             if e.errno == errno.ESRCH:
471                 self.worker_pop(pid)
472
473     def process_signals(self):
474         while len(self.queue):
475             sig = self.queue.pop(0)
476             if sig in [signal.SIGINT,signal.SIGTERM]:
477                 raise KeyboardInterrupt
478             elif sig == signal.SIGHUP:
479                 # restart on kill -HUP
480                 openerp.phoenix = True
481                 raise KeyboardInterrupt
482             elif sig == signal.SIGQUIT:
483                 # dump stacks on kill -3
484                 self.dumpstacks()
485             elif sig == signal.SIGTTIN:
486                 # increase number of workers
487                 self.population += 1
488             elif sig == signal.SIGTTOU:
489                 # decrease number of workers
490                 self.population -= 1
491
492     def process_zombie(self):
493         # reap dead workers
494         while 1:
495             try:
496                 wpid, status = os.waitpid(-1, os.WNOHANG)
497                 if not wpid:
498                     break
499                 if (status >> 8) == 3:
500                     msg = "Critial worker error (%s)"
501                     _logger.critical(msg, wpid)
502                     raise Exception(msg % wpid)
503                 self.worker_pop(wpid)
504             except OSError, e:
505                 if e.errno == errno.ECHILD:
506                     break
507                 raise
508
509     def process_timeout(self):
510         now = time.time()
511         for (pid, worker) in self.workers.items():
512             if (worker.watchdog_timeout is not None) and \
513                 (now - worker.watchdog_time >= worker.watchdog_timeout):
514                 _logger.error("Worker (%s) timeout", pid)
515                 self.worker_kill(pid, signal.SIGKILL)
516
517     def process_spawn(self):
518         while len(self.workers_http) < self.population:
519             self.worker_spawn(WorkerHTTP, self.workers_http)
520         while len(self.workers_cron) < config['max_cron_threads']:
521             self.worker_spawn(WorkerCron, self.workers_cron)
522         if not self.long_polling_pid:
523             self.long_polling_spawn()
524
525     def sleep(self):
526         try:
527             # map of fd -> worker
528             fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
529             fd_in = fds.keys() + [self.pipe[0]]
530             # check for ping or internal wakeups
531             ready = select.select(fd_in, [], [], self.beat)
532             # update worker watchdogs
533             for fd in ready[0]:
534                 if fd in fds:
535                     fds[fd].watchdog_time = time.time()
536                 try:
537                     # empty pipe
538                     while os.read(fd, 1):
539                         pass
540                 except OSError, e:
541                     if e.errno not in [errno.EAGAIN]:
542                         raise
543         except select.error, e:
544             if e[0] not in [errno.EINTR]:
545                 raise
546
547     def start(self):
548         # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
549         # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
550         # signal handler to overcome this behaviour
551         self.pipe = self.pipe_new()
552         # set signal handlers
553         signal.signal(signal.SIGINT, self.signal_handler)
554         signal.signal(signal.SIGTERM, self.signal_handler)
555         signal.signal(signal.SIGHUP, self.signal_handler)
556         signal.signal(signal.SIGCHLD, self.signal_handler)
557         signal.signal(signal.SIGQUIT, self.signal_handler)
558         signal.signal(signal.SIGTTIN, self.signal_handler)
559         signal.signal(signal.SIGTTOU, self.signal_handler)
560         # listen to socket
561         self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
562         self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
563         self.socket.setblocking(0)
564         self.socket.bind(self.address)
565         self.socket.listen(8*self.population)
566
567     def stop(self, graceful=True):
568         if self.long_polling_pid is not None:
569             self.worker_kill(self.long_polling_pid, signal.SIGKILL)     # FIXME make longpolling process handle SIGTERM correctly
570             self.long_polling_pid = None
571         if graceful:
572             _logger.info("Stopping gracefully")
573             limit = time.time() + self.timeout
574             for pid in self.workers.keys():
575                 self.worker_kill(pid, signal.SIGTERM)
576             while self.workers and time.time() < limit:
577                 self.process_zombie()
578                 time.sleep(0.1)
579         else:
580             _logger.info("Stopping forcefully")
581         for pid in self.workers.keys():
582             self.worker_kill(pid, signal.SIGTERM)
583         self.socket.close()
584
585     def run(self):
586         self.start()
587         _logger.debug("Multiprocess starting")
588         while 1:
589             try:
590                 #_logger.debug("Multiprocess beat (%s)",time.time())
591                 self.process_signals()
592                 self.process_zombie()
593                 self.process_timeout()
594                 self.process_spawn()
595                 self.sleep()
596             except KeyboardInterrupt:
597                 _logger.debug("Multiprocess clean stop")
598                 self.stop()
599                 break
600             except Exception,e:
601                 _logger.exception(e)
602                 self.stop(False)
603                 sys.exit(-1)
604
605 class Worker(object):
606     """ Workers """
607     def __init__(self, multi):
608         self.multi = multi
609         self.watchdog_time = time.time()
610         self.watchdog_pipe = multi.pipe_new()
611         # Can be set to None if no watchdog is desired.
612         self.watchdog_timeout = multi.timeout
613         self.ppid = os.getpid()
614         self.pid = None
615         self.alive = True
616         # should we rename into lifetime ?
617         self.request_max = multi.limit_request
618         self.request_count = 0
619
620     def setproctitle(self, title=""):
621         setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
622
623     def close(self):
624         os.close(self.watchdog_pipe[0])
625         os.close(self.watchdog_pipe[1])
626
627     def signal_handler(self, sig, frame):
628         self.alive = False
629
630     def sleep(self):
631         try:
632             ret = select.select([self.multi.socket], [], [], self.multi.beat)
633         except select.error, e:
634             if e[0] not in [errno.EINTR]:
635                 raise
636
637     def process_limit(self):
638         # If our parent changed sucide
639         if self.ppid != os.getppid():
640             _logger.info("Worker (%s) Parent changed", self.pid)
641             self.alive = False
642         # check for lifetime
643         if self.request_count >= self.request_max:
644             _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
645             self.alive = False
646         # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
647         rss, vms = psutil.Process(os.getpid()).get_memory_info()
648         if vms > config['limit_memory_soft']:
649             _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
650             self.alive = False # Commit suicide after the request.
651
652         # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
653         soft, hard = resource.getrlimit(resource.RLIMIT_AS)
654         resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
655
656         # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
657         r = resource.getrusage(resource.RUSAGE_SELF)
658         cpu_time = r.ru_utime + r.ru_stime
659         def time_expired(n, stack):
660             _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
661             # We dont suicide in such case
662             raise Exception('CPU time limit exceeded.')
663         signal.signal(signal.SIGXCPU, time_expired)
664         soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
665         resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
666
667     def process_work(self):
668         pass
669
670     def start(self):
671         self.pid = os.getpid()
672         self.setproctitle()
673         _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
674         # Reseed the random number generator
675         random.seed()
676         # Prevent fd inherientence close_on_exec
677         flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
678         fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
679         # reset blocking status
680         self.multi.socket.setblocking(0)
681         signal.signal(signal.SIGINT, self.signal_handler)
682         signal.signal(signal.SIGTERM, signal.SIG_DFL)
683         signal.signal(signal.SIGCHLD, signal.SIG_DFL)
684
685     def stop(self):
686         pass
687
688     def run(self):
689         try:
690             self.start()
691             while self.alive:
692                 self.process_limit()
693                 self.multi.pipe_ping(self.watchdog_pipe)
694                 self.sleep()
695                 self.process_work()
696             _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
697             self.stop()
698         except Exception,e:
699             _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
700             # should we use 3 to abort everything ?
701             sys.exit(1)
702
703 class WorkerHTTP(Worker):
704     """ HTTP Request workers """
705     def process_request(self, client, addr):
706         client.setblocking(1)
707         client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
708         # Prevent fd inherientence close_on_exec
709         flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
710         fcntl.fcntl(client, fcntl.F_SETFD, flags)
711         # do request using BaseWSGIServerNoBind monkey patched with socket
712         self.server.socket = client
713         # tolerate broken pipe when the http client closes the socket before
714         # receiving the full reply
715         try:
716             self.server.process_request(client,addr)
717         except IOError, e:
718             if e.errno != errno.EPIPE:
719                 raise
720         self.request_count += 1
721
722     def process_work(self):
723         try:
724             client, addr = self.multi.socket.accept()
725             self.process_request(client, addr)
726         except socket.error, e:
727             if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
728                 raise
729
730     def start(self):
731         Worker.start(self)
732         self.server = BaseWSGIServerNoBind(self.multi.app)
733
734 class WorkerCron(Worker):
735     """ Cron workers """
736
737     def __init__(self, multi):
738         super(WorkerCron, self).__init__(multi)
739         # process_work() below process a single database per call.
740         # The variable db_index is keeping track of the next database to
741         # process.
742         self.db_index = 0
743
744     def sleep(self):
745         # Really sleep once all the databases have been processed.
746         if self.db_index == 0:
747             interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect
748             time.sleep(interval)
749
750     def _db_list(self):
751         if config['db_name']:
752             db_names = config['db_name'].split(',')
753         else:
754             db_names = openerp.service.db.exp_list(True)
755         return db_names
756
757     def process_work(self):
758         rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
759         rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
760         _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
761         db_names = self._db_list()
762         if len(db_names):
763             self.db_index = (self.db_index + 1) % len(db_names)
764             db_name = db_names[self.db_index]
765             self.setproctitle(db_name)
766             if rpc_request_flag:
767                 start_time = time.time()
768                 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
769             
770             import openerp.addons.base as base
771             base.ir.ir_cron.ir_cron._acquire_job(db_name)
772             openerp.modules.registry.RegistryManager.delete(db_name)
773
774             # dont keep cursors in multi database mode
775             if len(db_names) > 1:
776                 openerp.sql_db.close_db(db_name)
777             if rpc_request_flag:
778                 end_time = time.time()
779                 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
780                 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
781                 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
782
783             self.request_count += 1
784             if self.request_count >= self.request_max and self.request_max < len(db_names):
785                 _logger.error("There are more dabatases to process than allowed "
786                     "by the `limit_request` configuration variable: %s more.",
787                     len(db_names) - self.request_max)
788         else:
789             self.db_index = 0
790
791     def start(self):
792         os.nice(10)     # mommy always told me to be nice with others...
793         Worker.start(self)
794         self.multi.socket.close()
795
796         # chorus effect: make cron workers do not all start at first database
797         mct = config['max_cron_threads']
798         p = float(self.pid % mct) / mct
799         self.db_index = int(len(self._db_list()) * p)
800
801 #----------------------------------------------------------
802 # start/stop public api
803 #----------------------------------------------------------
804
805 server = None
806
807 def load_server_wide_modules():
808     for m in openerp.conf.server_wide_modules:
809         try:
810             openerp.modules.module.load_openerp_module(m)
811         except Exception:
812             msg = ''
813             if m == 'web':
814                 msg = """
815 The `web` module is provided by the addons found in the `openerp-web` project.
816 Maybe you forgot to add those addons in your addons_path configuration."""
817             _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
818
819 def _reexec(updated_modules=None):
820     """reexecute openerp-server process with (nearly) the same arguments"""
821     if openerp.tools.osutil.is_running_as_nt_service():
822         subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
823     exe = os.path.basename(sys.executable)
824     args = stripped_sys_argv()
825     args +=  ["-u", ','.join(updated_modules)]
826     if not args or args[0] != exe:
827         args.insert(0, exe)
828     os.execv(sys.executable, args)
829
830 def start():
831     """ Start the openerp http server and cron processor.
832     """
833     global server
834     load_server_wide_modules()
835     if config['workers']:
836         openerp.multi_process = True
837         server = PreforkServer(openerp.service.wsgi_server.application)
838     elif openerp.evented:
839         server = GeventServer(openerp.service.wsgi_server.application)
840     else:
841         server = ThreadedServer(openerp.service.wsgi_server.application)
842
843     if config['auto_reload']:
844         autoreload = AutoReload(server)
845         autoreload.run()
846
847     server.run()
848
849     # like the legend of the phoenix, all ends with beginnings
850     if getattr(openerp, 'phoenix', False):
851         modules = []
852         if config['auto_reload']:
853             modules = autoreload.modules.keys()
854         _reexec(modules)
855     sys.exit(0)
856
857 def restart():
858     """ Restart the server
859     """
860     if os.name == 'nt':
861         # run in a thread to let the current thread return response to the caller.
862         threading.Thread(target=_reexec).start()
863     else:
864         os.kill(server.pid, signal.SIGHUP)
865
866 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: