Class BatchingSqlJournal<TConnection, TCommand>
An abstract journal used by PersistentActors to read/write events to a database.
This implementation uses horizontal batching to recycle usage of the DbConnection and to optimize writes made to a database. Batching journal is not going to acquire a new DB connection on every request. Instead it will batch incoming requests and execute them only when a previous operation batch has been completed. This means that requests coming from many actors at the same time will be executed in one batch.
Maximum number of batches executed at the same time is defined by MaxConcurrentOperations setting, while max allowed batch size is defined by MaxBatchSize setting.
Batching journal also defines MaxBufferSize, which defines a maximum number of all requests stored at once in memory. Once that value is surpassed, journal will start to apply OnBufferOverflow(IJournalMessage) logic on each incoming requests, until a buffer gets freed again. This may be used for overflow strategies, request denials or backpressure.
Inheritance
Implements
Inherited Members
Namespace: Akka.Persistence.Sql.Common.Journal
Assembly: Akka.Persistence.Sql.Common.dll
Syntax
public abstract class BatchingSqlJournal<TConnection, TCommand> : WriteJournalBase, IInternalActor where TConnection : DbConnection where TCommand : DbCommand
Type Parameters
Name | Description |
---|---|
TConnection | A concrete implementation of DbConnection for targeted database provider. |
TCommand | A concrete implementation of DbCommand for targeted database provider. |
Constructors
| Improve this Doc View SourceBatchingSqlJournal(BatchingSqlJournalSetup)
Initializes a new instance of the BatchingSqlJournal<TConnection, TCommand> class.
Declaration
protected BatchingSqlJournal(BatchingSqlJournalSetup setup)
Parameters
Type | Name | Description |
---|---|---|
BatchingSqlJournalSetup | setup | The settings used to configure the journal. |
Fields
| Improve this Doc View SourceCanPublish
Flag determining if incoming journal requests should be published in current actor system event stream. Useful mostly for tests.
Declaration
protected readonly bool CanPublish
Field Value
Type | Description |
---|---|
Boolean |
IsDeletedIndex
Default index of IsDeleted column get from ByPersistenceIdSql query.
Declaration
protected const int IsDeletedIndex = 3
Field Value
Type | Description |
---|---|
Int32 |
Log
Logging adapter for current journal actor .
Declaration
protected readonly ILoggingAdapter Log
Field Value
Type | Description |
---|---|
ILoggingAdapter |
ManifestIndex
Default index of Manifest column get from ByPersistenceIdSql query.
Declaration
protected const int ManifestIndex = 4
Field Value
Type | Description |
---|---|
Int32 |
OrderingIndex
Default index of tags column get from ByTagSql query.
Declaration
protected const int OrderingIndex = 7
Field Value
Type | Description |
---|---|
Int32 |
PayloadIndex
Default index of Payload column get from ByPersistenceIdSql query.
Declaration
protected const int PayloadIndex = 5
Field Value
Type | Description |
---|---|
Int32 |
PersistenceIdIndex
Default index of PersistenceId column get from ByPersistenceIdSql query.
Declaration
protected const int PersistenceIdIndex = 0
Field Value
Type | Description |
---|---|
Int32 |
SequenceNrIndex
Default index of SequenceNr column get from ByPersistenceIdSql query.
Declaration
protected const int SequenceNrIndex = 1
Field Value
Type | Description |
---|---|
Int32 |
SerializerIdIndex
Default index of Identifier
Declaration
protected const int SerializerIdIndex = 6
Field Value
Type | Description |
---|---|
Int32 |
TimestampIndex
Default index of Timestamp column get from ByPersistenceIdSql query.
Declaration
protected const int TimestampIndex = 2
Field Value
Type | Description |
---|---|
Int32 |
Properties
| Improve this Doc View SourceAllEventsSql
SQL query executed as result of ReplayAllEvents request to journal. It's a part of persistence query protocol.
Declaration
protected virtual string AllEventsSql { get; }
Property Value
Type | Description |
---|---|
String |
AllPersistenceIdsSql
SQL query executed as result of SelectCurrentPersistenceIds request to journal. It's a part of persistence query protocol.
Declaration
protected virtual string AllPersistenceIdsSql { get; }
Property Value
Type | Description |
---|---|
String |
ByPersistenceIdSql
SQL query executed as result of ReplayMessages request to journal. It's also part of persistence query protocol.
Declaration
protected virtual string ByPersistenceIdSql { get; }
Property Value
Type | Description |
---|---|
String |
ByTagSql
SQL query executed as result of ReplayTaggedMessages request to journal. It's a part of persistence query protocol.
Declaration
protected virtual string ByTagSql { get; }
Property Value
Type | Description |
---|---|
String |
DeleteBatchSql
SQL query executed as result of DeleteMessagesTo request to journal.
Declaration
protected virtual string DeleteBatchSql { get; }
Property Value
Type | Description |
---|---|
String |
HighestOrderingSql
TBD
Declaration
protected virtual string HighestOrderingSql { get; }
Property Value
Type | Description |
---|---|
String |
HighestSequenceNrSql
SQL query executed as result of ReadHighestSequenceNr(String, TCommand) request to journal. Also used under some conditions, when storing metadata upon DeleteMessagesTo request.
Declaration
protected virtual string HighestSequenceNrSql { get; }
Property Value
Type | Description |
---|---|
String |
Initializers
A named collection of SQL statements to be executed once journal actor gets initialized and the AutoInitialize flag is set.
Declaration
protected abstract ImmutableDictionary<string, string> Initializers { get; }
Property Value
Type | Description |
---|---|
System.Collections.Immutable.ImmutableDictionary<String, String> |
InsertEventSql
SQL statement executed as result of WriteMessages request to journal.
Declaration
protected virtual string InsertEventSql { get; }
Property Value
Type | Description |
---|---|
String |
Setup
All configurable settings defined for a current batching journal.
Declaration
protected BatchingSqlJournalSetup Setup { get; }
Property Value
Type | Description |
---|---|
BatchingSqlJournalSetup |
TimestampProvider
The timestamp provider that will be used for the timestamp column when writing messages to the database.
Declaration
protected ITimestampProvider TimestampProvider { get; }
Property Value
Type | Description |
---|---|
ITimestampProvider |
UpdateSequenceNrSql
SQL statement executed as result of writing metadata, which is a possible effect of DeleteMessagesTo request.
Declaration
protected virtual string UpdateSequenceNrSql { get; }
Property Value
Type | Description |
---|---|
String |
Methods
| Improve this Doc View SourceAddParameter(TCommand, String, DbType, Object)
Helper method used to add a parameter to existing database command
.
Declaration
protected void AddParameter(TCommand command, string paramName, DbType dbType, object value)
Parameters
Type | Name | Description |
---|---|---|
TCommand | command | DbCommand used to define a parameter in. |
String | paramName | Query or procedure parameter name. |
DbType | dbType | Database type of a query or procedure parameter. |
Object | value | Value of a query or procedure parameter. |
BatchRequest(IJournalRequest)
Tries to add incoming message
to Buffer.
Also checks if any DB connection has been released and next batch can be processed.
Declaration
protected void BatchRequest(IJournalRequest message)
Parameters
Type | Name | Description |
---|---|---|
IJournalRequest | message | TBD |
CreateConnection(String)
Creates a new database connection from a given connectionString
.
Declaration
protected abstract TConnection CreateConnection(string connectionString)
Parameters
Type | Name | Description |
---|---|---|
String | connectionString | TBD |
Returns
Type | Description |
---|---|
TConnection | TBD |
HandleDeleteMessagesTo(DeleteMessagesTo, TCommand)
Declaration
protected virtual async Task HandleDeleteMessagesTo(DeleteMessagesTo req, TCommand command)
Parameters
Type | Name | Description |
---|---|---|
DeleteMessagesTo | req | |
TCommand | command |
Returns
Type | Description |
---|---|
Task |
HandleReplayAllMessages(ReplayAllEvents, TCommand)
Declaration
protected virtual async Task HandleReplayAllMessages(ReplayAllEvents req, TCommand command)
Parameters
Type | Name | Description |
---|---|---|
ReplayAllEvents | req | |
TCommand | command |
Returns
Type | Description |
---|---|
Task |
HandleReplayMessages(ReplayMessages, TCommand, IActorContext)
Declaration
protected virtual async Task HandleReplayMessages(ReplayMessages req, TCommand command, IActorContext context)
Parameters
Type | Name | Description |
---|---|---|
ReplayMessages | req | |
TCommand | command | |
IActorContext | context |
Returns
Type | Description |
---|---|
Task |
HandleReplayTaggedMessages(ReplayTaggedMessages, TCommand)
Declaration
protected virtual async Task HandleReplayTaggedMessages(ReplayTaggedMessages req, TCommand command)
Parameters
Type | Name | Description |
---|---|---|
ReplayTaggedMessages | req | |
TCommand | command |
Returns
Type | Description |
---|---|
Task |
HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds, TCommand)
Declaration
protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPersistenceIds message, TCommand command)
Parameters
Type | Name | Description |
---|---|---|
SelectCurrentPersistenceIds | message | |
TCommand | command |
Returns
Type | Description |
---|---|
Task |
OnBufferOverflow(IJournalMessage)
Method called, once given request
couldn't be added to Buffer
due to buffer overflow. Overflow is controlled by max buffer size and can be set using
MaxBufferSize setting.
Declaration
protected virtual void OnBufferOverflow(IJournalMessage request)
Parameters
Type | Name | Description |
---|---|---|
IJournalMessage | request | TBD |
PreAddParameterToCommand(TCommand, DbParameter)
Override this to customize DbParameter creation used for building database queries
Declaration
protected virtual void PreAddParameterToCommand(TCommand command, DbParameter param)
Parameters
Type | Name | Description |
---|---|---|
TCommand | command | DbCommand used to define a parameter in. |
DbParameter | param | Parameter to customize |
PreStart()
TBD
Declaration
protected override void PreStart()
Overrides
| Improve this Doc View SourceReadEvent(DbDataReader)
Returns a persistent representation of an event read from a current row in the database.
Declaration
protected virtual IPersistentRepresentation ReadEvent(DbDataReader reader)
Parameters
Type | Name | Description |
---|---|---|
DbDataReader | reader | TBD |
Returns
Type | Description |
---|---|
IPersistentRepresentation | TBD |
ReadHighestSequenceNr(TCommand)
Declaration
protected virtual async Task<long> ReadHighestSequenceNr(TCommand command)
Parameters
Type | Name | Description |
---|---|---|
TCommand | command |
Returns
Type | Description |
---|---|
Task<Int64> |
ReadHighestSequenceNr(String, TCommand)
Declaration
protected virtual async Task<long> ReadHighestSequenceNr(string persistenceId, TCommand command)
Parameters
Type | Name | Description |
---|---|---|
String | persistenceId | |
TCommand | command |
Returns
Type | Description |
---|---|
Task<Int64> |
Receive(Object)
TBD
Declaration
protected sealed override bool Receive(object message)
Parameters
Type | Name | Description |
---|---|---|
Object | message | TBD |
Returns
Type | Description |
---|---|
Boolean | TBD |
Overrides
| Improve this Doc View SourceWriteEvent(TCommand, IPersistentRepresentation, String)
Perform write of persistent event with specified tags
into database using given command
.
Declaration
protected virtual void WriteEvent(TCommand command, IPersistentRepresentation persistent, string tags = "")
Parameters
Type | Name | Description |
---|---|---|
TCommand | command | Database command object used to store data. |
IPersistentRepresentation | persistent | Persistent event representation. |
String | tags | Optional tags extracted from persistent event payload. |