Skip to content

Commit

Permalink
Fix document paths not being served correctly
Browse files Browse the repository at this point in the history
* Add locking to sse.Server
  • Loading branch information
ongyx committed Nov 4, 2023
1 parent 3d11079 commit fd45e05
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 25 deletions.
2 changes: 2 additions & 0 deletions cmd/dip/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func main() {

fmt.Println("shutting down...")

srv.Close()

if err := server.Shutdown(context.Background()); err != nil {
fmt.Printf("error: shutdown: %s\n", err)
}
Expand Down
82 changes: 73 additions & 9 deletions pkg/document/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"net/http"
"net/url"
"path"
"strings"

"github.com/ongyx/dip/pkg/source"
"github.com/ongyx/dip/pkg/sse"
)

const (
Expand All @@ -20,33 +22,57 @@ const (

// Handler is a wrapper around a library for serving documents over HTTP.
type Handler struct {
Lib *Library
Log *log.Logger
lib *Library
log *log.Logger

sse *sse.Server
}

// NewHandler creates a new handler.
func NewHandler(lib *Library, log *log.Logger) *Handler {
h := &Handler{
lib: lib,
log: log,
sse: sse.NewServer(""),
}
go h.reload()

return h
}

// Close cleans up the handler.
func (h *Handler) Close() {
h.sse.Close()
}

// ServeHTTP implements http.Handler.
func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p := path.Clean(r.URL.Path)
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Clean path and remove leading slash
p := strings.TrimPrefix(path.Clean(r.URL.Path), "/")

if p == "/" {
// If the path is empty, use the root document.
if p == "" {
p = source.Root
}

d, err := s.Lib.Open(p)
d, err := h.lib.Open(p)
if err != nil {
// Create the document.
d, err = s.Lib.Create(p)
d, err = h.lib.Create(p)
if err != nil {
http.NotFound(w, r)
return
}

// Create an SSE stream for the document.
h.sse.Add(p)
}

err = d.Borrow(func(buf []byte) error {
eu := &url.URL{
Path: eventURL,
RawQuery: url.Values{
"path": {p},
"stream": {p},
}.Encode(),
}

Expand All @@ -63,6 +89,44 @@ func (s *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return nil
})
if err != nil {
s.Log.Printf("error: serving document %s to %s failed: %s", p, r.Host, err)
h.log.Printf("error: serving document %s to %s failed: %s", p, r.Host, err)
}
}

func (h *Handler) reload() {
if w, ok := h.lib.Watcher(); ok {
files, errors := w.Watch()

for {
select {
case f, ok := <-files:
if !ok {
return
}

h.log.Println("handler: reloading document", f)

d, err := h.lib.Create(f)
if err != nil {
h.log.Printf("error: handler: failed to reload document %s: %s\n", f, err)
continue
}

// Send a reload event over SSE if the stream exists.
if st := h.sse.Get(f); st != nil {
d.Borrow(func(buf []byte) error {
st.Send(&sse.Event{Type: "reload", Data: buf})

return nil
})
}
case err, ok := <-errors:
if !ok {
return
}

h.log.Println("error: handler: watcher ran into an error:", err)
}
}
}
}
26 changes: 16 additions & 10 deletions pkg/document/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,37 @@ import (

// Server serves documents and their accompanying CSS/JS assets over HTTP.
type Server struct {
mux *http.ServeMux
handler *Handler
mux *http.ServeMux
}

func NewServer(u *url.URL, md goldmark.Markdown, lg *log.Logger) (*Server, error) {
// NewServer creates a new server with a source URL.
func NewServer(u *url.URL, md goldmark.Markdown, log *log.Logger) (*Server, error) {
src, err := source.New(u)
if err != nil {
return nil, err
}

h := &Handler{
Lib: NewLibrary(src, md),
Log: lg,
}
h := NewHandler(NewLibrary(src, md), log)

mux := http.NewServeMux()

ap := "/" + assetURL + "/"
// Serve Markdown documents at the root by default.
mux.Handle("/", h)

// Serve assets at the asset path.
ap := "/" + assetURL + "/"
mux.Handle(ap, http.StripPrefix(ap, asset.FileServer))

// Serve Markdown documents at the root by default.
mux.Handle("/", h)
// Serve SSE events.
mux.Handle("/"+eventURL, h.sse)

return &Server{handler: h, mux: mux}, nil
}

return &Server{mux: mux}, nil
// Close closes the server.
func (s *Server) Close() {
s.handler.Close()
}

// ServeHTTP implements http.Handler.
Expand Down
4 changes: 0 additions & 4 deletions pkg/source/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ func NewDirectory(u *url.URL) (Source, error) {
}

func (d *Directory) Open(path string) (fs.File, error) {
if path == Root {
path = "README.md"
}

if isMarkdownFile(path) {
return d.fs.Open(path)
}
Expand Down
25 changes: 23 additions & 2 deletions pkg/sse/server.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package sse

import "net/http"
import (
"net/http"
"sync"
)

// Server is a stream multiplexer, allowing clients to receive events from a specific stream.
type Server struct {
defaultStream string
streams map[string]*Stream

mu sync.RWMutex
streams map[string]*Stream
}

// NewServer creates a new server.
Expand All @@ -27,6 +32,9 @@ func NewServer(defaultStream string) *Server {
// Add creates a new stream in the server.
// If the stream already exists, the existing one is returned.
func (s *Server) Add(name string) *Stream {
s.mu.Lock()
defer s.mu.Unlock()

if st, ok := s.streams[name]; ok {
return st
}
Expand All @@ -40,12 +48,18 @@ func (s *Server) Add(name string) *Stream {
// Get returns an existing stream by name in the server.
// If the stream does not exist, nil is returned.
func (s *Server) Get(name string) *Stream {
s.mu.RLock()
defer s.mu.RUnlock()

return s.streams[name]
}

// Remove deletes the stream by name from the server and closes it.
// If the stream does not exist, this is a no-op.
func (s *Server) Remove(name string) {
s.mu.Lock()
defer s.mu.Unlock()

if st, ok := s.streams[name]; ok {
st.Close()
delete(s.streams, name)
Expand All @@ -64,6 +78,13 @@ func (s *Server) Send(name string, e *Event) {
st.Send(e)
}

// Close closes all server streams.
func (s *Server) Close() {
for _, st := range s.streams {
st.Close()
}
}

// ServeHTTP connects a client to a stream in the server depending on the 'stream' query parameter.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("stream")
Expand Down

0 comments on commit fd45e05

Please sign in to comment.