1 #-----------------------------------------------------------
2 # Threaded, Gevent and Prefork Servers
3 #-----------------------------------------------------------
25 import werkzeug.serving
27 from setproctitle import setproctitle
29 setproctitle = lambda x: None
32 import openerp.tools.config as config
33 from openerp.release import nt_service_name
34 from openerp.tools.misc import stripped_sys_argv
36 _logger = logging.getLogger(__name__)
38 SLEEP_INTERVAL = 60 # 1 min
40 #----------------------------------------------------------
42 #----------------------------------------------------------
class CommonServer(object):
    """Shared base for the threaded, gevent and prefork servers.

    Holds the HTTP bind interface/port read from the configuration and
    provides common helpers (thread stack dumping, clean socket close).
    NOTE(review): several lines of this class are elided in this excerpt
    (e.g. the ``self.app = app`` assignment and the ``def dumpstacks``
    header) — verify against the full source.
    """
    def __init__(self, app):
        # TODO Change the xmlrpc_* options to http_*
        # Bind address: configured interface, or all interfaces by default.
        self.interface = config['xmlrpc_interface'] or '0.0.0.0'
        self.port = config['xmlrpc_port']
        self.pid = os.getpid()

        """ Signal handler: dump a stack trace for each existing thread."""
        # code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696
        # modified for python 2.5 compatibility
        # Map each live thread's id to its name/uid so frames can be labelled.
        threads_info = dict([(th.ident, {'name': th.name,
                                         'uid': getattr(th,'uid','n/a')})
                             for th in threading.enumerate()])
        # NOTE(review): the `code` accumulator list is initialised on an
        # elided line just above this loop.
        for threadId, stack in sys._current_frames().items():
            thread_info = threads_info.get(threadId)
            code.append("\n# Thread: %s (id:%s) (uid:%s)" % \
                        (thread_info and thread_info['name'] or 'n/a',
                         thread_info and thread_info['uid'] or 'n/a'))
            for filename, lineno, name, line in traceback.extract_stack(stack):
                code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
                code.append(" %s" % (line.strip()))
        _logger.info("\n".join(code))

    def close_socket(self, sock):
        """ Closes a socket instance cleanly
        :param sock: the network socket to close
        :type sock: socket.socket
        sock.shutdown(socket.SHUT_RDWR)
        except socket.error, e:
            # On OSX, socket shutdowns both sides if any side closes it
            # causing an error 57 'Socket is not connected' on shutdown
            # of the other side (or something), see
            # http://bugs.python.org/issue4397
            # note: stdlib fixed test, not behavior
            # Swallow ENOTCONN only on the platforms known to raise it here;
            # anything else is re-raised (on an elided line below).
            if e.errno != errno.ENOTCONN or platform.system() not in ['Darwin', 'Windows']:
91 #----------------------------------------------------------
93 #----------------------------------------------------------
class ThreadedServer(CommonServer):
    """Single-process server: werkzeug HTTP thread + cron polling threads."""
    def __init__(self, app):
        super(ThreadedServer, self).__init__(app)
        # Remember which thread installed us, so shutdown can avoid
        # joining the main thread on itself.
        self.main_thread_id = threading.currentThread().ident
        # Variable keeping track of the number of calls to the signal handler defined
        # below. This variable is monitored by ``quit_on_signals()``.
        self.quit_signals_received = 0
    def signal_handler(self, sig, frame):
        """Process-level signal handler.

        INT/TERM request a graceful shutdown (a second one forces exit),
        HUP flags a restart via ``openerp.phoenix``, QUIT dumps thread
        stacks.  NOTE(review): the forced-exit call after the "Forced
        shutdown" message and the stack-dump call are on elided lines.
        """
        if sig in [signal.SIGINT,signal.SIGTERM]:
            # shutdown on kill -INT or -TERM
            self.quit_signals_received += 1
            if self.quit_signals_received > 1:
                # logging.shutdown was already called at this point.
                sys.stderr.write("Forced shutdown.\n")
        elif sig == signal.SIGHUP:
            # restart on kill -HUP
            openerp.phoenix = True
            self.quit_signals_received += 1
        elif sig == signal.SIGQUIT:
            # dump stacks on kill -3
    def cron_thread(self, number):
        """Daemon-thread body: periodically poll every loaded registry for
        cron jobs to run.  The enclosing ``while True:`` loop and the
        break-on-no-job logic are on elided lines."""
        # Each cron thread sleeps a slightly different amount so they
        # phase-shift against each other.
        time.sleep(SLEEP_INTERVAL + number) # Steve Reich timing style
        registries = openerp.modules.registry.RegistryManager.registries
        _logger.debug('cron%d polling for jobs', number)
        for db_name, registry in registries.items():
            # NOTE(review): `True and` is redundant; loops while the
            # registry is ready (left as-is — code unchanged in this pass).
            while True and registry.ready:
                acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
    def cron_spawn(self):
        """ Start the above runner function in a daemon thread.
        The thread is a typical daemon thread: it will never quit and must be
        terminated when the main process exits - with no consequence (the processing
        threads it spawns are not marked daemon).
        # Force call to strptime just before starting the cron thread
        # to prevent time.strptime AttributeError within the thread.
        # See: http://bugs.python.org/issue7980
        datetime.datetime.strptime('2012-01-01', '%Y-%m-%d')
        # One thread per configured cron slot; the `target` closure
        # (calling self.cron_thread(i)) is defined on an elided line.
        for i in range(openerp.tools.config['max_cron_threads']):
            t = threading.Thread(target=target, name="openerp.service.cron.cron%d" % i)
            _logger.debug("cron%d started!" % i)
