-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSlowConsumerGrain.cs
39 lines (31 loc) · 1.01 KB
/
SlowConsumerGrain.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
namespace OrleansPersistentStream_DroppedEvents
{
using Orleans.Runtime;
using Orleans.Streams;
/// <summary>
/// Stream consumer grain that, during activation, subscribes itself to the <c>int</c> stream
/// identified by namespace "ns" and this grain's primary key on the "TestStream" provider,
/// then waits a random interval before finishing activation. Received items are ignored.
/// </summary>
/// <remarks>
/// NOTE(review): the random activation delay looks intentional (the grain is named as a
/// "slow consumer") — confirm with the author before removing it.
/// </remarks>
[ImplicitStreamSubscription("ns")]
internal class SlowConsumerGrain : Grain, IConsumerGrain
{
    /// <summary>Inclusive lower bound, in milliseconds, of the artificial activation delay.</summary>
    private const int MinActivationDelayMs = 50;

    /// <summary>Exclusive upper bound, in milliseconds, of the artificial activation delay.</summary>
    private const int MaxActivationDelayMs = 300;

    /// <summary>
    /// Subscribes this grain to its stream, waits a random 50-299 ms, then runs the base activation.
    /// </summary>
    /// <param name="cancellationToken">
    /// Token supplied by the runtime for this activation; it is honored by the delay and
    /// forwarded to the base implementation.
    /// </param>
    /// <returns>A task that completes when activation has finished.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown if <paramref name="cancellationToken"/> is cancelled while the delay is pending.
    /// </exception>
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var stream = this.GetStreamProvider("TestStream")
            .GetStream<int>(StreamId.Create("ns", this.GetPrimaryKey()));
        await stream.SubscribeAsync(this);

        // Random.Shared avoids allocating a new generator on every activation.
        var delayMs = Random.Shared.Next(MinActivationDelayMs, MaxActivationDelayMs);

        // Passing the token lets a cancelled activation stop waiting immediately.
        await Task.Delay(delayMs, cancellationToken);

        await base.OnActivateAsync(cancellationToken);
    }

    /// <summary>Receives a stream item and does nothing with it.</summary>
    /// <param name="item">The delivered value (unused).</param>
    /// <param name="token">Optional sequence token of the delivered item (unused).</param>
    /// <returns>An already-completed task.</returns>
    public Task OnNextAsync(int item, StreamSequenceToken? token = null)
    {
        return Task.CompletedTask;
    }

    /// <summary>Stream-completion callback; no action is taken.</summary>
    /// <returns>An already-completed task.</returns>
    public Task OnCompletedAsync()
    {
        return Task.CompletedTask;
    }

    /// <summary>Writes the reported stream error to the console.</summary>
    /// <param name="ex">The exception passed to this callback.</param>
    /// <returns>An already-completed task.</returns>
    public Task OnErrorAsync(Exception ex)
    {
        // Include the exception so the failure can be diagnosed instead of being dropped.
        Console.WriteLine($"Got error: {ex}");
        return Task.CompletedTask;
    }
}
}