Skip to content

Commit

Permalink
initial version
Browse files Browse the repository at this point in the history
xfong committed Sep 22, 2021
1 parent 692a632 commit 1a62151
Showing 551 changed files with 134,480 additions and 1 deletion.
36 changes: 36 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

# Use the default go compiler
GO_BUILDFLAGS=-compiler gc
# Or uncomment the line below to use the gccgo compiler, which may
# or may not be faster than gc and which may or may not compile...
# GO_BUILDFLAGS=-compiler gccgo -gccgoflags '-static-libgcc -O4 -Ofast -march=native'

CGO_CFLAGS_ALLOW='(-fno-schedule-insns|-malign-double|-ffast-math)'


.PHONY: all clkernels clean realclean hooks go.mod

all: clkernels
go install -v $(GO_BUILDFLAGS) github.com/seeder-research/uMagNUS/cmd/...

go.mod:
go mod init

clkernels:
cd ./opencl && $(MAKE)

hooks: .git/hooks/post-commit .git/hooks/pre-commit

.git/hooks/post-commit: post-commit
ln -sf $(CURDIR)/$< $@

.git/hooks/pre-commit: pre-commit
ln -sf $(CURDIR)/$< $@

clean:
rm -frv $(GOPATH)/pkg/*/github.com/seeder-research/uMagNUS/*
rm -frv $(GOPATH)/bin/mumax3* $(GOPATH)/bin/uMagNUS*
cd ./opencl && $(MAKE) clean

realclean: clean
cd ./opencl && ${MAKE} realclean
56 changes: 55 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,55 @@
# uMagNUS
uMagNUS
======

GPU accelerated micromagnetic simulator based on OpenCL.


Downloads and documentation
---------------------------

The frontend is based on MuMax3 and accepts simulation files written for MuMax3.

Refer to MuMax3 documentation at:
http://mumax.github.io


Paper
-----

- To be updated.


Building from source (for linux)
--------------------

Consider downloading a pre-compiled binary. If you want to compile nevertheless:

* install the OpenCL driver, if not yet present.
- if unsure, it's probably already there
- requires OpenCL 1.2 support
* install Go
- https://golang.org/dl/
- set $GOPATH
* if you have git installed:
- `go install -v github.com/seeder-research/uMagNUS/cmd/`
* if you don't have git:
- seriously, no git?
- get the source from https://github.com/seeder-research/uMagNUS/releases
- unzip the source into $GOPATH/src/github.com/seeder-research/uMagNUS
- `cd $GOPATH/src/github.com/seeder-research/uMagNUS/cmd/uMagNUS`
- `go install`
* optional: install gnuplot if you want pretty graphs
- Ubuntu: `sudo apt-get install gnuplot`

Your binary is now at `$GOPATH/bin/umagnus`

To do all at once on Ubuntu:
```
sudo apt-get install git golang-go gcc gnuplot
export GOPATH=$HOME go install -u -v github.com/seeder-research/uMagNUS/cmd/uMagNUS
```

Contributing
------------

Contributions are gratefully accepted. To contribute code, fork the repo on github and send a pull request.
2 changes: 2 additions & 0 deletions cmd/mumax3cl-convert/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
main
mumax3-convert
2 changes: 2 additions & 0 deletions cmd/mumax3cl-convert/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install -v
26 changes: 26 additions & 0 deletions cmd/mumax3cl-convert/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package main

import (
"fmt"
"io"

"github.com/seeder-research/uMagNUS/data"
)

// comma-separated values
func dumpCSV(f *data.Slice, info data.Meta, out io.Writer) {
f2 := ", " + *flag_format
a := f.Tensors()
for _, a := range a {
for _, a := range a {
for _, a := range a {
fmt.Fprintf(out, *flag_format, a[0])
for i := 1; i < len(a); i++ {
fmt.Fprintf(out, f2, a[i])
}
fmt.Fprintln(out)
}
fmt.Fprintln(out)
}
}
}
43 changes: 43 additions & 0 deletions cmd/mumax3cl-convert/gnuplot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

// Output for gnuplot's "splot"

import (
"bufio"
"fmt"
"io"

"github.com/seeder-research/uMagNUS/data"
)

const DELIM = "\t"

func dumpGnuplot(f *data.Slice, m data.Meta, out io.Writer) {
buf := bufio.NewWriter(out)
defer buf.Flush()

data := f.Tensors()
cellsize := m.CellSize
// If no cell size is set, use generic cell index.
if cellsize == [3]float64{0, 0, 0} {
cellsize = [3]float64{1, 1, 1}
}
ncomp := f.NComp()

for iz := range data[0] {
z := float64(iz) * cellsize[Z]
for iy := range data[0][iz] {
y := float64(iy) * cellsize[Y]
for ix := range data[0][iz][iy] {
x := float64(ix) * cellsize[X]
fmt.Fprint(buf, x, DELIM, y, DELIM, z, DELIM)
for c := 0; c < ncomp-1; c++ {
fmt.Fprint(buf, data[c][iz][iy][ix], DELIM)
}
fmt.Fprint(buf, data[ncomp-1][iz][iy][ix])
fmt.Fprint(buf, "\n")
}
fmt.Fprint(buf, "\n")
}
}
}
13 changes: 13 additions & 0 deletions cmd/mumax3cl-convert/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package main

import (
"encoding/json"
"io"

"github.com/seeder-research/uMagNUS/data"
)

func dumpJSON(f *data.Slice, info data.Meta, out io.Writer) {
w := json.NewEncoder(out)
w.Encode(f.Tensors())
}
453 changes: 453 additions & 0 deletions cmd/mumax3cl-convert/main.go

Large diffs are not rendered by default.

61 changes: 61 additions & 0 deletions cmd/mumax3cl-convert/normalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"math"

"github.com/seeder-research/uMagNUS/data"
)

// normalize vector data to given length
func normalize(f *data.Slice, length float64) {
a := f.Vectors()
for i := range a[0] {
for j := range a[0][i] {
for k := range a[0][i][j] {
x, y, z := a[0][i][j][k], a[1][i][j][k], a[2][i][j][k]
norm := math.Sqrt(float64(x*x + y*y + z*z))
invnorm := float32(1)
if norm != 0 {
invnorm = float32(length / norm)
}
a[0][i][j][k] *= invnorm
a[1][i][j][k] *= invnorm
a[2][i][j][k] *= invnorm

}
}
}
}

func normpeak(f *data.Slice) {
a := f.Vectors()
maxnorm := 0.
for i := range a[0] {
for j := range a[0][i] {
for k := range a[0][i][j] {

x, y, z := a[0][i][j][k], a[1][i][j][k], a[2][i][j][k]
norm := math.Sqrt(float64(x*x + y*y + z*z))
if norm > maxnorm {
maxnorm = norm
}

}
}
}
scale(f, float32(1/maxnorm))
}

func scale(f *data.Slice, factor float32) {
a := f.Vectors()
for i := range a[0] {
for j := range a[0][i] {
for k := range a[0][i][j] {
a[0][i][j][k] *= factor
a[1][i][j][k] *= factor
a[2][i][j][k] *= factor

}
}
}
}
35 changes: 35 additions & 0 deletions cmd/mumax3cl-convert/numpy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"encoding/binary"
"fmt"
"github.com/seeder-research/uMagNUS/data"
"io"
)

func dumpNUMPY(f *data.Slice, info data.Meta, out io.Writer) {

// see npy format: https://www.numpy.org/devdocs/reference/generated/numpy.lib.format.html

// write the first 10 bytes of the 128 byte header
fmt.Fprintf(out, "\x93NUMPY") // magic string
fmt.Fprintf(out, "\x01\x00") // npy format version
binary.Write(out, binary.LittleEndian, uint16(118)) // length of the actual header data (128-10)

// write the actual header data (118 bytes)
shapestr := fmt.Sprintf("(%d,%d,%d,%d)", f.NComp(), f.Size()[2], f.Size()[1], f.Size()[0])
headerData := fmt.Sprintf("{'descr': '<f4', 'fortran_order': False, 'shape': %s, }", shapestr)
fmt.Fprintf(out, "%-117v\n", headerData) // pad with empty spaces and a newline

// write the data
a := f.Tensors()
for _, a := range a {
for _, a := range a {
for _, a := range a {
for i := 0; i < len(a); i++ {
binary.Write(out, binary.LittleEndian, a[i])
}
}
}
}
}
29 changes: 29 additions & 0 deletions cmd/mumax3cl-convert/resize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"log"
"strconv"
"strings"

"github.com/seeder-research/uMagNUS/data"
"github.com/seeder-research/uMagNUS/util"
)

func resize(f *data.Slice, arg string) {
s := parseSize(arg)
resized := data.Resample(f, s)
*f = *resized
}

func parseSize(arg string) (size [3]int) {
words := strings.Split(arg, "x")
if len(words) != 3 {
log.Fatal("resize: need N0xN1xN2 argument")
}
for i, w := range words {
v, err := strconv.Atoi(w)
util.FatalErr(err)
size[i] = v
}
return
}
37 changes: 37 additions & 0 deletions cmd/mumax3cl-convert/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"runtime"
"sync"
)

var tasks chan func()
var taskWG sync.WaitGroup

const TaskCap = 100

func Queue(f func()) {
if tasks == nil {
tasks = make(chan func(), TaskCap)
startWorkers()
}

taskWG.Add(1)
tasks <- func() { defer taskWG.Add(-1); f() }
}

func Wait() {
taskWG.Wait()
}

func startWorkers() {
runtime.GOMAXPROCS(runtime.NumCPU())
nCPU := runtime.GOMAXPROCS(-1)
for i := 0; i < nCPU+1; i++ {
go func() {
for f := range tasks {
f()
}
}()
}
}
169 changes: 169 additions & 0 deletions cmd/mumax3cl-convert/vtk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package main

// Support for vtk 4.2 file output
// Author: Rémy Lassalle-Balier
// Modified by Arne Vansteenkiste, 2012, 2013.
// Modified by Mykola Dvornik, 2013

import (
"bytes"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"log"

"github.com/seeder-research/uMagNUS/data"
)

func dumpVTK(out io.Writer, q *data.Slice, meta data.Meta, dataformat string) (err error) {
err = writeVTKHeader(out, q)
err = writeVTKCellData(out, q, meta, dataformat)
err = writeVTKPoints(out, q, dataformat, meta)
err = writeVTKFooter(out)
return
}

func writeVTKHeader(out io.Writer, q *data.Slice) (err error) {
gridsize := q.Size()
_, err = fmt.Fprintln(out, "<?xml version=\"1.0\"?>")
_, err = fmt.Fprintln(out, "<VTKFile type=\"StructuredGrid\" version=\"0.1\" byte_order=\"LittleEndian\">")
_, err = fmt.Fprintf(out, "\t<StructuredGrid WholeExtent=\"0 %d 0 %d 0 %d\">\n", gridsize[0]-1, gridsize[1]-1, gridsize[2]-1)
_, err = fmt.Fprintf(out, "\t\t<Piece Extent=\"0 %d 0 %d 0 %d\">\n", gridsize[0]-1, gridsize[1]-1, gridsize[2]-1)
return
}

func writeVTKPoints(out io.Writer, q *data.Slice, dataformat string, info data.Meta) (err error) {
_, err = fmt.Fprintln(out, "\t\t\t<Points>")
fmt.Fprintf(out, "\t\t\t\t<DataArray type=\"Float32\" NumberOfComponents=\"3\" format=\"%s\">\n\t\t\t\t\t", dataformat)
gridsize := q.Size()
cellsize := info.CellSize
switch dataformat {
case "ascii":
for k := 0; k < gridsize[2]; k++ {
for j := 0; j < gridsize[1]; j++ {
for i := 0; i < gridsize[0]; i++ {
x := (float32)(i) * (float32)(cellsize[0])
y := (float32)(j) * (float32)(cellsize[1])
z := (float32)(k) * (float32)(cellsize[2])
_, err = fmt.Fprint(out, x, " ", y, " ", z, " ")
}
}
}
case "binary":
buffer := new(bytes.Buffer)
for k := 0; k < gridsize[2]; k++ {
for j := 0; j < gridsize[1]; j++ {
for i := 0; i < gridsize[0]; i++ {
x := (float32)(i) * (float32)(cellsize[0])
y := (float32)(j) * (float32)(cellsize[1])
z := (float32)(k) * (float32)(cellsize[2])
binary.Write(buffer, binary.LittleEndian, x)
binary.Write(buffer, binary.LittleEndian, y)
binary.Write(buffer, binary.LittleEndian, z)
}
}
}
b64len := uint32(len(buffer.Bytes()))
bufLen := new(bytes.Buffer)
binary.Write(bufLen, binary.LittleEndian, b64len)
base64out := base64.NewEncoder(base64.StdEncoding, out)
base64out.Write(bufLen.Bytes())
base64out.Write(buffer.Bytes())
base64out.Close()
default:
log.Fatalf("Illegal VTK data format: %v. Options are: ascii, binary", dataformat)
}
_, err = fmt.Fprintln(out, "\n\t\t\t\t</DataArray>")
_, err = fmt.Fprintln(out, "\t\t\t</Points>")
return
}

func writeVTKCellData(out io.Writer, q *data.Slice, meta data.Meta, dataformat string) (err error) {
N := q.NComp()
data := q.Tensors()
switch N {
case 1:
fmt.Fprintf(out, "\t\t\t<PointData Scalars=\"%s\">\n", meta.Name)
fmt.Fprintf(out, "\t\t\t\t<DataArray type=\"Float32\" Name=\"%s\" NumberOfComponents=\"%d\" format=\"%s\">\n\t\t\t\t\t", meta.Name, N, dataformat)
case 3:
fmt.Fprintf(out, "\t\t\t<PointData Vectors=\"%s\">\n", meta.Name)
fmt.Fprintf(out, "\t\t\t\t<DataArray type=\"Float32\" Name=\"%s\" NumberOfComponents=\"%d\" format=\"%s\">\n\t\t\t\t\t", meta.Name, N, dataformat)
case 6, 9:
fmt.Fprintf(out, "\t\t\t<PointData Tensors=\"%s\">\n", meta.Name)
fmt.Fprintf(out, "\t\t\t\t<DataArray type=\"Float32\" Name=\"%s\" NumberOfComponents=\"%d\" format=\"%s\">\n\t\t\t\t\t", meta.Name, 9, dataformat) // must be 9!
default:
log.Fatalf("vtk: cannot handle %v components", N)
}
gridsize := q.Size()
switch dataformat {
case "ascii":
for k := 0; k < gridsize[2]; k++ {
for j := 0; j < gridsize[1]; j++ {
for i := 0; i < gridsize[0]; i++ {
// if symmetric tensor manage it appart to write the full 9 components
if N == 6 {
fmt.Fprint(out, data[0][k][j][i], " ")
fmt.Fprint(out, data[1][k][j][i], " ")
fmt.Fprint(out, data[2][k][j][i], " ")
fmt.Fprint(out, data[1][k][j][i], " ")
fmt.Fprint(out, data[3][k][j][i], " ")
fmt.Fprint(out, data[4][k][j][i], " ")
fmt.Fprint(out, data[2][k][j][i], " ")
fmt.Fprint(out, data[4][k][j][i], " ")
fmt.Fprint(out, data[5][k][j][i], " ")
} else {
for c := 0; c < N; c++ {
fmt.Fprint(out, data[c][k][j][i], " ")
}
}
}
}
}
case "binary":
// Inlined for performance, terabytes of data will pass here...
buffer := new(bytes.Buffer)
for k := 0; k < gridsize[2]; k++ {
for j := 0; j < gridsize[1]; j++ {
for i := 0; i < gridsize[0]; i++ {
// if symmetric tensor manage it appart to write the full 9 components
if N == 6 {
binary.Write(buffer, binary.LittleEndian, data[0][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[1][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[2][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[1][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[3][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[4][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[2][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[4][k][j][i])
binary.Write(buffer, binary.LittleEndian, data[5][k][j][i])
} else {
for c := 0; c < N; c++ {
binary.Write(buffer, binary.LittleEndian, data[c][k][j][i])
}
}
}
}
}
b64len := uint32(len(buffer.Bytes()))
bufLen := new(bytes.Buffer)
binary.Write(bufLen, binary.LittleEndian, b64len)
base64out := base64.NewEncoder(base64.StdEncoding, out)
base64out.Write(bufLen.Bytes())
base64out.Write(buffer.Bytes())
base64out.Close()
default:
panic(fmt.Errorf("vtk: illegal data format " + dataformat + ". Options are: ascii, binary"))
}

fmt.Fprintln(out, "\n\t\t\t\t</DataArray>")
fmt.Fprintln(out, "\t\t\t</PointData>")
return
}

func writeVTKFooter(out io.Writer) (err error) {
_, err = fmt.Fprintln(out, "\t\t</Piece>")
_, err = fmt.Fprintln(out, "\t</StructuredGrid>")
_, err = fmt.Fprintln(out, "</VTKFile>")
return
}
2 changes: 2 additions & 0 deletions cmd/mumax3cl-httpfsd/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install
44 changes: 44 additions & 0 deletions cmd/mumax3cl-httpfsd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
httpfs server, useful for debugging mumax3-server.
Usage
Start mumax3-httpfsd in a certain working directory.
$ ls
file.mx3
$ mumax3-server -l :35362
Then you can remotely run mumax3 input files:
$ cd elsewhere
$ mumax3 http://localhost:35362/file.mx3
*/
package main

