1 #-----------------------------------------------------------
2 # Multicorn, multiprocessing inspired by gunicorn
3 # TODO rename class: Multicorn -> Arbiter ?
4 #-----------------------------------------------------------
20 import werkzeug.serving
22 from setproctitle import setproctitle
24 setproctitle = lambda x: None
27 import openerp.tools.config as config
28 from openerp.tools.misc import stripped_sys_argv
30 _logger = logging.getLogger(__name__)
# NOTE(review): this dump interleaves original file line numbers with the code
# and elides many source lines (gaps in the embedded numbering). Comments
# below describe only what the visible fragments establish; elided context is
# hedged explicitly.
32 class Multicorn(object):
33 """ Multiprocessing inspired by (g)unicorn.
34 Multicorn currently uses accept(2) as dispatching method between workers
35 but we plan to replace it by a more intelligent dispatcher that will parse
36 the first HTTP request line.
    # Read listen address, worker population and per-worker limits from config.
38 def __init__(self, app):
40 self.address = (config['xmlrpc_interface'] or '0.0.0.0', config['xmlrpc_port'])
41 self.population = config['workers']
42 self.timeout = config['limit_time_real']
43 self.limit_request = config['limit_request']
47 self.pid = os.getpid()
    # pid -> worker registries, split per worker kind (HTTP vs cron).
49 self.workers_http = {}
50 self.workers_cron = {}
54 self.long_polling_pid = None
    # Fragment: mark fd non-blocking and close-on-exec — presumably the body
    # of pipe_new(); its def line is elided from this dump (TODO confirm).
60 flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK
61 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
63 flags = fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
64 fcntl.fcntl(fd, fcntl.F_SETFD, flags)
    # Write one byte into the pipe to wake any select() sleeping on it.
67 def pipe_ping(self, pipe):
69 os.write(pipe[1], '.')
    # EAGAIN (pipe full) and EINTR are expected and ignored; the surrounding
    # try/except and re-raise lines are elided from this view.
71 if e.errno not in [errno.EAGAIN, errno.EINTR]:
    # Record the signal and ping the wakeup pipe. The queue is capped at 5
    # entries, except SIGCHLD which is always appended (child reaping must
    # never be dropped).
74 def signal_handler(self, sig, frame):
75 if len(self.queue) < 5 or sig == signal.SIGCHLD:
76 self.queue.append(sig)
77 self.pipe_ping(self.pipe)
79 _logger.warn("Dropping signal: %s", sig)
    # Fork a worker of class `klass` and register it in both the global map
    # and the kind-specific registry (fork/instantiation lines elided).
81 def worker_spawn(self, klass, workers_registry):
87 self.workers[pid] = worker
88 workers_registry[pid] = worker
    # Re-exec the current command line as a separate "openerp-long-polling"
    # process, with the --pidfile argument stripped.
94 def long_polling_spawn(self):
95 nargs = stripped_sys_argv('--pidfile')
97 cmd = os.path.join(os.path.dirname(cmd), "openerp-long-polling")
99 popen = subprocess.Popen(nargs)
100 self.long_polling_pid = popen.pid
    # Remove a dead worker pid from every registry.
102 def worker_pop(self, pid):
103 if pid in self.workers:
104 _logger.debug("Worker (%s) unregistered",pid)
106 self.workers_http.pop(pid,None)
107 self.workers_cron.pop(pid,None)
108 u = self.workers.pop(pid)
    # Send `sig` to `pid`; ESRCH (process already gone) is tolerated — the
    # os.kill and except lines are elided from this view.
113 def worker_kill(self, pid, sig):
117 if e.errno == errno.ESRCH:
    # Drain the queued signals; INT/TERM abort the main loop by raising
    # KeyboardInterrupt.
120 def process_signals(self):
121 while len(self.queue):
122 sig = self.queue.pop(0)
123 if sig in [signal.SIGINT,signal.SIGTERM]:
124 raise KeyboardInterrupt
    # Reap exited children without blocking (WNOHANG). A worker exit status
    # of 3 is treated as a fatal arbiter-level error.
126 def process_zombie(self):
130 wpid, status = os.waitpid(-1, os.WNOHANG)
133 if (status >> 8) == 3:
134 msg = "Critial worker error (%s)"
135 _logger.critical(msg, wpid)
136 raise Exception(msg % wpid)
137 self.worker_pop(wpid)
    # ECHILD simply means there are no children left — not an error.
139 if e.errno == errno.ECHILD:
    # SIGKILL any worker whose watchdog has not been fed within its timeout.
143 def process_timeout(self):
145 for (pid, worker) in self.workers.items():
146 if (worker.watchdog_timeout is not None) and \
147 (now - worker.watchdog_time >= worker.watchdog_timeout):
148 _logger.error("Worker (%s) timeout", pid)
149 self.worker_kill(pid, signal.SIGKILL)
    # Top up the HTTP and cron worker pools and the long-polling helper
    # process up to their configured sizes.
151 def process_spawn(self):
152 while len(self.workers_http) < self.population:
153 self.worker_spawn(WorkerHTTP, self.workers_http)
154 while len(self.workers_cron) < config['max_cron_threads']:
155 self.worker_spawn(WorkerCron, self.workers_cron)
156 if not self.long_polling_pid:
157 self.long_polling_spawn()
    # Fragment: sleep until a worker pings its watchdog pipe or self.beat
    # elapses — presumably sleep(); the def line is elided (TODO confirm).
161 # map of fd -> worker
162 fds = dict([(w.watchdog_pipe[0],w) for k,w in self.workers.items()])
163 fd_in = fds.keys() + [self.pipe[0]]
164 # check for ping or internal wakeups
165 ready = select.select(fd_in, [], [], self.beat)
166 # update worker watchdogs
169 fds[fd].watchdog_time = time.time()
    # Drain the pinged pipe so the next select() blocks again.
172 while os.read(fd, 1):
175 if e.errno not in [errno.EAGAIN]:
177 except select.error, e:
178 if e[0] not in [errno.EINTR]:
    # Fragment: one-time setup of wakeup pipe, signal handlers and the shared
    # listen socket — presumably start(); its def line is elided.
182 # wakeup pipe, python doesn't raise EINTR when a syscall is interrupted
183 # by a signal, simulating a pseudo SA_RESTART. We write to a pipe in the
184 # signal handler to overcome this behaviour
185 self.pipe = self.pipe_new()
187 signal.signal(signal.SIGINT, self.signal_handler)
188 signal.signal(signal.SIGTERM, self.signal_handler)
189 signal.signal(signal.SIGCHLD, self.signal_handler)
    # Non-blocking listen socket shared by all HTTP workers via accept(2);
    # backlog scales with the worker population.
191 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
192 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
193 self.socket.setblocking(0)
194 self.socket.bind(self.address)
195 self.socket.listen(8*self.population)
    # Stop the long-polling process and all workers. Graceful mode SIGTERMs
    # everyone and reaps zombies for up to self.timeout seconds; the forceful
    # path below re-sends SIGTERM to survivors.
197 def stop(self, graceful=True):
198 if self.long_polling_pid is not None:
199 self.worker_kill(self.long_polling_pid, signal.SIGTERM)
200 self.long_polling_pid = None
202 _logger.info("Stopping gracefully")
203 limit = time.time() + self.timeout
204 for pid in self.workers.keys():
205 self.worker_kill(pid, signal.SIGTERM)
206 while self.workers and time.time() < limit:
207 self.process_zombie()
210 _logger.info("Stopping forcefully")
211 for pid in self.workers.keys():
212 self.worker_kill(pid, signal.SIGTERM)
214 openerp.cli.server.quit_signals_received = 1
    # Fragment: main arbiter loop — process signals, reap zombies, enforce
    # watchdog timeouts, respawn missing workers — presumably run(); its def
    # line and the loop/spawn lines are elided from this view.
218 _logger.debug("Multiprocess starting")
221 #_logger.debug("Multiprocess beat (%s)",time.time())
222 self.process_signals()
223 self.process_zombie()
224 self.process_timeout()
227 except KeyboardInterrupt:
228 _logger.debug("Multiprocess clean stop")
# Base class for forked worker processes. Tracks a watchdog pipe fed back to
# the Multicorn arbiter, the parent pid (to detect re-parenting), and a
# per-process request budget. Many lines are elided from this dump.
236 class Worker(object):
238 def __init__(self, multi):
240 self.watchdog_time = time.time()
241 self.watchdog_pipe = multi.pipe_new()
242 # Can be set to None if no watchdog is desired.
243 self.watchdog_timeout = multi.timeout
244 self.ppid = os.getpid()
247 # should we rename into lifetime ?
248 self.request_max = multi.limit_request
249 self.request_count = 0
    # Set the OS process title to "openerp: <class> <pid> <title>" (no-op if
    # setproctitle was stubbed out at import time, see top of file).
251 def setproctitle(self, title=""):
252 setproctitle('openerp: %s %s %s' % (self.__class__.__name__, self.pid, title))
    # Fragment: close both ends of the watchdog pipe — presumably part of a
    # close()/stop() method whose def line is elided (TODO confirm).
255 os.close(self.watchdog_pipe[0])
256 os.close(self.watchdog_pipe[1])
258 def signal_handler(self, sig, frame):
    # Fragment: wait until the shared listen socket is readable or the beat
    # elapses; EINTR from select is tolerated (surrounding lines elided).
263 ret = select.select([self.multi.socket], [], [], self.multi.beat)
264 except select.error, e:
265 if e[0] not in [errno.EINTR]:
    # Enforce per-worker resource limits; sets self.alive = False (suicide
    # after the current request) when a soft limit is hit.
268 def process_limit(self):
269 # If our parent changed, commit suicide
270 if self.ppid != os.getppid():
271 _logger.info("Worker (%s) Parent changed", self.pid)
274 if self.request_count >= self.request_max:
275 _logger.info("Worker (%d) max request (%s) reached.", self.pid, self.request_count)
277 # Reset the worker if it consumes too much memory (e.g. caused by a memory leak).
278 rss, vms = psutil.Process(os.getpid()).get_memory_info()
279 if vms > config['limit_memory_soft']:
280 _logger.info('Worker (%d) virtual memory limit (%s) reached.', self.pid, vms)
281 self.alive = False # Commit suicide after the request.
283 # VMS and RLIMIT_AS are the same thing: virtual memory, a.k.a. address space
284 soft, hard = resource.getrlimit(resource.RLIMIT_AS)
285 resource.setrlimit(resource.RLIMIT_AS, (config['limit_memory_hard'], hard))
287 # SIGXCPU (exceeded CPU time) signal handler will raise an exception.
288 r = resource.getrusage(resource.RUSAGE_SELF)
289 cpu_time = r.ru_utime + r.ru_stime
290 def time_expired(n, stack):
    # NOTE(review): format string has two placeholders (%d and %s) but only
    # one argument — self.pid is missing; this log call will raise a
    # formatting error when executed. Fix: pass (self.pid,
    # config['limit_time_cpu']).
291 _logger.info('Worker (%d) CPU time limit (%s) reached.', config['limit_time_cpu'])
292 # We don't suicide in such case
293 raise Exception('CPU time limit exceeded.')
294 signal.signal(signal.SIGXCPU, time_expired)
295 soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
296 resource.setrlimit(resource.RLIMIT_CPU, (cpu_time + config['limit_time_cpu'], hard))
    # One unit of work; overridden by subclasses (body elided here).
298 def process_work(self):
    # Fragment: post-fork initialization — record own pid, make the inherited
    # listen socket close-on-exec and non-blocking, reset signal dispositions
    # — presumably start(); its def line is elided (TODO confirm).
302 self.pid = os.getpid()
304 _logger.info("Worker %s (%s) alive", self.__class__.__name__, self.pid)
305 # Reseed the random number generator
307 # Prevent fd inheritance: close_on_exec
308 flags = fcntl.fcntl(self.multi.socket, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
309 fcntl.fcntl(self.multi.socket, fcntl.F_SETFD, flags)
310 # reset blocking status
311 self.multi.socket.setblocking(0)
312 signal.signal(signal.SIGINT, self.signal_handler)
313 signal.signal(signal.SIGTERM, signal.SIG_DFL)
314 signal.signal(signal.SIGCHLD, signal.SIG_DFL)
    # Fragment of the worker main loop: feed the arbiter's watchdog, then log
    # on clean exit or on unexpected exception (loop lines elided).
324 self.multi.pipe_ping(self.watchdog_pipe)
327 _logger.info("Worker (%s) exiting. request_count: %s.", self.pid, self.request_count)
330 _logger.exception("Worker (%s) Exception occured, exiting..." % self.pid)
331 # should we use 3 to abort everything ?
334 class WorkerHTTP(Worker):
335 """ HTTP Request workers """
    # Serve one accepted connection through the monkey-patched WSGI server.
336 def process_request(self, client, addr):
337 client.setblocking(1)
338 client.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
339 # Prevent fd inheritance: close_on_exec
340 flags = fcntl.fcntl(client, fcntl.F_GETFD) | fcntl.FD_CLOEXEC
341 fcntl.fcntl(client, fcntl.F_SETFD, flags)
342 # do request using WorkerBaseWSGIServer monkey patched with socket
343 self.server.socket = client
344 # tolerate broken pipe when the http client closes the socket before
345 # receiving the full reply
347 self.server.process_request(client,addr)
    # Only EPIPE is swallowed; any other socket error propagates (the
    # try/except lines around this are elided from this view).
349 if e.errno != errno.EPIPE:
351 self.request_count += 1
    # Accept one pending connection from the shared listen socket. EAGAIN
    # (nothing pending on the non-blocking socket) and ECONNABORTED are
    # expected and ignored.
353 def process_work(self):
355 client, addr = self.multi.socket.accept()
356 self.process_request(client, addr)
357 except socket.error, e:
358 if e[0] not in (errno.EAGAIN, errno.ECONNABORTED):
    # Fragment: build the per-worker WSGI server — presumably part of start();
    # its def line is elided from this dump (TODO confirm).
363 self.server = WorkerBaseWSGIServer(self.multi.app)
365 class WorkerBaseWSGIServer(werkzeug.serving.BaseWSGIServer):
366 """ werkzeug WSGI Server patched to allow using an external listen socket
    # host/port "1"/"1" are dummies: the real socket is injected per request
    # by WorkerHTTP.process_request, and bind/activate are neutered below.
