Akka I/O
The I/O extension provides a non-blocking, event-driven API that matches the underlying transport mechanism.
Getting Started
Every I/O driver has a special actor, called the manager, that serves as an entry point for the API. The manager for a particular driver is accessible through an extension method on ActorSystem. The following example shows how to get a reference to the TCP manager.
using Akka.Actor;
using Akka.IO;
...
var system = ActorSystem.Create("example");
var manager = system.Tcp();
TCP Driver
Client Connection
To create a connection, an actor sends a Tcp.Connect message to the TCP manager. Once the connection is established, the connection actor sends a Tcp.Connected message to the commander (the actor that sent Tcp.Connect), which registers the connection handler by replying with a Tcp.Register message.
Once this handshake is completed, the handler and the connection communicate with Tcp.WriteCommand and Tcp.Received messages.
The following diagram illustrates the actors involved in establishing and handling a connection.
The following example shows a simple Telnet client. The client sends lines entered in the console to the TCP connection, and writes data received from the network to the console.
using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.IO;

public class TelnetClient : UntypedActor
{
    public TelnetClient(string host, int port)
    {
        // Ask the TCP manager to open a connection; the outcome arrives in OnReceive.
        var endpoint = new DnsEndPoint(host, port);
        Context.System.Tcp().Tell(new Tcp.Connect(endpoint));
    }

    // Initial behavior: wait for the result of the connection attempt.
    protected override void OnReceive(object message)
    {
        if (message is Tcp.Connected connected)
        {
            Console.WriteLine("Connected to {0}", connected.RemoteAddress);
            // Register self as connection handler (Sender is the connection actor)
            Sender.Tell(new Tcp.Register(Self));
            ReadConsoleAsync();
            Become(Connected(Sender));
        }
        else if (message is Tcp.CommandFailed)
        {
            Console.WriteLine("Connection failed");
        }
        else Unhandled(message);
    }

    // Behavior once the handshake is complete.
    private UntypedReceive Connected(IActorRef connection)
    {
        return message =>
        {
            if (message is Tcp.Received received) // data received from network
            {
                Console.WriteLine(Encoding.ASCII.GetString(received.Data.ToArray()));
            }
            else if (message is string s) // data received from console
            {
                connection.Tell(Tcp.Write.Create(ByteString.FromString(s + "\n")));
                ReadConsoleAsync();
            }
            else if (message is Tcp.PeerClosed)
            {
                Console.WriteLine("Connection closed");
            }
            else Unhandled(message);
        };
    }

    // Reads one console line in the background and pipes it back to this actor as a string.
    // Self is passed in as state so the task does not touch the actor context from another thread.
    private void ReadConsoleAsync()
    {
        Task.Factory.StartNew(self => Console.In.ReadLineAsync().PipeTo((ICanTell)self), Self);
    }
}
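To try the client, the actor has to be created inside an actor system. The following is a minimal sketch of a console host, assuming the TelnetClient class above is compiled into the same project. The system name, the actor name, the host "localhost", and port 23 are placeholder values chosen for illustration; they are not part of the original example.

```csharp
using System.Threading.Tasks;
using Akka.Actor;

public static class Program
{
    public static async Task Main()
    {
        // "telnet-example", "localhost" and 23 are illustrative values (assumptions).
        var system = ActorSystem.Create("telnet-example");

        // The TelnetClient constructor sends Tcp.Connect, so creating the actor
        // is what starts the connection attempt.
        system.ActorOf(Props.Create(() => new TelnetClient("localhost", 23)), "telnet-client");

        // Keep the process alive until the actor system is terminated
        // (in this sketch, until the process is interrupted).
        await system.WhenTerminated;
    }
}
```

Because the client never terminates the actor system itself, this host runs until the process is stopped from outside.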
Server Connection
To accept connections, an actor sends a Tcp.Bind message to the TCP manager, passing the bind handler in the message.
The bind commander (the actor that sent Tcp.Bind) will receive a Tcp.Bound message when the socket is listening.
The bind handler will receive a Tcp.Connected message for each accepted connection, and needs to register the connection handler by replying with a Tcp.Register message. From then on, the connection is handled the same way as a client connection.
The following diagram illustrates the actors and messages.
The following code example shows a simple server that echoes data received from the network.
using System;
using System.Net;
using Akka.Actor;
using Akka.IO;

public class EchoServer : UntypedActor
{
    public EchoServer(int port)
    {
        // Self is both the bind commander (receives Tcp.Bound)
        // and the bind handler (receives Tcp.Connected).
        Context.System.Tcp().Tell(new Tcp.Bind(Self, new IPEndPoint(IPAddress.Any, port)));
    }

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case Tcp.Bound bound:
                Console.WriteLine("Listening on {0}", bound.LocalAddress);
                break;
            case Tcp.Connected:
            {
                // One handler actor per accepted connection; Sender is the connection actor.
                var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
                Sender.Tell(new Tcp.Register(connection));
                break;
            }
            default:
                Unhandled(message);
                break;
        }
    }
}

public class EchoConnection : UntypedActor
{
    private readonly IActorRef _connection;

    public EchoConnection(IActorRef connection)
    {
        _connection = connection;
    }

    protected override void OnReceive(object message)
    {
        if (message is Tcp.Received received)
        {
            // A chunk starting with 'x' stops this handler; anything else is echoed back.
            if (received.Data[0] == 'x')
                Context.Stop(Self);
            else
                _connection.Tell(Tcp.Write.Create(received.Data));
        }
        else Unhandled(message);
    }
}
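One way to exercise the echo server is to connect to it with a plain .NET TcpClient. The sketch below assumes the EchoServer and EchoConnection classes above are compiled into the same project. The EchoProbe helper, port 9001, the system name, and the one-second startup delay are all hypothetical choices made for this illustration; a real program would wait for Tcp.Bound rather than sleeping.

```csharp
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;

// Hypothetical helper, not part of Akka.NET: sends one message and reads one reply.
public static class EchoProbe
{
    public static async Task<string> SendAndReceiveAsync(int port, string text)
    {
        using (var client = new TcpClient())
        {
            await client.ConnectAsync("127.0.0.1", port);
            var stream = client.GetStream();

            var request = Encoding.ASCII.GetBytes(text);
            await stream.WriteAsync(request, 0, request.Length);

            // A single read usually returns a short reply in full,
            // but TCP does not guarantee message boundaries.
            var buffer = new byte[256];
            var read = await stream.ReadAsync(buffer, 0, buffer.Length);
            return Encoding.ASCII.GetString(buffer, 0, read);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        const int port = 9001; // illustrative port, assumed to be free

        var system = ActorSystem.Create("echo-example");
        system.ActorOf(Props.Create(() => new EchoServer(port)), "echo-server");

        // Crude wait for the listener to bind (assumption: one second is enough).
        await Task.Delay(TimeSpan.FromSeconds(1));

        var reply = await EchoProbe.SendAndReceiveAsync(port, "hello\n");
        Console.Write(reply);

        await system.Terminate();
    }
}
```

The probe deliberately uses the blocking-style socket API from System.Net.Sockets instead of a second actor, so that the server can be checked independently of the Akka I/O client path.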
TCP Listener Statistics
If you're building a long-running TCP server, you can subscribe to the TcpListener actor's statistics via the Tcp.SubscribeToTcpListenerStats message:
using System;
using System.Net;
using Akka.Actor;
using Akka.IO;

public class EchoServerWithStats : UntypedActor
{
    public EchoServerWithStats(int port)
    {
        Context.System.Tcp().Tell(new Tcp.Bind(Self, new IPEndPoint(IPAddress.Any, port)));
    }

    protected override void OnReceive(object message)
    {
        switch (message)
        {
            case Tcp.Bound bound:
                // Sender is the listener that sent Tcp.Bound; subscribe this actor to its statistics.
                Sender.Tell(new Tcp.SubscribeToTcpListenerStats(Self));
                Console.WriteLine("Listening on {0}", bound.LocalAddress);
                break;
            case Tcp.TcpListenerStatistics stats:
                Console.WriteLine("Received TCP stats: {0}", stats);
                break;
            case Tcp.Connected:
            {
                // Same per-connection handler as in the EchoServer example.
                var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
                Sender.Tell(new Tcp.Register(connection));
                break;
            }
            default:
                Unhandled(message);
                break;
        }
    }
}
This will result in a Tcp.TcpListenerStatistics message with updated, rolling statistics being delivered roughly once every 10 seconds. Each TcpListener maintains its own statistics and can support an arbitrary number of subscribers.