import (
"flag"
"log"
"net/http"
_ "net/http/pprof"

"github.com/seeder-research/uMagNUS/httpfs"
)

var (
flag_addr = flag.String("l", ":35360", "Listen and serve at this network address")
flag_log = flag.Bool("log", false, "log debug output")
)

func main() {
flag.Parse()
log.Println("serving at", *flag_addr)
httpfs.Logging = *flag_log
httpfs.RegisterHandlers()
err := http.ListenAndServe(*flag_addr, nil)
if err != nil {
log.Fatal(err)
}
}
1 change: 1 addition & 0 deletions cmd/mumax3cl-plot/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mumax3-plot
2 changes: 2 additions & 0 deletions cmd/mumax3cl-plot/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install -v
115 changes: 115 additions & 0 deletions cmd/mumax3cl-plot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
The mumax3-plot utility uses gnuplot to automatically plot mumax3 data tables.
mumax3-plot table.txt
Creates graphs of all columns as .svg files.
*/
package main

import (
"bufio"
"flag"
"fmt"
"log"
"os"
"os/exec"
"path"
"strings"
)

func main() {
log.SetFlags(0)
flag.Parse()

for _, f := range flag.Args() {
plotFile(f)
}
}

func plotFile(fname string) {

hdr := readHeader(fname)

// quantities grouped by vector
Qs := []*Q{&Q{[]string{"t"}, "s", []int{1}}}
prev := Qs[0]

quants := strings.Split(hdr, "\t")
for i := 1; i < len(quants); i++ {
spl := strings.Split(quants[i], " ")
name := spl[0]
unit := spl[1]
if unit == "()" {
unit = ""
}

if name[:len(name)-1] == prev.name[0][:len(prev.name[0])-1] {
prev.cols = append(prev.cols, i+1)
prev.name = append(prev.name, name)
} else {
n := &Q{[]string{name}, unit, []int{i + 1}}
Qs = append(Qs, n)
prev = n
}
}
log.Println(Qs)

for i := 1; i < len(Qs); i++ {
makePlot(fname, Qs[i])
}
}

func makePlot(fname string, q *Q) {
term := "svg"
outf := path.Dir(fname) + "/" + q.vecname()
cmd := fmt.Sprintf(`set term %v noenhanced size 400 300 font 'Arial,10'; set output "%v.%v";`, term, outf, term)
cmd += fmt.Sprintf(`set xlabel "t(ns)";`)

cmd += fmt.Sprintf(`set ylabel "%v %v";`, q.vecname(), q.unit)
cmd += fmt.Sprint(`set format y "%g";`)
cmd += fmt.Sprint(`plot "`, fname, `" u ($1*1e9):`, q.cols[0], ` w li title "`, q.name[0], `"`)
for i := 1; i < len(q.cols); i++ {
cmd += fmt.Sprint(`, "`, fname, `" u ($1*1e9):`, q.cols[i], ` w li title "`, q.name[i], `"`)
}
cmd += "; set output;"

out, err := exec.Command("gnuplot", "-e", cmd).CombinedOutput()
os.Stderr.Write(out)
check(err)
}

type Q struct {
name []string
unit string
cols []int
}

func (q *Q) String() string { return fmt.Sprint(q.name, "(", q.unit, ")", q.cols) }

func (q *Q) vecname() string {
if len(q.cols) > 1 {
return q.name[0][:len(q.name[0])-1]
} else {
return q.name[0]
}
}

