borg rewrite: Rewrites the contents of existing archives.
Use with caution: specifying incorrect patterns can easily cause permanent
data loss. Make a dry run first to make sure you got everything right.

borg rewrite has many uses:
- Selectively remove files/dirs from old archives, e.g. to free
  space or to purge picturarum biggus dickus from history
- Recompress data
- Rechunkify data, to have upgraded Attic / Borg 0.xx archives deduplicate
  with Borg 1.x archives (or to experiment with chunker-params for
  specific use cases)
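
For illustration, the uses above map onto invocations like these
(repository paths and archive names are made up):

    borg rewrite -e /home/user/junk /mnt/backup::my-archive    # drop paths
    borg rewrite -C lzma,6 /mnt/backup::my-archive             # recompress
    borg rewrite --chunker-params default /mnt/backup          # rechunk

Flags as introduced by the parser below; a bare repository (no archive
name) rewrites every archive in it.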

It is interruptible and resumable.

Chunks are not freed on-the-fly.
Rationale:
  Freeing chunks on-the-fly only makes sense when rechunkifying, but the
  logic for deciding which input chunks may be freed for which new chunks
  is complicated and *very* delicate.


Current TODOs:
- Detect and skip (unless --force) already recompressed chunks
  -- delayed until the current PRs on the borg.key APIs are decided
     (borgbackup#810, borgbackup#789)
- Usage example

Future TODOs:
- Refactor tests using py.test fixtures
  -- would require porting ArchiverTestCase to py.test: many changes,
     this changeset is already borderline too large.
- Possibly add a --target option to not replace the source archive
  -- with the target possibly in another Repo
     (better than "cp" due to full integrity checking, and deduplication
      at the target)

Fixes borgbackup#787 borgbackup#686 borgbackup#630 borgbackup#70 (and probably some I overlooked)
Also see borgbackup#757 and borgbackup#770
enkore committed Mar 29, 2016
1 parent 61e2f12 commit 622e002
Showing 10 changed files with 920 additions and 44 deletions.
305 changes: 298 additions & 7 deletions borg/archive.py

Large diffs are not rendered by default.

150 changes: 145 additions & 5 deletions borg/archiver.py
@@ -21,24 +21,22 @@
get_cache_dir, prune_within, prune_split, \
Manifest, remove_surrogates, update_excludes, format_archive, check_extension_modules, Statistics, \
dir_is_tagged, ChunkerParams, CompressionSpec, is_slow_msgpack, yes, sysinfo, \
EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ItemFormatter
EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ItemFormatter, DASHES
from .logger import create_logger, setup_logging
logger = create_logger()
from .compress import Compressor, COMPR_BUFFER
from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader
from .repository import Repository
from .cache import Cache
from .key import key_creator, RepoKey, PassphraseKey
from .archive import Archive, ArchiveChecker, CHUNKER_PARAMS
from .archive import Archive, ArchiveChecker, ArchiveRewriter, CHUNKER_PARAMS
from .remote import RepositoryServer, RemoteRepository, cache_if_remote

has_lchflags = hasattr(os, 'lchflags')

# default umask, overriden by --umask, defaults to read/write only for owner
UMASK_DEFAULT = 0o077

DASHES = '-' * 78


def argument(args, str_or_bool):
"""If bool is passed, return it. If str is passed, retrieve named attribute from args."""
@@ -396,7 +394,7 @@ def item_is_hardlink_master(item):
filter=lambda item: item_is_hardlink_master(item) or matcher.match(item[b'path'])):
orig_path = item[b'path']
if item_is_hardlink_master(item):
hardlink_masters[orig_path] = (item.get(b'chunks'), item.get(b'source'))
hardlink_masters[orig_path] = (item.get(b'chunks'), None)
if not matcher.match(item[b'path']):
continue
if strip_components:
@@ -738,6 +736,47 @@ def do_upgrade(self, args):
print("warning: %s" % e)
return self.exit_code

@with_repository(cache=True, exclusive=True)
def do_rewrite(self, args, repository, manifest, key, cache):
"""Rewrite archive contents"""
def interrupt(signal_num, stack_frame):
if rewriter.interrupt:
print("Received signal, again. I'm not deaf.\n", file=sys.stderr)
else:
print("Received signal, will exit cleanly.\n", file=sys.stderr)
rewriter.interrupt = True

matcher, include_patterns = self.build_matcher(args.excludes, args.paths)

rewriter = ArchiveRewriter(repository, manifest, key, cache, matcher,
exclude_caches=args.exclude_caches, exclude_if_present=args.exclude_if_present,
keep_tag_files=args.keep_tag_files,
compression=args.compression, chunker_params=args.chunker_params,
progress=args.progress, stats=args.stats,
list=args.output_list, dry_run=args.dry_run)

signal.signal(signal.SIGTERM, interrupt)
signal.signal(signal.SIGINT, interrupt)

if args.location.archive:
name = args.location.archive
if rewriter.is_temporary_archive(name):
self.print_error('Refusing to rewrite temporary archive of prior rewrite: %s', name)
return self.exit_code
rewriter.rewrite(name)
else:
for archive in manifest.list_archive_infos(sort_by='ts'):
name = archive.name
if rewriter.is_temporary_archive(name):
continue
print('Rewriting', name)
if not rewriter.rewrite(name):
break
manifest.write()
repository.commit()
cache.commit()
return self.exit_code

@with_repository()
def do_debug_dump_archive_items(self, args, repository, manifest, key):
"""dump (decrypted, decompressed) archive items metadata (not: data)"""
@@ -1514,6 +1553,107 @@ def build_parser(self, args=None, prog=None):
type=location_validator(archive=False),
help='path to the repository to be upgraded')

rewrite_epilog = textwrap.dedent("""
Rewrites the contents of existing archives.
--exclude, --exclude-from and PATH have the exact same semantics
as in borg create: if a PATH is specified, the rewritten archive
contains that path and nothing else. PATH does *not* merely restrict
the rewrite to that path while leaving the rest of the archive alone.
--compression recompresses all chunks. Due to how Borg stores compressed-size
information, this may display incorrect size information for archives that
were not rewritten at the same time. There is no risk of data loss in this.
Use --force to also recompress chunks that already use the specified
compression algorithm.
--chunker-params will re-chunk all files in the archive; this can be
used to have upgraded Borg 0.xx or Attic archives deduplicate with
Borg 1.x archives.
Currently the only file status used for --list is 'I' (file/dir included in
rewritten archive).
borg rewrite is signal safe. Send either SIGINT (Ctrl-C on most terminals) or
SIGTERM to request termination.
Use the *exact same* command line to resume the operation later - changing excludes
or paths will lead to inconsistencies (changed excludes will only apply to newly
processed files/dirs). Changing compression leads to incorrect size information
(which does not cause any data loss, but can be misleading).
USE WITH CAUTION. Permanent data loss by specifying incorrect patterns is possible.
Note: The archive under rewrite is only removed after the operation completes. The
archive that is built during the rewrite exists at the same time at
<ARCHIVE>.rewrite.
Note: When recompressing or (especially) rechunking, space usage can be substantial.
Note: This changes the archive ID.
""")
subparser = subparsers.add_parser('rewrite', parents=[common_parser],
description=self.do_rewrite.__doc__,
epilog=rewrite_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
help='rewrite archive contents')
subparser.set_defaults(func=self.do_rewrite)
subparser.add_argument('--list', dest='output_list',
action='store_true', default=False,
help='output verbose list of items (files, dirs, ...)')
subparser.add_argument('-p', '--progress', dest='progress',
action='store_true', default=False,
help='show progress display while rewriting archives')
subparser.add_argument('-f', '--force', dest='force_recompress',
action='store_true', default=False,
help='even recompress chunks already compressed with the algorithm set with '
'--compression')
subparser.add_argument('-n', '--dry-run', dest='dry_run',
action='store_true', default=False,
help='do not change anything')
subparser.add_argument('-s', '--stats', dest='stats',
action='store_true', default=False,
help='print statistics at end')
subparser.add_argument('-e', '--exclude', dest='excludes',
type=parse_pattern, action='append',
metavar="PATTERN", help='exclude paths matching PATTERN')
subparser.add_argument('--exclude-from', dest='exclude_files',
type=argparse.FileType('r'), action='append',
metavar='EXCLUDEFILE', help='read exclude patterns from EXCLUDEFILE, one per line')
subparser.add_argument('--exclude-caches', dest='exclude_caches',
action='store_true', default=False,
help='exclude directories that contain a CACHEDIR.TAG file ('
'http://www.brynosaurus.com/cachedir/spec.html)')
subparser.add_argument('--exclude-if-present', dest='exclude_if_present',
metavar='FILENAME', action='append', type=str,
help='exclude directories that contain the specified file')
subparser.add_argument('--keep-tag-files', dest='keep_tag_files',
action='store_true', default=False,
help='keep tag files of excluded caches/directories')
subparser.add_argument('-C', '--compression', dest='compression',
type=CompressionSpec, default=None, metavar='COMPRESSION',
help='select compression algorithm (and level): '
'none == no compression (default), '
'lz4 == lz4, '
'zlib == zlib (default level 6), '
'zlib,0 .. zlib,9 == zlib (with level 0..9), '
'lzma == lzma (default level 6), '
'lzma,0 .. lzma,9 == lzma (with level 0..9).')
subparser.add_argument('--timestamp', dest='timestamp',
type=timestamp, default=None,
metavar='yyyy-mm-ddThh:mm:ss',
help='manually specify the archive creation date/time (UTC). '
'alternatively, give a reference file/directory.')
subparser.add_argument('--chunker-params', dest='chunker_params',
type=ChunkerParams, default=None,
metavar='CHUNK_MIN_EXP,CHUNK_MAX_EXP,HASH_MASK_BITS,HASH_WINDOW_SIZE',
help='specify the chunker parameters (or "default").')
subparser.add_argument('location', metavar='REPOSITORY_OR_ARCHIVE', nargs='?', default='',
type=location_validator(),
help='repository/archive to rewrite')
subparser.add_argument('paths', metavar='PATH', nargs='*', type=str,
help='paths to rewrite; patterns are supported')

subparser = subparsers.add_parser('help', parents=[common_parser],
description='Extra help')
subparser.add_argument('--epilog-only', dest='epilog_only',
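
The interrupt handling in do_rewrite() above is a plain flag pattern: the
first SIGINT/SIGTERM requests a clean exit at the next safe point, and a
second one only prints a complaint. A minimal self-contained sketch of the
same pattern (Worker is hypothetical, standing in for ArchiveRewriter):

    import signal
    import sys

    class Worker:
        def __init__(self):
            self.interrupt = False

        def run(self, items):
            for item in items:
                if self.interrupt:
                    # checkpoint and stop cleanly, like rewriter.rewrite() does
                    return False
                pass  # ... process item ...
            return True

    worker = Worker()

    def interrupt(signal_num, stack_frame):
        if worker.interrupt:
            print("Received signal, again. I'm not deaf.", file=sys.stderr)
        else:
            print("Received signal, will exit cleanly.", file=sys.stderr)
            worker.interrupt = True

    signal.signal(signal.SIGTERM, interrupt)
    signal.signal(signal.SIGINT, interrupt)

    worker.run(range(1000))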
20 changes: 11 additions & 9 deletions borg/cache.py
@@ -358,27 +358,29 @@ def legacy_cleanup():
self.do_cache = os.path.isdir(archive_path)
self.chunks = create_master_idx(self.chunks)

def add_chunk(self, id, data, stats):
def add_chunk(self, id, data, stats, check_csize=False):
if not self.txn_active:
self.begin_txn()
size = len(data)
if self.seen_chunk(id, size):
count, _, stored_csize = self.get_chunk(id, size)
if count and not check_csize:
return self.chunk_incref(id, stats)
data = self.key.encrypt(data)
csize = len(data)
self.repository.put(id, data, wait=False)
self.chunks[id] = (1, size, csize)
if csize != stored_csize:
self.repository.put(id, data, wait=False)
self.chunks[id] = (count + 1, size, csize)
stats.update(size, csize, True)
return id, size, csize

def seen_chunk(self, id, size=None):
refcount, stored_size, _ = self.chunks.get(id, (0, None, None))
def get_chunk(self, id, size=None):
refcount, stored_size, stored_csize = self.chunks.get(id, (0, None, None))
if size is not None and stored_size is not None and size != stored_size:
# we already have a chunk with that id, but different size.
# this is either a hash collision (unlikely) or corruption or a bug.
raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" % (
id, stored_size, size))
return refcount
raise Exception("chunk has same id [%r], but different size (stored: %d new: %d)!" %
(id, stored_size, size))
return refcount, stored_size, stored_csize

def chunk_incref(self, id, stats):
if not self.txn_active:
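
The add_chunk() change above is what makes recompression work: a known chunk
is normally just a refcount bump, but with check_csize=True the chunk is
compressed again and only re-put when the new compressed size differs from
the stored one. A toy model of that decision (ToyCache stands in for borg's
real Cache; zlib stands in for key.encrypt(), which compresses before
encrypting):

    import zlib

    class ToyCache:
        def __init__(self):
            self.chunks = {}  # id -> (refcount, size, csize)
            self.store = {}   # id -> stored blob; stands in for the repository

        def add_chunk(self, id, data, level, check_csize=False):
            count, _, stored_csize = self.chunks.get(id, (0, None, None))
            if count and not check_csize:
                # ordinary dedup hit: only bump the refcount
                self.chunks[id] = (count + 1, len(data), stored_csize)
                return
            blob = zlib.compress(data, level)
            if len(blob) != stored_csize:
                self.store[id] = blob  # re-put only if the csize changed
            self.chunks[id] = (count + 1, len(data), len(blob))

    cache = ToyCache()
    cache.add_chunk(b'id', b'x' * 4096, level=1)
    cache.add_chunk(b'id', b'x' * 4096, level=9, check_csize=True)  # recompress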
14 changes: 9 additions & 5 deletions borg/compress.pyx
@@ -182,17 +182,21 @@ class Compressor:
self.params = kwargs
self.compressor = get_compressor(name, **self.params)

def compress(self, data):
return self.compressor.compress(data)

def decompress(self, data):
@staticmethod
def detect(data):
hdr = bytes(data[:2]) # detect() does not work with memoryview
for cls in COMPRESSOR_LIST:
if cls.detect(hdr):
return cls(**self.params).decompress(data)
return cls
else:
raise ValueError('No decompressor for this data found: %r.', data[:2])

def compress(self, data):
return self.compressor.compress(data)

def decompress(self, data):
return self.detect(data)(**self.params).decompress(data)


# a buffer used for (de)compression result, which can be slightly bigger
# than the chunk buffer in the worst (incompressible data) case, add 10%:
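
Factoring detect() into a static method lets callers ask which compressor
produced a blob without constructing a Compressor first — useful when
deciding whether a chunk is already compressed with the target algorithm.
A stripped-down model of header-based dispatch (the zlib header is the real
one that zlib emits at the default level; the lz4 magic is invented for the
sketch):

    import zlib

    COMPRESSORS = {
        b'\x78\x9c': 'zlib',  # real zlib default-level header
        b'\x04\x22': 'lz4',   # invented for illustration
    }

    def detect(data):
        hdr = bytes(data[:2])  # bytes(): comparisons must not see a memoryview
        for magic, name in COMPRESSORS.items():
            if hdr == magic:
                return name
        raise ValueError('No decompressor for this data found: %r.' % hdr)

    print(detect(zlib.compress(b'data')))  # -> 'zlib'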
50 changes: 50 additions & 0 deletions borg/helpers.py
@@ -17,6 +17,7 @@
import unicodedata

import logging

from .logger import create_logger
logger = create_logger()

@@ -40,6 +41,8 @@
EXIT_WARNING = 1 # reached normal end of operation, but there were issues
EXIT_ERROR = 2 # terminated abruptly, did not reach end of operation

DASHES = '-' * 78


class Error(Exception):
"""Error base class"""
@@ -491,6 +494,9 @@ def timestamp(s):


def ChunkerParams(s):
if s.strip().lower() == "default":
from .archive import CHUNKER_PARAMS
return CHUNKER_PARAMS
chunk_min, chunk_max, chunk_mask, window_size = s.split(',')
if int(chunk_max) > 23:
# do not go beyond 2**23 (8MB) chunk size now,
@@ -1268,3 +1274,47 @@ def format_time(self, key, item):

def time(self, key, item):
return safe_timestamp(item.get(key) or item[b'mtime'])


class ChunkIteratorFileWrapper:
"""File-like wrapper for chunk iterators"""

def __init__(self, chunk_iterator):
self.chunk_iterator = chunk_iterator
self.chunk_offset = 0
self.chunk = b''
self.exhausted = False

def _refill(self):
remaining = len(self.chunk) - self.chunk_offset
if not remaining:
try:
self.chunk = memoryview(next(self.chunk_iterator))
except StopIteration:
self.exhausted = True
return 0 # EOF
self.chunk_offset = 0
remaining = len(self.chunk)
return remaining

def _read(self, nbytes):
if not nbytes:
return b''
remaining = self._refill()
will_read = min(remaining, nbytes)
self.chunk_offset += will_read
return self.chunk[self.chunk_offset - will_read:self.chunk_offset]

def read(self, nbytes):
parts = []
while nbytes and not self.exhausted:
read_data = self._read(nbytes)
nbytes -= len(read_data)
parts.append(read_data)
return b''.join(parts)


def open_item(archive, item):
"""Return file-like object for archived item (with chunks)."""
chunk_iterator = archive.pipeline.fetch_many([c[0] for c in item[b'chunks']])
return ChunkIteratorFileWrapper(chunk_iterator)
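
ChunkIteratorFileWrapper gives file-like read() semantics over a lazy chunk
stream without ever concatenating the whole item in memory; read() crosses
chunk boundaries and only returns short at EOF. A quick usage sketch,
assuming the class as defined above (the generator stands in for
archive.pipeline.fetch_many()):

    def chunks():
        yield b'hello '
        yield b'world'

    f = ChunkIteratorFileWrapper(chunks())
    assert f.read(8) == b'hello wo'  # spans the chunk boundary
    assert f.read(8) == b'rld'       # short read only at EOF
    assert f.read(8) == b''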
13 changes: 11 additions & 2 deletions borg/key.py
@@ -75,9 +75,12 @@ def id_hash(self, data):
def encrypt(self, data):
pass

def decrypt(self, id, data):
def decrypt(self, id, data, decompress=True):
pass

def assert_chunk_id(self, id, data):
"""raise IntegrityError if id doesn't match data"""


class PlaintextKey(KeyBase):
TYPE = 0x02
@@ -99,14 +102,20 @@ def id_hash(self, data):
def encrypt(self, data):
return b''.join([self.TYPE_STR, self.compressor.compress(data)])

def decrypt(self, id, data):
def decrypt(self, id, data, decompress=True):
if data[0] != self.TYPE:
raise IntegrityError('Invalid encryption envelope')
if not decompress:
return memoryview(data)[1:]
data = self.compressor.decompress(memoryview(data)[1:])
if id and sha256(data).digest() != id:
raise IntegrityError('Chunk id verification failed')
return data

def assert_chunk_id(self, id, data):
if sha256(data).digest() != id:
raise IntegrityError('Chunk id verification failed')


class AESKeyBase(KeyBase):
"""Common base class shared by KeyfileKey and PassphraseKey
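
decrypt(decompress=False) lets the rewriter pass a chunk through verbatim
when its compression already matches, deferring the id check;
assert_chunk_id() then verifies the id once (if ever) the data is actually
decompressed. A rough self-contained model of that split (ToyKey is
illustrative, not borg's PlaintextKey, which shares the compressor
machinery instead of calling zlib directly):

    import zlib
    from hashlib import sha256

    class IntegrityError(Exception):
        pass

    class ToyKey:
        TYPE = 0x02

        def encrypt(self, data):
            return bytes([self.TYPE]) + zlib.compress(data)

        def decrypt(self, id, data, decompress=True):
            if data[0] != self.TYPE:
                raise IntegrityError('Invalid encryption envelope')
            payload = memoryview(data)[1:]
            if not decompress:
                return payload  # still compressed; id deliberately NOT verified
            result = zlib.decompress(payload)
            if id and sha256(result).digest() != id:
                raise IntegrityError('Chunk id verification failed')
            return result

        def assert_chunk_id(self, id, data):
            if sha256(data).digest() != id:
                raise IntegrityError('Chunk id verification failed')

    key = ToyKey()
    plain = b'some chunk data'
    cid = sha256(plain).digest()
    raw = key.decrypt(cid, key.encrypt(plain), decompress=False)
    key.assert_chunk_id(cid, zlib.decompress(raw))  # deferred verification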
Diffs for the remaining four changed files are not rendered.