-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbuffer.py
37 lines (29 loc) · 1.07 KB
/
buffer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
"""Buffer class."""
class Buffer:
"""Buffer class."""
name = ''
write_latency = 0
buffer_size = 0
buffer_messages = []
def __init__(self, name, buffer_size=0, write_latency=0):
"""Initialize buffer."""
self.name = name
self.buffer_size = buffer_size
self.write_latency = write_latency
self.buffer_messages = []
def set_write_latency(self, write_latency):
"""Set write latency."""
self.write_latency = write_latency
def receive_message(self, message):
"""Receive message."""
discarded_message = None
if len(self.buffer_messages) >= self.buffer_size:
discarded_message = self.buffer_messages.pop(0)
self.buffer_messages.append(message)
return discarded_message
def print_info(self):
"""Print buffer info."""
print('Buffer name: ' + self.name)
print('Buffer size: ' + str(self.buffer_size))
print('Buffer write latency: ' + str(self.write_latency))
print('Buffer messages: ' + str(len(self.buffer_messages)))