diff --git a/internal/api/api.go b/internal/api/api.go index 09766ca5c..051d29781 100644 --- a/internal/api/api.go +++ b/internal/api/api.go @@ -38,12 +38,10 @@ func Init() { log = app.GetLogger("api") initStatic(cfg.Mod.StaticDir) - initWS(cfg.Mod.Origin) HandleFunc("api", apiHandler) HandleFunc("api/config", configHandler) HandleFunc("api/exit", exitHandler) - HandleFunc("api/ws", apiWS) // ensure we can listen without errors listener, err := net.Listen("tcp", cfg.Mod.Listen) diff --git a/internal/api/ws.go b/internal/api/ws/ws.go similarity index 91% rename from internal/api/ws.go rename to internal/api/ws/ws.go index 905857364..26a83a2f6 100644 --- a/internal/api/ws.go +++ b/internal/api/ws/ws.go @@ -1,7 +1,9 @@ -package api +package ws import ( + "github.com/AlexxIT/go2rtc/internal/api" "github.com/gorilla/websocket" + "github.com/rs/zerolog/log" "net/http" "net/url" "strings" @@ -9,6 +11,18 @@ import ( "time" ) +func Init() { + var cfg struct { + Mod struct { + Origin string `yaml:"origin"` + } `yaml:"api"` + } + + initWS(cfg.Mod.Origin) + + api.HandleFunc("api/ws", apiWS) +} + // Message - struct for data exchange in Web API type Message struct { Type string `json:"type"` @@ -33,7 +47,7 @@ func (m *Message) GetString(key string) string { type WSHandler func(tr *Transport, msg *Message) error -func HandleWS(msgType string, handler WSHandler) { +func HandleFunc(msgType string, handler WSHandler) { wsHandlers[msgType] = handler } diff --git a/internal/mjpeg/init.go b/internal/mjpeg/init.go index 598aae629..b804cffb4 100644 --- a/internal/mjpeg/init.go +++ b/internal/mjpeg/init.go @@ -3,6 +3,7 @@ package mjpeg import ( "errors" "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/ffmpeg" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" @@ -20,7 +21,7 @@ func Init() { api.HandleFunc("api/frame.jpeg", handlerKeyframe) api.HandleFunc("api/stream.mjpeg", handlerStream) - api.HandleWS("mjpeg", handlerWS) + ws.HandleFunc("mjpeg", handlerWS) } func handlerKeyframe(w http.ResponseWriter, r *http.Request) { @@ -156,7 +157,7 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) { stream.RemoveProducer(client) } -func handlerWS(tr *api.Transport, _ *api.Message) error { +func handlerWS(tr *ws.Transport, _ *ws.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { @@ -178,7 +179,7 @@ func handlerWS(tr *api.Transport, _ *api.Message) error { return err } - tr.Write(&api.Message{Type: "mjpeg"}) + tr.Write(&ws.Message{Type: "mjpeg"}) tr.OnClose(func() { stream.RemoveConsumer(cons) diff --git a/internal/mp4/mp4.go b/internal/mp4/mp4.go index e6791ee76..a791fd766 100644 --- a/internal/mp4/mp4.go +++ b/internal/mp4/mp4.go @@ -1,6 +1,7 @@ package mp4 import ( + "github.com/AlexxIT/go2rtc/internal/api/ws" "net/http" "strconv" "strings" @@ -17,8 +18,8 @@ import ( func Init() { log = app.GetLogger("mp4") - api.HandleWS("mse", handlerWSMSE) - api.HandleWS("mp4", handlerWSMP4) + ws.HandleFunc("mse", handlerWSMSE) + ws.HandleFunc("mp4", handlerWSMP4) api.HandleFunc("api/frame.mp4", handlerKeyframe) api.HandleFunc("api/stream.mp4", handlerMP4) diff --git a/internal/mp4/ws.go b/internal/mp4/ws.go index 0cd7e4fec..1ef3f02f2 100644 --- a/internal/mp4/ws.go +++ b/internal/mp4/ws.go @@ -3,6 +3,7 @@ package mp4 import ( "errors" "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/mp4" @@ -10,7 +11,7 @@ import ( "strings" ) -func handlerWSMSE(tr *api.Transport, msg *api.Message) error { +func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { @@ -42,7 +43,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error { stream.RemoveConsumer(cons) }) - tr.Write(&api.Message{Type: "mse", Value: cons.MimeType()}) + tr.Write(&ws.Message{Type: "mse", Value: cons.MimeType()}) data, err := cons.Init() if err != nil { @@ -57,7 +58,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error { return nil } -func handlerWSMP4(tr *api.Transport, msg *api.Message) error { +func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error { src := tr.Request.URL.Query().Get("src") stream := streams.GetOrNew(src) if stream == nil { @@ -86,7 +87,7 @@ func handlerWSMP4(tr *api.Transport, msg *api.Message) error { return err } - tr.Write(&api.Message{Type: "mp4", Value: cons.MimeType}) + tr.Write(&ws.Message{Type: "mp4", Value: cons.MimeType}) tr.OnClose(func() { stream.RemoveConsumer(cons) diff --git a/internal/webrtc/candidates.go b/internal/webrtc/candidates.go index 51f97040d..4fdbce2ad 100644 --- a/internal/webrtc/candidates.go +++ b/internal/webrtc/candidates.go @@ -1,7 +1,7 @@ package webrtc import ( - "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/pion/sdp/v3" "strconv" @@ -56,7 +56,7 @@ func GetCandidates() (candidates []string) { return } -func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) { +func asyncCandidates(tr *ws.Transport, cons *webrtc.Conn) { tr.WithContext(func(ctx map[any]any) { if candidates, ok := ctx["candidate"].([]string); ok { // process candidates that receive before this moment @@ -74,7 +74,7 @@ func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) { for _, candidate := range GetCandidates() { log.Trace().Str("candidate", candidate).Msg("[webrtc] config") - tr.Write(&api.Message{Type: "webrtc/candidate", Value: candidate}) + tr.Write(&ws.Message{Type: "webrtc/candidate", Value: candidate}) } } @@ -102,7 +102,7 @@ func syncCanditates(answer string) (string, error) { return string(data), nil } -func candidateHandler(tr *api.Transport, msg *api.Message) error { +func candidateHandler(tr *ws.Transport, msg *ws.Message) error { // process incoming candidate in sync function tr.WithContext(func(ctx map[any]any) { candidate := msg.String() diff --git a/internal/webrtc/client.go b/internal/webrtc/client.go index 9db347e95..8a13a288f 100644 --- a/internal/webrtc/client.go +++ b/internal/webrtc/client.go @@ -2,7 +2,7 @@ package webrtc import ( "errors" - "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/pkg/core" "github.com/AlexxIT/go2rtc/pkg/webrtc" "github.com/gorilla/websocket" @@ -30,13 +30,13 @@ func streamsHandler(url string) (core.Producer, error) { // ex: ws://localhost:1984/api/ws?src=camera1 func asyncClient(url string) (core.Producer, error) { // 1. Connect to signalign server - ws, _, err := websocket.DefaultDialer.Dial(url, nil) + conn, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { return nil, err } defer func() { if err != nil { - _ = ws.Close() + _ = conn.Close() } }() @@ -55,14 +55,14 @@ func asyncClient(url string) (core.Producer, error) { prod.Listen(func(msg any) { switch msg := msg.(type) { case pion.PeerConnectionState: - _ = ws.Close() + _ = conn.Close() case *pion.ICECandidate: sendOffer.Wait() s := msg.ToJSON().Candidate log.Trace().Str("candidate", s).Msg("[webrtc] local") - _ = ws.WriteJSON(&api.Message{Type: "webrtc/candidate", Value: s}) + _ = conn.WriteJSON(&ws.Message{Type: "webrtc/candidate", Value: s}) } }) @@ -79,15 +79,15 @@ func asyncClient(url string) (core.Producer, error) { } // 4. Send offer - msg := &api.Message{Type: "webrtc/offer", Value: offer} - if err = ws.WriteJSON(msg); err != nil { + msg := &ws.Message{Type: "webrtc/offer", Value: offer} + if err = conn.WriteJSON(msg); err != nil { return nil, err } sendOffer.Done() // 5. Get answer - if err = ws.ReadJSON(msg); err != nil { + if err = conn.ReadJSON(msg); err != nil { return nil, err } @@ -104,8 +104,8 @@ func asyncClient(url string) (core.Producer, error) { go func() { for { // receive data from remote - msg := new(api.Message) - if err = ws.ReadJSON(msg); err != nil { + msg := new(ws.Message) + if err = conn.ReadJSON(msg); err != nil { if cerr, ok := err.(*websocket.CloseError); ok { log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr) } @@ -120,7 +120,7 @@ func asyncClient(url string) (core.Producer, error) { } } - _ = ws.Close() + _ = conn.Close() }() return prod, nil diff --git a/internal/webrtc/init.go b/internal/webrtc/init.go index 6855e1bde..07f219b79 100644 --- a/internal/webrtc/init.go +++ b/internal/webrtc/init.go @@ -3,6 +3,7 @@ package webrtc import ( "errors" "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/streams" "github.com/AlexxIT/go2rtc/pkg/core" @@ -68,9 +69,9 @@ func Init() { } // async WebRTC server (two API versions) - api.HandleWS("webrtc", asyncHandler) - api.HandleWS("webrtc/offer", asyncHandler) - api.HandleWS("webrtc/candidate", candidateHandler) + ws.HandleFunc("webrtc", asyncHandler) + ws.HandleFunc("webrtc/offer", asyncHandler) + ws.HandleFunc("webrtc/candidate", candidateHandler) // sync WebRTC server (two API versions) api.HandleFunc("api/webrtc", syncHandler) @@ -84,7 +85,7 @@ var log zerolog.Logger var PeerConnection func(active bool) (*pion.PeerConnection, error) -func asyncHandler(tr *api.Transport, msg *api.Message) error { +func asyncHandler(tr *ws.Transport, msg *ws.Message) error { var stream *streams.Stream var mode core.Mode @@ -134,7 +135,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { s := msg.ToJSON().Candidate log.Trace().Str("candidate", s).Msg("[webrtc] local") - tr.Write(&api.Message{Type: "webrtc/candidate", Value: s}) + tr.Write(&ws.Message{Type: "webrtc/candidate", Value: s}) } }) @@ -179,9 +180,9 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error { if apiV2 { desc := pion.SessionDescription{Type: pion.SDPTypeAnswer, SDP: answer} - tr.Write(&api.Message{Type: "webrtc", Value: desc}) + tr.Write(&ws.Message{Type: "webrtc", Value: desc}) } else { - tr.Write(&api.Message{Type: "webrtc/answer", Value: answer}) + tr.Write(&ws.Message{Type: "webrtc/answer", Value: answer}) } sendAnswer.Done() diff --git a/main.go b/main.go index 97476cd19..cd7b29394 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "github.com/AlexxIT/go2rtc/internal/api" + "github.com/AlexxIT/go2rtc/internal/api/ws" "github.com/AlexxIT/go2rtc/internal/app" "github.com/AlexxIT/go2rtc/internal/debug" "github.com/AlexxIT/go2rtc/internal/dvrip" @@ -32,37 +33,55 @@ import ( ) func main() { - app.Init() // init config and logs - api.Init() // init HTTP API server - streams.Init() // load streams list - onvif.Init() - - rtsp.Init() // add support RTSP client and RTSP server - rtmp.Init() // add support RTMP client - exec.Init() // add support exec scheme (depends on RTSP server) - ffmpeg.Init() // add support ffmpeg scheme (depends on exec scheme) - hass.Init() // add support hass scheme - echo.Init() - ivideon.Init() - http.Init() - dvrip.Init() - tapo.Init() - isapi.Init() - mpegts.Init() - roborock.Init() - nest.Init() - - srtp.Init() - homekit.Init() - - webrtc.Init() - mp4.Init() - hls.Init() - mjpeg.Init() - - webtorrent.Init() - ngrok.Init() - debug.Init() + // 1. Core modules: app, api/ws, streams + + app.Init() // init config and logs + + api.Init() // init API before all others + ws.Init() // init WS API endpoint + + streams.Init() // streams module + + // 2. Main sources and servers + + rtsp.Init() // rtsp source, RTSP server + webrtc.Init() // webrtc source, WebRTC server + + // 3. Main API + + mp4.Init() // MP4 API + hls.Init() // HLS API + mjpeg.Init() // MJPEG API + + // 4. Other sources and servers + + hass.Init() // hass source, Hass API server + onvif.Init() // onvif source, ONVIF API server + webtorrent.Init() // webtorrent source, WebTorrent module + + // 5. Other sources + + rtmp.Init() // rtmp source + exec.Init() // exec source + ffmpeg.Init() // ffmpeg source + echo.Init() // echo source + ivideon.Init() // ivideon source + http.Init() // http/tcp source + dvrip.Init() // dvrip source + tapo.Init() // tapo source + isapi.Init() // isapi source + mpegts.Init() // mpegts passive source + roborock.Init() // roborock source + homekit.Init() // homekit source + nest.Init() // nest source + + // 6. Helper modules + + ngrok.Init() // Ngrok module + srtp.Init() // SRTP server + debug.Init() // debug API + + // 7. Go shell.RunUntilSignal() }