Search Results for

    Show / Hide Table of Contents

    Writing A Custom Akka.Persistence Provider

    For an introduction to event sourcing, you can read this article at MSDN.

    Persistence or event source provider in Akka.NET are divided into two plugins, the journal and snapshot store plugins, each with their own API and responsibility. The goal of Akka.Persistence event sourcing plugin is to provide a consistent event sourcing API that provides an abstraction layer over the underlying storage mechanism.

    Implementing Akka.Persistence Journal

    Akka.Persistence journal is responsible for storing all events for playback in the event of a recovery.

    All of the code examples in this documentation will assume a SQLite database. You can view the complete sample project in /src/examples/Akka.Persistence.Custom.

    The Journal table schemas we will be using are:

    private const string CreateEventsJournalSql = @"
        CREATE TABLE IF NOT EXISTS event_journal (
            ordering INTEGER PRIMARY KEY NOT NULL,
            persistence_id VARCHAR(255) NOT NULL,
            sequence_nr INTEGER(8) NOT NULL,
            is_deleted INTEGER(1) NOT NULL,
            manifest VARCHAR(255) NULL,
            timestamp INTEGER NOT NULL,
            payload BLOB NOT NULL,
            serializer_id INTEGER(4),
            UNIQUE (persistence_id, sequence_nr));";
    
    private const string CreateMetaTableSql = @"
        CREATE TABLE IF NOT EXISTS journal_metadata (
            persistence_id VARCHAR(255) NOT NULL,
            sequence_nr INTEGER(8) NOT NULL,
            PRIMARY KEY (persistence_id, sequence_nr));";
    

    Required Method Implementations

    ReplayMessagesAsync

    Task ReplayMessagesAsync(
        IActorContext context, 
        string persistenceId, 
        long fromSequenceNr, 
        long toSequenceNr, 
        long max, 
        Action<IPersistentRepresentation> recoveryCallback)
    
    • context: The contextual information about the actor processing the replayed messages.
    • persistenceId: Persistent actor identifier
    • fromSequenceNr: Inclusive sequence number where replay should start
    • toSequenceNr: Inclusive sequence number where replay should end
    • max: Maximum number of messages to be replayed
    • recoveryCallback: Called to replay a message, may be called from any thread.

    This call is NOT protected with a circuit-breaker.

    This method should asynchronously replay persistent messages:

    • Implementations replay a message by calling recoveryCallback.
    • The returned Task must be completed when all messages (matching the sequence number bounds) have been replayed.
    • The Task must be completed with a failure if any of the persistent messages could not be replayed.

    The toSequenceNr is the lowest of what was returned by ReadHighestSequenceNrAsync and what the user specified as the recovery Recovery parameter. This does imply that this call is always preceded by reading the highest sequence number for the given persistenceId.

    This call is NOT protected with a circuit-breaker because it may take a long time to replay all events. The plugin implementation itself must protect against an unresponsive backend store and make sure that the returned Task is completed with success or failure within reasonable time. It is not allowed to ignore completing the Task.

    Code Sample

    ByPersistenceIdSql in this example refers to this SQL query statement:

    private const string ByPersistenceIdSql = @"
        SELECT e.persistence_id as PersistenceId,
                e.sequence_nr as SequenceNr,
                e.timestamp as Timestamp,
                e.is_deleted as IsDeleted,
                e.manifest as Manifest,
                e.payload as Payload,
                e.serializer_id as SerializerId
            FROM event_journal e
            WHERE e.persistence_id = @PersistenceId
            AND e.sequence_nr BETWEEN @FromSequenceNr AND @ToSequenceNr
            ORDER BY sequence_nr ASC;";
    

    SQLite code example:

    public sealed override async Task ReplayMessagesAsync(
        IActorContext context,
        string persistenceId,
        long fromSequenceNr,
        long toSequenceNr,
        long max,
        Action<IPersistentRepresentation> recoveryCallback)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        using (var cts = CancellationTokenSource
                   .CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
        {
            await connection.OpenAsync(cts.Token);
            
            // Create new DbCommand instance
            using (var command = GetCommand(connection, ByPersistenceIdSql, _timeout))
            {
                // Populate the SQL parameters
                AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
                AddParameter(command, "@FromSequenceNr", DbType.Int64, fromSequenceNr);
                AddParameter(command, "@ToSequenceNr", DbType.Int64, toSequenceNr);
                
                // Create a DbDataReader to sequentially read the returned query result
                using (var reader = await command.ExecuteReaderAsync(
                           behavior: CommandBehavior.SequentialAccess, 
                           cancellationToken: cts.Token))
                {
                    var i = 0L;
                    while (i++ < max && await reader.ReadAsync(cts.Token))
                    {
                        var id = reader.GetString(0);
                        var sequenceNr = reader.GetInt64(1);
                        var timestamp = reader.GetInt64(2);
                        var isDeleted = reader.GetBoolean(3);
                        var manifest = reader.GetString(4);
                        var payload = reader[5];
                        var serializerId = reader.GetInt32(6);
                        
                        // Deserialize the persistent payload using the data we read from the reader
                        var deserialized = _serialization.Deserialize(
                            bytes: (byte[])payload, 
                            serializerId: serializerId, 
                            manifest: manifest);
                        
                        // Call the recovery callback with the deserialized data from the database
                        recoveryCallback(new Persistent(
                            payload: deserialized, 
                            sequenceNr: sequenceNr,
                            persistenceId: id,
                            manifest: manifest, 
                            isDeleted: isDeleted, 
                            sender: ActorRefs.NoSender, 
                            writerGuid: null, 
                            timestamp: timestamp));
                    }
                    command.Cancel();
                }
            }
        }
    }
    

    ReadHighestSequenceNrAsync

    Task<long> ReadHighestSequenceNrAsync(
        string persistenceId, 
        long fromSequenceNr)
    
    • persistenceId: Persistent actor identifier
    • fromSequenceNr: Hint where to start searching for the highest sequence number. When a persistent actor is recovering this fromSequenceNr will the sequence number of the used snapshot, or 0L if no snapshot is used.

    This call is protected with a circuit-breaker.

    This method should asynchronously reads the highest stored sequence number for provided persistenceId. The persistent actor will use the highest sequence number after recovery as the starting point when persisting new events. This sequence number is also used as toSequenceNr in subsequent calls to ReplayMessagesAsync unless the user has specified a lower toSequenceNr. Journal must maintain the highest sequence number and never decrease it.

    Please also note that requests for the highest sequence number may be made concurrently to writes executing for the same persistenceId, in particular it is possible that a restarting actor tries to recover before its outstanding writes have completed.

    HighestSequenceNrSql in this example refers to this SQL query statement:

    private const string HighestSequenceNrSql = @"
        SELECT MAX(u.SeqNr) as SequenceNr 
            FROM (
                SELECT MAX(e.sequence_nr) as SeqNr FROM event_journal e 
                    WHERE e.persistence_id = @PersistenceId
                UNION
                SELECT MAX(m.sequence_nr) as SeqNr FROM journal_metadata m 
                    WHERE m.persistence_id = @PersistenceId) as u";
    

    SQLite code example:

    public sealed override async Task<long> ReadHighestSequenceNrAsync(
        string persistenceId,
        long fromSequenceNr,
        CancellationToken cancellationToken)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        using (var cts = CancellationTokenSource
                   .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))
        {
            await connection.OpenAsync(cts.Token);
            // Create new DbCommand instance
            using (var command = GetCommand(connection, HighestSequenceNrSql, _timeout))
            {
                // Populate the SQL parameters
                AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
    
                // Execute the SQL query statement
                var result = await command.ExecuteScalarAsync(cts.Token);
    
                // Return the result if one is returned by the query result, else return zero
                return result is long ? Convert.ToInt64(result) : 0L;
            }
        }
    }
    

    WriteMessagesAsync

    protected abstract Task<IImmutableList<Exception>> WriteMessagesAsync(
        IEnumerable<AtomicWrite> messages)
    

    Asynchronously writes a batch of persistent messages to the journal.

    This call is protected with a circuit-breaker.

    Calls to this method are serialized by the enclosing journal actor. If you spawn work in asynchronous tasks it is alright that they complete the futures in any order, but the actual writes for a specific persistenceId should be serialized to avoid issues such as events of a later write are visible to consumers (query side, or replay) before the events of an earlier write are visible. A PersistentActor will not send a new WriteMessages request before the previous one has been completed.

    Please note that the IPersistentRepresentation.Sender of the contained IPersistentRepresentation objects has been set to null (i.e. set to ActorRefs.NoSender) in order to save space in the journal and to prevent saving a sender reference that will likely be obsolete during replay.

    Please also note that requests for the highest sequence number may be made concurrently to this call executing for the same persistenceId, in particular it is possible that a restarting actor tries to recover before its outstanding writes have completed. In the latter case it is highly desirable to defer reading the highest sequence number until all outstanding writes have completed, otherwise the PersistentActor may reuse sequence numbers.

    Batching

    The batch is only for performance reasons, i.e. all messages don't have to be written atomically. Higher throughput can typically be achieved by using batch inserts of many records compared to inserting records one-by-one, but this aspect depends on the underlying data store and a journal implementation can implement it as efficiently as possible.

    Journals should aim to persist events in-order for a given persistenceId as otherwise in case of a failure, the persistent state may end up being inconsistent.

    AtomicWrite

    Each AtomicWrite message contains a Payload property that contains:

    • a single IPersistentRepresentation that corresponds to the event that was passed to the Eventsourced.Persist<TEvent>(TEvent, Action<TEvent>) method of the PersistentActor, or
    • it contains several IPersistentRepresentation that correspond to the events that were passed to the Eventsourced.PersistAll<TEvent>(IEnumerable<TEvent>, Action<TEvent>) method of the PersistentActor.

    Note that while AtomicWrite.Payload is declared as an object property, the type of this object instance will always be an ImmutableList<IPersistentRepresentation>.

    All Persistent of the AtomicWrite must be written to the data store atomically, i.e. all or none must be stored.

    If the journal (data store) cannot support atomic writes of multiple events it should reject such writes with a NotSupportedException describing the issue. This limitation should also be documented by the journal plugin.

    Handling Failures

    If there are failures when storing any of the messages in the batch the returned Task must be completed with failure. The Task must only be completed with success when all messages in the batch have been confirmed to be stored successfully, i.e. they will be readable, and visible, in a subsequent replay. If there is uncertainty about if the messages were stored or not the Task must be completed with failure.

    Data store connection problems must be signaled by completing the Task with failure.

    Handling AtomicWrite Result

    The journal can also signal that it rejects individual messages (AtomicWrite) by the returned Task.

    • It is possible but not mandatory to reduce the number of allocations by returning null for the happy path, i.e. when no messages are rejected.
    • Otherwise the returned list must have as many elements as the input messages.
    • Each result element signals if the corresponding AtomicWrite is rejected or not, with an exception describing the problem.
    • Rejecting a message means it was not stored, i.e. it must not be included in a later replay.
    • Rejecting a message is typically done before attempting to store it, e.g. because of serialization error.
    • Data store connection problems must not be signaled as rejections.
    Code Sample

    InsertEventSql in this example refers to this SQL query statement:

    private const string InsertEventSql = @"
        INSERT INTO event_journal (
            persistence_id,
            sequence_nr,
            timestamp,
            is_deleted,
            manifest,
            payload,
            serializer_id)
        VALUES (
            @PersistenceId,
            @SequenceNr,
            @Timestamp,
            @IsDeleted,
            @Manifest,
            @Payload,
            @SerializerId);";
    

    SQLite code example:

    protected sealed override async Task<IImmutableList<Exception>> WriteMessagesAsync(
        IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
    {
        // For each of the atomic write request, create an async Task 
        var writeTasks = messages.Select(async message =>
        {
            // Create a new DbConnection instance
            using (var connection = new SqliteConnection(_connectionString))
            using (var cts = CancellationTokenSource
                       .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))
            {
                await connection.OpenAsync(cts.Token);
                
                // Create new DbCommand instance
                using (var command = GetCommand(connection, InsertEventSql, _timeout))
                // Create a new DbTransaction instance for this AtomicWrite
                using (var tx = connection.BeginTransaction())
                {
                    command.Transaction = tx;
                    
                    // Cast the payload to IImmutableList<IPersistentRepresentation>.
                    // Note that AtomicWrite.Payload property is declared as an object property,
                    // but it is always populated with an IImmutableList<IPersistentRepresentation>
                    // instance
                    var payload = (IImmutableList<IPersistentRepresentation>)message.Payload;
                    var persistentMessages = payload.ToArray();
                    foreach (var @event in persistentMessages)
                    {
                        // Get the serializer associated with the payload type
                        var serializer = _serialization.FindSerializerForType(@event.Payload.GetType());
                        
                        // This WithTransport method call is important, it allows for proper
                        // local IActorRef serialization by switching the serialization information
                        // context during the serialization process
                        var (binary, manifest) = Akka.Serialization.Serialization.WithTransport(
                            system: _serialization.System, 
                            state: (@event.Payload, serializer), 
                            action: state =>
                            {
                                var (thePayload, theSerializer) = state;
                                var thisManifest = "";
                                
                                // There are two kinds of serializer when it comes to manifest
                                // support, we have to support both of them for proper payload
                                // serialization
                                if (theSerializer is SerializerWithStringManifest stringManifest)
                                {
                                    thisManifest = stringManifest.Manifest(thePayload);
                                }
                                else if (theSerializer.IncludeManifest)
                                {
                                    thisManifest = thePayload.GetType().TypeQualifiedName();
                                }
                                
                                // Return the serialized byte array and the manifest for the
                                // serialized data
                                return (theSerializer.ToBinary(thePayload), thisManifest);
                            });
                        
                        // Populate the SQL parameters
                        AddParameter(command, "@PersistenceId", DbType.String, @event.PersistenceId);
                        AddParameter(command, "@SequenceNr", DbType.Int64, @event.SequenceNr);
                        AddParameter(command, "@Timestamp", DbType.Int64, @event.Timestamp);
                        AddParameter(command, "@IsDeleted", DbType.Boolean, false);
                        AddParameter(command, "@Manifest", DbType.String, manifest);
                        AddParameter(command, "@Payload", DbType.Binary, binary);
                        AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
                        
                        // Execute the SQL query
                        await command.ExecuteScalarAsync(cts.Token);
                        
                        // clear the parameters and reuse the DbCommand instance
                        command.Parameters.Clear();
                    }
                    // Commit the DbTransaction
                    tx.Commit();
                }
            
            }
        }).ToArray();
    
        // Collect all exceptions raised for each failed writes and return them.
        var result = await Task<IImmutableList<Exception>>.Factory
            .ContinueWhenAll(writeTasks,
                tasks => tasks.Select(t => t.IsFaulted 
                    ? TryUnwrapException(t.Exception) 
                    : null).ToImmutableList(), cancellationToken);
    
        return result;
    }
    

    DeleteMessagesToAsync

    protected abstract Task DeleteMessagesToAsync(
        string persistenceId, 
        long toSequenceNr)
    
    • persistenceId: Persistent actor identifier
    • toSequenceNr: Inclusive sequence number of the last event to be deleted

    This call is protected with a circuit-breaker.

    Asynchronously deletes all persistent messages up to inclusive toSequenceNr bound. This operation has to be done atomically.

    Code Sample

    DeleteBatchSql in this example refers to this SQL query statement:

    private const string DeleteBatchSql = @"
        DELETE FROM event_journal
            WHERE persistence_id = @PersistenceId AND sequence_nr <= @ToSequenceNr;
        DELETE FROM journal_metadata
            WHERE persistence_id = @PersistenceId AND sequence_nr <= @ToSequenceNr;";
    

    HighestSequenceNrSql in this example refers to this SQL query statement:

    private const string HighestSequenceNrSql = @"
        SELECT MAX(u.SeqNr) as SequenceNr 
            FROM (
                SELECT MAX(e.sequence_nr) as SeqNr FROM event_journal e 
                    WHERE e.persistence_id = @PersistenceId
                UNION
                SELECT MAX(m.sequence_nr) as SeqNr FROM journal_metadata m 
                    WHERE m.persistence_id = @PersistenceId) as u";
    

    UpdateSequenceNrSql in this example refers to this SQL query statement:

    private const string UpdateSequenceNrSql = @"
        INSERT INTO journal_metadata (persistence_id, sequence_nr)
        VALUES (@PersistenceId, @SequenceNr);";
    

    SQLite code example:

    protected sealed override async Task DeleteMessagesToAsync(
        string persistenceId,
        long toSequenceNr,
        CancellationToken cancellationToken)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        {
            await connection.OpenAsync(cancellationToken);
            
            using (var cts = CancellationTokenSource
                       .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))
            {
                // We will be using two DbCommands to complete this process
                using (var deleteCommand = GetCommand(connection, DeleteBatchSql, _timeout))
                using (var highestSeqNrCommand = 
                       GetCommand(connection, HighestSequenceNrSql, _timeout))
                {
                    // Populate the SQL parameters
                    AddParameter(highestSeqNrCommand, "@PersistenceId", DbType.String, 
                        persistenceId);
    
                    AddParameter(deleteCommand, "@PersistenceId", DbType.String, persistenceId);
                    AddParameter(deleteCommand, "@ToSequenceNr", DbType.Int64, toSequenceNr);
    
                    // Create a DbTransaction instance
                    using (var tx = connection.BeginTransaction())
                    {
                        deleteCommand.Transaction = tx;
                        highestSeqNrCommand.Transaction = tx;
    
                        // Execute the HighestSequenceNrSql SQL statement and fetch the current
                        // highest sequence number for the given persistence identifier
                        var res = await highestSeqNrCommand.ExecuteScalarAsync(cts.Token);
                        var highestSeqNr = res is long ? Convert.ToInt64(res) : 0L;
    
                        // Delete all events up to toSequenceNr both in the journal and
                        // metadata table
                        await deleteCommand.ExecuteNonQueryAsync(cts.Token);
    
                        // Update the metadata table to reflect the new highest sequence number,
                        // if the toSequenceNr is higher than our current highest sequence number
                        if (highestSeqNr <= toSequenceNr)
                        {
                            // Create a new DbCommand instance
                            using (var updateCommand = GetCommand(connection, UpdateSequenceNrSql, 
                                       _timeout))
                            {
                                // This update is still part of the same transaction as everything
                                // else in this process
                                updateCommand.Transaction = tx;
    
                                // Populate the SQL parameters
                                AddParameter(
                                    updateCommand, "@PersistenceId", DbType.String, persistenceId);
                                AddParameter(
                                    updateCommand, "@SequenceNr", DbType.Int64, highestSeqNr);
    
                                // Execute the update SQL statement
                                await updateCommand.ExecuteNonQueryAsync(cts.Token);
                                tx.Commit();
                            }
                        }
                        else tx.Commit();
                    }
                }
            }
        }
    }
    

    Detail Implementation

    Reading HOCON Settings

    There are some HOCON settings that are by default loaded by the journal base class and these can be overriden in your HOCON settings. The minimum HOCON settings that need to be defined for your custom plugin are:

    akka.persistence.journal {
      plugin = "akka.persistence.journal.custom-sqlite"
      custom-sqlite {
        # qualified type name of the SQLite persistence journal actor
        class = "Akka.Persistence.Custom.Journal.SqliteJournal, Akka.Persistence.Custom"
      }
    }
    

    The default HOCON settings are:

    # Fallback settings for journal plugin configurations.
    # These settings are used if they are not defined in plugin config section.
    akka.persistence.journal-plugin-fallback {
      # Fully qualified class name providing journal plugin api implementation.
      # It is mandatory to specify this property.
      # The class must have a constructor without parameters or constructor with
      # one `Akka.Configuration.Config` parameter.
      class = ""
    
      # Dispatcher for the plugin actor.
      plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
    
      # Dispatcher for message replay.
      replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
    
      # journal supervisor strategy used. 
      # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
      # by default it restarts the journal on crash
      supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"
    
      # Default serializer used as manifest serializer when applicable 
      # and payload serializer when no specific binding overrides are specified
      serializer = "json"
    
      # Removed: used to be the Maximum size of a persistent message batch 
      # written to the journal.
      # Now this setting is without function, PersistentActor will write 
      # as many messages as it has accumulated since the last write.
      max-message-batch-size = 200
    
      # If there is more time in between individual events gotten from the Journal
      # recovery than this the recovery will fail.
      # Note that it also affect reading the snapshot before replaying events on
      # top of it, even though iti is configured for the journal.
      recovery-event-timeout = 30s
    
      circuit-breaker {
        max-failures = 10
        call-timeout = 10s
        reset-timeout = 30s
      }
    
      # The replay filter can detect a corrupt event stream by inspecting
      # sequence numbers and writerUuid when replaying events.
      replay-filter {
        # What the filter should do when detecting invalid events.
        # Supported values:
        # `repair-by-discard-old` : discard events from old writers,
        #                           warning is logged
        # `fail` : fail the replay, error is logged
        # `warn` : log warning but emit events untouche
        # `off` : disable this feature completely
        mode = repair-by-discard-old
    
        # It uses a look ahead buffer for analyzing the events.
        # This defines the size (in number of events) of the buffer.
        window-size = 100
    
        # How many old writerUuid to remember
        max-old-writers = 10
    
        # Set this to `on` to enable detailed debug logging of each
        # replayed event.
        debug = off
      }
    }
    

    Pre-Start Requirement

    It is a good practice to allow user of your custom plugin to be able to auto initialize their back end event source from a blank slate. In order to do that, it is good practice to support auto-initialize setting in your journal HOCON settings.

    In order to support this, we will have to be able to stash all incoming messages while we're creating the tables in the event source. Here's an implementation that we use in our example code:

    protected override void PreStart()
    {
        base.PreStart();
        
        // Call the Initialize method and pipe the result back to signal that
        // database schemas are ready to use, if it needs to be initialized
        Initialize().PipeTo(Self);
        
        // WaitingForInitialization receive handler will wait for a success/fail
        // result back from the Initialize method
        BecomeStacked(WaitingForInitialization);
    }
    
    protected override void PostStop()
    {
        base.PostStop();
    
        // stop all operations executed in the background
        _pendingRequestsCancellation.Cancel();
    }
    
    private bool WaitingForInitialization(object message)
    {
        switch (message)
        {
            // Tables are already created or successfully created all needed tables
            case Status.Success _:
                UnbecomeStacked();
                // Unstash all messages received when we were initializing our tables
                Stash.UnstashAll();
                break;
            
            case Status.Failure fail:
                // Failed creating tables. Log an error and stop the actor.
                _log.Error(fail.Cause, "Failure during {0} initialization.", Self);
                Context.Stop(Self);
                break;
            
            default:
                // By default, stash all received messages while we're waiting for the
                // Initialize method.
                Stash.Stash();
                break;
        }
        return true;
    }
    
    private async Task<object> Initialize()
    {
        // No database initialization needed, the user explicitly asked us
        // not to initialize anything.
        if (!_settings.AutoInitialize) 
            return new Status.Success(NotUsed.Instance);
    
        // Create SQLite journal tables 
        try
        {
            using (var connection = new SqliteConnection(_connectionString))
            using (var cts = CancellationTokenSource
                       .CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
            {
                await connection.OpenAsync(cts.Token);
                using (var command = GetCommand(connection, CreateEventsJournalSql, _timeout))
                {
                    await command.ExecuteNonQueryAsync(cts.Token);
                    command.CommandText = CreateMetaTableSql;
                    await command.ExecuteNonQueryAsync(cts.Token);
                }
            }
        }
        catch (Exception e)
        {
            return new Status.Failure(e);
        }
        
        return new Status.Success(NotUsed.Instance);
    }
    

    Implementing Akka.Persistence SnapshotStore

    Required Method Implementations

    LoadAsync

    Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
    
    • persistenceId: Identification of the persistent actor
    • criteria: Selection criteria for event loading

    This call is protected with a circuit-breaker.

    Asynchronously loads a snapshot.

    Code Sample

    SelectSnapshotSql in this example refers to this SQL query statement:

    private const string SelectSnapshotSql = @"
        SELECT persistence_id,
            sequence_nr, 
            timestamp,
            manifest, 
            payload,
            serializer_id
        FROM snapshot 
        WHERE persistence_id = @PersistenceId 
            AND sequence_nr <= @SequenceNr
            AND timestamp <= @Timestamp
        ORDER BY sequence_nr DESC
        LIMIT 1";
    

    SQLite code example:

    protected sealed override async Task<SelectedSnapshot> LoadAsync(
        string persistenceId,
        SnapshotSelectionCriteria criteria, 
        CancellationToken cancellationToken)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        using (var cts = CancellationTokenSource
                   .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))
        {
            await connection.OpenAsync(cts.Token);
            
            // Create new DbCommand instance
            using (var command = GetCommand(connection, SelectSnapshotSql, _timeout))
            {
                // Populate the SQL parameters
                AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
                AddParameter(command, "@SequenceNr", DbType.Int64, criteria.MaxSequenceNr);
                AddParameter(command, "@Timestamp", DbType.DateTime2, criteria.MaxTimeStamp);
    
                // Create a DbDataReader to sequentially read the returned query result
                using (var reader = await command.ExecuteReaderAsync(
                           CommandBehavior.SequentialAccess,
                           cts.Token))
                {
                    // Return null if no snapshot is found
                    if (!await reader.ReadAsync(cts.Token)) 
                        return null;
                    
                    var id = reader.GetString(0);
                    var sequenceNr = reader.GetInt64(1);
                    var timestamp = reader.GetDateTime(2);
    
                    var metadata = new SnapshotMetadata(id, sequenceNr, timestamp);
                        
                    var manifest = reader.GetString(3);
                    var binary = (byte[])reader[4];
                    
                    var serializerId = reader.GetInt32(5);
                    
                    // Deserialize the snapshot payload using the data we read from the reader
                    var snapshot = _serialization.Deserialize(binary, serializerId, manifest);
    
                    return new SelectedSnapshot(metadata, snapshot);
                }
            }
        }
    }
    

    SaveAsync

    Task SaveAsync(SnapshotMetadata metadata, object snapshot)
    
    • metadata: Snapshot metadata
    • snapshot: The snapshot object instance to be stored

    This call is protected with a circuit-breaker

    Asynchronously saves a snapshot.

    Code Sample

    InsertSnapshotSql in this example refers to this SQL query statement:

    private const string InsertSnapshotSql = @"
            UPDATE snapshot
            SET timestamp = @Timestamp, 
                manifest = @Manifest,
                payload = @Payload, 
                serializer_id = @SerializerId
            WHERE persistence_id = @PersistenceId AND sequence_nr = @SequenceNr;
    
            INSERT OR IGNORE INTO snapshot 
                (persistence_id, sequence_nr, timestamp, manifest, payload, serializer_id)
            VALUES (@PersistenceId, @SequenceNr, @Timestamp, @Manifest, @Payload, @SerializerId)";
    

    SQLite code example:

    protected sealed override async Task SaveAsync(
        SnapshotMetadata metadata,
        object snapshot, 
        CancellationToken cancellationToken)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        using (var cts = CancellationTokenSource
                   .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))
        {
            await connection.OpenAsync(cts.Token);
            
            // Create new DbCommand instance
            using (var command = GetCommand(connection, InsertSnapshotSql, _timeout)) 
            // Create a new DbTransaction instance
            using (var tx = connection.BeginTransaction())
            {
                command.Transaction = tx;
                
                var snapshotType = snapshot.GetType();
                
                // Get the serializer associated with the payload type
                var serializer = _serialization.FindSerializerForType(objectType: snapshotType);
    
                // This WithTransport method call is important, it allows for proper
                // local IActorRef serialization by switching the serialization information
                // context during the serialization process
                var (binary, manifest) = Akka.Serialization.Serialization.WithTransport(
                    system: _serialization.System,
                    state: (snapshot, serializer),
                    action: state =>
                    {
                        var (thePayload, theSerializer) = state;
                        var thisManifest = "";
                                
                        // There are two kinds of serializer when it comes to manifest
                        // support, we have to support both of them for proper payload
                        // serialization
                        if (theSerializer is SerializerWithStringManifest stringManifest)
                        {
                            thisManifest = stringManifest.Manifest(thePayload);
                        }
                        else if (theSerializer.IncludeManifest)
                        {
                            thisManifest = thePayload.GetType().TypeQualifiedName();
                        }
                                
                        // Return the serialized byte array and the manifest for the
                        // serialized data
                        return (theSerializer.ToBinary(thePayload), thisManifest);
                    });
                
                // Populate the SQL parameters
                AddParameter(command, "@PersistenceId", DbType.String, metadata.PersistenceId);
                AddParameter(command, "@SequenceNr", DbType.Int64, metadata.SequenceNr);
                AddParameter(command, "@Timestamp", DbType.DateTime2, metadata.Timestamp);
                AddParameter(command, "@Manifest", DbType.String, manifest);
                AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
                AddParameter(command, "@Payload", DbType.Binary, binary);
    
                // Execute the SQL query
                await command.ExecuteNonQueryAsync(cts.Token);
    
                // Commit the DbTransaction
                tx.Commit();
            }
        }
    }
    

    DeleteAsync

    Task DeleteAsync(SnapshotMetadata metadata);
    
    • metadata: Snapshot metadata

    This call is protected with a circuit-breaker

    Deletes the snapshot identified by by the provided Metadata

    Code Sample

    DeleteSnapshotSql in this example refers to this SQL query statement:

    private const string DeleteSnapshotSql = @"
        DELETE FROM snapshot
        WHERE persistence_id = @PersistenceId
            AND sequence_nr = @SequenceNr";
    

    DeleteSnapshotRangeSql in this example refers to this SQL query statement:

    private const string DeleteSnapshotRangeSql = @"
        DELETE FROM snapshot
        WHERE persistence_id = @PersistenceId
            AND sequence_nr <= @SequenceNr
            AND timestamp <= @Timestamp";
    

    SQLite code example:

    protected sealed override async Task DeleteAsync(SnapshotMetadata metadata, CancellationToken cancellationToken)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        using (var cts = CancellationTokenSource
                   .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))    
        {
            await connection.OpenAsync(cts.Token);
            var timestamp = metadata.Timestamp != DateTime.MinValue 
                ? metadata.Timestamp 
                : (DateTime?)null;
            
            var sql = timestamp.HasValue
                ? DeleteSnapshotRangeSql + " AND timestamp = @Timestamp"
                : DeleteSnapshotSql;
    
            // Create new DbCommand instance
            using (var command = GetCommand(connection, sql, _timeout))
            // Create a new DbTransaction instance
            using (var tx = connection.BeginTransaction())
            {
                command.Transaction = tx;
                
                // Populate the SQL parameters
                AddParameter(command, "@PersistenceId", DbType.String, metadata.PersistenceId);
                AddParameter(command, "@SequenceNr", DbType.Int64, metadata.SequenceNr);
                if (timestamp.HasValue)
                {
                    AddParameter(command, "@Timestamp", DbType.DateTime2, metadata.Timestamp);
                }
    
                // Execute the SQL query
                await command.ExecuteNonQueryAsync(cts.Token);
    
                // Commit the DbTransaction
                tx.Commit();
            }
        }
    }
    

    DeleteAsync

    Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria);
    
    • persistenceId: Identification of the persistent actor
    • criteria: Selection criteria for event deletion

    This call is protected with a circuit-breaker

    Deletes all snapshots matching the provided criteria

    Code Sample

    DeleteSnapshotRangeSql in this example refers to this SQL query statement:

    private const string DeleteSnapshotRangeSql = @"
        DELETE FROM snapshot
        WHERE persistence_id = @PersistenceId
            AND sequence_nr <= @SequenceNr
            AND timestamp <= @Timestamp";
    

    SQLite code example:

    protected sealed override async Task DeleteAsync(
        string persistenceId, 
        SnapshotSelectionCriteria criteria, 
        CancellationToken cancellationToken)
    {
        // Create a new DbConnection instance
        using (var connection = new SqliteConnection(_connectionString))
        using (var cts = CancellationTokenSource
                   .CreateLinkedTokenSource(_pendingRequestsCancellation.Token, cancellationToken))
        {
            await connection.OpenAsync(cts.Token);
            
            // Create new DbCommand instance
            using (var command = GetCommand(connection, DeleteSnapshotRangeSql, _timeout))
            // Create a new DbTransaction instance
            using (var tx = connection.BeginTransaction())
            {
                command.Transaction = tx;
                
                // Populate the SQL parameters
                AddParameter(command, "@PersistenceId", DbType.String, persistenceId);
                AddParameter(command, "@SequenceNr", DbType.Int64, criteria.MaxSequenceNr);
                AddParameter(command, "@Timestamp", DbType.DateTime2, criteria.MaxTimeStamp);
                
                // Execute the SQL query
                await command.ExecuteNonQueryAsync(cts.Token);
    
                // Commit the DbTransaction
                tx.Commit();
            }
        }
    }
    

    Detail Implementation

    Reading HOCON Settings

    There are some HOCON settings that are by default loaded by the snapshot store base class and these can be overriden in your HOCON settings. The minimum HOCON settings that need to be defined for your custom plugin are:

    akka.persistence{
      snapshot-store {
        plugin = "akka.persistence.snapshot-store.custom-sqlite"
        custom-sqlite {
          # qualified type name of the SQLite persistence journal actor
          class = "Akka.Persistence.Custom.Snapshot.SqliteSnapshotStore, Akka.Persistence.Custom"
        }
      }
    }
    

    The default HOCON settings are:

    # Fallback settings for snapshot store plugin configurations
    # These settings are used if they are not defined in plugin config section.
    akka.persistence.snapshot-store-plugin-fallback {
      # Fully qualified class name providing snapshot store plugin api
      # implementation. It is mandatory to specify this property if
      # snapshot store is enabled.
      # The class must have a constructor without parameters or constructor with
      # one `Akka.Configuration.Config` parameter.
      class = ""
    
      # Dispatcher for the plugin actor.
      plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"
    
      # snapshot-store supervisor strategy used. 
      # It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
      # by default it restarts the snapshot-store on crash
      supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"
      
      # Default serializer used as manifest serializer when applicable 
      # and payload serializer when no specific binding overrides are specified
      serializer = "json"
    
      circuit-breaker {
        max-failures = 5
        call-timeout = 20s
        reset-timeout = 60s
      }
    }
    

    Pre-Start Requirement

    It is a good practice to allow user of your custom plugin to be able to auto initialize their back end event source from a blank slate. In order to do that, it is good practice to support auto-initialize setting in your journal HOCON settings.

    In order to support this, we will have to be able to stash all incoming messages while we're creating the tables in the event source. Here's an implementation that we use in our example code:

    protected override void PreStart()
    {
        base.PreStart();
        if (_settings.AutoInitialize)
        {
            // Call the Initialize method and pipe the result back to signal that
            // database schemas are ready to use, if it needs to be initialized
            Initialize().PipeTo(Self);
        
            // WaitingForInitialization receive handler will wait for a success/fail
            // result back from the Initialize method
            BecomeStacked(WaitingForInitialization);
        }
    }
    
    protected override void PostStop()
    {
        base.PostStop();
    
        // stop all operations executed in the background
        _pendingRequestsCancellation.Cancel();
    }
    
    private async Task<object> Initialize()
    {
        // No database initialization needed, the user explicitly asked us not to initialize anything.
        if (!_settings.AutoInitialize) 
            return new Status.Success(NotUsed.Instance);
    
        // Create SQLite journal tables 
        try
        {
            using (var connection = new SqliteConnection(_connectionString))
            using (var cts = CancellationTokenSource
                       .CreateLinkedTokenSource(_pendingRequestsCancellation.Token))
            {
                await connection.OpenAsync(cts.Token);
                
                using (var command = GetCommand(connection, CreateSnapshotTableSql, _timeout))
                using (var tx = connection.BeginTransaction())
                {
                    command.Transaction = tx;
                    await command.ExecuteNonQueryAsync(cts.Token);
                    tx.Commit();
                }
                
                return Status.Success.Instance;
            }
        }
        catch (Exception e)
        {
            return new Status.Failure(e);
        }
    }
    
    private bool WaitingForInitialization(object message)
    {
        switch(message)
        {
            // Tables are already created or successfully created all needed tables
            case Status.Success _:
                UnbecomeStacked();
                // Unstash all messages received when we were initializing our tables
                Stash.UnstashAll();
                return true;
            
            case Status.Failure msg:
                // Failed creating tables. Log an error and stop the actor.
                _log.Error(msg.Cause, "Error during snapshot store initialization");
                Context.Stop(Self);
                return true;
            
            default:
                // By default, stash all received messages while we're waiting for the
                // Initialize method.
                Stash.Stash();
                return true;
        }
    }
    

    Tying Everything Together Into An Akka.NET Extension

    The last thing you need to do to make the new custom persistence provider to work is to create an Akka.NET extension that wraps the journal and snapshot storage so that it can be seamlessly used from inside Akka.NET.

    In order to do this, you will need to extend two things, IExtension and ExtensionIdProvider<T>.

    Extending IExtension

    IExtension is a marker interface that marks a class as an Akka.NET extension. It is the class instance that all user will use to interact with your custom extension.

    There are two conventions that needs to be implemented when you extend IExtension:

    • Any class extending this interface is required to provide a single constructor that takes a single ExtendedActorSystem as its argument.
    public SqlitePersistence(ExtendedActorSystem system)
    {
        var defaultConfig = DefaultConfiguration();
        system.Settings.InjectTopLevelFallback(defaultConfig);
    
        JournalConfig = system.Settings.Config.GetConfig(JournalConfigPath);
        DefaultJournalConfig = defaultConfig.GetConfig(JournalConfigPath);
        SnapshotConfig = system.Settings.Config.GetConfig(SnapshotConfigPath);
        DefaultSnapshotConfig = defaultConfig.GetConfig(SnapshotConfigPath);
    }
    
    • It is strongly recommended to create a static Get method that returns an instance of this class. This method is responsible for registering the extension with the Akka.NET extension manager and instantiates a new instance for users to use.
    public static SqlitePersistence Get(ActorSystem system)
    {
        return system.WithExtension<SqlitePersistence, SqlitePersistenceProvider>();
    }
    

    Extending ExtensionIdProvider<T>

    ExtensionIdProvider<T> is a factory class that is responsible for creating new instances for your extension. This class is quite straight forward, all you need to do is to override a single method to make it work:

    public class SqlitePersistenceProvider : ExtensionIdProvider<SqlitePersistence>
    {
        public override SqlitePersistence CreateExtension(ExtendedActorSystem system)
        {
            return new SqlitePersistence(system);
        }
    }
    

    Unit Testing Journal and SnapshotStore

    Akka.Persistence came with a standardized Technology Compatibility Kit (TCK) test kit that can be readily incorporated into your unit testing suite to test that a custom provider adheres to a basic compatibility requirement.

    To do this, you will need to create an XUnit unit test project and reference your custom persistence project and the Akka.Persistence.TCK package. There are four abstract classes that can be implemented to test your persistence implementation:

    • JournalSerializationSpec
    • JournalSpec
    • SnapshotStoreSerializationSpec
    • SnapshotStoreSpec

    For an implementation sample, please see the /src/examples/Akka.Persistence.Custom.Tests example project.

    In this article
    • githubEdit this page
    Back to top
    Contribute
    • Project Chat
    • Discussion Forum
    • Source Code
    Support
    • Akka.NET Support Plans
    • Akka.NET Observability Tools
    • Akka.NET Training & Consulting
    Maintained By
    • Petabridge - The Akka.NET Company
    • Learn Akka.NET