Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy dockerfile #4184

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Dockerfile.proxy
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
ARG ARCH
FROM ${ARCH}ossrs/srs:ubuntu20 AS build

COPY ./proxy /proxy

WORKDIR /proxy

RUN make clean && make

############################################################
# dist
############################################################
FROM ${ARCH}ubuntu:focal AS dist

WORKDIR /proxy

COPY --from=build /proxy/srs-proxy /proxy/
COPY ./trunk/research /proxy/static

ENV PROXY_STATIC_FILES="/proxy/static"
ENV PROXY_LOAD_BALANCER_TYPE="memory"
ENV PROXY_RTMP_SERVER=1935
ENV PROXY_HTTP_SERVER=8080
ENV PROXY_HTTP_API=1985
ENV PROXY_WEBRTC_SERVER=8000
ENV PROXY_SRT_SERVER=10080
ENV PROXY_SYSTEM_API=12025

EXPOSE 1935 8080 1985 12025 8000/udp 10080/udp

CMD ["./srs-proxy"]
11 changes: 8 additions & 3 deletions proxy/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,23 @@ func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
apiError(ctx, w, r, err, http.StatusInternalServerError)
}
})

// The WebRTC WHEP API handler.
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
apiError(ctx, w, r, err, http.StatusInternalServerError)
}
})

logger.Df(ctx, "Proxy /api/ to srs")
mux.HandleFunc("/api/", func(w http.ResponseWriter, r *http.Request) {
srsLoadBalancer.ProxyHTTPAPI(ctx, w, r)
})

// Run HTTP API server.
v.wg.Add(1)
go func() {
Expand Down Expand Up @@ -239,7 +244,7 @@ func (v *systemAPI) Run(ctx context.Context) error {
logger.Df(ctx, "Register SRS media server, %+v", server)
return nil
}(); err != nil {
apiError(ctx, w, r, err)
apiError(ctx, w, r, err, http.StatusInternalServerError)
}

type Response struct {
Expand Down
4 changes: 2 additions & 2 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (v *HTTPFlvTsConnection) ServeHTTP(w http.ResponseWriter, r *http.Request)
ctx := logger.WithContext(v.ctx)

if err := v.serve(ctx, w, r); err != nil {
apiError(ctx, w, r, err)
apiError(ctx, w, r, err, http.StatusInternalServerError)
} else {
logger.Df(ctx, "HTTP client done")
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (v *HLSPlayStream) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

if err := v.serve(v.ctx, w, r); err != nil {
apiError(v.ctx, w, r, err)
apiError(v.ctx, w, r, err, http.StatusInternalServerError)
} else {
logger.Df(v.ctx, "HLS client %v for %v with %v done",
v.SRSProxyBackendHLSID, v.StreamURL, r.URL.Path)
Expand Down
308 changes: 308 additions & 0 deletions proxy/srs-api-proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
// Copyright (c) 2024 Winlin
//
// SPDX-License-Identifier: MIT
package main

import (
"context"
"encoding/json"
"io"
"net/http"
"srs-proxy/errors"
"srs-proxy/logger"
"strings"
)

type SrsClient struct {
Id string `json:"id"`
Vhost string `json:"vhost"`
Stream string `json:"stream"`
Ip string `json:"ip"`
PageUrl string `json:"pageUrl"`
SwfUrl string `json:"swfUrl"`
TcUrl string `json:"tcUrl"`
Url string `json:"url"`
Name string `json:"name"`
Type string `json:"type"`
Publish bool `json:"publish"`
Alive float32 `json:"alive"`
SendBytes int `json:"send_bytes"`
RecvBytes int `json:"recv_bytes"`
}

type SrsApiCodeResponse struct {
Code int `json:"code"`
}

type SrsAPICommonResponse struct {
SrsApiCodeResponse
Server string `json:"server"`
Service string `json:"service"`
Pid string `json:"pid"`
}

type SrsClientResponse struct {
SrsAPICommonResponse
Client SrsClient `json:"client"`
}

type SrsClientsResponse struct {
SrsAPICommonResponse
Clients []SrsClient `json:"clients"`
}

type SrsKbps struct {
Recv_30s uint32 `json:"recv_30s"`
Send_30s uint32 `json:"send_30s"`
}

type SrsPublish struct {
Active bool `json:"active"`
Cid string `json:"cid"`
}

type SrsVideo struct {
Codec string `json:"codec"`
Profile string `json:"profile"`
Level string `json:"level"`
Width uint32 `json:"width"`
Height uint32 `json:"height"`
}

type SrsAudio struct {
Codec string `json:"codec"`
Sample_rate uint32 `json:"sample_rate"`
Channel uint8 `json:"channel"`
Profile string `json:"profile"`
}

type SrsStream struct {
Id string `json:"id"`
Name string `json:"name"`
Vhost string `json:"vhost"`
App string `json:"app"`
TcUrl string `json:"tcUrl"`
Url string `json:"url"`
Live_ms uint64 `json:"live_ms"`
Clients uint32 `json:"clients"`
Frames uint32 `json:"frames"`
Send_bytes uint32 `json:"send_bytes"`
Recv_bytes uint32 `json:"recv_bytes"`
Kbps SrsKbps `json:"kbps"`
Publish SrsPublish `json:"publish"`
Video SrsVideo `json:"video"`
Audio SrsAudio `json:"audio"`
}

type SrsStreamResponse struct {
SrsAPICommonResponse
Stream SrsStream `json:"stream"`
}

type SrsStreamsResponse struct {
SrsAPICommonResponse
Streams []SrsStream `json:"streams"`
}

type SrsHTTPApi struct {
Enabled bool `json:"enabled"`
Listen string `json:"listen"`
Crossdomain bool `json:"crossdomain"`
Raw_api SrsRawApi `json:"raw_api"`
}

type SrsRawApi struct {
Enabled bool `json:"enabled"`
Allow_reload bool `json:"allow_reload"`
Allow_query bool `json:"allow_query"`
Allow_update bool `json:"allow_update"`
}

type SrsRawResponse struct {
SrsApiCodeResponse
Http_api SrsHTTPApi `json:"http_api"`
}

type SrsRawReloadResponse struct {
SrsApiCodeResponse
}

type SrsRawReloadFetchData struct {
Err int `json:"err"`
Msg string `json:"msg"`
State int `json:"state"`
Rid string `json:"rid"`
}

type SrsRawReloadFetchResponse struct {
SrsApiCodeResponse
Data SrsRawReloadFetchData `json:"data"`
}

type SrsApiProxy struct {
}

func (v *SrsApiProxy) proxySrsAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
if strings.HasPrefix(r.URL.Path, "/api/v1/clients") {
return proxySrsClientsAPI(ctx, servers, w, r)
} else if strings.HasPrefix(r.URL.Path, "/api/v1/streams") {
return proxySrsStreamsAPI(ctx, servers, w, r)
} else if strings.HasPrefix(r.URL.Path, "/api/v1/raw") {
return proxySrsRawAPI(ctx, servers, w, r)
}
return nil
}

// handle srs clients api /api/v1/clients
func proxySrsClientsAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()

clientId := ""
if strings.HasPrefix(r.URL.Path, "/api/v1/clients/") {
clientId = r.URL.Path[len("/api/v1/clients/"):]
}
logger.Df(ctx, "%v %v clientId=%v", r.Method, r.URL.Path, clientId)

body, err := io.ReadAll(r.Body)
if err != nil {
apiError(ctx, w, r, err, http.StatusInternalServerError)
return errors.Wrapf(err, "read request body err")
}

switch r.Method {
case http.MethodDelete:
for _, server := range servers {
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
logger.Df(ctx, "response %v", string(ret))
var res SrsApiCodeResponse
if err := json.Unmarshal(ret, &res); err == nil && res.Code == 0 {
apiResponse(ctx, w, r, res)
return nil
}
}
}

err := errors.Errorf("clientId %v not found in server", clientId)
apiError(ctx, w, r, err, http.StatusNotFound)
return err
case http.MethodGet:
if len(clientId) > 0 {
for _, server := range servers {
var client SrsClientResponse
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
if err := json.Unmarshal(ret, &client); err == nil && client.Code == 0 {
apiResponse(ctx, w, r, client)
return nil
}
}
}
} else { // get all clients
var clients SrsClientsResponse
for _, server := range servers {
var res SrsClientsResponse
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
if err := json.Unmarshal(ret, &res); err == nil && res.Code == 0 {
clients.Clients = append(clients.Clients, res.Clients...)
}
}
}

apiResponse(ctx, w, r, clients)
return nil
}
default:
logger.Df(ctx, "/api/v1/clients %v", r.Method)
}
return nil
}

func proxySrsStreamsAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()

streamId := ""
if strings.HasPrefix(r.URL.Path, "/api/v1/streams/") {
streamId = r.URL.Path[len("/api/v1/streams/"):]
}
logger.Df(ctx, "%v %v streamId=%v", r.Method, r.URL.Path, streamId)

body, err := io.ReadAll(r.Body)
if err != nil {
apiError(ctx, w, r, err, http.StatusInternalServerError)
return errors.Wrapf(err, "read request body err")
}
if r.Method != http.MethodGet {
err := errors.Errorf("Unsupported http method type %v", r.Method)
apiError(ctx, w, r, err, http.StatusBadRequest)
return err
}
if len(streamId) > 0 {
var stream SrsStreamResponse
for _, server := range servers {
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
if err := json.Unmarshal(ret, &stream); err == nil && stream.Code == 0 {
apiResponse(ctx, w, r, stream)
return nil
}
}
}
ret := SrsApiCodeResponse{
Code: 2048,
}
apiResponse(ctx, w, r, ret)
return nil
} else {
var streams SrsStreamsResponse
for _, server := range servers {
var res SrsStreamsResponse
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
if err := json.Unmarshal(ret, &res); err == nil && res.Code == 0 {
streams.Streams = append(streams.Streams, res.Streams...)
}
}
}

apiResponse(ctx, w, r, streams)
return nil
}
}

func proxySrsRawAPI(ctx context.Context, servers []*SRSServer, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()

rpc := r.URL.Query().Get("rpc")
logger.Df(ctx, "%v, rpc=%v", r.URL.Path, rpc)
body, err := io.ReadAll(r.Body)
if err != nil {
apiError(ctx, w, r, err, http.StatusInternalServerError)
return errors.Wrapf(err, "read request body err")
}

for _, server := range servers {
if ret, err := server.ApiRequest(ctx, r, body); err == nil {
if rpc == "raw" {
// return the first success response
var raw SrsRawResponse
if err := json.Unmarshal(ret, &raw); err == nil && raw.Code == 0 {
raw.Http_api.Listen = envHttpAPI()
apiResponse(ctx, w, r, raw)
return nil
}
} else if rpc == "reload" {
var res SrsRawReloadResponse
err := json.Unmarshal(ret, &res)
logger.Df(ctx, "%v %v %v %v", server.IP, r.URL.Path, res, err)
} else if rpc == "reload-fetch" {
var res SrsRawReloadFetchResponse
err := json.Unmarshal(ret, &res)
logger.Df(ctx, "%v %v %v %v", server.IP, r.URL.Path, res, err)
} else {
var code SrsApiCodeResponse
if err := json.Unmarshal(ret, &code); err == nil {
logger.Df(ctx, "%v %v", r.URL.Path, code)
}
}
}
}

return nil
}
Loading
Loading