Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ketama Support #66

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions memcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,6 +1412,146 @@ def __str__(self):
return "unix:%s%s" % (self.address, d)


class KetamaClient(Client):
""" Memcach client with Consistent hashing support.

Ketama is an implementation of a consistent hashing algorithm, meaning you
can add or remove servers from the memcached pool without causing a
complete remap of all keys. It was designed by Richard Jones.

How Ketama Works:
1. Hash each server to several unsigned integer values.
2. Conceptually, these numbers are placed on a ring.
3. Each number links to the server it was hashed from, so servers
appear at several points on the ring.
4. To map a key->server, hash the key to an unsigned integer and find
the next biggest number on the ring. That's your server. If
the number is too big, roll over to the first server in the ring
When a server is added or removed, only some keys will be remapped to
different servers. With the original modula algorithm, all keys
would have been remapped.

TODO: Improve the documentation, add test cases.
"""
# For this Consistent hashing client, the weight of the server is the
# number of entries it will have in the ring. This will make sure
# each server has well normalized key distribution.
DEFAULT_SERVER_WEIGHT = 200

# Total number of slots on the ring.
RING_SIZE = 2**16

def __init__(self, *args, **kwargs):
# Mapping between ring slot -> server.
self._ketama_server_ring = {}

# Sorted server slots on top of the virtual ring.
self._ketama_server_slots = []

super(KetamaClient, self).__init__(*args, **kwargs)

def _get_server(self, key):
"""
Get the memcache server corresponding to the given key.

@param key: key, or (server_hash, key) tuple if you want to specify
a hash to determine which server is selected

@return A tuple with (server_obj, key), or (None, None) if no servers
were available.
"""
# map the key on to the ring slot space.
h_key = self._generate_ring_slot(key)

if isinstance(key, tuple):
serverhash, key = key

for slot in self._ketama_server_slots:
if h_key <= slot:
server = self._ketama_server_ring[slot]
if server.connect():
return (server, key)

# Roll over to the first available server
for server in self._ketama_server_ring.values():
if server and server.connect():
return (server, key)

return (None, None)

def set_servers(self, servers):
"""
Set servers for this client.

@param servers: List of server hosts in <IP>:<PORT> format.
or
List of tuples with each tuple of the format
(<IP>:<PORT>, weight)
"""
# Set the default weight if weight isn't passed.
self.servers = [_Host(
s if isinstance(s, tuple) else (s, self.DEFAULT_SERVER_WEIGHT),
self.debug, dead_retry=self.dead_retry,
socket_timeout=self.socket_timeout,
flush_on_reconnect=self.flush_on_reconnect) for s in servers]

# Place all the servers on rings based on the slot allocation
# specifications.
map(self._place_server_on_ring, self.servers)

def _place_server_on_ring(self, server):
"""
Based on the weight of the server, generate multiple slots for
each server. This ensures when a server is added/remove keys won't all
remap to the same new server

@param server: An instance of :class:~`memcache._Host`.
"""
server_slots = self._get_server_slots_on_ring(server)
for slot in server_slots:
if slot not in self._ketama_server_ring:
self._ketama_server_ring[slot] = server
self._ketama_server_slots.append(slot)
else:
# TODO: Handle collisions
pass

# Sort the server slot keys to make it a ring.
self._ketama_server_slots.sort()

def _get_server_slots_on_ring(self, server):
"""
Returns list of slot on the ring for given server.

This make sure that the slots won't collide with others server.

@param: server An object of :class:~`memcache._Host`.
@return: list of slots on the ring.
"""
server_slots = []

for i in range(0, server.weight):
server_key = "%s:%d_%d" % (server.ip, server.port, i)
server_slots.append(self._generate_ring_slot(server_key))

return server_slots

def _generate_ring_slot(self, key):
"""
Returns a slot in the ring for the given key.

@param key: Key which needs to be mapped to the ring.
@type key: str

@return: hash value corresponding to the `key`
"""

if isinstance(key, tuple):
serverhash, key = key
else:
serverhash = binascii.crc32(key.encode('ascii')) & 0xffffffff
return serverhash % self.RING_SIZE

def _doctest():
import doctest
import memcache
Expand Down
12 changes: 9 additions & 3 deletions tests/test_memcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from unittest import TestCase

from memcache import Client, SERVER_MAX_KEY_LENGTH
from memcache import Client, KetamaClient, SERVER_MAX_KEY_LENGTH

try:
_str_cls = basestring
Expand Down Expand Up @@ -31,10 +31,10 @@ def __eq__(self, other):


class TestMemcache(TestCase):
def setUp(self):
def setUp(self, client_class=Client):
# TODO: unix socket server stuff
servers = ["127.0.0.1:11211"]
self.mc = Client(servers, debug=1)
self.mc = client_class(servers, debug=1)
pass

def check_setget(self, key, val, noreply=False):
Expand Down Expand Up @@ -119,6 +119,12 @@ def test_sending_key_too_long(self):
self.mc.set('a' * SERVER_MAX_KEY_LENGTH, 1, noreply=True)


class TestMemcacheKetama(TestMemcache):
def setUp(self):
# Run all the tests again using the KetamaClient
super(TestMemcacheKetama, self).setUp(KetamaClient)


if __name__ == "__main__":
# failures = 0
# print("Testing docstrings...")
Expand Down