diff --git a/simplexmq.cabal b/simplexmq.cabal index e1d41c1c9..e176fa018 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -68,6 +68,7 @@ library Simplex.Messaging.Agent.Protocol Simplex.Messaging.Agent.QueryString Simplex.Messaging.Agent.RetryInterval + Simplex.Messaging.Agent.RetryInterval.Delivery Simplex.Messaging.Agent.Server Simplex.Messaging.Agent.Store Simplex.Messaging.Agent.Store.SQLite @@ -101,6 +102,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index 0cc6a3bde..701321674 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -316,7 +316,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do forM_ (filter (not . chunkCreated) chunks) $ createChunk numRecipients' withStore' c $ \db -> updateSndFileStatus db sndFileId SFSUploading where - AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, messageRetryInterval = ri} = cfg + AgentConfig {xftpMaxRecipientsPerRequest = maxRecipients, xftpRetryInterval = ri} = cfg encryptFileForUpload :: SndFile -> FilePath -> m (FileDigest, [(XFTPChunkSpec, FileDigest)]) encryptFileForUpload SndFile {key, nonce, srcFile} fsEncPath = do let CryptoFile {filePath} = srcFile @@ -345,7 +345,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do where tryCreate = do usedSrvs <- newTVarIO ([] :: [XFTPServer]) - withRetryInterval (riFast ri) $ \_ loop -> + withRetryInterval ri $ \_ loop -> createWithNextSrv usedSrvs `catchAgentError` \e -> retryOnError "XFTP prepare worker" (retryLoop loop) (throwError e) e where diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index bcec478fd..2d7fbe448 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -147,11 +147,12 @@ import Simplex.Messaging.Agent.Lock (withLock) import Simplex.Messaging.Agent.NtfSubSupervisor import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval +import Simplex.Messaging.Agent.RetryInterval.Delivery import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations -import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission) +import Simplex.Messaging.Client (ProtocolClient (..), SMPTransportSession, ServerTransmission) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs) import qualified Simplex.Messaging.Crypto.Ratchet as CR @@ -782,7 +783,7 @@ subscribeConnections' c connIds = do let (errs, cs) = M.mapEither id conns errs' = M.map (Left . storeError) errs (subRs, rcvQs) = M.mapEither rcvQueueOrResult cs - mapM_ (mapM_ (\(cData, sqs) -> mapM_ (resumeMsgDelivery c cData) sqs) . sndQueue) cs + mapM_ (mapM_ (\(_, sqs) -> mapM_ (resumeMsgDelivery c) sqs) . sndQueue) cs mapM_ (resumeConnCmds c) $ M.keys cs rcvRs <- connResults <$> subscribeQueues c (concat $ M.elems rcvQs) ns <- asks ntfSupervisor @@ -1089,7 +1090,7 @@ enqueueMessageB c reqs = do aVRange <- asks $ maxVersion . smpAgentVRange . config reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db aVRange) reqs forME reqMids $ \((cData, sq :| sqs, _, _), InternalId msgId) -> do - submitPendingMsg c cData sq + submitPendingMsg c sq let sqs' = filter isActiveSndQ sqs pure $ Right (msgId, if null sqs' then Nothing else Just (cData, sqs', msgId)) where @@ -1116,45 +1117,47 @@ enqueueSavedMessageB :: (AgentMonad' m, Foldable t) => AgentClient -> t (ConnDat enqueueSavedMessageB c reqs = do -- saving to the database is in the start to avoid race conditions when delivery is read from queue before it is saved void $ withStoreBatch' c $ \db -> concatMap (storeDeliveries db) reqs - forM_ reqs $ \(cData, sqs, _) -> - forM sqs $ submitPendingMsg c cData + forM_ reqs $ \(_, sqs, _) -> + forM sqs $ submitPendingMsg c where storeDeliveries :: DB.Connection -> (ConnData, [SndQueue], AgentMsgId) -> [IO ()] storeDeliveries db (ConnData {connId}, sqs, msgId) = do let mId = InternalId msgId in map (\sq -> createSndMsgDelivery db connId sq mId) sqs -resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m () -resumeMsgDelivery = void .:. getDeliveryWorker False +resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> SndQueue -> m () +resumeMsgDelivery = void .: getDeliveryWorker False -getDeliveryWorker :: AgentMonad' m => Bool -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ()) -getDeliveryWorker hasWork c cData sq = - getAgentWorker' fst mkLock "msg_delivery" hasWork c (qAddress sq) (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c cData sq) - where - mkLock w = do - retryLock <- newEmptyTMVar - pure (w, retryLock) +getDeliveryWorker :: AgentMonad' m => Bool -> AgentClient -> SndQueue -> m Worker +getDeliveryWorker hasWork c sq = do + tSess <- mkSMPTransportSession c sq + getAgentWorker "msg_delivery" hasWork c tSess (smpDeliveryWorkers c) (runSmpQueueMsgDelivery c tSess) -submitPendingMsg :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m () -submitPendingMsg c cData sq = do +submitPendingMsg :: AgentMonad' m => AgentClient -> SndQueue -> m () +submitPendingMsg c sq = do atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1} - void $ getDeliveryWorker True c cData sq + void $ getDeliveryWorker True c sq -runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> (Worker, TMVar ()) -> m () -runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq (Worker {doWork}, qLock) = do - ri <- asks $ messageRetryInterval . config +runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> SMPTransportSession -> Worker -> m () +runSmpQueueMsgDelivery c@AgentClient {subQ} tSess@(_, srv, _) Worker {doWork} = do + dCfg <- asks $ messageDeliveryCfg . config forever $ do atomically $ endAgentOperation c AOSndNetwork waitForWork doWork atomically $ throwWhenInactive c - atomically $ throwWhenNoDelivery c sq + atomically $ throwWhenNoDelivery c tSess atomically $ beginAgentOperation c AOSndNetwork - withWork c doWork (\db -> getPendingQueueMsg db connId sq) $ - \(rq_, PendingMsgData {msgId, msgType, msgBody, msgFlags, msgRetryState, internalTs}) -> do + withWork c doWork (`getPendingSessionMsg` tSess) (deliverMessage dCfg) + where + deliverMessage + dCfg@MsgDeliveryConfig {messageRetryInterval = ri} + (cData@ConnData {userId, connId, duplexHandshake}, sq@SndQueue {quotaExceeded = qe, retryState}, rq_, PendingMsgData {msgId, msgType, msgBody, msgFlags, msgRetryState, internalTs}) = do atomically $ endAgentOperation c AOMsgDelivery -- this operation begins in submitPendingMsg let mId = unId msgId - ri' = maybe id updateRetryInterval2 msgRetryState ri - withRetryLock2 ri' qLock $ \riState loop -> do + ri = (if qe then quotaExceededRetryInterval else messageRetryInterval) dCfg + ri' = maybe id updateRetryInterval retryState ri + withRetryIntervalCount ri' $ \n delay loop -> do + -- withRetryLock2 ri' qLock $ \riState loop -> do resp <- tryError $ case msgType of AM_CONN_INFO -> sendConfirmation c sq msgBody AM_CONN_INFO_REPLY -> sendConfirmation c sq msgBody @@ -1163,50 +1166,62 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl Left e -> do let err = if msgType == AM_A_MSG_ then MERR mId e else ERR e case e of + -- never loop on quota exceeded error, ignoring messageConsecutiveRetries, + -- to avoid blocking delivery in other queues. SMP SMP.QUOTA -> case msgType of - AM_CONN_INFO -> connError msgId NOT_AVAILABLE - AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE - _ -> retrySndMsg RISlow + AM_CONN_INFO -> connError NOT_AVAILABLE + AM_CONN_INFO_REPLY -> connError NOT_AVAILABLE + _ -> ifM (msgExpired quotaExceededTimeout) (notifyDelMessages err) (updateDeliverAfter True delay) SMP SMP.AUTH -> case msgType of - AM_CONN_INFO -> connError msgId NOT_AVAILABLE - AM_CONN_INFO_REPLY -> connError msgId NOT_AVAILABLE - AM_RATCHET_INFO -> connError msgId NOT_AVAILABLE + AM_CONN_INFO -> connError NOT_AVAILABLE + AM_CONN_INFO_REPLY -> connError NOT_AVAILABLE + AM_RATCHET_INFO -> connError NOT_AVAILABLE AM_HELLO_ -- in duplexHandshake mode (v2) HELLO is only sent once, without retrying, -- because the queue must be secured by the time the confirmation or the first HELLO is received | duplexHandshake == Just True -> connErr - | otherwise -> - ifM (msgExpired helloTimeout) connErr (retrySndMsg RIFast) + -- otherwise branch is not used in clients with v2+ of agent protocol (June 2022) + -- TODO remove in v6 + | otherwise -> retrySndMsg n delay err where connErr = case rq_ of -- party initiating connection - Just _ -> connError msgId NOT_AVAILABLE + Just _ -> connError NOT_AVAILABLE -- party joining connection - _ -> connError msgId NOT_ACCEPTED - AM_REPLY_ -> notifyDel msgId err - AM_A_MSG_ -> notifyDel msgId err - AM_A_RCVD_ -> notifyDel msgId err - AM_QCONT_ -> notifyDel msgId err - AM_QADD_ -> qError msgId "QADD: AUTH" - AM_QKEY_ -> qError msgId "QKEY: AUTH" - AM_QUSE_ -> qError msgId "QUSE: AUTH" - AM_QTEST_ -> qError msgId "QTEST: AUTH" - AM_EREADY_ -> notifyDel msgId err + _ -> connError NOT_ACCEPTED + AM_REPLY_ -> notifyDel err + AM_A_MSG_ -> notifyDel err + AM_A_RCVD_ -> notifyDel err + AM_QCONT_ -> notifyDel err + AM_QADD_ -> qError "QADD: AUTH" + AM_QKEY_ -> qError "QKEY: AUTH" + AM_QUSE_ -> qError "QUSE: AUTH" + AM_QTEST_ -> qError "QTEST: AUTH" + AM_EREADY_ -> notifyDel err _ -- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server), -- the message sending would be retried - | temporaryOrHostError e -> do - let timeoutSel = if msgType == AM_HELLO_ then helloTimeout else messageTimeout - ifM (msgExpired timeoutSel) (notifyDel msgId err) (retrySndMsg RIFast) - | otherwise -> notifyDel msgId err + | temporaryOrHostError e -> retrySndMsg n delay err + | otherwise -> notifyDel err where msgExpired timeoutSel = do - msgTimeout <- asks $ timeoutSel . config + let msgTimeout = timeoutSel dCfg currentTime <- liftIO getCurrentTime pure $ diffUTCTime currentTime internalTs > msgTimeout - retrySndMsg riMode = do - withStore' c $ \db -> updatePendingMsgRIState db connId msgId riState - retrySndOp c $ loop riMode + notifyDelMessages err = do + notifyDel err + mIds <- withStore' c $ \db -> getExpiredSndMessages db connId sq + forM_ mIds $ \mId' -> notify $ MERR (unId mId') $ BROKER (B.unpack $ strEncode srv) TIMEOUT + withStore' c $ \db -> deleteExpiredSndMessages db connId sq mIds + retrySndMsg n delay err + | n + 1 < messageConsecutiveRetries dCfg = retrySndOp c loop + | otherwise = ifM (msgExpired messageTimeout) (notifyDelMessages err) (updateDeliverAfter False delay) + updateDeliverAfter qe' delay = do + let ri = (if qe then quotaExceededRetryInterval else messageRetryInterval) dCfg + -- TODO elapsed instead of 0? + delay' = if qe == qe' then nextDelay 0 delay ri else initialInterval ri + withStore' c $ \db -> updateSndQueueDelivery db connId sq qe' delay' + -- retrySndOp c $ loop riMode Right () -> do case msgType of AM_CONN_INFO -> setConfirmed @@ -1255,11 +1270,11 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl Just SndQueue {dbReplaceQueueId = Just replacedId, primary} -> -- second part of this condition is a sanity check because dbReplaceQueueId cannot point to the same queue, see switchConnection' case removeQP (\sq' -> dbQId sq' == replacedId && not (sameQueue addr sq')) sqs of - Nothing -> internalErr msgId "sent QTEST: queue not found in connection" + Nothing -> internalErr "sent QTEST: queue not found in connection" Just (sq', sq'' : sqs') -> do checkSQSwchStatus sq' SSSendingQTEST -- remove the delivery from the map to stop the thread when the delivery loop is complete - atomically $ TM.delete (qAddress sq') $ smpDeliveryWorkers c + -- atomically $ TM.delete (qAddress sq') $ smpDeliveryWorkers c withStore' c $ \db -> do when primary $ setSndQueuePrimary db connId sq deletePendingMsgs db connId sq' @@ -1267,29 +1282,29 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl let sqs'' = sq'' :| sqs' conn' = DuplexConnection cData' rqs sqs'' notify . SWITCH QDSnd SPCompleted $ connectionStats conn' - _ -> internalErr msgId "sent QTEST: there is only one queue in connection" - _ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue" - _ -> internalErr msgId "QTEST sent not in duplex connection" + _ -> internalErr "sent QTEST: there is only one queue in connection" + _ -> internalErr "sent QTEST: queue not in connection or not replacing another queue" + _ -> internalErr "QTEST sent not in duplex connection" AM_EREADY_ -> pure () - delMsgKeep (msgType == AM_A_MSG_) msgId + delMsgKeep (msgType == AM_A_MSG_) where setConfirmed = do withStore' c $ \db -> do setSndQueueStatus db sq Confirmed when (isJust rq_) $ removeConfirmations db connId unless (duplexHandshake == Just True) . void $ enqueueMessage c cData sq SMP.noMsgFlags HELLO - where - delMsg :: InternalId -> m () - delMsg = delMsgKeep False - delMsgKeep :: Bool -> InternalId -> m () - delMsgKeep keepForReceipt msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt - notify :: forall e. AEntityI e => ACommand 'Agent e -> m () - notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd) - notifyDel :: AEntityI e => InternalId -> ACommand 'Agent e -> m () - notifyDel msgId cmd = notify cmd >> delMsg msgId - connError msgId = notifyDel msgId . ERR . CONN - qError msgId = notifyDel msgId . ERR . AGENT . A_QUEUE - internalErr msgId = notifyDel msgId . ERR . INTERNAL + where + delMsg :: m () + delMsg = delMsgKeep False + delMsgKeep :: Bool -> m () + delMsgKeep keepForReceipt = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt + notify :: forall e. AEntityI e => ACommand 'Agent e -> m () + notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd) + notifyDel :: AEntityI e => ACommand 'Agent e -> m () + notifyDel cmd = notify cmd >> delMsg + connError = notifyDel . ERR . CONN + qError = notifyDel . ERR . AGENT . A_QUEUE + internalErr = notifyDel . ERR . INTERNAL retrySndOp :: AgentMonad m => AgentClient -> m () -> m () retrySndOp c loop = do @@ -2176,9 +2191,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s case findQ addr sqs of Just sq -> do logServer "<--" c srv rId $ "MSG :" <> logSecret srvMsgId - atomically $ - TM.lookup (qAddress sq) (smpDeliveryWorkers c) - >>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ()) + withStore' c $ \db -> setQuotaAvailable db connId sq + -- atomically $ + -- TM.lookup (qAddress sq) (smpDeliveryWorkers c) + -- >>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ()) Nothing -> qError "QCONT: queue address not found" messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m () @@ -2403,7 +2419,7 @@ connectReplyQueues c cData@ConnData {userId, connId} ownConnInfo (qInfo :| _) = confirmQueueAsync :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> SubscriptionMode -> m () confirmQueueAsync v c cData sq srv connInfo e2eEncryption_ subMode = do storeConfirmation c cData sq e2eEncryption_ =<< mkAgentConfirmation v c cData sq srv connInfo subMode - submitPendingMsg c cData sq + submitPendingMsg c sq confirmQueue :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnData -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> SubscriptionMode -> m () confirmQueue v@(Compatible agentVersion) c cData@ConnData {connId} sq srv connInfo e2eEncryption_ subMode = do @@ -2427,7 +2443,7 @@ mkAgentConfirmation (Compatible agentVersion) c cData sq srv connInfo subMode enqueueConfirmation :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m () enqueueConfirmation c cData sq connInfo e2eEncryption_ = do storeConfirmation c cData sq e2eEncryption_ $ AgentConnInfo connInfo - submitPendingMsg c cData sq + submitPendingMsg c sq storeConfirmation :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> Maybe (CR.E2ERatchetParams 'C.X448) -> AgentMessage -> m () storeConfirmation c ConnData {connId, connAgentVersion} sq e2eEncryption_ agentMsg = withStore c $ \db -> runExceptT $ do @@ -2448,10 +2464,10 @@ enqueueRatchetKeyMsgs c cData (sq :| sqs) e2eEncryption = do mapM_ (enqueueSavedMessage c cData msgId) $ filter isActiveSndQ sqs enqueueRatchetKey :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> CR.E2ERatchetParams 'C.X448 -> m AgentMsgId -enqueueRatchetKey c cData@ConnData {connId} sq e2eEncryption = do +enqueueRatchetKey c ConnData {connId} sq e2eEncryption = do aVRange <- asks $ smpAgentVRange . config msgId <- storeRatchetKey $ maxVersion aVRange - submitPendingMsg c cData sq + submitPendingMsg c sq pure $ unId msgId where storeRatchetKey :: Version -> m InternalId diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 305f39c26..3899a6301 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -86,6 +86,7 @@ module Simplex.Messaging.Agent.Client AgentState (..), AgentLocks (..), AgentStatsKey (..), + mkSMPTransportSession, getAgentWorker, getAgentWorker', cancelWorker, @@ -155,7 +156,6 @@ import Data.Text.Encoding import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) --- import GHC.Conc (unsafeIOToSTM) import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X @@ -228,7 +228,7 @@ data SessionVar a = SessionVar sessionVarId :: Int } -type ClientVar msg = SessionVar (Either AgentErrorType (Client msg)) +type ClientVar msg = SessionVar (Either AgentErrorType (Client msg)) type SMPClientVar = ClientVar SMP.BrokerMsg @@ -236,8 +236,6 @@ type NtfClientVar = ClientVar NtfResponse type XFTPClientVar = ClientVar FileResponse -type SMPTransportSession = TransportSession SMP.BrokerMsg - type NtfTransportSession = TransportSession NtfResponse type XFTPTransportSession = TransportSession FileResponse @@ -259,7 +257,7 @@ data AgentClient = AgentClient pendingSubs :: TRcvQueues, removedSubs :: TMap (UserId, SMPServer, SMP.RecipientId) SMPClientError, workerSeq :: TVar Int, - smpDeliveryWorkers :: TMap SndQAddr (Worker, TMVar ()), + smpDeliveryWorkers :: TMap SMPTransportSession Worker, asyncCmdWorkers :: TMap (Maybe SMPServer) Worker, connCmdsQueued :: TMap ConnId Bool, ntfNetworkOp :: TVar AgentOpState, @@ -712,7 +710,7 @@ closeAgentClient c = liftIO $ do closeProtocolServerClients c xftpClients atomically (swapTVar (smpSubWorkers c) M.empty) >>= mapM_ cancelReconnect cancelActions . actions $ asyncClients c - clearWorkers smpDeliveryWorkers >>= mapM_ (cancelWorker . fst) + clearWorkers smpDeliveryWorkers >>= mapM_ cancelWorker clearWorkers asyncCmdWorkers >>= mapM_ cancelWorker clear connCmdsQueued atomically . RQ.clear $ activeSubs c @@ -724,7 +722,7 @@ closeAgentClient c = liftIO $ do clearWorkers workers = atomically $ swapTVar (workers c) mempty clear :: Monoid m => (AgentClient -> TVar m) -> IO () clear sel = atomically $ writeTVar (sel c) mempty - cancelReconnect :: SessionVar (Async ()) -> IO () + cancelReconnect :: SessionVar (Async ()) -> IO () cancelReconnect v = void . forkIO $ atomically (readTMVar $ sessionVar v) >>= uninterruptibleCancel cancelWorker :: Worker -> IO () @@ -739,9 +737,9 @@ throwWhenInactive :: AgentClient -> STM () throwWhenInactive c = unlessM (readTVar $ active c) $ throwSTM ThreadKilled -- this function is used to remove workers once delivery is complete, not when it is removed from the map -throwWhenNoDelivery :: AgentClient -> SndQueue -> STM () -throwWhenNoDelivery c sq = - unlessM (TM.member (qAddress sq) $ smpDeliveryWorkers c) $ +throwWhenNoDelivery :: AgentClient -> SMPTransportSession -> STM () +throwWhenNoDelivery c tSess = + unlessM (TM.member tSess $ smpDeliveryWorkers c) $ throwSTM ThreadKilled closeProtocolServerClients :: ProtocolServerClient err msg => AgentClient -> (AgentClient -> TMap (TransportSession msg) (ClientVar msg)) -> IO () @@ -1573,7 +1571,7 @@ getAgentWorkersDetails AgentClient {smpClients, ntfClients, xftpClients, smpDeli smpClients_ <- textKeys <$> readTVarIO smpClients ntfClients_ <- textKeys <$> readTVarIO ntfClients xftpClients_ <- textKeys <$> readTVarIO xftpClients - smpDeliveryWorkers_ <- workerStats . fmap fst =<< readTVarIO smpDeliveryWorkers + smpDeliveryWorkers_ <- workerStats =<< readTVarIO smpDeliveryWorkers asyncCmdWorkers_ <- workerStats =<< readTVarIO asyncCmdWorkers smpSubWorkers_ <- textKeys <$> readTVarIO smpSubWorkers asyncCients_ <- M.keys <$> readTVarIO actions diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 73588a39d..4a3d2b583 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -51,6 +51,7 @@ import Numeric.Natural import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval +import Simplex.Messaging.Agent.RetryInterval.Delivery import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Client @@ -89,9 +90,8 @@ data AgentConfig = AgentConfig ntfCfg :: ProtocolClientConfig, xftpCfg :: XFTPClientConfig, reconnectInterval :: RetryInterval, - messageRetryInterval :: RetryInterval2, - messageTimeout :: NominalDiffTime, - helloTimeout :: NominalDiffTime, + messageDeliveryCfg :: MsgDeliveryConfig, + xftpRetryInterval :: RetryInterval, initialCleanupDelay :: Int64, cleanupInterval :: Int64, cleanupStepInterval :: Int, @@ -125,24 +125,12 @@ defaultReconnectInterval = maxInterval = 180_000000 } -defaultMessageRetryInterval :: RetryInterval2 -defaultMessageRetryInterval = - RetryInterval2 - { riFast = - RetryInterval - { initialInterval = 1_000000, - increaseAfter = 10_000000, - maxInterval = 60_000000 - }, - riSlow = - -- TODO: these timeouts can be increased in v5.0 once most clients are updated - -- to resume sending on QCONT messages. - -- After that local message expiration period should be also increased. - RetryInterval - { initialInterval = 60_000000, - increaseAfter = 60_000000, - maxInterval = 3600_000000 -- 1 hour - } +defaultXFTPRetryInterval :: RetryInterval +defaultXFTPRetryInterval = + RetryInterval + { initialInterval = 1_000000, + increaseAfter = 10_000000, + maxInterval = 60_000000 } defaultAgentConfig :: AgentConfig @@ -156,9 +144,8 @@ defaultAgentConfig = ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)}, xftpCfg = defaultXFTPClientConfig, reconnectInterval = defaultReconnectInterval, - messageRetryInterval = defaultMessageRetryInterval, - messageTimeout = 2 * nominalDay, - helloTimeout = 2 * nominalDay, + messageDeliveryCfg = defaultMsgDeliveryConfig, + xftpRetryInterval = defaultXFTPRetryInterval, initialCleanupDelay = 30 * 1000000, -- 30 seconds cleanupInterval = 30 * 60 * 1000000, -- 30 minutes cleanupStepInterval = 200000, -- 200ms diff --git a/src/Simplex/Messaging/Agent/RetryInterval.hs b/src/Simplex/Messaging/Agent/RetryInterval.hs index b75fd9a60..23eea031c 100644 --- a/src/Simplex/Messaging/Agent/RetryInterval.hs +++ b/src/Simplex/Messaging/Agent/RetryInterval.hs @@ -4,13 +4,16 @@ module Simplex.Messaging.Agent.RetryInterval ( RetryInterval (..), - RetryInterval2 (..), - RetryIntervalMode (..), + -- RetryInterval2 (..), + -- RetryIntervalMode (..), + RIState (..), RI2State (..), withRetryInterval, withRetryIntervalCount, - withRetryLock2, - updateRetryInterval2, + -- withRetryLock2, + -- updateRetryInterval2, + updateRetryInterval, + nextDelay, ) where @@ -27,6 +30,12 @@ data RetryInterval = RetryInterval maxInterval :: Int64 } +data RIState = RIState + { retryDelay :: Int64, + retryElapsed :: Int64 + } + deriving (Eq, Show) + data RetryInterval2 = RetryInterval2 { riSlow :: RetryInterval, riFast :: RetryInterval @@ -38,6 +47,14 @@ data RI2State = RI2State } deriving (Show) +updateRetryInterval :: RIState -> RetryInterval -> RetryInterval +updateRetryInterval RIState {retryDelay, retryElapsed} RetryInterval {initialInterval, increaseAfter, maxInterval} = + RetryInterval + { initialInterval = retryDelay, + increaseAfter = max 0 (increaseAfter - retryElapsed), + maxInterval + } + updateRetryInterval2 :: RI2State -> RetryInterval2 -> RetryInterval2 updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlow, riFast} = RetryInterval2 diff --git a/src/Simplex/Messaging/Agent/RetryInterval/Delivery.hs b/src/Simplex/Messaging/Agent/RetryInterval/Delivery.hs new file mode 100644 index 000000000..ec1626b4c --- /dev/null +++ b/src/Simplex/Messaging/Agent/RetryInterval/Delivery.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE NumericUnderscores #-} + +module Simplex.Messaging.Agent.RetryInterval.Delivery where + +import Data.Time.Clock (NominalDiffTime, nominalDay) +import Simplex.Messaging.Agent.RetryInterval + +data MsgDeliveryConfig = MsgDeliveryConfig + { messageRetryInterval :: RetryInterval, + messageTimeout :: NominalDiffTime, + messageConsecutiveRetries :: Int, + quotaExceededRetryInterval :: RetryInterval, + quotaExceededTimeout :: NominalDiffTime + } + +defaultMsgDeliveryConfig :: MsgDeliveryConfig +defaultMsgDeliveryConfig = + MsgDeliveryConfig + { messageRetryInterval = + RetryInterval + { initialInterval = 1_000000, + increaseAfter = 10_000000, + maxInterval = 60_000000 + }, + messageTimeout = 2 * nominalDay, + messageConsecutiveRetries = 3, + quotaExceededRetryInterval = + RetryInterval + { initialInterval = 180_000000, -- 3 minutes + increaseAfter = 0, + maxInterval = 3 * 3600_000000 -- 3 hours + }, + quotaExceededTimeout = 7 * nominalDay + } + +-- if +-- | quota exceeded -> +-- | message expired -> send error, stop retries, check and fail other expired messages +-- | otherwise -> stop retries, update delay, store deliver_after +-- | timeout -> +-- | n < messageConsecutiveRetries -> loop +-- | message expired -> send error, stop retries, check and fail other expired messages +-- | otherwise -> stop retries, update delay, store deliver_after diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 028b7d8ea..44e7839a5 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -28,7 +28,7 @@ import Data.Maybe (isJust) import Data.Time (UTCTime) import Data.Type.Equality import Simplex.Messaging.Agent.Protocol -import Simplex.Messaging.Agent.RetryInterval (RI2State) +import Simplex.Messaging.Agent.RetryInterval (RI2State, RIState) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.Ratchet (RatchetX448) import Simplex.Messaging.Encoding.String @@ -149,6 +149,8 @@ data StoredSndQueue (q :: QueueStored) = SndQueue e2eDhSecret :: C.DhSecretX25519, -- | queue status status :: QueueStatus, + quotaExceeded :: Bool, + retryState :: Maybe RIState, -- | database queue ID (within connection) dbQueueId :: DBQueueId q, -- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 18e8bf555..2563a4e51 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -107,9 +107,15 @@ module Simplex.Messaging.Agent.Store.SQLite createSndMsgDelivery, getSndMsgViaRcpt, updateSndMsgRcpt, - getPendingQueueMsg, - updatePendingMsgRIState, + getPendingSessionMsg, + -- getPendingQueueMsg, + -- updatePendingMsgRIState, + updateSndQueueDelivery, deletePendingMsgs, + getExpiredSndMessages, + deleteExpiredSndMessages, + setQuotaAvailable, + updateDeliveryDelay, setMsgUserAck, getRcvMsg, getLastMsg, @@ -263,6 +269,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Common import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRError, Migration (..), MigrationsToRun (..), mtrErrorDescription) import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations +import Simplex.Messaging.Client (SMPTransportSession) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..)) import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys) @@ -978,6 +985,9 @@ updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} = "UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?" (agentMsgId, msgRcptStatus, connId, sndMsgId) +getPendingSessionMsg :: DB.Connection -> SMPTransportSession -> IO (Either StoreError (Maybe (ConnData, SndQueue, Maybe RcvQueue, PendingMsgData))) +getPendingSessionMsg _db _tSess = undefined + getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData))) getPendingQueueMsg db connId SndQueue {dbQueueId} = getWorkItem "message" getMsgId getMsgData markMsgFailed @@ -1033,14 +1043,29 @@ getWorkItem itemName getId getItem markFailed = mkError :: E.SomeException -> StoreError mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e -updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () -updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} = - DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId) +-- updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () +-- updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} = +-- DB.execute db "UPDATE snd_messages SET retry_int_slow = ?, retry_int_fast = ? WHERE conn_id = ? AND internal_id = ?" (slowInterval, fastInterval, connId, msgId) + +updateSndQueueDelivery :: DB.Connection -> ConnId -> SndQueue -> Bool -> Int64 -> IO () +updateSndQueueDelivery _db _connId _sq _quotaExceeded _delay = undefined deletePendingMsgs :: DB.Connection -> ConnId -> SndQueue -> IO () deletePendingMsgs db connId SndQueue {dbQueueId} = DB.execute db "DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ?" (connId, dbQueueId) +getExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> IO [InternalId] +getExpiredSndMessages = undefined + +deleteExpiredSndMessages :: DB.Connection -> ConnId -> SndQueue -> [InternalId] -> IO () +deleteExpiredSndMessages = undefined + +setQuotaAvailable :: DB.Connection -> ConnId -> SndQueue -> IO () +setQuotaAvailable = undefined + +updateDeliveryDelay :: DB.Connection -> ConnId -> SndQueue -> IO () +updateDeliveryDelay = undefined + setMsgUserAck :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (RcvQueue, SMP.MsgId)) setMsgUserAck db connId agentMsgId = runExceptT $ do (dbRcvId, srvMsgId) <- diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index f41a34291..d9eb1bc81 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -67,6 +67,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -102,7 +103,8 @@ schemaMigrations = ("m20230814_indexes", m20230814_indexes, Just down_m20230814_indexes), ("m20230829_crypto_files", m20230829_crypto_files, Just down_m20230829_crypto_files), ("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at), - ("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items) + ("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items), + ("m20240118_snd_queue_delivery", m20240118_snd_queue_delivery, Just down_m20240118_snd_queue_delivery) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240118_snd_queue_delivery.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240118_snd_queue_delivery.hs new file mode 100644 index 000000000..9721ecc32 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20240118_snd_queue_delivery.hs @@ -0,0 +1,26 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20240118_snd_queue_delivery where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20240118_snd_queue_delivery :: Query +m20240118_snd_queue_delivery = + [sql| +ALTER TABLE snd_queues ADD COLUMN deliver_after TEXT NOT NULL DEFAULT('1970-01-01 00:00:00'); +ALTER TABLE snd_queues ADD COLUMN quota_exceeded INTEGER NOT NULL DEFAULT 0; +ALTER TABLE snd_queues ADD COLUMN retry_delay INTEGER; + +CREATE INDEX idx_snd_queues_deliver_after ON snd_queues(deliver_after); +|] + +down_m20240118_snd_queue_delivery :: Query +down_m20240118_snd_queue_delivery = + [sql| +DROP INDEX idx_snd_queues_deliver_after; + +ALTER TABLE snd_queues DROP COLUMN deliver_after; +ALTER TABLE snd_queues DROP COLUMN quota_exceeded; +ALTER TABLE snd_queues DROP COLUMN retry_delay; +|] diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 3af07a657..461fcef76 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -28,6 +28,7 @@ module Simplex.Messaging.Client ( -- * Connect (disconnect) client to (from) SMP server TransportSession, + SMPTransportSession, ProtocolClient (thVersion, sessionId, sessionTs), SMPClient, getProtocolClient, @@ -301,6 +302,8 @@ type UserId = Int64 -- | Transport session key - includes entity ID if `sessionMode = TSMEntity`. type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId) +type SMPTransportSession = TransportSession BrokerMsg + -- | Connects to 'ProtocolServer' using passed client configuration -- and queue for messages and notifications. --