diff --git a/objectserver/replicator.go b/objectserver/replicator.go index ed46e4eb..30cd3118 100644 --- a/objectserver/replicator.go +++ b/objectserver/replicator.go @@ -40,7 +40,7 @@ import ( ) var ( - StatsReportInterval = 300 * time.Second + StatsReportInterval = 10 * time.Minute TmpEmptyTime = 24 * time.Hour ReplicateDeviceTimeout = 4 * time.Hour // GetRing is a local pointer to the hummingbird function, for overriding in tests @@ -137,6 +137,7 @@ type ReplicationDeviceStats struct { LastCheckin time.Time RunStarted time.Time DeviceStarted time.Time + LastPassDate time.Time LastPassDuration time.Duration TotalPasses int64 } @@ -748,28 +749,23 @@ func (r *Replicator) verifyRunningDevices() { func (r *Replicator) reportStats() { r.runningDevicesLock.Lock() defer r.runningDevicesLock.Unlock() - var totalDuration time.Duration - var maxLastPass time.Time - var doneParts, totalParts int64 - var processingTime float64 + minLastPass := time.Now() allHaveCompleted := true for _, rd := range r.runningDevices { stats := rd.Stats() if stats.TotalPasses <= 1 { allHaveCompleted = false } - if maxLastPass.Before(stats.RunStarted) { - maxLastPass = stats.RunStarted + if stats.LastPassDate.Before(minLastPass) { + minLastPass = stats.LastPassDate } - totalDuration += stats.LastPassDuration - totalParts += stats.Stats["PartitionsTotal"] - doneParts += stats.Stats["PartitionsDone"] - processingTime += time.Since(stats.RunStarted).Seconds() - } - if processingTime > 0 { - partsPerSecond := float64(doneParts) / processingTime - remaining := time.Duration((1.0 / partsPerSecond) * float64(time.Second) * - float64(totalParts-doneParts) / float64(len(r.runningDevices))) + processingTimeSec := time.Since(stats.RunStarted).Seconds() + doneParts := stats.Stats["PartitionsDone"] + totalParts := stats.Stats["PartitionsTotal"] + partsPerSecond := float64(doneParts) / processingTimeSec + + remaining := time.Duration( + int64(float64(totalParts-doneParts)/partsPerSecond)) * time.Second var remainingStr string if remaining >= time.Hour { remainingStr = fmt.Sprintf("%.0fh", remaining.Hours()) @@ -778,16 +774,23 @@ func (r *Replicator) reportStats() { } else { remainingStr = fmt.Sprintf("%.0fs", remaining.Seconds()) } - r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)", - doneParts, totalParts, float64(100*doneParts)/float64(totalParts), - processingTime, partsPerSecond, remainingStr) - } + r.LogInfo("Device %s %d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)", + rd.Key(), doneParts, totalParts, + float64(100*doneParts)/float64(totalParts), + processingTimeSec, partsPerSecond, remainingStr) + + } if allHaveCompleted { + // this is a mess but object_replication_time (in old way) is # minutes + // passed since 1 complete pass of all devices started. + // replication_last is unix time stamp when last complete pass was finished + // now "last pass" means oldest device lastPass + maxLastPassComplete := time.Since(minLastPass).Minutes() middleware.DumpReconCache(r.reconCachePath, "object", map[string]interface{}{ - "object_replication_time": float64(totalDuration) / float64(len(r.runningDevices)) / float64(time.Second), - "object_replication_last": float64(maxLastPass.UnixNano()) / float64(time.Second), + "object_replication_time": maxLastPassComplete, + "object_replication_last": float64(minLastPass.UnixNano()) / float64(time.Second), }) } } @@ -809,12 +812,13 @@ func (r *Replicator) getDeviceProgress() map[string]map[string]interface{} { for key, device := range r.runningDevices { stats := device.Stats() deviceProgress[key] = map[string]interface{}{ - "StartDate": stats.DeviceStarted, - "LastUpdate": stats.LastCheckin, - "LastPassDuration": stats.LastPassDuration, - "LastPassUpdate": stats.RunStarted, - "TotalPasses": stats.TotalPasses, - "CancelCount": r.cancelCounts[key], + "StartDate": stats.DeviceStarted, + "LastUpdate": stats.LastCheckin, + "LastPassDuration": stats.LastPassDuration, + "LastPassFinishDate": stats.LastPassDate, + "LastPassUpdate": stats.RunStarted, + "TotalPasses": stats.TotalPasses, + "CancelCount": r.cancelCounts[key], } for k, v := range stats.Stats { deviceProgress[key][k] = v @@ -830,17 +834,20 @@ func (r *Replicator) runLoopCheck(reportTimer <-chan time.Time) { defer r.runningDevicesLock.Unlock() if rd, ok := r.runningDevices[update.deviceKey]; ok { stats := rd.Stats() - if update.stat == "checkin" { - stats.LastCheckin = time.Now() - } else if update.stat == "startRun" { - stats.TotalPasses++ - stats.LastPassDuration = time.Since(stats.RunStarted) + stats.LastCheckin = time.Now() + switch update.stat { + case "checkin": + case "startRun": stats.RunStarted = time.Now() - stats.LastCheckin = time.Now() for k := range stats.Stats { stats.Stats[k] = 0 } - } else { + case "FullReplicateCount": + stats.LastPassDuration = time.Since(stats.RunStarted) + stats.LastPassDate = time.Now() + stats.TotalPasses++ + stats.Stats["FullReplicateCount"] += update.value + default: stats.Stats[update.stat] += update.value } } diff --git a/objectserver/replicator_test.go b/objectserver/replicator_test.go index 0eaceaee..21a9e293 100644 --- a/objectserver/replicator_test.go +++ b/objectserver/replicator_test.go @@ -138,7 +138,7 @@ func (d *mockReplicationDevice) ReplicateLoop() { } } func (d *mockReplicationDevice) Key() string { - if d._ReplicateLoop != nil { + if d._Key != nil { return d._Key() } return "" @@ -1044,6 +1044,7 @@ func TestReportStats(t *testing.T) { }, } }, + _Key: func() string { return "1.1:10/sda" }, }, "sdb": &mockReplicationDevice{ _Stats: func() *ReplicationDeviceStats { @@ -1052,17 +1053,23 @@ func TestReportStats(t *testing.T) { RunStarted: time.Now().Add(-time.Hour), Stats: map[string]int64{ "PartitionsTotal": 100, - "PartitionsDone": 50, + "PartitionsDone": 40, }, } }, + _Key: func() string { return "1.1:10/sdb" }, }, } logger := &replicationLogSaver{} replicator.logger = logger replicator.reportStats() - require.True(t, strings.Contains(logger.logged[0], "550/1100 (50.00%)")) + fmt.Println(logger.logged[0]) + fmt.Println(logger.logged[1]) + require.True(t, strings.Contains(logger.logged[0], "Device 1.1:10/sda 500/1000 (50.00%)")) require.True(t, strings.Contains(logger.logged[0], " 1h remaining")) + + require.True(t, strings.Contains(logger.logged[1], "Device 1.1:10/sdb 40/100 (40.00%)")) + require.True(t, strings.Contains(logger.logged[1], " 2h remaining")) } func TestPriorityReplicate(t *testing.T) {