Search Results for

    Show / Hide Table of Contents

    Akka I/O

    The I/O extension provides an non-blocking, event driven API that matches the underlying transports mechanism.

    Getting Started

    Every I/O Driver has a special actor, called the manager, that serves as an entry point for the API. The manager for a particular driver is accessible through an extension method on ActorSystem. The following example shows how to get a reference to the TCP manager.

    using Akka.Actor;
    using Akka.IO;
    
    ...
    
    var system = ActorSystem.Create("example");
    var manager = system.Tcp();
    

    TCP Driver

    Client Connection

    To create a connection an actor sends a Tcp.Connect message to the TCP Manager. Once the connection is established the connection actor sends a Tcp.Connected message to the commander, which registers the connection handler by replying with a Tcp.Register message.

    Once this handshake is completed, the handler and connection communicate with Tcp.WriteCommand and Tcp.Received messages.

    The following diagram illustrate the actors involved in establishing and handling a connection.

    TCP Connection

    The following example shows a simple Telnet client. The client send lines entered in the console to the TCP connection, and write data received from the network to the console.

    public class TelnetClient : UntypedActor
    {
        public TelnetClient(string host, int port)
        {
            var endpoint = new DnsEndPoint(host, port);
            Context.System.Tcp().Tell(new Tcp.Connect(endpoint));
        }
    
        protected override void OnReceive(object message)
        {
            if (message is Tcp.Connected connected)
            {
                Console.WriteLine("Connected to {0}", connected.RemoteAddress);
    
                // Register self as connection handler
                Sender.Tell(new Tcp.Register(Self));
                ReadConsoleAsync();
                Become(Connected(Sender));
            }
            else if (message is Tcp.CommandFailed)
            {
                Console.WriteLine("Connection failed");
            }
            else Unhandled(message);
        }
    
        private UntypedReceive Connected(IActorRef connection)
        {
            return message =>
            {
                if (message is Tcp.Received received)  // data received from network
                {
                    Console.WriteLine(Encoding.ASCII.GetString(received.Data.ToArray()));
                }
                else if (message is string s)   // data received from console
                {
                    connection.Tell(Tcp.Write.Create(ByteString.FromString(s + "\n")));
                    ReadConsoleAsync();
                }
                else if (message is Tcp.PeerClosed)
                {
                    Console.WriteLine("Connection closed");
                }
                else Unhandled(message);
            };
        }
    
        private void ReadConsoleAsync()
        {
            Task.Factory.StartNew(self => Console.In.ReadLineAsync().PipeTo((ICanTell)self), Self);
        }
    }
    
    

    Server Connection

    To accept connections, an actor sends an Tcp.Bind message to the TCP manager, passing the bind handler in the message. The bind commander will receive a Tcp.Bound message when the connection is listening.

    The bind handler will receive a Tcp.Connected message for each accepted connection, and needs to register the connection handler by replying with a Tcp.Register message. Thereafter it proceeds the same as a client connection.

    The following diagram illustrate the actor and messages.

    TCP Connection

    The following code example shows a simple server that echo's data received from the network.

    public class EchoServer : UntypedActor
    {
        public EchoServer(int port)
        {
            Context.System.Tcp().Tell(new Tcp.Bind(Self, new IPEndPoint(IPAddress.Any, port)));
        }
    
        protected override void OnReceive(object message)
        {
            switch (message)
            {
                case Tcp.Bound bound:
                    Console.WriteLine("Listening on {0}", bound.LocalAddress);
                    break;
                case Tcp.Connected:
                {
                    var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
                    Sender.Tell(new Tcp.Register(connection));
                    break;
                }
                default:
                    Unhandled(message);
                    break;
            }
        }
    }
    
    
    public class EchoConnection : UntypedActor
    {
        private readonly IActorRef _connection;
    
        public EchoConnection(IActorRef connection)
        {
            _connection = connection;
        }
    
        protected override void OnReceive(object message)
        {
            if (message is Tcp.Received received)
            {
                if (received.Data[0] == 'x')
                    Context.Stop(Self);
                else
                    _connection.Tell(Tcp.Write.Create(received.Data));
            }
            else Unhandled(message);
        }
    }
    
    

    TCP Listener Statistics

    If you're building a long-running TCP server you can subscribe to the TcpListener actor's statistics via the Tcp.SubscribeToTcpListenerStats message:

    public class EchoServerWithStats : UntypedActor
    {
        public EchoServerWithStats(int port)
        {
            Context.System.Tcp().Tell(new Tcp.Bind(Self, new IPEndPoint(IPAddress.Any, port)));
        }
    
        protected override void OnReceive(object message)
        {
            switch (message)
            {
                case Tcp.Bound bound:
                    Sender.Tell(new Tcp.SubscribeToTcpListenerStats(Self));
                    Console.WriteLine("Listening on {0}", bound.LocalAddress);
                    break;
                case Tcp.TcpListenerStatistics stats:
                    Console.WriteLine("Received TCP stats: {0}", stats);
                    break;
                case Tcp.Connected:
                {
                    var connection = Context.ActorOf(Props.Create(() => new EchoConnection(Sender)));
                    Sender.Tell(new Tcp.Register(connection));
                    break;
                }
                default:
                    Unhandled(message);
                    break;
            }
        }
    }
    
    

    This will result in a Tcp.TcpListenerStatistics message being delivered with updated, rolling statistics once every 10 seconds or so roughly. Each independent TcpListener maintains its own statistics and they can support an arbitrary number of subscribers.

    In this article
    • githubEdit this page
    Back to top
    Contribute
    • Project Chat
    • Discussion Forum
    • Source Code
    Support
    • Akka.NET Support Plans
    • Akka.NET Observability Tools
    • Akka.NET Training & Consulting
    Maintained By
    • Petabridge - The Akka.NET Company
    • Learn Akka.NET