154 def http_thread(self):
155 self.httpd = werkzeug.serving.make_server(self.interface, self.port, self.app, threaded=True)
156 self.httpd.serve_forever()
158 def http_spawn(self):
159 threading.Thread(target=self.http_thread).start()
160 _logger.info('HTTP service (werkzeug) running on %s:%s', self.interface, self.port)
        # --- body of ThreadedServer.start(); its `def` line is elided ---
        # Install our signal handlers, then (on elided lines below) spawn
        # the cron and HTTP threads.
        _logger.debug("Setting signal handlers")
        if os.name == 'posix':
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)
            signal.signal(signal.SIGCHLD, self.signal_handler)
            signal.signal(signal.SIGHUP, self.signal_handler)
            signal.signal(signal.SIGQUIT, self.signal_handler)
        elif os.name == 'nt':
            # NOTE(review): `win32api` is imported on an elided line just
            # above, and `signal_handler` is referenced unqualified here —
            # presumably it should be `self.signal_handler`; verify against
            # the full source before changing.
            win32api.SetConsoleCtrlHandler(lambda sig: signal_handler(sig, None), 1)
        # --- body of ThreadedServer.stop(); its `def` line is elided ---
        """ Shutdown the WSGI server. Wait for non daemon threads.
        _logger.info("Initiating shutdown")
        _logger.info("Hit CTRL-C again or send a second signal to force the shutdown.")
        # Stop accepting requests, then close the listen socket cleanly.
        self.httpd.shutdown()
        self.close_socket(self.httpd.socket)

        # Manually join() all threads before calling sys.exit() to allow a second signal
        # to trigger _force_quit() in case some non-daemon threads won't exit cleanly.
        # threading.Thread.join() should not mask signals (at least in python 2.5).
        me = threading.currentThread()
        _logger.debug('current thread: %r', me)
        for thread in threading.enumerate():
            _logger.debug('process %r (%r)', thread, thread.isDaemon())
            # NOTE(review): `main_thread_id` is referenced unqualified here —
            # __init__ stores it as `self.main_thread_id`; verify.
            if thread != me and not thread.isDaemon() and thread.ident != main_thread_id:
                while thread.isAlive():
                    _logger.debug('join and sleep')
                    # Need a busyloop here as thread.join() masks signals
                    # and would prevent the forced shutdown.
        # Drop every in-memory registry before exiting.
        openerp.modules.registry.RegistryManager.delete_all()
        # --- body of ThreadedServer.run(); its `def` line is elided ---
        """ Start the http server and the cron thread then wait for a signal.
        The first SIGINT or SIGTERM signal will initiate a graceful shutdown while
        a second one if any will force an immediate exit.
        # Wait for a first signal to be handled. (time.sleep will be interrupted
        # by the signal handler.) The try/except is for the win32 case.
        # (The `try:` and the sleep call are on elided lines.)
        while self.quit_signals_received == 0:
        except KeyboardInterrupt:
222 #----------------------------------------------------------
224 #----------------------------------------------------------
class GeventServer(CommonServer):
    """Evented (gevent) server used for the long-polling endpoint; runs as
    a child process and commits suicide when its parent changes."""
    def __init__(self, app):
        super(GeventServer, self).__init__(app)
        # Long-polling listens on its own dedicated port.
        self.port = config['longpolling_port']

    def watch_parent(self, beat=4):
        # Greenlet body: if our parent pid changes (parent died and we were
        # re-parented), terminate ourselves.  The surrounding loop and the
        # initial ppid/pid bindings are on elided lines.
        if ppid != os.getppid():
            _logger.info("LongPolling (%s) Parent changed", pid)
            os.kill(pid, signal.SIGTERM)

        # --- body of GeventServer.start(); its `def` line is elided ---
        from gevent.wsgi import WSGIServer
        gevent.spawn(self.watch_parent)
        self.httpd = WSGIServer((self.interface, self.port), self.app)
        _logger.info('Evented Service (longpolling) running on %s:%s', self.interface, self.port)
        self.httpd.serve_forever()
261 #----------------------------------------------------------
263 #----------------------------------------------------------
class Multicorn(CommonServer):
    """ Multiprocessing inspired by (g)unicorn.
    Multicorn currently uses accept(2) as dispatching method between workers
    but we plan to replace it by a more intelligent dispatcher to will parse
    the first HTTP request line.
    def __init__(self, app):
        # Config: bind address, number of HTTP workers, per-request wall
        # clock timeout and max requests served before a worker recycles.
        self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
        self.population = config['workers']
        self.timeout = config['limit_time_real']
        self.limit_request = config['limit_request']
        # process tracking (the master `self.workers` dict is initialised
        # on an elided line)
        self.pid = os.getpid()
        self.workers_http = {}
        self.workers_cron = {}
        # pid of the single gevent long-polling child, None until spawned
        self.long_polling_pid = None
        # --- body of Multicorn.pipe_new(); its `def` line is elided ---
        # Make both ends of the pipe non-blocking...
        flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
        fcntl.fcntl(fd, fcntl.F_SETFL, flags)
        # ...and close-on-exec so forked workers don't inherit them.
        flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)

    def pipe_ping(self, pipe):
        # Best-effort single-byte write used to wake up a select() sleeper;
        # the surrounding try/except is on elided lines.
        os.write(pipe[1], '.')
        if e.errno not in [errno.EAGAIN, errno.EINTR]:

    def signal_handler(self, sig, frame):
        # Queue the signal for process_signals(); cap the queue at 5 except
        # for SIGCHLD (child reaping must never be dropped).
        if len(self.queue) < 5 or sig == signal.SIGCHLD:
            self.queue.append(sig)
            self.pipe_ping(self.pipe)
            _logger.warn("Dropping signal: %s", sig)
    def worker_spawn(self, klass, workers_registry):
        # Fork a worker of class `klass` (fork/child-run logic is on elided
        # lines); register it both globally and in the class-specific map.
        self.workers[pid] = worker
        workers_registry[pid] = worker

    def long_polling_spawn(self):
        # Re-exec ourselves as the dedicated gevent long-polling process,
        # stripped of the --pidfile/--workers options.
        nargs = stripped_sys_argv('--pidfile','--workers')
        cmd = os.path.join(os.path.dirname(cmd), "openerp-gevent")
        popen = subprocess.Popen(nargs)
        self.long_polling_pid = popen.pid

    def worker_pop(self, pid):
        # Forget a dead worker: remove it from every registry and (on
        # elided lines) close its watchdog pipe / respawn bookkeeping.
        if pid in self.workers:
            _logger.debug("Worker (%s) unregistered",pid)
            self.workers_http.pop(pid,None)
            self.workers_cron.pop(pid,None)
            u = self.workers.pop(pid)

    def worker_kill(self, pid, sig):
        # Send `sig` to the worker (the os.kill try is on elided lines);
        # ESRCH means the process is already gone, so just unregister it.
        if e.errno == errno.ESRCH:
    def process_signals(self):
        """Drain queued signals: INT/TERM and HUP abort the main loop with
        KeyboardInterrupt (HUP additionally flags a restart)."""
        while len(self.queue):
            sig = self.queue.pop(0)
            if sig in [signal.SIGINT,signal.SIGTERM]:
                raise KeyboardInterrupt
            elif sig == signal.SIGHUP:
                # restart on kill -HUP
                openerp.phoenix = True
                raise KeyboardInterrupt
            elif sig == signal.SIGQUIT:
                # dump stacks on kill -3

    def process_zombie(self):
        # Reap dead children with a non-blocking waitpid (the surrounding
        # loop and try are on elided lines).
        wpid, status = os.waitpid(-1, os.WNOHANG)
        # Exit status 3 is the worker convention for an unrecoverable error.
        # NOTE(review): "Critial" is a typo in the runtime message string;
        # left untouched here since this pass changes no program text.
        if (status >> 8) == 3:
            msg = "Critial worker error (%s)"
            _logger.critical(msg, wpid)
            raise Exception(msg % wpid)
        self.worker_pop(wpid)
        # ECHILD: no children left to reap — not an error.
        if e.errno == errno.ECHILD:

    def process_timeout(self):
        # Kill any worker whose watchdog has not been fed within its
        # timeout (`now` is bound on an elided line).
        for (pid, worker) in self.workers.items():
            if (worker.watchdog_timeout is not None) and \
                    (now - worker.watchdog_time >= worker.watchdog_timeout):
                _logger.error("Worker (%s) timeout", pid)
                self.worker_kill(pid, signal.SIGKILL)
391 def process_spawn(self):
392 while len(self.workers_http) < self.population:
393 self.worker_spawn(WorkerHTTP, self.workers_http)
394 while len(self.workers_cron) < config['max_cron_threads']:
395 self.worker_spawn(WorkerCron, self.workers_cron)
396 if not self.long_polling_pid:
397 self.long_polling_spawn()
        # --- body of Multicorn.sleep(); its `def` line is elided ---
        # map of fd -> worker
        fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
        fd_in = fds.keys() + [self.pipe[0]]
        # check for ping or internal wakeups
        ready = select.select(fd_in, [], [], self.beat)
        # update worker watchdogs
        # (feed the watchdog of every worker that pinged us)
        fds[fd].watchdog_time = time.time()
        # drain the pipe until it would block
        while os.read(fd, 1):
            if e.errno not in [errno.EAGAIN]:
        except select.error, e:
            # EINTR just means a signal interrupted select(); anything else
            # is re-raised on an elided line.
            if e[0] not in [errno.EINTR]:

        # --- body of Multicorn.start(); its `def` line is elided ---
        # wakeup pipe, python doesnt throw EINTR when a syscall is interrupted
        # by a signal simulating a pseudo SA_RESTART. We write to a pipe in the
        # signal handler to overcome this behaviour
        self.pipe = self.pipe_new()
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        signal.signal(signal.SIGCHLD, self.signal_handler)

        # Shared non-blocking listen socket; workers accept() on it directly.
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.setblocking(0)
        self.socket.bind(self.address)
        self.socket.listen(8*self.population)
    def stop(self, graceful=True):
        """Stop all workers.

        :param graceful: when True, SIGTERM the workers and wait up to
            ``self.timeout`` for them to exit before giving up; the
            forceful branch (and the ``if graceful:`` line) are partly
            elided in this excerpt.
        """
        if self.long_polling_pid is not None:
            self.worker_kill(self.long_polling_pid, signal.SIGKILL) # FIXME make longpolling process handle SIGTERM correctly
            self.long_polling_pid = None
            _logger.info("Stopping gracefully")
            limit = time.time() + self.timeout
            for pid in self.workers.keys():
                self.worker_kill(pid, signal.SIGTERM)
            while self.workers and time.time() < limit:
                self.process_zombie()
            _logger.info("Stopping forcefully")
            for pid in self.workers.keys():
                self.worker_kill(pid, signal.SIGTERM)
        # --- body of Multicorn.run(); its `def` line is elided ---
        # Master loop: handle signals, reap zombies, kill stuck workers,
        # respawn missing ones (spawn/sleep calls are on elided lines),
        # until a KeyboardInterrupt (raised by process_signals) stops us.
        _logger.debug("Multiprocess starting")
        #_logger.debug("Multiprocess beat (%s)",time.time())
        self.process_signals()
        self.process_zombie()
        self.process_timeout()
        except KeyboardInterrupt:
            _logger.debug("Multiprocess clean stop")
class Worker(object):
    """Base class for forked worker processes (HTTP and cron)."""
    def __init__(self, multi):
        # Watchdog state: the master feeds `watchdog_time` when we ping it
        # over `watchdog_pipe`; exceeding `watchdog_timeout` gets us killed.
        self.watchdog_time = time.time()
        self.watchdog_pipe = multi.pipe_new()
        # Can be set to None if no watchdog is desired.
        self.watchdog_timeout = multi.timeout
        # Parent pid, used to detect re-parenting (parent death).
        self.ppid = os.getpid()
        # should we rename into lifetime ?
        # Recycle the worker after serving `request_max` requests.
        self.request_max = multi.limit_request
        self.request_count = 0
490 def setproctitle(self, title=""):
491 setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
        # --- body of Worker.close(); its `def` line is elided ---
        # Release both ends of the watchdog pipe.
        os.close(self.watchdog_pipe[0])
        os.close(self.watchdog_pipe[1])

    def signal_handler(self, sig, frame):
        # (body elided: flags the worker to stop after the current request)

        # --- body of Worker.sleep(); its `def` line is elided ---
        # Wait for the shared listen socket to become readable, or the beat.
        ret = select.select([self.multi.socket], [], [], self.multi.beat)
        except select.error, e:
            # EINTR from a signal is expected; anything else re-raises
            # on an elided line.
            if e[0] not in [errno.EINTR]:
    def process_limit(self):
        """Enforce the worker resource limits: suicide on re-parenting,
        request count, soft memory limit; hard caps on address space and
        CPU time via setrlimit."""
        # If our parent changed suicide
        if self.ppid != os.getppid():
            _logger.info("Worker (%s) Parent changed", self.pid)
        # Served enough requests: recycle (the alive=False line is elided).
        if self.request_count >= self.request_max:
            _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
        # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
        rss, vms = psutil.Process(os.getpid()).get_memory_info()
        if vms > config['limit_memory_soft']:
            _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
            self.alive = False # Commit suicide after the request.
        # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
        # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
        r = resource.getrusage(resource.RUSAGE_SELF)
        cpu_time = r.ru_utime + r.ru_stime
        def time_expired(n, stack):
            # NOTE(review): this format string has two placeholders but only
            # one argument — `self.pid` appears to be missing; a real defect
            # to fix against the full source (left unchanged in this pass).
            _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
            # We dont suicide in such case
            raise Exception('CPU time limit exceeded.')
        signal.signal(signal.SIGXCPU, time_expired)
        soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
        resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
    def process_work(self):
        # Hook overridden by subclasses; base body (pass) is elided.

        # --- body of Worker.start(); its `def` line is elided ---
        self.pid = os.getpid()
        _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
        # Reseed the random number generator
        # Prevent fd inherientence close_on_exec
        flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
        # reset blocking status
        self.multi.socket.setblocking(0)
        # INT goes to our own handler; TERM/CHLD revert to the defaults the
        # master installed custom handlers for.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)

        # --- body of Worker.run() main loop; surrounding lines elided ---
        # Feed the master's watchdog each iteration.
        self.multi.pipe_ping(self.watchdog_pipe)
        _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
        _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
        # should we use 3 to abort everything ?
class WorkerHTTP(Worker):
    """ HTTP Request workers """
    def process_request(self, client, addr):
        """Serve one accepted connection through the patched werkzeug server.

        :param client: accepted client socket
        :param addr: client address tuple
        """
        client.setblocking(1)
        client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        # Prevent fd inherientence close_on_exec
        flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
        fcntl.fcntl(client, fcntl.F_SETFD, flags)
        # do request using WorkerBaseWSGIServer monkey patched with socket
        self.server.socket = client
        # tolerate broken pipe when the http client closes the socket before
        # receiving the full reply
        # (the try / except socket.error lines are elided here)
        self.server.process_request(client,addr)
        if e.errno != errno.EPIPE:
        self.request_count += 1

    def process_work(self):
        # Accept one pending connection if any; EAGAIN/ECONNABORTED just
        # mean there was nothing to accept (the try line is elided).
        client, addr = self.multi.socket.accept()
        self.process_request(client, addr)
        except socket.error, e:
            if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):

        # --- body of WorkerHTTP.start(); its `def` line is elided ---
        self.server = WorkerBaseWSGIServer(self.multi.app)
class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
    """ werkzeug WSGI Server patched to allow using an external listen socket
    def __init__(self, app):
        # "1"/"1" are dummy host/port values: we never bind or listen
        # ourselves, the shared Multicorn socket is patched in per request.
        werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
    def server_bind(self):
        # we dont bind beause we use the listen socket of Multicorn#socket
        # instead we close the socket
    def server_activate(self):
        # dont listen as we use Multicorn#socket