func readHeader(fname string) string {
f, err := os.Open(fname)
check(err)
defer f.Close()
in := bufio.NewReader(f)
hdrBytes, _, err2 := in.ReadLine()
check(err2)
hdr := string(hdrBytes)
if hdr[0] != '#' {
log.Fatal("invalid table header:", hdr)
}
hdr = hdr[2:]
return hdr
}

func check(err error) {
if err != nil {
log.Fatal(err)
}
}
1 change: 1 addition & 0 deletions cmd/mumax3cl-script/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mumax3-script
2 changes: 2 additions & 0 deletions cmd/mumax3cl-script/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install -v
83 changes: 83 additions & 0 deletions cmd/mumax3cl-script/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Toy interpreter executes scripts or stdin.
*/
package main

import (
"bufio"
"flag"
"fmt"
"github.com/seeder-research/uMagNUS/script"
"io"
"log"
"os"
)

var debug = flag.Bool("g", false, "print debug output")

var (
world *script.World
ps1 string
)

func main() {
log.SetFlags(0)
flag.Parse()
world = script.NewWorld()
world.Func("exit", exit)
script.Debug = *debug

if flag.NArg() > 1 {
check(fmt.Errorf("need 0 or 1 input files"))
}

if flag.NArg() == 1 {
src, err := os.Open(flag.Arg(0))
check(err)
ps1 = ">"
interpret(src)
} else {
ps1 = ""
interpret(os.Stdin)
}
}

func interpret(in io.Reader) {
scanner := bufio.NewScanner(in)
for scanner.Scan() {
safecall(scanner.Text())
}
check(scanner.Err())
}

func safecall(code string) {
if code == "" {
return
}
defer func() {
err := recover()
if err != nil {
fmt.Fprintln(os.Stderr, "panic:", err)
}
}()
tree, err := world.Compile(code)
if err == nil {
for _, stmt := range tree.Child() {
fmt.Println(stmt.Eval())
}
} else {
fmt.Fprintln(os.Stderr, err)
}

}

func check(e error) {
if e != nil {
fmt.Fprintln(os.Stderr, e)
os.Exit(1)
}
}

func exit() {
os.Exit(0)
}
2 changes: 2 additions & 0 deletions cmd/mumax3cl-script/mumax3-int
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#! /bin/bash
rlwrap -m -S '> ' mumax3-script
2 changes: 2 additions & 0 deletions cmd/uMagNUS-server/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install
292 changes: 292 additions & 0 deletions cmd/uMagNUS-server/compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
package main

/*
Compute service runs jobs on this node's GPUs, if any.
*/

import (
"fmt"
"io"
"log"
"os/exec"
"strings"
"time"

"github.com/seeder-research/uMagNUS/httpfs"
"github.com/seeder-research/uMagNUS/util"
)

var (
UmagnusVersion string
GPUs []string
Processes = make(map[string]*Process) // job id -> process
)

// Process is a running simulation process
type Process struct {
*exec.Cmd
Start time.Time
Out io.WriteCloser
ID string
OutputURL string
GUI string
Killed bool
}

func (p *Process) Host() string {
return JobHost(p.OutputURL)
}

// Runs a compute service on this node, if GPUs are available.
// The compute service asks storage nodes for a job, runs it,
// saves results over httpfs and notifies storage when ready.
func RunComputeService() {

if len(GPUs) == 0 {
return
}

// queue of available GPU numbers
idle := make(chan int, len(GPUs))
for i := range GPUs {
idle <- i
}

for {
gpu := <-idle // take an available GPU
GUIAddr := fmt.Sprint(thisHost+":", GUI_PORT+gpu)
ID := WaitForJob() // take an available job
go func() {

defer func() {
// remove from "running" list
WLock()
delete(Processes, ID)
WUnlock()
// add GPU number back to idle stack
idle <- gpu
}()

p := NewProcess(ID, gpu, GUIAddr)
if p == nil {
return
}

WLock()
Processes[ID] = p
WUnlock()

p.Run()

_, err := RPCCall(JobHost(ID), "UpdateJob", ID)
if err != nil {
log.Println(err)
}

}()
}
}

func WaitForJob() string {
ID := FindJob()
for ID == "" {
time.Sleep(2 * time.Second) // TODO: don't poll
ID = FindJob()
}
return ID
}

func FindJob() string {

// quickly list peers first
RLock()
p := make([]string, 0, len(peers))
for addr, _ := range peers {
p = append(p, addr)
}
RUnlock()
// TODO: pick peers fairly

// then do slow RPC calls without blocking the rest of the program
for _, addr := range p {
ID, _ := RPCCall(addr, "GiveJob", thisAddr)
if ID != "" {
return ID
}
}
return ""
}

// RPC-callable function kills job corresponding to given job id.
// The job has to be running on this node.
func Kill(id string) string {
log.Println("KILL", id)

WLock() // modifies Cmd state
defer WUnlock()

job := Processes[id]
if job == nil {
return fmt.Sprintf("kill %v: job not running.", id)
}
job.Killed = true
err := job.Cmd.Process.Kill()
if err != nil {
return err.Error()
}
return "" // OK
}

// prepare exec.Cmd to run uMagNUS3 compute process
func NewProcess(ID string, gpu int, webAddr string) *Process {
// prepare command
inputURL := "http://" + ID
command := *flag_umagnus
gpuFlag := fmt.Sprint(`-gpu=`, gpu)
httpFlag := fmt.Sprint(`-http=`, webAddr)
cacheFlag := fmt.Sprint(`-cache=`, *flag_cachedir)
forceFlag := `-f=0`
cmd := exec.Command(command, gpuFlag, httpFlag, cacheFlag, forceFlag, inputURL)

// Pipe stdout, stderr to log file over httpfs
outDir := util.NoExt(inputURL) + ".out"
errMkdir := httpfs.Mkdir(outDir)
if errMkdir != nil {
SetJobError(ID, errMkdir)
log.Println("makeProcess", errMkdir)
j := JobByName(ID)
if j != nil {
j.Reque()
}
return nil
}

out, errD := httpfs.Create(outDir + "/stdout.txt")
if errD != nil {
SetJobError(ID, errD)
log.Println("makeProcess", errD)
j := JobByName(ID)
if j != nil {
j.Reque()
}
return nil
}
cmd.Stderr = out
cmd.Stdout = out

return &Process{ID: ID, Cmd: cmd, Start: time.Now(), Out: out, OutputURL: OutputDir(inputURL), GUI: webAddr}
}

func (p *Process) Run() {

log.Println("=> exec ", p.Path, p.Args)

defer p.Out.Close()

httpfs.Put(p.OutputURL+"host", []byte(thisAddr))

startTime := AskTime(p.Host())
httpfs.Put(p.OutputURL+"start", []byte(startTime.Format(time.UnixDate)))

WLock() // Cmd.Start() modifies state
err1 := p.Cmd.Start() // err?
WUnlock()
if err1 != nil {
SetJobError(p.ID, err1)
}

timeOffset := time.Now().Sub(startTime) // our clock is most likely out-of-sync with host
tick := time.NewTicker(KeepaliveInterval)

// need initial alive in case watchdog sniffs between start and first alive tick
httpfs.Put(p.OutputURL+"alive", []byte(time.Now().Add(timeOffset).Format(time.UnixDate)))
go func() {
for t := range tick.C {
httpfs.Put(p.OutputURL+"alive", []byte(t.Add(timeOffset).Format(time.UnixDate)))
}
}()

err2 := p.Cmd.Wait()
if err1 == nil && err2 != nil {
SetJobError(p.ID, err2)
}
tick.Stop()

status := -1

// TODO: determine proper status number
if err1 != nil || err2 != nil {
log.Println(p.Path, p.Args, err1, err2)
status = 1
} else {
status = 0
}

if p.Killed {
httpfs.Put(p.OutputURL+"killed", []byte(time.Now().Format(time.UnixDate)))
} else {
httpfs.Put(p.OutputURL+"exitstatus", []byte(fmt.Sprint(status)))
}

stopTime := AskTime(p.Host())
nanos := stopTime.Sub(startTime).Nanoseconds()
httpfs.Put(p.OutputURL+"duration", []byte(fmt.Sprint(nanos)))

if status == 0 {
ret, err := RPCCall(p.Host(), "AddFairShare", JobUser(p.ID)+"/"+fmt.Sprint(nanos/1e9))
if err != nil || ret != "" {
log.Println("***ERR: AddFairShare", JobUser(p.ID), ret, err)
}
}

return
}

func (p *Process) Duration() time.Duration { return Since(time.Now(), p.Start) }

func DetectGPUs() {
if GPUs != nil {
panic("multiple DetectGPUs() calls")
}

for i := 0; i < MAXGPU; i++ {
gpuflag := fmt.Sprint("-gpu=", i)
out, err := exec.Command(*flag_umagnus, "-test", gpuflag).Output()
if err == nil {
info := string(out)
if strings.HasSuffix(info, "\n") {
info = info[:len(info)-1]
}
log.Println("gpu", i, ":", info)
GPUs = append(GPUs, info)
}
}
}

func DetectuMagNUS() {
out, err := exec.Command(*flag_umagnus, "-test", "-v").CombinedOutput()
info := string(out)
if err == nil {
split := strings.SplitN(info, "\n", 2)
version := split[0]
log.Println("have", version)
UmagnusVersion = version
} else {
UmagnusVersion = fmt.Sprint(*flag_umagnus, "-test", ": ", err, info)
}
}

// RPC-callable function, answers by this node's time
func WhatsTheTime(string) string {
return time.Now().Format(time.UnixDate)
}

func AskTime(host string) time.Time {
str, _ := RPCCall(host, "WhatsTheTime", "")
return parseTime(str)
}

