diff --git a/pkg/database/lazy_db.go b/pkg/database/lazy_db.go index 6d0a74f1da..cfe9532d22 100644 --- a/pkg/database/lazy_db.go +++ b/pkg/database/lazy_db.go @@ -19,7 +19,7 @@ package database import ( "context" "crypto/sha256" - "io" + "errors" "path/filepath" "time" @@ -29,6 +29,8 @@ import ( "github.com/codenotary/immudb/pkg/api/schema" ) +var ErrNoNewTransactions = errors.New("no new transactions") + type lazyDB struct { m *DBManager @@ -440,7 +442,7 @@ func (db *lazyDB) ExportTxByID(ctx context.Context, req *schema.ExportTxRequest) if !req.AllowPreCommitted { if req.Tx > state.TxId { - return nil, 0, [sha256.Size]byte{}, io.EOF + return nil, 0, [sha256.Size]byte{}, ErrNoNewTransactions } } diff --git a/pkg/replication/metrics.go b/pkg/replication/metrics.go index 8f3c824477..a2d6eec31a 100644 --- a/pkg/replication/metrics.go +++ b/pkg/replication/metrics.go @@ -65,6 +65,11 @@ var ( Name: "immudb_replication_allow_commit_up_to_tx_id", Help: "most recently received confirmation up to which commit id the replica is allowed to durably commit", }, []string{"db"}) + + _metricsReplicationLag = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "immudb_replication_lag", + Help: "The difference between the last transaction committed by the primary and replicated by the replica", + }, []string{"db"}) ) type metrics struct { @@ -76,6 +81,7 @@ type metrics struct { replicatorsInRetryDelay prometheus.Gauge primaryCommittedTxID prometheus.Gauge allowCommitUpToTxID prometheus.Gauge + replicationLag prometheus.Gauge } // metricsForDb returns metrics object for particular database name @@ -89,6 +95,7 @@ func metricsForDb(dbName string) metrics { replicatorsInRetryDelay: _metricsReplicatorsInRetryDelay.WithLabelValues(dbName), primaryCommittedTxID: _metricsReplicationPrimaryCommittedTxID.WithLabelValues(dbName), allowCommitUpToTxID: _metricsAllowCommitUpToTxID.WithLabelValues(dbName), + replicationLag: _metricsReplicationLag.WithLabelValues(dbName), } } @@ -99,6 +106,7 @@ func (m *metrics) reset() { m.replicatorsInRetryDelay.Set(0) m.primaryCommittedTxID.Set(0) m.allowCommitUpToTxID.Set(0) + m.replicationLag.Set(0) } // replicationTimeHistogramTimer returns prometheus timer for replicationTimeHistogram diff --git a/pkg/replication/replicator.go b/pkg/replication/replicator.go index 50853d6634..82102049d0 100644 --- a/pkg/replication/replicator.go +++ b/pkg/replication/replicator.go @@ -377,7 +377,14 @@ func (txr *TxReplicator) fetchNextTx() error { defer txr.disconnect() } + txr.maybeUpdateReplicationLag(commitState.TxId, emd) + if err != nil && !errors.Is(err, io.EOF) { + if strings.Contains(err.Error(), database.ErrNoNewTransactions.Error()) { + txr.metrics.replicationLag.Set(0) + return err + } + if strings.Contains(err.Error(), "replica commit state diverged from primary") { txr.logger.Errorf("replica commit state at '%s' diverged from primary's", txr.db.GetName()) return ErrReplicaDivergedFromPrimary @@ -497,3 +504,13 @@ func (txr *TxReplicator) Error() error { return txr.err } + +func (txr *TxReplicator) maybeUpdateReplicationLag(lastCommittedTxID uint64, metadata map[string][]byte) { + primaryLastCommittedTxIDBin, ok := metadata["committed-txid-bin"] + if !ok { + return + } + + lag := binary.BigEndian.Uint64(primaryLastCommittedTxIDBin) - lastCommittedTxID + txr.metrics.replicationLag.Set(float64(lag)) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 1c53434b73..93d90f683d 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -317,9 +317,18 @@ func (s *ImmuServer) Start() (err error) { startedAt = time.Now() if s.Options.MetricsServer { - s.metricsServer = StartMetrics(1*time.Minute, s.Options.MetricsBind(), s.Options.TLSConfig, s.Logger, s.metricFuncServerUptimeCounter, - s.metricFuncComputeDBSizes, s.metricFuncComputeDBEntries, s.metricFuncComputeLoadedDBSize, s.metricFuncComputeSessionCount, + s.metricsServer = StartMetrics( + 1*time.Minute, + s.Options.MetricsBind(), + s.Options.TLSConfig, + s.Logger, + s.metricFuncServerUptimeCounter, + s.metricFuncComputeDBSizes, + s.metricFuncComputeDBEntries, + s.metricFuncComputeLoadedDBSize, + s.metricFuncComputeSessionCount, s.Options.PProf) + defer func() { if err := s.metricsServer.Close(); err != nil { s.Logger.Errorf("failed to shutdown metric server: %s", err) diff --git a/pkg/server/stream_replication.go b/pkg/server/stream_replication.go index 034d1ac0b6..f49623a801 100644 --- a/pkg/server/stream_replication.go +++ b/pkg/server/stream_replication.go @@ -62,23 +62,24 @@ func (s *ImmuServer) exportTx(req *schema.ExportTxRequest, txsServer schema.Immu return err } - var streamMetadata map[string][]byte + var bCommittedTxID [8]byte + state, err := db.CurrentState() + if err == nil { + binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId) + } + + // In asynchronous replication, the last committed transaction value is sent to the replica + // to enable updating its replication lag. + streamMetadata := map[string][]byte{ + "committed-txid-bin": bCommittedTxID[:], + } if req.ReplicaState != nil { var bMayCommitUpToTxID [8]byte binary.BigEndian.PutUint64(bMayCommitUpToTxID[:], mayCommitUpToTxID) - var bCommittedTxID [8]byte - state, err := db.CurrentState() - if err == nil { - binary.BigEndian.PutUint64(bCommittedTxID[:], state.TxId) - } - - streamMetadata = map[string][]byte{ - "may-commit-up-to-txid-bin": bMayCommitUpToTxID[:], - "may-commit-up-to-alh-bin": mayCommitUpToAlh[:], - "committed-txid-bin": bCommittedTxID[:], - } + streamMetadata["may-commit-up-to-txid-bin"] = bMayCommitUpToTxID[:] + streamMetadata["may-commit-up-to-alh-bin"] = mayCommitUpToAlh[:] if setTrailer { // trailer metadata is kept for backward compatibility