Skip to content

Commit

Permalink
smp server: PostgreSQL queue store (#1448)
Browse files Browse the repository at this point in the history
* smp server: queue store typeclass

* parameterize JournalMsgStore

* typeclass for queue store

* postgres WIP

* compiles, passes tests

* remove StoreType

* split migrations

* progress

* addQueueRec

* reduce type spaghetti

* remove addQueue from typeclass definition

* getQueue

* test postgres storage in SMP server

* fix schema

* comment

* import queues to postgresql

* import queues to postgresql

* log

* fix test

* counts

* ci: test smp server with postgres backend (#1463)

* ci: test smp server with postgres backend

* postgres service

* attempt

* attempt

* empty

* empty

* PGHOST attempt

* PGHOST + softlink attempt

* only softlink attempt

* working attempt (PGHOST)

* remove env var

* empty

* do not start server without DB schema, do not import when schema exists

* export database

* enable all tests, disable two tests

* option for migration confirmation

* comments

---------

Co-authored-by: spaced4ndy <[email protected]>
  • Loading branch information
epoberezkin and spaced4ndy authored Feb 24, 2025
1 parent f9d7b1e commit 4dc40bd
Show file tree
Hide file tree
Showing 42 changed files with 1,774 additions and 836 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@ jobs:
build:
name: build-${{ matrix.os }}-${{ matrix.ghc }}
runs-on: ${{ matrix.os }}

services:
postgres:
image: postgres:15
env:
POSTGRES_HOST_AUTH_METHOD: trust # Allows passwordless access
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
# Maps tcp port 5432 on service container to the host
- 5432:5432

strategy:
fail-fast: false
matrix:
Expand Down Expand Up @@ -52,6 +67,8 @@ jobs:
timeout-minutes: 40
shell: bash
run: cabal test --test-show-details=direct
env:
PGHOST: localhost

- name: Prepare binaries
if: startsWith(github.ref, 'refs/tags/v')
Expand Down
25 changes: 14 additions & 11 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,9 @@ library
Simplex.RemoteControl.Types
if flag(client_postgres)
exposed-modules:
Simplex.Messaging.Agent.Store.Postgres
Simplex.Messaging.Agent.Store.Postgres.Common
Simplex.Messaging.Agent.Store.Postgres.DB
Simplex.Messaging.Agent.Store.Postgres.Migrations
Simplex.Messaging.Agent.Store.Postgres.Migrations.App
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20241210_initial
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250203_msg_bodies
if !flag(client_library)
exposed-modules:
Simplex.Messaging.Agent.Store.Postgres.Util
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -213,6 +206,11 @@ library
Simplex.FileTransfer.Server.Stats
Simplex.FileTransfer.Server.Store
Simplex.FileTransfer.Server.StoreLog
Simplex.Messaging.Agent.Store.Postgres
Simplex.Messaging.Agent.Store.Postgres.Common
Simplex.Messaging.Agent.Store.Postgres.DB
Simplex.Messaging.Agent.Store.Postgres.Migrations
Simplex.Messaging.Agent.Store.Postgres.Util
Simplex.Messaging.Notifications.Server
Simplex.Messaging.Notifications.Server.Control
Simplex.Messaging.Notifications.Server.Env
Expand All @@ -236,8 +234,12 @@ library
Simplex.Messaging.Server.Prometheus
Simplex.Messaging.Server.QueueStore
Simplex.Messaging.Server.QueueStore.STM
Simplex.Messaging.Server.QueueStore.Postgres
Simplex.Messaging.Server.QueueStore.Postgres.Migrations
Simplex.Messaging.Server.QueueStore.Types
Simplex.Messaging.Server.Stats
Simplex.Messaging.Server.StoreLog
Simplex.Messaging.Server.StoreLog.ReadWrite
Simplex.Messaging.Server.StoreLog.Types
Simplex.Messaging.Transport.WebSockets
other-modules:
Expand Down Expand Up @@ -304,15 +306,16 @@ library
case-insensitive ==1.2.*
, hashable ==1.4.*
, ini ==0.4.1
, postgresql-simple ==0.7.*
, optparse-applicative >=0.15 && <0.17
, process ==1.6.*
, raw-strings-qq ==1.1.*
, temporary ==1.3.*
, websockets ==0.12.*
if flag(client_postgres)
if flag(client_postgres) || !flag(client_library)
build-depends:
postgresql-libpq >=0.10.0.0
, postgresql-simple ==0.7.*
, raw-strings-qq ==1.1.*
if flag(client_postgres)
cpp-options: -DdbPostgres
else
build-depends:
Expand Down Expand Up @@ -490,6 +493,7 @@ test-suite simplexmq-test
, memory
, mtl
, network
, postgresql-simple ==0.7.*
, process
, QuickCheck ==2.14.*
, random
Expand All @@ -513,7 +517,6 @@ test-suite simplexmq-test
if flag(client_postgres)
build-depends:
postgresql-libpq >=0.10.0.0
, postgresql-simple ==0.7.*
, raw-strings-qq ==1.1.*
cpp-options: -DdbPostgres
else
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/FileTransfer/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers
import Simplex.Messaging.Protocol (XFTPServer)
import System.FilePath ((</>))
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..))
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..), fromTextField_)

type RcvFileId = ByteString -- Agent entity ID

Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ import Data.Time.Clock.System (SystemTime)
import Data.Type.Equality
import Data.Typeable ()
import Data.Word (Word16, Word32)
import Simplex.Messaging.Agent.Store.DB (Binary (..), FromField (..), ToField (..))
import Simplex.Messaging.Agent.Store.DB (Binary (..), FromField (..), ToField (..), blobFieldDecoder, fromTextField_)
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..))
import Simplex.FileTransfer.Transport (XFTPErrorType)
Expand Down Expand Up @@ -1016,7 +1016,7 @@ instance Encoding AMessage where

instance ToField AMessage where toField = toField . Binary . smpEncode

instance FromField AMessage where fromField = blobFieldParser smpP
instance FromField AMessage where fromField = blobFieldDecoder smpDecode

instance Encoding AMessageReceipt where
smpEncode AMessageReceipt {agentMsgId, msgHash, rcptInfo} =
Expand Down
4 changes: 2 additions & 2 deletions src/Simplex/Messaging/Agent/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import Data.Int (Int64)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Simplex.Messaging.Agent.Protocol (UserId)
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..))
import Simplex.Messaging.Parsers (defaultJSON, fromTextField_)
import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..), fromTextField_)
import Simplex.Messaging.Parsers (defaultJSON)
import Simplex.Messaging.Protocol (NtfServer, SMPServer, XFTPServer)
import Simplex.Messaging.Util (decodeJSON, encodeJSON)
import UnliftIO.STM
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import Data.Type.Equality
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.Store.Common
import Simplex.Messaging.Agent.Store.Interface (DBOpts, createDBStore)
import Simplex.Messaging.Agent.Store.Interface (createDBStore)
import Simplex.Messaging.Agent.Store.Migrations.App (appMigrations)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..), MigrationError (..))
import qualified Simplex.Messaging.Crypto as C
Expand Down
39 changes: 13 additions & 26 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first, second)
import Data.Bifunctor (first)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as U
import qualified Data.ByteString.Char8 as B
Expand All @@ -247,7 +247,7 @@ import Data.List (foldl', sortBy)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, listToMaybe)
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
import Data.Ord (Down (..))
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
Expand All @@ -263,7 +263,7 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.Common
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..), FromField (..), ToField (..))
import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..), FromField (..), ToField (..), blobFieldDecoder, fromTextField_)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
import Simplex.Messaging.Crypto.Ratchet (PQEncryption (..), PQSupport (..), RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
Expand All @@ -272,11 +272,11 @@ import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfSubscriptionId, NtfTknStatus (..), NtfTokenId, SMPQueueNtf (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_)
import Simplex.Messaging.Parsers (parseAll)
import Simplex.Messaging.Protocol
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, tshow, ($>>=), (<$$>))
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, firstRow, firstRow', ifM, maybeFirstRow, tshow, ($>>=), (<$$>))
import Simplex.Messaging.Version.Internal
import qualified UnliftIO.Exception as E
import UnliftIO.STM
Expand Down Expand Up @@ -1743,23 +1743,23 @@ deriving newtype instance FromField InternalId

instance ToField AgentMessageType where toField = toField . Binary . smpEncode

instance FromField AgentMessageType where fromField = blobFieldParser smpP
instance FromField AgentMessageType where fromField = blobFieldDecoder smpDecode

instance ToField MsgIntegrity where toField = toField . Binary . strEncode

instance FromField MsgIntegrity where fromField = blobFieldParser strP
instance FromField MsgIntegrity where fromField = blobFieldDecoder strDecode

instance ToField SMPQueueUri where toField = toField . Binary . strEncode

instance FromField SMPQueueUri where fromField = blobFieldParser strP
instance FromField SMPQueueUri where fromField = blobFieldDecoder strDecode

instance ToField AConnectionRequestUri where toField = toField . Binary . strEncode

instance FromField AConnectionRequestUri where fromField = blobFieldParser strP
instance FromField AConnectionRequestUri where fromField = blobFieldDecoder strDecode

instance ConnectionModeI c => ToField (ConnectionRequestUri c) where toField = toField . Binary . strEncode

instance (E.Typeable c, ConnectionModeI c) => FromField (ConnectionRequestUri c) where fromField = blobFieldParser strP
instance (E.Typeable c, ConnectionModeI c) => FromField (ConnectionRequestUri c) where fromField = blobFieldDecoder strDecode

instance ToField ConnectionMode where toField = toField . decodeLatin1 . strEncode

Expand All @@ -1775,19 +1775,19 @@ instance FromField MsgFlags where fromField = fromTextField_ $ eitherToMaybe . s

instance ToField [SMPQueueInfo] where toField = toField . Binary . smpEncodeList

instance FromField [SMPQueueInfo] where fromField = blobFieldParser smpListP
instance FromField [SMPQueueInfo] where fromField = blobFieldDecoder $ parseAll smpListP

instance ToField (NonEmpty TransportHost) where toField = toField . decodeLatin1 . strEncode

instance FromField (NonEmpty TransportHost) where fromField = fromTextField_ $ eitherToMaybe . strDecode . encodeUtf8

instance ToField AgentCommand where toField = toField . Binary . strEncode

instance FromField AgentCommand where fromField = blobFieldParser strP
instance FromField AgentCommand where fromField = blobFieldDecoder strDecode

instance ToField AgentCommandTag where toField = toField . Binary . strEncode

instance FromField AgentCommandTag where fromField = blobFieldParser strP
instance FromField AgentCommandTag where fromField = blobFieldDecoder strDecode

instance ToField MsgReceiptStatus where toField = toField . decodeLatin1 . strEncode

Expand All @@ -1805,23 +1805,10 @@ deriving newtype instance ToField ChunkReplicaId

deriving newtype instance FromField ChunkReplicaId

listToEither :: e -> [a] -> Either e a
listToEither _ (x : _) = Right x
listToEither e _ = Left e

firstRow :: (a -> b) -> e -> IO [a] -> IO (Either e b)
firstRow f e a = second f . listToEither e <$> a

maybeFirstRow :: Functor f => (a -> b) -> f [a] -> f (Maybe b)
maybeFirstRow f q = fmap f . listToMaybe <$> q

fromOnlyBI :: Only BoolInt -> Bool
fromOnlyBI (Only (BI b)) = b
{-# INLINE fromOnlyBI #-}

firstRow' :: (a -> Either e b) -> e -> IO [a] -> IO (Either e b)
firstRow' f e a = (f <=< listToEither e) <$> a

#if !defined(dbPostgres)
{- ORMOLU_DISABLE -}
-- SQLite.Simple only has these up to 10 fields, which is insufficient for some of our queries
Expand Down
1 change: 0 additions & 1 deletion src/Simplex/Messaging/Agent/Store/DB.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,3 @@ import Simplex.Messaging.Agent.Store.Postgres.DB
where
import Simplex.Messaging.Agent.Store.SQLite.DB
#endif

Loading

0 comments on commit 4dc40bd

Please sign in to comment.