func parseTime(str string) time.Time {
t, _ := time.Parse(time.UnixDate, str)
return t
}
121 changes: 121 additions & 0 deletions cmd/uMagNUS-server/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
Easy-to-use cluster management tool for uMagNUS, with auto-configuration and web interface. When nodes are connected behind a home router, uMagNUS-server can run without any configuration. Otherwise only the IP address range where the other nodes reside has to be specified.
Input files
Upon starting uMagNUS-server, it scans the current working directory for input files. These should be organised in directories corresponding to user names. E.g.:
john/file1.mx3
john/file2.mx3
...
kate/file1.mx3
kate/file2.mx3
...
Other files will be ignored. These input files will run on all available nodes in the network. After adding/removing files, you should click "rescan" in the web interface, or wait for a few minutes.
Web interface
uMagNUS-server serves a web interface at http://localhost:35360 (you have overridden the port, see below). Depending on your OS you may need to use your exact IP address instead of localhost, e.g.: http://192.168.0.1:35360.
The web interface shows you the queued jobs, running jobs, output files, etc., and allows to re-scan for new job files or kill running jobs
Compute nodes
Each node that runs uMagNUS-server and has a working uMagNUS installation will automatically serve as a compute node (even if it stores input files as well). The web interface will show the umagnus version and available GPUs. The -exec flag may be used to override which uMagNUS binary to use. E.g:
uMagNUS-server -exec /usr/local/uMagNUS/uMagNUS-cuda6.5 #override uMagNUS binary
Scan for other nodes
Upon starting uMagNUS-server, it will automatically scan for other nodes in the local network. These will automatically start running jobs (if they have a GPU and uMagNUS installed), or may serve job files to be executed by other nodes.
By default, we search for nodes with IP addresses in the range 192.168.0.1-128 (local network behind, e.g., a router). This can be changed by the -scan flag. E.g.:
uMagNUS-server -scan 127.0.0.1,169.254.0-1.1-254
uMagNUS-server -ports 35360-25369
Even when a new node appears on the network after the port scan, it should still be automatically detected. If not, hit "rescan" in the web interface. The -ports flag may be used to change the port numbers being scanned, in case the server uses a non-standard port (-l flag).
Override port number
uMagNUS-server uses tcp port 35360, which needs to be accessible (e.g., through your firewall). This port and the service's IP address, can be overridden with the -l flag:
uMagNUS-server -l :35361 #serves at non-standard port
uMagNUS-server -l 192.168.1.1:35360 #serves at specific IP address, e.g. for dual-link machines
Fault tolerance
uMagNUS-server does a great effort to recover from failed nodes, network outages, reboots etc. If a simulation is interrupted for any such reason, it should be re-queued and automatically re-started later. In that case the web interface will show [1x requeued] to indicate that the job has been interrupted, but it will run later nevertheless.
Command line flags
Usage of uMagNUS-server:
-cache="": uMagNUS kernel cache path
-exec="uMagNUS": uMagNUS executable
-halflife=24h0m0s: share decay half-life
-l=":35360": Listen and serve at this network address
-log=true: log debug output
-ports="35360-35361": Scan these ports for other servers
-scan="192.168.0.1-128": Scan these IP address for other servers
-timeout=2s: Portscan timeout
Web interface example
http://localhost:35360
157.193.57.146:35360
Uptime: 27h45m38s
Peer nodes
scan 157.193.57.2-254: 35360-35361
ports 35360-35361
(Rescan)
157.193.57.146:35360
157.193.57.228:35360
Compute service
uMagNUS: uMagNUS 1.0 linux_amd64 go1.3.3 (gc)
GPU0: CUDA 6 GeForce GTX 680(2047MB) cc3.0
GPU1: CUDA 6 GeForce GTX 680(2047MB) cc3.0
GPU2: CUDA 6 GeForce GTX 680(2047MB) cc3.0
Running jobs
[157.193.57.146:35360/john/b_ext_add.mx3] [3s] [GUI] [kill]
[157.193.57.146:35360/john/demag2D.mx3] [2s] [GUI] [kill]
[157.193.57.146:35360/john/demag2Dpbc.mx3] [1s] [GUI] [kill]
Queue service
Users
john 589 GPU-seconds has queued jobs
kate 0 GPU-seconds no queued jobs
Next job for: john
Jobs
[Reload all]
[Wake-up Watchdog]
john
[Reload]
[john/anisenergy.mx3] [.out] [157.193.57.146:35360] [ OK ] [1s]
[john/anisenergyconservation.mx3] [.out] [157.193.57.146:35360] [ OK ] [2s]
[john/anisenergyconservation2.mx3] [.out] [157.193.57.146:35360] [ OK ] [2s]
[john/anisenergyconservation3.mx3] [.out] [157.193.57.228:35360] [ OK ] [1s]
[john/anisenergyconservation4.mx3] [.out] [157.193.57.146:35360] [ OK ] [2s]
kate
[Reload]
*/
package main
266 changes: 266 additions & 0 deletions cmd/uMagNUS-server/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package main

import (
"log"
"os"
"strconv"
"time"

"github.com/seeder-research/uMagNUS/httpfs"
"github.com/seeder-research/uMagNUS/util"
)

const MaxRequeue = 10 // maximum number of re-queues, don't run job if re-queued to many times

// compute Job
type Job struct {
ID string // host/path of the input file, e.g., hostname:port/user/inputfile.mx3
// in-memory properties:
RequeCount int // how many times requeued.
Error interface{} // error that cannot be consolidated to disk
// all of this is cache:
Output string // if exists, points to output ID
Host string // node address in host file (=last host who started this job)
ExitStatus string // what's in the exitstatus file
Start time.Time // When this job was started, if applicable
Alive time.Time // Last time when this job was seen alive
duration time.Duration
}

// Find job belonging to ID
func JobByName(ID string) *Job {
user := Users[BaseDir(LocalPath(ID))]
if user == nil {
log.Println("JobByName: no user for", ID)
return nil
}
jobs := user.Jobs

low := 0
high := len(jobs) - 1
mid := -1

for low <= high {
mid = (low + high) / 2
switch {
case jobs[mid].ID > ID:
high = mid - 1
case jobs[mid].ID < ID:
low = mid + 1
default:
low = high + 1 // break for loop :-(
}
}

if mid >= 0 && mid < len(jobs) && jobs[mid].ID == ID {
return jobs[mid]
} else {
log.Println("JobByName: not found:", ID)
return nil
}
}

// read job files from storage and update status cache
func (j *Job) Update() {
out := j.LocalOutputDir()
if exists(out) {
j.Output = thisAddr + "/" + out
} else {
j.Output = ""
j.ExitStatus = ""
j.Start = time.Time{}
j.Alive = time.Time{}
j.duration = 0
}
if j.Output != "" {
j.Host = httpfsRead(out + "host")
j.ExitStatus = httpfsRead(out + "exitstatus")
j.Start = parseTime(httpfsRead(out + "start"))
j.Alive = parseTime(httpfsRead(out + "alive"))
j.duration = time.Duration(atoi(httpfsRead(out + "duration")))
}
}

// Put job back in queue for later, e.g., when killed.
func (j *Job) Reque() {
log.Println("requeue", j.ID)
j.RequeCount++
httpfs.Remove(j.LocalOutputDir())
j.Update()
}

func SetJobError(ID string, err interface{}) {
log.Println("SetJobErr", ID, err)
WLock()
defer WUnlock()
j := JobByName(ID)
if j == nil {
return
}
j.Error = err
}

// How long job has been running, if running.
func (j *Job) Duration() time.Duration {
if j.Start.IsZero() {
return 0
}
if j.duration != 0 {
return j.duration
}
if j.IsRunning() {
return Since(time.Now(), j.Start)
}
return 0 // unknown duration
}

// user name for this job ID
func (j *Job) User() string {
return JobUser(j.ID)
}

// user name for this job ID
func JobUser(ID string) string {
return BaseDir(LocalPath(ID))
}

// local path of input file
func (j *Job) LocalPath() string {
return LocalPath(j.ID)
}

// local path of input file, without host prefix. E.g.:
// host:123/user/file.mx3 -> user/file.mx3
func LocalPath(ID string) string {
host := JobHost(ID)
if len(host)+1 >= len(ID) {
log.Println("Invalid LocalPath call on", ID)
return ""
}
return ID[len(host)+1:]
}

// local path of output dir
func (j *Job) LocalOutputDir() string {
return OutputDir(j.LocalPath())
}

// output directory for input file
func OutputDir(path string) string {
return util.NoExt(path) + ".out/"
}

// insert "/fs" in front of url path
func (*Job) FS(id string) string {
return FS(id)
}

// insert "/fs" in front of url path
func FS(id string) string {
return BaseDir(id) + "/fs/" + LocalPath(id)
}

// is job queued?
func (j *Job) IsQueued() bool {
return j.Output == "" && j.RequeCount < MaxRequeue
}

// is job running?
func (j *Job) IsRunning() bool {
return j.Output != "" && j.ExitStatus == "" && j.Host != ""
}

// Host of job with this ID (=first path element). E.g.:
// host:123/user/file.mx3 -> host:123
func JobHost(ID string) string {
return BaseDir(ID)
}

// Job status number queued, running,...
type Status int

const (
QUEUED Status = iota
RUNNING
FINISHED
FAILED
)

var statusString = map[Status]string{
QUEUED: "QUEUED",
RUNNING: "RUNNING",
FINISHED: "FINISHED",
FAILED: "FAILED",
}

func (s Status) String() string {
return statusString[s]
}

// human-readable status string (for gui)
func (j *Job) Status() string {
if j.IsQueued() {
return QUEUED.String()
}
if j.ExitStatus == "0" {
return FINISHED.String()
}
if j.ExitStatus == "" && j.Host == "" {
return FINISHED.String()
}
if j.Host != "" && j.ExitStatus == "" {
return RUNNING.String()
}
if j.ExitStatus != "" && j.ExitStatus != "0" {
return FAILED.String()
}
return "UNKNOWN"
}

// remove job output
func Rm(URL string) string {
err := httpfs.Remove("http://" + OutputDir(URL))

// update status after output removal
UpdateJob(URL)

if err != nil {
return err.Error()
}

// report re-queue
// handy if others remove your jobs
job := JobByName(URL)
if job != nil {
job.RequeCount++
}

// make sure job runs again quickly
user := JobUser(URL)
u := Users[user]
if u != nil {
u.nextPtr = 0
}
return ""
}

// check if path exists
func exists(path string) bool {
_, err := os.Stat(path)
return err == nil
}

// atoi, does not return error
func atoi(a string) int64 {
i, _ := strconv.ParseInt(a, 10, 64)
return i
}

// return file content as string, no errors
func httpfsRead(fname string) string {
data, err := httpfs.Read(fname)
if err != nil {
return ""
}
return string(data)
}
221 changes: 221 additions & 0 deletions cmd/uMagNUS-server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package main

import (
"flag"
"fmt"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/seeder-research/uMagNUS/httpfs"
"github.com/seeder-research/uMagNUS/util"
)

var (
flag_addr = flag.String("l", ":35360", "Listen and serve at this network address")
flag_scan = flag.String("scan", "192.168.0.1-128", "Scan these IP address for other servers")
flag_ports = flag.String("ports", "35360-35361", "Scan these ports for other servers")
flag_timeout = flag.Duration("timeout", 2*time.Second, "Portscan timeout")
flag_umagnus = flag.String("exec", "uMagNUS", "uMagNUS executable")
flag_cachedir = flag.String("cache", "", "uMagNUS kernel cache path")
flag_log = flag.Bool("log", true, "log debug output")
flag_halflife = flag.Duration("halflife", 24*time.Hour, "share decay half-life")
)

const (
MaxIPs = 1024 // maximum number of IP address to portscan
N_SCANNERS = 32 // number of parallel portscan goroutines
MAXGPU = 16 // maximum number of GPU's to check for
KeepaliveInterval = 10 * time.Second // signal process liveness every KeepaliveInterval
)

var (
thisAddr string // unique address of this node, e.g., name:1234
thisHost string // unique hostname of this node, e.g., name
IPs []string
MinPort, MaxPort int
global_lock sync.RWMutex
)

func RLock() { global_lock.RLock() }
func RUnlock() { global_lock.RUnlock() }
func WLock() { global_lock.Lock() }
func WUnlock() { global_lock.Unlock() }

const GUI_PORT = 35367 // base port number for GUI (to be incremented by GPU number)

func main() {
flag.Parse()

IPs = parseIPs()
MinPort, MaxPort = parsePorts()

thisAddr = canonicalAddr(*flag_addr, IPs)
var err error
thisHost, _, err = net.SplitHostPort(thisAddr)
util.FatalErr(err)
DetectuMagNUS()
DetectGPUs()
LoadJobs()

http.HandleFunc("/do/", HandleRPC)
http.HandleFunc("/", HandleStatus)
httpfs.RegisterHandlers()

// Listen and serve on all interfaces
go func() {
log.Println("serving at", thisAddr)

// Resolve the IPs for thisHost
thisIP, err := net.LookupHost(thisHost)
Fatal(err)

// try to listen and serve on all interfaces other than thisAddr
// this is for convenience, errors are not fatal.
_, p, err := net.SplitHostPort(thisAddr)
Fatal(err)
ips := util.InterfaceAddrs()
for _, ip := range ips {
addr := net.JoinHostPort(ip, p)
if !contains(thisIP, ip) { // skip thisIP, will start later and is fatal on error
go func() {
log.Println("serving at", addr)
err := http.ListenAndServe(addr, nil)
if err != nil {
log.Println("info:", err, "(but still serving other interfaces)")
}
}()
}
}

// only on thisAddr, this server's unique address,
// we HAVE to be listening.
Fatal(http.ListenAndServe(thisAddr, nil))
}()

ProbePeer(thisAddr) // make sure we have ourself as peer
go FindPeers(IPs, MinPort, MaxPort)
go RunComputeService()
go LoopWatchdog()
go RunShareDecay()

// re-load jobs every hour so we don't stall on very exceptional circumstances
go func() {
for {
time.Sleep(1 * time.Hour)
LoadJobs()
}
}()

<-make(chan struct{}) // wait forever
}

// replace laddr by a canonical form, as it will serve as unique ID
func canonicalAddr(laddr string, IPs []string) string {
// safe initial guess: hostname:port
h, p, err := net.SplitHostPort(laddr)
Fatal(err)
if h == "" {
h, _ = os.Hostname()
}
name := net.JoinHostPort(h, p)

ips := util.InterfaceAddrs()
for _, ip := range ips {
if contains(IPs, ip) {
return net.JoinHostPort(ip, p)

}
}

return name
}

func contains(arr []string, x string) bool {
for _, s := range arr {
if x == s {
return true
}
}
return false
}

// Parse port range flag. E.g.:
// 1234-1237 -> 1234, 1237
func parsePorts() (minPort, maxPort int) {
p := *flag_ports
split := strings.Split(p, "-")
if len(split) > 2 {
log.Fatal("invalid port range:", p)
}
minPort, _ = strconv.Atoi(split[0])
if len(split) > 1 {
maxPort, _ = strconv.Atoi(split[1])
}
if maxPort == 0 {
maxPort = minPort
}
if minPort == 0 || maxPort == 0 || maxPort < minPort {
log.Fatal("invalid port range:", p)
}
return
}

// init IPs from flag
func parseIPs() []string {
var IPs []string
defer func() {
if err := recover(); err != nil {
log.Fatal("invalid IP range:", *flag_scan)
}
}()

p := *flag_scan
split := strings.Split(p, ",")
for _, s := range split {
split := strings.Split(s, ".")
if len(split) != 4 {
log.Fatal("invalid IP address range:", s)
}
var start, stop [4]uint
for i, s := range split {
split := strings.Split(s, "-")
first := atobyte(split[0])
start[i], stop[i] = first, first
if len(split) > 1 {
stop[i] = atobyte(split[1])
}
}

for A := start[0]; A <= stop[0]; A++ {
for B := start[1]; B <= stop[1]; B++ {
for C := start[2]; C <= stop[2]; C++ {
for D := start[3]; D <= stop[3]; D++ {
if len(IPs) > MaxIPs {
log.Fatal("too many IP addresses to scan in", p)
}
IPs = append(IPs, fmt.Sprintf("%v.%v.%v.%v", A, B, C, D))
}
}
}
}
}
return IPs
}

func atobyte(a string) uint {
i, err := strconv.Atoi(a)
if err != nil {
panic(err)
}
if int(byte(i)) != i {
panic("too large")
}
return uint(i)
}
77 changes: 77 additions & 0 deletions cmd/uMagNUS-server/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package main

// Peer management:
// portscan for peers
// ping peers

import (
"fmt"
"log"
)

var (
peers = make(map[string]*Peer)
)

type Peer struct {
}

func AddPeer(pAddr string) {
WLock()
defer WUnlock()

if _, ok := peers[pAddr]; !ok {
log.Println("add new peer:", pAddr)
peers[pAddr] = NewPeer()
}
}

func NewPeer() *Peer {
return &Peer{}
}

// RPC-called
func Ping(peerAddr string) string {
WLock()
defer WUnlock()

// Somebody just called my status,
// and him as a peer (if not yet so).
if _, ok := peers[peerAddr]; !ok {
peers[peerAddr] = NewPeer()
}
return thisAddr
}

// Ping peer at address, add to peers list if he responds and is not yet added
func ProbePeer(addr string) {
ret, _ := RPCCall(addr, "Ping", thisAddr)
if ret != "" {
AddPeer(ret)
}
}

// Scan IPs and port range for peers that respond to Ping,
// add them to peers list.
func FindPeers(IPs []string, minPort, maxPort int) {
//log.Println("Portscan start")

scanners := make(chan func())

for i := 0; i < N_SCANNERS; i++ {
go func() {
for f := range scanners {
f()
}
}()
}

for _, ip := range IPs {
for port := minPort; port <= maxPort; port++ {
addr := fmt.Sprint(ip, ":", port)
scanners <- func() { ProbePeer(addr) }
}
}
close(scanners)
log.Println("-- portscan done")
}
152 changes: 152 additions & 0 deletions cmd/uMagNUS-server/que.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package main

import (
"log"
"math"
"os"
"path/filepath"
"sort"
"strings"
"time"
)

/*
Queue service scans the working directory for job files.
The working directory should contain per-user subdirectories. E.g.:
arne/
bartel/
...
The in-memory representation is a cache and can be out-of-date at any point.
The queue service decides which job to hand out to a node if asked so.
*/

var (
Users = make(map[string]*User) // maps user -> joblist
)

// RPC-callable method: picks a job of the queue returns it
// for the node to run it.
func GiveJob(nodeAddr string) string {
WLock()
defer WUnlock()
user := nextUser()
if user == "" {
return ""
}
Users[user].FairShare += 1 // 1 second penalty because a job has started
return Users[user].giveJob(nodeAddr).ID
}

func AddFairShare(s string) string {
username := BaseDir(s)
share := atoi(s[len(username)+1:])

WLock()
defer WUnlock()
u := Users[username]
if u == nil {
return "no user " + username
}
log.Println("AddFairShare", username, share)
u.FairShare += float64(share)
return "" // ok
}

func nextUser() string {
// search user with least share and jobs in queue
leastShare := math.Inf(1)
var bestUser string
for n, u := range Users {
if u.HasJob() && u.FairShare < leastShare {
leastShare = u.FairShare
bestUser = n
}
}
return bestUser
}

// (Re-)load all jobs in the working directory.
// Called upon program startup.
func LoadJobs() {
dir, err := os.Open(".")
Fatal(err)
subdirs, err2 := dir.Readdir(-1)
Fatal(err2)

for _, d := range subdirs {
if d.IsDir() {
LoadUserJobs(d.Name())
}
}
}

// (Re-)load all jobs in the user's subdirectory.
func LoadUserJobs(dir string) string {
log.Println("LoadUserJobs", dir)
var newJobs []*Job
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if strings.HasSuffix(path, ".mx3") && !strings.HasPrefix(info.Name(), ".") {
ID := thisAddr + "/" + path
log.Println("addingJob", ID)
job := &Job{ID: ID}
job.Update()
newJobs = append(newJobs, job)
}
return nil
})
l := joblist(newJobs)
sort.Sort(&l)

Fatal(err) // TODO: recover?

WLock()
defer WUnlock()
if _, ok := Users[dir]; !ok {
Users[dir] = NewUser()
}
Users[dir].Jobs = newJobs
Users[dir].nextPtr = 0

return ""
}

type joblist []*Job

func (l *joblist) Len() int { return len(*l) }
func (l *joblist) Less(i, j int) bool { return (*l)[i].ID < (*l)[j].ID }
func (l *joblist) Swap(i, j int) { (*l)[i], (*l)[j] = (*l)[j], (*l)[i] }

// RPC-callable function. Refreshes the in-memory cached info about this job.
// Called, e.g., after a node has finished a job.
func UpdateJob(jobURL string) string {

WLock()
defer WUnlock()

j := JobByName(jobURL)
if j == nil {
log.Println("update", jobURL, ": no such job")
return "" // empty conventionally means error
}
j.Update()

return "updated " + jobURL // not used, but handy if called by Human.
}

