Skip to content

Commit

Permalink
code cleanup and remove obsolete members
Browse files Browse the repository at this point in the history
  • Loading branch information
rogeralsing committed Mar 8, 2020
1 parent c4f10db commit 288f163
Show file tree
Hide file tree
Showing 31 changed files with 396 additions and 335 deletions.
126 changes: 66 additions & 60 deletions src/Proto.Actor/ActorContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,24 @@ internal enum ContextState : byte
Alive,
Restarting,
Stopping,
Stopped,
Stopped
}

//Angels cry over this code, but it serves a purpose, lazily init of less frequently used features
public class ActorContextExtras
{
public ActorContextExtras(IContext context)
{
Context = context;
}

public ImmutableHashSet<PID> Children { get; private set; } = ImmutableHashSet<PID>.Empty;
public Timer? ReceiveTimeoutTimer { get; private set; }
public RestartStatistics RestartStatistics { get; } = new RestartStatistics(0, null);
public Stack<object> Stash { get; } = new Stack<object>();
public ImmutableHashSet<PID> Watchers { get; private set; } = ImmutableHashSet<PID>.Empty;
public IContext Context { get; }

public ActorContextExtras(IContext context)
{
Context = context;
}

public void InitReceiveTimeoutTimer(Timer timer) => ReceiveTimeoutTimer = timer;

public void ResetReceiveTimeoutTimer(TimeSpan timeout) => ReceiveTimeoutTimer?.Change(timeout, timeout);
Expand Down Expand Up @@ -76,17 +76,6 @@ public class ActorContext : IMessageInvoker, IContext, ISupervisor
private object? _messageOrEnvelope;
private ContextState _state;

private ActorContextExtras EnsureExtras()
{
if (_extras == null)
{
var context = _props.ContextDecoratorChain?.Invoke(this) ?? this;
_extras = new ActorContextExtras(context);
}

return _extras;
}

public ActorContext(Props props, PID parent, PID self)
{
_props = props;
Expand All @@ -99,8 +88,6 @@ public ActorContext(Props props, PID parent, PID self)
}

private static ILogger Logger { get; } = Log.CreateLogger<ActorContext>();

public IImmutableSet<PID> Children => _extras?.Children ?? EmptyChildren;
IReadOnlyCollection<PID> IContext.Children => Children;

public IActor? Actor { get; private set; }
Expand Down Expand Up @@ -219,12 +206,14 @@ public void Request(PID target, object message, PID sender)
SendUserMessage(target, messageEnvelope);
}

public Task<T> RequestAsync<T>(PID target, object message, TimeSpan timeout) => RequestAsync(target, message, new FutureProcess<T>(timeout));
public Task<T> RequestAsync<T>(PID target, object message, TimeSpan timeout) =>
RequestAsync(target, message, new FutureProcess<T>(timeout));

This comment has been minimized.

Copy link
@alexeyzimarev

alexeyzimarev Mar 8, 2020

Member

@rogeralsing if we need to have shorter line length, I suggest putting it to the DotSettings file, otherwise we'll always be reformatting each others code 😄


public Task<T> RequestAsync<T>(PID target, object message, CancellationToken cancellationToken)
=> RequestAsync(target, message, new FutureProcess<T>(cancellationToken));

public Task<T> RequestAsync<T>(PID target, object message) => RequestAsync(target, message, new FutureProcess<T>());
public Task<T> RequestAsync<T>(PID target, object message) =>
RequestAsync(target, message, new FutureProcess<T>());

public void ReenterAfter<T>(Task<T> target, Func<Task<T>, Task> action)
{
Expand All @@ -249,6 +238,40 @@ public void ReenterAfter(Task target, Action action)
target.ContinueWith(t => { Self.SendSystemMessage(cont); });
}

public Task Receive(MessageEnvelope envelope)
{
_messageOrEnvelope = envelope;
return DefaultReceive();
}

public void Stop(PID pid)
{
var reff = ProcessRegistry.Instance.Get(pid);
reff.Stop(pid);
}

public Task StopAsync(PID pid)
{
var future = new FutureProcess<object>();

pid.SendSystemMessage(new Watch(future.Pid));
Stop(pid);

return future.Task;
}