class WorkerCron(Worker):
    """Cron workers: each call to process_work() handles one database."""
    def __init__(self, multi):
        super(WorkerCron, self).__init__(multi)
        # process_work() below process a single database per call.
        # The variable db_index is keeping track of the next database to

        # --- body of WorkerCron.sleep(); its `def` line is elided ---
        # Really sleep once all the databases have been processed.
        if self.db_index == 0:
            interval = SLEEP_INTERVAL + self.pid % 10 # chorus effect

        # --- body of WorkerCron._db_list(); its `def` line is elided ---
        # Explicit db_name config wins; otherwise list all databases.
        if config['db_name']:
            db_names = config['db_name'].split(',')
            db_names = openerp.service.db.exp_list(True)
    def process_work(self):
        """Run pending cron jobs for the next database in round-robin order,
        logging time/memory per database when rpc.request debug is on."""
        rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
        rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
        _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
        db_names = self._db_list()
        # Advance the round-robin index, then process that database.
        self.db_index = (self.db_index + 1) % len(db_names)
        db_name = db_names[self.db_index]
        self.setproctitle(db_name)
        start_time = time.time()
        start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
        # Run the jobs, then drop the registry loaded for this db.
        import openerp.addons.base as base
        base.ir.ir_cron.ir_cron._acquire_job(db_name)
        openerp.modules.registry.RegistryManager.delete(db_name)
        # dont keep cursors in multi database mode
        if len(db_names) > 1:
            openerp.sql_db.close_db(db_name)
        end_time = time.time()
        end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
        logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
        _logger.debug("WorkerCron (%s) %s", self.pid, logline)
        self.request_count += 1
        # NOTE(review): "dabatases" is a typo inside a runtime log string;
        # left unchanged in this documentation pass.
        if self.request_count >= self.request_max and self.request_max < len(db_names):
            _logger.error("There are more dabatases to process than allowed "
                          "by the `limit_request` configuration variable: %s more.",
                          len(db_names) - self.request_max)
        # --- body of WorkerCron.start(); its `def` line is elided ---
        # Lower our scheduling priority; cron work should yield to HTTP.
        os.nice(10) # mommy always told me to be nice with others...
        # Cron workers never serve HTTP: drop the shared listen socket.
        self.multi.socket.close()
        # chorus effect: make cron workers do not all start at first database
        mct = config['max_cron_threads']
        p = float(self.pid % mct) / mct
        self.db_index = int(len(self._db_list()) * p)
