-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpicoredis.py
147 lines (109 loc) · 3.98 KB
/
picoredis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# -*- coding: utf-8 -*-
"""A very minimal Redis client library (not only) for MicroPython."""
try:
import usocket as socket
except ImportError:
import socket
try:
import uselect as select
except ImportError:
import select
CRLF = "\r\n"
class RedisError(Exception):
"""RESP error returned by the Redis server."""
pass
class RedisTimeout(Exception):
"""Reply from the Redis server cannot be read within timeout."""
class ParseError(Exception):
"""Invalid input while parsing RESP data."""
pass
def encode_request(*args):
"""Pack a series of arguments into a RESP array of bulk strings."""
result = ["*" + str(len(args)) + CRLF]
for arg in args:
if arg is None:
result.append('$-1' + CRLF)
else:
s = str(arg)
result.append('$' + str(len(s)) + CRLF + s + CRLF)
return "".join(result)
class Redis:
"""A very minimal Redis client."""
def __init__(self, host='127.0.0.1', port=6379, timeout=3000, debug=False):
self.debug = debug
self._sock = None
self._timeout = timeout
self.connect(host, port)
def connect(self, host=None, port=None):
if host is not None:
self._host = host
if port is not None:
self._port = port
if not self._sock:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.connect(socket.getaddrinfo(self._host, self._port)[0][-1])
self._sock_fd = self._sock.makefile('rb')
try:
self._sock_fd = self._sock_fd.fileno()
except AttributeError:
pass
self._poller = select.poll()
self._poller.register(self._sock_fd, select.POLLIN)
def close(self):
if self._sock:
self._poller.unregister(self._sock_fd)
self._sock.close()
self._sock = None
def do_cmd(self, cmd, *args):
if not self._sock:
raise RedisError("Not connected: use 'connect()' to connect to Redis server.")
request = encode_request(cmd, *args)
if self.debug:
print("SEND: {!r}".format(request))
self._sock.send(request.encode('utf-8'))
return self._read_response()
__call__ = do_cmd
def __getattr__(self, name):
if name.isalpha():
return lambda *args: self.do_cmd(name, *args)
raise AttributeError
def _read_response(self):
line = self._readuntil(lambda l, pos: l[-2:] == b'\r\n')
rtype = line[:1].decode('utf-8')
if rtype == '+':
return line[1:-2]
elif rtype == '-':
raise RedisError(*line[1:-2].decode('utf-8').split(None, 1))
elif rtype == ':':
return int(line[1:-2])
elif rtype == '$':
length = int(line[1:-2])
if length == -1:
return None
return self._readuntil(lambda l, pos: pos == length + 2)[:-2]
elif rtype == '*':
length = int(line[1:-2])
if length == -1:
return None
return [self._read_response() for item in range(length)]
else:
raise ParseError("Invalid response header byte.")
def _readuntil(self, predicate):
buf = b''
pos = 0
while not predicate(buf, pos):
readylist = self._poller.poll(self._timeout)
if not readylist:
raise RedisTimeout("Error reading response from Redis server within timeout.")
for entry in readylist:
if (entry[0] is self._sock_fd and entry[1] & select.POLLIN and not
entry[1] & (select.POLLHUP | select.POLLERR)):
buf += self._sock.recv(1)
pos += 1
break
else:
self.close()
raise OSError("Error reading from socket.")
if self.debug:
print("RECV: {!r}".format(buf))
return buf