Writing A Custom Akka.Persistence Provider
For an introduction to event sourcing, you can read this article at MSDN.
Persistence providers in Akka.NET are divided into two plugins, the journal plugin and the snapshot store plugin, each with its own API and responsibility. The goal of the Akka.Persistence event sourcing plugin is to provide a consistent event sourcing API that abstracts over the underlying storage mechanism.
Implementing Akka.Persistence Journal
The Akka.Persistence journal is responsible for storing all events so that they can be played back during recovery.
All of the code examples in this documentation assume a SQLite database. You can view the complete sample project in /src/examples/Akka.Persistence.Custom.
The journal table schemas we will be using are:
private const string CreateEventsJournalSql = @"
CREATE TABLE IF NOT EXISTS event_journal (
ordering INTEGER PRIMARY KEY NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
sequence_nr INTEGER(8) NOT NULL,
is_deleted INTEGER(1) NOT NULL,
manifest VARCHAR(255) NULL,
timestamp INTEGER NOT NULL,
payload BLOB NOT NULL,
serializer_id INTEGER(4),
UNIQUE (persistence_id, sequence_nr));";
private const string CreateMetaTableSql = @"
CREATE TABLE IF NOT EXISTS journal_metadata (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr INTEGER(8) NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr));";
Required Method Implementations
ReplayMessagesAsync
Task ReplayMessagesAsync(
IActorContext context,
string persistenceId,
long fromSequenceNr,
long toSequenceNr,
long max,
Action<IPersistentRepresentation> recoveryCallback)
- context: The contextual information about the actor processing the replayed messages.
- persistenceId: Persistent actor identifier.
- fromSequenceNr: Inclusive sequence number where replay should start.
- toSequenceNr: Inclusive sequence number where replay should end.
- max: Maximum number of messages to be replayed.
- recoveryCallback: Called to replay a message; may be called from any thread.
This call is NOT protected with a circuit-breaker.
This method should asynchronously replay persistent messages:
- Implementations replay a message by calling recoveryCallback.
- The returned Task must be completed when all messages (matching the sequence number bounds) have been replayed.
- The Task must be completed with a failure if any of the persistent messages could not be replayed.
The toSequenceNr is the lowest of what was returned by ReadHighestSequenceNrAsync and what the user specified in the Recovery parameter. This implies that this call is always preceded by reading the highest sequence number for the given persistenceId.
This call is NOT protected with a circuit-breaker because it may take a long time to replay all events. The plugin implementation itself must protect against an unresponsive backend store and make sure that the returned Task is completed with success or failure within a reasonable time. Leaving the Task uncompleted is not allowed.
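Because no circuit-breaker guards this call, the plugin has to bound the backend call itself. The following is a minimal sketch of one way to do that with only the .NET base class library; `BackendCallGuard` and `RunWithTimeout` are hypothetical names introduced for illustration and are not part of Akka.NET. It assumes the wrapped operation honors the cancellation token it is given, as the `DbCommand` async methods in this sample are expected to.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class BackendCallGuard
{
    // Hypothetical helper (not an Akka.NET API): runs a storage operation and
    // fails the returned Task if the operation outlives the given timeout.
    public static async Task RunWithTimeout(
        Func<CancellationToken, Task> operation,
        TimeSpan timeout,
        CancellationToken shutdownToken)
    {
        // Link to the plugin-wide shutdown token so stopping the actor still cancels the call
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken))
        {
            // Request cancellation automatically once the timeout elapses
            cts.CancelAfter(timeout);
            try
            {
                await operation(cts.Token);
            }
            catch (OperationCanceledException) when (!shutdownToken.IsCancellationRequested)
            {
                // The timeout fired rather than a shutdown: report it as a failure
                throw new TimeoutException(
                    $"Backend store did not respond within {timeout}.");
            }
        }
    }
}
```

In the sample journal, the shutdown token would correspond to `_pendingRequestsCancellation.Token` and the timeout to `_timeout`. Either way the returned Task ends in success or failure instead of hanging.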
Code Sample
ByPersistenceIdSql in this example refers to this SQL query statement:
private const string ByPersistenceIdSql = @"
SELECT e.persistence_id as PersistenceId,
e.sequence_nr as SequenceNr,
e.timestamp as Timestamp,
e.is_deleted as IsDeleted,
e.manifest as Manifest,
e.payload as Payload,
e.serializer_id as SerializerId
FROM event_journal e
WHERE e.persistence_id = @PersistenceId
AND e.sequence_nr BETWEEN @FromSequenceNr AND @ToSequenceNr
ORDER BY sequence_nr ASC;";
SQLite code example:
public sealed override async Task ReplayMessagesAsync(
IActorContext context,
string persistenceId,
long fromSequenceNr,
long toSequenceNr,
long max,
Action<IPersistentRepresentation> recoveryCallback)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
// Create new DbCommand instance
using (var command = GetCommand(connection, ByPersistenceIdSql, _timeout))
{
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
AddParameter(command, "@FromSequenceNr", DbType.Int64, fromSequenceNr);
AddParameter(command, "@ToSequenceNr", DbType.Int64, toSequenceNr);
// Create a DbDataReader to sequentially read the returned query result
using (var reader = await command.ExecuteReaderAsync(
behavior: CommandBehavior.SequentialAccess,
cancellationToken: cts.Token))
{
var i = 0L;
while (i++ < max && await reader.ReadAsync(cts.Token))
{
var id = reader.GetString(0);
var sequenceNr = reader.GetInt64(1);
var timestamp = reader.GetInt64(2);
var isDeleted = reader.GetBoolean(3);
var manifest = reader.GetString(4);
var payload = reader[5];
var serializerId = reader.GetInt32(6);
// Deserialize the persistent payload using the data we read from the reader
var deserialized = _serialization.Deserialize(
bytes: (byte[])payload,
serializerId: serializerId,
manifest: manifest);
// Call the recovery callback with the deserialized data from the database
recoveryCallback(new Persistent(
payload: deserialized,
sequenceNr: sequenceNr,
persistenceId: id,
manifest: manifest,
isDeleted: isDeleted,
sender: ActorRefs.NoSender,
writerGuid: null,
timestamp: timestamp));
}
command.Cancel();
}
}
}
}
ReadHighestSequenceNrAsync
Task<long> ReadHighestSequenceNrAsync(
string persistenceId,
long fromSequenceNr)
- persistenceId: Persistent actor identifier.
- fromSequenceNr: Hint where to start searching for the highest sequence number. When a persistent actor is recovering, this fromSequenceNr will be the sequence number of the used snapshot, or 0L if no snapshot is used.
This call is protected with a circuit-breaker.
This method should asynchronously read the highest stored sequence number for the provided persistenceId. The persistent actor will use the highest sequence number after recovery as the starting point when persisting new events. This sequence number is also used as toSequenceNr in subsequent calls to ReplayMessagesAsync unless the user has specified a lower toSequenceNr. The journal must maintain the highest sequence number and never decrease it.
Please also note that requests for the highest sequence number may be made concurrently to writes executing for the same persistenceId; in particular, it is possible that a restarting actor tries to recover before its outstanding writes have completed.
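The "never decrease" requirement matters once events can be deleted: if the highest sequence number were derived from stored events alone, deleting them all would reset it. Below is a small in-memory sketch of the idea, assuming a separate field that plays the role of the journal_metadata table from the schema above. `InMemoryJournalSketch` is a hypothetical class for illustration only, not the sample's implementation.

```csharp
using System;
using System.Collections.Generic;

public sealed class InMemoryJournalSketch
{
    // Sequence numbers of the events currently stored (stands in for event_journal)
    private readonly SortedSet<long> _events = new SortedSet<long>();

    // Highest sequence number remembered across deletions (stands in for journal_metadata)
    private long _metadataHighest;

    public void Write(long sequenceNr) => _events.Add(sequenceNr);

    // Highest of what is still stored and what was remembered before a deletion
    public long ReadHighestSequenceNr()
    {
        var storedHighest = _events.Count > 0 ? _events.Max : 0L;
        return Math.Max(storedHighest, _metadataHighest);
    }

    public void DeleteMessagesTo(long toSequenceNr)
    {
        // Capture the current highest number before any event is removed
        var highest = ReadHighestSequenceNr();
        _events.RemoveWhere(seqNr => seqNr <= toSequenceNr);

        // If the deletion removed the newest event, remember its number
        if (highest <= toSequenceNr)
            _metadataHighest = highest;
    }
}
```

After writing events 1 to 3 and deleting up to `long.MaxValue`, `ReadHighestSequenceNr()` still returns 3, so a recovering actor continues at 4 rather than reusing sequence numbers.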
HighestSequenceNrSql in this example refers to this SQL query statement:
private const string HighestSequenceNrSql = @"
SELECT MAX(u.SeqNr) as SequenceNr
FROM (
SELECT MAX(e.sequence_nr) as SeqNr FROM event_journal e
WHERE e.persistence_id = @PersistenceId
UNION
SELECT MAX(m.sequence_nr) as SeqNr FROM journal_metadata m
WHERE m.persistence_id = @PersistenceId) as u";
SQLite code example:
public sealed override async Task<long> ReadHighestSequenceNrAsync(
string persistenceId,
long fromSequenceNr)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
// Create new DbCommand instance
using (var command = GetCommand(connection, HighestSequenceNrSql, _timeout))
{
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
// Execute the SQL query statement
var result = await command.ExecuteScalarAsync(cts.Token);
// Return the result if one is returned by the query result, else return zero
return result is long ? Convert.ToInt64(result) : 0L;
}
}
}
WriteMessagesAsync
protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(
IEnumerable<AtomicWrite> messages)
Asynchronously writes a batch of persistent messages to the journal.
This call is protected with a circuit-breaker.
Calls to this method are serialized by the enclosing journal actor. If you spawn work in asynchronous tasks, it is fine for those tasks to complete in any order, but the actual writes for a specific persistenceId should be serialized to avoid issues such as events of a later write being visible to consumers (query side, or replay) before the events of an earlier write are visible. A PersistentActor will not send a new WriteMessages request before the previous one has been completed.
Please note that the IPersistentRepresentation.Sender of the contained IPersistentRepresentation objects has been set to null (i.e. set to ActorRefs.NoSender) in order to save space in the journal and to prevent saving a sender reference that will likely be obsolete during replay.
Please also note that requests for the highest sequence number may be made concurrently to this call executing for the same persistenceId; in particular, it is possible that a restarting actor tries to recover before its outstanding writes have completed. In the latter case it is highly desirable to defer reading the highest sequence number until all outstanding writes have completed, otherwise the PersistentActor may reuse sequence numbers.
Batching
The batch is only for performance reasons, i.e. all messages don't have to be written atomically. Higher throughput can typically be achieved by using batch inserts of many records compared to inserting records one-by-one, but this aspect depends on the underlying data store and a journal implementation can implement it as efficiently as possible.
Journals should aim to persist events in order for a given persistenceId, as otherwise, in case of a failure, the persistent state may end up being inconsistent.
AtomicWrite
Each AtomicWrite message contains a Payload property that contains either:
- a single IPersistentRepresentation that corresponds to the event that was passed to the Eventsourced.Persist<TEvent>(TEvent, Action<TEvent>) method of the PersistentActor, or
- several IPersistentRepresentation instances that correspond to the events that were passed to the Eventsourced.PersistAll<TEvent>(IEnumerable<TEvent>, Action<TEvent>) method of the PersistentActor.
Note that while AtomicWrite.Payload is declared as an object property, the type of this object instance will always be an ImmutableList<IPersistentRepresentation>.
All Persistent instances of the AtomicWrite must be written to the data store atomically, i.e. all or none must be stored.
If the journal (data store) cannot support atomic writes of multiple events, it should reject such writes with a NotSupportedException describing the issue. This limitation should also be documented by the journal plugin.
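As a rough sketch of that rejection rule, the check below produces the NotSupportedException for a multi-event write when the store has no transaction support. `AtomicWriteGuard`, `RejectIfNotAtomic`, and the `storeSupportsTransactions` flag are hypothetical names used only for illustration. The SQLite sample in this document does not need such a check because it wraps each AtomicWrite in a transaction.

```csharp
using System;

public static class AtomicWriteGuard
{
    // Returns null when the write can be stored, or the exception that should be
    // reported as the rejection for this AtomicWrite.
    public static Exception RejectIfNotAtomic(int eventCount, bool storeSupportsTransactions)
    {
        if (eventCount > 1 && !storeSupportsTransactions)
        {
            return new NotSupportedException(
                $"This journal cannot store {eventCount} events atomically; " +
                "persist events one at a time instead of using PersistAll.");
        }

        // Single-event writes are atomic by definition
        return null;
    }
}
```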
Handling Failures
If there are failures when storing any of the messages in the batch, the returned Task must be completed with failure. The Task must only be completed with success when all messages in the batch have been confirmed to be stored successfully, i.e. they will be readable, and visible, in a subsequent replay. If there is uncertainty about whether the messages were stored or not, the Task must be completed with failure.
Data store connection problems must be signaled by completing the Task with failure.
Handling AtomicWrite Result
The journal can also signal that it rejects individual messages (AtomicWrite) by the returned Task.
- It is possible but not mandatory to reduce the number of allocations by returning null for the happy path, i.e. when no messages are rejected.
- Otherwise the returned list must have as many elements as the input messages.
- Each result element signals if the corresponding AtomicWrite is rejected or not, with an exception describing the problem.
- Rejecting a message means it was not stored, i.e. it must not be included in a later replay.
- Rejecting a message is typically done before attempting to store it, e.g. because of a serialization error.
- Data store connection problems must not be signaled as rejections.
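The shape of the result described in the list above can be sketched as follows. This is an illustrative helper under stated assumptions, not the sample's code: `WriteResultSketch` and `ToWriteResult` are hypothetical names, and `outcomes` is assumed to hold one entry per input AtomicWrite, with null meaning "stored" and an exception meaning "rejected".

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;

public static class WriteResultSketch
{
    // outcomes[i] is null when the i-th AtomicWrite was stored,
    // or the exception describing why it was rejected.
    public static IImmutableList<Exception> ToWriteResult(IReadOnlyList<Exception> outcomes)
    {
        // Happy path: nothing was rejected, so skip allocating a result list
        if (outcomes.All(e => e == null))
            return null;

        // Otherwise return exactly one element per input message, in input order
        return outcomes.ToImmutableList();
    }
}
```

Connection failures do not belong in this list. They should fault the returned Task instead, as described under Handling Failures.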
Code Sample
InsertEventSql in this example refers to this SQL query statement:
private const string InsertEventSql = @"
INSERT INTO event_journal (
persistence_id,
sequence_nr,
timestamp,
is_deleted,
manifest,
payload,
serializer_id)
VALUES (
@PersistenceId,
@SequenceNr,
@Timestamp,
@IsDeleted,
@Manifest,
@Payload,
@SerializerId);";
SQLite code example:
protected sealed override async Task<IImmutableList<Exception>> WriteMessagesAsync(
IEnumerable<AtomicWrite> messages)
{
// For each of the atomic write request, create an async Task
var writeTasks = messages.Select(async message =>
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
// Create new DbCommand instance
using (var command = GetCommand(connection, InsertEventSql, _timeout))
// Create a new DbTransaction instance for this AtomicWrite
using (var tx = connection.BeginTransaction())
{
command.Transaction = tx;
// Cast the payload to IImmutableList<IPersistentRepresentation>.
// Note that AtomicWrite.Payload property is declared as an object property,
// but it is always populated with an IImmutableList<IPersistentRepresentation>
// instance
var payload = (IImmutableList<IPersistentRepresentation>)message.Payload;
var persistentMessages = payload.ToArray();
foreach (var @event in persistentMessages)
{
// Get the serializer associated with the payload type
var serializer = _serialization.FindSerializerForType(@event.Payload.GetType());
// This WithTransport method call is important, it allows for proper
// local IActorRef serialization by switching the serialization information
// context during the serialization process
var (binary, manifest) = Akka.Serialization.Serialization.WithTransport(
system: _serialization.System,
state: (@event.Payload, serializer),
action: state =>
{
var (thePayload, theSerializer) = state;
var thisManifest = "";
// There are two kinds of serializer when it comes to manifest
// support, we have to support both of them for proper payload
// serialization
if (theSerializer is SerializerWithStringManifest stringManifest)
{
thisManifest = stringManifest.Manifest(thePayload);
}
else if (theSerializer.IncludeManifest)
{
thisManifest = thePayload.GetType().TypeQualifiedName();
}
// Return the serialized byte array and the manifest for the
// serialized data
return (theSerializer.ToBinary(thePayload), thisManifest);
});
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, @event.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, @event.SequenceNr);
AddParameter(command, "@Timestamp", DbType.Int64, @event.Timestamp);
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
AddParameter(command, "@Manifest", DbType.String, manifest);
AddParameter(command, "@Payload", DbType.Binary, binary);
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
// Execute the SQL query
await command.ExecuteScalarAsync(cts.Token);
// clear the parameters and reuse the DbCommand instance
command.Parameters.Clear();
}
// Commit the DbTransaction
tx.Commit();
}
}
}).ToArray();
// Collect all exceptions raised for each failed writes and return them.
var result = await Task<IImmutableList<Exception>>.Factory
.ContinueWhenAll(writeTasks,
tasks => tasks.Select(t => t.IsFaulted
? TryUnwrapException(t.Exception)
: null).ToImmutableList());
return result;
}
DeleteMessagesToAsync
protected abstract Task DeleteMessagesToAsync(
string persistenceId,
long toSequenceNr)
- persistenceId: Persistent actor identifier.
- toSequenceNr: Inclusive sequence number of the last event to be deleted.
This call is protected with a circuit-breaker.
Asynchronously deletes all persistent messages up to the inclusive toSequenceNr bound. This operation has to be done atomically.
Code Sample
DeleteBatchSql in this example refers to this SQL query statement:
private const string DeleteBatchSql = @"
DELETE FROM event_journal
WHERE persistence_id = @PersistenceId AND sequence_nr <= @ToSequenceNr;
DELETE FROM journal_metadata
WHERE persistence_id = @PersistenceId AND sequence_nr <= @ToSequenceNr;";
HighestSequenceNrSql in this example refers to this SQL query statement:
private const string HighestSequenceNrSql = @"
SELECT MAX(u.SeqNr) as SequenceNr
FROM (
SELECT MAX(e.sequence_nr) as SeqNr FROM event_journal e
WHERE e.persistence_id = @PersistenceId
UNION
SELECT MAX(m.sequence_nr) as SeqNr FROM journal_metadata m
WHERE m.persistence_id = @PersistenceId) as u";
UpdateSequenceNrSql in this example refers to this SQL query statement:
private const string UpdateSequenceNrSql = @"
INSERT INTO journal_metadata (persistence_id, sequence_nr)
VALUES (@PersistenceId, @SequenceNr);";
SQLite code example:
protected sealed override async Task DeleteMessagesToAsync(
string persistenceId,
long toSequenceNr)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
{
await connection.OpenAsync();
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
// We will be using two DbCommands to complete this process
using (var deleteCommand = GetCommand(connection, DeleteBatchSql, _timeout))
using (var highestSeqNrCommand =
GetCommand(connection, HighestSequenceNrSql, _timeout))
{
// Populate the SQL parameters
AddParameter(highestSeqNrCommand, "@PersistenceId", DbType.String,
persistenceId);
AddParameter(deleteCommand, "@PersistenceId", DbType.String, persistenceId);
AddParameter(deleteCommand, "@ToSequenceNr", DbType.Int64, toSequenceNr);
// Create a DbTransaction instance
using (var tx = connection.BeginTransaction())
{
deleteCommand.Transaction = tx;
highestSeqNrCommand.Transaction = tx;
// Execute the HighestSequenceNrSql SQL statement and fetch the current
// highest sequence number for the given persistence identifier
var res = await highestSeqNrCommand.ExecuteScalarAsync(cts.Token);
var highestSeqNr = res is long ? Convert.ToInt64(res) : 0L;
// Delete all events up to toSequenceNr both in the journal and
// metadata table
await deleteCommand.ExecuteNonQueryAsync(cts.Token);
// Update the metadata table to reflect the new highest sequence number,
// if the toSequenceNr is higher than our current highest sequence number
if (highestSeqNr <= toSequenceNr)
{
// Create a new DbCommand instance
using (var updateCommand = GetCommand(connection, UpdateSequenceNrSql,
_timeout))
{
// This update is still part of the same transaction as everything
// else in this process
updateCommand.Transaction = tx;
// Populate the SQL parameters
AddParameter(
updateCommand, "@PersistenceId", DbType.String, persistenceId);
AddParameter(
updateCommand, "@SequenceNr", DbType.Int64, highestSeqNr);
// Execute the update SQL statement
await updateCommand.ExecuteNonQueryAsync(cts.Token);
tx.Commit();
}
}
else tx.Commit();
}
}
}
}
}
Detail Implementation
Reading HOCON Settings
Some HOCON settings are loaded by default by the journal base class, and these can be overridden in your HOCON settings. The minimum HOCON settings that need to be defined for your custom plugin are:
akka.persistence.journal {
plugin = "akka.persistence.journal.custom-sqlite"
custom-sqlite {
# qualified type name of the SQLite persistence journal actor
class = "Akka.Persistence.Custom.Journal.SqliteJournal, Akka.Persistence.Custom"
}
}
The default HOCON settings are:
# Fallback settings for journal plugin configurations.
# These settings are used if they are not defined in plugin config section.
akka.persistence.journal-plugin-fallback {
# Fully qualified class name providing journal plugin api implementation.
# It is mandatory to specify this property.
# The class must have a constructor without parameters or constructor with
# one `Akka.Configuration.Config` parameter.
class = ""
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
# Default serializer used as manifest serializer when applicable
# and payload serializer when no specific binding overrides are specified
serializer = "json"
# Removed: used to be the Maximum size of a persistent message batch
# written to the journal.
# Now this setting is without function, PersistentActor will write
# as many messages as it has accumulated since the last write.
max-message-batch-size = 200
# If there is more time in between individual events gotten from the Journal
# recovery than this the recovery will fail.
# Note that it also affects reading the snapshot before replaying events on
# top of it, even though it is configured for the journal.
recovery-event-timeout = 30s
circuit-breaker {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
}
# The replay filter can detect a corrupt event stream by inspecting
# sequence numbers and writerUuid when replaying events.
replay-filter {
# What the filter should do when detecting invalid events.
# Supported values:
# `repair-by-discard-old` : discard events from old writers,
# warning is logged
# `fail` : fail the replay, error is logged
# `warn` : log warning but emit events untouched
# `off` : disable this feature completely
mode = repair-by-discard-old
# It uses a look ahead buffer for analyzing the events.
# This defines the size (in number of events) of the buffer.
window-size = 100
# How many old writerUuid to remember
max-old-writers = 10
# Set this to `on` to enable detailed debug logging of each
# replayed event.
debug = off
}
}
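Since the plugin section falls back to these defaults, overriding a value only requires restating it under your own plugin key. The fragment below is a sketch with illustrative values; the chosen timeouts are assumptions, not recommendations.

```hocon
akka.persistence.journal {
  plugin = "akka.persistence.journal.custom-sqlite"
  custom-sqlite {
    class = "Akka.Persistence.Custom.Journal.SqliteJournal, Akka.Persistence.Custom"

    # Override selected fallback values; anything not listed here
    # still comes from akka.persistence.journal-plugin-fallback
    recovery-event-timeout = 60s
    circuit-breaker {
      call-timeout = 20s
    }
  }
}
```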
Pre-Start Requirement
It is good practice to let users of your custom plugin initialize their backend event store automatically from a blank slate. To do that, support an auto-initialize setting in your journal HOCON settings.
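Such a setting might look like the fragment below. Apart from class, every key shown is hypothetical and named here for illustration only; the plugin itself has to read these keys and map them to whatever backs `_settings.AutoInitialize`, `_connectionString`, and `_timeout` in the sample code.

```hocon
akka.persistence.journal.custom-sqlite {
  class = "Akka.Persistence.Custom.Journal.SqliteJournal, Akka.Persistence.Custom"

  # Hypothetical keys, shown for illustration only
  # Create the journal tables on startup if they do not exist yet
  auto-initialize = on
  # Connection string handed to SqliteConnection
  connection-string = "Data Source=journal.db"
  # Timeout applied to each database command
  connection-timeout = 30s
}
```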
In order to support this, we will have to be able to stash all incoming messages while we're creating the tables in the event source. Here's an implementation that we use in our example code:
protected override void PreStart()
{
base.PreStart();
// Call the Initialize method and pipe the result back to signal that
// database schemas are ready to use, if it needs to be initialized
Initialize().PipeTo(Self);
// WaitingForInitialization receive handler will wait for a success/fail
// result back from the Initialize method
BecomeStacked(WaitingForInitialization);
}
protected override void PostStop()
{
base.PostStop();
// stop all operations executed in the background
_pendingRequestsCancellation.Cancel();
}
private bool WaitingForInitialization(object message)
{
switch (message)
{
// Tables are already created or successfully created all needed tables
case Status.Success _:
UnbecomeStacked();
// Unstash all messages received when we were initializing our tables
Stash.UnstashAll();
break;
case Status.Failure fail:
// Failed creating tables. Log an error and stop the actor.
_log.Error(fail.Cause, "Failure during {0} initialization.", Self);
Context.Stop(Self);
break;
default:
// By default, stash all received messages while we're waiting for the
// Initialize method.
Stash.Stash();
break;
}
return true;
}
private async Task<object> Initialize()
{
// No database initialization needed, the user explicitly asked us
// not to initialize anything.
if (!_settings.AutoInitialize)
return new Status.Success(NotUsed.Instance);
// Create SQLite journal tables
try
{
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
using (var command = GetCommand(connection, CreateEventsJournalSql, _timeout))
{
await command.ExecuteNonQueryAsync(cts.Token);
command.CommandText = CreateMetaTableSql;
await command.ExecuteNonQueryAsync(cts.Token);
}
}
}
catch (Exception e)
{
return new Status.Failure(e);
}
return new Status.Success(NotUsed.Instance);
}
Implementing Akka.Persistence SnapshotStore
Required Method Implementations
LoadAsync
Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
- persistenceId: Identification of the persistent actor.
- criteria: Selection criteria for snapshot loading.
This call is protected with a circuit-breaker.
Asynchronously loads a snapshot.
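The selection rule can be pictured in memory before looking at the SQL. The sketch below assumes the same semantics as the WHERE and ORDER BY clauses of the SelectSnapshotSql query in this sample: among the snapshots at or below both upper bounds, take the one with the highest sequence number. `SnapshotRow` and `SnapshotSelectionSketch` are hypothetical types for illustration, not Akka.NET classes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one row of the snapshot table
public sealed class SnapshotRow
{
    public SnapshotRow(long sequenceNr, DateTime timestamp)
    {
        SequenceNr = sequenceNr;
        Timestamp = timestamp;
    }

    public long SequenceNr { get; }
    public DateTime Timestamp { get; }
}

public static class SnapshotSelectionSketch
{
    // Returns the newest snapshot within both bounds, or null if none qualifies
    public static SnapshotRow SelectLatest(
        IEnumerable<SnapshotRow> rows,
        long maxSequenceNr,
        DateTime maxTimestamp)
    {
        return rows
            .Where(r => r.SequenceNr <= maxSequenceNr && r.Timestamp <= maxTimestamp)
            .OrderByDescending(r => r.SequenceNr)
            .FirstOrDefault();
    }
}
```

Returning null when nothing matches mirrors the sample's LoadAsync, which returns null if no snapshot is found.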
Code Sample
SelectSnapshotSql in this example refers to this SQL query statement:
private const string SelectSnapshotSql = @"
SELECT persistence_id,
sequence_nr,
timestamp,
manifest,
payload,
serializer_id
FROM snapshot
WHERE persistence_id = @PersistenceId
AND sequence_nr <= @SequenceNr
AND timestamp <= @Timestamp
ORDER BY sequence_nr DESC
LIMIT 1";
SQLite code example:
protected sealed override async Task<SelectedSnapshot> LoadAsync(
string persistenceId,
SnapshotSelectionCriteria criteria)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
// Create new DbCommand instance
using (var command = GetCommand(connection, SelectSnapshotSql, _timeout))
{
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, criteria.MaxSequenceNr);
AddParameter(command, "@Timestamp", DbType.DateTime2, criteria.MaxTimeStamp);
// Create a DbDataReader to sequentially read the returned query result
using (var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess,
cts.Token))
{
// Return null if no snapshot is found
if (!await reader.ReadAsync(cts.Token))
return null;
var id = reader.GetString(0);
var sequenceNr = reader.GetInt64(1);
var timestamp = reader.GetDateTime(2);
var metadata = new SnapshotMetadata(id, sequenceNr, timestamp);
var manifest = reader.GetString(3);
var binary = (byte[])reader[4];
var serializerId = reader.GetInt32(5);
// Deserialize the snapshot payload using the data we read from the reader
var snapshot = _serialization.Deserialize(binary, serializerId, manifest);
return new SelectedSnapshot(metadata, snapshot);
}
}
}
}
SaveAsync
Task SaveAsync(SnapshotMetadata metadata, object snapshot)
- metadata: Snapshot metadata.
- snapshot: The snapshot object instance to be stored.
This call is protected with a circuit-breaker.
Asynchronously saves a snapshot.
Code Sample
InsertSnapshotSql in this example refers to this SQL query statement:
private const string InsertSnapshotSql = @"
UPDATE snapshot
SET timestamp = @Timestamp,
manifest = @Manifest,
payload = @Payload,
serializer_id = @SerializerId
WHERE persistence_id = @PersistenceId AND sequence_nr = @SequenceNr;
INSERT OR IGNORE INTO snapshot
(persistence_id, sequence_nr, timestamp, manifest, payload, serializer_id)
VALUES (@PersistenceId, @SequenceNr, @Timestamp, @Manifest, @Payload, @SerializerId)";
SQLite code example:
protected sealed override async Task SaveAsync(
SnapshotMetadata metadata,
object snapshot)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
// Create new DbCommand instance
using (var command = GetCommand(connection, InsertSnapshotSql, _timeout))
// Create a new DbTransaction instance
using (var tx = connection.BeginTransaction())
{
command.Transaction = tx;
var snapshotType = snapshot.GetType();
// Get the serializer associated with the payload type
var serializer = _serialization.FindSerializerForType(objectType: snapshotType);
// This WithTransport method call is important, it allows for proper
// local IActorRef serialization by switching the serialization information
// context during the serialization process
var (binary, manifest) = Akka.Serialization.Serialization.WithTransport(
system: _serialization.System,
state: (snapshot, serializer),
action: state =>
{
var (thePayload, theSerializer) = state;
var thisManifest = "";
// There are two kinds of serializer when it comes to manifest
// support, we have to support both of them for proper payload
// serialization
if (theSerializer is SerializerWithStringManifest stringManifest)
{
thisManifest = stringManifest.Manifest(thePayload);
}
else if (theSerializer.IncludeManifest)
{
thisManifest = thePayload.GetType().TypeQualifiedName();
}
// Return the serialized byte array and the manifest for the
// serialized data
return (theSerializer.ToBinary(thePayload), thisManifest);
});
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, metadata.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, metadata.SequenceNr);
AddParameter(command, "@Timestamp", DbType.DateTime2, metadata.Timestamp);
AddParameter(command, "@Manifest", DbType.String, manifest);
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
AddParameter(command, "@Payload", DbType.Binary, binary);
// Execute the SQL query
await command.ExecuteNonQueryAsync(cts.Token);
// Commit the DbTransaction
tx.Commit();
}
}
}
DeleteAsync
Task DeleteAsync(SnapshotMetadata metadata);
- metadata: Snapshot metadata.
This call is protected with a circuit-breaker.
Deletes the snapshot identified by the provided metadata.
Code Sample
DeleteSnapshotSql in this example refers to this SQL query statement:
private const string DeleteSnapshotSql = @"
DELETE FROM snapshot
WHERE persistence_id = @PersistenceId
AND sequence_nr = @SequenceNr";
DeleteSnapshotRangeSql in this example refers to this SQL query statement:
private const string DeleteSnapshotRangeSql = @"
DELETE FROM snapshot
WHERE persistence_id = @PersistenceId
AND sequence_nr <= @SequenceNr
AND timestamp <= @Timestamp";
SQLite code example:
protected sealed override async Task DeleteAsync(SnapshotMetadata metadata)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
var timestamp = metadata.Timestamp != DateTime.MinValue
? metadata.Timestamp
: (DateTime?)null;
// When a timestamp is given, also require an exact match on the timestamp column
var sql = timestamp.HasValue
? DeleteSnapshotRangeSql + " AND timestamp = @Timestamp"
: DeleteSnapshotSql;
// Create new DbCommand instance
using (var command = GetCommand(connection, sql, _timeout))
// Create a new DbTransaction instance
using (var tx = connection.BeginTransaction())
{
command.Transaction = tx;
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, metadata.PersistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, metadata.SequenceNr);
if (timestamp.HasValue)
{
AddParameter(command, "@Timestamp", DbType.DateTime2, metadata.Timestamp);
}
// Execute the SQL query
await command.ExecuteNonQueryAsync(cts.Token);
// Commit the DbTransaction
tx.Commit();
}
}
}
DeleteAsync
Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria);
- persistenceId: Identification of the persistent actor.
- criteria: Selection criteria for snapshot deletion.
This call is protected with a circuit-breaker.
Deletes all snapshots matching the provided criteria.
Code Sample
DeleteSnapshotRangeSql in this example refers to this SQL query statement:
private const string DeleteSnapshotRangeSql = @"
DELETE FROM snapshot
WHERE persistence_id = @PersistenceId
AND sequence_nr <= @SequenceNr
AND timestamp <= @Timestamp";
SQLite code example:
protected sealed override async Task DeleteAsync(
string persistenceId,
SnapshotSelectionCriteria criteria)
{
// Create a new DbConnection instance
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
// Create new DbCommand instance
using (var command = GetCommand(connection, DeleteSnapshotRangeSql, _timeout))
// Create a new DbTransaction instance
using (var tx = connection.BeginTransaction())
{
command.Transaction = tx;
// Populate the SQL parameters
AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
AddParameter(command, "@SequenceNr", DbType.Int64, criteria.MaxSequenceNr);
AddParameter(command, "@Timestamp", DbType.DateTime2, criteria.MaxTimeStamp);
// Execute the SQL query
await command.ExecuteNonQueryAsync(cts.Token);
// Commit the DbTransaction
tx.Commit();
}
}
}
Detail Implementation
Reading HOCON Settings
The snapshot store base class loads some HOCON settings by default, and these can be overridden in your own HOCON configuration. The minimum HOCON settings that need to be defined for your custom plugin are:
akka.persistence{
snapshot-store {
plugin = "akka.persistence.snapshot-store.custom-sqlite"
custom-sqlite {
# qualified type name of the SQLite persistence snapshot store actor
class = "Akka.Persistence.Custom.Snapshot.SqliteSnapshotStore, Akka.Persistence.Custom"
}
}
}
The default HOCON settings are:
# Fallback settings for snapshot store plugin configurations
# These settings are used if they are not defined in plugin config section.
akka.persistence.snapshot-store-plugin-fallback {
# Fully qualified class name providing snapshot store plugin api
# implementation. It is mandatory to specify this property if
# snapshot store is enabled.
# The class must have a constructor without parameters or constructor with
# one `Akka.Configuration.Config` parameter.
class = ""
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
# Default serializer used as manifest serializer when applicable
# and payload serializer when no specific binding overrides are specified
serializer = "json"
circuit-breaker {
max-failures = 5
call-timeout = 20s
reset-timeout = 60s
}
}
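As an illustration of overriding these fallback values, the fragment below is a minimal sketch that changes the circuit-breaker settings for the custom-sqlite snapshot store only. The numbers are arbitrary examples, not recommendations; any key that is not overridden (here, reset-timeout) keeps its fallback value.

```hocon
akka.persistence.snapshot-store.custom-sqlite {
  class = "Akka.Persistence.Custom.Snapshot.SqliteSnapshotStore, Akka.Persistence.Custom"

  # Overrides the fallback circuit-breaker values for this plugin only.
  # Example values, chosen purely for illustration.
  circuit-breaker {
    max-failures = 10
    call-timeout = 30s
  }
}
```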
Pre-Start Requirement
It is good practice to let users of your custom plugin automatically initialize their backing store from a blank slate. To do that, support an auto-initialize setting in your plugin's HOCON settings.
To support this, the snapshot store has to stash all incoming messages while it creates the tables in the backing store. Here's the implementation that we use in our example code:
protected override void PreStart()
{
base.PreStart();
if (_settings.AutoInitialize)
{
// Call the Initialize method and pipe the result back to signal that
// database schemas are ready to use, if it needs to be initialized
Initialize().PipeTo(Self);
// WaitingForInitialization receive handler will wait for a success/fail
// result back from the Initialize method
BecomeStacked(WaitingForInitialization);
}
}
protected override void PostStop()
{
base.PostStop();
// stop all operations executed in the background
_pendingRequestsCancellation.Cancel();
}
private async Task<object> Initialize()
{
// No database initialization needed, the user explicitly asked us not to initialize anything.
if (!_settings.AutoInitialize)
return new Status.Success(NotUsed.Instance);
// Create the SQLite snapshot table
try
{
using (var connection = new SqliteConnection(_connectionString))
using (var cts = CancellationTokenSource
.CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
{
await connection.OpenAsync(cts.Token);
using (var command = GetCommand(connection, CreateSnapshotTableSql, _timeout))
using (var tx = connection.BeginTransaction())
{
command.Transaction = tx;
await command.ExecuteNonQueryAsync(cts.Token);
tx.Commit();
}
return Status.Success.Instance;
}
}
catch (Exception e)
{
return new Status.Failure(e);
}
}
private bool WaitingForInitialization(object message)
{
switch(message)
{
// Tables are already created or successfully created all needed tables
case Status.Success _:
UnbecomeStacked();
// Unstash all messages received when we were initializing our tables
Stash.UnstashAll();
return true;
case Status.Failure msg:
// Failed creating tables. Log an error and stop the actor.
_log.Error(msg.Cause, "Error during snapshot store initialization");
Context.Stop(Self);
return true;
default:
// By default, stash all received messages while we're waiting for the
// Initialize method.
Stash.Stash();
return true;
}
}
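For reference, a user could switch this initialization path on with a configuration fragment like the one below. This is a sketch under one assumption: that the plugin's settings class reads _settings.AutoInitialize from a key named auto-initialize inside the plugin's own section. Use whatever key name your settings class actually reads.

```hocon
akka.persistence.snapshot-store.custom-sqlite {
  class = "Akka.Persistence.Custom.Snapshot.SqliteSnapshotStore, Akka.Persistence.Custom"

  # Assumed key name; must match what your settings class reads.
  # When enabled, PreStart creates the snapshot table before serving requests.
  auto-initialize = on
}
```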
Tying Everything Together Into An Akka.NET Extension
The last thing you need to do to make the new custom persistence provider work is to create an Akka.NET extension that wraps the journal and snapshot store so that they can be used seamlessly from inside Akka.NET.
To do this, you will need to extend two things: IExtension and ExtensionIdProvider<T>.
Extending IExtension
IExtension is a marker interface that marks a class as an Akka.NET extension. It is the class instance that all users will use to interact with your custom extension.
There are two conventions that need to be followed when you extend IExtension:
- Any class extending this interface is required to provide a single constructor that takes a single ExtendedActorSystem as its argument.
public SqlitePersistence(ExtendedActorSystem system)
{
var defaultConfig = DefaultConfiguration();
system.Settings.InjectTopLevelFallback(defaultConfig);
JournalConfig = system.Settings.Config.GetConfig(JournalConfigPath);
DefaultJournalConfig = defaultConfig.GetConfig(JournalConfigPath);
SnapshotConfig = system.Settings.Config.GetConfig(SnapshotConfigPath);
DefaultSnapshotConfig = defaultConfig.GetConfig(SnapshotConfigPath);
}
- It is strongly recommended to create a static Get method that returns an instance of this class. This method is responsible for registering the extension with the Akka.NET extension manager and instantiating a new instance for users to use.
public static SqlitePersistence Get(ActorSystem system)
{
return system.WithExtension<SqlitePersistence, SqlitePersistenceProvider>();
}
Extending ExtensionIdProvider<T>
ExtensionIdProvider<T> is a factory class that is responsible for creating new instances of your extension. This class is quite straightforward; all you need to do is override a single method:
public class SqlitePersistenceProvider : ExtensionIdProvider<SqlitePersistence>
{
public override SqlitePersistence CreateExtension(ExtendedActorSystem system)
{
return new SqlitePersistence(system);
}
}
Unit Testing Journal and SnapshotStore
Akka.Persistence comes with a standardized Technology Compatibility Kit (TCK) that can be readily incorporated into your unit testing suite to check that a custom provider meets the basic compatibility requirements.
To do this, you will need to create an XUnit unit test project and reference your custom persistence project and the Akka.Persistence.TCK package. There are four abstract classes that can be implemented to test your persistence implementation:
- JournalSerializationSpec
- JournalSpec
- SnapshotStoreSerializationSpec
- SnapshotStoreSpec
For an implementation sample, please see the /src/examples/Akka.Persistence.Custom.Tests example project.
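As a rough idea of what such a test class can look like, here is a minimal sketch of a SnapshotStoreSpec implementation. It assumes the TCK base class accepts a Config, an actor system name and an ITestOutputHelper in its constructor, and exposes an Initialize() method that seeds the store before the inherited tests run. The class name, the namespace and the auto-initialize key are illustrative, and the connection settings your plugin needs are deliberately left out.

```csharp
using Akka.Configuration;
using Akka.Persistence.TCK.Snapshot;
using Xunit.Abstractions;

namespace Akka.Persistence.Custom.Tests
{
    // Hypothetical spec class; the inherited TCK tests run against the
    // snapshot store configured below.
    public class SqliteSnapshotStoreSpec : SnapshotStoreSpec
    {
        private static readonly Config SpecConfig = ConfigurationFactory.ParseString(@"
            akka.persistence.snapshot-store {
                plugin = ""akka.persistence.snapshot-store.custom-sqlite""
                custom-sqlite {
                    class = ""Akka.Persistence.Custom.Snapshot.SqliteSnapshotStore, Akka.Persistence.Custom""
                    # Assumed key name; add the connection settings your plugin requires here
                    auto-initialize = on
                }
            }");

        public SqliteSnapshotStoreSpec(ITestOutputHelper output)
            : base(SpecConfig, nameof(SqliteSnapshotStoreSpec), output)
        {
            // Assumption: Initialize() writes the snapshots the inherited tests expect
            Initialize();
        }
    }
}
```

The other three spec classes can be wired up the same way, with one test class per spec pointing at the matching journal or snapshot store configuration.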