Skip to content

Commit

Permalink
Recovered OneToMany processor and moved transcodification to
Browse files Browse the repository at this point in the history
OneToManyTranscoder
  • Loading branch information
lodoyun committed Oct 26, 2012
1 parent 84666d5 commit 2351a98
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 108 deletions.
97 changes: 10 additions & 87 deletions src/erizo/OneToManyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,15 @@

#include "OneToManyProcessor.h"
#include "WebRtcConnection.h"
#include "RTPSink.h"
#include "media/rtp/RtpHeader.h"

namespace erizo {
OneToManyProcessor::OneToManyProcessor() :
MediaReceiver() {

sendVideoBuffer_ = (char*) malloc(2000);
sendAudioBuffer_ = (char*) malloc(2000);

publisher = NULL;
sentPackets_ = 0;
ip = new InputProcessor();
sink_ = new RTPSink("127.0.0.1", "50000");
MediaInfo m;
m.proccessorType = RTP_ONLY;
// m.videoCodec.bitRate = 2000000;
// printf("m.videoCodec.bitrate %d\n\n", m.videoCodec.bitRate);
m.hasVideo = true;
m.videoCodec.width = 640;
m.videoCodec.height = 480;
m.hasAudio = false;
if (m.hasAudio) {
m.audioCodec.sampleRate = 8000;
m.audioCodec.bitRate = 64000;

}
printf("init ip\n");
ip->init(m, this);

MediaInfo om;
om.proccessorType = RTP_ONLY;
om.videoCodec.bitRate = 2000000;
om.videoCodec.width = 640;
om.videoCodec.height = 480;
om.videoCodec.frameRate = 20;
om.hasVideo = true;
// om.url = "file://tmp/test.mp4";

om.hasAudio = false;
if (om.hasAudio) {
om.audioCodec.sampleRate = 8000;
om.audioCodec.bitRate = 64000;
}

op = new OutputProcessor();
op->init(om, this);

}

Expand All @@ -60,9 +22,6 @@ OneToManyProcessor::~OneToManyProcessor() {
delete sendVideoBuffer_;
if (sendAudioBuffer_)
delete sendAudioBuffer_;
if (sink_) {
delete sink_;
}
}

int OneToManyProcessor::receiveAudioData(char* buf, int len) {
Expand All @@ -81,70 +40,34 @@ int OneToManyProcessor::receiveAudioData(char* buf, int len) {
}

int OneToManyProcessor::receiveVideoData(char* buf, int len) {
memset(sendVideoBuffer_, 0, len);
memcpy(sendVideoBuffer_, buf, len);

RTPHeader* theHead = reinterpret_cast<RTPHeader*>(buf);
// printf("extension %d pt %u\n", theHead->getExtension(),
// theHead->getPayloadType());

if (theHead->getPayloadType() == 100) {
ip->receiveVideoData(sendVideoBuffer_, len);
} else {
this->receiveRtpData((unsigned char*) buf, len);
}

// if (subscribers.empty() || len <= 0)
// return 0;
// if (sentPackets_ % 500 == 0) {
// publisher->sendFirPacket();
// }
// std::map<int, WebRtcConnection*>::iterator it;
// for (it = subscribers.begin(); it != subscribers.end(); it++) {
// memset(sendVideoBuffer_, 0, len);
// memcpy(sendVideoBuffer_, buf, len);
// (*it).second->receiveVideoData(sendVideoBuffer_, len);
// }
// memset(sendVideoBuffer_, 0, len);
// memcpy(sendVideoBuffer_, buf, len);
// sink_->sendData((unsigned char*)sendVideoBuffer_,len);

sentPackets_++;
return 0;
}

void OneToManyProcessor::receiveRawData(RawDataPacket& pkt) {
// printf("Received %d\n", pkt.length);
op->receiveRawData(pkt);
}

void OneToManyProcessor::receiveRtpData(unsigned char*rtpdata, int len) {
printf("Received rtp data %d\n", len);
memcpy(sendVideoBuffer_, rtpdata, len);

if (subscribers.empty() || len <= 0)
return;
// if (sentPackets_ % 500 == 0) {
// publisher->sendFirPacket();
// }
return 0;
if (sentPackets_ % 500 == 0) {
publisher->sendFirPacket();
}
std::map<int, WebRtcConnection*>::iterator it;
for (it = subscribers.begin(); it != subscribers.end(); it++) {
memcpy(sendVideoBuffer_, rtpdata, len);
memset(sendVideoBuffer_, 0, len);
memcpy(sendVideoBuffer_, buf, len);
(*it).second->receiveVideoData(sendVideoBuffer_, len);
}
sentPackets_++;
return 0;
}

void OneToManyProcessor::setPublisher(WebRtcConnection* webRtcConn) {

this->publisher = webRtcConn;
}

void OneToManyProcessor::addSubscriber(WebRtcConnection* webRtcConn,
int peerId) {

this->subscribers[peerId] = webRtcConn;
}

void OneToManyProcessor::removeSubscriber(int peerId) {

if (this->subscribers.find(peerId) != subscribers.end()) {
this->subscribers[peerId]->close();
this->subscribers.erase(peerId);
Expand Down
21 changes: 2 additions & 19 deletions src/erizo/OneToManyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,18 @@
#define ONETOMANYPROCESSOR_H_

#include <map>
#include <vector>

#include "MediaDefinitions.h"
#include "media/MediaProcessor.h"


namespace erizo{

class WebRtcConnection;
class RTPSink;

/**
* Represents a One to Many connection.
* Receives media from one publisher and retransmits it to every subscriber.
*/
class OneToManyProcessor : public MediaReceiver, public RawDataReceiver, public RTPDataReceiver {
class OneToManyProcessor : public MediaReceiver {
public:
WebRtcConnection *publisher;
std::map<int, WebRtcConnection*> subscribers;
Expand All @@ -45,12 +42,6 @@ class OneToManyProcessor : public MediaReceiver, public RawDataReceiver, public
void removeSubscriber(int peerId);
int receiveAudioData(char* buf, int len);
int receiveVideoData(char* buf, int len);
void receiveRawData(RawDataPacket& packet);
void receiveRtpData(unsigned char*rtpdata, int len);

// MediaProcessor *mp;
InputProcessor* ip;
OutputProcessor* op;
/**
* Closes all the subscribers and the publisher, the object is useless after this
*/
Expand All @@ -59,14 +50,6 @@ class OneToManyProcessor : public MediaReceiver, public RawDataReceiver, public
private:
char* sendVideoBuffer_;
char* sendAudioBuffer_;
char* unpackagedBuffer_;
char* decodedBuffer_;
char* codedBuffer_;
RTPSink* sink_;
std::vector<packet> head;
int gotFrame_,gotDecodedFrame_, size_;
void sendHead(WebRtcConnection* conn);
RtpParser pars;
unsigned int sentPackets_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/erizo/media/MediaProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class RawDataReceiver {

class RTPDataReceiver {
public:
virtual void receiveRtpData(unsigned char*rtpdata, int len) = 0;
virtual void receiveRtpData(unsigned char* rtpdata, int len) = 0;
virtual ~RTPDataReceiver() {
}
;
Expand Down
163 changes: 163 additions & 0 deletions src/erizo/media/OneToManyTranscoder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* OneToManyTranscoder.cpp
*/

#include "OneToManyTranscoder.h"
#include "../WebRtcConnection.h"
#include "../RTPSink.h"
#include "rtp/RtpHeader.h"

namespace erizo {
OneToManyTranscoder::OneToManyTranscoder() :
MediaReceiver() {

sendVideoBuffer_ = (char*) malloc(2000);
sendAudioBuffer_ = (char*) malloc(2000);

publisher = NULL;
sentPackets_ = 0;
ip = new InputProcessor();
sink_ = new RTPSink("127.0.0.1", "50000");
MediaInfo m;
m.proccessorType = RTP_ONLY;
// m.videoCodec.bitRate = 2000000;
// printf("m.videoCodec.bitrate %d\n\n", m.videoCodec.bitRate);
m.hasVideo = true;
m.videoCodec.width = 640;
m.videoCodec.height = 480;
m.hasAudio = false;
if (m.hasAudio) {
m.audioCodec.sampleRate = 8000;
m.audioCodec.bitRate = 64000;

}
printf("init ip\n");
ip->init(m, this);

MediaInfo om;
om.proccessorType = RTP_ONLY;
om.videoCodec.bitRate = 2000000;
om.videoCodec.width = 640;
om.videoCodec.height = 480;
om.videoCodec.frameRate = 20;
om.hasVideo = true;
// om.url = "file://tmp/test.mp4";

om.hasAudio = false;
if (om.hasAudio) {
om.audioCodec.sampleRate = 8000;
om.audioCodec.bitRate = 64000;
}

op = new OutputProcessor();
op->init(om, this);

}

OneToManyTranscoder::~OneToManyTranscoder() {
this->closeAll();
if (sendVideoBuffer_)
delete sendVideoBuffer_;
if (sendAudioBuffer_)
delete sendAudioBuffer_;
if (sink_) {
delete sink_;
}
}

int OneToManyTranscoder::receiveAudioData(char* buf, int len) {

if (subscribers.empty() || len <= 0)
return 0;

std::map<int, WebRtcConnection*>::iterator it;
for (it = subscribers.begin(); it != subscribers.end(); it++) {
memset(sendAudioBuffer_, 0, len);
memcpy(sendAudioBuffer_, buf, len);
(*it).second->receiveAudioData(sendAudioBuffer_, len);
}

return 0;
}

int OneToManyTranscoder::receiveVideoData(char* buf, int len) {
memset(sendVideoBuffer_, 0, len);
memcpy(sendVideoBuffer_, buf, len);

RTPHeader* theHead = reinterpret_cast<RTPHeader*>(buf);
// printf("extension %d pt %u\n", theHead->getExtension(),
// theHead->getPayloadType());

if (theHead->getPayloadType() == 100) {
ip->receiveVideoData(sendVideoBuffer_, len);
} else {
this->receiveRtpData((unsigned char*) buf, len);
}

// if (subscribers.empty() || len <= 0)
// return 0;
// if (sentPackets_ % 500 == 0) {
// publisher->sendFirPacket();
// }
// std::map<int, WebRtcConnection*>::iterator it;
// for (it = subscribers.begin(); it != subscribers.end(); it++) {
// memset(sendVideoBuffer_, 0, len);
// memcpy(sendVideoBuffer_, buf, len);
// (*it).second->receiveVideoData(sendVideoBuffer_, len);
// }
// memset(sendVideoBuffer_, 0, len);
// memcpy(sendVideoBuffer_, buf, len);
// sink_->sendData((unsigned char*)sendVideoBuffer_,len);

sentPackets_++;
return 0;
}

void OneToManyTranscoder::receiveRawData(RawDataPacket& pkt) {
// printf("Received %d\n", pkt.length);
op->receiveRawData(pkt);
}

void OneToManyTranscoder::receiveRtpData(unsigned char*rtpdata, int len) {
printf("Received rtp data %d\n", len);
memcpy(sendVideoBuffer_, rtpdata, len);

if (subscribers.empty() || len <= 0)
return;
// if (sentPackets_ % 500 == 0) {
// publisher->sendFirPacket();
// }
std::map<int, WebRtcConnection*>::iterator it;
for (it = subscribers.begin(); it != subscribers.end(); it++) {
memcpy(sendVideoBuffer_, rtpdata, len);
(*it).second->receiveVideoData(sendVideoBuffer_, len);
}
sentPackets_++;
}

void OneToManyTranscoder::setPublisher(WebRtcConnection* webRtcConn) {
this->publisher = webRtcConn;
}

void OneToManyTranscoder::addSubscriber(WebRtcConnection* webRtcConn,
int peerId) {
this->subscribers[peerId] = webRtcConn;
}

void OneToManyTranscoder::removeSubscriber(int peerId) {
if (this->subscribers.find(peerId) != subscribers.end()) {
this->subscribers[peerId]->close();
this->subscribers.erase(peerId);
}
}

void OneToManyTranscoder::closeAll() {
std::map<int, WebRtcConnection*>::iterator it;
for (it = subscribers.begin(); it != subscribers.end(); it++) {
(*it).second->close();
}
this->publisher->close();
}

}/* namespace erizo */

Loading

0 comments on commit 2351a98

Please sign in to comment.