685 #----------------------------------------------------------
686 # start/stop public api
687 #----------------------------------------------------------
def load_server_wide_modules():
    """Import every configured server-wide module, logging (not raising) on
    failure; the try/except around the load call is partly elided here."""
    for m in openerp.conf.server_wide_modules:
        openerp.modules.module.load_openerp_module(m)
        # Extra hint appended when the well-known `web` module fails:
The `web` module is provided by the addons found in the `openerp-web` project.
Maybe you forgot to add those addons in your addons_path configuration."""
        _logger.exception('Failed to load server-wide module `%s`.%s', m, msg)
    # --- body of _reexec(); its `def` line is elided ---
    """reexecute openerp-server process with (nearly) the same arguments"""
    # Under a Windows service, restart through the service manager instead.
    if openerp.tools.osutil.is_running_as_nt_service():
        subprocess.call('net stop {0} && net start {0}'.format(nt_service_name), shell=True)
    exe = os.path.basename(sys.executable)
    args = stripped_sys_argv()
    # Ensure argv[0] is the interpreter name (insert happens on an elided line).
    if not args or args[0] != exe:
    os.execv(sys.executable, args)
    # --- body of the module-level start(); its `def` line is elided ---
    """ Start the openerp http server and cron processor.
    load_server_wide_modules()
    # Pick the server flavour from the configuration:
    # workers > 0 -> prefork, evented -> gevent, otherwise threaded.
    if config['workers']:
        openerp.multi_process = True
        server = Multicorn(openerp.service.wsgi_server.application)
    elif openerp.evented:
        server = GeventServer(openerp.service.wsgi_server.application)
        server = ThreadedServer(openerp.service.wsgi_server.application)
    # like the legend of the phoenix, all ends with beginnings
    if getattr(openerp, 'phoenix', False):
def restart_server():
    """ Restart the server
    # run in a thread to let the current thread return response to the caller.
    threading.Thread(target=_reexec).start()
    # POSIX branch (the os.name test lines are elided): a SIGHUP to the
    # server pid triggers the phoenix restart path.
    os.kill(server.pid, signal.SIGHUP)
741 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: