Skip to content

Commit

Permalink
New Init/Open messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Dec 8, 2020
1 parent d8ff083 commit 565b53d
Show file tree
Hide file tree
Showing 7 changed files with 374 additions and 415 deletions.
92 changes: 32 additions & 60 deletions include/zenoh-pico/net/private/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
/* Session Messages */
#define _ZN_MID_SCOUT 0x01
#define _ZN_MID_HELLO 0x02
#define _ZN_MID_OPEN 0x03
#define _ZN_MID_ACCEPT 0x04
#define _ZN_MID_INIT 0x03
#define _ZN_MID_OPEN 0x04
#define _ZN_MID_CLOSE 0x05
#define _ZN_MID_SYNC 0x06
#define _ZN_MID_ACK_NACK 0x07
Expand All @@ -53,6 +53,7 @@
/* Message flags */
/*=============================*/
/* Session message flags */
#define _ZN_FLAG_S_A 0x20 // 1 << 5 | Ack if A==1 then the message is an acknowledgment
#define _ZN_FLAG_S_C 0x40 // 1 << 6 | Count if C==1 then number of unacknowledged messages is present
#define _ZN_FLAG_S_E 0x80 // 1 << 7 | End if E==1 then it is the last FRAME fragment
#define _ZN_FLAG_S_F 0x40 // 1 << 6 | Fragment if F==1 then the FRAME is a fragment
Expand All @@ -63,6 +64,7 @@
#define _ZN_FLAG_S_P 0x20 // 1 << 5 | PingOrPong if P==1 then the message is Ping, otherwise is Pong
#define _ZN_FLAG_S_R 0x20 // 1 << 5 | Reliable if R==1 then it concerns the reliable channel, best-effort otherwise
#define _ZN_FLAG_S_S 0x40 // 1 << 6 | SN Resolution if S==1 then the SN Resolution is present
#define _ZN_FLAG_S_T 0x40 // 1 << 6 | TimeRes if T==1 then the time resolution is in seconds
#define _ZN_FLAG_S_W 0x40 // 1 << 6 | WhatAmI if W==1 then WhatAmI is indicated
#define _ZN_FLAG_S_X 0x00 // Unused flags are set to zero
/* Zenoh message flags */
Expand Down Expand Up @@ -272,104 +274,74 @@ typedef struct
_zn_locators_t locators;
} _zn_hello_t;

