Skip to content

Commit

Permalink
refactor(tunasync):
Browse files Browse the repository at this point in the history
1. refactored manager and worker to support TLS transport
2. if mirror_dir is specified from a mirror config, don't add the mirror name
  • Loading branch information
bigeagle committed Apr 30, 2016
1 parent 9865f28 commit 9fbb8ab
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 27 deletions.
4 changes: 3 additions & 1 deletion manager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"fmt"
"net/http"
"time"

"github.com/gin-gonic/gin"

Expand Down Expand Up @@ -153,6 +154,7 @@ func (s *Manager) listWorkers(c *gin.Context) {
func (s *Manager) registerWorker(c *gin.Context) {
var _worker WorkerStatus
c.BindJSON(&_worker)
_worker.LastOnline = time.Now()
newWorker, err := s.adapter.CreateWorker(_worker)
if err != nil {
err := fmt.Errorf("failed to register worker: %s",
Expand Down Expand Up @@ -230,7 +232,7 @@ func (s *Manager) handleClientCmd(c *gin.Context) {
}

// post command to worker
_, err = postJSON(workerURL, workerCmd)
_, err = PostJSON(workerURL, workerCmd, s.tlsConfig)
if err != nil {
err := fmt.Errorf("post command to worker %s(%s) fail: %s", workerID, workerURL, err.Error())
c.Error(err)
Expand Down
14 changes: 7 additions & 7 deletions manager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestHTTPServer(t *testing.T) {
w := WorkerStatus{
ID: "test_worker1",
}
resp, err := postJSON(baseURL+"/workers", w)
resp, err := PostJSON(baseURL+"/workers", w, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)

Expand All @@ -90,7 +90,7 @@ func TestHTTPServer(t *testing.T) {
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "3GB",
}
resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status)
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s", baseURL, status.Worker, status.Name), status, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
Expand Down Expand Up @@ -136,8 +136,8 @@ func TestHTTPServer(t *testing.T) {
Upstream: "mirrors.tuna.tsinghua.edu.cn",
Size: "4GB",
}
resp, err := postJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
baseURL, status.Worker, status.Name), status)
resp, err := PostJSON(fmt.Sprintf("%s/workers/%s/jobs/%s",
baseURL, status.Worker, status.Name), status, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
defer resp.Body.Close()
Expand All @@ -156,7 +156,7 @@ func TestHTTPServer(t *testing.T) {
ID: "test_worker_cmd",
URL: workerBaseURL + "/cmd",
}
resp, err := postJSON(baseURL+"/workers", w)
resp, err := PostJSON(baseURL+"/workers", w, nil)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)

Expand All @@ -177,7 +177,7 @@ func TestHTTPServer(t *testing.T) {
MirrorID: "ubuntu-sync",
WorkerID: "not_exist_worker",
}
resp, err := postJSON(baseURL+"/cmd", clientCmd)
resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close()
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusBadRequest)
Expand All @@ -190,7 +190,7 @@ func TestHTTPServer(t *testing.T) {
WorkerID: w.ID,
}

resp, err := postJSON(baseURL+"/cmd", clientCmd)
resp, err := PostJSON(baseURL+"/cmd", clientCmd, nil)
defer resp.Body.Close()

So(err, ShouldBeNil)
Expand Down
13 changes: 0 additions & 13 deletions manager/util.go

This file was deleted.

8 changes: 8 additions & 0 deletions worker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ func newCmdJob(cmdAndArgs []string, workingDir string, env map[string]string) *c
panic("Command length should be at least 1!")
}

logger.Debug("Executing command %s at %s", cmdAndArgs[0], workingDir)
if _, err := os.Stat(workingDir); os.IsNotExist(err) {
logger.Debug("Making dir %s", workingDir)
if err = os.MkdirAll(workingDir, 0755); err != nil {
logger.Error("Error making dir %s", workingDir)
}
}

cmd.Dir = workingDir
cmd.Env = newEnviron(env, true)

Expand Down
26 changes: 20 additions & 6 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ func GetTUNASyncWorker(cfg *Config) *Worker {
schedule: newScheduleQueue(),
mirrorStatus: make(map[string]SyncStatus),
}

if cfg.Manager.CACert != "" {
tlsConfig, err := GetTLSConfig(cfg.Manager.CACert)
if err != nil {
logger.Error("Failed to init TLS config: %s", err.Error())
return nil
}
w.tlsConfig = tlsConfig
}

w.initJobs()
w.makeHTTPServer()
tunasyncWorker = w
Expand All @@ -75,7 +85,9 @@ func (w *Worker) initProviders() {
logDir = c.Global.LogDir
}
if mirrorDir == "" {
mirrorDir = c.Global.MirrorDir
mirrorDir = filepath.Join(
c.Global.MirrorDir, mirror.Name,
)
}
logDir = formatLogDir(logDir, mirror)

Expand All @@ -87,7 +99,7 @@ func (w *Worker) initProviders() {
name: mirror.Name,
upstreamURL: mirror.Upstream,
command: mirror.Command,
workingDir: filepath.Join(mirrorDir, mirror.Name),
workingDir: mirrorDir,
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
interval: time.Duration(mirror.Interval) * time.Minute,
Expand All @@ -102,9 +114,10 @@ func (w *Worker) initProviders() {
rc := rsyncConfig{
name: mirror.Name,
upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command,
password: mirror.Password,
excludeFile: mirror.ExcludeFile,
workingDir: filepath.Join(mirrorDir, mirror.Name),
workingDir: mirrorDir,
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6,
Expand All @@ -120,9 +133,10 @@ func (w *Worker) initProviders() {
name: mirror.Name,
stage1Profile: mirror.Stage1Profile,
upstreamURL: mirror.Upstream,
rsyncCmd: mirror.Command,
password: mirror.Password,
excludeFile: mirror.ExcludeFile,
workingDir: filepath.Join(mirrorDir, mirror.Name),
workingDir: mirrorDir,
logDir: logDir,
logFile: filepath.Join(logDir, "latest.log"),
useIPv6: mirror.UseIPv6,
Expand Down Expand Up @@ -303,7 +317,7 @@ func (w *Worker) registorWorker() {

func (w *Worker) updateStatus(jobMsg jobMessage) {
url := fmt.Sprintf(
"%s/%s/jobs/%s",
"%s/workers/%s/jobs/%s",
w.cfg.Manager.APIBase,
w.Name(),
jobMsg.name,
Expand All @@ -329,7 +343,7 @@ func (w *Worker) fetchJobStatus() []MirrorStatus {
var mirrorList []MirrorStatus

url := fmt.Sprintf(
"%s/%s/jobs",
"%s/workers/%s/jobs",
w.cfg.Manager.APIBase,
w.Name(),
)
Expand Down

0 comments on commit 9fbb8ab

Please sign in to comment.