Skip to content

Commit

Permalink
smp server: expire messages in postgres database
Browse files Browse the repository at this point in the history
  • Loading branch information
epoberezkin committed Feb 26, 2025
1 parent 80a070a commit bbb1d98
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 151 deletions.
14 changes: 7 additions & 7 deletions src/Simplex/Messaging/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
old <- expireBeforeEpoch expCfg
now <- systemSeconds <$> getSystemTime
msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <-
withActiveMsgQueues ms $ expireQueueMsgs now ms old
withAllMsgQueues ms $ expireQueueMsgs now ms old
atomicWriteIORef (msgCount stats) stored
atomicModifyIORef'_ (msgExpired stats) (+ expired)
printMessageStats "STORE: messages" msgStats
Expand Down Expand Up @@ -1794,15 +1794,15 @@ randomId = fmap EntityId . randomId'
saveServerMessages :: Bool -> AMsgStore -> IO ()
saveServerMessages drainMsgs = \case
AMS SQSMemory SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of
Just f -> exportMessages False ms f drainMsgs
Just f -> exportMessages ms f drainMsgs
Nothing -> logInfo "undelivered messages are not saved"
AMS _ SMSJournal _ -> logInfo "closed journal message storage"

exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO ()
exportMessages tty ms f drainMsgs = do
exportMessages :: MsgStoreClass s => s -> FilePath -> Bool -> IO ()
exportMessages ms f drainMsgs = do
logInfo $ "saving messages to file " <> T.pack f
liftIO $ withFile f WriteMode $ \h ->
tryAny (withAllMsgQueues tty ms $ saveQueueMsgs h) >>= \case
tryAny (withActiveMsgQueues ms $ saveQueueMsgs h) >>= \case
Right (Sum total) -> logInfo $ "messages saved: " <> tshow total
Left e -> do
logError $ "error exporting messages: " <> tshow e
Expand Down Expand Up @@ -1834,10 +1834,10 @@ processServerMessages StartOptions {skipWarnings} = do
| expire = Just <$> case old_ of
Just old -> do
logInfo "expiring journal store messages..."
withAllMsgQueues False ms $ processExpireQueue old
withAllMsgQueues ms $ processExpireQueue old
Nothing -> do
logInfo "validating journal store messages..."
withAllMsgQueues False ms $ processValidateQueue
withAllMsgQueues ms $ processValidateQueue
| otherwise = logWarn "skipping message expiration" $> Nothing
where
processExpireQueue :: Int64 -> JournalQueue s -> IO MessageStats
Expand Down
8 changes: 4 additions & 4 deletions src/Simplex/Messaging/Server/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
"Journal not exported"
ms <- newJournalMsgStore MQStoreCfg
readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms
exportMessages True ms storeMsgsFilePath False
exportMessages ms storeMsgsFilePath False
putStrLn "Export completed"
putStrLn $ case readStoreType ini of
Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, start the server."
Expand Down Expand Up @@ -166,9 +166,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
readQueueStore True (mkQueue ms) storeLogFile (queueStore ms)
queues <- readTVarIO $ loadedQueues $ stmQueueStore ms
ps <- newJournalMsgStore $ PQStoreCfg dbOpts {createSchema = True} MCConsole
(qCnt, nCnt) <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
renameFile storeLogFile $ storeLogFile <> ".bak"
putStrLn $ "Import completed: " <> show qCnt <> " queues, " <> show nCnt <> " notifiers"
putStrLn $ "Import completed: " <> show qCnt <> " queues"
putStrLn $ case readStoreType ini of
Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`.\nImport messages to journal to use PostgreSQL database for queues (`smp-server journal import`)"
Right (ASType SQSMemory SMSJournal) -> "store_queues set to `memory`, update it to `database` in INI file"
Expand All @@ -188,7 +188,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
"Queue records not exported"
ps <- newJournalMsgStore $ PQStoreCfg dbOpts MCConsole
sl <- openWriteStoreLog storeLogFilePath
Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \rId qr -> logCreateQueue sl rId qr $> Sum (1 :: Int)
Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
putStrLn $ "Export completed: " <> show qCnt <> " queues"
putStrLn $ case readStoreType ini of
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file."
Expand Down
59 changes: 8 additions & 51 deletions src/Simplex/Messaging/Server/MsgStore/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate, sort)
import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe)
import Data.Maybe (fromMaybe, isNothing, mapMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
Expand All @@ -81,9 +81,8 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>))
import System.Directory
import System.Exit
import System.FilePath (takeFileName, (</>))
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout)
import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..))
import qualified System.IO as IO
import System.Random (StdGen, genByteString, newStdGen)

Expand Down Expand Up @@ -341,54 +340,12 @@ instance MsgStoreClass (JournalMsgStore s) where
closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue

withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withActiveMsgQueues ms f = case queueStore_ ms of
MQStore st -> withLoadedQueues st f
PQStore st -> withLoadedQueues st f

-- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result.
-- It is used to export storage to a single file and also to expire messages and validate all queues when server is started.
-- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID.
-- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name.
-- TODO [postgres] this should simply load all known queues and process them
withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty)
where
processStore = do
(!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath)
putStrLn $ progress count
pure res
JournalStoreConfig {storePath, pathParts} = config
processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a)
processQueue (!i, !r) (queueId, dir) = do
when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout
r' <- case strDecode $ B.pack queueId of
Right rId ->
getQueue ms SRecipient rId >>= \case
Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q
Left AUTH -> do
logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir
removeQueueDirectory_ dir
pure mempty
Left e -> do
logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e
exitFailure
Left e -> do
logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e
exitFailure
pure (i + 1, r <> r')
progress i = "Processed: " <> show i <> " queues"
foldQueues depth f acc (queueId, path) = do
let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f
listDirs >>= foldM f' acc
where
listDirs = fmap catMaybes . mapM queuePath =<< listDirectory path
queuePath dir = do
let !path' = path </> dir
!queueId' = queueId <> dir
ifM
(doesDirectoryExist path')
(pure $ Just (queueId', path'))
(Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping"))
withActiveMsgQueues = withQS withLoadedQueues . queueStore_

withAllMsgQueues :: forall a. Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a
withAllMsgQueues ms action = case queueStore_ ms of
MQStore st -> withLoadedQueues st action
PQStore st -> foldQueues False st (mkQueue ms) action

logQueueStates :: JournalMsgStore s -> IO ()
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server/MsgStore/STM.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ instance MsgStoreClass STMMsgStore where

withActiveMsgQueues = withLoadedQueues . queueStore_
{-# INLINE withActiveMsgQueues #-}
withAllMsgQueues _ = withLoadedQueues . queueStore_
withAllMsgQueues = withLoadedQueues . queueStore_
{-# INLINE withAllMsgQueues #-}
logQueueStates _ = pure ()
{-# INLINE logQueueStates #-}
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Server/MsgStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
newMsgStore :: MsgStoreConfig s -> IO s
closeMsgStore :: s -> IO ()
withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
withAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
withAllMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueStore :: s -> QueueStore s
Expand Down
Loading

0 comments on commit bbb1d98

Please sign in to comment.