/*------------------ Open Message ------------------*/
/*------------------ Init Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total lenght
// in bytes of the message, resulting in the maximum lenght of a message being 65_535 bytes.
// This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
// the boundary of the serialized messages. The length is encoded as little-endian.
// In any case, the lenght of a message must not exceed 65_535 bytes.
//
// The OPEN message is sent on a specific Locator to initiate a session with the peer associated
// with that Locator.
// The INIT message is sent on a specific Locator to initiate a session with the peer associated
// with that Locator. The initiator MUST send an INIT message with the A flag set to 0. If the
// corresponding peer deems appropriate to initialize a session with the initiator, the corresponding
// peer MUST reply with an INIT message with the A flag set to 1.
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |L|S|X| OPEN |
// |X|S|A| INIT |
// +-+-+-+-+-------+
// | v_maj | v_min | -- Protocol Version VMaj.VMin
// | v_maj | v_min | if A==0 -- Protocol Version VMaj.VMin
// +-------+-------+
// ~ whatami ~ -- Client, Router, Peer or a combination of them
// +---------------+
// ~ o_peer_id ~ -- PID of the sender of the OPEN
// +---------------+
// ~ lease_period ~ -- Lease period of the session opener
// +---------------+
// ~ initial_sn ~ -- Initial SN proposed by the sender of the OPEN(*)
// ~ peer_id ~ -- PID of the sender of the INIT message
// +---------------+
// ~ sn_resolution ~ if S==1 -- Otherwise 2^28 is assumed(**)
// ~ sn_resolution ~ if S==1(*) -- Otherwise 2^28 is assumed(**)
// +---------------+
// ~ Locators ~ if L==1 -- List of locators the sender of the OPEN is reachable at
// ~ cookie ~ if A==1
// +---------------+
//
// (*) The Initial SN must be bound to the proposed SN Resolution. Otherwise the OPEN message is considered
// invalid and it should be discarded by the recipient of the OPEN message.
// (**) In case of the Accepter Peer negotiates a smaller SN Resolution (see ACCEPT message) and the proposed
// Initial SN results to be out-of-bound, the new Agreed Initial SN is calculated according to the
// following modulo operation:
// Agreed Initial SN := (Initial SN_Open) mod (SN Resolution_Accept)
//
// (*) if A==0 and S==0 then 2^28 is assumed.
// if A==1 and S==0 then the agreed resolution is the one communicated by the initiator.
// ```
typedef struct
{
z_zint_t whatami;
z_bytes_t opid;
z_zint_t lease;
z_zint_t initial_sn;
z_zint_t sn_resolution;
_zn_locators_t locators;
z_bytes_t pid;
z_bytes_t cookie;
uint8_t version;
} _zn_open_t;
} _zn_init_t;

/*------------------ Accept Message ------------------*/
/*------------------ Open Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total lenght
// in bytes of the message, resulting in the maximum lenght of a message being 65_535 bytes.
// This is necessary in those stream-oriented transports (e.g., TCP) that do not preserve
// the boundary of the serialized messages. The length is encoded as little-endian.
// In any case, the lenght of a message must not exceed 65_535 bytes.
//
// The ACCEPT message is sent in response of an OPEN message in case of accepting the new incoming session.
// The OPEN message is sent on a link to finally open an initialized session with the peer.
//
// 7 6 5 4 3 2 1 0
// +-+-+-+-+-+-+-+-+
// |L|S|X| ACCEPT |
// |X|T|A| OPEN |
// +-+-+-+-+-------+
// ~ whatami ~ -- Client, Router, Peer or a combination of them
// ~ lease_period ~ -- Lease period of the sender of the OPEN message(*)
// +---------------+
// ~ o_peer_id ~ -- PID of the sender of the OPEN this ACCEPT is for
// ~ initial_sn ~ -- Initial SN proposed by the sender of the OPEN(**)
// +---------------+
// ~ a_peer_id ~ -- PID of the sender of the ACCEPT
// ~ cookie ~ if A==0(*)
// +---------------+
// ~ lease_period ~ -- Lease period of the session accepter
// +---------------+
// ~ initial_sn ~ -- Initial SN proposed by the sender of the ACCEPT(*)
// +---------------+
// ~ sn_resolution + if S==1 -- Agreed SN Resolution(**)
// +---------------+
// ~ Locators ~ if L==1
// +---------------+
//
// - if S==0 then the agreed sequence number resolution is the one indicated in the OPEN message.
// - if S==1 then the agreed sequence number resolution is the one indicated in this ACCEPT message.
// The resolution in the ACCEPT must be less or equal than the resolution in the OPEN,
// otherwise the ACCEPT message is consmsg::idered invalid and it should be treated as a
// CLOSE message with L==0 by the Opener Peer -- the recipient of the ACCEPT message.
//
// (*) The Initial SN is bound to the proposed SN Resolution.
// (**) In case of the SN Resolution proposed in this ACCEPT message is smaller than the SN Resolution
// proposed in the OPEN message AND the Initial SN contained in the OPEN messages results to be
// out-of-bound, the new Agreed Initial SN for the Opener Peer is calculated according to the
// following modulo operation:
// Agreed Initial SN := (Initial SN_Open) mod (SN Resolution_Accept)
//
// (*) if T==1 then the lease period is expressed in seconds, otherwise in milliseconds
// (**) the cookie MUST be the same received in the INIT message with A==1 from the corresponding peer
// ```
typedef struct
{
z_zint_t whatami;
z_bytes_t opid;
z_bytes_t apid;
z_zint_t lease;
z_zint_t initial_sn;
z_zint_t sn_resolution;
_zn_locators_t locators;
} _zn_accept_t;
z_bytes_t cookie;
} _zn_open_t;

/*------------------ Close Message ------------------*/
// NOTE: 16 bits (2 bytes) may be prepended to the serialized message indicating the total length
Expand Down Expand Up @@ -547,8 +519,8 @@ typedef struct
{
_zn_scout_t scout;
_zn_hello_t hello;
_zn_init_t init;
_zn_open_t open;
_zn_accept_t accept;
_zn_close_t close;
_zn_sync_t sync;
_zn_ack_nack_t ack_nack;
Expand Down
10 changes: 5 additions & 5 deletions include/zenoh-pico/net/private/msgcodec.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ _ZN_P_RESULT_DECLARE(_zn_attachment_t, attachment)
_ZN_P_RESULT_DECLARE(_zn_reply_context_t, reply_context)
_ZN_RESULT_DECLARE(_zn_scout_t, scout)
_ZN_RESULT_DECLARE(_zn_hello_t, hello)
_ZN_RESULT_DECLARE(_zn_init_t, init)
_ZN_RESULT_DECLARE(_zn_open_t, open)
_ZN_RESULT_DECLARE(_zn_accept_t, accept)
_ZN_RESULT_DECLARE(_zn_close_t, close)
_ZN_RESULT_DECLARE(_zn_sync_t, sync)
_ZN_RESULT_DECLARE(_zn_ack_nack_t, ack_nack)
Expand Down Expand Up @@ -202,14 +202,14 @@ _ZN_DECLARE_DECODE(query);
_ZN_DECLARE_FREE_NOH(query);

/*------------------ Session Message ------------------*/
_ZN_DECLARE_ENCODE(init);
_ZN_DECLARE_DECODE(init);
_ZN_DECLARE_FREE(init);

_ZN_DECLARE_ENCODE(open);
_ZN_DECLARE_DECODE(open);
_ZN_DECLARE_FREE(open);

_ZN_DECLARE_ENCODE(accept);
_ZN_DECLARE_DECODE(accept);
_ZN_DECLARE_FREE(accept);

_ZN_DECLARE_ENCODE(close);
_ZN_DECLARE_DECODE(close);
_ZN_DECLARE_FREE(close);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ typedef struct

char *locator;

z_zint_t lease;
volatile z_zint_t lease;
z_zint_t sn_resolution;
z_zint_t sn_resolution_half;

Expand Down
60 changes: 35 additions & 25 deletions src/net/lease_task.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,88 @@

void *_znp_lease_task(void *arg)
{
zn_session_t *z = (zn_session_t *)arg;
z->lease_task_running = 1;
zn_session_t *zn = (zn_session_t *)arg;
zn->lease_task_running = 1;

z->received = 0;
z->transmitted = 0;
zn->received = 0;
zn->transmitted = 0;

unsigned int next_lease = z->lease;
unsigned int next_lease = zn->lease;
unsigned int next_keep_alive = ZN_KEEP_ALIVE_INTERVAL;
while (z->lease_task_running)
while (zn->lease_task_running)
{
// Compute the target interval
unsigned int interval;
if (next_lease < next_keep_alive)
interval = next_lease;
else if (next_keep_alive < ZN_KEEP_ALIVE_INTERVAL)
interval = next_keep_alive;
if (zn->lease > 0)
{
if (next_lease < next_keep_alive)
interval = next_lease;
else if (next_keep_alive < ZN_KEEP_ALIVE_INTERVAL)
interval = next_keep_alive;
else
interval = ZN_KEEP_ALIVE_INTERVAL;
}
else
{
interval = ZN_KEEP_ALIVE_INTERVAL;
}

// The keep alive and lease intervals are expressed in milliseconds
_z_sleep_ms(interval);

// Decrement the interval
next_lease -= interval;
if (zn->lease > 0)
{
next_lease -= interval;
}
next_keep_alive -= interval;

if (next_lease == 0)
if (zn->lease > 0 && next_lease == 0)
{
// Check if received data
if (z->received == 0)
if (zn->received == 0)
{
_Z_DEBUG_VA("Closing session because it has expired after %zums", z->lease);
_zn_session_close(z, _ZN_CLOSE_EXPIRED);
_Z_DEBUG_VA("Closing session because it has expired after %zums", zn->lease);
_zn_session_close(zn, _ZN_CLOSE_EXPIRED);
return 0;
}

// Reset the lease parameters
z->received = 0;
next_lease = z->lease;
zn->received = 0;
next_lease = zn->lease;
}

if (next_keep_alive == 0)
{
// Check if need to send a keep alive
if (z->transmitted == 0)
if (zn->transmitted == 0)
{
znp_send_keep_alive(z);
znp_send_keep_alive(zn);
}

// Reset the keep alive parameters
z->transmitted = 0;
zn->transmitted = 0;
next_keep_alive = ZN_KEEP_ALIVE_INTERVAL;
}
}

return 0;
}

int znp_start_lease_task(zn_session_t *z)
int znp_start_lease_task(zn_session_t *zn)
{
_z_task_t *task = (_z_task_t *)malloc(sizeof(_z_task_t));
memset(task, 0, sizeof(pthread_t));
z->lease_task = task;
if (_z_task_init(task, NULL, _znp_lease_task, z) != 0)
zn->lease_task = task;
if (_z_task_init(task, NULL, _znp_lease_task, zn) != 0)
{
return -1;
}
return 0;
}

int znp_stop_lease_task(zn_session_t *z)
int znp_stop_lease_task(zn_session_t *zn)
{
z->lease_task_running = 0;
zn->lease_task_running = 0;
return 0;
}
Loading

0 comments on commit 565b53d

Please sign in to comment.