Skip to content

Commit

Permalink
[archive] implement support for PostgreSQL databases (#6152)
Browse files Browse the repository at this point in the history
  • Loading branch information
mikf committed Feb 16, 2025
1 parent b4eae65 commit 841bc9f
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 27 deletions.
11 changes: 9 additions & 2 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -922,11 +922,13 @@ Description
extractor.*.archive
-------------------
Type
|Path|_
* ``string``
* |Path|_
Default
``null``
Example
``"$HOME/.archives/{category}.sqlite3"``
* ``"$HOME/.archives/{category}.sqlite3"``
* ``"postgresql://user:pass@host/database"``
Description
File to store IDs of downloaded files in. Downloads of files
already recorded in this archive file will be
Expand All @@ -937,6 +939,11 @@ Description
memory requirements are significantly lower when the
amount of stored IDs gets reasonably large.

If this value is a
`PostgreSQL Connection URI <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS>`__,
the archive will use this PostgreSQL database as backend (requires
`Psycopg <https://www.psycopg.org/>`__).

Note: Archive files that do not already exist get generated automatically.

Note: Archive paths support regular `format string`_ replacements,
Expand Down
161 changes: 147 additions & 14 deletions gallery_dl/archive.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

# Copyright 2024 Mike Fährmann
# Copyright 2024-2025 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
Expand All @@ -9,26 +9,55 @@
"""Download Archives"""

import os
import sqlite3
from . import formatter
import logging
from . import util, formatter

log = logging.getLogger("archive")


def connect(path, prefix, format, mode=None, pragma=None, kwdict=None):
    """Create the download archive object selected by 'path' and 'mode'

    path:   PostgreSQL connection URI ("postgres://" or "postgresql://")
            or the location of an archive file
    prefix: string prepended to 'format' before it is parsed
    format: format string used to generate archive keys
    mode:   "memory" selects the in-memory variant of the chosen backend
    pragma: forwarded unchanged to the archive class
    kwdict: values for '{...}' replacement fields in a file path;
            no replacement happens when this is None

    Returns an instance of DownloadArchive, DownloadArchiveMemory,
    DownloadArchivePostgresql, or DownloadArchivePostgresqlMemory.
    """
    keygen = formatter.parse(prefix + format).format_map

    # Only a string can be a connection URI. The isinstance() check keeps
    # non-string path values (presumably accepted by util.expand_path();
    # confirm there) from failing on the missing startswith() method.
    if isinstance(path, str) and path.startswith(
            ("postgres://", "postgresql://")):
        if mode == "memory":
            cls = DownloadArchivePostgresqlMemory
        else:
            cls = DownloadArchivePostgresql
    else:
        path = util.expand_path(path)
        if kwdict is not None and "{" in path:
            path = formatter.parse(path).format_map(kwdict)
        if mode == "memory":
            cls = DownloadArchiveMemory
        else:
            cls = DownloadArchive

    return cls(path, keygen, pragma)


class DownloadArchive():
_sqlite3 = None

def __init__(self, path, keygen, pragma=None, cache_key=None):

if self._sqlite3 is None:
import sqlite3
DownloadArchive._sqlite3 = sqlite3

def __init__(self, path, format_string, pragma=None,
cache_key="_archive_key"):
try:
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
con = self._sqlite3.connect(
path, timeout=60, check_same_thread=False)
except sqlite3.OperationalError:
os.makedirs(os.path.dirname(path))
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
con = self._sqlite3.connect(
path, timeout=60, check_same_thread=False)
con.isolation_level = None

self.keygen = formatter.parse(format_string).format_map
self.keygen = keygen
self.connection = con
self.close = con.close
self.cursor = cursor = con.cursor()
self._cache_key = cache_key
self._cache_key = cache_key or "_archive_key"

if pragma:
for stmt in pragma:
Expand All @@ -37,7 +66,7 @@ def __init__(self, path, format_string, pragma=None,
try:
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY) WITHOUT ROWID")
except sqlite3.OperationalError:
except self._sqlite3.OperationalError:
# fallback for missing WITHOUT ROWID support (#553)
cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY)")
Expand All @@ -61,9 +90,9 @@ def finalize(self):

class DownloadArchiveMemory(DownloadArchive):

def __init__(self, path, format_string, pragma=None,
cache_key="_archive_key"):
DownloadArchive.__init__(self, path, format_string, pragma, cache_key)
def __init__(self, path, keygen, pragma=None, cache_key=None):
DownloadArchive.__init__(
self, path, keygen, pragma, cache_key)
self.keys = set()

def add(self, kwdict):
Expand All @@ -87,7 +116,7 @@ def finalize(self):
with self.connection:
try:
cursor.execute("BEGIN")
except sqlite3.OperationalError:
except self._sqlite3.OperationalError:
pass

stmt = "INSERT OR IGNORE INTO archive (entry) VALUES (?)"
Expand All @@ -96,3 +125,107 @@ def finalize(self):
cursor.execute(stmt, (key,))
else:
cursor.executemany(stmt, ((key,) for key in self.keys))


