-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathrouteros_api.py
286 lines (233 loc) · 10.4 KB
/
routeros_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
# Author: Arturs Laizans
import socket
import ssl
import hashlib
import binascii
from verbose import Log
# Constants - Define defaults
# These values are used as the default arguments of Api.__init__ and for its Log instance.
PORT = 8728  # Port used when no port is given and SSL is not requested
SSL_PORT = 8729  # Port used when no port is given and SSL is requested
USER = 'admin'
PASSWORD = ''
USE_SSL = False
VERBOSE = False # Whether to print API conversation with the router. Useful for debugging
VERBOSE_LOGIC = 'OR' # Whether to print and save verbose log to file. AND - print and save, OR - do only one.
VERBOSE_FILE_MODE = 'w' # Whether to create new file ('w') for log or append to old one ('a').
CONTEXT = ssl.create_default_context() # It is possible to predefine context for SSL socket
# NOTE: the default context below performs NO certificate verification and NO hostname
# checking, so the SSL connection is encrypted but the router is not authenticated.
# Pass your own context to Api(...) if the router certificate should be verified.
# check_hostname has to be switched off before verify_mode can be set to CERT_NONE.
CONTEXT.check_hostname = False
CONTEXT.verify_mode = ssl.CERT_NONE
class LoginError(Exception):
    """Error raised when the login process on the router reports a failure."""
class WordTooLong(Exception):
    """Error raised when a word is too long to be encoded for sending to the router."""
class CreateSocketError(Exception):
    """Error raised when the socket connection to the router cannot be established."""
class RouterOSTrapError(Exception):
    """Error raised when the router answers a command with a '!trap' sentence."""
