diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index b0edfa2dd..63b568b1e 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -399,7 +399,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt old <- expireBeforeEpoch expCfg now <- systemSeconds <$> getSystemTime msgStats@MessageStats {storedMsgsCount = stored, expiredMsgsCount = expired} <- - withActiveMsgQueues ms $ expireQueueMsgs now ms old + withAllMsgQueues ms $ expireQueueMsgs now ms old atomicWriteIORef (msgCount stats) stored atomicModifyIORef'_ (msgExpired stats) (+ expired) printMessageStats "STORE: messages" msgStats @@ -1794,15 +1794,15 @@ randomId = fmap EntityId . randomId' saveServerMessages :: Bool -> AMsgStore -> IO () saveServerMessages drainMsgs = \case AMS SQSMemory SMSMemory ms@STMMsgStore {storeConfig = STMStoreConfig {storePath}} -> case storePath of - Just f -> exportMessages False ms f drainMsgs + Just f -> exportMessages ms f drainMsgs Nothing -> logInfo "undelivered messages are not saved" AMS _ SMSJournal _ -> logInfo "closed journal message storage" -exportMessages :: MsgStoreClass s => Bool -> s -> FilePath -> Bool -> IO () -exportMessages tty ms f drainMsgs = do +exportMessages :: MsgStoreClass s => s -> FilePath -> Bool -> IO () +exportMessages ms f drainMsgs = do logInfo $ "saving messages to file " <> T.pack f liftIO $ withFile f WriteMode $ \h -> - tryAny (withAllMsgQueues tty ms $ saveQueueMsgs h) >>= \case + tryAny (withActiveMsgQueues ms $ saveQueueMsgs h) >>= \case Right (Sum total) -> logInfo $ "messages saved: " <> tshow total Left e -> do logError $ "error exporting messages: " <> tshow e @@ -1834,10 +1834,10 @@ processServerMessages StartOptions {skipWarnings} = do | expire = Just <$> case old_ of Just old -> do logInfo "expiring journal store messages..." - withAllMsgQueues False ms $ processExpireQueue old + withAllMsgQueues ms $ processExpireQueue old Nothing -> do logInfo "validating journal store messages..." - withAllMsgQueues False ms $ processValidateQueue + withAllMsgQueues ms $ processValidateQueue | otherwise = logWarn "skipping message expiration" $> Nothing where processExpireQueue :: Int64 -> JournalQueue s -> IO MessageStats diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 388a6a069..0033aeccd 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -128,7 +128,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = "Journal not exported" ms <- newJournalMsgStore MQStoreCfg readQueueStore True (mkQueue ms) storeLogFile $ stmQueueStore ms - exportMessages True ms storeMsgsFilePath False + exportMessages ms storeMsgsFilePath False putStrLn "Export completed" putStrLn $ case readStoreType ini of Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`, start the server." @@ -166,9 +166,9 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = readQueueStore True (mkQueue ms) storeLogFile (queueStore ms) queues <- readTVarIO $ loadedQueues $ stmQueueStore ms ps <- newJournalMsgStore $ PQStoreCfg dbOpts {createSchema = True} MCConsole - (qCnt, nCnt) <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps + qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps renameFile storeLogFile $ storeLogFile <> ".bak" - putStrLn $ "Import completed: " <> show qCnt <> " queues, " <> show nCnt <> " notifiers" + putStrLn $ "Import completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSMemory SMSMemory) -> "store_messages set to `memory`.\nImport messages to journal to use PostgreSQL database for queues (`smp-server journal import`)" Right (ASType SQSMemory SMSJournal) -> "store_queues set to `memory`, update it to `database` in INI file" @@ -188,7 +188,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath = "Queue records not exported" ps <- newJournalMsgStore $ PQStoreCfg dbOpts MCConsole sl <- openWriteStoreLog storeLogFilePath - Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \rId qr -> logCreateQueue sl rId qr $> Sum (1 :: Int) + Sum qCnt <- foldQueueRecs True (postgresQueueStore ps) $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int) putStrLn $ "Export completed: " <> show qCnt <> " queues" putStrLn $ case readStoreType ini of Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file." diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index 32fefb386..f99dc61d5 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -58,7 +58,7 @@ import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) import Data.List (intercalate, sort) -import Data.Maybe (catMaybes, fromMaybe, isNothing, mapMaybe) +import Data.Maybe (fromMaybe, isNothing, mapMaybe) import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime) @@ -81,9 +81,8 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.Util (ifM, tshow, whenM, ($>>=), (<$$>)) import System.Directory -import System.Exit import System.FilePath (takeFileName, ()) -import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..), stdout) +import System.IO (BufferMode (..), Handle, IOMode (..), SeekMode (..)) import qualified System.IO as IO import System.Random (StdGen, genByteString, newStdGen) @@ -341,54 +340,12 @@ instance MsgStoreClass (JournalMsgStore s) where closeQueues qs = readTVarIO qs >>= mapM_ closeMsgQueue withActiveMsgQueues :: Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a - withActiveMsgQueues ms f = case queueStore_ ms of - MQStore st -> withLoadedQueues st f - PQStore st -> withLoadedQueues st f - - -- This function is a "foldr" that opens and closes all queues, processes them as defined by action and accumulates the result. - -- It is used to export storage to a single file and also to expire messages and validate all queues when server is started. - -- TODO this function requires case-sensitive file system, because it uses queue directory as recipient ID. - -- It can be made to support case-insensite FS by supporting more than one queue per directory, by getting recipient ID from state file name. - -- TODO [postgres] this should simply load all known queues and process them - withAllMsgQueues :: forall a. Monoid a => Bool -> JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a - withAllMsgQueues tty ms@JournalMsgStore {config} action = ifM (doesDirectoryExist storePath) processStore (pure mempty) - where - processStore = do - (!count, !res) <- foldQueues 0 processQueue (0, mempty) ("", storePath) - putStrLn $ progress count - pure res - JournalStoreConfig {storePath, pathParts} = config - processQueue :: (Int, a) -> (String, FilePath) -> IO (Int, a) - processQueue (!i, !r) (queueId, dir) = do - when (tty && i `mod` 100 == 0) $ putStr (progress i <> "\r") >> IO.hFlush stdout - r' <- case strDecode $ B.pack queueId of - Right rId -> - getQueue ms SRecipient rId >>= \case - Right q -> unStoreIO (getMsgQueue ms q False) *> action q <* closeMsgQueue q - Left AUTH -> do - logWarn $ "STORE: processQueue, queue " <> T.pack queueId <> " was removed, removing " <> T.pack dir - removeQueueDirectory_ dir - pure mempty - Left e -> do - logError $ "STORE: processQueue, error getting queue " <> T.pack queueId <> ", " <> tshow e - exitFailure - Left e -> do - logError $ "STORE: processQueue, message queue directory " <> T.pack dir <> " is invalid, " <> tshow e - exitFailure - pure (i + 1, r <> r') - progress i = "Processed: " <> show i <> " queues" - foldQueues depth f acc (queueId, path) = do - let f' = if depth == pathParts - 1 then f else foldQueues (depth + 1) f - listDirs >>= foldM f' acc - where - listDirs = fmap catMaybes . mapM queuePath =<< listDirectory path - queuePath dir = do - let !path' = path dir - !queueId' = queueId <> dir - ifM - (doesDirectoryExist path') - (pure $ Just (queueId', path')) - (Nothing <$ putStrLn ("Error: path " <> path' <> " is not a directory, skipping")) + withActiveMsgQueues = withQS withLoadedQueues . queueStore_ + + withAllMsgQueues :: forall a. Monoid a => JournalMsgStore s -> (JournalQueue s -> IO a) -> IO a + withAllMsgQueues ms action = case queueStore_ ms of + MQStore st -> withLoadedQueues st action + PQStore st -> foldQueues False st (mkQueue ms) action logQueueStates :: JournalMsgStore s -> IO () logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index d2a31d3d2..c13557d2f 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -81,7 +81,7 @@ instance MsgStoreClass STMMsgStore where withActiveMsgQueues = withLoadedQueues . queueStore_ {-# INLINE withActiveMsgQueues #-} - withAllMsgQueues _ = withLoadedQueues . queueStore_ + withAllMsgQueues = withLoadedQueues . queueStore_ {-# INLINE withAllMsgQueues #-} logQueueStates _ = pure () {-# INLINE logQueueStates #-} diff --git a/src/Simplex/Messaging/Server/MsgStore/Types.hs b/src/Simplex/Messaging/Server/MsgStore/Types.hs index 2aa741a8f..abdb37cd4 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Types.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Types.hs @@ -35,7 +35,7 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M newMsgStore :: MsgStoreConfig s -> IO s closeMsgStore :: s -> IO () withActiveMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a - withAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a + withAllMsgQueues :: Monoid a => s -> (StoreQueue s -> IO a) -> IO a logQueueStates :: s -> IO () logQueueState :: StoreQueue s -> StoreMonad s () queueStore :: s -> QueueStore s diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index 8988cb3f3..eaf361f93 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -22,6 +22,7 @@ module Simplex.Messaging.Server.QueueStore.Postgres ( PostgresQueueStore (..), batchInsertQueues, foldQueueRecs, + foldQueues, ) where @@ -35,9 +36,9 @@ import Data.Bitraversable (bimapM) import Data.Functor (($>)) import Data.Int (Int64) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, mapMaybe) +import Data.Maybe (catMaybes) import qualified Data.Text as T -import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError, (:.) (..)) +import Database.PostgreSQL.Simple (Binary (..), Only (..), Query, SqlError) import qualified Database.PostgreSQL.Simple as PSQL import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation) import Database.PostgreSQL.Simple.SqlQQ (sql) @@ -103,8 +104,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where db [sql| SELECT - (SELECT COUNT(1) FROM msg_queues) AS queue_count, - (SELECT COUNT(1) FROM msg_notifiers) AS notifier_count + (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL) AS queue_count, + (SELECT COUNT(1) FROM msg_queues WHERE deleted_at IS NULL AND notifier_id IS NOT NULL) AS notifier_count |] pure QueueCounts {queueCount, notifierCount} @@ -115,7 +116,8 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where sq <- mkQ rId qr withQueueLock sq "addQueue_" $ runExceptT $ do withDB "addQueue_" st $ \db -> - E.try (insertQueueDB db rId qr) >>= bimapM handleDuplicate pure + E.try (DB.execute db insertQueueQuery $ queueRecToRow (rId, qr)) + >>= bimapM handleDuplicate pure atomically $ TM.insert rId sq queues atomically $ TM.insert (senderId qr) rId senders pure sq @@ -134,25 +136,14 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where where PostgresQueueStore {queues, senders, notifiers} = st getRcvQueue rId = TM.lookupIO rId queues >>= maybe loadRcvQueue (pure . Right) - loadRcvQueue = loadQueue " WHERE q.recipient_id = ?" $ \_ -> pure () - loadSndQueue = loadQueue " WHERE q.sender_id = ?" $ \rId -> TM.insert qId rId senders - loadNtfQueue = loadQueue " WHERE n.notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare - loadQueue condition insertRef = runExceptT $ do - (rId, qRec) <- loadQueueRec - sq <- liftIO $ mkQ rId qRec - atomically $ - -- checking the cache again for concurrent reads, - -- use previously loaded queue if exists. - TM.lookup rId queues >>= \case - Just sq' -> pure sq' - Nothing -> do - insertRef rId - TM.insert rId sq queues - pure sq + loadRcvQueue = loadQueue " WHERE recipient_id = ?" $ \_ -> pure () + loadSndQueue = loadQueue " WHERE sender_id = ?" $ \rId -> TM.insert qId rId senders + loadNtfQueue = loadQueue " WHERE notifier_id = ?" $ \_ -> pure () -- do NOT cache ref - ntf subscriptions are rare + loadQueue condition insertRef = runExceptT $ loadQueueRec >>= liftIO . cachedOrLoadedQueue st mkQ insertRef where loadQueueRec = withDB "getQueue_" st $ \db -> firstRow rowToQueueRec AUTH $ - DB.query db (queueRecQuery <> condition) (Only qId) + DB.query db (queueRecQuery <> condition <> " AND deleted_at IS NULL") (Only qId) secureQueue :: PostgresQueueStore q -> q -> SndPublicAuthKey -> IO (Either ErrorType ()) secureQueue st sq sKey = @@ -172,7 +163,7 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where ExceptT $ withLockMap (notifierLocks st) nId "addQueueNotifier" $ ifM (TM.memberIO nId notifiers) (pure $ Left DUPLICATE_) $ runExceptT $ do withDB "addQueueNotifier" st $ \db -> - E.try (insert db) >>= bimapM handleDuplicate pure + E.try (update db) >>= bimapM handleDuplicate pure nId_ <- forM (notifier q) $ \NtfCreds {notifierId} -> atomically (TM.delete notifierId notifiers) $> notifierId let !q' = q {notifier = Just ntfCreds} atomically $ writeTVar (queueRec sq) $ Just q' @@ -183,29 +174,35 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where PostgresQueueStore {notifiers} = st rId = recipientId sq -- TODO [postgres] test how this query works with duplicate recipient_id (updates) and notifier_id (fails) - insert db = + update db = DB.execute db [sql| - INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret) - VALUES (?, ?, ?, ?) - ON CONFLICT (recipient_id) DO UPDATE - SET notifier_id = EXCLUDED.notifier_id, - notifier_key = EXCLUDED.notifier_key, - rcv_ntf_dh_secret = EXCLUDED.rcv_ntf_dh_secret + UPDATE msg_queues + SET notifier_id = ?, notifier_key = ?, rcv_ntf_dh_secret = ? + WHERE recipient_id = ? |] - (rId, nId, notifierKey, rcvNtfDhSecret) + (nId, notifierKey, rcvNtfDhSecret, rId) deleteQueueNotifier :: PostgresQueueStore q -> q -> IO (Either ErrorType (Maybe NotifierId)) deleteQueueNotifier st sq = withQueueDB sq "deleteQueueNotifier" $ \q -> ExceptT $ fmap sequence $ forM (notifier q) $ \NtfCreds {notifierId = nId} -> withLockMap (notifierLocks st) nId "deleteQueueNotifier" $ runExceptT $ do - withDB' "deleteQueueNotifier" st $ \db -> - DB.execute db "DELETE FROM msg_notifiers WHERE notifier_id = ?" (Only nId) + withDB' "deleteQueueNotifier" st update atomically $ TM.delete nId $ notifiers st atomically $ writeTVar (queueRec sq) $ Just q {notifier = Nothing} pure nId + where + update db = + DB.execute + db + [sql| + UPDATE msg_queues + SET notifier_id = NULL, notifier_key = NULL, rcv_ntf_dh_secret = NULL + WHERE recipient_id = ? + |] + (Only $ recipientId sq) suspendQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType ()) suspendQueue st sq = setStatusDB "suspendQueue" st sq EntityOff @@ -232,8 +229,9 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where deleteStoreQueue :: PostgresQueueStore q -> q -> IO (Either ErrorType (QueueRec, Maybe (MsgQueue q))) deleteStoreQueue st sq = runExceptT $ do q <- ExceptT $ readQueueRecIO qr + RoundedSystemTime ts <- liftIO getSystemDate withDB' "deleteStoreQueue" st $ \db -> - DB.execute db "DELETE FROM msg_queues WHERE recipient_id = ?" (Only $ Binary $ unEntityId $ recipientId sq) + DB.execute db "UPDATE msg_queues SET deleted_at = ? WHERE recipient_id = ?" (ts, recipientId sq) atomically $ writeTVar qr Nothing atomically $ TM.delete (senderId q) $ senders st forM_ (notifier q) $ \NtfCreds {notifierId} -> atomically $ TM.delete notifierId $ notifiers st @@ -241,32 +239,21 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where where qr = queueRec sq -insertQueueDB :: DB.Connection -> RecipientId -> QueueRec -> IO () -insertQueueDB db rId QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} = do - DB.execute db insertQueueQuery (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) - forM_ notifier $ \NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -> - DB.execute db insertNotifierQuery (rId, notifierId, notifierKey, rcvNtfDhSecret) - -batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO (Int64, Int64) +batchInsertQueues :: StoreQueueClass q => Bool -> M.Map RecipientId q -> PostgresQueueStore q' -> IO Int64 batchInsertQueues tty queues toStore = do qs <- catMaybes <$> mapM (\(rId, q) -> (rId,) <$$> readTVarIO (queueRec q)) (M.assocs queues) putStrLn $ "Importing " <> show (length qs) <> " queues..." let st = dbStore toStore - (ns, count) <- foldM (processChunk st) ((0, 0), 0) $ toChunks 1000000 qs + (qCnt, count) <- foldM (processChunk st) (0, 0) $ toChunks 1000000 qs putStrLn $ progress count - pure ns + pure qCnt where - processChunk st ((qCnt, nCnt), i) qs = do - qCnt' <- withConnection st $ \db -> PSQL.executeMany db insertQueueQuery $ map toQueueRow qs - nCnt' <- withConnection st $ \db -> PSQL.executeMany db insertNotifierQuery $ mapMaybe toNotifierRow qs + processChunk st (qCnt, i) qs = do + qCnt' <- withConnection st $ \db -> PSQL.executeMany db insertQueueQuery $ map queueRecToRow qs let i' = i + length qs when tty $ putStr (progress i' <> "\r") >> hFlush stdout - pure ((qCnt + qCnt', nCnt + nCnt'), i') + pure (qCnt + qCnt', i') progress i = "Imported: " <> show i <> " queues" - toQueueRow (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt}) = - (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) - toNotifierRow (rId, QueueRec {notifier}) = - (\NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} -> (rId, notifierId, notifierKey, rcvNtfDhSecret)) <$> notifier toChunks :: Int -> [a] -> [[a]] toChunks _ [] = [] toChunks n xs = @@ -277,22 +264,19 @@ insertQueueQuery :: Query insertQueueQuery = [sql| INSERT INTO msg_queues - (recipient_id, recipient_key, rcv_dh_secret, sender_id, sender_key, snd_secure, status, updated_at) - VALUES (?,?,?,?,?,?,?,?) + (recipient_id, recipient_key, rcv_dh_secret, sender_id, sender_key, snd_secure, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at) + VALUES (?,?,?,?,?,?,?,?,?,?,?) |] -insertNotifierQuery :: Query -insertNotifierQuery = - [sql| - INSERT INTO msg_notifiers (recipient_id, notifier_id, notifier_key, rcv_ntf_dh_secret) - VALUES (?, ?, ?, ?) - |] +foldQueues :: Monoid a => Bool -> PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> (q -> IO a) -> IO a +foldQueues tty st mkQ f = + foldQueueRecs tty st $ cachedOrLoadedQueue st mkQ (\_ -> pure ()) >=> f -foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> (RecipientId -> QueueRec -> IO a) -> IO a +foldQueueRecs :: Monoid a => Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a foldQueueRecs tty st f = do fmap snd $ withConnection (dbStore st) $ \db -> PSQL.fold_ db queueRecQuery (0 :: Int, mempty) $ \(!i, !acc) row -> do - r <- uncurry f (rowToQueueRec row) + r <- f $ rowToQueueRec row let i' = i + 1 when (tty && i' `mod` 100000 == 0) $ putStr ("Processed: " <> show i <> " records\r") >> hFlush stdout pure (i', acc <> r) @@ -300,18 +284,36 @@ foldQueueRecs tty st f = do queueRecQuery :: Query queueRecQuery = [sql| - SELECT q.recipient_id, q.recipient_key, q.rcv_dh_secret, q.sender_id, q.sender_key, q.snd_secure, q.status, q.updated_at, - n.notifier_id, n.notifier_key, n.rcv_ntf_dh_secret - FROM msg_queues q - LEFT JOIN msg_notifiers n ON q.recipient_id = n.recipient_id + SELECT recipient_id, recipient_key, rcv_dh_secret, + sender_id, sender_key, snd_secure, + notifier_id, notifier_key, rcv_ntf_dh_secret, + status, updated_at + FROM msg_queues |] -rowToQueueRec :: ( (RecipientId, RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, SenderCanSecure, ServerEntityStatus, Maybe RoundedSystemTime) - :. (Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret) - ) -> (RecipientId, QueueRec) -rowToQueueRec ((rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, status, updatedAt) :. (notifierId_, notifierKey_, rcvNtfDhSecret_)) = +cachedOrLoadedQueue :: PostgresQueueStore q -> (RecipientId -> QueueRec -> IO q) -> (RecipientId -> STM ()) -> (RecipientId, QueueRec) -> IO q +cachedOrLoadedQueue PostgresQueueStore {queues} mkQ insertRef (rId, qRec) = do + sq <- liftIO $ mkQ rId qRec -- loaded queue + atomically $ + -- checking the cache again for concurrent reads, + -- use previously loaded queue if exists. + TM.lookup rId queues >>= \case + Just sq' -> pure sq' + Nothing -> do + insertRef rId + TM.insert rId sq queues + pure sq + +type QueueRecRow = (RecipientId, RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, SenderCanSecure, Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, ServerEntityStatus, Maybe RoundedSystemTime) + +queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow +queueRecToRow (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier = n, status, updatedAt}) = + (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifierId <$> n, notifierKey <$> n, rcvNtfDhSecret <$> n, status, updatedAt) + +rowToQueueRec :: QueueRecRow -> (RecipientId, QueueRec) +rowToQueueRec (rId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt) = let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_ - in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}) + in (rId, QueueRec {recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}) setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> IO (Either ErrorType ()) setStatusDB op st sq status = @@ -329,10 +331,9 @@ withQueueDB sq op action = withDB' :: String -> PostgresQueueStore q -> (DB.Connection -> IO a) -> ExceptT ErrorType IO a withDB' op st' action = withDB op st' $ fmap Right . action --- TODO [postgres] possibly, use with connection if queries in addQueue_ are combined withDB :: forall a q. String -> PostgresQueueStore q -> (DB.Connection -> IO (Either ErrorType a)) -> ExceptT ErrorType IO a withDB op st' action = - ExceptT $ E.try (withTransaction (dbStore st') action) >>= either logErr pure + ExceptT $ E.try (withConnection (dbStore st') action) >>= either logErr pure where logErr :: E.SomeException -> IO (Either ErrorType a) logErr e = logError ("STORE: " <> T.pack err) $> Left (STORE err) diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index 98027b5b6..3f48a831b 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -31,19 +31,15 @@ CREATE TABLE msg_queues( sender_id BYTEA NOT NULL, sender_key BYTEA, snd_secure BOOLEAN NOT NULL, + notifier_id BYTEA, + notifier_key BYTEA, + rcv_ntf_dh_secret BYTEA, status TEXT NOT NULL, updated_at BIGINT, + deleted_at BIGINT, PRIMARY KEY (recipient_id) ); -CREATE TABLE msg_notifiers( - notifier_id BYTEA NOT NULL, - recipient_id BYTEA NOT NULL REFERENCES msg_queues(recipient_id) ON DELETE CASCADE ON UPDATE RESTRICT, - notifier_key BYTEA NOT NULL, - rcv_ntf_dh_secret BYTEA NOT NULL, - PRIMARY KEY (notifier_id) -); - CREATE UNIQUE INDEX idx_msg_queues_sender_id ON msg_queues(sender_id); -CREATE UNIQUE INDEX idx_msg_notifiers_recipient_id ON msg_notifiers(recipient_id); +CREATE UNIQUE INDEX idx_msg_queues_notifier_id ON msg_queues(notifier_id); |] diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index bebadf7c5..9d45fb0c7 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -64,8 +64,6 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where loadedQueues = queues {-# INLINE loadedQueues #-} - -- foldAllQueues = withLoadedQueues - -- {-# INLINE foldAllQueues #-} queueCounts :: STMQueueStore q -> IO QueueCounts queueCounts st = do diff --git a/tests/CoreTests/MsgStoreTests.hs b/tests/CoreTests/MsgStoreTests.hs index ccd8ea66d..bdfcc0a2e 100644 --- a/tests/CoreTests/MsgStoreTests.hs +++ b/tests/CoreTests/MsgStoreTests.hs @@ -216,11 +216,11 @@ testExportImportStore ms = do pure () length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 3 - exportMessages False ms testStoreMsgsFile False + exportMessages ms testStoreMsgsFile False renameFile testStoreMsgsFile (testStoreMsgsFile <> ".copy") closeMsgStore ms closeStoreLog sl - exportMessages False ms testStoreMsgsFile False + exportMessages ms testStoreMsgsFile False (B.readFile testStoreMsgsFile `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".copy") let cfg = (testJournalStoreCfg MQStoreCfg :: JournalStoreConfig 'QSMemory) {storePath = testStoreMsgsDir2} ms' <- newMsgStore cfg @@ -230,13 +230,13 @@ testExportImportStore ms = do printMessageStats "Messages" stats length <$> listDirectory (msgQueueDirectory ms rId1) `shouldReturn` 2 length <$> listDirectory (msgQueueDirectory ms rId2) `shouldReturn` 4 -- state file is backed up, 2 message files - exportMessages False ms' testStoreMsgsFile2 False + exportMessages ms' testStoreMsgsFile2 False (B.readFile testStoreMsgsFile2 `shouldReturn`) =<< B.readFile (testStoreMsgsFile <> ".bak") stmStore <- newMsgStore testSMTStoreConfig readWriteQueueStore True (mkQueue stmStore) testStoreLogFile (queueStore stmStore) >>= closeStoreLog MessageStats {storedMsgsCount = 5, expiredMsgsCount = 0, storedQueues = 2} <- importMessages False stmStore testStoreMsgsFile2 Nothing False - exportMessages False stmStore testStoreMsgsFile False + exportMessages stmStore testStoreMsgsFile False (B.sort <$> B.readFile testStoreMsgsFile `shouldReturn`) =<< (B.sort <$> B.readFile (testStoreMsgsFile2 <> ".bak")) testQueueState :: JournalMsgStore s -> IO () diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 5160070f0..56b5ceb89 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -821,7 +821,7 @@ testRestoreExpireMessages = ms <- newMsgStore (testJournalStoreCfg MQStoreCfg) {quota = 4} readWriteQueueStore True (mkQueue ms) testStoreLogFile (queueStore ms) >>= closeStoreLog removeFileIfExists testStoreMsgsFile - exportMessages False ms testStoreMsgsFile False + exportMessages ms testStoreMsgsFile False runTest :: Transport c => TProxy c -> (THandleSMP c 'TClient -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do testSMPClient test' `shouldReturn` ()