Skip to content

Commit

Permalink
Move WS API to separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed May 23, 2023
1 parent 82a8e07 commit 59555cf
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 66 deletions.
2 changes: 0 additions & 2 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ func Init() {
log = app.GetLogger("api")

initStatic(cfg.Mod.StaticDir)
initWS(cfg.Mod.Origin)

HandleFunc("api", apiHandler)
HandleFunc("api/config", configHandler)
HandleFunc("api/exit", exitHandler)
HandleFunc("api/ws", apiWS)

// ensure we can listen without errors
listener, err := net.Listen("tcp", cfg.Mod.Listen)
Expand Down
18 changes: 16 additions & 2 deletions internal/api/ws.go → internal/api/ws/ws.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
package api
package ws

import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"net/http"
"net/url"
"strings"
"sync"
"time"
)

func Init() {
var cfg struct {
Mod struct {
Origin string `yaml:"origin"`
} `yaml:"api"`
}

initWS(cfg.Mod.Origin)

api.HandleFunc("api/ws", apiWS)
}

// Message - struct for data exchange in Web API
type Message struct {
Type string `json:"type"`
Expand All @@ -33,7 +47,7 @@ func (m *Message) GetString(key string) string {

type WSHandler func(tr *Transport, msg *Message) error

func HandleWS(msgType string, handler WSHandler) {
func HandleFunc(msgType string, handler WSHandler) {
wsHandlers[msgType] = handler
}

Expand Down
7 changes: 4 additions & 3 deletions internal/mjpeg/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mjpeg
import (
"errors"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/ffmpeg"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
Expand All @@ -20,7 +21,7 @@ func Init() {
api.HandleFunc("api/frame.jpeg", handlerKeyframe)
api.HandleFunc("api/stream.mjpeg", handlerStream)

api.HandleWS("mjpeg", handlerWS)
ws.HandleFunc("mjpeg", handlerWS)
}

func handlerKeyframe(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -156,7 +157,7 @@ func inputMjpeg(w http.ResponseWriter, r *http.Request) {
stream.RemoveProducer(client)
}

func handlerWS(tr *api.Transport, _ *api.Message) error {
func handlerWS(tr *ws.Transport, _ *ws.Message) error {
src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {
Expand All @@ -178,7 +179,7 @@ func handlerWS(tr *api.Transport, _ *api.Message) error {
return err
}

tr.Write(&api.Message{Type: "mjpeg"})
tr.Write(&ws.Message{Type: "mjpeg"})

tr.OnClose(func() {
stream.RemoveConsumer(cons)
Expand Down
5 changes: 3 additions & 2 deletions internal/mp4/mp4.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mp4

import (
"github.com/AlexxIT/go2rtc/internal/api/ws"
"net/http"
"strconv"
"strings"
Expand All @@ -17,8 +18,8 @@ import (
func Init() {
log = app.GetLogger("mp4")

api.HandleWS("mse", handlerWSMSE)
api.HandleWS("mp4", handlerWSMP4)
ws.HandleFunc("mse", handlerWSMSE)
ws.HandleFunc("mp4", handlerWSMP4)

api.HandleFunc("api/frame.mp4", handlerKeyframe)
api.HandleFunc("api/stream.mp4", handlerMP4)
Expand Down
9 changes: 5 additions & 4 deletions internal/mp4/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package mp4
import (
"errors"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/mp4"
"github.com/AlexxIT/go2rtc/pkg/tcp"
"strings"
)

func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
func handlerWSMSE(tr *ws.Transport, msg *ws.Message) error {
src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {
Expand Down Expand Up @@ -42,7 +43,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
stream.RemoveConsumer(cons)
})

tr.Write(&api.Message{Type: "mse", Value: cons.MimeType()})
tr.Write(&ws.Message{Type: "mse", Value: cons.MimeType()})

data, err := cons.Init()
if err != nil {
Expand All @@ -57,7 +58,7 @@ func handlerWSMSE(tr *api.Transport, msg *api.Message) error {
return nil
}

func handlerWSMP4(tr *api.Transport, msg *api.Message) error {
func handlerWSMP4(tr *ws.Transport, msg *ws.Message) error {
src := tr.Request.URL.Query().Get("src")
stream := streams.GetOrNew(src)
if stream == nil {
Expand Down Expand Up @@ -86,7 +87,7 @@ func handlerWSMP4(tr *api.Transport, msg *api.Message) error {
return err
}

tr.Write(&api.Message{Type: "mp4", Value: cons.MimeType})
tr.Write(&ws.Message{Type: "mp4", Value: cons.MimeType})

tr.OnClose(func() {
stream.RemoveConsumer(cons)
Expand Down
8 changes: 4 additions & 4 deletions internal/webrtc/candidates.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package webrtc

import (
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/pion/sdp/v3"
"strconv"
Expand Down Expand Up @@ -56,7 +56,7 @@ func GetCandidates() (candidates []string) {
return
}

func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) {
func asyncCandidates(tr *ws.Transport, cons *webrtc.Conn) {
tr.WithContext(func(ctx map[any]any) {
if candidates, ok := ctx["candidate"].([]string); ok {
// process candidates that receive before this moment
Expand All @@ -74,7 +74,7 @@ func asyncCandidates(tr *api.Transport, cons *webrtc.Conn) {

for _, candidate := range GetCandidates() {
log.Trace().Str("candidate", candidate).Msg("[webrtc] config")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: candidate})
tr.Write(&ws.Message{Type: "webrtc/candidate", Value: candidate})
}
}

Expand Down Expand Up @@ -102,7 +102,7 @@ func syncCanditates(answer string) (string, error) {
return string(data), nil
}

func candidateHandler(tr *api.Transport, msg *api.Message) error {
func candidateHandler(tr *ws.Transport, msg *ws.Message) error {
// process incoming candidate in sync function
tr.WithContext(func(ctx map[any]any) {
candidate := msg.String()
Expand Down
22 changes: 11 additions & 11 deletions internal/webrtc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package webrtc

import (
"errors"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/pkg/core"
"github.com/AlexxIT/go2rtc/pkg/webrtc"
"github.com/gorilla/websocket"
Expand Down Expand Up @@ -30,13 +30,13 @@ func streamsHandler(url string) (core.Producer, error) {
// ex: ws://localhost:1984/api/ws?src=camera1
func asyncClient(url string) (core.Producer, error) {
// 1. Connect to signalign server
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
_ = ws.Close()
_ = conn.Close()
}
}()

Expand All @@ -55,14 +55,14 @@ func asyncClient(url string) (core.Producer, error) {
prod.Listen(func(msg any) {
switch msg := msg.(type) {
case pion.PeerConnectionState:
_ = ws.Close()
_ = conn.Close()

case *pion.ICECandidate:
sendOffer.Wait()

s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local")
_ = ws.WriteJSON(&api.Message{Type: "webrtc/candidate", Value: s})
_ = conn.WriteJSON(&ws.Message{Type: "webrtc/candidate", Value: s})
}
})

Expand All @@ -79,15 +79,15 @@ func asyncClient(url string) (core.Producer, error) {
}

// 4. Send offer
msg := &api.Message{Type: "webrtc/offer", Value: offer}
if err = ws.WriteJSON(msg); err != nil {
msg := &ws.Message{Type: "webrtc/offer", Value: offer}
if err = conn.WriteJSON(msg); err != nil {
return nil, err
}

sendOffer.Done()

// 5. Get answer
if err = ws.ReadJSON(msg); err != nil {
if err = conn.ReadJSON(msg); err != nil {
return nil, err
}

Expand All @@ -104,8 +104,8 @@ func asyncClient(url string) (core.Producer, error) {
go func() {
for {
// receive data from remote
msg := new(api.Message)
if err = ws.ReadJSON(msg); err != nil {
msg := new(ws.Message)
if err = conn.ReadJSON(msg); err != nil {
if cerr, ok := err.(*websocket.CloseError); ok {
log.Trace().Err(err).Caller().Msgf("[webrtc] ws code=%d", cerr)
}
Expand All @@ -120,7 +120,7 @@ func asyncClient(url string) (core.Producer, error) {
}
}

_ = ws.Close()
_ = conn.Close()
}()

return prod, nil
Expand Down
15 changes: 8 additions & 7 deletions internal/webrtc/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package webrtc
import (
"errors"
"github.com/AlexxIT/go2rtc/internal/api"
"github.com/AlexxIT/go2rtc/internal/api/ws"
"github.com/AlexxIT/go2rtc/internal/app"
"github.com/AlexxIT/go2rtc/internal/streams"
"github.com/AlexxIT/go2rtc/pkg/core"
Expand Down Expand Up @@ -68,9 +69,9 @@ func Init() {
}

// async WebRTC server (two API versions)
api.HandleWS("webrtc", asyncHandler)
api.HandleWS("webrtc/offer", asyncHandler)
api.HandleWS("webrtc/candidate", candidateHandler)
ws.HandleFunc("webrtc", asyncHandler)
ws.HandleFunc("webrtc/offer", asyncHandler)
ws.HandleFunc("webrtc/candidate", candidateHandler)

// sync WebRTC server (two API versions)
api.HandleFunc("api/webrtc", syncHandler)
Expand All @@ -84,7 +85,7 @@ var log zerolog.Logger

var PeerConnection func(active bool) (*pion.PeerConnection, error)

func asyncHandler(tr *api.Transport, msg *api.Message) error {
func asyncHandler(tr *ws.Transport, msg *ws.Message) error {
var stream *streams.Stream
var mode core.Mode

Expand Down Expand Up @@ -134,7 +135,7 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {

s := msg.ToJSON().Candidate
log.Trace().Str("candidate", s).Msg("[webrtc] local")
tr.Write(&api.Message{Type: "webrtc/candidate", Value: s})
tr.Write(&ws.Message{Type: "webrtc/candidate", Value: s})
}
})

Expand Down Expand Up @@ -179,9 +180,9 @@ func asyncHandler(tr *api.Transport, msg *api.Message) error {

if apiV2 {
desc := pion.SessionDescription{Type: pion.SDPTypeAnswer, SDP: answer}
tr.Write(&api.Message{Type: "webrtc", Value: desc})
tr.Write(&ws.Message{Type: "webrtc", Value: desc})
} else {
tr.Write(&api.Message{Type: "webrtc/answer", Value: answer})
tr.Write(&ws.Message{Type: "webrtc/answer", Value: answer})
}

sendAnswer.Done()
Expand Down
Loading

0 comments on commit 59555cf

Please sign in to comment.