Skip to content

Commit

Permalink
Added reading/writing of message batches to the actor
Browse files Browse the repository at this point in the history
  • Loading branch information
dominicletz committed Oct 16, 2024
1 parent 7b132e4 commit 9aabef9
Show file tree
Hide file tree
Showing 4 changed files with 410 additions and 152 deletions.
36 changes: 36 additions & 0 deletions src/DiodeMessages.mo
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { abs } = "mo:base/Int";
import Result "mo:base/Result";
import Iter "mo:base/Iter";
import Nat64 "mo:base/Nat64";
import Nat32 "mo:base/Nat32";
import { now } = "mo:base/Time";
import List "mo:base/List";
import Region "mo:base/Region";
import Blob "mo:base/Blob";
import Map "mo:map/Map";
Expand Down Expand Up @@ -147,6 +149,40 @@ module DiodeMessages {
return get_message_by_offset(store, offset);
};

public func get_messages_by_range(store: MessageStore, min_message_id: Nat32, in_max_message_id: Nat32) : [Message] {
let max_message_id = Nat32.min(in_max_message_id, get_max_message_id(store));
if (min_message_id > max_message_id) {
return [];
};

Iter.range(Nat32.toNat(min_message_id), Nat32.toNat(max_message_id))
|> Iter.map(_, func (i : Nat) : Message {
let offset = get_message_offset_by_id(store, Nat32.fromNat(i));
get_message_by_offset(store, offset);
})
|> Iter.toArray(_);
};

public func get_messages_by_range_for_key(store: MessageStore, key_id: Blob, min_message_id: Nat32, in_max_message_id: Nat32) : [Message] {
assert(min_message_id <= in_max_message_id);
let max_message_id = Nat32.max(in_max_message_id, get_max_message_id(store));

var messages : List.List<Message> = List.nil<Message>();
var current_message_id = min_message_id;
while (current_message_id != 0 and current_message_id <= max_message_id) {
let message = get_message_by_id(store, current_message_id);
if (message.key_id != key_id) {
return List.toArray(messages);
};

messages := List.push(message, messages);
current_message_id := message.next_msg_id;
};

return List.toArray(messages);
};


private func get_message_by_offset(store: MessageStore, offset: Nat64) : Message {
let id = Region.loadNat32(store.inbox.region, offset);
let timestamp = Region.loadNat32(store.inbox.region, offset + 4);
Expand Down
21 changes: 20 additions & 1 deletion src/Main.mo
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ actor {
case (#err(e)) {
return #err(e);
};
case _ {};
case (#ok) {};
};
};
#ok;
Expand All @@ -32,6 +32,10 @@ actor {
DiodeMessages.get_message_by_id(dm, message_id);
};

public shared query func get_messages_by_range(min_message_id: Nat32, max_message_id: Nat32) : async [DiodeMessages.Message] {
DiodeMessages.get_messages_by_range(dm, min_message_id, max_message_id);
};

public shared query func get_min_message_id() : async Nat32 {
DiodeMessages.get_min_message_id(dm);
};
Expand All @@ -47,4 +51,19 @@ actor {
public shared query func get_max_message_id_by_key(key_id: Blob) : async ?Nat32 {
DiodeMessages.get_max_message_id_by_key(dm, key_id);
};

public shared query func get_messages_by_range_for_key(key_id: Blob, min_message_id: Nat32, max_message_id: Nat32) : async [DiodeMessages.Message] {
DiodeMessages.get_messages_by_range_for_key(dm, key_id, min_message_id, max_message_id);
};


public shared func test_record_output() : async ((Nat32, Nat32)) {
(0, 1);
};

public shared func test_record_input(record: (Nat32, Nat32)) : async Nat32 {
let (a, b) = record;
a + b;
};

};
Loading

0 comments on commit 9aabef9

Please sign in to comment.