diff --git a/Akkling.sln b/Akkling.sln index fe7d959..49f00b1 100644 --- a/Akkling.sln +++ b/Akkling.sln @@ -42,6 +42,7 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{8CC185AC-1D97-41F9-BBF1-09D892CA18B9}" ProjectSection(SolutionItems) = preProject examples\basic.fsx = examples\basic.fsx + examples\io.fsx = examples\io.fsx examples\lifecycle.fsx = examples\lifecycle.fsx examples\persistence.fsx = examples\persistence.fsx examples\remote.fsx = examples\remote.fsx diff --git a/examples/io.fsx b/examples/io.fsx new file mode 100644 index 0000000..502d7d1 --- /dev/null +++ b/examples/io.fsx @@ -0,0 +1,42 @@ +#r "../src/Akkling/bin/Debug/Akka.dll" +#r "../src/Akkling/bin/Debug/Wire.dll" +#r "../src/Akkling/bin/Debug/Newtonsoft.Json.dll" +#r "../src/Akkling/bin/Debug/FSharp.PowerPack.dll" +#r "../src/Akkling/bin/Debug/FSharp.PowerPack.Linq.dll" +#r "../src/Akkling/bin/Debug/Akkling.dll" + +open System +open Akkling +open Akkling.IO +open Akkling.IO.Tcp +open System.Net + +let system = System.create "telnet-sys" <| Configuration.defaultConfig() + +let handler connection = fun (ctx: Actor) -> + monitor ctx connection |> ignore + let rec loop () = actor { + let! msg = ctx.Receive () + match msg with + | Received(data) -> + printfn "%s" (data.DecodeString()) + return! loop () + | Terminated(_, _,_) | ConnectionClosed(_) -> return Stop + | _ -> return Unhandled + } + loop () + +let endpoint = IPEndPoint(IPAddress.Loopback, 5000) +let listener = spawn system "listener" <| fun m -> + IO.Tcp(m) + let conn = m.Sender () + conn return Unhandled + } + loop () + \ No newline at end of file diff --git a/examples/persistence.fsx b/examples/persistence.fsx index 3431ac5..a408823 100644 --- a/examples/persistence.fsx +++ b/examples/persistence.fsx @@ -38,8 +38,8 @@ let counter = | GetState -> mailbox.Sender() return Persist [ Event { Delta = 1 } ] - | Dec -> return Persist [ Event { Delta = -1 } ] + | Inc -> return Persist (Event { Delta = 1 }) + | Dec -> return Persist (Event { Delta = -1 }) } loop 0 diff --git a/src/Akkling.Persistence/PersistentActor.fs b/src/Akkling.Persistence/PersistentActor.fs index 6e5c686..1adbcf3 100644 --- a/src/Akkling.Persistence/PersistentActor.fs +++ b/src/Akkling.Persistence/PersistentActor.fs @@ -83,16 +83,20 @@ and []ExtEventsourced<'Message> = inherit ExtActor<'Message> and PersistentEffect<'Message> = - | Persist of 'Message seq - | PersistAsync of 'Message seq + | Persist of 'Message + | PersistAll of 'Message seq + | PersistAsync of 'Message + | PersistAllAsync of 'Message seq | Defer of 'Message seq interface Effect with member this.OnApplied(context, message) = match context with | :? ExtEventsourced<'Message> as persistentContext -> match this with - | Persist(events) -> persistentContext.PersistEvent events - | PersistAsync(events) -> persistentContext.AsyncPersistEvent events + | Persist(event) -> persistentContext.PersistEvent [event] + | PersistAll(events) -> persistentContext.PersistEvent events + | PersistAsync(event) -> persistentContext.AsyncPersistEvent [event] + | PersistAllAsync(events) -> persistentContext.AsyncPersistEvent events | Defer(events) -> persistentContext.DeferEvent events | _ -> raise (Exception("Cannot use persistent effects in context of non-persistent actor")) @@ -116,6 +120,7 @@ and TypedPersistentContext<'Message, 'Actor when 'Actor :> FunPersistentActor<'M member __.Receive() = Input member __.Self = typed self member __.Sender<'Response>() = typed (context.Sender) :> IActorRef<'Response> + member __.Parent<'Other>() = typed (context.Parent) :> IActorRef<'Other> member __.System = context.System member __.ActorOf(props, name) = context.ActorOf(props, name) member __.ActorSelection(path : string) = context.ActorSelection(path) diff --git a/src/Akkling.Persistence/PersistentView.fs b/src/Akkling.Persistence/PersistentView.fs index 0def321..cd84e83 100644 --- a/src/Akkling.Persistence/PersistentView.fs +++ b/src/Akkling.Persistence/PersistentView.fs @@ -57,6 +57,7 @@ and TypedViewContext<'Message, 'Actor when 'Actor :> FunPersistentView<'Message> member __.Receive() = Input member __.Self = typed self member __.Sender<'Response>() = typed (context.Sender) :> IActorRef<'Response> + member __.Parent<'Other>() = typed (context.Parent) :> IActorRef<'Other> member __.System = context.System member __.ActorOf(props, name) = context.ActorOf(props, name) member __.ActorSelection(path : string) = context.ActorSelection(path) diff --git a/src/Akkling/Actors.fs b/src/Akkling/Actors.fs index a001249..bc2f2d4 100644 --- a/src/Akkling/Actors.fs +++ b/src/Akkling/Actors.fs @@ -44,6 +44,11 @@ type Actor<'Message> = /// abstract Sender<'Response> : unit -> IActorRef<'Response> + /// + /// Returns a parrent of current actor. + /// + abstract Parent<'Other> : unit -> IActorRef<'Other> + /// /// Lazy logging adapter. It won't be initialized until logging function will be called. /// @@ -113,6 +118,7 @@ type TypedContext<'Message, 'Actor when 'Actor :> ActorBase and 'Actor :> IWithU member __.Receive() = Input member __.Self = typed self member __.Sender<'Response>() = typed (context.Sender) :> IActorRef<'Response> + member __.Parent<'Other>() = typed (context.Parent) :> IActorRef<'Other> member __.System = context.System member __.ActorOf(props, name) = context.ActorOf(props, name) member __.ActorSelection(path : string) = context.ActorSelection(path) diff --git a/src/Akkling/Akkling.fsproj b/src/Akkling/Akkling.fsproj index 6fd1262..c959517 100644 --- a/src/Akkling/Akkling.fsproj +++ b/src/Akkling/Akkling.fsproj @@ -74,6 +74,7 @@ + diff --git a/src/Akkling/IO.fs b/src/Akkling/IO.fs new file mode 100644 index 0000000..ea95fb8 --- /dev/null +++ b/src/Akkling/IO.fs @@ -0,0 +1,79 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// Copyright (C) 2015 Bartosz Sypytkowski +// +//----------------------------------------------------------------------- + +namespace Akkling + +module IO = + + open Akka.IO + open System.Net + + /// + /// Gets TCP manager for current actor. + /// + let Tcp(context: Actor<'Message>) : IActorRef = + typed (Akka.IO.Tcp.Manager(context.System)) + + /// + /// Gets UDP manager for current actor. + /// + let Udp(context: Actor<'Message>) : IActorRef = + typed (Akka.IO.Udp.Manager(context.System)) + + type TcpMessage = Akka.IO.TcpMessage + + module Tcp = + + let (|Received|_|) (msg:obj) : ByteString option = + match msg with + | :? Tcp.Received as r -> Some (r.Data) + | _ -> None + + let (|Connected|_|) (msg:obj) : (EndPoint * EndPoint) option = + match msg with + | :? Tcp.Connected as c -> Some (c.RemoteAddress, c.LocalAddress) + | _ -> None + + let (|CommandFailed|_|) (msg:obj) : #Tcp.Command option = + match msg with + | :? Tcp.CommandFailed as c -> + if c.Cmd :? #Tcp.Command + then Some (c.Cmd :?> #Tcp.Command) + else None + | _ -> None + + let (|ConnectionClosed|_|) (msg:obj) = + match msg with + | :? Tcp.ConnectionClosed as closed -> Some closed + | _ -> None + + let (|Closed|Aborted|ConfirmedClosed|PeerClosed|ErrorClosed|) (msg:Tcp.ConnectionClosed) = + match msg with + | :? Tcp.Closed -> Closed + | :? Tcp.Aborted -> Aborted + | :? Tcp.ConfirmedClosed -> ConfirmedClosed + | :? Tcp.PeerClosed -> PeerClosed + | :? Tcp.ErrorClosed -> ErrorClosed + + module Udp = + + let inline Bind(ref: IActorRef<'t>, localAddress: EndPoint) = + Akka.IO.Udp.Bind(ref, localAddress) :> Udp.Command + + let (|Received|_|) (msg:obj) : ByteString option = + match msg with + | :? Udp.Received as r -> Some (r.Data) + | _ -> None + + let (|CommandFailed|_|) (msg:obj) : 'C option = + match msg with + | :? Udp.CommandFailed as c -> + if c.Cmd :? 'C + then Some (c.Cmd :?> 'C) + else None + | _ -> None diff --git a/src/Akkling/MessagePatterns.fs b/src/Akkling/MessagePatterns.fs index ae980aa..5e132b3 100644 --- a/src/Akkling/MessagePatterns.fs +++ b/src/Akkling/MessagePatterns.fs @@ -19,7 +19,11 @@ let (|Terminated|_|) (msg: obj) : (IActorRef<'T> * bool * bool) option = | :? Terminated as t -> Some((typed t.ActorRef, t.ExistenceConfirmed, t.AddressTerminated)) | _ -> None - +/// +/// Active pattern that matches message agains message. +/// This is the result of request send with matching correlation id. +/// Response contains actor ref of the requested identity or None if no actor was found. +/// let (|ActorIdentity|_|) (msg: obj) : ('CorrelationId * IActorRef<'T> option) option = match msg with | :? ActorIdentity as identity ->