    Class MemoryJournal

    In-memory journal for testing purposes.

    Inheritance
    object
    ActorBase
    WriteJournalBase
    AsyncWriteJournal
    MemoryJournal
    SharedMemoryJournal
    TestJournal
    Implements
    IInternalActor
    IAsyncRecovery
    Inherited Members
    AsyncWriteJournal.CanPublish
    AsyncWriteJournal.Receive(object)
    AsyncWriteJournal.ReceiveWriteJournal(object)
    AsyncWriteJournal.TryUnwrapException(Exception)
    WriteJournalBase.PreparePersistentBatch(IEnumerable<IPersistentEnvelope>)
    WriteJournalBase.AdaptToJournal(IPersistentRepresentation)
    ActorBase.Sender
    ActorBase.Self
    ActorBase.Context
    ActorBase.AroundReceive(Receive, object)
    ActorBase.EmptyReceive
    ActorBase.Unhandled(object)
    ActorBase.Become(Receive)
    ActorBase.BecomeStacked(Receive)
    ActorBase.UnbecomeStacked()
    ActorBase.SetReceiveTimeout(TimeSpan?)
    ActorBase.AroundPreRestart(Exception, object)
    ActorBase.AroundPreStart()
    ActorBase.PreStart()
    ActorBase.AroundPostRestart(Exception, object)
    ActorBase.PreRestart(Exception, object)
    ActorBase.PostRestart(Exception)
    ActorBase.AroundPostStop()
    ActorBase.PostStop()
    ActorBase.SupervisorStrategy()
    object.Equals(object)
    object.Equals(object, object)
    object.GetHashCode()
    object.GetType()
    object.MemberwiseClone()
    object.ReferenceEquals(object, object)
    object.ToString()
    Namespace: Akka.Persistence.Journal
    Assembly: Akka.Persistence.dll
    Syntax
    public class MemoryJournal : AsyncWriteJournal, IInternalActor, IAsyncRecovery

    Properties

    Messages

    Declaration
    protected virtual ConcurrentDictionary<string, LinkedList<IPersistentRepresentation>> Messages { get; }
    Property Value
    Type Description
    ConcurrentDictionary<string, LinkedList<IPersistentRepresentation>>

    Methods

    Add(IPersistentRepresentation)

    Declaration
    public IDictionary<string, LinkedList<IPersistentRepresentation>> Add(IPersistentRepresentation persistent)
    Parameters
    Type Name Description
    IPersistentRepresentation persistent
    Returns
    Type Description
    IDictionary<string, LinkedList<IPersistentRepresentation>>

    Delete(string, long)

    Declaration
    public IDictionary<string, LinkedList<IPersistentRepresentation>> Delete(string pid, long seqNr)
    Parameters
    Type Name Description
    string pid
    long seqNr
    Returns
    Type Description
    IDictionary<string, LinkedList<IPersistentRepresentation>>

    DeleteMessagesToAsync(string, long, CancellationToken)

    Asynchronously deletes all persistent messages up to and including the toSequenceNr bound.

    Declaration
    protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, CancellationToken cancellationToken)
    Parameters
    Type Name Description
    string persistenceId

    Persistent actor identifier

    long toSequenceNr

    Inclusive upper bound of the sequence numbers to delete

    CancellationToken cancellationToken

    CancellationToken used to signal a cancelled delete operation

    Returns
    Type Description
    Task
    Overrides
    AsyncWriteJournal.DeleteMessagesToAsync(string, long, CancellationToken)
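
    The inclusive upper bound described above can be illustrated with a minimal sketch. It is not the MemoryJournal implementation: DeleteSketch and DeleteTo are hypothetical names, and a LinkedList<long> of sequence numbers stands in for the per-persistenceId LinkedList<IPersistentRepresentation>.

```csharp
using System.Collections.Generic;

// Hypothetical helper, for illustration only (not part of Akka.NET).
public static class DeleteSketch
{
    // Removes every entry whose sequence number is <= toSequenceNr (inclusive bound)
    // and returns how many entries were removed.
    public static int DeleteTo(LinkedList<long> sequenceNrs, long toSequenceNr)
    {
        var removed = 0;
        var node = sequenceNrs.First;
        while (node != null)
        {
            // Capture the successor first: Remove() detaches the node from the list.
            var next = node.Next;
            if (node.Value <= toSequenceNr)
            {
                sequenceNrs.Remove(node);
                removed++;
            }
            node = next;
        }
        return removed;
    }
}
```

    With entries 1 through 5 and toSequenceNr = 3, entries 1, 2 and 3 are removed and 4 and 5 remain.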

    HighestSequenceNr(string)

    Declaration
    public long HighestSequenceNr(string pid)
    Parameters
    Type Name Description
    string pid
    Returns
    Type Description
    long

    Read(string, long, long, long)

    Declaration
    public IEnumerable<IPersistentRepresentation> Read(string pid, long fromSeqNr, long toSeqNr, long max)
    Parameters
    Type Name Description
    string pid
    long fromSeqNr
    long toSeqNr
    long max
    Returns
    Type Description
    IEnumerable<IPersistentRepresentation>

    ReadHighestSequenceNrAsync(string, long, CancellationToken)

    Asynchronously reads the highest stored sequence number for the provided persistenceId. The persistent actor uses the highest sequence number after recovery as the starting point when persisting new events. This sequence number is also used as toSequenceNr in subsequent calls to ReplayMessagesAsync(IActorContext, string, long, long, long, Action<IPersistentRepresentation>) unless the user has specified a lower toSequenceNr. The journal must maintain the highest sequence number and never decrease it.

    This call is protected with a circuit-breaker.

    Please also note that requests for the highest sequence number may be made concurrently to writes executing for the same persistenceId. In particular, it is possible that a restarting actor tries to recover before its outstanding writes have completed.

    Declaration
    public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr, CancellationToken cancellationToken)
    Parameters
    Type Name Description
    string persistenceId

    Persistent actor identifier

    long fromSequenceNr

    Hint where to start searching for the highest sequence number. When a persistent actor is recovering, this fromSequenceNr will be the sequence number of the used snapshot, or 0L if no snapshot is used.

    CancellationToken cancellationToken

    CancellationToken used to signal cancelled recovery operation

    Returns
    Type Description
    Task<long>

    The highest stored sequence number for the given persistenceId

    Overrides
    AsyncWriteJournal.ReadHighestSequenceNrAsync(string, long, CancellationToken)
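
    The requirement that a journal maintains the highest sequence number and never decreases it can be sketched as follows. This is an illustration under stated assumptions, not how MemoryJournal is implemented: HighestSequenceNrTracker, Record and Read are hypothetical names, and returning 0L for an unknown persistenceId is an assumption of the sketch.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper, for illustration only (not part of Akka.NET).
public sealed class HighestSequenceNrTracker
{
    private readonly ConcurrentDictionary<string, long> _highest =
        new ConcurrentDictionary<string, long>();

    // Records a written sequence number; the stored value only ever grows.
    public void Record(string persistenceId, long sequenceNr) =>
        _highest.AddOrUpdate(
            persistenceId,
            sequenceNr,
            (_, current) => Math.Max(current, sequenceNr));

    // Returns the highest recorded sequence number, or 0L when nothing
    // has been recorded for this persistenceId (assumption of this sketch).
    public long Read(string persistenceId) =>
        _highest.TryGetValue(persistenceId, out var value) ? value : 0L;
}
```

    Because the tracker is kept separately from the stored messages, deleting messages does not lower the reported value.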

    ReceivePluginInternal(object)

    Plugin API: allows plugin implementers to use f.PipeTo(Self) and handle additional messages for implementing advanced features.

    Declaration
    protected override bool ReceivePluginInternal(object message)
    Parameters
    Type Name Description
    object message

    TBD

    Returns
    Type Description
    bool

    TBD

    Overrides
    AsyncWriteJournal.ReceivePluginInternal(object)

    ReplayMessagesAsync(IActorContext, string, long, long, long, Action<IPersistentRepresentation>)

    Asynchronously replays persistent messages. Implementations replay a message by calling recoveryCallback. The returned task must be completed when all messages (matching the sequence number bounds) have been replayed. The task must be completed with a failure if any of the persistent messages could not be replayed.

    The toSequenceNr is the lower of the value returned by ReadHighestSequenceNrAsync(string, long, CancellationToken) and the value the user specified in the Recovery parameter. This implies that this call is always preceded by reading the highest sequence number for the given persistenceId.

    This call is NOT protected with a circuit-breaker because it may take a long time to replay all events. The plugin implementation itself must protect against an unresponsive backend store and make sure that the returned Task is completed with success or failure within a reasonable time. The Task must always be completed.

    Declaration
    public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback)
    Parameters
    Type Name Description
    IActorContext context

    The contextual information about the actor processing replayed messages.

    string persistenceId

    Persistent actor identifier

    long fromSequenceNr

    Inclusive sequence number where replay should start

    long toSequenceNr

    Inclusive sequence number where replay should end

    long max

    Maximum number of messages to be replayed

    Action<IPersistentRepresentation> recoveryCallback

    Called to replay a message, may be called from any thread.

    Returns
    Type Description
    Task

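    A task that completes when all matching messages have been replayed, or fails if any message could not be replayed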

    Overrides
    AsyncWriteJournal.ReplayMessagesAsync(IActorContext, string, long, long, long, Action<IPersistentRepresentation>)
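
    The interaction of the inclusive sequence number bounds with max can be sketched as below. This is a simplified, synchronous illustration rather than the MemoryJournal code: the Event record is a hypothetical stand-in for IPersistentRepresentation, and ReplaySketch.Replay is a hypothetical name. It requires C# 9 or later for the record syntax.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for IPersistentRepresentation (illustration only).
public sealed record Event(string PersistenceId, long SequenceNr, object Payload);

public static class ReplaySketch
{
    // Invokes recoveryCallback for events with
    // fromSequenceNr <= SequenceNr <= toSequenceNr (both inclusive),
    // in ascending sequence number order, and stops after max events.
    public static void Replay(
        IEnumerable<Event> stored,
        long fromSequenceNr,
        long toSequenceNr,
        long max,
        Action<Event> recoveryCallback)
    {
        if (max <= 0) return;

        long replayed = 0;
        foreach (var evt in stored.OrderBy(x => x.SequenceNr))
        {
            if (evt.SequenceNr < fromSequenceNr) continue; // below the lower bound
            if (evt.SequenceNr > toSequenceNr) break;      // past the upper bound
            recoveryCallback(evt);
            if (++replayed >= max) break;                  // max limit reached
        }
    }
}
```

    For stored sequence numbers 1 through 5, a call with fromSequenceNr = 2, toSequenceNr = 4 and max = 2 invokes the callback for 2 and 3 only.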

    Update(string, long, Func<IPersistentRepresentation, IPersistentRepresentation>)

    Declaration
    public IDictionary<string, LinkedList<IPersistentRepresentation>> Update(string pid, long seqNr, Func<IPersistentRepresentation, IPersistentRepresentation> updater)
    Parameters
    Type Name Description
    string pid
    long seqNr
    Func<IPersistentRepresentation, IPersistentRepresentation> updater
    Returns
    Type Description
    IDictionary<string, LinkedList<IPersistentRepresentation>>

    WriteMessagesAsync(IEnumerable<AtomicWrite>, CancellationToken)

    Plugin API: asynchronously writes a batch of persistent messages to the journal.

    The batch is only for performance reasons, i.e. all messages don't have to be written atomically. Higher throughput can typically be achieved by using batch inserts of many records compared to inserting records one-by-one, but this aspect depends on the underlying data store, and a journal implementation can implement it as efficiently as possible. Journals should aim to persist events in-order for a given persistenceId, as otherwise, in case of a failure, the persistent state may end up being inconsistent.

    Each AtomicWrite message contains the single Akka.Persistence.Persistent that corresponds to the event that was passed to the Persist<TEvent>(TEvent, Action<TEvent>) method of the PersistentActor, or it contains several Akka.Persistence.Persistent that correspond to the events that were passed to the PersistAll<TEvent>(IEnumerable<TEvent>, Action<TEvent>) method of the PersistentActor. All Akka.Persistence.Persistent of the AtomicWrite must be written to the data store atomically, i.e. all or none must be stored. If the journal (data store) cannot support atomic writes of multiple events it should reject such writes with a NotSupportedException describing the issue. This limitation should also be documented by the journal plugin.

    If there are failures when storing any of the messages in the batch, the returned Task must be completed with failure. The Task must only be completed with success when all messages in the batch have been confirmed to be stored successfully, i.e. they will be readable, and visible, in a subsequent replay. If there is uncertainty about whether the messages were stored, the Task must be completed with failure.

    Data store connection problems must be signaled by completing the Task with failure.

    The journal can also signal that it rejects individual messages (AtomicWrite) by the returned Task. It is possible but not mandatory to reduce the number of allocations by returning null for the happy path, i.e. when no messages are rejected. Otherwise the returned list must have as many elements as the input messages. Each result element signals whether the corresponding AtomicWrite is rejected or not, with an exception describing the problem. Rejecting a message means it was not stored, i.e. it must not be included in a later replay. Rejecting a message is typically done before attempting to store it, e.g. because of a serialization error.

    Data store connection problems must not be signaled as rejections.

    Calls to this method are serialized by the enclosing journal actor. If you spawn work in asynchronous tasks, they may complete in any order, but the actual writes for a specific persistenceId should be serialized. Otherwise the events of a later write could become visible to consumers (query side, or replay) before the events of an earlier write. A PersistentActor will not send a new WriteMessages request before the previous one has been completed.

    Please note that the Sender of the contained Akka.Persistence.Persistent objects has been nulled out (i.e. set to NoSender) in order to not use space in the journal for a sender reference that will likely be obsolete during replay.

    Please also note that requests for the highest sequence number may be made concurrently to this call executing for the same persistenceId, in particular it is possible that a restarting actor tries to recover before its outstanding writes have completed. In the latter case it is highly desirable to defer reading the highest sequence number until all outstanding writes have completed, otherwise the PersistentActor may reuse sequence numbers.

    This call is protected with a circuit-breaker.

    Declaration
    protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages, CancellationToken cancellationToken)
    Parameters
    Type Name Description
    IEnumerable<AtomicWrite> messages

    The batch of AtomicWrite messages to write

    CancellationToken cancellationToken

    CancellationToken used to signal a cancelled write operation

    Returns
    Type Description
    Task<IImmutableList<Exception>>
    Overrides
    AsyncWriteJournal.WriteMessagesAsync(IEnumerable<AtomicWrite>, CancellationToken)
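
    The shape of the rejection result described above can be sketched as follows. This is an illustration, not the MemoryJournal implementation: WriteResultSketch, BuildResult and the tryStore delegate are hypothetical names. The sketch also assumes that a null entry in the list marks a write that was not rejected, which the text above does not state explicitly.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;

// Hypothetical helper, for illustration only (not part of Akka.NET).
public static class WriteResultSketch
{
    // tryStore is a hypothetical per-write delegate: it returns null when the
    // write was stored, or an exception describing why the write was rejected.
    // Returns null when nothing was rejected (the allocation-saving happy path),
    // otherwise a list with exactly one entry per input write, in input order.
    public static IImmutableList<Exception> BuildResult<TWrite>(
        IEnumerable<TWrite> writes,
        Func<TWrite, Exception> tryStore)
    {
        var builder = ImmutableList.CreateBuilder<Exception>();
        var anyRejected = false;

        foreach (var write in writes)
        {
            var rejection = tryStore(write);
            anyRejected |= rejection != null;
            builder.Add(rejection); // assumption: null marks an accepted write
        }

        return anyRejected ? builder.ToImmutable() : null;
    }
}
```

    Data store connection problems do not belong in this list; as stated above, they are signaled by completing the Task with failure.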

    Implements

    IInternalActor
    IAsyncRecovery

    Extension Methods

    ObjectExtensions.IsDefaultForType<T>(T)
    ObjectExtensions.AsOption<T>(T)
    Extensions.AsInstanceOf<T>(object)