Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloudwatch integration demo #2013

Open
wants to merge 65 commits into
base: develop-pre-1.11.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
3294ebb
Start
disa6302 May 1, 2024
b760fb4
Trial
disa6302 May 2, 2024
df911a4
Initial commit:rtp stats, config file
disa6302 May 8, 2024
b0e2d60
Rename, include logs and metrics
disa6302 May 9, 2024
ec673a9
Rebased
disa6302 May 17, 2024
e431aec
Rearrange
disa6302 May 13, 2024
6c2402d
Stop time
disa6302 May 14, 2024
47e72fc
Add viewer
disa6302 May 14, 2024
37f17de
Add profiling
disa6302 May 15, 2024
1bc1763
E2E added, only video
disa6302 May 20, 2024
1d68bb0
Rearrange, restore original samples
disa6302 May 20, 2024
bdc40c2
iot, label names
disa6302 May 21, 2024
e2e9e97
run time
disa6302 May 21, 2024
5780112
terminate in viewer
disa6302 May 21, 2024
8211ae6
aws cpp target arch
disa6302 May 21, 2024
487210d
TTFF
disa6302 May 21, 2024
4f7ff5d
logging...
disa6302 May 22, 2024
5af0d8f
more logging
disa6302 May 22, 2024
e45405b
use shared libs instead
disa6302 May 23, 2024
d3b1c69
Clean up cred provider
disa6302 May 24, 2024
94f1149
Storage periodic
disa6302 May 29, 2024
a291867
Update config
disa6302 Jun 4, 2024
23c524e
fix labels and storage
disa6302 Jun 4, 2024
48ac308
Remove file
disa6302 Jun 4, 2024
6d0953e
use iot disable
disa6302 Jun 4, 2024
0eb9d47
Scripts
disa6302 Jun 5, 2024
68ac66d
iot try
disa6302 Jun 5, 2024
6bacd1e
fix use iot in master
disa6302 Jun 5, 2024
16c0a4c
Fix for viewer too
disa6302 Jun 5, 2024
f1f5728
fix labels
disa6302 Jun 5, 2024
385e85b
Change namespace
disa6302 Jun 6, 2024
45c65d3
build fix
disa6302 Jun 6, 2024
f2d91df
Fix defines
disa6302 Jun 6, 2024
41d8171
fix segfault iot
disa6302 Jun 7, 2024
302d6a2
error checks, move to C threads to be consistent
disa6302 Jun 7, 2024
e151c22
More debugging
disa6302 Jun 7, 2024
1661c1f
Double pointer
disa6302 Jun 7, 2024
148bbd5
timers fix
disa6302 Jun 10, 2024
d210b91
mini refactor
disa6302 Jun 11, 2024
cb6b765
shutdown api viewer
disa6302 Jun 13, 2024
fc318dc
Change path to workspace
disa6302 Jun 13, 2024
c8ebd04
Nits, misses
disa6302 Jun 13, 2024
6495879
Mutex
disa6302 Jun 13, 2024
e60285f
locks, ref counts
disa6302 Jun 14, 2024
2e5daaf
more sync
disa6302 Jun 14, 2024
dfef40b
more locking
disa6302 Jun 14, 2024
9303587
nit cleanup
disa6302 Jun 14, 2024
f13f3f6
switch ref count args
disa6302 Jun 14, 2024
2f01d7e
use cw_config_header instead
disa6302 Jun 14, 2024
3d49d96
runner label fix
disa6302 Jun 14, 2024
e252957
fix enableIceStats
disa6302 Jun 14, 2024
b12f0d7
Cleanup
disa6302 Jun 14, 2024
85bb903
no data channel build fix
disa6302 Jun 14, 2024
7e65b91
newline
disa6302 Jun 15, 2024
d6b5e3c
unused var
disa6302 Jun 15, 2024
2756e80
more unused var
disa6302 Jun 15, 2024
eae9f33
misplaced braces
disa6302 Jun 15, 2024
d48fa78
fixes
disa6302 Jun 20, 2024
b96ac98
CI fixes (macos12), potential infinite loop bug fix
disa6302 Jun 20, 2024
d9d9558
unused var
disa6302 Jun 20, 2024
33437f7
unused
disa6302 Jun 20, 2024
1c268a0
threadpool lib build
disa6302 Jun 20, 2024
68a8480
fix scenario label
disa6302 Jun 25, 2024
7a4a2bc
Readme
disa6302 Jul 1, 2024
8cf886b
Merge branch 'develop' into simplify-sample
disa6302 Jul 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions CMake/Dependencies/libawscpp-CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,28 @@ cmake_minimum_required(VERSION 3.6.3)
project(libawscpp-download NONE)
include(ExternalProject)

