Skip to content

Commit

Permalink
Merge pull request #117 from CSML-by-Clevy/fix/infinite-loop-close-al…
Browse files Browse the repository at this point in the history
…l-conversations

fix inifinite loop in get all open conversations for a given client (dynamodb)
  • Loading branch information
frsechet authored Sep 23, 2020
2 parents 049249d + 9845e77 commit 693da83
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 66 deletions.
4 changes: 1 addition & 3 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ENGINE_DB_TYPE=dynamodb
ENGINE_DB_TYPE=mongodb

MONGODB_HOST=localhost
MONGODB_PORT=27017
Expand All @@ -12,8 +12,6 @@ AWS_SECRET_ACCESS_KEY=bar
AWS_DYNAMODB_ENDPOINT=http://localhost:8000
AWS_DYNAMODB_TABLE=csml-engine-db-local

HTTP_DB_MS_URL=http://localhost:3130

ENGINE_SERVER_PORT=5000

ENCRYPTION_SECRET=some-secret-string
Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bindings/node/native/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "csml_engine_node"
version = "1.2.0"
version = "1.2.1"
authors = ["Alexis Merelo <[email protected]>"]
license = "MIT"
build = "build.rs"
Expand Down
2 changes: 1 addition & 1 deletion csml_engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "csml_engine"
version = "1.2.0"
version = "1.2.1"
authors = ["Alexis Merelo <[email protected]>"]
license = "Apache-2.0"
edition = "2018"
Expand Down
99 changes: 74 additions & 25 deletions csml_engine/src/db_connectors/dynamodb/conversations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,6 @@ pub fn close_conversation(
Ok(())
}

/**
* There should not be many open conversations for any given client.
* In a normal scenario, there should be either 1, or none. If for some reason,
* there is more than one, there should definitely not be many, and it would lead to all
* sorts of other issues anyway.
* For this reason it should be safe to just get them all one by one like this.
*/
fn get_all_open_conversations(client: &Client, db: &mut DynamoDbClient) -> Vec<DbConversation> {
let mut res = vec![];

while let Some(conv) = match get_latest_open(client, db) {
Ok(val) => val,
_ => None,
} {
res.push(conv);
}

res
}

/**
* To close a conversation, we must replace the given conversation,
* ideally in a transaction to make sure that we don't lose a conversation in the process.
Expand Down Expand Up @@ -144,22 +124,91 @@ fn replace_conversation(
Ok(())
}


fn get_all_open_conversations(
client: &Client,
db: &mut DynamoDbClient,
) -> Result<Vec<Conversation>, EngineError> {
let hash = Conversation::get_hash(client);

let key_cond_expr = "#hashKey = :hashVal AND begins_with(#rangeKey, :rangePrefix)".to_string();
let expr_attr_names = [
(String::from("#hashKey"), String::from("hash")),
(String::from("#rangeKey"), String::from("range_time")), // time index
]
.iter()
.cloned()
.collect();

let expr_attr_values = [
(
String::from(":hashVal"),
AttributeValue {
s: Some(hash.to_string()),
..Default::default()
},
),
(
String::from(":rangePrefix"),
AttributeValue {
s: Some(String::from("conversation#OPEN")),
..Default::default()
},
),
]
.iter()
.cloned()
.collect();

// There should not be many open conversations for any given client.
// In a normal scenario, there should be either 1, or none. If for some reason,
// there is more than one, there should definitely not be many, and it would lead to all
// sorts of other issues anyway.
// For this reason it *should* be safe to limit to 50 max, and assume there are not 51+.
let limit = Some(50);

let input = QueryInput {
table_name: get_table_name()?,
index_name: Some(String::from("TimeIndex")),
key_condition_expression: Some(key_cond_expr),
expression_attribute_names: Some(expr_attr_names),
expression_attribute_values: Some(expr_attr_values),
limit,
select: Some(String::from("ALL_ATTRIBUTES")),
..Default::default()
};

let query = db.client.query(input);
let data = db.runtime.block_on(query)?;

let keys = match data.items {
Some(items) => {
items.iter()
.map(|hm| serde_dynamodb::from_hashmap(hm.clone()).unwrap())
.collect()
},
None => vec![],
};

Ok(keys)
}

pub fn close_all_conversations(
client: &Client,
db: &mut DynamoDbClient,
) -> Result<(), EngineError> {
let status = "CLOSED";
let now = get_date_time();
let conversations = get_all_open_conversations(client, db);
for conv in conversations.iter() {
let mut new_conv = Conversation::from(conv);

let mut conversations = get_all_open_conversations(client, db)?;
for new_conv in conversations.iter_mut() {
new_conv.status = status.to_owned();
new_conv.last_interaction_at = now.to_owned();
new_conv.updated_at = now.to_owned();
new_conv.range_time = make_range(&["interaction", status, &now, &conv.id]);
new_conv.range_time = make_range(&["interaction", status, &now, &new_conv.id]);
new_conv.range = Conversation::get_range(status, &new_conv.id);

let old_key = Conversation::get_key(&conv.client, "OPEN", &conv.id);
let old_key = Conversation::get_key(&client, "OPEN", &new_conv.id);
let new_item = serde_dynamodb::to_hashmap(&new_conv)?;

replace_conversation(&old_key, new_item, db)?;
Expand Down
31 changes: 1 addition & 30 deletions csml_engine/src/db_connectors/dynamodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::data::DynamoDbClient;
use crate::db_connectors::DbConversation;
use crate::{encrypt::encrypt_data, Client, Database, EngineError};
use crate::{Client, Database, EngineError};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -130,34 +129,6 @@ impl Conversation {
created_at: now.to_owned(),
}
}

pub fn from(conversation: &DbConversation) -> Self {
let class_name = "conversation";
let metadata = encrypt_data(&conversation.metadata).unwrap();
Self {
hash: Self::get_hash(&conversation.client),
range: Self::get_range(&conversation.status, &conversation.id),
range_time: make_range(&[
class_name,
&conversation.status,
&conversation.last_interaction_at,
&conversation.id,
]),
id: conversation.id.to_owned(),
client: Some(conversation.client.to_owned()),
bot_id: Some(conversation.client.bot_id.to_owned()),
channel_id: Some(conversation.client.channel_id.to_owned()),
user_id: Some(conversation.client.user_id.to_owned()),
class: class_name.to_string(),
metadata,
flow_id: conversation.flow_id.to_owned(),
step_id: conversation.step_id.to_owned(),
status: conversation.status.to_owned(),
last_interaction_at: conversation.last_interaction_at.to_owned(),
updated_at: conversation.updated_at.to_owned(),
created_at: conversation.created_at.to_owned(),
}
}
}

#[derive(Serialize, Deserialize, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion csml_interpreter/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "csml_interpreter"
version = "1.2.0"
version = "1.2.1"
authors = [
"Alexis Merelo <[email protected]>",
"Jefferson Le Quellec <[email protected]>",
Expand Down
2 changes: 1 addition & 1 deletion csml_server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "csml_server"
version = "1.2.0"
version = "1.2.1"
authors = ["François Falala-Sechet <[email protected]>"]
edition = "2018"

Expand Down

0 comments on commit 693da83

Please sign in to comment.