[IMP]document : upgrade pyftplib with new version 0.5.1
authorHarry (Open ERP) <hmo@tinyerp.com>
Mon, 12 Oct 2009 09:56:24 +0000 (15:26 +0530)
committerHarry (Open ERP) <hmo@tinyerp.com>
Mon, 12 Oct 2009 09:56:24 +0000 (15:26 +0530)
bzr revid: hmo@tinyerp.com-20091012095624-pt9mjfd3uahkhyil

addons/document/__terp__.py
addons/document/document.py
addons/document/ftpserver/authorizer.py
addons/document/ftpserver/ftpserver.py

index 27c917f..c5fa5d9 100644 (file)
@@ -23,7 +23,7 @@
 
 {
     'name': 'Integrated Document Management System',
-    'version': '1.0',
+    'version': '1.1',
     'category': 'Generic Modules/Others',
     'description': """This is a complete document management system:
     * FTP Interface
index 460c52a..1d7ca84 100644 (file)
@@ -655,7 +655,6 @@ class document_file(osv.osv):
             vals['res_id']=context.get('default_res_id',False)
         if not vals.get('res_model', False) and context.get('default_res_model',False):
             vals['res_model']=context.get('default_res_model',False)
-
         if vals.get('res_id', False) and vals.get('res_model',False):
             obj_model=self.pool.get(vals['res_model'])
             result = obj_model.read(cr, uid, [vals['res_id']], context=context)
@@ -694,7 +693,7 @@ class document_file(osv.osv):
         try:
             res = content_index(base64.decodestring(datas), vals['datas_fname'], vals.get('content_type', None))
             super(document_file,self).write(cr, uid, [result], {
-                'index_content' : res,
+                'index_content': res,
             })
             cr.commit()
         except:
index 5a64ca8..46e70c1 100644 (file)
@@ -1,5 +1,6 @@
 # -*- encoding: utf-8 -*-
-
+import pooler
+from service import security
 class authorizer:
     read_perms = "elr"
     write_perms = "adfmw"
@@ -47,7 +48,7 @@ class authorizer:
         paths = path.split('/')
         if not len(paths)>2:
             return True
-        db_name = paths[1]
+        db_name = paths[1]        
         db,pool = pooler.get_db_and_pool(db_name)
         res = security.login(db_name, username, self.password)
         return bool(res)
index ae0fc38..8f704a6 100644 (file)
@@ -1,12 +1,10 @@
 #!/usr/bin/env python
-# -*- encoding: utf-8 -*-
 # ftpserver.py
 #
 #  pyftpdlib is released under the MIT license, reproduced below:
 #  ======================================================================
 #  Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>
 #  Hacked by Fabien Pinckaers (C) 2008 <fp@tinyerp.com>
-#
 #                         All Rights Reserved
 #
 #  Permission to use, copy, modify, and distribute this software and
@@ -58,6 +56,9 @@ the backend functionality for the FTPd:
     providing a high level, cross-platform interface compatible
     with both Windows and UNIX style filesystems.
 
+    [CallLater] - calls a function at a later time whithin the polling
+    loop asynchronously.
+
     [AuthorizerError] - base class for authorizers exceptions.
 
 
@@ -116,11 +117,11 @@ import traceback
 import errno
 import time
 import glob
-import fnmatch
 import tempfile
 import warnings
 import random
 import stat
+import heapq
 from tarfile import filemode
 
 try:
@@ -130,67 +131,80 @@ except ImportError:
     pwd = grp = None
 
 
-LOG_ACTIVE = True
-
 __all__ = ['proto_cmds', 'Error', 'log', 'logline', 'logerror', 'DummyAuthorizer',
-           'FTPHandler', 'FTPServer', 'PassiveDTP', 'ActiveDTP', 'DTPHandler',
-           'FileProducer', 'IteratorProducer', 'BufferedIteratorProducer',
-           'AbstractedFS',]
+           'AuthorizerError', 'FTPHandler', 'FTPServer', 'PassiveDTP',
+           'ActiveDTP', 'DTPHandler', 'FileProducer', 'BufferedIteratorProducer',
+           'AbstractedFS', 'CallLater']
 
 
 __pname__   = 'Python FTP server library (pyftpdlib)'
-__ver__     = '0.4.0'
-__date__    = '2008-05-16'
+__ver__     = '0.5.1'
+__date__    = '2009-01-21'
 __author__  = "Giampaolo Rodola' <g.rodola@gmail.com>"
 __web__     = 'http://code.google.com/p/pyftpdlib/'
 
 
 proto_cmds = {
-    'ABOR': 'Syntax: ABOR (abort transfer).',
-    'ALLO': 'Syntax: ALLO <SP> bytes (obsolete; allocate storage).',
-    'APPE': 'Syntax: APPE <SP> file-name (append data to an existent file).',
-    'CDUP': 'Syntax: CDUP (go to parent directory).',
-    'CWD' : 'Syntax: CWD <SP> dir-name (change current working directory).',
-    'DELE': 'Syntax: DELE <SP> file-name (delete file).',
-    'EPRT': 'Syntax: EPRT <SP> |proto|ip|port| (set server in extended active mode).',
-    'EPSV': 'Syntax: EPSV [<SP> proto/"ALL"] (set server in extended passive mode).',
-    'FEAT': 'Syntax: FEAT (list all new features supported).',
-    'HELP': 'Syntax: HELP [<SP> cmd] (show help).',
-    'LIST': 'Syntax: LIST [<SP> path-name] (list files).',
-    'MDTM': 'Syntax: MDTM <SP> file-name (get last modification time).',
-    'MLSD': 'Syntax: MLSD [<SP> dir-name] (list files in a machine-processable form)',
-    'MLST': 'Syntax: MLST [<SP> path-name] (show a path in a machine-processable form)',
-    'MODE': 'Syntax: MODE <SP> mode (obsolete; set data transfer mode).',
-    'MKD' : 'Syntax: MDK <SP> dir-name (create directory).',
-    'NLST': 'Syntax: NLST [<SP> path-name] (list files in a compact form).',
-    'NOOP': 'Syntax: NOOP (just do nothing).',
-    'OPTS': 'Syntax: OPTS <SP> ftp-command [<SP> option] (specify options for FTP commands)',
-    'PASS': 'Syntax: PASS <SP> user-name (set user password).',
-    'PASV': 'Syntax: PASV (set server in passive mode).',
-    'PORT': 'Syntax: PORT <sp> h1,h2,h3,h4,p1,p2 (set server in active mode).',
-    'PWD' : 'Syntax: PWD (get current working directory).',
-    'QUIT': 'Syntax: QUIT (quit current session).',
-    'REIN': 'Syntax: REIN (reinitialize / flush account).',
-    'REST': 'Syntax: REST <SP> marker (restart file position).',
-    'RETR': 'Syntax: RETR <SP> file-name (retrieve a file).',
-    'RMD' : 'Syntax: RMD <SP> dir-name (remove directory).',
-    'RNFR': 'Syntax: RNFR <SP> file-name (file renaming (source name)).',
-    'RNTO': 'Syntax: RNTO <SP> file-name (file renaming (destination name)).',
-    'SIZE': 'Syntax: HELP <SP> file-name (get file size).',
-    'STAT': 'Syntax: STAT [<SP> path name] (status information [list files]).',
-    'STOR': 'Syntax: STOR <SP> file-name (store a file).',
-    'STOU': 'Syntax: STOU [<SP> file-name] (store a file with a unique name).',
-    'STRU': 'Syntax: STRU <SP> type (obsolete; set file structure).',
-    'SYST': 'Syntax: SYST (get operating system type).',
-    'TYPE': 'Syntax: TYPE <SP> [A | I] (set transfer type).',
-    'USER': 'Syntax: USER <SP> user-name (set username).',
-    'XCUP': 'Syntax: XCUP (obsolete; go to parent directory).',
-    'XCWD': 'Syntax: XCWD <SP> dir-name (obsolete; change current directory).',
-    'XMKD': 'Syntax: XMDK <SP> dir-name (obsolete; create directory).',
-    'XPWD': 'Syntax: XPWD (obsolete; get current dir).',
-    'XRMD': 'Syntax: XRMD <SP> dir-name (obsolete; remove directory).',
+    # cmd : (perm, auth,  arg,   path,  help)
+    'ABOR': (None, True,  False, False, 'Syntax: ABOR (abort transfer).'),
+    'ALLO': (None, True,  True,  False, 'Syntax: ALLO <SP> bytes (noop; allocate storage).'),
+    'APPE': ('a',  True,  True,  True,  'Syntax: APPE <SP> file-name (append data to an existent file).'),
+    'CDUP': ('e',  True,  False, True,  'Syntax: CDUP (go to parent directory).'),
+    'CWD' : ('e',  True,  None,  True,  'Syntax: CWD [<SP> dir-name] (change current working directory).'),
+    'DELE': ('d',  True,  True,  True,  'Syntax: DELE <SP> file-name (delete file).'),
+    'EPRT': (None, True,  True,  False, 'Syntax: EPRT <SP> |proto|ip|port| (set server in extended active mode).'),
+    'EPSV': (None, True,  None,  False, 'Syntax: EPSV [<SP> proto/"ALL"] (set server in extended passive mode).'),
+    'FEAT': (None, False, False, False, 'Syntax: FEAT (list all new features supported).'),
+    'HELP': (None, False, None,  False, 'Syntax: HELP [<SP> cmd] (show help).'),
+    'LIST': ('l',  True,  None,  True,  'Syntax: LIST [<SP> path-name] (list files).'),
+    'MDTM': (None, True,  True,  True,  'Syntax: MDTM [<SP> file-name] (get last modification time).'),
+    'MLSD': ('l',  True,  None,  True,  'Syntax: MLSD [<SP> dir-name] (list files in a machine-processable form)'),
+    'MLST': (None, True,  None,  True,  'Syntax: MLST [<SP> path-name] (show a path in a machine-processable form)'),
+    'MODE': (None, True,  True,  False, 'Syntax: MODE <SP> mode (noop; set data transfer mode).'),
+    'MKD' : ('m',  True,  True,  True,  'Syntax: MDK <SP> dir-name (create directory).'),
+    'NLST': ('l',  True,  None,  True,  'Syntax: NLST [<SP> path-name] (list files in a compact form).'),
+    'NOOP': (None, False, False, False, 'Syntax: NOOP (just do nothing).'),
+    'OPTS': (None, True,  True,  False, 'Syntax: OPTS <SP> ftp-command [<SP> option] (specify options for FTP commands)'),
+    'PASS': (None, False, True,  False, 'Syntax: PASS <SP> user-name (set user password).'),
+    'PASV': (None, True,  False, False, 'Syntax: PASV (set server in passive mode).'),
+    'PORT': (None, True,  True,  False, 'Syntax: PORT <sp> h1,h2,h3,h4,p1,p2 (set server in active mode).'),
+    'PWD' : (None, True,  False, False, 'Syntax: PWD (get current working directory).'),
+    'QUIT': (None, False, False, False, 'Syntax: QUIT (quit current session).'),
+    'REIN': (None, True,  False, False, 'Syntax: REIN (reinitialize / flush account).'),
+    'REST': (None, True,  True,  False, 'Syntax: REST <SP> marker (restart file position).'),
+    'RETR': ('r',  True,  True,  True,  'Syntax: RETR <SP> file-name (retrieve a file).'),
+    'RMD' : ('d',  True,  True,  True,  'Syntax: RMD <SP> dir-name (remove directory).'),
+    'RNFR': ('f',  True,  True,  True,  'Syntax: RNFR <SP> file-name (file renaming (source name)).'),
+    'RNTO': (None, True,  True,  True,  'Syntax: RNTO <SP> file-name (file renaming (destination name)).'),
+    'SITE': (None, False, True, False,  'Syntax: SITE <SP> site-command (execute the specified SITE command).'),
+    'SITE HELP' : (None, False, None, False, 'Syntax: SITE HELP [<SP> site-command] (show SITE command help).'),
+    'SIZE': (None, True,  True,  True,  'Syntax: HELP <SP> file-name (get file size).'),
+    'STAT': ('l',  False, None,  True,  'Syntax: STAT [<SP> path name] (status information [list files]).'),
+    'STOR': ('w',  True,  True,  True,  'Syntax: STOR <SP> file-name (store a file).'),
+    'STOU': ('w',  True,  None,  True,  'Syntax: STOU [<SP> file-name] (store a file with a unique name).'),
+    'STRU': (None, True,  True,  False, 'Syntax: STRU <SP> type (noop; set file structure).'),
+    'SYST': (None, False, False, False, 'Syntax: SYST (get operating system type).'),
+    'TYPE': (None, True,  True,  False, 'Syntax: TYPE <SP> [A | I] (set transfer type).'),
+    'USER': (None, False, True,  False, 'Syntax: USER <SP> user-name (set username).'),
+    'XCUP': ('e',  True,  False, True,  'Syntax: XCUP (obsolete; go to parent directory).'),
+    'XCWD': ('e',  True,  None,  True,  'Syntax: XCWD [<SP> dir-name] (obsolete; change current directory).'),
+    'XMKD': ('m',  True,  True,  True,  'Syntax: XMDK <SP> dir-name (obsolete; create directory).'),
+    'XPWD': (None, True,  False, False, 'Syntax: XPWD (obsolete; get current dir).'),
+    'XRMD': ('d',  True,  True,  True,  'Syntax: XRMD <SP> dir-name (obsolete; remove directory).'),
     }
 
+class _CommandProperty:
+    def __init__(self, perm, auth_needed, arg_needed, check_path, help):
+        self.perm = perm
+        self.auth_needed = auth_needed
+        self.arg_needed = arg_needed
+        self.check_path = check_path
+        self.help = help
+
+for cmd, properties in proto_cmds.iteritems():
+    proto_cmds[cmd] = _CommandProperty(*properties)
+del cmd, properties
+
 
 # hack around format_exc function of traceback module to grant
 # backward compatibility with python < 2.4
@@ -221,6 +235,97 @@ def _strerror(err):
     else:
         return err.strerror
 
+# the heap used for the scheduled tasks
+_tasks = []
+
+def _scheduler():
+    """Run the scheduled functions due to expire soonest (if any)."""
+    now = time.time()
+    while _tasks and now >= _tasks[0].timeout:
+        call = heapq.heappop(_tasks)
+        if call.repush:
+            heapq.heappush(_tasks, call)
+            call.repush = False
+            continue
+        try:
+            call.call()
+        finally:
+            if not call.cancelled:
+                call.cancel()
+
+
+class CallLater:
+    """Calls a function at a later time.
+
+    It can be used to asynchronously schedule a call within the polling
+    loop without blocking it. The instance returned is an object that
+    can be used to cancel or reschedule the call.
+    """
+
+    def __init__(self, seconds, target, *args, **kwargs):
+        """
+         - (int) seconds: the number of seconds to wait
+         - (obj) target: the callable object to call later
+         - args: the arguments to call it with
+         - kwargs: the keyword arguments to call it with
+        """
+        assert callable(target), "%s is not callable" %target
+        assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \
+                                           "to 0 seconds" % (seconds)
+        self.__delay = seconds
+        self.__target = target
+        self.__args = args
+        self.__kwargs = kwargs
+        # seconds from the epoch at which to call the function
+        self.timeout = time.time() + self.__delay
+        self.repush = False
+        self.cancelled = False
+        heapq.heappush(_tasks, self)
+
+    def __le__(self, other):
+        return self.timeout <= other.timeout
+
+    def call(self):
+        """Call this scheduled function."""
+        assert not self.cancelled, "Already cancelled"
+        self.__target(*self.__args, **self.__kwargs)
+
+    def reset(self):
+        """Reschedule this call resetting the current countdown."""
+        assert not self.cancelled, "Already cancelled"
+        self.timeout = time.time() + self.__delay
+        self.repush = True
+
+    def delay(self, seconds):
+        """Reschedule this call for a later time."""
+        assert not self.cancelled, "Already cancelled."
+        assert sys.maxint >= seconds >= 0, "%s is not greater than or equal " \
+                                           "to 0 seconds" %(seconds)
+        self.__delay = seconds
+        newtime = time.time() + self.__delay
+        if newtime > self.timeout:
+            self.timeout = newtime
+            self.repush = True
+        else:
+            # XXX - slow, can be improved
+            self.timeout = newtime
+            heapq.heapify(_tasks)
+
+    def cancel(self):
+        """Unschedule this call."""
+        assert not self.cancelled, "Already cancelled"
+        self.cancelled = True
+        del self.__target, self.__args, self.__kwargs
+        if self in _tasks:
+            pos = _tasks.index(self)
+            if pos == 0:
+                heapq.heappop(_tasks)
+            elif pos == len(_tasks) - 1:
+                _tasks.pop(pos)
+            else:
+                _tasks[pos] = _tasks.pop()
+                heapq._siftup(_tasks, pos)
+
 
 # --- library defined exceptions
 
@@ -235,13 +340,11 @@ class AuthorizerError(Error):
 
 def log(msg):
     """Log messages intended for the end user."""
-    if LOG_ACTIVE:
-        print msg
+    print msg
 
 def logline(msg):
     """Log commands and responses passing through the command channel."""
-    if LOG_ACTIVE:
-        print msg
+    print msg
 
 def logerror(msg):
     """Log traceback outputs occurring in case of errors."""
@@ -300,20 +403,14 @@ class DummyAuthorizer:
         """
         if self.has_user(username):
             raise AuthorizerError('User "%s" already exists' %username)
-        homedir = os.path.realpath(homedir)
         if not os.path.isdir(homedir):
             raise AuthorizerError('No such directory: "%s"' %homedir)
-        for p in perm:
-            if p not in 'elradfmw':
-                raise AuthorizerError('No such permission "%s"' %p)
-        for p in perm:
-            if (p in self.write_perms) and (username == 'anonymous'):
-                warnings.warn("write permissions assigned to anonymous user.",
-                              RuntimeWarning)
-                break
+        homedir = os.path.realpath(homedir)
+        self._check_permissions(username, perm)
         dic = {'pwd': str(password),
                'home': homedir,
                'perm': perm,
+               'operms': {},
                'msg_login': str(msg_login),
                'msg_quit': str(msg_quit)
                }
@@ -341,9 +438,26 @@ class DummyAuthorizer:
         """Remove a user from the virtual users table."""
         del self.user_table[username]
 
+    def override_perm(self, username, directory, perm, recursive=False):
+        """Override permissions for a given directory."""
+        self._check_permissions(username, perm)
+        if not os.path.isdir(directory):
+            raise AuthorizerError('No such directory: "%s"' %directory)
+        directory = os.path.normcase(os.path.realpath(directory))
+        home = os.path.normcase(self.get_home_dir(username))
+        if directory == home:
+            raise AuthorizerError("Can't override home directory permissions")
+        if not self._issubpath(directory, home):
+            raise AuthorizerError("Path escapes user home directory")
+        self.user_table[username]['operms'][directory] = perm, recursive
+
     def validate_authentication(self, username, password):
         """Return True if the supplied username and password match the
         stored credentials."""
+        if not self.has_user(username):
+            return False
+        if username == 'anonymous':
+            return True
         return self.user_table[username]['pwd'] == password
 
     def impersonate_user(self, username, password):
@@ -375,6 +489,19 @@ class DummyAuthorizer:
         Expected perm argument is one of the following letters:
         "elradfmw".
         """
+        if path is None:
+            return perm in self.user_table[username]['perm']
+
+        path = os.path.normcase(path)
+        for dir in self.user_table[username]['operms'].keys():
+            operm, recursive = self.user_table[username]['operms'][dir]
+            if self._issubpath(path, dir):
+                if recursive:
+                    return perm in operm
+                if (path == dir) or (os.path.dirname(path) == dir \
+                and not os.path.isdir(path)):
+                    return perm in operm
+
         return perm in self.user_table[username]['perm']
 
     def get_perms(self, username):
@@ -393,6 +520,23 @@ class DummyAuthorizer:
         """Return the user's quitting message."""
         return self.user_table[username]['msg_quit']
 
+    def _check_permissions(self, username, perm):
+        warned = 0
+        for p in perm:
+            if p not in self.read_perms + self.write_perms:
+                raise AuthorizerError('No such permission "%s"' %p)
+            if (username == 'anonymous') and (p in self.write_perms) and not warned:
+                warnings.warn("Write permissions assigned to anonymous user.",
+                              RuntimeWarning)
+                warned = 1
+
+    def _issubpath(self, a, b):
+        """Return True if a is a sub-path of b or if the paths are equal."""
+        p1 = a.rstrip(os.sep).split(os.sep)
+        p2 = b.rstrip(os.sep).split(os.sep)
+        return p1[:len(p2)] == p2
+
+
 
 # --- DTP classes
 
@@ -400,7 +544,11 @@ class PassiveDTP(asyncore.dispatcher):
     """This class is an asyncore.disptacher subclass.  It creates a
     socket listening on a local port, dispatching the resultant
     connection to DTPHandler.
+
+     - (int) timeout: the timeout for a remote client to establish
+       connection with the listening socket. Defaults to 30 seconds.
     """
+    timeout = 30
 
     def __init__(self, cmd_channel, extmode=False):
         """Initialize the passive data server.
@@ -410,13 +558,17 @@ class PassiveDTP(asyncore.dispatcher):
         """
         asyncore.dispatcher.__init__(self)
         self.cmd_channel = cmd_channel
+        if self.timeout:
+            self.idler = CallLater(self.timeout, self.handle_timeout)
+        else:
+            self.idler = None
 
         ip = self.cmd_channel.getsockname()[0]
         self.create_socket(self.cmd_channel.af, socket.SOCK_STREAM)
 
-        if not self.cmd_channel.passive_ports:
-        # By using 0 as port number value we let kernel choose a free
-        # unprivileged random port.
+        if self.cmd_channel.passive_ports is None:
+            # By using 0 as port number value we let kernel choose a
+            # free unprivileged random port.
             self.bind((ip, 0))
         else:
             ports = list(self.cmd_channel.passive_ports)
@@ -461,8 +613,12 @@ class PassiveDTP(asyncore.dispatcher):
 
     def handle_accept(self):
         """Called when remote client initiates a connection."""
-        sock, addr = self.accept()
-
+        try:
+            sock, addr = self.accept()
+        except TypeError:
+            # for some reason sometimes accept() returns None instead
+            # of a socket
+            return
         # Check the origin of data connection.  If not expressively
         # configured we drop the incoming data connection if remote
         # IP address does not match the client's IP address.
@@ -492,6 +648,10 @@ class PassiveDTP(asyncore.dispatcher):
         self.cmd_channel.data_channel = handler
         self.cmd_channel.on_dtp_connection()
 
+    def handle_timeout(self):
+        self.cmd_channel.respond("421 Passive data channel timed out.")
+        self.close()
+
     def writable(self):
         return 0
 
@@ -504,16 +664,21 @@ class PassiveDTP(asyncore.dispatcher):
         logerror(traceback.format_exc())
         self.close()
 
-    def handle_close(self):
-        """Called on closing the data connection."""
-        self.close()
+    def close(self):
+        if self.idler is not None and not self.idler.cancelled:
+            self.idler.cancel()
+        asyncore.dispatcher.close(self)
 
 
 class ActiveDTP(asyncore.dispatcher):
     """This class is an asyncore.disptacher subclass. It creates a
     socket resulting from the connection to a remote user-port,
     dispatching it to DTPHandler.
+
+     - (int) timeout: the timeout for us to establish connection with
+       the client's listening data socket.
     """
+    timeout = 30
 
     def __init__(self, ip, port, cmd_channel):
         """Initialize the active data channel attemping to connect
@@ -525,6 +690,10 @@ class ActiveDTP(asyncore.dispatcher):
         """
         asyncore.dispatcher.__init__(self)
         self.cmd_channel = cmd_channel
+        if self.timeout:
+            self.idler = CallLater(self.timeout, self.handle_timeout)
+        else:
+            self.idler = None
         self.create_socket(self.cmd_channel.af, socket.SOCK_STREAM)
         try:
             self.connect((ip, port))
@@ -534,11 +703,19 @@ class ActiveDTP(asyncore.dispatcher):
 
     # --- connection / overridden
 
+    # NOOP, overridden to prevent unhandled read/write event
+    # messages to be printed on Python < 2.6
+
     def handle_write(self):
-        """NOOP, must be overridden to prevent unhandled write event."""
+        pass
+
+    def handle_read(self):
+        pass
 
     def handle_connect(self):
         """Called when connection is established."""
+        if self.idler is not None and not self.idler.cancelled:
+            self.idler.cancel()
         self.cmd_channel.respond('200 Active data connection established.')
         # delegate such connection to DTP handler
         handler = self.cmd_channel.dtp_handler(self.socket, self.cmd_channel)
@@ -546,6 +723,10 @@ class ActiveDTP(asyncore.dispatcher):
         self.cmd_channel.on_dtp_connection()
         #self.close()  # <-- (done automatically)
 
+    def handle_timeout(self):
+        self.cmd_channel.respond("421 Active data channel timed out.")
+        self.close()
+
     def handle_expt(self):
         self.cmd_channel.respond("425 Can't connect to specified address.")
         self.close()
@@ -563,6 +744,11 @@ class ActiveDTP(asyncore.dispatcher):
         self.cmd_channel.respond("425 Can't connect to specified address.")
         self.close()
 
+    def close(self):
+        if self.idler is not None and not self.idler.cancelled:
+            self.idler.cancel()
+        asyncore.dispatcher.close(self)
+
 
 try:
     from collections import deque
@@ -578,15 +764,16 @@ class DTPHandler(asyncore.dispatcher):
     RFC-959) managing data-transfer operations involving sending
     and receiving data.
 