class DownloadArchivePostgresql():
    """Download archive storing its entries in a PostgreSQL database"""
    # the 'psycopg' module; imported when the first instance is created
    _psycopg = None

    def __init__(self, uri, keygen, pragma=None, cache_key=None):
        """Connect to 'uri' and make sure the 'archive' table exists

        uri:       connection string passed to psycopg.connect()
        keygen:    callable turning a kwdict into an archive key
        pragma:    accepted but not used by this backend
        cache_key: kwdict key under which generated archive keys are
                   cached; defaults to "_archive_key"

        Errors during table creation are logged and re-raised after the
        connection has been closed again.
        """
        if self._psycopg is None:
            import psycopg
            DownloadArchivePostgresql._psycopg = psycopg

        self.connection = con = self._psycopg.connect(uri)
        self.cursor = cursor = con.cursor()
        self.close = con.close
        self.keygen = keygen
        self._cache_key = cache_key or "_archive_key"

        try:
            cursor.execute("CREATE TABLE IF NOT EXISTS archive "
                           "(entry TEXT PRIMARY KEY)")
            con.commit()
        except Exception as exc:
            log.error("%s: %s when creating 'archive' table: %s",
                      con, exc.__class__.__name__, exc)
            try:
                con.rollback()
            finally:
                # the exception below prevents the caller from ever
                # receiving this object, so the connection must be
                # released here to not leak it
                con.close()
            raise

    def add(self, kwdict):
        """Insert the archive key of 'kwdict' and commit

        Uses the key cached in 'kwdict' if there is one and generates it
        otherwise. Errors are logged and rolled back, not raised.
        """
        key = kwdict.get(self._cache_key) or self.keygen(kwdict)
        try:
            self.cursor.execute(
                "INSERT INTO archive (entry) "
                "VALUES (%s) "
                "ON CONFLICT DO NOTHING",
                (key,))
            self.connection.commit()
        except Exception as exc:
            log.error("%s: %s when writing entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()

    def check(self, kwdict):
        """Look up the archive key of 'kwdict' in the database

        The generated key is cached in 'kwdict' for a later add() call.
        Returns the result of cursor.fetchone(), or False when the query
        failed (the error is logged and rolled back).
        """
        key = kwdict[self._cache_key] = self.keygen(kwdict)
        try:
            self.cursor.execute(
                "SELECT true "
                "FROM archive "
                "WHERE entry=%s "
                "LIMIT 1",
                (key,))
            return self.cursor.fetchone()
        except Exception as exc:
            log.error("%s: %s when checking entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
            return False

    def finalize(self):
        """Do nothing; add() already commits every entry"""
        pass


class DownloadArchivePostgresqlMemory(DownloadArchivePostgresql):
    """PostgreSQL archive that collects new keys in memory

    Keys passed to add() are only written to the database
    when finalize() gets called.
    """

    def __init__(self, path, keygen, pragma=None, cache_key=None):
        """Set up the database connection and an empty set of new keys"""
        super().__init__(path, keygen, pragma, cache_key)
        self.keys = set()

    def add(self, kwdict):
        """Remember the archive key of 'kwdict' for finalize()"""
        entry = kwdict.get(self._cache_key) or self.keygen(kwdict)
        self.keys.add(entry)

    def check(self, kwdict):
        """Tell whether the archive key of 'kwdict' is already known

        Keys collected in memory are checked first (returns True);
        everything else is looked up in the database.
        Returns False when the database query failed.
        """
        entry = self.keygen(kwdict)
        kwdict[self._cache_key] = entry

        if entry in self.keys:
            return True

        query = "SELECT true FROM archive WHERE entry=%s LIMIT 1"
        try:
            self.cursor.execute(query, (entry,))
            return self.cursor.fetchone()
        except Exception as exc:
            log.error("%s: %s when checking entry: %s",
                      self.connection, exc.__class__.__name__, exc)
            self.connection.rollback()
            return False

    def finalize(self):
        """Write all collected keys to the database and commit"""
        if self.keys:
            statement = ("INSERT INTO archive (entry) VALUES (%s) "
                         "ON CONFLICT DO NOTHING")
            try:
                rows = ((entry,) for entry in self.keys)
                self.cursor.executemany(statement, rows)
                self.connection.commit()
            except Exception as exc:
                log.error("%s: %s when writing entries: %s",
                          self.connection, exc.__class__.__name__, exc)
                self.connection.rollback()
15 changes: 4 additions & 11 deletions gallery_dl/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,6 @@ def initialize(self, kwdict=None):

archive_path = cfg("archive")
if archive_path:
archive_path = util.expand_path(archive_path)

archive_prefix = cfg("archive-prefix")
if archive_prefix is None:
archive_prefix = extr.category
Expand All @@ -562,16 +560,11 @@ def initialize(self, kwdict=None):
archive_format = extr.archive_fmt

try:
if "{" in archive_path:
archive_path = formatter.parse(
archive_path).format_map(kwdict)
if cfg("archive-mode") == "memory":
archive_cls = archive.DownloadArchiveMemory
else:
archive_cls = archive.DownloadArchive
self.archive = archive_cls(
self.archive = archive.connect(
archive_path,
archive_prefix + archive_format,
archive_prefix,
archive_format,
cfg("archive-mode"),
cfg("archive-pragma"),
)
except Exception as exc:
Expand Down

0 comments on commit 841bc9f

Please sign in to comment.