if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
message(STATUS "Configuring for Linux")
set(TARGET_ARCH LINUX)
elseif(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
message(STATUS "Configuring for macOS")
set(TARGET_ARCH APPLE)
elseif(CMAKE_SYSTEM_NAME STREQUAL "Windows")
message(STATUS "Configuring for Windows")
set(TARGET_ARCH WINDOWS)
endif()

ExternalProject_Add(libawscpp-download
GIT_REPOSITORY https://github.com/aws/aws-sdk-cpp.git
GIT_TAG 1.11.217
LIST_SEPARATOR "|"
CMAKE_ARGS -DBUILD_SHARED_LIBS=OFF
-DBUILD_ONLY=kinesisvideo|kinesis-video-webrtc-storage
CMAKE_ARGS -DBUILD_SHARED_LIBS=ON
-DENABLE_TESTING=OFF
-DBUILD_ONLY=${BUILD_ONLY}
-DCMAKE_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX}
-DCMAKE_PREFIX_PATH=${OPEN_SRC_INSTALL_PREFIX}
-DTARGET_ARCH=${TARGET_ARCH}
-DCUSTOM_MEMORY_MANAGEMENT=OFF
BUILD_ALWAYS TRUE
TEST_COMMAND ""
)
20 changes: 18 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ option(ENABLE_DATA_CHANNEL "Enable support for data channel" ON)
option(ENABLE_KVS_THREADPOOL "Enable support for KVS thread pool in signaling" ON)
option(INSTRUMENTED_ALLOCATORS "Enable memory instrumentation" OFF)
option(ENABLE_AWS_SDK_IN_TESTS "Enable support for compiling AWS SDKs for tests" ON)
option(ENABLE_AWS_SDK_INTEG "Enable building samples with cloudwatch" OFF)

# Developer Flags
option(BUILD_TEST "Build the testing tree." OFF)
Expand Down Expand Up @@ -233,9 +234,12 @@ if(BUILD_DEPENDENCIES)

if(BUILD_TEST)
build_dependency(gtest)

set(BUILD_ARGS
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DBUILD_ONLY=kinesisvideo|kinesis-video-webrtc-storage)
if(ENABLE_AWS_SDK_IN_TESTS)
build_dependency(awscpp)
build_dependency(awscpp ${BUILD_ARGS})
endif()

endif()
Expand All @@ -247,9 +251,18 @@ if(BUILD_DEPENDENCIES)
if (LINK_PROFILER)
build_dependency(gperftools)
endif()

message(STATUS "Finished building dependencies.")
endif()

if(ENABLE_AWS_SDK_INTEG)
set(BUILD_ARGS
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DBUILD_ONLY=monitoring|logs)
build_dependency(awscpp ${BUILD_ARGS})
endif()