-    Instance attributes defined in this class, initialized when
-    channel is opened:
+    Class attributes:
 
-     - (instance) cmd_channel: the command channel class instance.
-     - (file) file_obj: the file transferred (if any).
-     - (bool) receive: True if channel is used for receiving data.
-     - (bool) transfer_finished: True if transfer completed successfully.
-     - (int) tot_bytes_sent: the total bytes sent.
-     - (int) tot_bytes_received: the total bytes received.
+     - (int) timeout: the timeout which roughly is the maximum time we
+       permit data transfers to stall for with no progress. If the
+       timeout triggers, the remote client will be kicked off
+       (defaults 300).
+
+     - (int) ac_in_buffer_size: incoming data buffer size (defaults 65536)
+
+     - (int) ac_out_buffer_size: outgoing data buffer size (defaults 65536)
 
     DTPHandler implementation note:
 
@@ -608,8 +795,9 @@ class DTPHandler(asyncore.dispatcher):
     available we use a list instead.
     """
 
-    ac_in_buffer_size = 8192
-    ac_out_buffer_size  = 8192
+    timeout = 300
+    ac_in_buffer_size = 65536
+    ac_out_buffer_size = 65536
 
     def __init__(self, sock_obj, cmd_channel):
         """Initialize the command channel.
@@ -631,6 +819,12 @@ class DTPHandler(asyncore.dispatcher):
         self.tot_bytes_sent = 0
         self.tot_bytes_received = 0
         self.data_wrapper = lambda x: x
+        self._lastdata = 0
+        self._closed = False
+        if self.timeout:
+            self.idler = CallLater(self.timeout, self.handle_timeout)
+        else:
+            self.idler = None
 
     # --- utility methods
 
@@ -642,7 +836,10 @@ class DTPHandler(asyncore.dispatcher):
          - (str) type: current transfer type, 'a' (ASCII) or 'i' (binary).
         """
         if type == 'a':
-            self.data_wrapper = lambda x: x.replace('\r\n', os.linesep)
+            if os.linesep == '\r\n':
+                self.data_wrapper = lambda x: x
+            else:
+                self.data_wrapper = lambda x: x.replace('\r\n', os.linesep)
         elif type == 'i':
             self.data_wrapper = lambda x: x
         else:
@@ -748,6 +945,20 @@ class DTPHandler(asyncore.dispatcher):
             # we tried to send some actual data
             return
 
+    def handle_timeout(self):
+        """Called cyclically to check if data trasfer is stalling with
+        no progress in which case the client is kicked off.
+        """
+        if self.get_transmitted_bytes() > self._lastdata:
+            self._lastdata = self.get_transmitted_bytes()
+            self.idler = CallLater(self.timeout, self.handle_timeout)
+        else:
+            msg = "Data connection timed out."
+            self.cmd_channel.log(msg)
+            self.cmd_channel.respond("421 " + msg)
+            self.cmd_channel.close_when_done()
+            self.close()
+
     def handle_expt(self):
         """Called on "exceptional" data events."""
         self.cmd_channel.respond("426 Connection error; transfer aborted.")
@@ -805,10 +1016,19 @@ class DTPHandler(asyncore.dispatcher):
     def close(self):
         """Close the data channel, first attempting to close any remaining
         file handles."""
-        if self.file_obj and not self.file_obj.closed:
-            self.file_obj.close()
-        asyncore.dispatcher.close(self)
-        self.cmd_channel.on_dtp_close()
+        if not self._closed:
+            self._closed = True
+            if self.file_obj is not None and not self.file_obj.closed:
+                self.file_obj.close()
+            if self.idler is not None and not self.idler.cancelled:
+                self.idler.cancel()
+            asyncore.dispatcher.close(self)
+            if self.file_obj is not None and self.transfer_finished:
+                if self.receive:
+                    self.cmd_channel.on_file_received(self.file_obj.name)
+                else:
+                    self.cmd_channel.on_file_sent(self.file_obj.name)
+            self.cmd_channel.on_dtp_close()
 
 
 # --- producers
@@ -827,7 +1047,10 @@ class FileProducer:
         self.done = False
         self.file = file
         if type == 'a':
-            self.data_wrapper = lambda x: x.replace(os.linesep, '\r\n')
+            if os.linesep == '\r\n':
+                self.data_wrapper = lambda x: x
+            else:
+                self.data_wrapper = lambda x: x.replace(os.linesep, '\r\n')
         elif type == 'i':
             self.data_wrapper = lambda x: x
         else:
@@ -845,22 +1068,6 @@ class FileProducer:
         return data
 
 
-class IteratorProducer:
-    """Producer for iterator objects."""
-
-    def __init__(self, iterator):
-        self.iterator = iterator
-
-    def more(self):
-        """Attempt a chunk of data from iterator by calling its next()
-        method.
-        """
-        try:
-            return self.iterator.next()
-        except StopIteration:
-            return ''
-
-
 class BufferedIteratorProducer:
     """Producer for iterator objects with buffer capabilities."""
     # how many times iterator.next() will be called before
@@ -1043,9 +1250,9 @@ class AbstractedFS:
             os.chdir(basedir)
             self.cwd = self.fs2ftp(path)
 
-    def mkdir(self, path, basename):
+    def mkdir(self, path):
         """Create the specified directory."""
-        os.mkdir(os.path.join(path, basename))
+        os.mkdir(path)
 
     def listdir(self, path):
         """List the content of a directory."""
@@ -1123,17 +1330,6 @@ class AbstractedFS:
 
     exists = lexists  # alias for backward compatibility with 0.2.0
 
-    def glob1(self, dirname, pattern):
-        """Return a list of files matching a dirname pattern
-        non-recursively.
-
-        Unlike glob.glob1 raises exception if os.listdir() fails.
-        """
-        names = self.listdir(dirname)
-        if pattern[0] != '.':
-            names = filter(lambda x: x[0] != '.', names)
-        return fnmatch.filter(names, pattern)
-
     # --- Listing utilities
 
     # note: the following operations are no more blocking
@@ -1152,28 +1348,6 @@ class AbstractedFS:
             self.lstat(path)  # raise exc in case of problems
             return self.format_list(basedir, [filename])
 
-    def get_stat_dir(self, rawline):
-        """Return an iterator object that yields a list of files
-        matching a dirname pattern non-recursively in a form
-        suitable for STAT command.
-
-         - (str) rawline: the raw string passed by client as command
-         argument.
-        """
-        ftppath = self.ftpnorm(rawline)
-        if not glob.has_magic(ftppath):
-            return self.get_list_dir(self.ftp2fs(rawline))
-        else:
-            basedir, basename = os.path.split(ftppath)
-            if glob.has_magic(basedir):
-                return iter(['Directory recursion not supported.\r\n'])
-            else:
-                basedir = self.ftp2fs(basedir)
-                listing = self.glob1(basedir, basename)
-                if listing:
-                    listing.sort()
-                return self.format_list(basedir, listing)
-
     def format_list(self, basedir, listing, ignore_err=True):
         """Return an iterator object that yields the entries of given
         directory emulating the "/bin/ls -lA" UNIX command output.
@@ -1223,14 +1397,15 @@ class AbstractedFS:
                 # bogus values for owner and group
                 uname = "owner"
                 gname = "group"
-            # stat.st_mtime could fail (-1) if last mtime is too old
-            # in which case we return the local time as last mtime
             try:
                 mtime = time.strftime("%b %d %H:%M", time.localtime(st.st_mtime))
             except ValueError:
+                # It could be raised if last mtime happens to be too
+                # old (prior to year 1900) in which case we return
+                # the current time as last mtime.
                 mtime = time.strftime("%b %d %H:%M")
             # if the file is a symlink, resolve it, e.g. "symlink -> realfile"
-            if stat.S_ISLNK(st.st_mode):
+            if stat.S_ISLNK(st.st_mode) and hasattr(os, 'readlink'):
                 basename = basename + " -> " + os.readlink(file)
 
             # formatting is matched with proftpd ls output
@@ -1301,8 +1476,9 @@ class AbstractedFS:
                 try:
                     modify = 'modify=%s;' %time.strftime("%Y%m%d%H%M%S",
                                            time.localtime(st.st_mtime))
+                # it could be raised if last mtime happens to be too old
+                # (prior to year 1900)
                 except ValueError:
-                    # stat.st_mtime could fail (-1) if last mtime is too old
                     modify = ""
             if 'create' in facts:
                 # on Windows we can provide also the creation time
@@ -1343,6 +1519,11 @@ class FTPHandler(asynchat.async_chat):
     reproduced below and can be modified before instantiating this
     class.
 
+     - (int) timeout:
+       The timeout which is the maximum time a remote client may spend
+       between FTP commands. If the timeout triggers, the remote client
+       will be kicked off.  Defaults to 300 seconds.
+
      - (str) banner: the string sent when client connects.
 
      - (int) max_login_attempts:
@@ -1398,6 +1579,7 @@ class FTPHandler(asynchat.async_chat):
     abstracted_fs = AbstractedFS
 
     # session attributes (explained in the docstring)
+    timeout = 300
     banner = "pyftpdlib %s ready." %__ver__
     max_login_attempts = 3
     permit_foreign_addresses = False
@@ -1412,14 +1594,17 @@ class FTPHandler(asynchat.async_chat):
             established connection.
          - (instance) server: the ftp server class instance.
         """
-        asynchat.async_chat.__init__(self, conn=conn)
-        self.server = server
-        self.remote_ip, self.remote_port = self.socket.getpeername()[:2]
-        self.in_buffer = []
-        self.in_buffer_len = 0
+        asynchat.async_chat.__init__(self, conn)
         self.set_terminator("\r\n")
+        # try to handle urgent data inline
+        try:
+            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_OOBINLINE, 1)
+        except socket.error:
+            pass
 
-        # session attributes
+        # public session attributes
+        self.server = server
+        self.remote_ip, self.remote_port = self.socket.getpeername()[:2]
         self.fs = self.abstracted_fs()
         self.authenticated = False
         self.username = ""
@@ -1428,24 +1613,13 @@ class FTPHandler(asynchat.async_chat):
         self.current_type = 'a'
         self.restart_position = 0
         self.quit_pending = False
-        self._epsvall = False
-        self.__in_dtp_queue = None
-        self.__out_dtp_queue = None
-
-        # mlsx facts attributes
-        self.current_facts = ['type', 'perm', 'size', 'modify']
-        if os.name == 'posix':
-            self.current_facts.append('unique')
-        self.available_facts = self.current_facts[:]
-        if pwd and grp:
-            self.available_facts += ['unix.mode', 'unix.uid', 'unix.gid']
-        if os.name == 'nt':
-            self.available_facts.append('create')
-
-        # dtp attributes
+        self.sleeping = False
         self.data_server = None
         self.data_channel = None
-
+        if self.timeout:
+            self.idler = CallLater(self.timeout, self.handle_timeout)
+        else:
+            self.idler = None
         if hasattr(self.socket, 'family'):
             self.af = self.socket.family
         else:  # python < 2.5
@@ -1453,6 +1627,23 @@ class FTPHandler(asynchat.async_chat):
             self.af = socket.getaddrinfo(ip, port, socket.AF_UNSPEC,
                                          socket.SOCK_STREAM)[0][0]
 
+        # private session attributes
+        self._in_buffer = []
+        self._in_buffer_len = 0
+        self._epsvall = False
+        self._in_dtp_queue = None
+        self._out_dtp_queue = None
+        self._closed = False
+        self._extra_feats = []
+        self._current_facts = ['type', 'perm', 'size', 'modify']
+        if os.name == 'posix':
+            self._current_facts.append('unique')
+        self._available_facts = self._current_facts[:]
+        if pwd and grp:
+            self._available_facts += ['unix.mode', 'unix.uid', 'unix.gid']
+        if os.name == 'nt':
+            self._available_facts.append('create')
+
     def handle(self):
         """Return a 220 'Ready' response to the client over the command
         channel.
@@ -1484,37 +1675,33 @@ class FTPHandler(asynchat.async_chat):
         self.log(msg)
         self.close_when_done()
 
+    def handle_timeout(self):
+        """Called when client does not send any command within the time
+        specified in <timeout> attribute."""
+        msg = "Control connection timed out."
+        self.log(msg)
+        self.respond("421 " + msg)
+        self.close_when_done()
+
     # --- asyncore / asynchat overridden methods
 
     def readable(self):
         # if there's a quit pending we stop reading data from socket
-        return not self.quit_pending
+        return not self.sleeping
 
     def collect_incoming_data(self, data):
         """Read incoming data and append to the input buffer."""
-        self.in_buffer.append(data)
-        self.in_buffer_len += len(data)
+        self._in_buffer.append(data)
+        self._in_buffer_len += len(data)
         # Flush buffer if it gets too long (possible DoS attacks).
         # RFC-959 specifies that a 500 response could be given in
         # such cases
         buflimit = 2048
-        if self.in_buffer_len > buflimit:
+        if self._in_buffer_len > buflimit:
             self.respond('500 Command too long.')
             self.log('Command received exceeded buffer limit of %s.' %(buflimit))
-            self.in_buffer = []
-            self.in_buffer_len = 0
-
-    # commands accepted before authentication
-    unauth_cmds = ('FEAT','HELP','NOOP','PASS','QUIT','STAT','SYST','USER')
-
-    # commands needing an argument
-    arg_cmds = ('ALLO','APPE','DELE','EPRT','MDTM','MODE','MKD','OPTS','PORT',
-                'REST','RETR','RMD','RNFR','RNTO','SIZE', 'STOR','STRU',
-                'TYPE','USER','XMKD','XRMD')
-
-    # commands needing no argument
-    unarg_cmds = ('ABOR','CDUP','FEAT','NOOP','PASV','PWD','QUIT','REIN',
-                  'SYST','XCUP','XPWD')
+            self._in_buffer = []
+            self._in_buffer_len = 0
 
     def found_terminator(self):
         r"""Called when the incoming data stream matches the \r\n
@@ -1524,130 +1711,109 @@ class FTPHandler(asynchat.async_chat):
         corresponding method (e.g. for received command "MKD pathname",
         ftp_MKD() method is called with "pathname" as the argument).
         """
-        line = ''.join(self.in_buffer)
-        self.in_buffer = []
-        self.in_buffer_len = 0
+        if self.idler is not None and not self.idler.cancelled:
+            self.idler.reset()
+
+        line = ''.join(self._in_buffer)
+        self._in_buffer = []
+        self._in_buffer_len = 0
 
         cmd = line.split(' ')[0].upper()
-        space = line.find(' ')
-        if space != -1:
-            arg = line[space + 1:]
-        else:
-            arg = ""
+        arg = line[len(cmd)+1:]
+        if cmd == "SITE" and arg:
+            cmd = "SITE %s" %arg.split(' ')[0].upper()
+            arg = line[len(cmd)+1:]
 
         if cmd != 'PASS':
             self.logline("<== %s" %line)
         else:
             self.logline("<== %s %s" %(line.split(' ')[0], '*' * 6))
 
-        # let's check if user provided an argument for those commands
-        # needing one
-        if not arg and cmd in self.arg_cmds:
+        # Recognize those commands having a "special semantic". They
+        # should be sent by following the RFC-959 procedure of sending
+        # Telnet IP/Synch sequence (chr 242 and 255) as OOB data but
+        # since many ftp clients don't do it correctly we check the
+        # last 4 characters only.
+        if not cmd in proto_cmds:
+            if cmd[-4:] in ('ABOR', 'STAT', 'QUIT'):
+                cmd = cmd[-4:]
+            else:
+                self.respond('500 Command "%s" not understood.' %cmd)
+                return
+
+        if not arg and proto_cmds[cmd].arg_needed is True:
             self.respond("501 Syntax error: command needs an argument.")
             return
-
-        # let's do the same for those commands requiring no argument.
-        elif arg and cmd in self.unarg_cmds:
+        if arg and proto_cmds[cmd].arg_needed is False:
             self.respond("501 Syntax error: command does not accept arguments.")
             return
 
-        # provide a limited set of commands if user isn't
-        # authenticated yet
-        if (not self.authenticated):
-            if cmd in self.unauth_cmds:
-                # we permit STAT during this phase but we don't want
-                # STAT to return a directory LISTing if the user is
-                # not authenticated yet (this could happen if STAT
-                # is used with an argument)
-                if (cmd == 'STAT') and arg:
-                    self.respond("530 Log in with USER and PASS first.")
-                else:
-                    method = getattr(self, 'ftp_' + cmd)
-                    method(arg)  # call the proper ftp_* method
-            elif cmd in proto_cmds:
+        if not self.authenticated:
+            if proto_cmds[cmd].auth_needed or (cmd == 'STAT' and arg):
                 self.respond("530 Log in with USER and PASS first.")
             else:
-                self.respond('500 Command "%s" not understood.' %line)
-
-        # provide full command set
-        elif (self.authenticated) and (cmd in proto_cmds):
-            if not (self.__check_path(arg, arg)): # and self.__check_perm(cmd, arg)):
+                method = getattr(self, 'ftp_' + cmd.replace(' ', '_'))
+                method(arg)  # call the proper ftp_* method
+        else:
+            if cmd == 'STAT' and not arg:
+                self.ftp_STAT('')
                 return
-            method = getattr(self, 'ftp_' + cmd)
-            method(arg)  # call the proper ftp_* method
 
-        else:
-            # recognize those commands having "special semantics"
-            if 'ABOR' in cmd:
-                self.ftp_ABOR("")
-            elif 'STAT' in cmd:
-                self.ftp_STAT("")
-            # unknown command
-            else:
-                self.respond('500 Command "%s" not understood.' %line)
-
-    def __check_path(self, cmd, line):
-        """Check whether a path is valid."""
-        # For the following commands we have to make sure that the real
-        # path destination belongs to the user's root directory.
-        # If provided path is a symlink we follow its final destination
-        # to do so.
-        if cmd in ('APPE','CWD','DELE','MDTM','NLST','MLSD','MLST','RETR',
-                   'RMD','SIZE','STOR','XCWD','XRMD'):
-            datacr = None
-            datacr = self.fs.get_cr(line)
-            try:
-                if not self.fs.validpath(self.fs.ftp2fs(line, datacr)):
-                    line = self.fs.ftpnorm(line)
+            # for file-system related commands check whether real path
+            # destination is valid                
+            if proto_cmds[cmd].check_path and cmd != 'STOU':
+                if cmd in ('CWD', 'XCWD'):
+                    arg = arg or '/'
+                elif cmd in ('CDUP', 'XCUP'):
+                    arg = '..'
+                elif cmd == 'LIST':
+                    if arg.lower() in ('-a', '-l', '-al', '-la'):
+                        arg = self.fs.cwd
+                    else:
+                        arg = arg or self.fs.cwd
+                elif cmd == 'STAT':
+                    if glob.has_magic(arg):
+                        self.respond('550 Globbing not supported.')
+                        return
+                    arg = arg or self.fs.cwd
+                else:  # LIST, NLST, MLSD, MLST
+                    arg = arg or self.fs.cwd                                                
+                if not self.fs.validpath(arg or '/'):
+                    line = self.fs.ftpnorm(arg)
                     err = '"%s" points to a path which is outside ' \
                           "the user's root directory" %line
                     self.respond("550 %s." %err)
-                    self.log('FAIL %s "%s". %s.' %(cmd, line, err))
-                    self.fs.close_cr(datacr)
-                    return False
-            except:
-                pass
-            self.fs.close_cr(datacr)
-        return True
-
-    def __check_perm(self, cmd, line, datacr):
-        """Check permissions depending on issued command."""
-        map = {'CWD':'e', 'XCWD':'e', 'CDUP':'e', 'XCUP':'e',
-               'LIST':'l', 'NLST':'l', 'MLSD':'l', 'STAT':'l',
-               'RETR':'r',
-               'APPE':'a',
-               'DELE':'d', 'RMD':'d', 'XRMD':'d',
-               'RNFR':'f',
-               'MKD':'m', 'XMKD':'m',
-               'STOR':'w'}
-        if cmd in map:
-            if cmd == 'STAT' and not line:
-                return True
-            perm = map[cmd]
-            if not line and (cmd in ('LIST','NLST','MLSD')):
-                path = self.fs.ftp2fs(self.fs.cwd, datacr)
-            else:
-                path = self.fs.ftp2fs(line, datacr)
-            if not self.authorizer.has_perm(self.username, perm, path):
-                self.log('FAIL %s "%s". Not enough privileges.' \
-                         %(cmd, self.fs.ftpnorm(line)))
-                self.respond("550 Can't %s. Not enough privileges." %cmd)
-                return False
-        return True
+                    self.log('FAIL %s "%s". %s.' %(cmd, line, err))                    
+                    return                
+            # check permission
+            perm = proto_cmds[cmd].perm
+            if perm is not None and cmd != 'STOU':                
+                if not self.authorizer.has_perm(self.username, perm, arg or '/'):
+                    self.log('FAIL %s "%s". Not enough privileges.' \
+                             %(cmd, self.fs.fs2ftp(line)))
+                    self.respond("550 Can't %s. Not enough privileges." %cmd)
+                    return            
+            # call the proper ftp_* method
+            method = getattr(self, 'ftp_' + cmd.replace(' ', '_'))
+            method(arg)
 
     def handle_expt(self):
-        """Called when there is out of band (OOB) data for the socket
-        connection.  This could happen in case of such commands needing
-        "special action" (typically STAT and ABOR) in which case we
-        append OOB data to incoming buffer.
+        """Called when there is out of band (OOB) data to be read.
+        This could happen in case of such clients strictly following
+        the RFC-959 directives of sending Telnet IP and Synch as OOB
+        data before issuing ABOR, STAT and QUIT commands.
+        It should never be called since the SO_OOBINLINE option is
+        enabled except on some systems like FreeBSD where it doesn't
+        seem to have effect.
         """
         if hasattr(socket, 'MSG_OOB'):
             try:
                 data = self.socket.recv(1024, socket.MSG_OOB)
-            except socket.error:
-                pass
+            except socket.error, why:
+                if why[0] == errno.EINVAL:
+                    return
             else:
-                self.in_buffer.append(data)
+                self._in_buffer.append(data)
                 return
         self.log("Can't handle OOB data.")
         self.close()
@@ -1672,21 +1838,23 @@ class FTPHandler(asynchat.async_chat):
     def handle_close(self):
         self.close()
 
-    _closed = False
     def close(self):
         """Close the current channel disconnecting the client."""
         if not self._closed:
             self._closed = True
-            if self.data_server:
+            if self.data_server is not None:
                 self.data_server.close()
                 del self.data_server
 
-            if self.data_channel:
+            if self.data_channel is not None:
                 self.data_channel.close()
                 del self.data_channel
 
-            del self.__out_dtp_queue
-            del self.__in_dtp_queue
+            del self._out_dtp_queue
+            del self._in_dtp_queue
+
+            if self.idler is not None and not self.idler.cancelled:
+                self.idler.cancel()
 
             # remove client IP address from ip map
             self.server.ip_map.remove(self.remote_ip)
@@ -1695,6 +1863,16 @@ class FTPHandler(asynchat.async_chat):
 
     # --- callbacks
 
+    def on_file_sent(self, file):
+        """Called every time a file has been succesfully sent.
+        'file' is the complete filename of the file being sent.
+        """
+
+    def on_file_received(self, file):
+        """Called every time a file has been succesfully received.
+        'file' is the complete filename of the file being received.
+        """
+
     def on_dtp_connection(self):
         """Called every time data channel connects (either active or
         passive).
@@ -1704,34 +1882,45 @@ class FTPHandler(asynchat.async_chat):
         If awaiting inbound data, the data channel is enabled for
         receiving.
         """
-        if self.data_server:
+        if self.data_server is not None:
             self.data_server.close()
         self.data_server = None
 
+        # stop the idle timer as long as the data transfer is not finished
+        if self.idler is not None and not self.idler.cancelled:
+            self.idler.cancel()
+
         # check for data to send
-        if self.__out_dtp_queue:
-            data, isproducer, file = self.__out_dtp_queue
+        if self._out_dtp_queue is not None:
+            data, isproducer, file = self._out_dtp_queue
+            self._out_dtp_queue = None
             if file:
                 self.data_channel.file_obj = file
-            if not isproducer:
-                self.data_channel.push(data)
-            else:
-                self.data_channel.push_with_producer(data)
-            if self.data_channel:
-                self.data_channel.close_when_done()
-            self.__out_dtp_queue = None
+            try:
+                if not isproducer:
+                    self.data_channel.push(data)
+                else:
+                    self.data_channel.push_with_producer(data)
+                if self.data_channel is not None:
+                    self.data_channel.close_when_done()
+            except:
+                # dealing with this exception is up to DTP (see bug #84)
+                self.data_channel.handle_error()
 
         # check for data to receive
-        elif self.__in_dtp_queue:
-            self.data_channel.file_obj = self.__in_dtp_queue
+        elif self._in_dtp_queue is not None:
+            self.data_channel.file_obj = self._in_dtp_queue
             self.data_channel.enable_receiving(self.current_type)
-            self.__in_dtp_queue = None
+            self._in_dtp_queue = None
 
     def on_dtp_close(self):
         """Called every time the data channel is closed."""
         self.data_channel = None
         if self.quit_pending:
             self.close_when_done()
+        elif self.timeout:
+            # data transfer finished, restart the idle timer
+            self.idler = CallLater(self.timeout, self.handle_timeout)
 
     # --- utility
 
@@ -1754,22 +1943,26 @@ class FTPHandler(asynchat.async_chat):
          - (bool) isproducer: whether treat data as a producer.
          - (file) file: the file[-like] object to send (if any).
         """
-        if self.data_channel:
+        if self.data_channel is not None:
             self.respond("125 Data connection already open. Transfer starting.")
             if file:
                 self.data_channel.file_obj = file
-            if not isproducer:
-                self.data_channel.push(data)
-            else:
-                self.data_channel.push_with_producer(data)
-            if self.data_channel:
-                self.data_channel.close_when_done()
+            try:
+                if not isproducer:
+                    self.data_channel.push(data)
+                else:
+                    self.data_channel.push_with_producer(data)
+                if self.data_channel is not None:
+                    self.data_channel.close_when_done()
+            except:
+                # dealing with this exception is up to DTP (see bug #84)
+                self.data_channel.handle_error()
         else:
             self.respond("150 File status okay. About to open data connection.")
-            self.__out_dtp_queue = (data, isproducer, file)
+            self._out_dtp_queue = (data, isproducer, file)
 
     def log(self, msg):
-        """Log a message, including additional identifying session data."""        
+        """Log a message, including additional identifying session data."""
         log("[%s]@%s:%s %s" %(self.username, self.remote_ip,
                               self.remote_port, msg))
 
@@ -1781,11 +1974,11 @@ class FTPHandler(asynchat.async_chat):
         """Flush account information by clearing attributes that need
         to be reset on a REIN or new USER command.
         """
-        if self.data_channel:
+        if self.data_channel is not None:
             if not self.data_channel.transfer_in_progress():
                 self.data_channel.close()
                 self.data_channel = None
-        if self.data_server:
+        if self.data_server is not None:
             self.data_server.close()
             self.data_server = None
 
@@ -1797,8 +1990,9 @@ class FTPHandler(asynchat.async_chat):
         self.current_type = 'a'
         self.restart_position = 0
         self.quit_pending = False
-        self.__in_dtp_queue = None
-        self.__out_dtp_queue = None
+        self.sleeping = False
+        self._in_dtp_queue = None
+        self._out_dtp_queue = None
 
     def run_as_current_user(self, function, *args, **kwargs):
         """Execute a function impersonating the current logged-in user."""
@@ -1817,36 +2011,33 @@ class FTPHandler(asynchat.async_chat):
         # FTP bounce attacks protection: according to RFC-2577 it's
         # recommended to reject PORT if IP address specified in it
         # does not match client IP address.
-        if not self.permit_foreign_addresses:
-            if ip != self.remote_ip:
-                self.log("Rejected data connection to foreign address %s:%s."
-                         %(ip, port))
-                self.respond("501 Can't connect to a foreign address.")
-                return
+        if not self.permit_foreign_addresses and ip != self.remote_ip:
+            self.log("Rejected data connection to foreign address %s:%s."
+                     %(ip, port))
+            self.respond("501 Can't connect to a foreign address.")
+            return
 
         # ...another RFC-2577 recommendation is rejecting connections
         # to privileged ports (< 1024) for security reasons.
-        if not self.permit_privileged_ports:
-            if port < 1024:
-                self.log('PORT against the privileged port "%s" refused.' %port)
-                self.respond("501 Can't connect over a privileged port.")
-                return
+        if not self.permit_privileged_ports and port < 1024:
+            self.log('PORT against the privileged port "%s" refused.' %port)
+            self.respond("501 Can't connect over a privileged port.")
+            return
 
         # close existent DTP-server instance, if any.
-        if self.data_server:
+        if self.data_server is not None:
             self.data_server.close()
             self.data_server = None
-        if self.data_channel:
+        if self.data_channel is not None:
             self.data_channel.close()
             self.data_channel = None
 
         # make sure we are not hitting the max connections limit
-        if self.server.max_cons:
-            if len(self._map) >= self.server.max_cons:
-                msg = "Too many connections. Can't open data channel."
-                self.respond("425 %s" %msg)
-                self.log(msg)
-                return
+        if self.server.max_cons and len(self._map) >= self.server.max_cons:
+            msg = "Too many connections. Can't open data channel."
+            self.respond("425 %s" %msg)
+            self.log(msg)
+            return
 
         # open data channel
         self.active_dtp(ip, port, self)
@@ -1858,21 +2049,20 @@ class FTPHandler(asynchat.async_chat):
         which case extended passive mode will be used (see RFC-2428).
         """
         # close existing DTP-server instance, if any
-        if self.data_server:
+        if self.data_server is not None:
             self.data_server.close()
             self.data_server = None
 
-        if self.data_channel:
+        if self.data_channel is not None:
             self.data_channel.close()
             self.data_channel = None
 
         # make sure we are not hitting the max connections limit
-        if self.server.max_cons:
-            if len(self._map) >= self.server.max_cons:
-                msg = "Too many connections. Can't open data channel."
-                self.respond("425 %s" %msg)
-                self.log(msg)
-                return
+        if self.server.max_cons and len(self._map) >= self.server.max_cons:
+            msg = "Too many connections. Can't open data channel."
+            self.respond("425 %s" %msg)
+            self.log(msg)
+            return
 
         # open data channel
         self.data_server = self.passive_dtp(self, extmode)
@@ -1933,7 +2123,7 @@ class FTPHandler(asynchat.async_chat):
                     assert len(octs) == 4
                     for x in octs:
                         assert 0 <= x <= 255
-                except (AssertionError, ValueError, OverflowError), err:
+                except (AssertionError, ValueError, OverflowError):
                     self.respond("501 Invalid EPRT format.")
                 else:
                     self._make_eport(ip, port)
@@ -1996,12 +2186,7 @@ class FTPHandler(asynchat.async_chat):
                 self.respond('501 Unknown network protocol (use 2).')
 
     def ftp_QUIT(self, line):
-        """Quit the current session."""
-        # From RFC-959:
-        # This command terminates a USER and if file transfer is not
-        # in progress, the server closes the control connection.
-        # If file transfer is in progress, the connection will remain
-        # open for result response and the server will then close it.
+        """Quit the current session disconnecting the client."""
         if self.authenticated:
             msg_quit = self.authorizer.get_msg_quit(self.username)
         else:
@@ -2012,12 +2197,15 @@ class FTPHandler(asynchat.async_chat):
             self.push("221-%s\r\n" %msg_quit)
             self.respond("221 ")
 
-        if not self.data_channel:
-            self.close_when_done()
-        else:
-            # tell the cmd channel to stop responding to commands.
+        # From RFC-959:
+        # If file transfer is in progress, the connection will remain
+        # open for result response and the server will then close it.
+        # We also stop responding to any further command.
+        if self.data_channel:
             self.quit_pending = True
-
+            self.sleeping = True
+        else:
+            self.close_when_done()
 
         # --- data transferring
 
@@ -2028,35 +2216,30 @@ class FTPHandler(asynchat.async_chat):
         # - If no argument, fall back on cwd as default.
         # - Some older FTP clients erroneously issue /bin/ls-like LIST
         #   formats in which case we fall back on cwd as default.
-        if not line or line.lower() in ('-a', '-l', '-al', '-la'):
-            line = self.fs.cwd
-        try:
-            data = None
-            data = self.fs.get_cr(line)
-            path = self.fs.ftp2fs(line, data)
+        datacr = None
+        try:            
+            datacr = self.fs.get_cr(line)
+            path = self.fs.ftp2fs(line, datacr)
             line = self.fs.ftpnorm(line)
-            iterator = self.run_as_current_user(self.fs.get_list_dir, path)
+            iterator = self.run_as_current_user(self.fs.get_list_dir, path)            
         except OSError, err:
-            self.fs.close_cr(data)
             why = _strerror(err)
             self.log('FAIL LIST "%s". %s.' %(line, why))
             self.respond('550 %s.' %why)
         else:
-            self.fs.close_cr(data)
             self.log('OK LIST "%s". Transfer starting.' %line)
             producer = BufferedIteratorProducer(iterator)
             self.push_dtp_data(producer, isproducer=True)
+        self.fs.close_cr(datacr)
 
     def ftp_NLST(self, line):
         """Return a list of files in the specified directory in a
         compact form to the client.
         """
-        if not line:
-            line = self.fs.cwd
-        try:
-            data = None
-            data = self.fs.get_cr(line)
-            path = self.fs.ftp2fs(line, data)
+        datacr = None
+        try:            
+            datacr = self.fs.get_cr(line)
+            path = self.fs.ftp2fs(line, datacr)
             line = self.fs.ftpnorm(line)
             if self.fs.isdir(path):
                 listing = self.run_as_current_user(self.fs.listdir, path)
@@ -2065,21 +2248,19 @@ class FTPHandler(asynchat.async_chat):
                 # if path is a file we just list its name
                 self.fs.lstat(path)  # raise exc in case of problems
                 basedir, filename = os.path.split(line)
-                listing = [filename]
+                listing = [filename]            
         except OSError, err:
-            self.fs.close_cr(data)
             why = _strerror(err)
             self.log('FAIL NLST "%s". %s.' %(line, why))
             self.respond('550 %s.' %why)
         else:
-            self.fs.close_cr(data)
             data = ''
             if listing:
                 listing.sort()
                 data = '\r\n'.join(listing) + '\r\n'
             self.log('OK NLST "%s". Transfer starting.' %line)
             self.push_dtp_data(data)
-
+        self.fs.close_cr(datacr)
         # --- MLST and MLSD commands
 
     # The MLST and MLSD commands are intended to standardize the file and
@@ -2090,12 +2271,9 @@ class FTPHandler(asynchat.async_chat):
     def ftp_MLST(self, line):
         """Return information about a pathname in a machine-processable
         form as defined in RFC-3659.