// Periodically updates user's usedShare so they decay
// exponentially according to flag_haflife
func RunShareDecay() {
halflife := *flag_halflife
quantum := halflife / 100 // several updates per half-life gives smooth decay
reduce := math.Pow(0.5, float64(quantum)/float64(halflife))
for {
time.Sleep(quantum)
WLock()
for _, u := range Users {
u.FairShare *= reduce
}
WUnlock()
}
}
86 changes: 86 additions & 0 deletions cmd/uMagNUS-server/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package main

import (
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
)

type RPCFunc func(string) string

var methods = map[string]RPCFunc{
"AddFairShare": AddFairShare,
"GiveJob": GiveJob,
"Kill": Kill,
"LoadJobs": wrap(LoadJobs),
"LoadUserJobs": LoadUserJobs,
"Ping": Ping,
"UpdateJob": UpdateJob,
"Rescan": func(string) string { go FindPeers(IPs, MinPort, MaxPort); return "" },
"WhatsTheTime": WhatsTheTime,
"WakeupWatchdog": WakeupWatchdog,
"rm": Rm,
}

func wrap(f func()) RPCFunc {
return func(string) string { f(); return "" }
}

func HandleRPC(w http.ResponseWriter, r *http.Request) {

var ret string

defer func() {
//log.Println(" < call ", r.Host, r.URL.Path, "->", ret)
if err := recover(); err != nil {
log.Println("*** RPC panic: ", r.URL.Path, ":", err)
http.Error(w, "Does not compute: "+r.URL.Path, http.StatusBadRequest)
}
}()
request := r.URL.Path[len("/do/"):]
slashPos := strings.Index(request, "/")
method := request[:slashPos]
arg := request[slashPos+1:]

m, ok := methods[method]
if !ok {
log.Println("*** RPC no such method", r.URL.Path)
http.Error(w, "Does not compute: "+method, http.StatusBadRequest)
return
}
ret = m(arg)
fmt.Fprint(w, ret)
}

// re-usable http client for making RPC calls
var httpClient = http.Client{Timeout: 2 * time.Second}

// make RPC call to method on node with given address.
func RPCCall(addr, method, arg string) (ret string, err error) {

//defer func() { log.Println(" > call ", addr, method, arg, "->", ret, err) }()

//TODO: escape args?
resp, err := httpClient.Get("http://" + addr + "/do/" + method + "/" + arg)
if err != nil {
//log.Println("*** RPC error: ", err)
return "", err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Println("*** RPC error: ", resp.Status)
return "", fmt.Errorf("http status %v", resp.Status)
}

if b, err := ioutil.ReadAll(resp.Body); err != nil {
log.Println("*** RPC read error: ", err)
return "", err
} else {
return string(b), nil
}

}
184 changes: 184 additions & 0 deletions cmd/uMagNUS-server/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package main

// Serves human-readable status information over http.

import (
"html/template"
"net/http"
"time"
)

var (
templ = template.Must(template.New("status").Parse(templText))
upSince = time.Now()
)

func HandleStatus(w http.ResponseWriter, r *http.Request) {
RLock()
defer RUnlock()

if r.URL.Path != "/" {
http.Error(w, "Does not compute", http.StatusNotFound)
return
}

err := templ.Execute(w, &status{})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

type status struct{} // dummy type to define template methods on

func (*status) IPRange() string { return *flag_scan + ": " + *flag_ports }
func (*status) Ports() string { return *flag_ports }
func (*status) ThisAddr() string { return thisAddr }
func (*status) Uptime() time.Duration { return Since(time.Now(), upSince) }
func (*status) UmagnusVersion() string { return UmagnusVersion }
func (*status) GPUs() []string { return GPUs }
func (*status) Processes() map[string]*Process { return Processes }
func (*status) Users() map[string]*User { return Users }
func (*status) NextUser() string { return nextUser() }
func (*status) Peers() map[string]*Peer { return peers }
func (*status) FS(a string) string { return FS(a) }

const templText = `
{{define "Job"}}
<tr class={{.Status}}>
<td class={{.Status}}> [<a class={{.Status}} href="http://{{.FS .ID}}">{{.LocalPath}}</a>] </td>
<td class={{.Status}}> [{{with .Output}}<a href="http://{{$.FS $.Output}}">.out</a>{{end}}] </td>
<td class={{.Status}}> [{{with .Output}}<a onclick='doEvent("rm", "{{$.ID}}")'>rm</a>{{end}}]</td>
<td class={{.Status}}> [{{with .Host}}<a href="http://{{.}}">{{.}}</a>{{end}}] </td>
<td class={{.Status}}> [{{with .ExitStatus}}{{if eq . "0"}} OK {{else}}<a class={{$.Status}} href="http://{{$.FS $.Output}}stdout.txt">FAIL</a>{{end}}{{end}}] </td>
<td class={{.Status}}> [{{with .Output}}{{$.Duration}}{{end}}{{with .RequeCount}} {{.}}x re-queued{{end}}{{with .Error}} {{.}}{{end}}] </td>
</tr>
{{end}}
<html>
<head>
<style>
body{font-family:monospace; margin-left:5%; margin-top:1em}
p{margin-left: 2em}
h3{margin-left: 2em}
a{text-decoration: none; color:#0000AA}
a:hover{text-decoration: underline; cursor: hand;}
.FAILED{color:red; font-weight:bold}
.RUNNING{font-weight: bold; color:blue}
.QUEUED{color:black}
.FINISHED{color: grey}
.active, .collapsible:hover {cursor:pointer; font-weight:normal; background-color:#eee; width:50%;}
</style>
</head>
<script>
function doEvent(method, arg){
try{
var req = new XMLHttpRequest();
var URL = "http://" + window.location.hostname + ":" + window.location.port + "/do/" + method + "/" + arg;
req.open("GET", URL, false);
req.send(null);
}catch(e){
alert(e);
}
location.reload();
}
function refreshPage () {
document.location.reload(true);
}
setTimeout(refreshPare, 60000);
</script>
<body>
<h1>{{.ThisAddr}}</h1>
Uptime: {{.Uptime}} <br/>
<h2>Peer nodes</h2>
<b>scan</b> {{.IPRange}}<br/>
<b>ports</b> {{.Ports}}<br/>
<button onclick='doEvent("Rescan", "")'>Rescan</button> <br/>
{{range $k,$v := .Peers}}
<a href="http://{{$k}}">{{$k}}</a> <br/>
{{end}}
<h2>Compute service</h2><p>
<b>umagnus:</b>
{{with .UmagnusVersion}}
{{.}}
{{else}}
not available<br/>
{{end}}
<br/>
{{with .GPUs}}
{{range $i, $v := .}}
<b>GPU{{$i}}</b>: {{$v}}<br/>
{{end}}
{{else}}
No GPUs available<br/>
{{end}}
</p>
<h3>Running jobs</h3><p>
<table>
{{range $k,$v := .Processes}}
<tr>
<td> [<a href="http://{{$.FS $k}}">{{$k}}</a>] </td>
<td> [{{$v.Duration}}]</td>
<td> [<a href="http://{{$v.GUI}}">GUI</a>]</td>
<td> <button onclick='doEvent("Kill", "{{$k}}")'>kill</button> </td>
</tr>
{{end}}
</table>
</p>
<h2>Queue service</h2><p>
<h3>Users</h3><p>
<table>
{{range $k,$v := .Users}} <tr>
<td>{{$k}}</td><td>{{$v.FairShare}} GPU-seconds</td><td>{{with .HasJob}} has {{else}} no {{end}} queued jobs</td>
</tr>{{end}}
</table>
<b>Next job for:</b> {{.NextUser}}
</p>
<h3>Jobs</h3>
<button onclick='doEvent("LoadJobs", "")'>Reload all</button> (consider reloading just your own files). <br/>
<button onclick='doEvent("WakeupWatchdog", "")'>Wake-up Watchdog</button> (re-queue dead simulations right now).
{{range $k,$v := .Users}}
<a id="{{$k}}"></a>
<h3 title="Click to show/hide" class="collapsible" onclick='this.classList.toggle("active");var cont=this.nextElementSibling;if (cont.style.display==="none") {cont.style.display="block"; window.location.hash = "{{$k}}";} else cont.style.display = "none";'>
&dtrif; {{$k}}</h3><p>
<b>Jobs</b>
<button onclick='doEvent("LoadUserJobs", "{{$k}}")'>Reload</button> (only needed when you changed your files on disk)
<table> {{range $v.Jobs}} {{template "Job" .}} {{end}} </table>
</p>
{{end}}
</p>
<script>
//let's collapse all job lists.
var collapsibleElements = document.getElementsByClassName("collapsible");
var hash = self.location.hash;
for (var i = 0; i < collapsibleElements.length; i++) {
if(hash=="" || !collapsibleElements[i].textContent.includes(hash.split("#")[1]) ) { //If there's an anchor link. Let's open that user !
collapsibleElements[i].classList.toggle("active");
var cont = collapsibleElements[i].nextElementSibling;
cont.style.display=(cont.style.display==="none"?"block":"none");
}
}
</script>
</body>
</html>
`
46 changes: 46 additions & 0 deletions cmd/uMagNUS-server/user.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package main

import "time"

type User struct {
Jobs []*Job
FairShare float64 // Used-up compute time in the past (decays)
nextPtr int // pointer suggesting next job to start. Reset on re-scan. len(Jobs) means no queued job
}

func NewUser() *User {
return &User{}
}

// nextJob looks for the next free job in the list.
// it does a tiny bit of linear search, starting from nextPtr.
func (u *User) giveJob(node string) *Job {
index := u.nextJobPtr()
if index >= len(u.Jobs) {
return nil
}
u.nextPtr++
j := u.Jobs[index]
// all below are preliminary, to get rapid gui response.
// may be overwritten by update
j.Host = node
j.Output = OutputDir(j.ID)
j.Start = time.Now()
return j
}

func (u *User) HasJob() bool {
i := u.nextJobPtr()
return i < len(u.Jobs)
}

// returns
func (u *User) nextJobPtr() int {
for ; u.nextPtr < len(u.Jobs); u.nextPtr++ {
j := u.Jobs[u.nextPtr]
if j.IsQueued() {
return u.nextPtr
}
}
return u.nextPtr
}
48 changes: 48 additions & 0 deletions cmd/uMagNUS-server/utitl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package main

import (
"log"
"net/url"
"strings"
"time"
)

// BaseDir returns the first path element, without slashes and ignoring http:// . E.g.:
// /home/user/file -> home
// user/file -> user
// http://home/user/file -> home
func BaseDir(dir string) string {
if strings.HasPrefix(dir, "http://") {
return BaseDir(dir[len("http://"):])
}
firstSlash := strings.Index(dir, "/")
switch {
case firstSlash < 0:
return dir
case firstSlash == 0:
return BaseDir(dir[1:])
default:
return dir[:firstSlash]
}
}

func Fatal(err error) {
if err != nil {
log.Fatal(err)
}
}

// rounded up to 1s precission
func Since(a, b time.Time) time.Duration {
d := a.Sub(b)
return (d/1e9)*1e9 + 1e9
}

// Parse URL, panic on error
func MustParseURL(URL string) *url.URL {
u, err := url.Parse(URL)
if err != nil {
panic(err)
}
return u
}
60 changes: 60 additions & 0 deletions cmd/uMagNUS-server/watchdog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"log"
"time"
)

var runWatchdog = make(chan struct{})

func init() {
// run watchdog daemon in background
go func() {
for {
<-runWatchdog // wait for start
DoWatchdog()
}
}()
}

func LoopWatchdog() {
for {
WakeupWatchdog("")
time.Sleep(3 * KeepaliveInterval)
}
}

func WakeupWatchdog(string) string {
select {
default:
return "already running"
case runWatchdog <- struct{}{}:
return "" // ok
}
}

// single watchdog run:
// re-queues all dead processes
func DoWatchdog() {
//log.Println("Watchdog wake-up")
WLock()
defer WUnlock()
for _, u := range Users {
for _, j := range u.Jobs {
id := j.ID
//log.Println(id, "running:", j.IsRunning(), "alive:", time.Since(j.Alive))
if j.IsRunning() && time.Since(j.Alive) > 3*KeepaliveInterval {
j.Update()
lastHeartbeat := time.Since(j.Alive)
if lastHeartbeat > 3*KeepaliveInterval {
log.Println("*** Re-queue", id, "after", lastHeartbeat, "inactivity")
j.Reque()
}
}
}
// re-set nextPtr to beginning so we can start re-queued jobs
if u.nextPtr >= len(u.Jobs) {
u.nextPtr = 0
}
}
}
1 change: 1 addition & 0 deletions cmd/uMagNUS/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mumax3
2 changes: 2 additions & 0 deletions cmd/uMagNUS/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install -v
21 changes: 21 additions & 0 deletions cmd/uMagNUS/browser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"fmt"
"os/exec"
)

// Try to open url in a browser. Instruct to do so if it fails.
func openbrowser(url string) {
for _, cmd := range browsers {
err := exec.Command(cmd, url).Start()
if err == nil {
fmt.Println("//openend web interface in", cmd)
return
}
}
fmt.Println("//please open ", url, " in a browser")
}

// list of browsers to try.
var browsers = []string{"x-www-browser", "google-chrome", "chromium-browser", "firefox", "explorer"}
168 changes: 168 additions & 0 deletions cmd/uMagNUS/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
// uMagNUS main source
package main

import (
"flag"
"fmt"
"github.com/seeder-research/uMagNUS/engine"
"github.com/seeder-research/uMagNUS/opencl"
"github.com/seeder-research/uMagNUS/script"
"github.com/seeder-research/uMagNUS/util"
"log"
"os"
"os/exec"
"path"
"time"
)

// flags in engine/gofiles.go
var ()

func main() {
flag.Parse()
log.SetPrefix("")
log.SetFlags(0)

opencl.Init(*engine.Flag_gpu)

opencl.Synchronous = *engine.Flag_sync
if *engine.Flag_version {
printVersion()
}

// used by bootstrap launcher to test opencl
// successful exit means opencl was initialized fine
if *engine.Flag_test {
fmt.Println(opencl.GPUInfo)
os.Exit(0)
}

defer engine.Close() // flushes pending output, if any

if *engine.Flag_vet {
vet()
return
}

switch flag.NArg() {
case 0:
runInteractive()
case 1:
runFileAndServe(flag.Arg(0))
default:
RunQueue(flag.Args())
}
}

func runInteractive() {
fmt.Println("//no input files: starting interactive session")
//initEngine()

// setup outut dir
now := time.Now()
outdir := fmt.Sprintf("uMagNUS-%v-%02d-%02d_%02dh%02d.out", now.Year(), int(now.Month()), now.Day(), now.Hour(), now.Minute())
engine.InitIO(outdir, outdir, *engine.Flag_forceclean)

engine.Timeout = 365 * 24 * time.Hour // basically forever

// set up some sensible start configuration
engine.Eval(`SetGridSize(128, 64, 1)
SetCellSize(4e-9, 4e-9, 4e-9)
Msat = 1e6
Aex = 10e-12
alpha = 1
m = RandomMag()`)
addr := goServeGUI()
openbrowser("http://127.0.0.1" + addr)
engine.RunInteractive()
}

func runFileAndServe(fname string) {
if path.Ext(fname) == ".go" {
runGoFile(fname)
} else {
runScript(fname)
}
}

func runScript(fname string) {
outDir := util.NoExt(fname) + ".out"
if *engine.Flag_od != "" {
outDir = *engine.Flag_od
}
engine.InitIO(fname, outDir, *engine.Flag_forceclean)

fname = engine.InputFile

var code *script.BlockStmt
var err2 error
if fname != "" {
// first we compile the entire file into an executable tree
code, err2 = engine.CompileFile(fname)
util.FatalErr(err2)
}

// now the parser is not used anymore so it can handle web requests
goServeGUI()

if *engine.Flag_interactive {
openbrowser("http://127.0.0.1" + *engine.Flag_port)
}

// start executing the tree, possibly injecting commands from web gui
engine.EvalFile(code)

if *engine.Flag_interactive {
engine.RunInteractive()
}
}

func runGoFile(fname string) {

// pass through flags
flags := []string{"run", fname}
flag.Visit(func(f *flag.Flag) {
if f.Name != "o" {
flags = append(flags, fmt.Sprintf("-%v=%v", f.Name, f.Value))
}
})

if *engine.Flag_od != "" {
flags = append(flags, fmt.Sprintf("-o=%v", *engine.Flag_od))
}

cmd := exec.Command("go", flags...)
log.Println("go", flags)
cmd.Stdout = os.Stdout
cmd.Stdin = os.Stdin
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
engine.Close()
os.Exit(1)
}
}

// start Gui server and return server address
func goServeGUI() string {
if *engine.Flag_port == "" {
log.Println(`//not starting GUI (-http="")`)
return ""
}
addr := engine.GoServe(*engine.Flag_port)
fmt.Print("//starting GUI at http://127.0.0.1", addr, "\n")
return addr
}

// print version to stdout
func printVersion() {
engine.LogOut("//", engine.UNAME, "\n")
engine.LogOut("//", opencl.GPUInfo, "\n")
engine.LogOut("//(c) Xuanyao Fong, SEEDER Research Group, National University of Singapore, Singapore", "\n")
engine.LogOut("This is free software without any warranty. See license.txt")
engine.LogOut("********************************************************************//")
engine.LogOut(" If you use uMagNUS in any work or publication, //")
engine.LogOut(" we kindly ask you to cite the references in references.bib //")
engine.LogOut("********************************************************************//")
engine.LogOut("//Frontend is based on MuMax 3.10: (c) Arne Vansteenkiste, Dynamat LAB, Ghent University, Belgium", "\n")
}
194 changes: 194 additions & 0 deletions cmd/uMagNUS/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package main

// File que for distributing multiple input files over GPUs.

import (
"flag"
"fmt"
"github.com/seeder-research/uMagNUS/engine"
"github.com/seeder-research/uMagNUS/opencl"
"io"
"log"
"net/http"
"os"
"os/exec"
"sync"
"sync/atomic"
)

var (
exitStatus atom = 0
numOK, numFailed atom = 0, 0
)

func RunQueue(files []string) {
s := NewStateTab(files)
s.PrintTo(os.Stdout)
go s.ListenAndServe(*engine.Flag_port)
s.Run()
fmt.Println(numOK.get(), "OK, ", numFailed.get(), "failed")
os.Exit(int(exitStatus))
}

// StateTab holds the queue state (list of jobs + statuses).
// All operations are atomic.
type stateTab struct {
lock sync.Mutex
jobs []job
next int
}

// Job info.
type job struct {
inFile string // input file to run
webAddr string // http address for gui of running process
uid int
}

// NewStateTab constructs a queue for the given input files.
// After construction, it is accessed atomically.
func NewStateTab(inFiles []string) *stateTab {
s := new(stateTab)
s.jobs = make([]job, len(inFiles))
for i, f := range inFiles {
s.jobs[i] = job{inFile: f, uid: i}
}
return s
}

// StartNext advances the next job and marks it running, setting its webAddr to indicate the GUI url.
// A copy of the job info is returned, the original remains unmodified.
// ok is false if there is no next job.
func (s *stateTab) StartNext(webAddr string) (next job, ok bool) {
s.lock.Lock()
defer s.lock.Unlock()
if s.next >= len(s.jobs) {
return job{}, false
}
s.jobs[s.next].webAddr = webAddr
jobCopy := s.jobs[s.next]
s.next++
return jobCopy, true
}

// Finish marks the job with j's uid as finished.
func (s *stateTab) Finish(j job) {
s.lock.Lock()
defer s.lock.Unlock()
s.jobs[j.uid].webAddr = ""
}

// Runs all the jobs in stateTab.
func (s *stateTab) Run() {
nGPU := len(opencl.ClDevices)
idle := initGPUs(nGPU)
for {
gpu := <-idle
addr := fmt.Sprint(":", 35368+gpu)
j, ok := s.StartNext(addr)
if !ok {
break
}
go func() {
run(j.inFile, gpu, j.webAddr)
s.Finish(j)
idle <- gpu
}()
}
// drain remaining tasks (one already done)
for i := 1; i < nGPU; i++ {
<-idle
}
}

type atom int32

func (a *atom) set(v int) { atomic.StoreInt32((*int32)(a), int32(v)) }
func (a *atom) get() int { return int(atomic.LoadInt32((*int32)(a))) }
func (a *atom) inc() { atomic.AddInt32((*int32)(a), 1) }

func run(inFile string, gpu int, webAddr string) {
// overridden flags
gpuFlag := fmt.Sprint(`-gpu=`, gpu)
httpFlag := fmt.Sprint(`-http=`, webAddr)

// pass through flags
flags := []string{gpuFlag, httpFlag}
flag.Visit(func(f *flag.Flag) {
if f.Name != "gpu" && f.Name != "http" && f.Name != "failfast" {
flags = append(flags, fmt.Sprintf("-%v=%v", f.Name, f.Value))
}
})
flags = append(flags, inFile)

cmd := exec.Command(os.Args[0], flags...)
log.Println(os.Args[0], flags)
output, err := cmd.CombinedOutput()
if err != nil {
log.Println(inFile, err)
log.Printf("%s\n", output)
exitStatus.set(1)
numFailed.inc()
if *engine.Flag_failfast {
os.Exit(1)
}
} else {
numOK.inc()
}
}

func initGPUs(nGpu int) chan int {
if nGpu == 0 {
log.Fatal("no GPUs available")
panic(0)
}
idle := make(chan int, nGpu)
for i := 0; i < nGpu; i++ {
idle <- i
}
return idle
}

func (s *stateTab) PrintTo(w io.Writer) {
s.lock.Lock()
defer s.lock.Unlock()
for i, j := range s.jobs {
fmt.Fprintf(w, "%3d %v %v\n", i, j.inFile, j.webAddr)
}
}

func (s *stateTab) RenderHTML(w io.Writer) {
s.lock.Lock()
defer s.lock.Unlock()
fmt.Fprintln(w, `
<!DOCTYPE html> <html> <head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<meta http-equiv="refresh" content="1">
`+engine.CSS+`
</head><body>
<span style="color:gray; font-weight:bold; font-size:1.5em"> uMagNUS queue status </span><br/>
<hr/>
<pre>
`)

hostname := "localhost"
hostname, _ = os.Hostname()
for _, j := range s.jobs {
if j.webAddr != "" {
fmt.Fprint(w, `<b>`, j.uid, ` <a href="`, "http://", hostname+j.webAddr, `">`, j.inFile, " ", j.webAddr, "</a></b>\n")
} else {
fmt.Fprint(w, j.uid, " ", j.inFile, "\n")
}
}

fmt.Fprintln(w, `</pre><hr/></body></html>`)
}

func (s *stateTab) ListenAndServe(addr string) {
http.Handle("/", s)
go http.ListenAndServe(addr, nil)
}

func (s *stateTab) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RenderHTML(w)
}
30 changes: 30 additions & 0 deletions cmd/uMagNUS/vet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"flag"
"fmt"
"io/ioutil"
"os"

"github.com/seeder-research/uMagNUS/engine"
"github.com/seeder-research/uMagNUS/util"
)

// check all input files for errors, don't run.
func vet() {
status := 0
for _, f := range flag.Args() {
src, ioerr := ioutil.ReadFile(f)
util.FatalErr(ioerr)
engine.World.EnterScope() // avoid name collisions between separate files
_, err := engine.World.Compile(string(src))
engine.World.ExitScope()
if err != nil {
fmt.Println(f, ":", err)
status = 1
} else {
fmt.Println(f, ":", "OK")
}
}
os.Exit(status)
}
2 changes: 2 additions & 0 deletions data/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
all:
go install -v
28 changes: 28 additions & 0 deletions data/crop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package data

// Cut-out a piece between given bounds (incl, excl)
func Crop(in *Slice, x1, x2, y1, y2, z1, z2 int) *Slice {
Nx := x2 - x1
Ny := y2 - y1
Nz := z2 - z1

size := [3]int{Nx, Ny, Nz}
ncomp := in.NComp()

out := NewSlice(ncomp, size)

a := in.Tensors()
b := out.Tensors()

for c := 0; c < ncomp; c++ {
for z := 0; z < Nz; z++ {
for y := 0; y < Ny; y++ {
for x := 0; x < Nx; x++ {
b[c][z][y][x] = a[c][z+z1][y+y1][x+x1]
}
}
}
}

return out
}
4 changes: 4 additions & 0 deletions data/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/*
Package data provides structures to store arrays in a hardware-agnostic (GPU-CPU) way.
*/
package data
95 changes: 95 additions & 0 deletions data/mesh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package data

import (
"fmt"
"log"
)

// Mesh stores info of a finite-difference mesh.
type Mesh struct {
gridSize [3]int
cellSize [3]float64
pbc [3]int
Unit string // unit of cellSize, default: "m"
}

// Retruns a new mesh with N0 x N1 x N2 cells of size cellx x celly x cellz.
// Optional periodic boundary conditions (pbc): number of repetitions
// in X, Y, Z direction. 0,0,0 means no periodicity.
func NewMesh(N0, N1, N2 int, cellx, celly, cellz float64, pbc ...int) *Mesh {
var pbc3 [3]int
if len(pbc) == 3 {
copy(pbc3[:], pbc)
} else {
if len(pbc) != 0 {
log.Panic("mesh: need 0 or 3 PBC arguments, got:", pbc)
}
}
size := [3]int{N0, N1, N2}
return &Mesh{size, [3]float64{cellx, celly, cellz}, pbc3, "m"}
}

// Returns N0, N1, N2, as passed to constructor.
func (m *Mesh) Size() [3]int {
if m == nil {
return [3]int{0, 0, 0}
} else {
return m.gridSize
}
}

// Returns cellx, celly, cellz, as passed to constructor.
func (m *Mesh) CellSize() [3]float64 {
return m.cellSize
}

// Returns pbc (periodic boundary conditions), as passed to constructor.
func (m *Mesh) PBC() [3]int {
return m.pbc
}

func (m *Mesh) SetPBC(nx, ny, nz int) {
m.pbc = [3]int{nx, ny, nz}
}

// Total number of cells, not taking into account PBCs.
// N0 * N1 * N2
func (m *Mesh) NCell() int {
return m.gridSize[0] * m.gridSize[1] * m.gridSize[2]
}

// WorldSize equals (grid)Size x CellSize.
func (m *Mesh) WorldSize() [3]float64 {
return [3]float64{float64(m.gridSize[0]) * m.cellSize[0], float64(m.gridSize[1]) * m.cellSize[1], float64(m.gridSize[2]) * m.cellSize[2]}
}

// 3 bools, packed in one byte, indicating whether there are periodic boundary conditions in
// X (LSB), Y(LSB<<1), Z(LSB<<2)
func (m *Mesh) PBC_code() byte {
var code byte
if m.pbc[X] != 0 {
code = 1
}
if m.pbc[Y] != 0 {
code |= 2
}
if m.pbc[Z] != 0 {
code |= 4
}
return code
}

func (m *Mesh) String() string {
s := m.gridSize
c := m.cellSize
pbc := ""
if m.pbc != [3]int{0, 0, 0} {
pbc = fmt.Sprintf(", PBC: [%v x %v x %v],", m.pbc[0], m.pbc[1], m.pbc[2])
}
return fmt.Sprintf("[%v x %v x %v] x [%vm x %vm x %vm]%v", s[0], s[1], s[2], float32(c[0]), float32(c[1]), float32(c[2]), pbc)
}

// product of elements.
func prod(size [3]int) int {
return size[0] * size[1] * size[2]
}
10 changes: 10 additions & 0 deletions data/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package data

// Holds meta data to be saved together with a slice.
// Typically winds up in OVF or DUMP header
type Meta struct {
Name, Unit string
Time, TimeStep float64
CellSize [3]float64
MeshUnit string
}
92 changes: 92 additions & 0 deletions data/resample.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package data

import (
"github.com/seeder-research/uMagNUS/util"
)

// Resample returns a slice of new size N,
// using nearest neighbor interpolation over the input slice.
func Resample(in *Slice, N [3]int) *Slice {
if in.Size() == N {
return in // nothing to do
}
In := in.Tensors()
out := NewSlice(in.NComp(), N)
Out := out.Tensors()
size1 := SizeOf(In[0])
size2 := SizeOf(Out[0])
for c := range Out {
for i := range Out[c] {
i1 := (i * size1[Z]) / size2[Z]
for j := range Out[c][i] {
j1 := (j * size1[Y]) / size2[Y]
for k := range Out[c][i][j] {
k1 := (k * size1[X]) / size2[X]
Out[c][i][j][k] = In[c][i1][j1][k1]
}
}
}
}
return out
}

// Downsample returns a slice of new size N, smaller than in.Size().
// Averaging interpolation over the input slice.
// in is returned untouched if the sizes are equal.
func Downsample(In [][][][]float32, N [3]int) [][][][]float32 {
if SizeOf(In[0]) == N {
return In // nothing to do
}

nComp := len(In)
out := NewSlice(nComp, N)
Out := out.Tensors()

srcsize := SizeOf(In[0])
dstsize := SizeOf(Out[0])

Dx := dstsize[X]
Dy := dstsize[Y]
Dz := dstsize[Z]
Sx := srcsize[X]
Sy := srcsize[Y]
Sz := srcsize[Z]
scalex := Sx / Dx
scaley := Sy / Dy
scalez := Sz / Dz
util.Assert(scalex > 0 && scaley > 0)

for c := range Out {

for iz := 0; iz < Dz; iz++ {
for iy := 0; iy < Dy; iy++ {
for ix := 0; ix < Dx; ix++ {
sum, n := 0.0, 0.0

for I := 0; I < scalez; I++ {
i2 := iz*scalez + I
for J := 0; J < scaley; J++ {
j2 := iy*scaley + J
for K := 0; K < scalex; K++ {
k2 := ix*scalex + K

if i2 < Sz && j2 < Sy && k2 < Sx {
sum += float64(In[c][i2][j2][k2])
n++
}
}
}
}
Out[c][iz][iy][ix] = float32(sum / n)
}
}
}
}

return Out
}

// Returns the 3D size of block
func SizeOf(block [][][]float32) [3]int {
return [3]int{len(block[0][0]), len(block[0]), len(block)}
}
24 changes: 24 additions & 0 deletions data/reshape.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package data

// Array reshaping.

import "fmt"

// Re-interpret a contiguous array as a multi-dimensional array of given size.
// Underlying storage is shared.
func reshape(array []float32, size [3]int) [][][]float32 {
Nx, Ny, Nz := size[0], size[1], size[2]
if Nx*Ny*Nz != len(array) {
panic(fmt.Errorf("reshape: size mismatch: %v*%v*%v != %v", Nx, Ny, Nz, len(array)))
}
sliced := make([][][]float32, Nz)
for i := range sliced {
sliced[i] = make([][]float32, Ny)
}
for i := range sliced {
for j := range sliced[i] {
sliced[i][j] = array[(i*Ny+j)*Nx+0 : (i*Ny+j)*Nx+Nx]
}
}
return sliced
}
Loading

0 comments on commit 1a62151

Please sign in to comment.