# building kvsCommonLws also builds kvspic
set(BUILD_ARGS
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
Expand Down Expand Up @@ -474,6 +487,9 @@ if (BUILD_SAMPLE)
add_subdirectory(samples)
endif()

if(ENABLE_AWS_SDK_INTEG)
add_subdirectory(cloudwatch-integ)
endif()
if(BUILD_TEST)
# adding ZLIB because aws sdk static link seems to be broken when zlib is needed
if(NOT WIN32)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ The certificate generating function ([createCertificateAndKey](https://awslabs.g
**Important Note: It is recommended to rotate the certificates often - preferably for every peer connection to avoid a compromised client weakening the security of the new connections.**

Take `kvsWebRTCClientMaster` as sample, add `RtcCertificate certificates[CERT_COUNT];` to **SampleConfiguration** in [Samples.h](./samples/Samples.h).
Then pass in the pre-generated certificate in initializePeerConnection() in [Common.c](./samples/Common.c).
Then pass in the pre-generated certificate in initializePeerConnection() in [Common.c](samples/lib/Common.c).

```c
configuration.certificates[0].pCertificate = pSampleConfiguration->certificates[0].pCertificate;
Expand Down
56 changes: 56 additions & 0 deletions cloudwatch-integ/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
cmake_minimum_required(VERSION 3.6.3)

project(KinesisVideoWebRTCClientSamplesCloudwatch LANGUAGES C CXX)
set(CMAKE_CXX_STANDARD 11)

message("OPEN_SRC_INSTALL_PREFIX=${OPEN_SRC_INSTALL_PREFIX}")

include_directories(${OPEN_SRC_INSTALL_PREFIX}/include)
include_directories(${OPEN_SRC_INCLUDE_DIRS})
link_directories(${OPEN_SRC_INSTALL_PREFIX}/lib)

find_package(ZLIB REQUIRED)
find_package(AWSSDK REQUIRED COMPONENTS monitoring logs)

set(CW_CONFIG_HEADER "configs/lr_static_h265_openssl.h" CACHE FILEPATH "Config header for the cloudwatch integrated demo")

add_definitions(-DCW_CONFIG_HEADER="${CW_CONFIG_HEADER}")
message(STATUS "Config header set to ${CW_CONFIG_HEADER}")

add_executable(
kvsWebrtcClientMasterCW
Cloudwatch.cpp
CloudwatchLogs.cpp
CloudwatchMonitoring.cpp
../samples/lib/Common.c
../samples/lib/Utility.c
../samples/lib/MetricsHandling.c
../samples/lib/DataChannelHandling.c
../samples/lib/SignalingMsgHandler.c
../samples/lib/Media.c
kvsWebRTCClientMasterCloudwatch.cpp)
target_link_libraries(kvsWebrtcClientMasterCW
kvsWebrtcClient
kvsWebrtcSignalingClient
${EXTRA_DEPS}
kvsCommonLws kvspicUtils websockets kvssdp kvsstun
${AWSSDK_LINK_LIBRARIES})

add_executable(
kvsWebrtcClientViewerCW
../samples/lib/Common.c
../samples/lib/Utility.c
../samples/lib/MetricsHandling.c
../samples/lib/DataChannelHandling.c
../samples/lib/SignalingMsgHandler.c
../samples/lib/Media.c
Cloudwatch.cpp
CloudwatchLogs.cpp
CloudwatchMonitoring.cpp
kvsWebRTCClientViewerCloudwatch.cpp)
target_link_libraries(kvsWebrtcClientViewerCW
kvsWebrtcClient
kvsWebrtcSignalingClient
${EXTRA_DEPS}
kvsCommonLws kvspicUtils websockets kvssdp kvsstun
${AWSSDK_LINK_LIBRARIES})
82 changes: 82 additions & 0 deletions cloudwatch-integ/Cloudwatch.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "Include.h"
#include "Cloudwatch.h"

namespace CppInteg {

Cloudwatch::Cloudwatch(ClientConfiguration* pClientConfig)
: logs(pClientConfig), monitoring(pClientConfig), terminated(FALSE)
{
}

STATUS Cloudwatch::init(PCHAR channelName, PCHAR region, BOOL isMaster, BOOL isStorage)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
ClientConfiguration clientConfig;
CreateLogGroupRequest createLogGroupRequest;
Aws::CloudWatchLogs::Model::CreateLogStreamOutcome createLogStreamOutcome;
CreateLogStreamRequest createLogStreamRequest;

clientConfig.region = region;
auto& instance = getInstanceImpl(&clientConfig);

if (STATUS_FAILED(instance.logs.init(channelName, region, isMaster, isStorage))) {
DLOGW("Failed to create Cloudwatch logger");
} else {
globalCustomLogPrintFn = logger;
}

CHK_STATUS(instance.monitoring.init(channelName, region, isMaster, isStorage));

CleanUp:

LEAVES();
return retStatus;
}

Cloudwatch& Cloudwatch::getInstance()
{
return getInstanceImpl();
}

Cloudwatch& Cloudwatch::getInstanceImpl(ClientConfiguration* pClientConfig)
{
static Cloudwatch instance{pClientConfig};
return instance;
}

VOID Cloudwatch::deinit()
{
auto& instance = getInstance();
instance.logs.deinit();
instance.monitoring.deinit();
instance.terminated = TRUE;
}

VOID Cloudwatch::logger(UINT32 level, PCHAR tag, PCHAR fmt, ...)
{
CHAR logFmtString[MAX_LOG_FORMAT_LENGTH + 1];
CHAR cwLogFmtString[MAX_LOG_FORMAT_LENGTH + 1];
UINT32 logLevel = GET_LOGGER_LOG_LEVEL();
UNUSED_PARAM(tag);

if (level >= logLevel) {
addLogMetadata(logFmtString, (UINT32) ARRAY_SIZE(logFmtString), fmt, level);

// Creating a copy to store the logFmtString for cloudwatch logging purpose
va_list valist, valist_cw;
va_start(valist_cw, fmt);
vsnprintf(cwLogFmtString, (SIZE_T) SIZEOF(cwLogFmtString), logFmtString, valist_cw);
va_end(valist_cw);
va_start(valist, fmt);
vprintf(logFmtString, valist);
va_end(valist);

auto& instance = getInstance();
if (!instance.terminated) {
instance.logs.push(cwLogFmtString);
}
}
}

} // namespace Canary
31 changes: 31 additions & 0 deletions cloudwatch-integ/Cloudwatch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include "Include.h"
#include "CloudwatchLogs.h"
#include "CloudwatchMonitoring.h"

namespace CppInteg {

class Cloudwatch {
public:
Cloudwatch() = delete;
Cloudwatch(Cloudwatch const&) = delete;
void operator=(Cloudwatch const&) = delete;

CloudwatchLogs logs;
CloudwatchMonitoring monitoring;

static Cloudwatch& getInstance();
static STATUS init(PCHAR channelName, PCHAR region, BOOL isMaster, BOOL isStorage);
static VOID deinit();
static VOID logger(UINT32, PCHAR, PCHAR, ...);

private:
static Cloudwatch& getInstanceImpl(ClientConfiguration* = nullptr);

Cloudwatch(ClientConfiguration*);
BOOL terminated;
};
typedef Cloudwatch* PCloudwatch;

} // namespace Canary
123 changes: 123 additions & 0 deletions cloudwatch-integ/CloudwatchLogs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include "Include.h"
#include "CloudwatchLogs.h"

namespace CppInteg {

CloudwatchLogs::CloudwatchLogs(ClientConfiguration* pClientConfig) : client(*pClientConfig)
{
}

STATUS CloudwatchLogs::init(PCHAR channelName, PCHAR region, BOOL isMaster, BOOL isStorage)
{
STATUS retStatus = STATUS_SUCCESS;
CreateLogGroupRequest createLogGroupRequest;
Aws::CloudWatchLogs::Model::CreateLogStreamOutcome createLogStreamOutcome;
CreateLogStreamRequest createLogStreamRequest;
std::stringstream defaultLogStreamName;
if(isStorage) {
defaultLogStreamName << channelName << '-' << "StorageMaster" << '-'
<< GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;

} else {
defaultLogStreamName << channelName << '-' << (isMaster ? "master" : "viewer") << '-'
<< GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;

}

this->logStreamName = defaultLogStreamName.str();
this->logGroupName = LOG_GROUP_NAME;

DLOGI("Log stream name: %s", this->logStreamName.c_str());

createLogGroupRequest.SetLogGroupName(this->logGroupName);
// ignore error since if this operation fails, CreateLogStream should fail as well.
// There might be some errors that can lead to successfull CreateLogStream, e.g. log group already exists.
this->client.CreateLogGroup(createLogGroupRequest);

createLogStreamRequest.SetLogGroupName(this->logGroupName);
createLogStreamRequest.SetLogStreamName(this->logStreamName);
createLogStreamOutcome = this->client.CreateLogStream(createLogStreamRequest);

CHK_ERR(createLogStreamOutcome.IsSuccess(), STATUS_INVALID_OPERATION, "Failed to create \"%s\" log stream: %s",
this->logStreamName.c_str(), createLogStreamOutcome.GetError().GetMessage().c_str());

CleanUp:

return retStatus;
}

VOID CloudwatchLogs::deinit()
{
this->flush(TRUE);
}

VOID CloudwatchLogs::push(string log)
{
std::lock_guard<std::recursive_mutex> lock(this->sync.mutex);
Aws::String awsCwString(log.c_str(), log.size());
auto logEvent =
Aws::CloudWatchLogs::Model::InputLogEvent().WithMessage(awsCwString).WithTimestamp(GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
this->logs.push_back(logEvent);
if (this->logs.size() >= MAX_CLOUDWATCH_LOG_COUNT) {
this->flush();
}
}

VOID CloudwatchLogs::flush(BOOL sync)
{
std::unique_lock<std::recursive_mutex> lock(this->sync.mutex);
if (this->logs.size() == 0) {
return;
}
auto pendingLogs = this->logs;
this->logs.clear();

// wait until previous logs have been flushed entirely
auto waitUntilFlushed = [this] { return !this->sync.pending.load(); };
this->sync.await.wait(lock, waitUntilFlushed);

auto request = Aws::CloudWatchLogs::Model::PutLogEventsRequest()
.WithLogGroupName(this->logGroupName)
.WithLogStreamName(this->logStreamName)
.WithLogEvents(pendingLogs);

if (this->token != "") {
request.SetSequenceToken(this->token);
}

if (!sync) {
auto asyncHandler = [this](const Aws::CloudWatchLogs::CloudWatchLogsClient* cwClientLog,
const Aws::CloudWatchLogs::Model::PutLogEventsRequest& request,
const Aws::CloudWatchLogs::Model::PutLogEventsOutcome& outcome,
const std::shared_ptr<const Aws::Client::AsyncCallerContext>& context) {
UNUSED_PARAM(cwClientLog);
UNUSED_PARAM(request);
UNUSED_PARAM(context);

if (!outcome.IsSuccess()) {
// Need to use printf so that we don't get into an infinite loop where we keep flushing
printf("Failed to push logs: %s\n", outcome.GetError().GetMessage().c_str());
} else {
printf("Successfully pushed logs to cloudwatch\n");
this->token = outcome.GetResult().GetNextSequenceToken();
}

this->sync.pending = FALSE;
this->sync.await.notify_one();
};

this->sync.pending = TRUE;
this->client.PutLogEventsAsync(request, asyncHandler);
} else {
auto outcome = this->client.PutLogEvents(request);
if (!outcome.IsSuccess()) {
// Need to use printf so that we don't get into an infinite loop where we keep flushing
printf("Failed to push logs: %s\n", outcome.GetError().GetMessage().c_str());
} else {
DLOGS("Successfully pushed logs to cloudwatch");
this->token = outcome.GetResult().GetNextSequenceToken();
}
}
}

} // namespace Canary
Loading
Loading