From fc2468f61ab2a3e44b5a9ac773f15e899e2efbd4 Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Mon, 10 Aug 2015 23:25:40 +0100 Subject: [PATCH 1/4] Implement asynchronous writeback fsync() after each segment write is suboptimal :). It means you stop (cpu) processing to wait for the physical disk write. And the default segment size is 5MB. (I noticed bup avoids this issue by writing pack files of 1GB by default :). Improvements will vary depending disk/cpu speed (I guess the worst case was when they were evenly matched). Writing 65M on SheevaPlug "NAS" went from 47s to 45s. 920M on desktop HDD (read from SSD) went from 68s to 45s --- attic/repository.py | 63 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index eed85dc4..ccf0ad58 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -7,6 +7,8 @@ import struct import sys from zlib import crc32 +import threading +import queue from .hashindex import NSIndex from .helpers import Error, IntegrityError, read_msgpack, write_msgpack, unhexlify, UpgradableLock @@ -377,7 +379,6 @@ def preload(self, ids): """Preload objects (only applies to remote repositories """ - class LoggedIO(object): header_fmt = struct.Struct(' Date: Tue, 11 Aug 2015 14:16:50 +0100 Subject: [PATCH 2/4] Make sure we propagate IO errors from async writeback --- attic/repository.py | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index ccf0ad58..d359f89c 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -589,36 +589,58 @@ class FsyncWorker(object): One fd is processed at a time. If the thread is already working, the caller will block. This provides double-buffering. + + Any exceptions (from os.fsync() or fd.close()) will be re-raised + on the next call into FsyncWorker. (Naturally this applies to + the .flush() and .close() methods as well as .fsync_and_close_fd()). """ def __init__(self): self.channel = Channel() - self.thread = threading.Thread(target=self._run, daemon=True) - self.thread.start() + self.exception = None + thread = threading.Thread(target=self._run, daemon=True) + thread.start() def _run(self): - while True: + while True: # worker thread loop task = self.channel.get() if task == None: - break - task() + break # thread shutdown requested + try: + task() + except Exception as e: + self.exception = e def fsync_and_close_fd(self, fd): """fsync() and close() fd in the background""" def task(): - os.fsync(fd) - fd.close() + try: + os.fsync(fd) + finally: + fd.close() + self.flush() # raise any pending exception self.channel.put(task) def flush(self): - """Wait for pending writeback""" + """Wait for any pending fsync. + + This will also make sure an IOError is re-raised + in the calling thread, if necessary. + """ def task(): pass self.channel.put(task) + if self.exception != None: + e = self.exception + self.exception = None + raise e + def close(self): - self.channel.put(None) - self.thread.join() # wait for thread to finish + try: + self.flush() + finally: + self.channel.put(None) # tell thread to shutdown class Channel(object): """A blocking channel, like in CSP or Go. From f5cdca56f5673392aa17a890adb59b8dc232072f Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Wed, 12 Aug 2015 12:52:51 +0100 Subject: [PATCH 3/4] Python 3.2 compat fix in added code TypeError: __init__() got an unexpected keyword argument 'daemon' --- attic/repository.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/attic/repository.py b/attic/repository.py index d359f89c..536f2b46 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -598,7 +598,8 @@ class FsyncWorker(object): def __init__(self): self.channel = Channel() self.exception = None - thread = threading.Thread(target=self._run, daemon=True) + thread = threading.Thread(target=self._run) + thread.daemon = True thread.start() def _run(self): From 5abf53b8d9363bf8b5628bb34eee4fb312126548 Mon Sep 17 00:00:00 2001 From: Alan Jenkins Date: Wed, 12 Aug 2015 14:06:02 +0100 Subject: [PATCH 4/4] Style fixes in added code --- attic/repository.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/attic/repository.py b/attic/repository.py index 536f2b46..a41b35bd 100644 --- a/attic/repository.py +++ b/attic/repository.py @@ -603,10 +603,10 @@ def __init__(self): thread.start() def _run(self): - while True: # worker thread loop + while True: # worker thread loop task = self.channel.get() - if task == None: - break # thread shutdown requested + if task is None: + break # thread shutdown requested try: task() except Exception as e: @@ -619,7 +619,7 @@ def task(): os.fsync(fd) finally: fd.close() - self.flush() # raise any pending exception + self.flush() # raise any pending exception self.channel.put(task) def flush(self): @@ -632,7 +632,7 @@ def task(): pass self.channel.put(task) - if self.exception != None: + if self.exception is not None: e = self.exception self.exception = None raise e @@ -641,7 +641,7 @@ def close(self): try: self.flush() finally: - self.channel.put(None) # tell thread to shutdown + self.channel.put(None) # tell thread to shutdown class Channel(object): """A blocking channel, like in CSP or Go. @@ -659,4 +659,4 @@ def get(self): def put(self, item): self.q.put(item) - self.q.join() # wait for task_done(), in reader thread + self.q.join() # wait for task_done(), in reader thread