FTP-backed filesystem with aggressive caching

I know, I know. It's 2015 and nobody uses FTP anymore. But by some unlucky coincidence, I found myself forced to use it - because the target machine couldn't have ssh for political reasons.

After some googling around I found CurlFtpFS - which mounts remote directory as a FUSE filesystem. It works, but I find it to be horribly slow - it takes several seconds to save a file, and opening small text file in Emacs takes over a minute (as it turns out, in my configuration Emacs searches the neighborhood of the opened file all possible version control system files and autosave files from other editors - and with FTP, each stat() call gives 1 second delay).

Since I didn't see any other way, I sat down to write my own FTP-backed filesystem with aggressive caching. The idea is that we don't expect that files on remote target will change often, so we can skip doing FTP calls for directory listings, file stats and the like if we have already cached them. I didn't implement any cache expiration things, but that shouldn't be too hard.

I'm not bothering with file ownership, permissions, or modification times in this filesystem - getting them all right is hard, and doesn't give me any immediate value. So as far as my filesystem is concerned all the files were last touched in 1970.

For this task I chose Python. I don't like it, to be frank, but it always has the best selection of libraries - for example, for my filesystem I immediately found ftputil and fusepy libraries.

ftputil library provided awesome high-level interface for handling file operations. Note how all functions block in FTP calls:
ftp = FTPHost(options.host, options.username, options.password)
ftp.stat_cache.max_age = 5

# helper class to store relevant results of stat() call
class Stat:
    def __init__(self):
        self.st_mode = 0
        self.st_size = 0
        self.enoent = False

class FtpFs:
    def __init__(self):
        self.ftp_lock = threading.Lock()

    def ftp_stat(self, path):
        st = Stat()
        if path == "/":
            st.st_mode = S_IFDIR | 0755
            st.st_size = 0
        else:
            self.ftp_lock.acquire()
            try:
                ftp_st = ftp.stat(path)
            except:
                # path not found
                st.enoent = True
                return st
            finally:
                self.ftp_lock.release()
            st.st_mode = ftp_st.st_mode
            st.st_size = ftp_st.st_size
        return st

    def ftp_write(self, path, data):
        with self.ftp_lock:
            with ftp.open(path, mode = "wb") as f:
                f.write(data)

    def ftp_read(self, path):
        with self.ftp_lock:
            with ftp.open(path, mode = "rb") as f:
                return f.read()

    def ftp_listdir(self, path):
        with self.ftp_lock:
            return map(lambda f: fuse.Direntry(f.encode("utf-8")), ftp.listdir(path))

    def ftp_mkdir(self, path):
        with self.ftp_lock:
            ftp.mkdir(path)

    def ftp_rename(self, from_path, to_path):
        with self.ftp_lock:
            ftp.rename(from_path, to_path)

    def ftp_remove(self, path):
        with self.ftp_lock:
            ftp.remove(path)

    def ftp_rmtree(self, path):
        with self.ftp_lock:
            ftp.rmtree(path)
Now, we need to implement an actual filesystem via the FUSE interface. The idea is to use asynchronous FTP interaction where possible - only blocking when the required data is not available in the cache. So all write requests will only send a message to the ftp worker thread and return immediately. Read requests would (when possible) get value from the cache and also return immediately. I added asynchronous read requests after getting value from the cache, as well - so we have some rudimentary cache expiration, but that's not required.
Here's the code (I omitted most of the fuse calls for brevity):
PRIORITY_WRITE = 1
PRIORITY_READ = 2

class CacheFtpFS(fuse.Fuse, FtpFs):
    def __init__(self, *args, **kw):
        fuse.Fuse.__init__(self, *args, **kw)
        FtpFs.__init__(self)

        self.cache_attr = {}
        self.cache_list = {}
        self.cache_data = {}

        self.io_queue = IoQueue()

        ftp_thread = threading.Thread(target = self.ftp_worker)
        ftp_thread.setDaemon(True)
        ftp_thread.start()

    def ftp_worker(self):
        while True:
            job = self.io_queue.get()
            if job != None:
                try:
                    if job[1] == "stat":
                        self.cache_attr[job[2]] = self.ftp_stat(job[2])
                    elif job[1] == "write":
                        self.ftp_write(job[2], self.cache_data[job[2]])
                    elif job[1] == "read":
                        self.cache_data[job[2]] = self.ftp_read(job[2])
                    elif job[1] == "listdir":
                        self.cache_list[job[2]] = self.ftp_listdir(job[2])
                except Exception as e:
                    print("exception in ftp job")
                    print(e)
            else:
                time.sleep(0.2)

    def getattr(self, path):
        if self.cache_attr.get(path) != None:
            ftp_st = self.cache_attr[path]
            if path != "/":
                self.io_queue.put((PRIORITY_READ, "stat", path))
        else:
            ftp_st = self.ftp_stat(path)
            self.cache_attr[path] = ftp_st

        if ftp_st.enoent:
            return -ENOENT

        st = fuse.Stat()

        # won't handle number of hard links
        st.st_nlink = 1
        # won't handle file ownership
        st.st_uid = os.geteuid()
        st.st_gid = os.getegid()
        # won't handle modification times
        st.st_mtime = 0

        st.st_mode = ftp_st.st_mode
        st.st_size = ftp_st.st_size

        return st

    def readdir(self, path, offset, dh=None):
        if self.cache_list.get(path) != None:
            self.io_queue.put((PRIORITY_READ, "listdir", path))
            return self.cache_list[path]

        f = [fuse.Direntry("."), fuse.Direntry("..")] + self.ftp_listdir(path)
        self.cache_list[path] = f
        return f

    def write(self, path, buf, offset, fh=None):
        print("write " + path + " offset=" + str(offset))

        if self.cache_data.get(path) == None:
            self.cache_data[path] = self.ftp_read(path)

        self.cache_data[path] = self.cache_data[path][:offset] + buf
        if self.cache_attr.get(path) != None:
            self.cache_attr[path] = Stat()

        self.cache_attr[path].enoent = 0
        self.cache_attr[path].st_size = len(self.cache_data[path])
        self.cache_attr[path].st_mode = S_IFREG | 0644

        self.io_queue.put((PRIORITY_WRITE, "write", path), (PRIORITY_READ, "stat", path))

        return len(buf)

    def read(self, path, length, offset, fh=None):
        print("read " + path + " length=" + str(length) + " offset=" + str(offset))

        if self.cache_data.get(path) != None:
            self.io_queue.put((PRIORITY_READ, "read", path))
        else:
            self.cache_data[path] = self.ftp_read(path)

        return self.cache_data[path][offset : offset + length]

fuse.fuse_python_api = (0, 2)

fs = CacheFtpFS()
fs.main(args=["./cacheftpfs.py", options.mountpoint, "-f"])
I could have used Queue.PriorityQueue as a message bus between filesystem and ftp worker, but I wanted to deduplicate requests - to avoid sending dozens of write calls per file save. Also, PriorityQueue would have reordered requests with the same priority, and that we would like to avoid. Here's the implementation of the simple deduplicating priority queue:
class IoQueue:
    def __init__(self):
        self.queue_lock = threading.Lock()
        self.queue_set = set()
        self.queue_list = []

    def put(self, *msg):
        with self.queue_lock:
            for m in msg:
                if m not in self.queue_set:
                    self.queue_set.add(m)
                    self.queue_list.append(m)

    def get(self):
        with self.queue_lock:
            if len(self.queue_list) > 0:
                p = min(m[0] for m in self.queue_list)
                m = next(m for m in self.queue_list if m[0] == p)
                self.queue_set.discard(m)
                self.queue_list.remove(m)
                return m
            else:
                return None
I'm very satisfied with the result - for simple read-write tasks (like compiling a bunch of sass files) it works very fast, beating CurlFtpFS or even sshfs. I'm actually grateful that there was no ssh on my target machine - if I could use ssh, I would have never written this utility, and would have been stuck with 10s sass compile times.

Full code can be found here: https://github.com/Rogach/cacheftpfs

Comments








  1. Hey what a brilliant post I have come across and believe me I have been searching out for this similar kind of post for past a week and hardly came across this. Thank you very much and will look for more postings from you
    ftp ports

    ReplyDelete

Post a Comment

Popular posts from this blog

How to create your own simple 3D render engine in pure Java

Configuration objects in Scallop

Better CLI option parsing in Scala