Skip to content

Commit

Permalink
libraries/RPC: Support adding more endpoints, if needed.
Browse files Browse the repository at this point in the history
Signed-off-by: iabdalkader <[email protected]>
  • Loading branch information
iabdalkader committed Nov 6, 2024
1 parent 11c3c83 commit 4442f8d
Showing 1 changed file with 66 additions and 51 deletions.
117 changes: 66 additions & 51 deletions libraries/RPC/src/RPC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,57 @@
// SPDX-License-Identifier: MPL-2.0
#include "RPC.h"

#define ENDPOINT_ID_RAW 0
#define ENDPOINT_ID_RPC 1

#define MSGPACK_TYPE_REQUEST 0
#define MSGPACK_TYPE_RESPONSE 1
#define MSGPACK_TYPE_NOTIFY 2
typedef struct {
const char *name;
bool ready;
struct rpmsg_endpoint rpmsg;
} endpoint_t;

typedef enum {
ENDPOINT_ID_RAW,
ENDPOINT_ID_RPC,
ENDPOINT_ID_MAX
} endpoint_id_t;

typedef enum {
MSGPACK_ID_REQUEST,
MSGPACK_ID_RESPONSE,
MSGPACK_ID_NOTIFY,
} msgpack_id_t;

static endpoint_t endpoints[ENDPOINT_ID_MAX] = {
[ENDPOINT_ID_RAW] = { .name = "raw", .ready = false, .rpmsg = { 0 } },
[ENDPOINT_ID_RPC] = { .name = "rpc", .ready = false, .rpmsg = { 0 } },
};

arduino::RPCClass RPC;

osThreadId eventHandlerThreadId;
static rtos::Mutex mutex;
static struct rpmsg_endpoint endpoints[2];
#ifdef CORE_CM4
static bool endpoints_init[2] = { 0 };
#endif
osThreadId eventHandlerThreadId;

void RPCClass::new_service_cb(struct rpmsg_device *rdev, const char *name, uint32_t dest) {
uint8_t buffer[1] = {0};
struct rpmsg_endpoint *ept = NULL;

if (strcmp(name, "rpc") == 0) {
ept = &endpoints[ENDPOINT_ID_RPC];
} else if (strcmp(name, "raw") == 0) {
ept = &endpoints[ENDPOINT_ID_RAW];
}

if (ept) {
OPENAMP_create_endpoint(ept, name, dest, rpmsg_recv_callback, NULL);
OPENAMP_send(ept, buffer, sizeof(buffer));
for (size_t i=0; i<ENDPOINT_ID_MAX; i++) {
endpoint_t *ep = &endpoints[i];
if (strcmp(name, ep->name) == 0) {
OPENAMP_create_endpoint(&ep->rpmsg, name, dest, rpmsg_recv_callback, NULL);
OPENAMP_send(&ep->rpmsg, buffer, sizeof(buffer));
break;
}
}
}

int RPCClass::rpmsg_recv_callback(struct rpmsg_endpoint *ept, void *data, size_t len, uint32_t src, void *priv) {
#ifdef CORE_CM4
if (!endpoints_init[ENDPOINT_ID_RPC] && ept == &endpoints[ENDPOINT_ID_RPC]) {
endpoints_init[ENDPOINT_ID_RPC] = true;
return 0;
} else if (!endpoints_init[ENDPOINT_ID_RAW] && ept == &endpoints[ENDPOINT_ID_RAW]) {
endpoints_init[ENDPOINT_ID_RAW] = true;
return 0;
for (size_t i=0; i<ENDPOINT_ID_MAX; i++) {
endpoint_t *ep = &endpoints[i];
if (ept == &ep->rpmsg && !ep->ready) {
ep->ready = true;
return 0;
}
}
#endif

if (ept == &endpoints[ENDPOINT_ID_RAW]) {
if (ept == &endpoints[ENDPOINT_ID_RAW].rpmsg) {
// data on raw endpoint
if (RPC.raw_callback) {
RPC.raw_callback.call((uint8_t *) data, len);
Expand All @@ -59,11 +66,11 @@ int RPCClass::rpmsg_recv_callback(struct rpmsg_endpoint *ept, void *data, size_t

uint8_t msgpack_type = ((uint8_t *) data)[1];
switch (msgpack_type) {
case MSGPACK_TYPE_REQUEST:
case MSGPACK_TYPE_NOTIFY:
case MSGPACK_ID_REQUEST:
case MSGPACK_ID_NOTIFY:
RPC.request((uint8_t *) data, len);
break;
case MSGPACK_TYPE_RESPONSE:
case MSGPACK_ID_RESPONSE:
RPC.response((uint8_t *) data, len);
break;
}
Expand Down Expand Up @@ -144,15 +151,18 @@ int RPCClass::begin() {
return 0;
}

// Initialize rpmsg endpoints.
memset(endpoints, 0, sizeof(endpoints));

// Boot the CM4.
cm4_kick();

// Wait for the remote to announce the services with a timeout.
uint32_t millis_start = millis();
while (endpoints[ENDPOINT_ID_RPC].rdev == NULL || endpoints[ENDPOINT_ID_RAW].rdev == NULL) {
for (uint32_t millis_start = millis(); ; ) {
size_t ep_count = 0;
for (size_t i=0; i<ENDPOINT_ID_MAX; i++) {
ep_count += (endpoints[i].rdev != NULL);
}
if (ep_count == ENDPOINT_ID_MAX) {
break;
}
if ((millis() - millis_start) >= 5000) {
return 0;
}
Expand Down Expand Up @@ -203,20 +213,24 @@ int RPCClass::begin() {
return 0;
}

// Create RAW endpoint.
if (OPENAMP_create_endpoint(&endpoints[ENDPOINT_ID_RAW], "raw", RPMSG_ADDR_ANY, rpmsg_recv_callback, NULL) < 0) {
return 0;
}

// Create RPC endpoint.
if (OPENAMP_create_endpoint(&endpoints[ENDPOINT_ID_RPC], "rpc", RPMSG_ADDR_ANY, rpmsg_recv_callback, NULL) < 0) {
return 0;
// Create endpoints.
for (size_t i=0; i<ENDPOINT_ID_MAX; i++) {
endpoint_t *ep = &endpoints[i];
if (OPENAMP_create_endpoint(&ep->rpmsg, ep->name, RPMSG_ADDR_ANY, rpmsg_recv_callback, NULL) < 0) {
return 0;
}
}

// Wait for endpoints to be initialized first by the host before allowing
// Wait for endpoints to be ready first by the host before allowing
// the remote to use the endpoints.
uint32_t millis_start = millis();
while (!endpoints_init[ENDPOINT_ID_RPC] || !endpoints_init[ENDPOINT_ID_RAW]) {
for (uint32_t millis_start = millis(); ; ) {
size_t ep_count = 0;
for (size_t i=0; i<ENDPOINT_ID_MAX; i++) {
ep_count += endpoints[i].ready;
}
if (ep_count == ENDPOINT_ID_MAX) {
break;
}
if ((millis() - millis_start) >= 5000) {
return 0;
}
Expand Down Expand Up @@ -267,7 +281,8 @@ void RPCClass::request(uint8_t *buf, size_t len) {
auto resp = rpc::detail::dispatcher::dispatch(msg, false);
auto data = resp.get_data();
if (!resp.is_empty()) {
OPENAMP_send(&endpoints[ENDPOINT_ID_RPC], data.data(), data.size());
endpoint_t *ep = &endpoints[ENDPOINT_ID_RPC];
OPENAMP_send(&ep->rpmsg, data.data(), data.size());
}
}
}
Expand All @@ -282,7 +297,7 @@ void rpc::client::write(RPCLIB_MSGPACK::sbuffer *buffer) {

size_t RPCClass::write(const uint8_t *buf, size_t len, bool raw) {
mutex.lock();
OPENAMP_send(&endpoints[raw ? ENDPOINT_ID_RAW : ENDPOINT_ID_RPC], buf, len);
OPENAMP_send(&endpoints[raw ? ENDPOINT_ID_RAW : ENDPOINT_ID_RPC].rpmsg, buf, len);
mutex.unlock();
return len;
}

0 comments on commit 4442f8d

Please sign in to comment.