368 def __init__(self, app):
369 werkzeug.serving.BaseWSGIServer.__init__(self, "1", "1", app)
370 def server_bind(self):
371 # we don't bind because we use the listen socket of Multicorn#socket
372 # instead we close the socket
375 def server_activate(self):
376 # don't listen as we use Multicorn#socket
# Cron worker: each process_work() call handles the pending cron jobs of a
# single database, cycling through all configured databases.
379 class WorkerCron(Worker):
382 def __init__(self, multi):
383 super(WorkerCron, self).__init__(multi)
384 # process_work() below processes a single database per call.
385 # The variable db_index is keeping track of the next database to
    # Fragment of the sleep logic: only sleep for real once a full pass over
    # all databases is done; the per-pid offset desynchronizes multiple cron
    # workers ("chorus effect"). The enclosing def line is elided.
390 # Really sleep once all the databases have been processed.
391 if self.db_index == 0:
392 interval = 60 + self.pid % 10 # chorus effect
    # Acquire and run cron jobs for the next database in the rotation.
395 def process_work(self):
396 rpc_request = logging.getLogger('openerp.netsvc.rpc.request')
397 rpc_request_flag = rpc_request.isEnabledFor(logging.DEBUG)
398 _logger.debug("WorkerCron (%s) polling for jobs", self.pid)
399 if config['db_name']:
400 db_names = config['db_name'].split(',')
    # No explicit db list configured: ask the server for all databases.
402 db_names = openerp.service.db.exp_list(True)
404 self.db_index = (self.db_index + 1) % len(db_names)
405 db_name = db_names[self.db_index]
406 self.setproctitle(db_name)
408 start_time = time.time()
409 start_rss, start_vms = psutil.Process(os.getpid()).get_memory_info()
411 # acquired = openerp.addons.base.ir.ir_cron.ir_cron._acquire_job(db_name)
412 # TODO why isnt openerp.addons.base defined ?
413 import openerp.addons.base as base
414 acquired = base.ir.ir_cron.ir_cron._acquire_job(db_name)
416 openerp.modules.registry.RegistryManager.delete(db_name)
418 # don't keep cursors in multi database mode
419 if len(db_names) > 1:
420 openerp.sql_db.close_db(db_name)
    # Per-database timing/memory accounting, logged at debug level.
422 end_time = time.time()
423 end_rss, end_vms = psutil.Process(os.getpid()).get_memory_info()
424 logline = '%s time:%.3fs mem: %sk -> %sk (diff: %sk)' % (db_name, end_time - start_time, start_vms / 1024, end_vms / 1024, (end_vms - start_vms)/1024)
425 _logger.debug("WorkerCron (%s) %s", self.pid, logline)
    # NOTE(review): "dabatases" below is a typo inside a runtime log string;
    # left untouched here since this edit changes comments only.
427 self.request_count += 1
428 if self.request_count >= self.request_max and self.request_max < len(db_names):
429 _logger.error("There are more dabatases to process than allowed "
430 "by the `limit_request` configuration variable: %s more.",
431 len(db_names) - self.request_max)
    # Fragment: cron workers don't serve HTTP, so drop the inherited listen
    # socket and start the internal services — presumably start(); its def
    # line is elided from this dump (TODO confirm).
437 self.multi.socket.close()
438 openerp.service.start_internal()
440 # vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: