Skip to content

Commit

Permalink
perf: update to do less iterations for vault_state
Browse files Browse the repository at this point in the history
  • Loading branch information
ResuBaka committed Mar 1, 2025
1 parent b4c1d36 commit 74e6e4f
Showing 1 changed file with 103 additions and 93 deletions.
196 changes: 103 additions & 93 deletions rust/api-server/api/src/vault_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,66 @@ use entity::vault_state::RawVaultState;
use entity::{vault_state, vault_state_collectibles};
use log::{debug, error, info};
use migration::OnConflict;
use sea_orm::IntoActiveModel;
use sea_orm::{DbErr, IntoActiveModel};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QuerySelect, sea_query};
use std::collections::{HashMap, HashSet};

async fn get_known_player_state_ids(conn: &DatabaseConnection) -> anyhow::Result<HashSet<i64>> {
let known_player_state_ids: Vec<i64> = vault_state::Entity::find()
async fn get_known_vault_state_ids(conn: &DatabaseConnection) -> anyhow::Result<HashSet<i64>> {
let known_vault_state_ids: Vec<i64> = vault_state::Entity::find()
.select_only()
.column(vault_state::Column::EntityId)
.into_tuple()
.all(conn)
.await?;

let known_player_state_ids = known_player_state_ids.into_iter().collect::<HashSet<i64>>();
Ok(known_player_state_ids)
let known_vault_state_ids = known_vault_state_ids.into_iter().collect::<HashSet<i64>>();
Ok(known_vault_state_ids)
}

async fn db_insert_player_states(
async fn db_insert_vault_states(
conn: &DatabaseConnection,
buffer_before_insert: &mut Vec<RawVaultState>,
on_conflict: &OnConflict,
vault_state_collectible_on_conflict: &OnConflict,
list_of_vault_state_collectibles_to_delete: &mut Option<&mut HashSet<(i64, i32)>>,
) -> anyhow::Result<()> {
let player_states_from_db = vault_state::Entity::find()
.filter(
vault_state::Column::EntityId.is_in(
buffer_before_insert
.iter()
.map(|player_state| player_state.entity_id)
.collect::<Vec<i64>>(),
),
)
.all(conn)
.await?;
let (vault_state_ids, vault_state_models, vault_state_collectibles_models) =
buffer_before_insert
.iter()
.map(|vault_state| {
let vault_state_collectibles = vault_state.to_model_collectibles();
let vault_state_model = vault_state.to_model();

let player_states_from_db_map = player_states_from_db
.into_iter()
.map(|player_state| (player_state.entity_id, player_state))
.collect::<HashMap<i64, vault_state::Model>>();
(
vault_state.entity_id,
vault_state_model,
vault_state_collectibles,
)
})
.collect::<(
Vec<i64>,
Vec<vault_state::Model>,
Vec<Vec<vault_state_collectibles::Model>>,
)>();

let vault_states_from_db = vault_state::Entity::find()
.filter(vault_state::Column::EntityId.is_in(vault_state_ids.clone()))
.all(conn);

let vault_states_collectibles_from_db = vault_state_collectibles::Entity::find()
.filter(
vault_state_collectibles::Column::EntityId.is_in(
buffer_before_insert
.iter()
.map(|player_state| player_state.entity_id)
.collect::<Vec<i64>>(),
),
)
.all(conn)
.await?;
.filter(vault_state_collectibles::Column::EntityId.is_in(vault_state_ids.clone()))
.all(conn);

let (vault_states_from_db, vault_states_collectibles_from_db) =
tokio::join!(vault_states_from_db, vault_states_collectibles_from_db,);

let vault_states_from_db = vault_states_from_db?;
let vault_states_collectibles_from_db = vault_states_collectibles_from_db?;

let vault_states_from_db_map = vault_states_from_db
.into_iter()
.map(|vault_state| (vault_state.entity_id, vault_state))
.collect::<HashMap<i64, vault_state::Model>>();

let vault_states_collectibles_from_db_map = vault_states_collectibles_from_db
.into_iter()
Expand All @@ -68,43 +77,35 @@ async fn db_insert_player_states(
})
.collect::<HashMap<(i64, i32), vault_state_collectibles::Model>>();

let things_to_insert = buffer_before_insert
.iter()
.map(|player_state| player_state.clone().to_model())
let things_to_insert = vault_state_models
.into_iter()
.filter(
|player_state| match player_states_from_db_map.get(&player_state.entity_id) {
Some(player_state_from_db) => player_state_from_db != player_state,
|vault_state| match vault_states_from_db_map.get(&vault_state.entity_id) {
Some(vault_state_from_db) => vault_state_from_db != vault_state,
None => true,
},
)
.map(|player_state| player_state.clone().into_active_model())
.map(|vault_state| vault_state.into_active_model())
.collect::<Vec<vault_state::ActiveModel>>();

let mut things_to_insert_collectibles = buffer_before_insert
.iter_mut()
.flat_map(|player_state| {
player_state
.clone()
.collectibles
.iter()
.map(|collectible| collectible.to_model(player_state.entity_id))
.collect::<Vec<vault_state_collectibles::Model>>()
})
.filter(|player_state| {
let mut things_to_insert_collectibles = vault_state_collectibles_models
.into_iter()
.flatten()
.filter(|vault_state| {
if let Some(list_of_vault_state_collectibles_to_delete) =
list_of_vault_state_collectibles_to_delete
{
list_of_vault_state_collectibles_to_delete
.remove(&(player_state.entity_id, player_state.id));
.remove(&(vault_state.entity_id, vault_state.id));
}
match vault_states_collectibles_from_db_map
.get(&(player_state.entity_id, player_state.id))
.get(&(vault_state.entity_id, vault_state.id))
{
Some(player_state_from_db) => player_state_from_db != player_state,
Some(vault_state_from_db) => vault_state_from_db != vault_state,
None => true,
}
})
.map(|player_state| player_state.clone().into_active_model())
.map(|vault_state| vault_state.into_active_model())
.collect::<Vec<vault_state_collectibles::ActiveModel>>();

if things_to_insert.is_empty() {
Expand Down Expand Up @@ -135,32 +136,32 @@ async fn db_insert_player_states(
Ok(())
}

async fn delete_player_state(
async fn delete_vault_state(
conn: &DatabaseConnection,
known_player_state_ids: HashSet<i64>,
known_vault_state_ids: HashSet<i64>,
cross_delete: bool,
) -> anyhow::Result<()> {
info!(
"player_state's ({}) to delete: {:?}",
known_player_state_ids.len(),
known_player_state_ids
"vault_state's ({}) to delete: {:?}",
known_vault_state_ids.len(),
known_vault_state_ids
);
vault_state::Entity::delete_many()
.filter(vault_state::Column::EntityId.is_in(known_player_state_ids.clone()))
.filter(vault_state::Column::EntityId.is_in(known_vault_state_ids.clone()))
.exec(conn)
.await?;

if cross_delete {
vault_state_collectibles::Entity::delete_many()
.filter(vault_state_collectibles::Column::EntityId.is_in(known_player_state_ids))
.filter(vault_state_collectibles::Column::EntityId.is_in(known_vault_state_ids))
.exec(conn)
.await?;
}
Ok(())
}

pub(crate) async fn handle_initial_subscription(
p0: &DatabaseConnection,
conn: &DatabaseConnection,
p1: &Table,
) -> anyhow::Result<()> {
let chunk_size = 500;
Expand All @@ -180,28 +181,28 @@ pub(crate) async fn handle_initial_subscription(
])
.to_owned();

let mut known_player_state_ids = get_known_player_state_ids(p0).await?;
let mut known_vault_state_collectibles_ids = vault_state_collectibles::Entity::find()
.select_only()
.column(vault_state_collectibles::Column::EntityId)
.column(vault_state_collectibles::Column::Id)
.into_tuple::<(i64, i32)>()
.all(p0)
.await?
let (known_vault_state_ids, known_vault_state_collectibles_ids) = tokio::join!(
get_known_vault_state_ids(conn),
get_known_vault_state_collectibles_ids(conn),
);

let mut known_vault_state_ids = known_vault_state_ids?;

let mut known_vault_state_collectibles_ids = known_vault_state_collectibles_ids?
.into_iter()
.collect::<HashSet<(i64, i32)>>();

for update in p1.updates.iter() {
for row in update.inserts.iter() {
match serde_json::from_str::<RawVaultState>(row.as_ref()) {
Ok(player_state) => {
if known_player_state_ids.contains(&player_state.entity_id) {
known_player_state_ids.remove(&player_state.entity_id);
Ok(vault_state) => {
if known_vault_state_ids.contains(&vault_state.entity_id) {
known_vault_state_ids.remove(&vault_state.entity_id);
}
buffer_before_insert.push(player_state);
buffer_before_insert.push(vault_state);
if buffer_before_insert.len() == chunk_size {
db_insert_player_states(
p0,
db_insert_vault_states(
conn,
&mut buffer_before_insert,
&on_conflict,
&vault_state_collectible_on_conflict,
Expand All @@ -221,8 +222,8 @@ pub(crate) async fn handle_initial_subscription(
}

if !buffer_before_insert.is_empty() {
db_insert_player_states(
p0,
db_insert_vault_states(
conn,
&mut buffer_before_insert,
&on_conflict,
&vault_state_collectible_on_conflict,
Expand All @@ -231,19 +232,28 @@ pub(crate) async fn handle_initial_subscription(
.await?;
}

if !known_player_state_ids.is_empty() {
delete_player_state(p0, known_player_state_ids, false).await?;
if !known_vault_state_ids.is_empty() {
delete_vault_state(conn, known_vault_state_ids, false).await?;
}

if !known_vault_state_collectibles_ids.is_empty() {
delete_vault_state_collectibles(p0, known_vault_state_collectibles_ids).await?;
delete_vault_state_collectibles(conn, known_vault_state_collectibles_ids).await?;
}

Ok(())
}

fn get_known_vault_state_collectibles_ids(conn: &DatabaseConnection) -> impl Future<Output=Result<Vec<(i64, i32)>, DbErr>> + Sized {
vault_state_collectibles::Entity::find()
.select_only()
.column(vault_state_collectibles::Column::EntityId)
.column(vault_state_collectibles::Column::Id)
.into_tuple::<(i64, i32)>()
.all(conn)
}

async fn delete_vault_state_collectibles(
p0: &DatabaseConnection,
conn: &DatabaseConnection,
p1: HashSet<(i64, i32)>,
) -> anyhow::Result<()> {
let to_chunk = p1.iter().clone().collect::<Vec<_>>();
Expand Down Expand Up @@ -272,15 +282,15 @@ async fn delete_vault_state_collectibles(

vault_state_collectibles::Entity::delete_many()
.filter(build_filter)
.exec(p0)
.exec(conn)
.await?;
}

Ok(())
}

pub(crate) async fn handle_transaction_update(
p0: &DatabaseConnection,
conn: &DatabaseConnection,
tables: &[TableWithOriginalEventTransactionUpdate],
) -> anyhow::Result<()> {
let on_conflict = sea_query::OnConflict::column(vault_state::Column::EntityId)
Expand All @@ -304,9 +314,9 @@ pub(crate) async fn handle_transaction_update(
for p1 in tables.iter() {
for row in p1.inserts.iter() {
match serde_json::from_str::<RawVaultState>(row.as_ref()) {
Ok(player_state) => {
found_in_inserts.insert(player_state.entity_id);
buffer_before_insert.insert(player_state.entity_id, player_state);
Ok(vault_state) => {
found_in_inserts.insert(vault_state.entity_id);
buffer_before_insert.insert(vault_state.entity_id, vault_state);
}
Err(error) => {
error!("TransactionUpdate Insert PlayerState Error: {error}");
Expand All @@ -321,7 +331,7 @@ pub(crate) async fn handle_transaction_update(
.column(vault_state_collectibles::Column::EntityId)
.column(vault_state_collectibles::Column::Id)
.into_tuple::<(i64, i32)>()
.all(p0)
.all(conn)
.await?
.into_iter()
.collect::<HashSet<(i64, i32)>>();
Expand All @@ -333,8 +343,8 @@ pub(crate) async fn handle_transaction_update(
.map(|x| x.1)
.collect::<Vec<RawVaultState>>();

db_insert_player_states(
p0,
db_insert_vault_states(
conn,
&mut buffer_before_insert_vec,
&on_conflict,
&vault_state_collectible_on_conflict,
Expand All @@ -348,9 +358,9 @@ pub(crate) async fn handle_transaction_update(
for p1 in tables.iter() {
for row in p1.deletes.iter() {
match serde_json::from_str::<RawVaultState>(row.as_ref()) {
Ok(player_state) => {
if !found_in_inserts.contains(&player_state.entity_id) {
players_to_delete.insert(player_state.entity_id);
Ok(vault_state) => {
if !found_in_inserts.contains(&vault_state.entity_id) {
players_to_delete.insert(vault_state.entity_id);
}
}
Err(error) => {
Expand All @@ -361,11 +371,11 @@ pub(crate) async fn handle_transaction_update(
}

if !players_to_delete.is_empty() {
delete_player_state(p0, players_to_delete, true).await?;
delete_vault_state(conn, players_to_delete, true).await?;
}

if !known_vault_state_collectibles_ids.is_empty() {
delete_vault_state_collectibles(p0, known_vault_state_collectibles_ids).await?;
delete_vault_state_collectibles(conn, known_vault_state_collectibles_ids).await?;
}

Ok(())
Expand Down

0 comments on commit 74e6e4f

Please sign in to comment.