public void Poison(PID pid) => pid.SendUserMessage(new PoisonPill());

public Task PoisonAsync(PID pid)
{
var future = new FutureProcess<object>();

pid.SendSystemMessage(new Watch(future.Pid));
Poison(pid);

return future.Task;
}

public void EscalateFailure(Exception reason, object message)
{
var failure = new Failure(Self, reason, EnsureExtras().RestartStatistics, message);
Expand All @@ -264,12 +287,6 @@ public void EscalateFailure(Exception reason, object message)
}
}

public void RestartChildren(Exception reason, params PID[] pids) => pids.SendSystemMessage(new Restart(reason));

public void StopChildren(params PID[] pids) => pids.SendSystemMessage(Proto.Stop.Instance);

public void ResumeChildren(params PID[] pids) => pids.SendSystemMessage(ResumeMailbox.Instance);

public Task InvokeSystemMessageAsync(object msg)
{
try
Expand Down Expand Up @@ -350,10 +367,23 @@ public Task InvokeUserMessageAsync(object msg)
return res;
}

public Task Receive(MessageEnvelope envelope)
public IImmutableSet<PID> Children => _extras?.Children ?? EmptyChildren;

public void RestartChildren(Exception reason, params PID[] pids) => pids.SendSystemMessage(new Restart(reason));

public void StopChildren(params PID[] pids) => pids.SendSystemMessage(Proto.Stop.Instance);

public void ResumeChildren(params PID[] pids) => pids.SendSystemMessage(ResumeMailbox.Instance);

private ActorContextExtras EnsureExtras()
{
_messageOrEnvelope = envelope;
return DefaultReceive();
if (_extras == null)
{
var context = _props.ContextDecoratorChain?.Invoke(this) ?? this;
_extras = new ActorContextExtras(context);
}

return _extras;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
Expand Down Expand Up @@ -444,7 +474,9 @@ private void HandleFailure(Failure msg)
supervisor.HandleFailure(this, msg.Who, msg.RestartStatistics, msg.Reason, msg.Message);
break;
default:
_props.SupervisorStrategy.HandleFailure(this, msg.Who, msg.RestartStatistics, msg.Reason, msg.Message);
_props.SupervisorStrategy.HandleFailure(this, msg.Who, msg.RestartStatistics, msg.Reason,
msg.Message
);
break;
}
}
Expand All @@ -461,7 +493,9 @@ private async Task HandleTerminatedAsync(Terminated msg)
}

private void HandleRootFailure(Failure failure)
=> Supervision.DefaultStrategy.HandleFailure(this, failure.Who, failure.RestartStatistics, failure.Reason, failure.Message);
=> Supervision.DefaultStrategy.HandleFailure(this, failure.Who, failure.RestartStatistics, failure.Reason,
failure.Message
);

//Initiate stopping, not final
private async Task InitiateStopAsync()
Expand Down Expand Up @@ -560,33 +594,5 @@ private void ReceiveTimeoutCallback(object state)
CancelReceiveTimeout();
Send(Self, Proto.ReceiveTimeout.Instance);
}

public void Stop(PID pid)
{
var reff = ProcessRegistry.Instance.Get(pid);
reff.Stop(pid);
}

public Task StopAsync(PID pid)
{
var future = new FutureProcess<object>();

pid.SendSystemMessage(new Watch(future.Pid));
Stop(pid);

return future.Task;
}

public void Poison(PID pid) => pid.SendUserMessage(new PoisonPill());

public Task PoisonAsync(PID pid)
{
var future = new FutureProcess<object>();

pid.SendSystemMessage(new Watch(future.Pid));
Poison(pid);

return future.Task;
}
}
}
2 changes: 1 addition & 1 deletion src/Proto.Actor/ActorContextDecorator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public virtual Task<T> RequestAsync<T>(PID target, object message, CancellationT

public virtual MessageHeader Headers => _context.Headers;
public virtual object Message => _context.Message;

public virtual Task Receive(MessageEnvelope envelope) => _context.Receive(envelope);

public virtual PID? Parent => _context.Parent;
Expand Down
9 changes: 7 additions & 2 deletions src/Proto.Actor/Behavior.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ public class Behavior
{
private readonly Stack<Receive> _behaviors = new Stack<Receive>();

public Behavior() { }
public Behavior()
{
}

public Behavior(Receive receive) => Become(receive);
public Behavior(Receive receive)
{
Become(receive);
}

public void Become(Receive receive)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Actor/Delegates.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
namespace Proto
{
public delegate Task Receive(IContext context);

//TODO: IReceiveContext ?
public delegate Task Receiver(IReceiverContext context, MessageEnvelope envelope);

public delegate Task Sender(ISenderContext context, PID target, MessageEnvelope envelope);
}
9 changes: 7 additions & 2 deletions src/Proto.Actor/EmptyActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

namespace Proto
{
class EmptyActor : IActor
internal class EmptyActor : IActor
{
private readonly Receive _receive;
public EmptyActor(Receive receive) => _receive = receive;

public EmptyActor(Receive receive)
{
_receive = receive;
}

public Task ReceiveAsync(IContext context) => _receive(context);
}
}
57 changes: 33 additions & 24 deletions src/Proto.Actor/EventStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ public class EventStream : EventStream<object>
internal EventStream()
{
Subscribe(msg =>
{
if (msg is DeadLetterEvent letter)
{
_logger.LogInformation("[DeadLetter] '{0}' got '{1}:{2}' from '{3}'", letter.Pid.ToShortString(),
letter.Message.GetType().Name, letter.Message, letter.Sender?.ToShortString());
if (msg is DeadLetterEvent letter)
{
_logger.LogInformation("[DeadLetter] '{0}' got '{1}:{2}' from '{3}'",
letter.Pid.ToShortString(),
letter.Message.GetType().Name, letter.Message, letter.Sender?.ToShortString()
);
}
}
});
);
}
}

public class EventStream<T>
{
private readonly ILogger _logger = Log.CreateLogger<EventStream<T>>();
Expand All @@ -44,10 +48,11 @@ internal EventStream()
public Subscription<T> Subscribe(Action<T> action, IDispatcher? dispatcher = null)
{
var sub = new Subscription<T>(this, dispatcher ?? Dispatchers.SynchronousDispatcher, x =>
{
action(x);
return Actor.Done;
});
{
action(x);
return Actor.Done;
}
);
_subscriptions.TryAdd(sub.Id, sub);
return sub;
}
Expand All @@ -62,13 +67,15 @@ public Subscription<T> Subscribe(Func<T, Task> action, IDispatcher? dispatcher =
public Subscription<T> Subscribe<TMsg>(Action<TMsg> action, IDispatcher? dispatcher = null) where TMsg : T
{
var sub = new Subscription<T>(this, dispatcher ?? Dispatchers.SynchronousDispatcher, msg =>
{
if (msg is TMsg typed)
{
action(typed);
if (msg is TMsg typed)
{
action(typed);
}

return Actor.Done;
}
return Actor.Done;
});
);

_subscriptions.TryAdd(sub.Id, sub);
return sub;
Expand All @@ -79,17 +86,19 @@ public void Publish(T msg)
foreach (var sub in _subscriptions.Values)
{
sub.Dispatcher.Schedule(() =>
{
try
{
sub.Action(msg);
}
catch (Exception ex)
{
_logger.LogWarning(0, ex, "Exception has occurred when publishing a message.");
try
{
sub.Action(msg);
}
catch (Exception ex)
{
_logger.LogWarning(0, ex, "Exception has occurred when publishing a message.");
}

return Actor.Done;
}
return Actor.Done;
});
);
}
}

Expand All @@ -114,4 +123,4 @@ public Subscription(EventStream<T> eventStream, IDispatcher dispatcher, Func<T,

public void Unsubscribe() => _eventStream.Unsubscribe(Id);
}
}
}
6 changes: 3 additions & 3 deletions src/Proto.Actor/Exceptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ namespace Proto
{
public class ProcessNameExistException : Exception
{
public string Name { get; }
public PID Pid { get; }

public ProcessNameExistException(string name, PID pid) : base($"a Process with the name '{name}' already exists")
{
Name = name;
Pid = pid;
}

public string Name { get; }
public PID Pid { get; }
}
}
Loading

0 comments on commit 288f163

Please sign in to comment.