Skip to content

Commit

Permalink
Merge pull request #35 from dpgoetz/repl-logs
Browse files Browse the repository at this point in the history
replicator logging fix
  • Loading branch information
redbo authored Apr 11, 2017
2 parents 7a46172 + b069ed8 commit e4a8187
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 38 deletions.
77 changes: 42 additions & 35 deletions objectserver/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
)

var (
StatsReportInterval = 300 * time.Second
StatsReportInterval = 10 * time.Minute
TmpEmptyTime = 24 * time.Hour
ReplicateDeviceTimeout = 4 * time.Hour
// GetRing is a local pointer to the hummingbird function, for overriding in tests
Expand Down Expand Up @@ -137,6 +137,7 @@ type ReplicationDeviceStats struct {
LastCheckin time.Time
RunStarted time.Time
DeviceStarted time.Time
LastPassDate time.Time
LastPassDuration time.Duration
TotalPasses int64
}
Expand Down Expand Up @@ -748,28 +749,23 @@ func (r *Replicator) verifyRunningDevices() {
func (r *Replicator) reportStats() {
r.runningDevicesLock.Lock()
defer r.runningDevicesLock.Unlock()
var totalDuration time.Duration
var maxLastPass time.Time
var doneParts, totalParts int64
var processingTime float64
minLastPass := time.Now()
allHaveCompleted := true
for _, rd := range r.runningDevices {
stats := rd.Stats()
if stats.TotalPasses <= 1 {
allHaveCompleted = false
}
if maxLastPass.Before(stats.RunStarted) {
maxLastPass = stats.RunStarted
if stats.LastPassDate.Before(minLastPass) {
minLastPass = stats.LastPassDate
}
totalDuration += stats.LastPassDuration
totalParts += stats.Stats["PartitionsTotal"]
doneParts += stats.Stats["PartitionsDone"]
processingTime += time.Since(stats.RunStarted).Seconds()
}
if processingTime > 0 {
partsPerSecond := float64(doneParts) / processingTime
remaining := time.Duration((1.0 / partsPerSecond) * float64(time.Second) *
float64(totalParts-doneParts) / float64(len(r.runningDevices)))
processingTimeSec := time.Since(stats.RunStarted).Seconds()
doneParts := stats.Stats["PartitionsDone"]
totalParts := stats.Stats["PartitionsTotal"]
partsPerSecond := float64(doneParts) / processingTimeSec

remaining := time.Duration(
int64(float64(totalParts-doneParts)/partsPerSecond)) * time.Second
var remainingStr string
if remaining >= time.Hour {
remainingStr = fmt.Sprintf("%.0fh", remaining.Hours())
Expand All @@ -778,16 +774,23 @@ func (r *Replicator) reportStats() {
} else {
remainingStr = fmt.Sprintf("%.0fs", remaining.Seconds())
}
r.LogInfo("%d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)",
doneParts, totalParts, float64(100*doneParts)/float64(totalParts),
processingTime, partsPerSecond, remainingStr)
}

r.LogInfo("Device %s %d/%d (%.2f%%) partitions replicated in %.2f worker seconds (%.2f/sec, %v remaining)",
rd.Key(), doneParts, totalParts,
float64(100*doneParts)/float64(totalParts),
processingTimeSec, partsPerSecond, remainingStr)

}
if allHaveCompleted {
// This is a mess, but object_replication_time (in the old way) is the number of
// minutes that have passed since one complete pass of all devices started.
// object_replication_last is the unix timestamp of when the last complete pass finished.
// Now "last pass" means the oldest LastPassDate among the running devices.
maxLastPassComplete := time.Since(minLastPass).Minutes()
middleware.DumpReconCache(r.reconCachePath, "object",
map[string]interface{}{
"object_replication_time": float64(totalDuration) / float64(len(r.runningDevices)) / float64(time.Second),
"object_replication_last": float64(maxLastPass.UnixNano()) / float64(time.Second),
"object_replication_time": maxLastPassComplete,
"object_replication_last": float64(minLastPass.UnixNano()) / float64(time.Second),
})
}
}
Expand All @@ -809,12 +812,13 @@ func (r *Replicator) getDeviceProgress() map[string]map[string]interface{} {
for key, device := range r.runningDevices {
stats := device.Stats()
deviceProgress[key] = map[string]interface{}{
"StartDate": stats.DeviceStarted,
"LastUpdate": stats.LastCheckin,
"LastPassDuration": stats.LastPassDuration,
"LastPassUpdate": stats.RunStarted,
"TotalPasses": stats.TotalPasses,
"CancelCount": r.cancelCounts[key],
"StartDate": stats.DeviceStarted,
"LastUpdate": stats.LastCheckin,
"LastPassDuration": stats.LastPassDuration,
"LastPassFinishDate": stats.LastPassDate,
"LastPassUpdate": stats.RunStarted,
"TotalPasses": stats.TotalPasses,
"CancelCount": r.cancelCounts[key],
}
for k, v := range stats.Stats {
deviceProgress[key][k] = v
Expand All @@ -830,17 +834,20 @@ func (r *Replicator) runLoopCheck(reportTimer <-chan time.Time) {
defer r.runningDevicesLock.Unlock()
if rd, ok := r.runningDevices[update.deviceKey]; ok {
stats := rd.Stats()
if update.stat == "checkin" {
stats.LastCheckin = time.Now()
} else if update.stat == "startRun" {
stats.TotalPasses++
stats.LastPassDuration = time.Since(stats.RunStarted)
stats.LastCheckin = time.Now()
switch update.stat {
case "checkin":
case "startRun":
stats.RunStarted = time.Now()
stats.LastCheckin = time.Now()
for k := range stats.Stats {
stats.Stats[k] = 0
}
} else {
case "FullReplicateCount":
stats.LastPassDuration = time.Since(stats.RunStarted)
stats.LastPassDate = time.Now()
stats.TotalPasses++
stats.Stats["FullReplicateCount"] += update.value
default:
stats.Stats[update.stat] += update.value
}
}
Expand Down
13 changes: 10 additions & 3 deletions objectserver/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (d *mockReplicationDevice) ReplicateLoop() {
}
}
func (d *mockReplicationDevice) Key() string {
if d._ReplicateLoop != nil {
if d._Key != nil {
return d._Key()
}
return ""
Expand Down Expand Up @@ -1044,6 +1044,7 @@ func TestReportStats(t *testing.T) {
},
}
},
_Key: func() string { return "1.1:10/sda" },
},
"sdb": &mockReplicationDevice{
_Stats: func() *ReplicationDeviceStats {
Expand All @@ -1052,17 +1053,23 @@ func TestReportStats(t *testing.T) {
RunStarted: time.Now().Add(-time.Hour),
Stats: map[string]int64{
"PartitionsTotal": 100,
"PartitionsDone": 50,
"PartitionsDone": 40,
},
}
},
_Key: func() string { return "1.1:10/sdb" },
},
}
logger := &replicationLogSaver{}
replicator.logger = logger
replicator.reportStats()
require.True(t, strings.Contains(logger.logged[0], "550/1100 (50.00%)"))
fmt.Println(logger.logged[0])
fmt.Println(logger.logged[1])
require.True(t, strings.Contains(logger.logged[0], "Device 1.1:10/sda 500/1000 (50.00%)"))
require.True(t, strings.Contains(logger.logged[0], " 1h remaining"))

require.True(t, strings.Contains(logger.logged[1], "Device 1.1:10/sdb 40/100 (40.00%)"))
require.True(t, strings.Contains(logger.logged[1], " 2h remaining"))
}

func TestPriorityReplicate(t *testing.T) {
Expand Down

0 comments on commit e4a8187

Please sign in to comment.