class Api:
    """Client for the RouterOS API over a plain or SSL-wrapped TCP socket.

    Creating an instance opens the socket, logs in and runs one liveness
    check. Commands are then sent with talk() (or send() for a single
    sentence) and the connection is released with close(). The instance can
    also be used as a context manager, which closes the socket on exit.
    """

    def __init__(self, address, user=USER, password=PASSWORD, use_ssl=USE_SSL, port=False,
                 verbose=VERBOSE, context=CONTEXT):
        """Connect to the router at *address* and log in.

        Args:
            address: Host name or IP address of the router.
            user: Login name.
            password: Login password.
            use_ssl: Wrap the socket with *context* when true.
            port: TCP port; when falsy, SSL_PORT or PORT is chosen from use_ssl.
            verbose: Passed to Log to control verbose output.
            context: SSL context used to wrap the socket when use_ssl is true.

        Raises:
            CreateSocketError: If the TCP connection cannot be established.
            LoginError: If the router rejects the login.
        """
        self.address = address
        self.user = user
        self.password = password
        self.use_ssl = use_ssl
        self.verbose = verbose
        self.context = context

        # Port setting logic: an explicit port wins, otherwise use the default for the transport.
        if port:
            self.port = port
        elif use_ssl:
            self.port = SSL_PORT
        else:
            self.port = PORT

        # Create Log instance to save or print verbose logs
        self.log = Log(verbose, VERBOSE_LOGIC, VERBOSE_FILE_MODE)
        self.log('')
        self.log('#-----------------------------------------------#')
        self.log('API IP - {}, USER - {}'.format(address, user))

        self.sock = None
        self.connection = None

        self.open_socket()
        try:
            self.login()
            self.log('Instance of Api created')
            self.is_alive()
        except Exception:
            # Do not leave a half-initialised instance holding an open socket.
            self.close()
            raise

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the socket when leaving a ``with`` block; exceptions are not suppressed."""
        self.close()
        return False

    # Open socket connection with router and wrap with SSL if needed.
    def open_socket(self):
        """Open the TCP socket to the router and wrap it with SSL if requested.

        Raises:
            CreateSocketError: If connecting fails (address not reachable, API
                blocked in RouterOS firewall or ip services, or wrong port).
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # self.sock.settimeout(5)  # Set socket timeout to 5 seconds, default is None
        try:
            # connect() returns None; the attribute is kept for backward compatibility.
            self.connection = self.sock.connect((self.address, self.port))
        except OSError as err:
            self.sock.close()
            raise CreateSocketError('Error: API failed to connect to socket. Host: {}, port: {}.'.format(self.address,
                                                                                                         self.port)) from err

        if self.use_ssl:
            try:
                # server_hostname is required by contexts that have check_hostname enabled.
                self.sock = self.context.wrap_socket(self.sock, server_hostname=self.address)
            except Exception:
                # Release the descriptor, then let the original SSL error propagate unchanged.
                self.sock.close()
                raise

        self.log('API socket connection opened.')

    # Login API connection into RouterOS
    def login(self):
        """Log in to the router, falling back to the old challenge-response method.

        Returns:
            The reply (list of sentences) of the last '/login' command sent.

        Raises:
            LoginError: If the router answers with '!trap' or with a reply
                that matches neither login method.
        """
        sentence = ['/login', '=name=' + self.user, '=password=' + self.password]
        reply = self.communicate(sentence)
        self._raise_on_login_trap(reply)

        first = reply[0]
        if len(first) == 1 and first[0] == '!done':
            # Login process was successful
            self.log('Logged in successfully!')
            return reply

        if len(first) == 2 and first[1][0:5] == '=ret=':
            # RouterOS uses old API login method: answer the challenge with
            # md5(0x00 + password + challenge).
            self.log('Using old login process.')
            md5 = hashlib.md5(('\x00' + self.password).encode('utf-8'))
            md5.update(binascii.unhexlify(first[1][5:]))
            sentence = ['/login', '=name=' + self.user, '=response=00'
                        + binascii.hexlify(md5.digest()).decode('utf-8')]
            reply = self.communicate(sentence)
            # Only report success after the router has accepted the response.
            self._raise_on_login_trap(reply)
            self.log('Logged in successfully!')
            return reply

        self.log('Error in login process - {}'.format(reply))
        raise LoginError('Login failed, unexpected reply: {}'.format(reply))

    def _raise_on_login_trap(self, reply):
        """Raise LoginError if any sentence of *reply* is a '!trap'."""
        for sentence in reply:
            if sentence[0] != '!trap':
                continue
            prefix = '=message='
            message = next((word[len(prefix):] for word in sentence[1:] if word.startswith(prefix)),
                           'unknown error')
            self.log('Error in login process - {}'.format(reply))
            raise LoginError('Login failed: {}'.format(message))

    @staticmethod
    def _encode_length(length):
        """Return the RouterOS API length prefix for a word of *length* bytes.

        Raises:
            WordTooLong: If *length* does not fit in 32 bits.
        """
        if length < 0x80:
            return length.to_bytes(1, byteorder='big')  # For words smaller than 128
        if length < 0x4000:
            return (length | 0x8000).to_bytes(2, byteorder='big')  # For words smaller than 16384
        if length < 0x200000:
            return (length | 0xC00000).to_bytes(3, byteorder='big')  # For words smaller than 2097152
        if length < 0x10000000:
            return (length | 0xE0000000).to_bytes(4, byteorder='big')  # For words smaller than 268435456
        if length < 0x100000000:
            return b'\xF0' + length.to_bytes(4, byteorder='big')  # For words smaller than 4294967296
        raise WordTooLong('Word is too long. Max length of word is 4294967295.')

    def _recv_exact(self, count):
        """Receive exactly *count* bytes; recv() may return fewer bytes per call.

        Raises:
            RuntimeError: If the connection is closed before *count* bytes arrived.
        """
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if chunk == b'':
                raise RuntimeError('socket connection broken')
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def _receive_length(self):
        """Receive and decode the length prefix of the next word.

        Returns 0 for the zero-length word that ends a sentence, and also when
        the connection is already closed (nothing could be read).
        """
        first = self.sock.recv(1)  # Receive the first byte of word length
        if first == b'':
            # Closed connection: reported as a zero length, communicate() then
            # raises IndexError for the resulting empty sentence.
            return 0
        lead = first[0]
        # If the first byte is smaller than 0x80 it already is the whole length.
        # Otherwise its high bits tell how many more length bytes follow.
        if lead < 0x80:
            return lead
        if lead < 0xC0:
            return int.from_bytes(first + self._recv_exact(1), byteorder='big') - 0x8000
        if lead < 0xE0:
            return int.from_bytes(first + self._recv_exact(2), byteorder='big') - 0xC00000
        if lead < 0xF0:
            return int.from_bytes(first + self._recv_exact(3), byteorder='big') - 0xE0000000
        if lead == 0xF0:
            return int.from_bytes(self._recv_exact(4), byteorder='big')
        raise RuntimeError('unsupported control byte received: {!r}'.format(first))

    def _read_sentence(self):
        """Receive words until a zero-length word and return them as a list of str."""
        sentence = []  # Words will be appended here
        length = self._receive_length()  # Get the size of the word
        while length != 0:
            word = self._recv_exact(length).decode('utf-8')
            self.log('<<< {}'.format(word))
            sentence.append(word)
            length = self._receive_length()  # Get the size of the next word
        self.log('')
        return sentence

    # Sending data to router and expecting something back
    def communicate(self, sentence_to_send):
        """Send one sentence and collect the reply up to and including '!done'.

        Args:
            sentence_to_send: Iterable of str words forming one API sentence.

        Returns:
            List of received sentences, each a list of str words; the last
            sentence starts with '!done'.

        Raises:
            WordTooLong: If an encoded word is longer than 4294967295 bytes.
            RuntimeError: If the connection breaks in the middle of a word.
            IndexError: If an empty sentence is received, which is what a
                connection closed by the router looks like. The exception
                type is kept for backward compatibility.
        """
        # Sending part of conversation.
        # Each word is sent as its length prefix followed by the word itself.
        # NOTE: words are logged verbatim, including the '=password=' word sent by login().
        for word in sentence_to_send:
            payload = word.encode('utf-8')
            # The prefix counts encoded bytes, not characters.
            self.sock.sendall(self._encode_length(len(payload)) + payload)
            self.log('>>> {}'.format(word))
        self.sock.sendall(b'\x00')  # Send zero length word to mark end of the sentence
        self.log('')
        # Actually I haven't successfully sent words larger than approx. 65520.
        # Probably it is some RouterOS limitation of 2^16.

        # Receiving part of the conversation.
        # Continue receiving until '!done' arrives; errors (!trap) are followed by '!done' as well.
        paragraph = []
        while True:
            received_sentence = self._read_sentence()
            if not received_sentence:
                raise IndexError('empty sentence received, connection probably closed by router')
            paragraph.append(received_sentence)
            if received_sentence[0] == '!done':
                return paragraph

    # Initiate a conversation with the router
    def talk(self, message):
        """Send one or several sentences to the router.

        Args:
            message: A str or tuple (one sentence), or a list of such sentences.

        Returns:
            The result of send() for a single sentence, or a list of such
            results when *message* is a list.

        Raises:
            TypeError: If *message* is not a str, tuple or list.
        """
        if isinstance(message, (str, tuple)):
            return self.send(message)
        if isinstance(message, list):
            return [self.send(sentence) for sentence in message]
        raise TypeError('talk() argument must be str or tuple containing str or list containing str or tuples')

    def send(self, sentence):
        """Send a single sentence and return the reply as a list of dictionaries.

        Args:
            sentence: A str (split on whitespace into words) or a sequence of words.

        Returns:
            One dict per reply sentence except the final '!done' sentence,
            mapping attribute names to values.

        Raises:
            RouterOSTrapError: If any sentence of the reply is a '!trap'.
        """
        # If sentence is string, not tuples of strings, it must be divided in words
        if isinstance(sentence, str):
            sentence = sentence.split()
        reply = self.communicate(sentence)

        # If RouterOS returns error from command that was sent. A trap may follow
        # earlier '!re' sentences, so every sentence is checked, not only the first.
        if any(part[0] == '!trap' for part in reply):
            raise RouterOSTrapError("\nCommand: {}\nReturned an error: {}".format(sentence, reply))

        # reply is list containing strings with RAW output from API
        # nice_reply is a list containing output from API sorted in dictionary for easier use later
        nice_reply = []
        for part in reply[:-1]:
            attributes = {}
            for word in part[1:]:
                # Words look like '=key=value'; drop the leading character and split once.
                key, _, value = word[1:].partition('=')
                attributes[key] = value
            nice_reply.append(attributes)
        return nice_reply

    def is_alive(self) -> bool:
        """Check if socket is alive and router responds"""
        # Check if socket is open in this end
        if self.sock is None:
            self.log("Socket is closed.")
            return False
        try:
            self.sock.settimeout(2)
        except OSError:
            self.log("Socket is closed.")
            return False

        # Check if we can send and receive through socket
        try:
            self.talk('/system/identity/print')
        except (OSError, IndexError, RuntimeError):
            # OSError covers socket.timeout, BrokenPipeError and connection resets;
            # IndexError and RuntimeError are raised by communicate() on a closed connection.
            self.log("Router does not respond, closing socket.")
            self.close()
            return False
        except RouterOSTrapError:
            # The router did answer; restore blocking mode before propagating the trap.
            self.sock.settimeout(None)
            raise

        self.log("Socket is open, router responds.")
        self.sock.settimeout(None)
        return True

    def create_connection(self):
        """Create API connection

        1. Open socket
        2. Log into router
        """
        self.open_socket()
        try:
            self.login()
        except Exception:
            # A socket that could not be logged in is of no use; release it.
            self.close()
            raise

    def close(self):
        """Close the socket if one has been opened."""
        if self.sock is not None:
            self.sock.close()