Skip to content

Commit

Permalink
Do not hold eager workflow slot provider on worker (temporalio#725)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Apr 23, 2024
1 parent 2926ba7 commit a04b31f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 21 deletions.
25 changes: 13 additions & 12 deletions client/src/worker_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use parking_lot::RwLock;
use slotmap::SlotMap;
use std::collections::{hash_map::Entry::Vacant, HashMap};
use std::sync::Arc;

use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;

Expand Down Expand Up @@ -54,7 +53,7 @@ impl SlotKey {
#[derive(Default, Debug)]
struct SlotManagerImpl {
/// Maps keys, i.e., namespace#task_queue, to provider.
providers: HashMap<SlotKey, Arc<dyn SlotProvider + Send + Sync>>,
providers: HashMap<SlotKey, Box<dyn SlotProvider + Send + Sync>>,
/// Maps ids to keys in `providers`.
index: SlotMap<WorkerKey, SlotKey>,
}
Expand Down Expand Up @@ -82,7 +81,7 @@ impl SlotManagerImpl {
None
}

fn register(&mut self, provider: Arc<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
fn register(&mut self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
let key = SlotKey::new(
provider.namespace().to_string(),
provider.task_queue().to_string(),
Expand All @@ -96,9 +95,11 @@ impl SlotManagerImpl {
}
}

fn unregister(&mut self, id: WorkerKey) {
fn unregister(&mut self, id: WorkerKey) -> Option<Box<dyn SlotProvider + Send + Sync>> {
if let Some(key) = self.index.remove(id) {
self.providers.remove(&key);
self.providers.remove(&key)
} else {
None
}
}

Expand Down Expand Up @@ -136,12 +137,12 @@ impl SlotManager {
}

/// Register a local worker that can provide WFT processing slots.
pub fn register(&self, provider: Arc<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
pub fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey> {
self.manager.write().register(provider)
}

/// Unregister a provider, typically when its worker starts shutdown.
pub fn unregister(&self, id: WorkerKey) {
pub fn unregister(&self, id: WorkerKey) -> Option<Box<dyn SlotProvider + Send + Sync>> {
self.manager.write().unregister(id)
}

Expand Down Expand Up @@ -197,8 +198,8 @@ mod tests {
let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);

let manager = SlotManager::new();
let some_slots = manager.register(Arc::new(mock_provider1));
let no_slots = manager.register(Arc::new(mock_provider2));
let some_slots = manager.register(Box::new(mock_provider1));
let no_slots = manager.register(Box::new(mock_provider2));
assert!(no_slots.is_none());

let mut found = 0;
Expand All @@ -220,8 +221,8 @@ mod tests {
new_mock_provider("foo".to_string(), "bar_q".to_string(), false, false);
let mock_provider2 = new_mock_provider("foo".to_string(), "bar_q".to_string(), false, true);

let no_slots = manager.register(Arc::new(mock_provider2));
let some_slots = manager.register(Arc::new(mock_provider1));
let no_slots = manager.register(Box::new(mock_provider2));
let some_slots = manager.register(Box::new(mock_provider1));
assert!(some_slots.is_none());

let mut not_found = 0;
Expand All @@ -246,7 +247,7 @@ mod tests {
for i in 0..10 {
let namespace = format!("myId{}", i % 3);
let mock_provider = new_mock_provider(namespace, "bar_q".to_string(), false, false);
worker_keys.push(manager.register(Arc::new(mock_provider)));
worker_keys.push(manager.register(Box::new(mock_provider)));
}
assert_eq!((3, 3), manager.num_providers());

Expand Down
15 changes: 6 additions & 9 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ use {
pub struct Worker {
config: WorkerConfig,
wf_client: Arc<dyn WorkerClient>,
slot_provider: Arc<SlotProvider>,
/// Registration key to enable eager workflow start for this worker
worker_key: Mutex<Option<WorkerKey>>,
/// Manages all workflows and WFT processing
Expand Down Expand Up @@ -222,15 +221,14 @@ impl Worker {
pub fn replace_client(&self, new_client: ConfiguredClient<TemporalServiceClientWithMetrics>) {
// Unregister worker from current client, register in new client at the end
let mut worker_key = self.worker_key.lock();
let slot_provider = (*worker_key).and_then(|k| self.wf_client.workers().unregister(k));
if let Some(key) = *worker_key {
self.wf_client.workers().unregister(key);
}
self.wf_client
.replace_client(super::init_worker_client(&self.config, new_client));
*worker_key = self
.wf_client
.workers()
.register(self.slot_provider.clone());
*worker_key = slot_provider
.and_then(|slot_provider| self.wf_client.workers().register(slot_provider));
}

#[cfg(test)]
Expand Down Expand Up @@ -385,15 +383,14 @@ impl Worker {
info!("Activity polling is disabled for this worker");
};
let la_sink = LAReqSink::new(local_act_mgr.clone());
let slot_provider = Arc::new(SlotProvider::new(
let provider = SlotProvider::new(
config.namespace.clone(),
config.task_queue.clone(),
wft_semaphore.clone(),
external_wft_tx,
));
let worker_key = Mutex::new(client.workers().register(slot_provider.clone()));
);
let worker_key = Mutex::new(client.workers().register(Box::new(provider)));
Self {
slot_provider,
worker_key,
wf_client: client.clone(),
workflows: Workflows::new(
Expand Down

0 comments on commit a04b31f

Please sign in to comment.