-        """
-        # if no argument, fall back on cwd as default
-        if not line:
-            line = self.fs.cwd
-        try:
-            datacr = None
+        """ 
+        datacr = None       
+        try:            
             datacr = self.fs.get_cr(line)
             path = self.fs.ftp2fs(line, datacr)
             line = self.fs.ftpnorm(line)
@@ -2103,7 +2281,7 @@ class FTPHandler(asynchat.async_chat):
             perms = self.authorizer.get_perms(self.username)
             iterator = self.run_as_current_user(self.fs.format_mlsx, basedir,
                        [basename], perms, self.current_facts, ignore_err=False)
-            data = ''.join(iterator)
+            data = ''.join(iterator)            
         except OSError, err:
             self.fs.close_cr(datacr)
             why = _strerror(err)
@@ -2124,11 +2302,8 @@ class FTPHandler(asynchat.async_chat):
         """Return contents of a directory in a machine-processable form
         as defined in RFC-3659.
         """
-        # if no argument, fall back on cwd as default
-        if not line:
-            line = self.fs.cwd
-        try:
-            datacr = None
+        datacr = None
+        try:            
             datacr = self.fs.get_cr(line)
             path = self.fs.ftp2fs(line, datacr)
             line = self.fs.ftpnorm(line)
@@ -2138,7 +2313,7 @@ class FTPHandler(asynchat.async_chat):
                 self.log('FAIL MLSD "%s". %s.' %(line, err))
                 self.respond("501 %s." %err)
                 return
-            listing = self.run_as_current_user(self.fs.listdir, path)
+            listing = self.run_as_current_user(self.fs.listdir, path)            
         except OSError, err:
             self.fs.close_cr(datacr)
             why = _strerror(err)
@@ -2148,7 +2323,7 @@ class FTPHandler(asynchat.async_chat):
             self.fs.close_cr(datacr)
             perms = self.authorizer.get_perms(self.username)
             iterator = self.fs.format_mlsx(path, listing, perms,
-                       self.current_facts)
+                       self._current_facts)
             producer = BufferedIteratorProducer(iterator)
             self.log('OK MLSD "%s". Transfer starting.' %line)
             self.push_dtp_data(producer, isproducer=True)
@@ -2157,18 +2332,14 @@ class FTPHandler(asynchat.async_chat):
         """Retrieve the specified file (transfer from the server to the
         client)
         """
-        try:
-            datacr = None
+        datacr = None        
+        try:            
             datacr = self.fs.get_cr(line)
             file = self.fs.ftp2fs(line, datacr)
             line = self.fs.ftpnorm(line)
-            fd = self.run_as_current_user(self.fs.open, file, 'rb')
-        except OSError, err:
-            self.fs.close_cr(datacr)
-            why = _strerror(err)
-            self.log('FAIL RETR "%s". %s.' %(line, why))
-            self.respond('550 %s.' %why)
-            return
+            rest_pos = self.restart_position
+            self.restart_position = 0
+            fd = self.run_as_current_user(self.fs.open, file, 'rb')            
         except IOError, err:
             self.fs.close_cr(datacr)
             why = _strerror(err)
@@ -2176,7 +2347,7 @@ class FTPHandler(asynchat.async_chat):
             self.respond('550 %s.' %why)
             return
 
-        if self.restart_position:
+        if rest_pos:
             # Make sure that the requested offset is valid (within the
             # size of the file being resumed).
             # According to RFC-1123 a 554 reply may result in case that
@@ -2184,19 +2355,18 @@ class FTPHandler(asynchat.async_chat):
             # the REST.
             ok = 0
             try:
-                assert not self.restart_position > self.fs.getsize(file)
-                fd.seek(self.restart_position)
+                assert not rest_pos > self.fs.getsize(file)
+                fd.seek(rest_pos)
                 ok = 1
             except AssertionError:
                 why = "Invalid REST parameter"
             except IOError, err:
                 why = _strerror(err)
-            self.restart_position = 0
-            if not ok:
+            if not ok:                
                 self.respond('554 %s' %why)
                 self.log('FAIL RETR "%s". %s.' %(line, why))
                 self.fs.close_cr(datacr)
-                return
+                return        
         self.log('OK RETR "%s". Download starting.' %line)
         producer = FileProducer(fd, self.current_type)
         self.push_dtp_data(producer, isproducer=True, file=fd)
@@ -2213,7 +2383,7 @@ class FTPHandler(asynchat.async_chat):
             cmd = 'APPE'
         else:
             cmd = 'STOR'
-
+            
         line = self.fs.ftpnorm(line)
         basedir,basename = os.path.split(line)
 
@@ -2224,12 +2394,14 @@ class FTPHandler(asynchat.async_chat):
 
         except OSError, err:
             self.fs.close_cr(datacr)
-            why = _strerror(err)
+            why = ftpserver._strerror(err)
             self.log('FAIL %s "%s". %s.' %(cmd, line, why))
             self.respond('550 %s.' %why)
             return
-
-        if self.restart_position:
+            
+        rest_pos = self.restart_position
+        self.restart_position = 0
+        if rest_pos:
             mode = 'r+'
         try:
             fd = self.run_as_current_user(self.fs.create, file, basename, mode + 'b')
@@ -2240,7 +2412,7 @@ class FTPHandler(asynchat.async_chat):
             self.respond('550 %s.' %why)
             return
 
-        if self.restart_position:
+        if rest_pos:
             # Make sure that the requested offset is valid (within the
             # size of the file being resumed).
             # According to RFC-1123 a 554 reply may result in case
@@ -2248,31 +2420,29 @@ class FTPHandler(asynchat.async_chat):
             # specified in the REST.
             ok = 0
             try:
-                assert not self.restart_position > self.fs.getsize(self.fs.ftp2fs(line, datacr))
-                fd.seek(self.restart_position)
+                assert not rest_pos > self.fs.getsize(file)
+                fd.seek(rest_pos)
                 ok = 1
             except AssertionError:
                 why = "Invalid REST parameter"
             except IOError, err:
                 why = _strerror(err)
-            self.restart_position = 0
             if not ok:
-                self.fs.close_cr(datacr)
                 self.respond('554 %s' %why)
                 self.log('FAIL %s "%s". %s.' %(cmd, line, why))
+                self.fs.close_cr(datacr)
                 return
 
         self.log('OK %s "%s". Upload starting.' %(cmd, line))
-        if self.data_channel:
+        if self.data_channel is not None:
             self.respond("125 Data connection already open. Transfer starting.")
             self.data_channel.file_obj = fd
             self.data_channel.enable_receiving(self.current_type)
         else:
             self.respond("150 File status okay. About to open data connection.")
-            self.__in_dtp_queue = fd
+            self._in_dtp_queue = fd
         self.fs.close_cr(datacr)
 
-
     def ftp_STOU(self, line):
         """Store a file on the server with a unique name."""
         # Note 1: RFC-959 prohibited STOU parameters, but this
@@ -2289,7 +2459,7 @@ class FTPHandler(asynchat.async_chat):
         if self.restart_position:
             self.respond("450 Can't STOU while REST request is pending.")
             return
-
+            
         datacr = None
         datacr = self.fs.get_cr(line)
 
@@ -2299,7 +2469,7 @@ class FTPHandler(asynchat.async_chat):
             basedir = self.fs.ftp2fs(basedir, datacr)
             #prefix = prefix + '.'
         else:
-            basedir = self.fs.ftp2fs(self.fs.cwd, datacr)
+            basedir = self.fs.ftp2fs(self.fs.cwd)
             prefix = 'ftpd.'
         try:
             fd = self.run_as_current_user(self.fs.mkstemp, prefix=prefix,
@@ -2317,8 +2487,12 @@ class FTPHandler(asynchat.async_chat):
             self.fs.close_cr(datacr)
             return
 
-        filename = line
-        if not self.authorizer.has_perm(self.username, 'w', filename):
+        if not self.authorizer.has_perm(self.username, 'w', fd.name):
+            try:
+                fd.close()
+                self.run_as_current_user(self.fs.remove, fd.name)
+            except os.error:
+                pass
             self.log('FAIL STOU "%s". Not enough privileges'
                      %self.fs.ftpnorm(line))
             self.respond("550 Can't STOU: not enough privileges.")
@@ -2326,27 +2500,30 @@ class FTPHandler(asynchat.async_chat):
             return
 
         # now just acts like STOR except that restarting isn't allowed
+        filename = os.path.basename(fd.name)
         self.log('OK STOU "%s". Upload starting.' %filename)
-        if self.data_channel:
+        if self.data_channel is not None:
             self.respond("125 FILE: %s" %filename)
             self.data_channel.file_obj = fd
             self.data_channel.enable_receiving(self.current_type)
         else:
             self.respond("150 FILE: %s" %filename)
-            self.__in_dtp_queue = fd
+            self._in_dtp_queue = fd
         self.fs.close_cr(datacr)
 
-
-    def ftp_APPE(self, line):
+    def ftp_APPE(self, file):
         """Append data to an existing file on the server."""
         # watch for APPE preceded by REST, which makes no sense.
         if self.restart_position:
-            self.respond("550 Can't APPE while REST request is pending.")
+            self.respond("450 Can't APPE while REST request is pending.")
         else:
-            self.ftp_STOR(line, mode='a')
+            self.ftp_STOR(file, mode='a')
 
     def ftp_REST(self, line):
         """Restart a file transfer from a previous mark."""
+        if self.current_type == 'a':
+            self.respond('501 Resuming transfers not allowed in ASCII mode.')
+            return
         try:
             marker = int(line)
             if marker < 0:
@@ -2354,9 +2531,7 @@ class FTPHandler(asynchat.async_chat):
         except (ValueError, OverflowError):
             self.respond("501 Invalid parameter.")
         else:
-            self.respond("350 Restarting at position %s. " \
-                        "Now use RETR/STOR for resuming." %marker)
-            self.log("OK REST %s." %marker)
+            self.respond("350 Restarting at position %s." %marker)
             self.restart_position = marker
 
     def ftp_ABOR(self, line):
@@ -2367,7 +2542,7 @@ class FTPHandler(asynchat.async_chat):
             resp = "225 No transfer to abort."
         else:
             # a PASV was received but connection wasn't made yet
-            if self.data_server:
+            if self.data_server is not None:
                 self.data_server.close()
                 self.data_server = None
                 resp = "225 ABOR command successful; data channel closed."
@@ -2379,7 +2554,7 @@ class FTPHandler(asynchat.async_chat):
             # was successfully processed.
             # If no data has been transmitted we just respond with 225
             # indicating that no transfer was in progress.
-            if self.data_channel:
+            if self.data_channel is not None:
                 if self.data_channel.transfer_in_progress():
                     self.data_channel.close()
                     self.data_channel = None
@@ -2398,10 +2573,6 @@ class FTPHandler(asynchat.async_chat):
 
     def ftp_USER(self, line):
         """Set the username for the current session."""
-        # we always treat anonymous user as lower-case string.
-        if line.lower() == "anonymous":
-            line = "anonymous"
-
         # RFC-959 specifies a 530 response to the USER command if the
         # username is not valid.  If the username is valid is required
         # ftpd returns a 331 response instead.  In order to prevent a
@@ -2422,9 +2593,10 @@ class FTPHandler(asynchat.async_chat):
             self.respond('331 %s, send password.' %msg)
         self.username = line
 
+    _auth_failed_timeout = 5
+
     def ftp_PASS(self, line):
         """Check username's password against the authorizer."""
-
         if self.authenticated:
             self.respond("503 User already authenticated.")
             return
@@ -2432,50 +2604,42 @@ class FTPHandler(asynchat.async_chat):
             self.respond("503 Login with USER first.")
             return
 
-        # username ok
-        if self.authorizer.has_user(self.username):
-            if self.username == 'anonymous' \
-            or self.authorizer.validate_authentication(self.username, line):
-                msg_login = self.authorizer.get_msg_login(self.username)
-                if len(msg_login) <= 75:
-                    self.respond('230 %s' %msg_login)
-                else:
-                    self.push("230-%s\r\n" %msg_login)
-                    self.respond("230 ")
-
-                self.authenticated = True
-                self.password = line
-                self.attempted_logins = 0
-                self.fs.root = self.authorizer.get_home_dir(self.username)
-                self.fs.username=self.username
-                self.fs.password=line
-                self.log("User %s logged in." %self.username)
-            else:
+        def auth_failed(msg="Authentication failed."):
+            if not self._closed:
                 self.attempted_logins += 1
                 if self.attempted_logins >= self.max_login_attempts:
-                    self.respond("530 Maximum login attempts. Disconnecting.")
-                    self.close()
+                    msg = "530 " + msg + " Disconnecting."
+                    self.respond(msg)
+                    self.log(msg)
+                    self.close_when_done()
                 else:
-                    self.respond("530 Authentication failed.")
-                self.log('Authentication failed (user: "%s").' %self.username)
-                self.username = ""
-
-        # wrong username
+                    self.respond("530 " + msg)
+                    self.log(msg)
+                    self.sleeping = False
+
+        if self.authorizer.validate_authentication(self.username, line):
+            msg_login = self.authorizer.get_msg_login(self.username)
+            if len(msg_login) <= 75:
+                self.respond('230 %s' %msg_login)
+            else:
+                self.push("230-%s\r\n" %msg_login)
+                self.respond("230 ")
+            self.authenticated = True
+            self.password = line
+            self.fs.username = self.username
+            self.fs.password = line
+            self.attempted_logins = 0
+            self.fs.root = self.authorizer.get_home_dir(self.username)
+            self.log("User %s logged in." %self.username)
         else:
-            self.attempted_logins += 1
-            if self.attempted_logins >= self.max_login_attempts:
-                self.log('Authentication failed: unknown username "%s".'
-                            %self.username)
-                self.respond("530 Maximum login attempts. Disconnecting.")
-                self.close()
-            elif self.username.lower() == 'anonymous':
-                self.respond("530 Anonymous access not allowed.")
-                self.log('Authentication failed: anonymous access not allowed.')
+            self.username = ""
+            self.fs.username = ""
+            self.sleeping = True
+            if self.username == 'anonymous':
+                CallLater(self._auth_failed_timeout, auth_failed,
+                          "Anonymous access not allowed.")
             else:
-                self.respond("530 Authentication failed.")
-                self.log('Authentication failed: unknown username "%s".'
-                            %self.username)
-                self.username = ""
+                CallLater(self._auth_failed_timeout, auth_failed)
 
     def ftp_REIN(self, line):
         """Reinitialize user's current session."""
@@ -2501,9 +2665,6 @@ class FTPHandler(asynchat.async_chat):
 
     def ftp_CWD(self, line):
         """Change the current working directory."""
-        # TODO: a lot of FTP servers go back to root directory if no
-        # arg is provided but this is not specified in RFC-959.
-        # Search for official references about this behaviour.
         if not line:
             line = '/'
         datacr = None
@@ -2533,33 +2694,38 @@ class FTPHandler(asynchat.async_chat):
 
     def ftp_SIZE(self, line):
         """Return size of file in a format suitable for using with
-        RESTart as defined in RFC-3659.
-
-        Implementation note:
-        properly handling the SIZE command when TYPE ASCII is used would
-        require to scan the entire file to perform the ASCII translation
-        logic (file.read().replace(os.linesep, '\r\n')) and then
-        calculating the len of such data which may be different than
-        the actual size of the file on the server.  Considering that
-        calculating such result could be very resource-intensive it
-        could be easy for a malicious client to try a DoS attack, thus
-        we do not perform the ASCII translation.
-
-        However, clients in general should not be resuming downloads in
-        ASCII mode.  Resuming downloads in binary mode is the recommended
-        way as specified in RFC-3659.
-        """
-        datacr = None
+        RESTart as defined in RFC-3659."""
+
+        # Implementation note: properly handling the SIZE command when
+        # TYPE ASCII is used would require to scan the entire file to
+        # perform the ASCII translation logic
+        # (file.read().replace(os.linesep, '\r\n')) and then calculating
+        # the len of such data which may be different than the actual
+        # size of the file on the server.  Considering that calculating
+        # such result could be very resource-intensive and also dangerous
+        # (DoS) we reject SIZE when the current TYPE is ASCII.
+        # However, clients in general should not be resuming downloads
+        # in ASCII mode.  Resuming downloads in binary mode is the
+        # recommended way as specified in RFC-3659.
+        
+        if self.current_type == 'a':
+            why = "SIZE not allowed in ASCII mode"
+            self.log('FAIL SIZE "%s". %s.' %(line, why))
+            self.respond("550 %s." %why)
+            return        
+        datacr = False
         try:
             datacr = self.fs.get_cr(line)
             path = self.fs.ftp2fs(line, datacr)
             line = self.fs.ftpnorm(line)
-            if self.fs.isdir(path):
+            if not self.fs.isfile(self.fs.realpath(path)):
                 why = "%s is not retrievable" %line
                 self.log('FAIL SIZE "%s". %s.' %(line, why))
                 self.respond("550 %s." %why)
+                self.fs.close_cr(datacr)
                 return
             size = self.run_as_current_user(self.fs.getsize, path)
+            size = self.run_as_current_user(self.fs.getsize, path)
         except OSError, err:
             why = _strerror(err)
             self.log('FAIL SIZE "%s". %s.' %(line, why))
@@ -2573,7 +2739,7 @@ class FTPHandler(asynchat.async_chat):
         """Return last modification time of file to the client as an ISO
         3307 style timestamp (YYYYMMDDHHMMSS) as defined in RFC-3659.
         """
-        datacr = None
+        datacr = None        
         try:
             datacr = self.fs.get_cr(line)
             path = self.fs.ftp2fs(line, datacr)
@@ -2584,17 +2750,23 @@ class FTPHandler(asynchat.async_chat):
                 self.respond("550 %s." %why)
                 self.fs.close_cr(datacr)
                 return
-            lmt = self.run_as_current_user(self.fs.getmtime, path)
-        except OSError, err:
-            why = _strerror(err)
+            secs = self.run_as_current_user(self.fs.getmtime, path)    
+            lmt = self.run_as_current_user(self.fs.getmtime, path)           
+            
+        except (OSError, ValueError), err:
+            if isinstance(err, OSError):
+                why = _strerror(err)
+            else:
+                # It could happen if file's last modification time
+                # happens to be too old (prior to year 1900)
+                why = "Can't determine file's last modification time"
             self.log('FAIL MDTM "%s". %s.' %(line, why))
             self.respond('550 %s.' %why)
         else:
-            lmt = time.strftime("%Y%m%d%H%M%S", time.localtime(lmt))
             self.respond("213 %s" %lmt)
             self.log('OK MDTM "%s".' %line)
         self.fs.close_cr(datacr)
-
+        
     def ftp_MKD(self, line):
         """Create the specified directory."""
         datacr = None
@@ -2610,12 +2782,15 @@ class FTPHandler(asynchat.async_chat):
             self.respond('550 %s.' %why)
         else:
             self.log('OK MKD "%s".' %line)
-            self.respond("257 Directory created.")
+            # The 257 response is supposed to include the directory
+            # name and in case it contains embedded double-quotes
+            # they must be doubled (see RFC-959, chapter 7, appendix 2).
+            self.respond('257 "%s" directory created.' %line.replace('"', '""'))
         self.fs.close_cr(datacr)
 
     def ftp_RMD(self, line):
         """Remove the specified directory."""
-        datacr = None
+        datacr = False
         try:
             datacr = self.fs.get_cr(line)
             path = self.fs.ftp2fs(line, datacr)
@@ -2626,7 +2801,7 @@ class FTPHandler(asynchat.async_chat):
                 self.log('FAIL MKD "/". %s' %msg)
                 self.fs.close_cr(datacr)
                 return
-            self.run_as_current_user(self.fs.rmdir, path)
+            self.run_as_current_user(self.fs.rmdir, path)            
         except OSError, err:
             why = _strerror(err)
             self.log('FAIL RMD "%s". %s.' %(line, why))
@@ -2635,7 +2810,7 @@ class FTPHandler(asynchat.async_chat):
             self.log('OK RMD "%s".' %line)
             self.respond("250 Directory removed.")
         self.fs.close_cr(datacr)
-
+        
     def ftp_DELE(self, line):
         """Delete the specified file."""
         datacr = None
@@ -2652,10 +2827,11 @@ class FTPHandler(asynchat.async_chat):
             self.log('OK DELE "%s".' %line)
             self.respond("250 File removed.")
         self.fs.close_cr(datacr)
-
+        
     def ftp_RNFR(self, line):
         """Rename the specified (only the source name is specified
         here, see RNTO command)"""
+        
         datacr = None
         try:
             datacr = self.fs.get_cr(line)
@@ -2679,7 +2855,7 @@ class FTPHandler(asynchat.async_chat):
         if not self.fs.rnfr:
             self.respond("503 Bad sequence of commands: use RNFR first.")
             return
-        datacr = None
+        datacr = None        
         try:
             try:
                 datacr = self.fs.get_cr(line)
@@ -2702,35 +2878,50 @@ class FTPHandler(asynchat.async_chat):
             self.fs.close_cr(datacr)
 
 
+
         # --- others
 
     def ftp_TYPE(self, line):
         """Set current type data type to binary/ascii"""
-        line = line.upper()
-        if line in ("A", "AN", "A N"):
+        type = line.upper().replace(' ', '')
+        if type in ("A", "L7"):
             self.respond("200 Type set to: ASCII.")
             self.current_type = 'a'
-        elif line in ("I", "L8", "L 8"):
+        elif type in ("I", "L8"):
             self.respond("200 Type set to: Binary.")
             self.current_type = 'i'
         else:
             self.respond('504 Unsupported type "%s".' %line)
 
     def ftp_STRU(self, line):
-        """Set file structure (obsolete)."""
-        # obsolete (backward compatibility with older ftp clients)
-        if line in ('f','F'):
+        """Set file structure ("F" is the only one supported (noop))."""
+        stru = line.upper()
+        if stru == 'F':
             self.respond('200 File transfer structure set to: F.')
-        else:
+        elif stru in ('P', 'R'):
+           # R is required in minimum implementations by RFC-959, 5.1.
+           # RFC-1123, 4.1.2.13, amends this to only apply to servers
+           # whose file systems support record structures, but also
+           # suggests that such a server "may still accept files with
+           # STRU R, recording the byte stream literally".
+           # Should we accept R but with no operational difference from
+           # F? proftpd and wu-ftpd don't accept STRU R. We just do
+           # the same.
+           #
+           # RFC-1123 recommends against implementing P.
             self.respond('504 Unimplemented STRU type.')
+        else:
+            self.respond('501 Unrecognized STRU type.')
 
     def ftp_MODE(self, line):
-        """Set data transfer mode (obsolete)"""
-        # obsolete (backward compatibility with older ftp clients)
-        if line in ('s', 'S'):
+        """Set data transfer mode ("S" is the only one supported (noop))."""
+        mode = line.upper()
+        if mode == 'S':
             self.respond('200 Transfer mode set to: S')
-        else:
+        elif mode in ('B', 'C'):
             self.respond('504 Unimplemented MODE type.')
+        else:
+            self.respond('501 Unrecognized MODE type.')
 
     def ftp_STAT(self, line):
         """Return statistics about current ftp session. If an argument
@@ -2738,17 +2929,15 @@ class FTPHandler(asynchat.async_chat):
 
         Implementation note:
 
-        RFC-959 do not explicitly mention globbing; this means that FTP
-        servers are not required to support globbing in order to be
-        compliant.  However, many FTP servers do support globbing as a
-        measure of convenience for FTP clients and users.
+        RFC-959 does not explicitly mention globbing but many FTP
+        servers do support it as a measure of convenience for FTP
+        clients and users.
 
         In order to search for and match the given globbing expression,
         the code has to search (possibly) many directories, examine
         each contained filename, and build a list of matching files in
         memory.  Since this operation can be quite intensive, both CPU-
-        and memory-wise, we limit the search to only one directory
-        non-recursively, as LIST does.
+        and memory-wise, we do not support globbing.
         """
         # return STATus information about ftpd
         if not line:
@@ -2766,9 +2955,9 @@ class FTPHandler(asynchat.async_chat):
             else:
                 type = 'Binary'
             s.append("TYPE: %s; STRUcture: File; MODE: Stream" %type)
-            if self.data_server:
+            if self.data_server is not None:
                 s.append('Passive data channel waiting for connection.')
-            elif self.data_channel:
+            elif self.data_channel is not None:
                 bytes_sent = self.data_channel.tot_bytes_sent
                 bytes_recv = self.data_channel.tot_bytes_received
                 s.append('Data connection open:')
@@ -2787,19 +2976,22 @@ class FTPHandler(asynchat.async_chat):
                 datacr = self.fs.get_cr(line)
                 iterator = self.run_as_current_user(self.fs.get_stat_dir, line, datacr)
             except OSError, err:
-                self.respond('550 %s.' %_strerror(err))
+                why = _strerror(err)
+                self.log('FAIL STAT "%s". %s.' %(line, why))
+                self.respond('550 %s.' %why)
             else:
-                self.push('213-Status of "%s":\r\n' %self.fs.ftpnorm(line))
+                self.push('213-Status of "%s":\r\n' %line)
                 self.push_with_producer(BufferedIteratorProducer(iterator))
                 self.respond('213 End of status.')
             self.fs.close_cr(datacr)
-
+            
     def ftp_FEAT(self, line):
         """List all new features supported as defined in RFC-2398."""
         features = ['EPRT','EPSV','MDTM','MLSD','REST STREAM','SIZE','TVFS']
+        features.extend(self._extra_feats)
         s = ''
-        for fact in self.available_facts:
-            if fact in self.current_facts:
+        for fact in self._available_facts:
+            if fact in self._current_facts:
                 s += fact + '*;'
             else:
                 s += fact + ';'
@@ -2824,8 +3016,8 @@ class FTPHandler(asynchat.async_chat):
             self.respond('501 %s.' %err)
         else:
             facts = [x.lower() for x in arg.split(';')]
-            self.current_facts = [x for x in facts if x in self.available_facts]
-            f = ''.join([x + ';' for x in self.current_facts])
+            self._current_facts = [x for x in facts if x in self._available_facts]
+            f = ''.join([x + ';' for x in self._current_facts])
             self.respond('200 MLST OPTS ' + f)
 
     def ftp_NOOP(self, line):
@@ -2842,22 +3034,23 @@ class FTPHandler(asynchat.async_chat):
         self.respond("215 UNIX Type: L8")
 
     def ftp_ALLO(self, line):
-        """Allocate bytes for storage (obsolete)."""
-        # obsolete (always respond with 202)
+        """Allocate bytes for storage (noop)."""
+        # not necessary (always respond with 202)
         self.respond("202 No storage allocation necessary.")
 
     def ftp_HELP(self, line):
         """Return help text to the client."""
         if line:
-            if line.upper() in proto_cmds:
-                self.respond("214 %s" %proto_cmds[line.upper()])
+            line = line.upper()
+            if line in proto_cmds:
+                self.respond("214 %s" %proto_cmds[line].help)
             else:
                 self.respond("501 Unrecognized command.")
         else:
             # provide a compact list of recognized commands
             def formatted_help():
                 cmds = []
-                keys = proto_cmds.keys()
+                keys = [x for x in proto_cmds.keys() if not x.startswith('SITE ')]
                 keys.sort()
                 while keys:
                     elems = tuple((keys[0:8]))
@@ -2869,6 +3062,30 @@ class FTPHandler(asynchat.async_chat):
             self.push(formatted_help())
             self.respond("214 Help command successful.")
 
+        # --- site commands
+
+    # No SITE commands aside from SITE HELP are implemented by default.
+    # The user willing to add support for a specific SITE command has
+    # to define a new ftp_SITE_%CMD% method in the subclass.
+
+    def ftp_SITE_HELP(self, line):
+        """Return help text to the client for a given SITE command."""
+        if line:
+            line = line.upper()
+            if line in proto_cmds:
+                self.respond("214 %s" %proto_cmds[line].help)
+            else:
+                self.respond("501 Unrecognized SITE command.")
+        else:
+            self.push("214-The following SITE commands are recognized:\r\n")
+            site_cmds = []
+            keys = proto_cmds.keys()
+            keys.sort()
+            for cmd in keys:
+                if cmd.startswith('SITE '):
+                    site_cmds.append(' %s\r\n' %cmd[5:])
+            self.push(''.join(site_cmds))
+            self.respond("214 Help SITE command successful.")
 
         # --- support for deprecated cmds
 
@@ -2969,43 +3186,50 @@ class FTPServer(asyncore.dispatcher):
             return
         asyncore.dispatcher.set_reuse_addr(self)
 
-    def serve_forever(self, **kwargs):
+    def serve_forever(self, timeout=1, use_poll=False, map=None, count=None):
         """A wrap around asyncore.loop(); starts the asyncore polling
-        loop.
-
-        The keyword arguments in kwargs are the same expected by
-        asyncore.loop() function: timeout, use_poll, map and count.
+        loop including running the scheduler.
+        The arguments are the same expected by original asyncore.loop()
+        function.
         """
-        if not 'count' in kwargs:
-            log("Serving FTP on %s:%s" %self.socket.getsockname()[:2])
-
-        # backward compatibility for python < 2.4
+        if map is None:
+            map = asyncore.socket_map
+        # backward compatibility for python versions < 2.4
         if not hasattr(self, '_map'):
-            if not 'map' in kwargs:
-                map = asyncore.socket_map
-            else:
-                map = kwargs['map']
             self._map = self.handler._map = map
 
-        try:
-            # FIX #16, #26
-            # use_poll specifies whether to use select module's poll()
-            # with asyncore or whether to use asyncore's own poll()
-            # method Python versions < 2.4 need use_poll set to False
-            # This breaks on OS X systems if use_poll is set to True.
-            # All systems seem to work fine with it set to False
-            # (tested on Linux, Windows, and OS X platforms)
-            if kwargs:
-                asyncore.loop(**kwargs)
-            else:
-                asyncore.loop(timeout=1.0, use_poll=False)
-        except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
-            log("Shutting down FTPd.")
-            self.close_all()
+        if use_poll and hasattr(asyncore.select, 'poll'):
+            poll_fun = asyncore.poll2
+        else:
+            poll_fun = asyncore.poll
+
+        if count is None:
+            log("Serving FTP on %s:%s" %self.socket.getsockname()[:2])
+            try:
+                while map or _tasks:
+                    if map:
+                        poll_fun(timeout, map)
+                    if _tasks:
+                        _scheduler()
+            except (KeyboardInterrupt, SystemExit, asyncore.ExitNow):
+                log("Shutting down FTP server.")
+                self.close_all()
+        else:
+            while (map or _tasks) and count > 0:
+                if map:
+                    poll_fun(timeout, map)
+                if _tasks:
+                    _scheduler()
+                count = count - 1
 
     def handle_accept(self):
         """Called when remote client initiates a connection."""
-        sock_obj, addr = self.accept()
+        try:
+            sock_obj, addr = self.accept()
+        except TypeError:
+            # for some reason sometimes accept() returns None instead
+            # of a socket
+            return
         log("[]%s:%s Connected." %addr[:2])
 
         handler = self.handler(sock_obj, self)
@@ -3044,7 +3268,7 @@ class FTPServer(asyncore.dispatcher):
         self.close()
 
     def close_all(self, map=None, ignore_all=False):
-        """Stop serving; close all existent connections disconnecting
+        """Stop serving and also disconnects all currently connected
         clients.
 
          - (dict) map:
@@ -3057,13 +3281,11 @@ class FTPServer(asyncore.dispatcher):
 
         Implementation note:
 
-        Instead of using the current asyncore.close_all() function
-        which only close sockets, we iterate over all existent channels
-        calling close() method for each one of them, avoiding memory
-        leaks.
-
-        This is how asyncore.close_all() function should work in
+        This is how asyncore.close_all() is implemented starting from
         Python 2.6.
+        The previous versions of close_all() instead of iteratating over
+        all opened channels and calling close() method for each one
+        of them only closed sockets generating memory leaks.
         """
         if map is None:
             map = self._map
@@ -3085,15 +3307,13 @@ class FTPServer(asyncore.dispatcher):
 
 def test():
     # cmd line usage (provide a read-only anonymous ftp server):
-    # python -m pyftpdlib.FTPServer
+    # python -m pyftpdlib.ftpserver
     authorizer = DummyAuthorizer()
-    authorizer.add_anonymous(os.getcwd(), perm='elradfmw')
+    authorizer.add_anonymous(os.getcwd())
     FTPHandler.authorizer = authorizer
-    address = ('', 8021)
+    address = ('', 21)
     ftpd = FTPServer(address, FTPHandler)
     ftpd.serve_forever()
 
 if __name__ == '__main__':
     test()
-
-# vim:expandtab:smartindent:tabstop=4:softtabstop=4:shiftwidth=4: