Akka.Cluster.Metrics Module
Member nodes of the cluster can collect system health metrics and publish them to other cluster nodes and to registered subscribers on the system event bus with the help of the Cluster Metrics Extension.
Cluster metrics information is primarily used for load-balancing routers, and can also be used to implement advanced metrics-based node life cycles, such as “Node Let-it-crash” when CPU steal time becomes excessive.
Cluster members with status WeaklyUp, if that feature is enabled, will participate in Cluster Metrics collection and dissemination.
Metrics Collector
Metrics collection is delegated to an implementation of Akka.Cluster.Metrics.IMetricsCollector.
Different collector implementations may provide different subsets of metrics published to the cluster.
The metrics currently supported are defined in the Akka.Cluster.Metrics.StandardMetrics class:
- MemoryUsed - total memory allocated to the currently running process
- MemoryAvailable - memory available for the process
- MaxMemoryRecommended - if set, memory limit recommended for the current process
- Processors - number of available processors
- CpuProcessUsage - CPU usage by the current process
- CpuTotalUsage - total CPU usage
Note: currently, due to some .NET Core limitations, the CpuTotalUsage metric is the same as CpuProcessUsage, but this is something to be fixed in the near future (see this issue for details).
The Cluster Metrics extension comes with the built-in Akka.Cluster.Metrics.Collectors.DefaultCollector collector implementation, which collects all metrics defined above.
You can also plug in your own metrics collector implementation.
By default, the metrics extension uses collector provider fallback and tries to load collectors in this order:
- configured user-provided collector (see the Configuration section for details)
- built-in Akka.Cluster.Metrics.Collectors.DefaultCollector collector
Metrics Events
The metrics extension periodically publishes the current snapshot of the cluster metrics to the node's system event bus.
The publication interval is controlled by the akka.cluster.metrics.collector.sample-interval setting.
The payload of the Akka.Cluster.Metrics.Events.ClusterMetricsChanged event contains the latest metrics of the node as well as the metrics gossip from other cluster member nodes that was received during the collector sample interval.
You can subscribe your metrics listener actors to these events in order to implement custom node lifecycle:
ClusterMetrics.Get(Sys).Subscribe(metricsListenerActor);
Adaptive Load Balancing
The AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroup performs load balancing of messages to cluster nodes based on the cluster metrics data.
It uses random selection of routees with probabilities derived from the remaining capacity of the corresponding node.
It can be configured to use a specific IMetricsSelector implementation to produce the probabilities, a.k.a. weights:
- memory / MemoryMetricsSelector - Used and max available memory. Weights based on remaining memory capacity: (max - used) / max
- cpu / CpuMetricsSelector - CPU utilization in percentage. Weights based on remaining CPU capacity: 1 - utilization
- mix / MixMetricsSelector - Combines memory and cpu. Weights based on mean of remaining capacity of the combined selectors.
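The weight formulas in the list above can be sketched in plain C#. This is a minimal illustration only: the `CapacitySketch` class and its method names are hypothetical and are not part of Akka.Cluster.Metrics, and the real selectors may differ in details such as how capacities are scaled into integer weights.

```csharp
using System;
using System.Linq;

// Hypothetical helper for illustration; not part of Akka.Cluster.Metrics.
public static class CapacitySketch
{
    // Remaining memory capacity: (max - used) / max.
    public static double MemoryCapacity(double used, double max) => (max - used) / max;

    // Remaining CPU capacity: 1 - utilization, with utilization given in [0, 1].
    public static double CpuCapacity(double utilization) => 1.0 - utilization;

    // Mix: mean of the remaining capacities of the combined selectors.
    public static double MixCapacity(params double[] capacities) => capacities.Average();

    // Normalizes per-node capacities into selection probabilities that sum to 1.
    public static double[] ToProbabilities(double[] capacities)
    {
        var total = capacities.Sum();
        return capacities.Select(c => c / total).ToArray();
    }
}
```

For example, a node using 2 GB of 8 GB has a memory capacity of 0.75; at 50% CPU utilization its CPU capacity is 0.5, so its mix capacity is 0.625. Nodes with more remaining capacity receive a proportionally larger share of the messages.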
The collected metrics values are smoothed with exponential weighted moving average. In the cluster configuration you can adjust how quickly past data is decayed compared to new data.
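As a worked illustration of the smoothing, here is the textbook form of an exponentially weighted moving average, using the default values from the reference configuration (sample-interval = 3s, moving-average-half-life = 12s). The symbols $\alpha$, $x_t$, $s_t$, $\Delta t$ and $t_{1/2}$ are introduced here for illustration, and it is an assumption that the library's implementation follows this standard form exactly.

```latex
% Smoothing factor derived from the sample interval and the half-life
\alpha = 1 - 0.5^{\Delta t / t_{1/2}} = 1 - 0.5^{3/12} \approx 0.159

% Standard EWMA update: new sample x_t, previous smoothed value s_{t-1}
s_t = \alpha \, x_t + (1 - \alpha) \, s_{t-1}

% Relevance of a sample after four half-lives (48 s with the defaults)
0.5^{4} = 0.0625 \approx 6\%
```

A shorter half-life gives a larger $\alpha$, so new samples dominate the smoothed value more quickly.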
Let’s take a look at this router in action. What can be more demanding than calculating factorials?
The backend worker that performs the factorial calculation:
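using System.Numerics;
using System.Threading.Tasks;
using Akka.Actor;

public class FactorialBackend : ReceiveActor
{
    public FactorialBackend()
    {
        ReceiveAsync<int>(n =>
        {
            // Capture the sender before leaving the actor context.
            var sender = Sender;
            return Task.Run(() => Factorial(n)).PipeTo(sender, success: factorial => new FactorialResult(n, factorial));
        });
    }

    private BigInteger Factorial(int n)
    {
        var acc = BigInteger.One;
        // Start at 1: multiplying by 0 would zero out the result.
        for (var i = 1; i <= n; ++i)
            acc *= i;
        return acc;
    }
}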
The frontend that receives user jobs and delegates to the backends via the router:
public class FactorialFrontend : ReceiveActor
{
    private readonly int _upToN;
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly IActorRef _backend = Context.ActorOf(FromConfig.Instance.Props(), "factorialBackendRouter");

    public FactorialFrontend(int upToN, bool repeat)
    {
        _upToN = upToN;

        Receive<FactorialResult>(result =>
        {
            if (result.N != _upToN)
                return;

            _log.Debug("{0}! = {1}", result.N, result.Factorial);
            if (repeat)
                SendJobs();
            else
                Context.Stop(Self);
        });

        Receive<ReceiveTimeout>(_ =>
        {
            _log.Info("Timeout");
            SendJobs();
        });
    }

    protected override void PreStart()
    {
        base.PreStart();
        SendJobs();
        Context.SetReceiveTimeout(10.Seconds());
    }

    private void SendJobs()
    {
        _log.Info("Starting batch of factorials up to [{0}]", _upToN);
        for (var n = 1; n <= _upToN; ++n)
        {
            _backend.Tell(n);
        }
    }
}
As you can see, the router is defined in the same way as other routers, and in this case it is configured as follows:
akka.actor.deployment {
  /factorialFrontend/factorialBackendRouter = {
    # Router type provided by metrics extension.
    router = cluster-metrics-adaptive-group
    # Router parameter specific for metrics extension.
    # metrics-selector = memory
    # metrics-selector = cpu
    metrics-selector = mix
    #
    routees.paths = ["/user/factorialBackend"]
    cluster {
      enabled = on
      use-roles = ["backend"]
      allow-local-routees = off
    }
  }
}
Only the router type and the metrics-selector parameter are specific to this router; everything else works the same way as for other routers.
The same type of router could also have been defined in code:
Group Router
var totalInstances = 100;
var routeesPaths = new []{ "/user/factorialBackend", "" };
var allowLocalRoutees = true;
var useRoles = "backend";
IActorRef backend = Context.ActorOf(
    new ClusterRouterGroup(
        new AdaptiveLoadBalancingGroup(MemoryMetricsSelector.Instance),
        new ClusterRouterGroupSettings(totalInstances, routeesPaths, allowLocalRoutees, useRoles)
    ).Props(),
    "factorialBackendRouter2");
Pool Router
var totalInstances = 100;
var maxInstancesPerNode = 3;
var allowLocalRoutees = false;
var useRoles = "backend";
IActorRef backend = Context.ActorOf(
    new ClusterRouterPool(
        new AdaptiveLoadBalancingPool(CpuMetricsSelector.Instance, 0),
        new ClusterRouterPoolSettings(totalInstances, maxInstancesPerNode, allowLocalRoutees, useRoles))
    .Props(Props.Create<FactorialBackend>()),
    "factorialBackendRouter3");
Subscribe to Metrics Events
It is possible to subscribe to the metrics events directly to implement other functionality.
using Akka.Actor;
using Akka.Cluster.Metrics.Events;
using Akka.Cluster.Metrics.Serialization;
using Akka.Event;
using Akka.Util;

namespace Akka.Cluster.Metrics.Tests
{
    public class MetricsListener : ReceiveActor
    {
        private readonly ILoggingAdapter _log = Context.GetLogger();
        private readonly Cluster _cluster = Cluster.Get(Context.System);
        private readonly ClusterMetrics _metricsExtension = ClusterMetrics.Get(Context.System);

        public MetricsListener()
        {
            Receive<ClusterMetricsChanged>(clusterMetrics =>
            {
                // Only log the metrics that belong to this node.
                foreach (var nodeMetrics in clusterMetrics.NodeMetrics)
                {
                    if (nodeMetrics.Address.Equals(_cluster.SelfAddress))
                    {
                        LogMemory(nodeMetrics);
                        LogCpu(nodeMetrics);
                    }
                }
            });
        }

        // Subscribe to ClusterMetricsEvent events.
        protected override void PreStart()
        {
            base.PreStart();
            _metricsExtension.Subscribe(Self);
        }

        // Unsubscribe from ClusterMetricsEvent events.
        protected override void PostStop()
        {
            base.PostStop();
            _metricsExtension.Unsubscribe(Self);
        }

        private void LogMemory(NodeMetrics nodeMetrics)
        {
            Option<StandardMetrics.Memory> memory = StandardMetrics.ExtractMemory(nodeMetrics);
            if (memory.HasValue)
                _log.Info("Used memory: {0} Mb", memory.Value.Used / 1024 / 1024);
        }

        private void LogCpu(NodeMetrics nodeMetrics)
        {
            Option<StandardMetrics.Cpu> cpu = StandardMetrics.ExtractCpu(nodeMetrics);
            if (cpu.HasValue)
                _log.Info("Cpu load: {0}% ({1} processors)", cpu.Value.TotalUsage / 100, cpu.Value.ProcessorsNumber);
        }
    }
}
Custom Metrics Collector
Metrics collection is delegated to an implementation of Akka.Cluster.Metrics.IMetricsCollector.
You can plug in your own metrics collector instead of the built-in Akka.Cluster.Metrics.Collectors.DefaultCollector.
The custom metrics collector implementation class must be specified in the akka.cluster.metrics.collector.provider configuration property.
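As a sketch of what such a setting might look like, the fragment below points the provider at a custom collector. The type name `MyCompany.Metrics.CustomCollector` and assembly name `MyCompany.Metrics` are hypothetical placeholders; the string form of the value follows the `provider = ""` default shown in the reference configuration.

```hocon
akka.cluster.metrics.collector {
  # Hypothetical custom collector type: "<namespace-qualified type>, <assembly>".
  provider = "MyCompany.Metrics.CustomCollector, MyCompany.Metrics"

  # With fallback enabled, the extension falls back to the built-in
  # collector if the configured one cannot be loaded.
  fallback = true
}
```

According to the comments in the reference configuration, the collector class is expected to expose a public constructor that takes the ActorSystem as its parameter.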
Configuration
Starting Akka.Cluster.Metrics Automatically During ActorSystem Startup
You can start the Akka.Cluster.Metrics module automatically by providing, at minimum, these settings:
akka {
  extensions = ["Akka.Cluster.Metrics.ClusterMetricsExtensionProvider, Akka.Cluster.Metrics"]
  actor.provider = "cluster"
  cluster.metrics.collector {
    provider = ["Akka.Cluster.Metrics.Collectors.DefaultCollector, Akka.Cluster.Metrics"]
  }
}
These settings will auto-start Akka.Cluster.Metrics using the default memory and CPU metrics collectors.
Default Configurations
The Cluster metrics extension can be configured with the following properties:
##############################################
# Akka Cluster Metrics Reference Config File #
##############################################

# This is the reference config file that contains all the default settings.
# Make your edits in your application.conf in order to override these settings.

# Cluster metrics extension.
# Provides periodic statistics collection and publication throughout the cluster.
akka.cluster.metrics {
  # Full path of dispatcher configuration key.
  dispatcher = "akka.actor.default-dispatcher"
  # How long should any actor wait before starting the periodic tasks.
  periodic-tasks-initial-delay = 1s
  # Metrics supervisor actor.
  supervisor {
    # Actor name. Example name space: /system/cluster-metrics
    name = "cluster-metrics"
    # Supervision strategy.
    strategy {
      # Fully-qualified class name of the class providing `akka.actor.SupervisorStrategy`.
      # Must have a constructor with signature `<init>(com.typesafe.config.Config)`.
      # Default metrics strategy provider is a configurable extension of `OneForOneStrategy`.
      provider = "Akka.Cluster.Metrics.ClusterMetricsStrategy, Akka.Cluster.Metrics"
      # Configuration of the default strategy provider.
      # Replace with custom settings when overriding the provider.
      configuration = {
        # Log restart attempts.
        loggingEnabled = true
        # Child actor restart-on-failure window.
        withinTimeRange = 3s
        # Maximum number of restart attempts before child actor is stopped.
        maxNrOfRetries = 3
      }
    }
  }
  # Metrics collector actor.
  collector {
    # Enable or disable metrics collector for load-balancing nodes.
    # Metrics collection can also be controlled at runtime by sending control messages
    # to /system/cluster-metrics actor: `akka.cluster.metrics.{CollectionStartMessage,CollectionStopMessage}`
    enabled = on
    # Fully-qualified class name of the metrics collector implementation.
    # It must implement `akka.cluster.metrics.MetricsCollector` and
    # have public constructor with akka.actor.ActorSystem parameter.
    # Will try to load in the following order of priority:
    # 1) configured custom collector 2) internal `SigarMetricsCollector` 3) internal `JmxMetricsCollector`
    provider = ""
    # Try all 3 available collector providers, or else fail on the configured custom collector provider.
    fallback = true
    # How often metrics are sampled on a node.
    # Shorter interval will collect the metrics more often.
    # Also controls frequency of the metrics publication to the node system event bus.
    sample-interval = 3s
    # How often a node publishes metrics information to the other nodes in the cluster.
    # Shorter interval will publish the metrics gossip more often.
    gossip-interval = 3s
    # How quickly the exponential weighting of past data is decayed compared to
    # new data. Set lower to increase the bias toward newer values.
    # The relevance of each data sample is halved for every passing half-life
    # duration, i.e. after 4 times the half-life, a data sample’s relevance is
    # reduced to 6% of its original relevance. The initial relevance of a data
    # sample is given by 1 – 0.5 ^ (collect-interval / half-life).
    # See http://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
    moving-average-half-life = 12s
  }
}

# Cluster metrics extension serializers and routers.
akka.actor {
  # Protobuf serializer for remote cluster metrics messages.
  serializers {
    akka-cluster-metrics = "Akka.Cluster.Metrics.Serialization.ClusterMetricsMessageSerializer, Akka.Cluster.Metrics"
  }
  # Interface binding for remote cluster metrics messages.
  serialization-bindings {
    "Akka.Cluster.Metrics.Serialization.MetricsGossipEnvelope, Akka.Cluster.Metrics" = akka-cluster-metrics
    "Akka.Cluster.Metrics.AdaptiveLoadBalancingPool, Akka.Cluster.Metrics" = akka-cluster-metrics
    "Akka.Cluster.Metrics.MixMetricsSelector, Akka.Cluster.Metrics" = akka-cluster-metrics
    "Akka.Cluster.Metrics.CpuMetricsSelector, Akka.Cluster.Metrics" = akka-cluster-metrics
    "Akka.Cluster.Metrics.MemoryMetricsSelector, Akka.Cluster.Metrics" = akka-cluster-metrics
  }
  # Globally unique metrics extension serializer identifier.
  serialization-identifiers {
    "Akka.Cluster.Metrics.Serialization.ClusterMetricsMessageSerializer, Akka.Cluster.Metrics" = 10
  }
  # Provide routing of messages based on cluster metrics.
  router.type-mapping {
    cluster-metrics-adaptive-pool = "Akka.Cluster.Metrics.AdaptiveLoadBalancingPool, Akka.Cluster.Metrics"
    cluster-metrics-adaptive-group = "Akka.Cluster.Metrics.AdaptiveLoadBalancingGroup, Akka.Cluster.Metrics"
  }
}