Skip to content

Commit

Permalink
feat: Hunt for and clean up intermediate tables
Browse files (browse the repository at this point in the history)
  • Loading branch information
RasmusSkytte committed Feb 6, 2025
1 parent 6866483 commit 8a1fc88
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 3 deletions.
4 changes: 2 additions & 2 deletions R/digest_to_checksum.R
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ digest_to_checksum.default <- function(
dplyr::across(tidyselect::all_of(col), openssl::md5)
) %>%
dplyr::copy_to(dbplyr::remote_con(.data), df = ., name = unique_table_name("SCDB_digest_to_checksum_helper"))
defer_db_cleanup(checksums)

.data <- .data %>%
dplyr::mutate(id__ = dplyr::row_number()) %>%
- dplyr::left_join(checksums, by = "id__", copy = TRUE) %>%
- dplyr::select(!"id__")
+ dplyr::left_join(checksums, by = "id__") %>%
+ dplyr::select(!"id__") %>%
dplyr::compute(unique_table_name("SCDB_digest_to_checksum"))

Expand Down
1 change: 1 addition & 0 deletions R/interlace.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ interlace.tbl_sql <- function(tables, by = NULL, colnames = NULL) {
dplyr::select(!".row") %>%
dplyr::ungroup() %>%
dplyr::compute(name = unique_table_name("SCDB_interlace_t"))
+ defer_db_cleanup(t)


# Merge data onto the new validities using non-equi joins
Expand Down
2 changes: 2 additions & 0 deletions R/locks.R
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ lock_table <- function(conn, db_table, schema = NULL) {
),
name = unique_table_name("SCDB_lock")
)
+ defer_db_cleanup(lock)

dplyr::rows_insert(db_lock_table, lock, by = c("schema", "table"), conflict = "ignore", in_place = TRUE)
},
Expand Down Expand Up @@ -174,6 +175,7 @@ unlock_table <- function(conn, db_table, schema = NULL, pid = Sys.getpid()) {
),
name = unique_table_name("SCDB_lock")
)
+ defer_db_cleanup(lock)

dplyr::rows_delete(db_lock_table, lock, by = c("schema", "table", "pid"), unmatched = "ignore", in_place = TRUE)
},
Expand Down
3 changes: 2 additions & 1 deletion R/update_snapshot.R
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ update_snapshot <- function(.data, conn, db_table, timestamp, filters = NULL, me
}

# Once we ensure .data is on the same connection as the target, we compute the checksums
- .data <- dplyr::compute(digest_to_checksum(.data, col = "checksum"))
+ .data <- digest_to_checksum(.data, col = "checksum")
+ if (!inherits(conn, "SQLiteConnection")) .data <- dplyr::compute(.data) # SQLite was computed in digest_to_checksum
defer_db_cleanup(.data)

### Determine the next timestamp in the data (can be NA if none is found)
Expand Down

0 comments on commit 8a1fc88

Please sign in to comment.