From 36fe363047f3c096d0142af9ec2739ea7e664d3e Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 11 Apr 2023 12:52:44 +0200 Subject: [PATCH 01/22] implement deduplication configuration --- .../PostgreSQL/Consumers/Components.hs | 38 +++++++++++++------ src/Database/PostgreSQL/Consumers/Config.hs | 11 +++++- test/Test.hs | 1 + 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 144422d..6624594 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -242,7 +242,11 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore loop :: Int -> m Bool loop limit = do - (batch, batchSize) <- reserveJobs limit + -- If we're running in 'Deduplicating' mode we only + -- reserve one job at a time. + (batch, batchSize) <- reserveJobs $ case ccMode of + Standard -> limit + Deduplicating _ -> 1 when (batchSize > 0) $ do logInfo "Processing batch" $ object [ "batch_size" .= batchSize @@ -286,16 +290,28 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore (, n) . F.toList . 
fmap ccJobFetcher <$> queryResult where reservedJobs :: UTCTime -> SQL - reservedJobs now = smconcat [ - "SELECT id FROM" <+> raw ccJobsTable - , "WHERE" - , " reserved_by IS NULL" - , " AND run_at IS NOT NULL" - , " AND run_at <= " now - , " ORDER BY run_at" - , "LIMIT" limit - , "FOR UPDATE SKIP LOCKED" - ] + reservedJobs now = case ccMode of + Standard -> smconcat [ + "SELECT id FROM" <+> raw ccJobsTable + , "WHERE" + , " reserved_by IS NULL" + , " AND run_at IS NOT NULL" + , " AND run_at <= " now + , " ORDER BY run_at" + , "LIMIT" limit + , "FOR UPDATE SKIP LOCKED" + ] + Deduplicating field -> smconcat [ + "WITH latest_for_id AS" + , " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable + , " ORDER BY run_at," <+> field <> ", id" <+> "DESC LIMIT" limit <+> "FOR UPDATE SKIP LOCKED)," + , " lock_all AS" + , " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable + , " WHERE" <+> field <+> "= (SELECT" <+> field <+> "FROM latest_for_id)" + , " AND id <= (SELECT id FROM latest_for_id)" + , " FOR UPDATE SKIP LOCKED)" + , "SELECT id," <+> field <+> "FROM lock_all" + ] -- | Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index a45370b..547e9f3 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -1,6 +1,7 @@ {-# LANGUAGE ExistentialQuantification #-} module Database.PostgreSQL.Consumers.Config ( Action(..) + , Mode(..) , Result(..) , ConsumerConfig(..) ) where @@ -21,12 +22,16 @@ data Action | RerunAfter Interval | RerunAt UTCTime | Remove - deriving (Eq, Ord, Show) + deriving (Eq, Ord, Show) -- | Result of processing a job. data Result = Ok Action | Failed Action deriving (Eq, Ord, Show) +-- | The mode the consumer will run in. +data Mode = Standard | Deduplicating SQL + deriving (Show) + -- | Config of a consumer. data ConsumerConfig m idx job = forall row. 
FromRow row => ConsumerConfig { -- | Name of the database table where jobs are stored. The table needs to have @@ -118,4 +123,8 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig { -- Note that if this action throws an exception, the consumer goes -- down, so it's best to ensure that it doesn't throw. , ccOnException :: !(SomeException -> job -> m Action) +-- | The mode the consumer will use to reserve jobs. +-- In 'Deduplicating' mode the SQL expression indicates which field +-- to select for deduplication. +, ccMode :: Mode } diff --git a/test/Test.hs b/test/Test.hs index 289364f..23da9a8 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -154,6 +154,7 @@ test = do , ccMaxRunningJobs = 20 , ccProcessJob = processJob , ccOnException = handleException + , ccMode = Standard } putJob :: Int32 -> TestEnv () From 1999544734cc34e3ab28ea549b4283e2e2b306dd Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 11 Apr 2023 12:54:03 +0200 Subject: [PATCH 02/22] refactor code --- src/Database/PostgreSQL/Consumers/Components.hs | 8 ++++---- test/Test.hs | 15 +++++++-------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 6624594..9bbfbc7 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -95,7 +95,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal initialJobs <- liftBase $ readTVarIO runningJobsInfo (`fix` initialJobs) $ \loop jobsInfo -> do -- If jobs are still running, display info about them. - when (not $ M.null jobsInfo) $ do + unless (M.null jobsInfo) $ do logInfo "Waiting for running jobs" $ object [ "job_id" .= showJobsInfo jobsInfo ] @@ -108,7 +108,7 @@ runConsumerWithMaybeIdleSignal cc cs mIdleSignal -- If jobs info didn't change, wait for it to change. 
-- Otherwise loop so it either displays the new info -- or exits if there are no jobs running anymore. - if (newJobsInfo == jobsInfo) + if newJobsInfo == jobsInfo then retry else return $ loop newJobsInfo where @@ -167,7 +167,7 @@ spawnMonitor ConsumerConfig{..} cs cid = forkP "monitor" . forever $ do if ok then logInfo_ "Activity of the consumer updated" else do - logInfo_ $ "Consumer is not registered" + logInfo_ "Consumer is not registered" throwM ThreadKilled -- Freeing jobs locked by inactive consumers needs to happen -- exactly once, otherwise it's possible to free it twice, after @@ -369,7 +369,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore retryToSQL _ (Right time) ids = ("WHEN id = ANY(" Array1 ids <+> ") THEN" time :) - retries = foldr step M.empty $ map f updates + retries = foldr (step . f) M.empty updates where f (idx, result) = case result of Ok action -> (idx, action) diff --git a/test/Test.hs b/test/Test.hs index 23da9a8..4d598b9 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -65,9 +65,9 @@ modifyTestTime modtime = modify (\te -> te { teCurrentTime = modtime . teCurrent runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestEnv a -> IO a runTestEnv connSource logger m = - (runLogT "consumers-test" logger defaultLogLevel) - . (runDBT connSource defaultTransactionSettings) - . (\m' -> fst <$> (runStateT m' $ TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) + runLogT "consumers-test" logger defaultLogLevel + . runDBT connSource defaultTransactionSettings + . (\m' -> fst <$> runStateT m' (TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) . unTestEnv $ m @@ -89,7 +89,7 @@ test = do withSimpleStdOutLogger $ \logger -> runTestEnv connSource logger $ do createTables - idleSignal <- liftIO $ atomically $ newEmptyTMVar + idleSignal <- liftIO newEmptyTMVarIO putJob 10 >> commit forM_ [1..10::Int] $ \_ -> do @@ -101,7 +101,7 @@ test = do currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . 
show) -- Each job creates 2 new jobs, so there should be 1024 jobs in table. - runSQL_ $ "SELECT COUNT(*) from consumers_test_jobs" + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" rowcount0 :: Int64 <- fetchOne runIdentity -- Move time 2 hours forward modifyTestTime $ addUTCTime (2*60*60) @@ -109,13 +109,13 @@ test = do runConsumerWithIdleSignal consumerConfig connSource idleSignal) $ do waitUntilTrue idleSignal -- Jobs are designed to double only 10 times, so there should be no jobs left now. - runSQL_ $ "SELECT COUNT(*) from consumers_test_jobs" + runSQL_ "SELECT COUNT(*) from consumers_test_jobs" rowcount1 :: Int64 <- fetchOne runIdentity liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 dropTables where - waitUntilTrue tmvar = whileM_ (not <$> (liftIO $ atomically $ takeTMVar tmvar)) $ return () + waitUntilTrue tmvar = whileM_ (not <$> liftIO (atomically $ takeTMVar tmvar)) $ return () printUsage = do prog <- getProgName @@ -179,7 +179,6 @@ test = do "Job #" <> showt idx <> " failed with: " <> showt exc return . 
RerunAfter $ imicroseconds 500000 - jobsTable :: Table jobsTable = tblTable From 7813acdf77128df639d187c71621add670d730cd Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 11 Apr 2023 12:54:14 +0200 Subject: [PATCH 03/22] update .gitignore file --- .gitignore | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.gitignore b/.gitignore index 2fd2c86..75b66bb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +# GHC and Cabal /dist/ /dist-newstyle/ /cabal.project.local @@ -6,3 +7,13 @@ TAGS .ghc.environment.* .cabal-sandbox cabal.sandbox.config + +# direnv +.direnv/ +.envrc + +# emacs +**/.dir-locals.el + +# postgres +_local/ From 49e11d5fb74c001427418361c194146b6d5aa386 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 11 Apr 2023 13:09:49 +0200 Subject: [PATCH 04/22] update consumers example --- example/Example.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/example/Example.hs b/example/Example.hs index 5887ff9..c2f50fd 100644 --- a/example/Example.hs +++ b/example/Example.hs @@ -102,6 +102,7 @@ main = do , ccMaxRunningJobs = 1 , ccProcessJob = processJob , ccOnException = handleException + , ccMode = Standard } -- Add a job to the consumer's queue. 
From 9dea97c87cbcf81b0bb409f6277a5b968a98bb23 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Thu, 20 Apr 2023 15:42:04 +0200 Subject: [PATCH 05/22] implement duplication test --- .../PostgreSQL/Consumers/Components.hs | 2 +- test/Test.hs | 233 ++++++++++++++---- 2 files changed, 189 insertions(+), 46 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 9bbfbc7..3c3bea5 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -310,7 +310,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " WHERE" <+> field <+> "= (SELECT" <+> field <+> "FROM latest_for_id)" , " AND id <= (SELECT id FROM latest_for_id)" , " FOR UPDATE SKIP LOCKED)" - , "SELECT id," <+> field <+> "FROM lock_all" + , "SELECT id FROM lock_all" ] -- | Spawn each job in a separate thread. diff --git a/test/Test.hs b/test/Test.hs index 4d598b9..fe35fb0 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -11,6 +11,7 @@ module Main where +import Data.Monoid.Utils import Database.PostgreSQL.Consumers import Database.PostgreSQL.PQTypes import Database.PostgreSQL.PQTypes.Checks @@ -37,7 +38,7 @@ import System.Exit import TextShow import qualified Data.Text as T -import qualified Test.HUnit as T +import qualified Test.HUnit as Test data TestEnvSt = TestEnvSt { teCurrentTime :: UTCTime @@ -64,33 +65,114 @@ modifyTestTime :: (MonadState TestEnvSt m) => (UTCTime -> UTCTime) -> m () modifyTestTime modtime = modify (\te -> te { teCurrentTime = modtime . teCurrentTime $ te }) runTestEnv :: ConnectionSourceM (LogT IO) -> Logger -> TestEnv a -> IO a -runTestEnv connSource logger m = +runTestEnv connSource logger = runLogT "consumers-test" logger defaultLogLevel . runDBT connSource defaultTransactionSettings - . (\m' -> fst <$> runStateT m' (TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) + . 
(\m' -> evalStateT m' (TestEnvSt (UTCTime (ModifiedJulianDay 0) 0) 0)) . unTestEnv - $ m main :: IO () -main = void $ T.runTestTT $ T.TestCase test +main = do + connSource <- connectToDB + void . Test.runTestTT $ + Test.TestList + [ Test.TestLabel "Test standard consumer config" $ Test.TestCase (test connSource) + , Test.TestLabel "Test deduplicating consumer config" $ Test.TestCase (testDeduplicating connSource) + ] -test :: IO () -test = do +-- | Connect to the postgres database +connectToDB :: IO (ConnectionSource [MonadBase IO, MonadMask]) +connectToDB = do connString <- getArgs >>= \case connString : _args -> return $ T.pack connString [] -> lookupEnv "GITHUB_ACTIONS" >>= \case Just "true" -> return "host=postgres user=postgres password=postgres" _ -> printUsage >> exitFailure - let connSettings = defaultConnectionSettings - { csConnInfo = connString } - ConnectionSource connSource = simpleSource connSettings + pure $ simpleSource defaultConnectionSettings { csConnInfo = connString } + where + printUsage = do + prog <- getProgName + putStrLn $ "Usage: " <> prog <> " " +testDeduplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO () +testDeduplicating (ConnectionSource connSource) = + withSimpleStdOutLogger $ \logger -> runTestEnv connSource logger $ do + createTables + idleSignal <- liftIO newEmptyTMVarIO + let rows = 15 + putJob rows "consumers_test_deduplicating_jobs" "consumers_test_deduplicating_chan" *> commit + + -- Move time forward 2hours, because job is scheduled 1 hour into future + modifyTestTime . addUTCTime $ 2*60*60 + finalize (localDomain "process" $ + runConsumerWithIdleSignal deduplicatingConsumerConfig connSource idleSignal) $ + waitUntilTrue idleSignal + currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . 
show) + + runSQL_ "SELECT COUNT(*) from person_test" + rowcount0 :: Int64 <- fetchOne runIdentity + + runSQL_ "SELECT COUNT(*) from consumers_test_deduplicating_jobs" + rowcount1 :: Int64 <- fetchOne runIdentity + + liftIO $ Test.assertEqual "Number of rows in person_test is 2×rows" (2 * rows) (fromIntegral rowcount0) + liftIO $ Test.assertEqual "Job in consumers_test_deduplicating_jobs should be completed" 0 rowcount1 + + dropTables + where + + tables = [deduplicatingConsumersTable, deduplicatingJobsTable, personTable] + + migrations = createTableMigration <$> tables + + createTables :: TestEnv () + createTables = do + migrateDatabase defaultExtrasOptions + {- extensions -} [] {- composites -} [] {- domains -} [] + tables migrations + checkDatabase defaultExtrasOptions + {- composites -} [] {- domains -} [] + tables + + dropTables :: TestEnv () + dropTables = do + migrateDatabase defaultExtrasOptions + {- extensions -} [] {- composites -} [] {- domains -} [] {- tables -} [] + [ dropTableMigration deduplicatingJobsTable + , dropTableMigration deduplicatingConsumersTable + , dropTableMigration personTable + ] + + deduplicatingConsumerConfig = ConsumerConfig + { ccJobsTable = "consumers_test_deduplicating_jobs" + , ccConsumersTable = "consumers_test_deduplicating_consumers" + , ccJobSelectors = ["id", "countdown"] + , ccJobFetcher = id + , ccJobIndex = fst + , ccNotificationChannel = Just "consumers_test_deduplicating_chan" + -- select some small timeout + , ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccMaxRunningJobs = 20 + , ccProcessJob = insertNRows . 
snd + , ccOnException = \err (idx, _) -> handleException err idx + , ccMode = Deduplicating "countdown" + } + +insertNRows :: Int32 -> TestEnv Result +insertNRows count = do + replicateM_ (fromIntegral count) $ do + runSQL_ "INSERT INTO person_test (name, age) VALUES ('Anna', 20)" + notify "consumers_test_deduplicating_chan" "" + pure $ Ok Remove + +test :: ConnectionSource [MonadBase IO, MonadMask] -> IO () +test (ConnectionSource connSource) = withSimpleStdOutLogger $ \logger -> runTestEnv connSource logger $ do createTables idleSignal <- liftIO newEmptyTMVarIO - putJob 10 >> commit + putJob 10 "consumers_test_jobs" "consumers_test_chan" *> commit forM_ [1..10::Int] $ \_ -> do -- Move time forward 2hours, because jobs are scheduled 1 hour into future @@ -111,20 +193,14 @@ test = do -- Jobs are designed to double only 10 times, so there should be no jobs left now. runSQL_ "SELECT COUNT(*) from consumers_test_jobs" rowcount1 :: Int64 <- fetchOne runIdentity - liftIO $ T.assertEqual "Number of jobs in table after 10 steps is 1024" 1024 rowcount0 - liftIO $ T.assertEqual "Number of jobs in table after 11 steps is 0" 0 rowcount1 + liftIO $ Test.assertEqual "Number of jobs in table consumers_test_jobs 10 steps is 1024" 1024 rowcount0 + liftIO $ Test.assertEqual "Number of jobs in table consumers_test_jobs 11 steps is 0" 0 rowcount1 dropTables where - waitUntilTrue tmvar = whileM_ (not <$> liftIO (atomically $ takeTMVar tmvar)) $ return () - - printUsage = do - prog <- getProgName - putStrLn $ "Usage: " <> prog <> " " tables = [consumersTable, jobsTable] -- NB: order of migrations is important. 
- migrations = [ createTableMigration consumersTable - , createTableMigration jobsTable ] + migrations = createTableMigration <$> tables createTables :: TestEnv () createTables = do @@ -140,44 +216,48 @@ test = do migrateDatabase defaultExtrasOptions {- extensions -} [] {- composites -} [] {- domains -} [] {- tables -} [] [ dropTableMigration jobsTable - , dropTableMigration consumersTable ] + , dropTableMigration consumersTable + ] consumerConfig = ConsumerConfig { ccJobsTable = "consumers_test_jobs" , ccConsumersTable = "consumers_test_consumers" , ccJobSelectors = ["id", "countdown"] , ccJobFetcher = id - , ccJobIndex = \(i::Int64, _::Int32) -> i + , ccJobIndex = fst , ccNotificationChannel = Just "consumers_test_chan" -- select some small timeout , ccNotificationTimeout = 100 * 1000 -- 100 msec , ccMaxRunningJobs = 20 - , ccProcessJob = processJob - , ccOnException = handleException + , ccProcessJob = processJob "consumers_test_jobs" "consumers_test_chan" + , ccOnException = \err (idx, _) -> handleException err idx , ccMode = Standard } - putJob :: Int32 -> TestEnv () - putJob countdown = localDomain "put" $ do - now <- currentTime - runSQL_ $ "INSERT INTO consumers_test_jobs " - <> "(run_at, finished_at, reserved_by, attempts, countdown) " - <> "VALUES (" now <> " + interval '1 hour', NULL, NULL, 0, " countdown <> ")" - notify "consumers_test_chan" "" - - processJob :: (Int64, Int32) -> TestEnv Result - processJob (_idx, countdown) = do - when (countdown > 0) $ do - putJob (countdown - 1) - putJob (countdown - 1) - commit - return (Ok Remove) - - handleException :: SomeException -> (Int64, Int32) -> TestEnv Action - handleException exc (idx, _countdown) = do - logAttention_ $ - "Job #" <> showt idx <> " failed with: " <> showt exc - return . 
RerunAfter $ imicroseconds 500000 +waitUntilTrue :: MonadIO m => TMVar Bool -> m () +waitUntilTrue tmvar = whileM_ (not <$> liftIO (atomically $ takeTMVar tmvar)) $ pure () + +putJob :: Int32 -> SQL -> Channel -> TestEnv () +putJob countdown tableName notifyChan = localDomain "put" $ do + now <- currentTime + runSQL_ $ "INSERT INTO" <+> tableName + <+> "(run_at, finished_at, reserved_by, attempts, countdown)" + <+> "VALUES (" now <+> "+ interval '1 hour', NULL, NULL, 0, " countdown <> ")" + notify notifyChan "" + +processJob :: SQL -> Channel -> (Int64, Int32) -> TestEnv Result +processJob tableName notifyChan (_idx, countdown) = do + when (countdown > 0) $ do + putJob (countdown - 1) tableName notifyChan + putJob (countdown - 1) tableName notifyChan + commit + return (Ok Remove) + +handleException :: SomeException -> Int64 -> TestEnv Action +handleException exc idx = do + logAttention_ $ + "Job #" <> showt idx <> " failed with: " <> showt exc + return . RerunAfter $ imicroseconds 500000 jobsTable :: Table jobsTable = @@ -208,6 +288,53 @@ jobsTable = ] } +personTable :: Table +personTable = + tblTable + { tblName = "person_test" + , tblVersion = 1 + , tblColumns = + [ tblColumn { colName = "id", colType = BigSerialT + , colNullable = False } + , tblColumn { colName = "name", colType = TextT + , colNullable = False } + , tblColumn { colName = "age", colType = IntegerT + , colNullable = False } + ] + , tblPrimaryKey = pkOnColumn "id" + } + +deduplicatingJobsTable :: Table +deduplicatingJobsTable = + tblTable + { tblName = "consumers_test_deduplicating_jobs" + , tblVersion = 1 + , tblColumns = + [ tblColumn { colName = "id", colType = BigSerialT + , colNullable = False } + , tblColumn { colName = "run_at", colType = TimestampWithZoneT + , colNullable = True } + , tblColumn { colName = "finished_at", colType = TimestampWithZoneT + , colNullable = True } + , tblColumn { colName = "reserved_by", colType = BigIntT + , colNullable = True } + , tblColumn { colName = 
"attempts", colType = IntegerT + , colNullable = False } + + -- Non-obligatory field "countdown". Really more of a count + -- and not a countdown, but name is kept to that we can reuse + -- `putJob` function. + , tblColumn { colName = "countdown", colType = IntegerT + , colNullable = False } + ] + , tblPrimaryKey = pkOnColumn "id" + , tblForeignKeys = [ + (fkOnColumn "reserved_by" "consumers_test_deduplicating_consumers" "id") { + fkOnDelete = ForeignKeySetNull + } + ] + } + consumersTable :: Table consumersTable = tblTable @@ -224,6 +351,22 @@ consumersTable = , tblPrimaryKey = pkOnColumn "id" } +deduplicatingConsumersTable :: Table +deduplicatingConsumersTable = + tblTable + { tblName = "consumers_test_deduplicating_consumers" + , tblVersion = 1 + , tblColumns = + [ tblColumn { colName = "id", colType = BigSerialT + , colNullable = False } + , tblColumn { colName = "name", colType = TextT + , colNullable = False } + , tblColumn { colName = "last_activity", colType = TimestampWithZoneT + , colNullable = False } + ] + , tblPrimaryKey = pkOnColumn "id" + } + createTableMigration :: (MonadDB m) => Table -> Migration m createTableMigration tbl = Migration { mgrTableName = tblName tbl From 3bc7f1631b4aab8976b0d73bbaa54341984e7092 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Thu, 20 Apr 2023 15:42:36 +0200 Subject: [PATCH 06/22] rename: deduplication -> duplication --- .../PostgreSQL/Consumers/Components.hs | 4 +- src/Database/PostgreSQL/Consumers/Config.hs | 6 +-- test/Test.hs | 46 +++++++++---------- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 3c3bea5..1dc1376 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -246,7 +246,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore -- reserve one job at a time. 
(batch, batchSize) <- reserveJobs $ case ccMode of Standard -> limit - Deduplicating _ -> 1 + Duplicating _ -> 1 when (batchSize > 0) $ do logInfo "Processing batch" $ object [ "batch_size" .= batchSize @@ -301,7 +301,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , "LIMIT" limit , "FOR UPDATE SKIP LOCKED" ] - Deduplicating field -> smconcat [ + Duplicating field -> smconcat [ "WITH latest_for_id AS" , " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable , " ORDER BY run_at," <+> field <> ", id" <+> "DESC LIMIT" limit <+> "FOR UPDATE SKIP LOCKED)," diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index 547e9f3..fde9ef5 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -29,7 +29,7 @@ data Result = Ok Action | Failed Action deriving (Eq, Ord, Show) -- | The mode the consumer will run in. -data Mode = Standard | Deduplicating SQL +data Mode = Standard | Duplicating SQL deriving (Show) -- | Config of a consumer. @@ -124,7 +124,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig { -- down, so it's best to ensure that it doesn't throw. , ccOnException :: !(SomeException -> job -> m Action) -- | The mode the consumer will use to reserve jobs. --- In 'Deduplicating' mode the SQL expression indicates which field --- to select for deduplication. +-- In 'Duplicating' mode the SQL expression indicates which field +-- to select for duplication. , ccMode :: Mode } diff --git a/test/Test.hs b/test/Test.hs index fe35fb0..fb380f6 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -77,7 +77,7 @@ main = do void . 
Test.runTestTT $ Test.TestList [ Test.TestLabel "Test standard consumer config" $ Test.TestCase (test connSource) - , Test.TestLabel "Test deduplicating consumer config" $ Test.TestCase (testDeduplicating connSource) + , Test.TestLabel "Test duplicating consumer config" $ Test.TestCase (testDuplicating connSource) ] -- | Connect to the postgres database @@ -95,34 +95,34 @@ connectToDB = do prog <- getProgName putStrLn $ "Usage: " <> prog <> " " -testDeduplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO () -testDeduplicating (ConnectionSource connSource) = +testDuplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO () +testDuplicating (ConnectionSource connSource) = withSimpleStdOutLogger $ \logger -> runTestEnv connSource logger $ do createTables idleSignal <- liftIO newEmptyTMVarIO let rows = 15 - putJob rows "consumers_test_deduplicating_jobs" "consumers_test_deduplicating_chan" *> commit + putJob rows "consumers_test_duplicating_jobs" "consumers_test_duplicating_chan" *> commit -- Move time forward 2hours, because job is scheduled 1 hour into future modifyTestTime . addUTCTime $ 2*60*60 finalize (localDomain "process" $ - runConsumerWithIdleSignal deduplicatingConsumerConfig connSource idleSignal) $ + runConsumerWithIdleSignal duplicatingConsumerConfig connSource idleSignal) $ waitUntilTrue idleSignal currentTime >>= (logInfo_ . T.pack . ("current time: " ++) . 
show) runSQL_ "SELECT COUNT(*) from person_test" rowcount0 :: Int64 <- fetchOne runIdentity - runSQL_ "SELECT COUNT(*) from consumers_test_deduplicating_jobs" + runSQL_ "SELECT COUNT(*) from consumers_test_duplicating_jobs" rowcount1 :: Int64 <- fetchOne runIdentity liftIO $ Test.assertEqual "Number of rows in person_test is 2×rows" (2 * rows) (fromIntegral rowcount0) - liftIO $ Test.assertEqual "Job in consumers_test_deduplicating_jobs should be completed" 0 rowcount1 + liftIO $ Test.assertEqual "Job in consumers_test_duplicating_jobs should be completed" 0 rowcount1 dropTables where - tables = [deduplicatingConsumersTable, deduplicatingJobsTable, personTable] + tables = [duplicatingConsumersTable, duplicatingJobsTable, personTable] migrations = createTableMigration <$> tables @@ -139,31 +139,31 @@ testDeduplicating (ConnectionSource connSource) = dropTables = do migrateDatabase defaultExtrasOptions {- extensions -} [] {- composites -} [] {- domains -} [] {- tables -} [] - [ dropTableMigration deduplicatingJobsTable - , dropTableMigration deduplicatingConsumersTable + [ dropTableMigration duplicatingJobsTable + , dropTableMigration duplicatingConsumersTable , dropTableMigration personTable ] - deduplicatingConsumerConfig = ConsumerConfig - { ccJobsTable = "consumers_test_deduplicating_jobs" - , ccConsumersTable = "consumers_test_deduplicating_consumers" + duplicatingConsumerConfig = ConsumerConfig + { ccJobsTable = "consumers_test_duplicating_jobs" + , ccConsumersTable = "consumers_test_duplicating_consumers" , ccJobSelectors = ["id", "countdown"] , ccJobFetcher = id , ccJobIndex = fst - , ccNotificationChannel = Just "consumers_test_deduplicating_chan" + , ccNotificationChannel = Just "consumers_test_duplicating_chan" -- select some small timeout , ccNotificationTimeout = 100 * 1000 -- 100 msec , ccMaxRunningJobs = 20 , ccProcessJob = insertNRows . 
snd , ccOnException = \err (idx, _) -> handleException err idx - , ccMode = Deduplicating "countdown" + , ccMode = Duplicating "countdown" } insertNRows :: Int32 -> TestEnv Result insertNRows count = do replicateM_ (fromIntegral count) $ do runSQL_ "INSERT INTO person_test (name, age) VALUES ('Anna', 20)" - notify "consumers_test_deduplicating_chan" "" + notify "consumers_test_duplicating_chan" "" pure $ Ok Remove test :: ConnectionSource [MonadBase IO, MonadMask] -> IO () @@ -304,10 +304,10 @@ personTable = , tblPrimaryKey = pkOnColumn "id" } -deduplicatingJobsTable :: Table -deduplicatingJobsTable = +duplicatingJobsTable :: Table +duplicatingJobsTable = tblTable - { tblName = "consumers_test_deduplicating_jobs" + { tblName = "consumers_test_duplicating_jobs" , tblVersion = 1 , tblColumns = [ tblColumn { colName = "id", colType = BigSerialT @@ -329,7 +329,7 @@ deduplicatingJobsTable = ] , tblPrimaryKey = pkOnColumn "id" , tblForeignKeys = [ - (fkOnColumn "reserved_by" "consumers_test_deduplicating_consumers" "id") { + (fkOnColumn "reserved_by" "consumers_test_duplicating_consumers" "id") { fkOnDelete = ForeignKeySetNull } ] @@ -351,10 +351,10 @@ consumersTable = , tblPrimaryKey = pkOnColumn "id" } -deduplicatingConsumersTable :: Table -deduplicatingConsumersTable = +duplicatingConsumersTable :: Table +duplicatingConsumersTable = tblTable - { tblName = "consumers_test_deduplicating_consumers" + { tblName = "consumers_test_duplicating_consumers" , tblVersion = 1 , tblColumns = [ tblColumn { colName = "id", colType = BigSerialT From 1cf9f5a648f90d5ea3dce76304a98f46f55c2fe3 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Fri, 21 Apr 2023 12:47:50 +0200 Subject: [PATCH 07/22] update CI GHC version: 9.2.3 -> 9.2.7 --- .github/workflows/haskell-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml index 415a3f1..5715105 100644 --- 
a/.github/workflows/haskell-ci.yml +++ b/.github/workflows/haskell-ci.yml @@ -43,9 +43,9 @@ jobs: compilerVersion: 9.4.4 setup-method: ghcup allow-failure: false - - compiler: ghc-9.2.3 + - compiler: ghc-9.2.7 compilerKind: ghc - compilerVersion: 9.2.3 + compilerVersion: 9.2.7 setup-method: ghcup allow-failure: false - compiler: ghc-9.0.2 From 641c5009d59664b3cd71e4667875b1bba61e04eb Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Wed, 26 Apr 2023 10:47:42 +0200 Subject: [PATCH 08/22] fixup --- src/Database/PostgreSQL/Consumers/Config.hs | 4 ++-- test/Test.hs | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index fde9ef5..e01440c 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -124,7 +124,7 @@ data ConsumerConfig m idx job = forall row. FromRow row => ConsumerConfig { -- down, so it's best to ensure that it doesn't throw. , ccOnException :: !(SomeException -> job -> m Action) -- | The mode the consumer will use to reserve jobs. --- In 'Duplicating' mode the SQL expression indicates which field --- to select for duplication. +-- In @'Duplicating'@ mode the SQL expression indicates which field +-- in the jobs table (specified by @'ccJobsTable'@) to select for duplication. , ccMode :: Mode } diff --git a/test/Test.hs b/test/Test.hs index fb380f6..97ebb9f 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -76,11 +76,10 @@ main = do connSource <- connectToDB void . 
Test.runTestTT $ Test.TestList - [ Test.TestLabel "Test standard consumer config" $ Test.TestCase (test connSource) + [ Test.TestLabel "Test standard (non-duplicating) consumer config" $ Test.TestCase (test connSource) , Test.TestLabel "Test duplicating consumer config" $ Test.TestCase (testDuplicating connSource) ] --- | Connect to the postgres database connectToDB :: IO (ConnectionSource [MonadBase IO, MonadMask]) connectToDB = do connString <- getArgs >>= \case @@ -152,7 +151,7 @@ testDuplicating (ConnectionSource connSource) = , ccJobIndex = fst , ccNotificationChannel = Just "consumers_test_duplicating_chan" -- select some small timeout - , ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccNotificationTimeout = 100 * 1000 -- msec , ccMaxRunningJobs = 20 , ccProcessJob = insertNRows . snd , ccOnException = \err (idx, _) -> handleException err idx From 95e7312123e3744db7fc629cb315674c58f70aa1 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Fri, 28 Apr 2023 13:34:10 +0200 Subject: [PATCH 09/22] update log-base constraint --- consumers.cabal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumers.cabal b/consumers.cabal index 5e2ed57..85ee37e 100644 --- a/consumers.cabal +++ b/consumers.cabal @@ -38,7 +38,7 @@ library , hpqtypes >= 1.11 && < 2.0 , lifted-base >= 0.2 && < 0.3 , lifted-threads >= 1.0 && < 1.1 - , log-base >= 0.11 && < 0.12 + , log-base >= 0.11 && < 0.13 , monad-control >= 1.0 && < 1.1 , monad-time >= 0.4 && < 0.5 , mtl >= 2.2 && < 2.3 From 866ed7d4c22a8e3ab20b6658e6c02551f2f03de1 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Fri, 5 May 2023 14:30:16 +0200 Subject: [PATCH 10/22] Fix `updateJobs` deduplication implementation --- src/Database/PostgreSQL/Consumers/Components.hs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs 
b/src/Database/PostgreSQL/Consumers/Components.hs index 1dc1376..e844782 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -345,10 +345,10 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore updateJobs :: [(idx, Result)] -> m () updateJobs results = runDBT cs ts $ do now <- currentTime - runSQL_ $ smconcat [ - "WITH removed AS (" + runSQL_ $ smconcat + [ "WITH removed AS (" , " DELETE FROM" <+> raw ccJobsTable - , " WHERE id = ANY(" Array1 deletes <+> ")" + , " WHERE id" <+> operator <+> "ANY (" Array1 deletes <+> ")" , ")" , "UPDATE" <+> raw ccJobsTable <+> "SET" , " reserved_by = NULL" @@ -361,9 +361,12 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " WHEN id = ANY(" Array1 successes <+> ") THEN " now , " ELSE NULL" , " END" - , "WHERE id = ANY(" Array1 (map fst updates) <+> ")" + , "WHERE id" <+> operator <+> "ANY (" Array1 (map fst updates) <+> ")" ] where + operator = case ccMode of + Standard -> "=" + Duplicating _field -> "<=" retryToSQL now (Left int) ids = ("WHEN id = ANY(" Array1 ids <+> ") THEN " now <> " +" int :) retryToSQL _ (Right time) ids = From bd30a0baf4145034916057da20d92a004fa69b4b Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Fri, 26 May 2023 15:50:08 +0200 Subject: [PATCH 11/22] Fix job processing in deduplicating mode Ensure that only one job is processed at a time in deduplicating mode so that we don't accidentally update multiple jobs using the <= operator when checking for matching ids to update rows in the database table. 
--- consumers.cabal | 1 + .../PostgreSQL/Consumers/Components.hs | 70 ++++++++++++++++--- src/Database/PostgreSQL/Consumers/Config.hs | 2 +- 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/consumers.cabal b/consumers.cabal index 85ee37e..e1ee80e 100644 --- a/consumers.cabal +++ b/consumers.cabal @@ -34,6 +34,7 @@ library build-depends: base >= 4.13 && < 5 , containers >= 0.5 && < 0.7 , exceptions >= 0.10 && < 0.11 + , text , extra >= 1.6 && < 1.8 , hpqtypes >= 1.11 && < 2.0 , lifted-base >= 0.2 && < 0.3 diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index e844782..95e1695 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -28,6 +28,7 @@ import qualified Control.Concurrent.STM as STM import qualified Control.Concurrent.Thread.Lifted as T import qualified Data.Foldable as F import qualified Data.Map.Strict as M +import qualified Data.Text as Text import Database.PostgreSQL.Consumers.Config import Database.PostgreSQL.Consumers.Consumer @@ -244,9 +245,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore loop limit = do -- If we're running in 'Deduplicating' mode we only -- reserve one job at a time. - (batch, batchSize) <- reserveJobs $ case ccMode of - Standard -> limit - Duplicating _ -> 1 + (batch, batchSize) <- reserveJobs limit when (batchSize > 0) $ do logInfo "Processing batch" $ object [ "batch_size" .= batchSize @@ -262,7 +261,9 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore modifyTVar' runningJobs (subtract batchSize) void . forkP "batch processor" . (`finally` subtractJobs) . 
restore $ do - mapM startJob batch >>= mapM joinJob >>= updateJobs + case batch of + Left job -> startJob job >>= joinJob >>= updateJob + Right jobs -> mapM startJob jobs >>= mapM joinJob >>= updateJobs when (batchSize == limit) $ do maxBatchSize <- atomically $ do @@ -273,7 +274,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore return (batchSize > 0) - reserveJobs :: Int -> m ([job], Int) + reserveJobs :: Int -> m (Either job [job], Int) reserveJobs limit = runDBT cs ts $ do now <- currentTime n <- runSQL $ smconcat [ @@ -287,8 +288,11 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , "RETURNING" <+> mintercalate ", " ccJobSelectors ] -- Decode lazily as we want the transaction to be as short as possible. - (, n) . F.toList . fmap ccJobFetcher <$> queryResult + (, n) . limitJobs . F.toList . fmap ccJobFetcher <$> queryResult where + limitJobs = case ccMode of + Standard -> Right + Duplicating _field -> Left . head reservedJobs :: UTCTime -> SQL reservedJobs now = case ccMode of Standard -> smconcat [ @@ -303,11 +307,11 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore ] Duplicating field -> smconcat [ "WITH latest_for_id AS" - , " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable - , " ORDER BY run_at," <+> field <> ", id" <+> "DESC LIMIT" limit <+> "FOR UPDATE SKIP LOCKED)," + , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable + , " ORDER BY run_at," <+> raw field <> ", id" <+> "DESC LIMIT 1 FOR UPDATE SKIP LOCKED)," , " lock_all AS" - , " (SELECT id," <+> field <+> "FROM" <+> raw ccJobsTable - , " WHERE" <+> field <+> "= (SELECT" <+> field <+> "FROM latest_for_id)" + , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable + , " WHERE" <+> raw field <+> "= (SELECT" <+> raw field <+> "FROM latest_for_id)" , " AND id <= (SELECT id FROM latest_for_id)" , " FOR UPDATE SKIP LOCKED)" , "SELECT id FROM lock_all" @@ -341,11 +345,54 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore ] return (ccJobIndex job, Failed action) + 
updateJob :: (idx, Result) -> m () + updateJob (idx, result) = runDBT cs ts $ do + now <- currentTime + affected <- runSQL $ query now + logInfo_ $ "[DEBUG] 'updateJob' affected rows: " <> Text.pack (show affected) + where + query now = case result of + Ok Remove -> deleteQuery + Failed Remove -> deleteQuery + _ -> retryQuery now (isSuccess result) (getAction result) + + deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE id <=" idx + + retryQuery now success action = smconcat + [ "UPDATE" <+> raw ccJobsTable <+> "SET" + , " reserved_by = NULL" + , ", run_at = CASE" + , " WHEN FALSE THEN run_at" + , retryToSQL + , " ELSE NULL" -- processed + , " END" + , if success then smconcat + [ ", finished_at = CASE" + , " WHEN id =" idx <+> "THEN" now + , " ELSE NULL" + , " END" + ] + else "" + , "WHERE id <=" idx + ] + where + retryToSQL = case action of + RerunAfter int -> "WHEN id =" idx <+> "THEN" now <+> "+" int + RerunAt time -> "WHEN id =" idx <+> "THEN" time + MarkProcessed -> "" + Remove -> error "updateJob: Remove should've been filtered out" + + isSuccess (Ok _) = True + isSuccess (Failed _) = False + + getAction (Ok action) = action + getAction (Failed action) = action + -- | Update status of the jobs. 
updateJobs :: [(idx, Result)] -> m () updateJobs results = runDBT cs ts $ do now <- currentTime - runSQL_ $ smconcat + affected <- runSQL $ smconcat [ "WITH removed AS (" , " DELETE FROM" <+> raw ccJobsTable , " WHERE id" <+> operator <+> "ANY (" Array1 deletes <+> ")" @@ -363,6 +410,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " END" , "WHERE id" <+> operator <+> "ANY (" Array1 (map fst updates) <+> ")" ] + logInfo_ $ "[DEBUG] 'updateJobs' affected rows: " <> Text.pack (show affected) where operator = case ccMode of Standard -> "=" diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index e01440c..c70633f 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -29,7 +29,7 @@ data Result = Ok Action | Failed Action deriving (Eq, Ord, Show) -- | The mode the consumer will run in. -data Mode = Standard | Duplicating SQL +data Mode = Standard | Duplicating (RawSQL ()) deriving (Show) -- | Config of a consumer. 
From c6684cae7ef6002cfbc97afb3ec9b7d86b790004 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 30 May 2023 10:30:34 +0200 Subject: [PATCH 12/22] Remove debugging log statements --- consumers.cabal | 1 - src/Database/PostgreSQL/Consumers/Components.hs | 7 ++----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/consumers.cabal b/consumers.cabal index e1ee80e..85ee37e 100644 --- a/consumers.cabal +++ b/consumers.cabal @@ -34,7 +34,6 @@ library build-depends: base >= 4.13 && < 5 , containers >= 0.5 && < 0.7 , exceptions >= 0.10 && < 0.11 - , text , extra >= 1.6 && < 1.8 , hpqtypes >= 1.11 && < 2.0 , lifted-base >= 0.2 && < 0.3 diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 95e1695..efcaac7 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -28,7 +28,6 @@ import qualified Control.Concurrent.STM as STM import qualified Control.Concurrent.Thread.Lifted as T import qualified Data.Foldable as F import qualified Data.Map.Strict as M -import qualified Data.Text as Text import Database.PostgreSQL.Consumers.Config import Database.PostgreSQL.Consumers.Consumer @@ -348,8 +347,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore updateJob :: (idx, Result) -> m () updateJob (idx, result) = runDBT cs ts $ do now <- currentTime - affected <- runSQL $ query now - logInfo_ $ "[DEBUG] 'updateJob' affected rows: " <> Text.pack (show affected) + runSQL_ $ query now where query now = case result of Ok Remove -> deleteQuery @@ -392,7 +390,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore updateJobs :: [(idx, Result)] -> m () updateJobs results = runDBT cs ts $ do now <- currentTime - affected <- runSQL $ smconcat + runSQL_ $ smconcat [ "WITH removed AS (" , " DELETE FROM" <+> raw ccJobsTable , " WHERE id" <+> operator <+> "ANY (" Array1 deletes <+> ")" @@ -410,7 +408,6 @@ 
spawnDispatcher ConsumerConfig{..} cs cid semaphore , " END" , "WHERE id" <+> operator <+> "ANY (" Array1 (map fst updates) <+> ")" ] - logInfo_ $ "[DEBUG] 'updateJobs' affected rows: " <> Text.pack (show affected) where operator = case ccMode of Standard -> "=" From f47bd3aa48ee7e7d2109a539cdbcd4189ca197f4 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Wed, 31 May 2023 15:17:58 +0200 Subject: [PATCH 13/22] Fix job deletion logic --- src/Database/PostgreSQL/Consumers/Components.hs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index efcaac7..fc5750f 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -307,7 +307,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore Duplicating field -> smconcat [ "WITH latest_for_id AS" , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable - , " ORDER BY run_at," <+> raw field <> ", id" <+> "DESC LIMIT 1 FOR UPDATE SKIP LOCKED)," + , " ORDER BY run_at," <+> raw field <> ", id DESC LIMIT 1 FOR UPDATE SKIP LOCKED)," , " lock_all AS" , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable , " WHERE" <+> raw field <+> "= (SELECT" <+> raw field <+> "FROM latest_for_id)" @@ -354,7 +354,11 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore Failed Remove -> deleteQuery _ -> retryQuery now (isSuccess result) (getAction result) - deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE id <=" idx + deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE" <+> raw row <+> "<=" idx + where + row = case ccMode of + Standard -> "id" + Duplicating field -> field retryQuery now success action = smconcat [ "UPDATE" <+> raw ccJobsTable <+> "SET" From e5f1185f29a24a6ff43178cc025d58db805dab25 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Thu, 1 Jun 2023 
11:06:11 +0200 Subject: [PATCH 14/22] code cleanup --- .../PostgreSQL/Consumers/Components.hs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index fc5750f..8fd6a89 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -347,18 +347,12 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore updateJob :: (idx, Result) -> m () updateJob (idx, result) = runDBT cs ts $ do now <- currentTime - runSQL_ $ query now - where - query now = case result of + runSQL_ $ case result of Ok Remove -> deleteQuery Failed Remove -> deleteQuery _ -> retryQuery now (isSuccess result) (getAction result) - - deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE" <+> raw row <+> "<=" idx - where - row = case ccMode of - Standard -> "id" - Duplicating field -> field + where + deleteQuery = "DELETE FROM" <+> raw ccJobsTable <+> "WHERE" <+> raw idxRow <+> "<=" idx retryQuery now success action = smconcat [ "UPDATE" <+> raw ccJobsTable <+> "SET" @@ -375,7 +369,8 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " END" ] else "" - , "WHERE id <=" idx + -- TODO: Is this right for deduplicating consumers? 
+ , "WHERE" <+> raw idxRow <+> "<=" idx ] where retryToSQL = case action of @@ -384,6 +379,10 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore MarkProcessed -> "" Remove -> error "updateJob: Remove should've been filtered out" + idxRow = case ccMode of + Standard -> error "'updateJob' should never be called when ccMode = " <> show Standard + Duplicating field -> field + isSuccess (Ok _) = True isSuccess (Failed _) = False From 02f74b4b4b47c20e14128ce875fbbb22c6e93713 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Thu, 1 Jun 2023 14:19:44 +0200 Subject: [PATCH 15/22] fix tests --- src/Database/PostgreSQL/Consumers/Components.hs | 2 +- test/Test.hs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 8fd6a89..d2ffda9 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -380,7 +380,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore Remove -> error "updateJob: Remove should've been filtered out" idxRow = case ccMode of - Standard -> error "'updateJob' should never be called when ccMode = " <> show Standard + Standard -> error $ "'updateJob' should never be called when ccMode = " <> show Standard Duplicating field -> field isSuccess (Ok _) = True diff --git a/test/Test.hs b/test/Test.hs index 97ebb9f..1b9576c 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -96,7 +96,7 @@ connectToDB = do testDuplicating :: ConnectionSource [MonadBase IO, MonadMask] -> IO () testDuplicating (ConnectionSource connSource) = - withSimpleStdOutLogger $ \logger -> runTestEnv connSource logger $ do + withStdOutLogger $ \logger -> runTestEnv connSource logger $ do createTables idleSignal <- liftIO newEmptyTMVarIO let rows = 15 @@ -148,10 +148,10 @@ testDuplicating (ConnectionSource connSource) = , ccConsumersTable = 
"consumers_test_duplicating_consumers" , ccJobSelectors = ["id", "countdown"] , ccJobFetcher = id - , ccJobIndex = fst + , ccJobIndex = snd , ccNotificationChannel = Just "consumers_test_duplicating_chan" -- select some small timeout - , ccNotificationTimeout = 100 * 1000 -- msec + , ccNotificationTimeout = 100 * 1000 -- 100 msec , ccMaxRunningJobs = 20 , ccProcessJob = insertNRows . snd , ccOnException = \err (idx, _) -> handleException err idx @@ -167,7 +167,7 @@ insertNRows count = do test :: ConnectionSource [MonadBase IO, MonadMask] -> IO () test (ConnectionSource connSource) = - withSimpleStdOutLogger $ \logger -> + withStdOutLogger $ \logger -> runTestEnv connSource logger $ do createTables idleSignal <- liftIO newEmptyTMVarIO From 9a61a9bd288de4e9c509fe1ce5bec3546377e075 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Thu, 1 Jun 2023 14:51:48 +0200 Subject: [PATCH 16/22] fix merge mistake in dep constraints --- consumers.cabal | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/consumers.cabal b/consumers.cabal index e933b0b..dfe5447 100644 --- a/consumers.cabal +++ b/consumers.cabal @@ -32,19 +32,6 @@ library Database.PostgreSQL.Consumers.Utils build-depends: base >= 4.13 && < 5 - , containers >= 0.5 && < 0.7 - , exceptions >= 0.10 && < 0.11 - , extra >= 1.6 && < 1.8 - , hpqtypes >= 1.11 && < 2.0 - , lifted-base >= 0.2 && < 0.3 - , lifted-threads >= 1.0 && < 1.1 - , log-base >= 0.11 && < 0.12 - , monad-control >= 1.0 && < 1.1 - , monad-time >= 0.4 && < 0.5 - , mtl >= 2.2 && < 2.3 - , stm >= 2.4 && < 2.6 - , time >= 1.6 && < 2.0 - , transformers-base >= 0.4 && < 0.5 , containers >= 0.5 , exceptions >= 0.10 , hpqtypes >= 1.11 From 83828a87291ce1df0979d1f03438378b3bce58ed Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Thu, 1 Jun 2023 15:28:14 +0200 Subject: [PATCH 17/22] fix comments --- src/Database/PostgreSQL/Consumers/Components.hs | 6 +++--- 1 file changed, 3 
insertions(+), 3 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index d2ffda9..651bd07 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -242,8 +242,6 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore loop :: Int -> m Bool loop limit = do - -- If we're running in 'Deduplicating' mode we only - -- reserve one job at a time. (batch, batchSize) <- reserveJobs limit when (batchSize > 0) $ do logInfo "Processing batch" $ object [ @@ -259,7 +257,9 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore let subtractJobs = atomically $ do modifyTVar' runningJobs (subtract batchSize) void . forkP "batch processor" - . (`finally` subtractJobs) . restore $ do + . (`finally` subtractJobs) . restore $ + -- Ensures that we only process one job at a time + -- when running in 'Duplicating' mode. case batch of Left job -> startJob job >>= joinJob >>= updateJob Right jobs -> mapM startJob jobs >>= mapM joinJob >>= updateJobs From d1d8e77bf040650a51700d916a9db5e2c69c1540 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 13 Jun 2023 13:42:07 +0200 Subject: [PATCH 18/22] revert gitignore changes --- .gitignore | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/.gitignore b/.gitignore index 75b66bb..814d49e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,13 +7,3 @@ TAGS .ghc.environment.* .cabal-sandbox cabal.sandbox.config - -# direnv -.direnv/ -.envrc - -# emacs -**/.dir-locals.el - -# postgres -_local/ From f454220788d526bc958c99bd42f807af97cc0ff9 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Tue, 13 Jun 2023 13:42:48 +0200 Subject: [PATCH 19/22] remove comment --- src/Database/PostgreSQL/Consumers/Components.hs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs 
b/src/Database/PostgreSQL/Consumers/Components.hs index 651bd07..739b62d 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -369,7 +369,6 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " END" ] else "" - -- TODO: Is this right for deduplicating consumers? , "WHERE" <+> raw idxRow <+> "<=" idx ] where From cd5ccc2dc1a6c8a3614c7da5b09d5d52263acdf9 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Fri, 16 Jun 2023 16:05:47 +0200 Subject: [PATCH 20/22] fixup --- .../PostgreSQL/Consumers/Components.hs | 24 ++++----- src/Database/PostgreSQL/Consumers/Config.hs | 16 +++++- test/Test.hs | 49 ++++++++++++------- 3 files changed, 60 insertions(+), 29 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 739b62d..43c639a 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -259,7 +259,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore void . forkP "batch processor" . (`finally` subtractJobs) . restore $ -- Ensures that we only process one job at a time - -- when running in 'Duplicating' mode. + -- when running in @'Duplicating'@ mode. case batch of Left job -> startJob job >>= joinJob >>= updateJob Right jobs -> mapM startJob jobs >>= mapM joinJob >>= updateJobs @@ -289,6 +289,8 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore -- Decode lazily as we want the transaction to be as short as possible. (, n) . limitJobs . F.toList . fmap ccJobFetcher <$> queryResult where + -- Reserve a single job or a list of jobs depending + -- on which @'ccMode'@ the consumer is running in. limitJobs = case ccMode of Standard -> Right Duplicating _field -> Left . head @@ -316,7 +318,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , "SELECT id FROM lock_all" ] - -- | Spawn each job in a separate thread. 
+ -- Spawn each job in a separate thread. startJob :: job -> m (job, m (T.Result Result)) startJob job = do (_, joinFork) <- mask $ \restore -> T.fork $ do @@ -330,7 +332,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore unregisterJob tid = atomically $ do modifyTVar' runningJobsInfo $ M.delete tid - -- | Wait for all the jobs and collect their results. + -- Wait for all the jobs and collect their results. joinJob :: (job, m (T.Result Result)) -> m (idx, Result) joinJob (job, joinFork) = joinFork >>= \eres -> case eres of Right result -> return (ccJobIndex job, result) @@ -344,11 +346,14 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore ] return (ccJobIndex job, Failed action) + -- Update the status of a job running in @'Duplicating'@ mode. updateJob :: (idx, Result) -> m () updateJob (idx, result) = runDBT cs ts $ do now <- currentTime runSQL_ $ case result of Ok Remove -> deleteQuery + -- TODO: Should we be deduplicating when a job fails with 'Remove' or only + -- remove the failing job? Failed Remove -> deleteQuery _ -> retryQuery now (isSuccess result) (getAction result) where @@ -376,7 +381,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore RerunAfter int -> "WHEN id =" idx <+> "THEN" now <+> "+" int RerunAt time -> "WHEN id =" idx <+> "THEN" time MarkProcessed -> "" - Remove -> error "updateJob: Remove should've been filtered out" + Remove -> error "updateJob: 'Remove' should've been filtered out" idxRow = case ccMode of Standard -> error $ "'updateJob' should never be called when ccMode = " <> show Standard @@ -388,14 +393,14 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore getAction (Ok action) = action getAction (Failed action) = action - -- | Update status of the jobs. + -- Update the status of jobs running in @'Standard'@ mode. 
updateJobs :: [(idx, Result)] -> m () updateJobs results = runDBT cs ts $ do now <- currentTime runSQL_ $ smconcat [ "WITH removed AS (" , " DELETE FROM" <+> raw ccJobsTable - , " WHERE id" <+> operator <+> "ANY (" Array1 deletes <+> ")" + , " WHERE id = ANY (" Array1 deletes <+> ")" , ")" , "UPDATE" <+> raw ccJobsTable <+> "SET" , " reserved_by = NULL" @@ -408,12 +413,9 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , " WHEN id = ANY(" Array1 successes <+> ") THEN " now , " ELSE NULL" , " END" - , "WHERE id" <+> operator <+> "ANY (" Array1 (map fst updates) <+> ")" + , "WHERE id = ANY (" Array1 (map fst updates) <+> ")" ] where - operator = case ccMode of - Standard -> "=" - Duplicating _field -> "<=" retryToSQL now (Left int) ids = ("WHEN id = ANY(" Array1 ids <+> ") THEN " now <> " +" int :) retryToSQL _ (Right time) ids = @@ -430,7 +432,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore RerunAfter int -> M.insertWith (++) (Left int) [idx] iretries RerunAt time -> M.insertWith (++) (Right time) [idx] iretries Remove -> error - "updateJobs: Remove should've been filtered out" + "updateJobs: 'Remove' should've been filtered out" successes = foldr step [] updates where diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index c70633f..3b7771d 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -28,7 +28,21 @@ data Action data Result = Ok Action | Failed Action deriving (Eq, Ord, Show) --- | The mode the consumer will run in. +-- | The mode the consumer will run in: +-- +-- * @'Standard'@ - Consumer jobs will be run in ascending order +-- based on the __run_at__ field. When jobs are updated, +-- ones that are marked for removal will be deleted. +-- +-- * @'Duplicating' field@ - The job with the highest __id__ for +-- the lowest __run_at__ and __field__ value is selected. 
Then +-- all jobs that have the same __field__ value and a smaller or equal +-- __id__ value are reserved and run. When /one/ of these jobs is removed +-- all jobs with a smaller or equal __field__ value are also deleted. This +-- essentially allows one to race multiple jobs, only applying the result +-- of whichever job finishes first. +-- +-- Note: One cannot duplicate on the primary key field named @'id'@ in the @'ccJobsTable'@. data Mode = Standard | Duplicating (RawSQL ()) deriving (Show) diff --git a/test/Test.hs b/test/Test.hs index 1b9576c..1f351c3 100644 --- a/test/Test.hs +++ b/test/Test.hs @@ -102,7 +102,7 @@ testDuplicating (ConnectionSource connSource) = let rows = 15 putJob rows "consumers_test_duplicating_jobs" "consumers_test_duplicating_chan" *> commit - -- Move time forward 2hours, because job is scheduled 1 hour into future + -- Move time forward 2 hours, because job is scheduled 1 hour into future modifyTestTime . addUTCTime $ 2*60*60 finalize (localDomain "process" $ runConsumerWithIdleSignal duplicatingConsumerConfig connSource idleSignal) $ @@ -151,7 +151,7 @@ testDuplicating (ConnectionSource connSource) = , ccJobIndex = snd , ccNotificationChannel = Just "consumers_test_duplicating_chan" -- select some small timeout - , ccNotificationTimeout = 100 * 1000 -- 100 msec + , ccNotificationTimeout = 100 * 1000 -- msec , ccMaxRunningJobs = 20 , ccProcessJob = insertNRows .
snd , ccOnException = \err (idx, _) -> handleException err idx @@ -293,11 +293,14 @@ personTable = { tblName = "person_test" , tblVersion = 1 , tblColumns = - [ tblColumn { colName = "id", colType = BigSerialT + [ tblColumn { colName = "id" + , colType = BigSerialT , colNullable = False } - , tblColumn { colName = "name", colType = TextT + , tblColumn { colName = "name" + , colType = TextT , colNullable = False } - , tblColumn { colName = "age", colType = IntegerT + , tblColumn { colName = "age" + , colType = IntegerT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id" @@ -309,21 +312,27 @@ duplicatingJobsTable = { tblName = "consumers_test_duplicating_jobs" , tblVersion = 1 , tblColumns = - [ tblColumn { colName = "id", colType = BigSerialT + [ tblColumn { colName = "id" + , colType = BigSerialT , colNullable = False } - , tblColumn { colName = "run_at", colType = TimestampWithZoneT + , tblColumn { colName = "run_at" + , colType = TimestampWithZoneT , colNullable = True } - , tblColumn { colName = "finished_at", colType = TimestampWithZoneT + , tblColumn { colName = "finished_at" + , colType = TimestampWithZoneT , colNullable = True } - , tblColumn { colName = "reserved_by", colType = BigIntT + , tblColumn { colName = "reserved_by" + , colType = BigIntT , colNullable = True } - , tblColumn { colName = "attempts", colType = IntegerT + , tblColumn { colName = "attempts" + , colType = IntegerT , colNullable = False } -- Non-obligatory field "countdown". Really more of a count -- and not a countdown, but name is kept to that we can reuse -- `putJob` function. 
- , tblColumn { colName = "countdown", colType = IntegerT + , tblColumn { colName = "countdown" + , colType = IntegerT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id" @@ -340,11 +349,14 @@ consumersTable = { tblName = "consumers_test_consumers" , tblVersion = 1 , tblColumns = - [ tblColumn { colName = "id", colType = BigSerialT + [ tblColumn { colName = "id" + , colType = BigSerialT , colNullable = False } - , tblColumn { colName = "name", colType = TextT + , tblColumn { colName = "name" + , colType = TextT , colNullable = False } - , tblColumn { colName = "last_activity", colType = TimestampWithZoneT + , tblColumn { colName = "last_activity" + , colType = TimestampWithZoneT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id" @@ -356,11 +368,14 @@ duplicatingConsumersTable = { tblName = "consumers_test_duplicating_consumers" , tblVersion = 1 , tblColumns = - [ tblColumn { colName = "id", colType = BigSerialT + [ tblColumn { colName = "id" + , colType = BigSerialT , colNullable = False } - , tblColumn { colName = "name", colType = TextT + , tblColumn { colName = "name" + , colType = TextT , colNullable = False } - , tblColumn { colName = "last_activity", colType = TimestampWithZoneT + , tblColumn { colName = "last_activity" + , colType = TimestampWithZoneT , colNullable = False } ] , tblPrimaryKey = pkOnColumn "id" From f67a52c07c8f3dcb0683930ab83ad510777c4026 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Mon, 19 Jun 2023 15:53:30 +0200 Subject: [PATCH 21/22] fix haddock comments --- src/Database/PostgreSQL/Consumers/Config.hs | 2 +- src/Database/PostgreSQL/Consumers/Utils.hs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Database/PostgreSQL/Consumers/Config.hs b/src/Database/PostgreSQL/Consumers/Config.hs index 3b7771d..bd13b8a 100644 --- a/src/Database/PostgreSQL/Consumers/Config.hs +++ b/src/Database/PostgreSQL/Consumers/Config.hs @@ -42,7 +42,7 @@ data Result = Ok Action | 
Failed Action -- essentially allows one to race multiple jobs, only applying the result -- of whichever job finishes first. -- --- Note: One cannot duplicate on the primary key field named @'id'@ in the @'ccJobsTable'@. +-- Note: One cannot duplicate on the primary key field named @id@ in the @'ccJobsTable'@. data Mode = Standard | Duplicating (RawSQL ()) deriving (Show) diff --git a/src/Database/PostgreSQL/Consumers/Utils.hs b/src/Database/PostgreSQL/Consumers/Utils.hs index a6bd8e2..e079602 100644 --- a/src/Database/PostgreSQL/Consumers/Utils.hs +++ b/src/Database/PostgreSQL/Consumers/Utils.hs @@ -27,7 +27,7 @@ finalize m action = do ---------------------------------------- --- | Exception thrown to a thread to stop its execution. +-- Exception thrown to a thread to stop its execution. -- All exceptions other than 'StopExecution' thrown to -- threads spawned by 'forkP' and 'gforkP' are propagated -- back to the parent thread. From b3a6761b30fb07587c98bbc29f6c9595f558eed5 Mon Sep 17 00:00:00 2001 From: skykanin <3789764+skykanin@users.noreply.github.com> Date: Mon, 19 Jun 2023 15:53:58 +0200 Subject: [PATCH 22/22] throw error when duplicating on 'id' field --- src/Database/PostgreSQL/Consumers/Components.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Database/PostgreSQL/Consumers/Components.hs b/src/Database/PostgreSQL/Consumers/Components.hs index 43c639a..726aeab 100644 --- a/src/Database/PostgreSQL/Consumers/Components.hs +++ b/src/Database/PostgreSQL/Consumers/Components.hs @@ -306,6 +306,7 @@ spawnDispatcher ConsumerConfig{..} cs cid semaphore , "LIMIT" limit , "FOR UPDATE SKIP LOCKED" ] + Duplicating "id" -> error "Cannot duplicate on the primary key field 'id'" Duplicating field -> smconcat [ "WITH latest_for_id AS" , " (SELECT id," <+> raw field <+> "FROM" <+> raw ccJobsTable