
==================================
Building a Consistent Hashing Ring
==================================

---------------------
Authored by Greg Holt
---------------------

This is a compilation of five posts I made earlier discussing how to build
a consistent hashing ring. The posts seemed to be accessed quite frequently,
so I've gathered them all here on one page for easier reading.

Part 1
======
"Consistent Hashing" is a term used to describe a process where data is
distributed using a hashing algorithm to determine its location. Using
only the hash of the id of the data you can determine exactly where that
data should be. This mapping of hashes to locations is usually termed a
"ring".

Probably the simplest hash is just a modulus of the id. For instance, if
all ids are numbers and you have two machines you wish to distribute data
to, you could just put all odd numbered ids on one machine and even numbered
ids on the other. Assuming you have a balanced number of odd and even
numbered ids, and a balanced data size per id, your data would be balanced
between the two machines.

Since data ids are often textual names and not numbers, like paths for
files or URLs, it makes sense to use a "real" hashing algorithm to convert
the names to numbers first. Using MD5 for instance, the hash of the name
'mom.png' is '4559a12e3e8da7c2186250c2f292e3af' and the hash of 'dad.png'
is '096edcc4107e9e18d6a03a43b3853bea'. Now, using the modulus, we can
place 'mom.png' on the odd machine and 'dad.png' on the even one. Another
benefit of using a hashing algorithm like MD5 is that the resulting hashes
have a known even distribution, meaning your ids will be evenly distributed
without worrying about keeping the id values themselves evenly distributed.

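
As a small sketch of the odd/even placement just described: the helper name ``node_for`` is made up for illustration, and the snippet is written so that it also runs on Python 3. It treats the whole MD5 digest as one integer, so the last hex digit decides the machine.

```python
from hashlib import md5


def node_for(name, node_count=2):
    """Hypothetical helper: MD5 of the name, taken modulo the node count."""
    digest = md5(name.encode('utf-8')).hexdigest()
    return int(digest, 16) % node_count


# The two digests quoted in the text: the first is odd, the second even.
assert int('4559a12e3e8da7c2186250c2f292e3af', 16) % 2 == 1
assert int('096edcc4107e9e18d6a03a43b3853bea', 16) % 2 == 0

for name in ('mom.png', 'dad.png'):
    print(name, '-> node', node_for(name))
```

The same name always maps to the same node, which is the property the rest of this series builds on.
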

Here is a simple example of this in action:

.. code-block:: python

  from hashlib import md5
  from struct import unpack_from

  NODE_COUNT = 100
  DATA_ID_COUNT = 10000000

  node_counts = [0] * NODE_COUNT
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      # This just pulls part of the hash out as an integer
      hsh = unpack_from('>I', md5(data_id).digest())[0]
      node_id = hsh % NODE_COUNT
      node_counts[node_id] += 1
  desired_count = DATA_ID_COUNT / NODE_COUNT
  print '%d: Desired data ids per node' % desired_count
  max_count = max(node_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids on one node, %.02f%% over' % \
      (max_count, over)
  min_count = min(node_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids on one node, %.02f%% under' % \
      (min_count, under)

::

  100000: Desired data ids per node
  100695: Most data ids on one node, 0.69% over
  99073: Least data ids on one node, 0.93% under

So that's not bad at all; less than a percent over/under for distribution
per node. In the next part of this series we'll examine where modulus
distribution causes problems and how to improve our ring to overcome them.


Part 2
======
In Part 1 of this series, we did a simple test of using the modulus of a
hash to locate data. We saw very good distribution, but that's only part
of the story. Distributed systems not only need to distribute load, but
they often also need to grow as more and more data is placed in them.

So let's imagine we have a 100-node system up and running using our
previous algorithm, but it's starting to get full so we want to add
another node. When we add that 101st node to our algorithm we notice
that many ids now map to different nodes than they previously did.
We're going to have to shuffle a ton of data around our system to get
it all into place again.

Let's examine what's happened on a much smaller scale: just 2 nodes
again, node 0 gets even ids and node 1 gets odd ids. So data id 100
would map to node 0, data id 101 to node 1, data id 102 to node 0, etc.
This is simply node = id % 2. Now we add a third node (node 2) for more
space, so we want node = id % 3. So now data id 100 maps to node id 1,
data id 101 to node 2, and data id 102 to node 0. So we have to move
data for 2 of our 3 ids so they can be found again.

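
The three-id example above can be checked in a few lines. This is only a sketch; the function name ``moved_ids`` is invented here and is not part of the original posts.

```python
def moved_ids(ids, old_count, new_count):
    """Return the ids whose node changes when the node count changes."""
    return [i for i in ids if i % old_count != i % new_count]


# Going from 2 nodes to 3 nodes with the ids used in the text.
moved = moved_ids([100, 101, 102], 2, 3)
print(moved)  # ids 100 and 101 change nodes; 102 stays on node 0
```
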

Let's examine this at a larger scale:

.. code-block:: python

  from hashlib import md5
  from struct import unpack_from

  NODE_COUNT = 100
  NEW_NODE_COUNT = 101
  DATA_ID_COUNT = 10000000

  moved_ids = 0
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
      node_id = hsh % NODE_COUNT
      new_node_id = hsh % NEW_NODE_COUNT
      if node_id != new_node_id:
          moved_ids += 1
  percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
  print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

  9900989 ids moved, 99.01%

Wow, that's severe. We'd have to shuffle around 99% of our data just
to increase our capacity 1%! We need a new algorithm that combats this
behavior.

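
The 99.01% figure is not an accident of the sample. As a rough check, assuming hash values are spread uniformly, the pair ``(h % 100, h % 101)`` repeats every 100 * 101 = 10100 values of ``h``, so counting over one such period gives the expected fraction directly:

```python
OLD_NODE_COUNT = 100
NEW_NODE_COUNT = 101

# One full period of the pair (h % 100, h % 101).
period = OLD_NODE_COUNT * NEW_NODE_COUNT
stay = sum(1 for h in range(period)
           if h % OLD_NODE_COUNT == h % NEW_NODE_COUNT)
moved_fraction = 1.0 - float(stay) / period

# prints "100 of 10100 hash residues keep their node, 99.01% move"
print('%d of %d hash residues keep their node, %.02f%% move' %
      (stay, period, 100.0 * moved_fraction))
```

Only hashes whose remainder is the same under both moduli keep their node, and that is about 1 in 101 of them.
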

This is where the "ring" really comes in. We can assign ranges of hashes
directly to nodes and then use an algorithm that minimizes the changes
to those ranges. Back to our small scale, let's say our ids range from 0
to 999. We have two nodes and we'll assign data ids 0–499 to node 0 and
500–999 to node 1. Later, when we add node 2, we can take an equal share
of data ids from node 0 and from node 1, minimizing the amount of data
that needs to move.

Let's examine this at a larger scale:

.. code-block:: python

  from bisect import bisect_left
  from hashlib import md5
  from struct import unpack_from

  NODE_COUNT = 100
  NEW_NODE_COUNT = 101
  DATA_ID_COUNT = 10000000

  node_range_starts = []
  for node_id in xrange(NODE_COUNT):
      node_range_starts.append(DATA_ID_COUNT /
                               NODE_COUNT * node_id)
  new_node_range_starts = []
  for new_node_id in xrange(NEW_NODE_COUNT):
      new_node_range_starts.append(DATA_ID_COUNT /
                                NEW_NODE_COUNT * new_node_id)
  moved_ids = 0
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
      node_id = bisect_left(node_range_starts,
                            hsh % DATA_ID_COUNT) % NODE_COUNT
      new_node_id = bisect_left(new_node_range_starts,
                            hsh % DATA_ID_COUNT) % NEW_NODE_COUNT
      if node_id != new_node_id:
          moved_ids += 1
  percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
  print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

  4901707 ids moved, 49.02%

Okay, that is better. But still, moving 50% of our data to add 1% capacity
is not very good. If we examine what happened more closely we'll see an
"accordion effect". We shrank node 0's range a bit to give to the new
node, but that shifted all the other nodes' ranges by the same amount.

We can minimize the change to a node's assigned range by assigning several
smaller ranges instead of the single broad range we were using before. This
can be done by creating "virtual nodes" for each node. So 100 nodes might
have 1000 virtual nodes. Let's examine how that might work.


.. code-block:: python

  from bisect import bisect_left
  from hashlib import md5
  from struct import unpack_from

  NODE_COUNT = 100
  DATA_ID_COUNT = 10000000
  VNODE_COUNT = 1000

  vnode_range_starts = []
  vnode2node = []
  for vnode_id in xrange(VNODE_COUNT):
      vnode_range_starts.append(DATA_ID_COUNT /
                                VNODE_COUNT * vnode_id)
      vnode2node.append(vnode_id % NODE_COUNT)
  new_vnode2node = list(vnode2node)
  new_node_id = NODE_COUNT
  NEW_NODE_COUNT = NODE_COUNT + 1
  vnodes_to_reassign = VNODE_COUNT / NEW_NODE_COUNT
  # Take one vnode at a time from each existing node in turn
  while vnodes_to_reassign > 0:
      for node_to_take_from in xrange(NODE_COUNT):
          for vnode_id, node_id in enumerate(new_vnode2node):
              if node_id == node_to_take_from:
                  new_vnode2node[vnode_id] = new_node_id
                  vnodes_to_reassign -= 1
                  break
          if vnodes_to_reassign <= 0:
              break
  moved_ids = 0
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
      vnode_id = bisect_left(vnode_range_starts,
                             hsh % DATA_ID_COUNT) % VNODE_COUNT
      node_id = vnode2node[vnode_id]
      new_node_id = new_vnode2node[vnode_id]
      if node_id != new_node_id:
          moved_ids += 1
  percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
  print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

  90423 ids moved, 0.90%

There we go, we added 1% capacity and only moved 0.9% of existing data.
The vnode_range_starts list seems a bit out of place though. Its values
are calculated and never change for the lifetime of the cluster, so let's
optimize that out.


.. code-block:: python

  from bisect import bisect_left
  from hashlib import md5
  from struct import unpack_from

  NODE_COUNT = 100
  DATA_ID_COUNT = 10000000
  VNODE_COUNT = 1000

  vnode2node = []
  for vnode_id in xrange(VNODE_COUNT):
      vnode2node.append(vnode_id % NODE_COUNT)
  new_vnode2node = list(vnode2node)
  new_node_id = NODE_COUNT
  vnodes_to_reassign = VNODE_COUNT / (NODE_COUNT + 1)
  # Reassign vnodes in the new mapping, leaving the old one untouched
  while vnodes_to_reassign > 0:
      for node_to_take_from in xrange(NODE_COUNT):
          for vnode_id, node_id in enumerate(new_vnode2node):
              if node_id == node_to_take_from:
                  new_vnode2node[vnode_id] = new_node_id
                  vnodes_to_reassign -= 1
                  break
          if vnodes_to_reassign <= 0:
              break
  moved_ids = 0
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
      vnode_id = hsh % VNODE_COUNT
      node_id = vnode2node[vnode_id]
      new_node_id = new_vnode2node[vnode_id]
      if node_id != new_node_id:
          moved_ids += 1
  percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
  print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

  89841 ids moved, 0.90%

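
The roughly 0.90% result can be anticipated from the constants alone. A back-of-the-envelope sketch, assuming data ids are spread evenly across the virtual nodes:

```python
NODE_COUNT = 100
VNODE_COUNT = 1000

# Integer share of vnodes handed to the one new node (1000 // 101).
vnodes_to_reassign = VNODE_COUNT // (NODE_COUNT + 1)
expected_fraction = float(vnodes_to_reassign) / VNODE_COUNT

# prints "9 of 1000 vnodes reassigned, about 0.90% of ids expected to move"
print('%d of %d vnodes reassigned, about %.02f%% of ids expected to move' %
      (vnodes_to_reassign, VNODE_COUNT, 100.0 * expected_fraction))
```

Only the data in the reassigned virtual nodes has to move, which is why the measured figure sits so close to 0.9%.
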

There we go. In the next part of this series, we'll further examine the
algorithm's limitations and how to improve on it.

Part 3
======
In Part 2 of this series, we reached an algorithm that performed well
even when adding new nodes to the cluster. We used 1000 virtual nodes
that could be independently assigned to nodes, allowing us to minimize
the amount of data moved when a node was added.

The number of virtual nodes puts a cap on how many real nodes you can
have. For example, if you have 1000 virtual nodes and you try to add a
1001st real node, you can't assign a virtual node to it without leaving
another real node with no assignment, leaving you with just 1000 active
real nodes still.

Unfortunately, the number of virtual nodes created at the beginning can
never change for the life of the cluster without a lot of careful work.
For example, you could double the virtual node count by splitting each
existing virtual node in half and assigning both halves to the same real
node. However, if the real node uses the virtual node's id to optimally
store the data (for example, all data might be stored in /[virtual node
id]/[data id]) it would have to move data around locally to reflect the
change. And it would have to resolve data using both the new and old
locations while the moves were taking place, making atomic operations
difficult or impossible.

Let's continue with this assumption that changing the virtual node
count is more work than it's worth, but keep in mind that some applications
might be fine with this.

The easiest way to deal with this limitation is to make the limit high
enough that it won't matter. For instance, if we decide our cluster will
never exceed 60,000 real nodes, we can just make 60,000 virtual nodes.

Also, we should include in our calculations the relative size of our
nodes. For instance, a year from now we might have real nodes that can
handle twice the capacity of our current nodes. So we'd want to assign
twice the virtual nodes to those future nodes, so maybe we should raise
our virtual node estimate to 120,000.

A good rule to follow might be to calculate 100 virtual nodes to each
real node at maximum capacity. This would allow you to alter the load
on any given node by 1%, even at max capacity, which is pretty fine
tuning. So now we're at 6,000,000 virtual nodes for a max capacity cluster
of 60,000 real nodes.

6 million virtual nodes seems like a lot, and it might seem like we'd
use up way too much memory. But the only structure this affects is the
virtual node to real node mapping. The base amount of memory required
would be 6 million times 2 bytes (to store a real node id from 0 to
65,535). 12 megabytes of memory just isn't that much to use these days.

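
The sizing arithmetic above can be written out as a short sketch. The constant names are chosen here for illustration; ``array('H')`` holds unsigned shorts, which take 2 bytes each on common platforms.

```python
from array import array

MAX_REAL_NODES = 60000       # assumed upper bound on cluster size
VNODES_PER_REAL_NODE = 100   # the "100 virtual nodes per real node" rule

vnode_count = MAX_REAL_NODES * VNODES_PER_REAL_NODE
item_size = array('H').itemsize  # bytes per stored node id
bytes_needed = vnode_count * item_size

print('%d virtual nodes, %d bytes for the vnode-to-node mapping' %
      (vnode_count, bytes_needed))
```

An unsigned short tops out at 65,535, so this compact representation works only while real node ids stay within that range.
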

Even with all the overhead of flexible data types, things aren't that
bad. I changed the code from the previous part in this series to have
60,000 real and 6,000,000 virtual nodes, changed the list to an array('H'),
and Python topped out at 27 MB of resident memory, and that includes two
rings.

To change terminology a bit, we're going to start calling these virtual
nodes "partitions". This will make it a bit easier to discern between the
two types of nodes we've been talking about so far. Also, it makes sense
to talk about partitions as they are really just unchanging sections
of the hash space.

We're also going to always keep the partition count a power of two. This
makes it easy to just use bit manipulation on the hash to determine the
partition rather than modulus. It isn't much faster, but it is a little.
So, here's our updated ring code, using 8,388,608 (2 ** 23) partitions
and 65,536 nodes. We've upped the sample data id set and checked the
distribution to make sure we haven't broken anything.

.. code-block:: python

  from array import array
  from hashlib import md5
  from struct import unpack_from

  PARTITION_POWER = 23
  PARTITION_SHIFT = 32 - PARTITION_POWER
  NODE_COUNT = 65536
  DATA_ID_COUNT = 100000000

  part2node = array('H')
  for part in xrange(2 ** PARTITION_POWER):
      part2node.append(part % NODE_COUNT)
  node_counts = [0] * NODE_COUNT
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      # Keep only the top PARTITION_POWER bits of the 32-bit hash
      part = unpack_from('>I',
          md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
      node_id = part2node[part]
      node_counts[node_id] += 1
  desired_count = DATA_ID_COUNT / NODE_COUNT
  print '%d: Desired data ids per node' % desired_count
  max_count = max(node_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids on one node, %.02f%% over' % \
      (max_count, over)
  min_count = min(node_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids on one node, %.02f%% under' % \
      (min_count, under)

::

  1525: Desired data ids per node
  1683: Most data ids on one node, 10.36% over
  1360: Least data ids on one node, 10.82% under

Hmm. +-10% seems a bit high, but I reran with 65,536 partitions and
256 nodes and got +-0.4%, so it's just that our sample size (100 million)
is too small for our number of partitions (8 million). It'll take way too
long to run experiments with an even larger sample size, so let's reduce
back down to these lesser numbers. (To be certain, I reran at the full
version with a 10 billion data id sample set and got +-1%, but it took
6.5 hours to run.)

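
The bit shift used to pick a partition can be looked at in isolation. The sketch below uses the smaller 2 ** 16 partition count and runs on Python 3; the helper names ``hash32`` and ``partition_for`` are invented for illustration. Shifting a 32-bit hash right by ``32 - PARTITION_POWER`` keeps its top ``PARTITION_POWER`` bits, so the result always falls in the range of valid partition numbers.

```python
from hashlib import md5
from struct import unpack_from

PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER


def hash32(data_id):
    """First four bytes of the MD5 digest as an unsigned 32-bit integer."""
    return unpack_from('>I', md5(str(data_id).encode('utf-8')).digest())[0]


def partition_for(data_id):
    """Top PARTITION_POWER bits of the 32-bit hash."""
    return hash32(data_id) >> PARTITION_SHIFT


parts = [partition_for(data_id) for data_id in range(1000)]
print(min(parts), max(parts))
```

Because the partition is taken from the high bits, each partition covers one contiguous slice of the hash space.
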

In the next part of this series, we'll talk about how to increase the
durability of our data in the cluster.

Part 4
======
In Part 3 of this series, we further discussed partitions (virtual
nodes) and cleaned up our code a bit based on that. Now, let's talk
about how to increase the durability and availability of our data in the
cluster.

For many distributed data stores, durability is quite important. Either
RAID arrays or individually distinct copies of data are required. While
RAID will increase the durability, it does nothing to increase the
availability: if the RAID machine crashes, the data may be safe but
inaccessible until repairs are done. If we keep distinct copies of the
data on different machines and a machine crashes, the other copies will
still be available while we repair the broken machine.

An easy way to gain this multiple copy durability/availability is to
just use multiple rings and groups of nodes. For instance, to achieve
the industry standard of three copies, you'd split the nodes into three
groups and each group would have its own ring and each would receive a
copy of each data item. This can work well enough, but has the drawback
that expanding capacity requires adding three nodes at a time and that
losing one node essentially lowers capacity by three times that node's
capacity.

Instead, let's use a different, but common, approach of meeting our
requirements with a single ring. This can be done by walking the ring
from the starting point and looking for additional distinct nodes.
Here's code that supports a variable number of replicas (set to 3 for
testing):


.. code-block:: python

  from array import array
  from hashlib import md5
  from struct import unpack_from

  REPLICAS = 3
  PARTITION_POWER = 16
  PARTITION_SHIFT = 32 - PARTITION_POWER
  PARTITION_MAX = 2 ** PARTITION_POWER - 1
  NODE_COUNT = 256
  DATA_ID_COUNT = 10000000

  part2node = array('H')
  for part in xrange(2 ** PARTITION_POWER):
      part2node.append(part % NODE_COUNT)
  node_counts = [0] * NODE_COUNT
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      part = unpack_from('>I',
          md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
      node_ids = [part2node[part]]
      node_counts[node_ids[0]] += 1
      for replica in xrange(1, REPLICAS):
          # Walk the ring until we reach a node we haven't used yet
          while part2node[part] in node_ids:
              part += 1
              if part > PARTITION_MAX:
                  part = 0
          node_ids.append(part2node[part])
          node_counts[node_ids[-1]] += 1
  desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
  print '%d: Desired data ids per node' % desired_count
  max_count = max(node_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids on one node, %.02f%% over' % \
      (max_count, over)
  min_count = min(node_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids on one node, %.02f%% under' % \
      (min_count, under)

::

  117186: Desired data ids per node
  118133: Most data ids on one node, 0.81% over
  116093: Least data ids on one node, 0.93% under

That's pretty good; less than 1% over/under. While this works well,
there are a couple of problems.

First, because of how we've initially assigned the partitions to nodes,
all the partitions for a given node have their extra copies on the same
other two nodes. The problem here is that when a machine fails, the load
on these other nodes will jump by that amount. It'd be better if we
initially shuffled the partition assignment to distribute the failover
load better.

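
The first problem is easy to see on a scaled-down ring. The sketch below (16 nodes and 1024 partitions, sizes picked only to keep it quick; ``replicas_for`` is a hypothetical helper mirroring the ring walk above) records which pair of nodes holds the extra copies for each primary node:

```python
REPLICAS = 3
NODE_COUNT = 16
PARTITION_COUNT = 2 ** 10

# Same unshuffled assignment as in the listing above.
part2node = [part % NODE_COUNT for part in range(PARTITION_COUNT)]


def replicas_for(part):
    """Walk the ring from `part`, collecting REPLICAS distinct nodes."""
    node_ids = [part2node[part]]
    for _ in range(1, REPLICAS):
        while part2node[part] in node_ids:
            part = (part + 1) % PARTITION_COUNT
        node_ids.append(part2node[part])
    return node_ids


# For each primary node, the set of (second, third) replica pairs seen.
partners = {}
for part in range(PARTITION_COUNT):
    nodes = replicas_for(part)
    partners.setdefault(nodes[0], set()).add(tuple(nodes[1:]))

print(partners[0])  # node 0 always fails over to nodes 1 and 2
```

Every primary node ends up with exactly one partner pair, so a failed node's whole load lands on the same two neighbours.
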

The other problem is a bit harder to explain, but deals with physical
separation of machines. Imagine you can only put 16 machines in a rack
in your datacenter. The 256 nodes we've been using would fill 16 racks.
With our current code, if a rack goes out (power problem, network issue,
etc.) there is a good chance some data will have all three copies in that
rack, becoming inaccessible. We can fix this shortcoming by adding the
concept of zones to our nodes, and then ensuring that replicas are stored
in distinct zones.

.. code-block:: python

  from array import array
  from hashlib import md5
  from random import shuffle
  from struct import unpack_from

  REPLICAS = 3
  PARTITION_POWER = 16
  PARTITION_SHIFT = 32 - PARTITION_POWER
  PARTITION_MAX = 2 ** PARTITION_POWER - 1
  NODE_COUNT = 256
  ZONE_COUNT = 16
  DATA_ID_COUNT = 10000000

  node2zone = []
  while len(node2zone) < NODE_COUNT:
      zone = 0
      while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
          node2zone.append(zone)
          zone += 1
  part2node = array('H')
  for part in xrange(2 ** PARTITION_POWER):
      part2node.append(part % NODE_COUNT)
  shuffle(part2node)
  node_counts = [0] * NODE_COUNT
  zone_counts = [0] * ZONE_COUNT
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      part = unpack_from('>I',
          md5(str(data_id)).digest())[0] >> PARTITION_SHIFT
      node_ids = [part2node[part]]
      zones = [node2zone[node_ids[0]]]
      node_counts[node_ids[0]] += 1
      zone_counts[zones[0]] += 1
      for replica in xrange(1, REPLICAS):
          while part2node[part] in node_ids and \
                  node2zone[part2node[part]] in zones:
              part += 1
              if part > PARTITION_MAX:
                  part = 0
          node_ids.append(part2node[part])
          zones.append(node2zone[node_ids[-1]])
          node_counts[node_ids[-1]] += 1
          zone_counts[zones[-1]] += 1
  desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
  print '%d: Desired data ids per node' % desired_count
  max_count = max(node_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids on one node, %.02f%% over' % \
      (max_count, over)
  min_count = min(node_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids on one node, %.02f%% under' % \
      (min_count, under)
  desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
  print '%d: Desired data ids per zone' % desired_count
  max_count = max(zone_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids in one zone, %.02f%% over' % \
      (max_count, over)
  min_count = min(zone_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids in one zone, %.02f%% under' % \
      (min_count, under)

::

  117186: Desired data ids per node
  118782: Most data ids on one node, 1.36% over
  115632: Least data ids on one node, 1.33% under
  1875000: Desired data ids per zone
  1878533: Most data ids in one zone, 0.19% over
  1869070: Least data ids in one zone, 0.32% under

So the shuffle and zone distinctions affected our distribution some,
but still definitely good enough. This test took about 64 seconds to
run on my machine.

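
One caveat about the replica loop above: because it joins the two tests with ``and``, it keeps walking only while the candidate's node *and* its zone are both already used. A node that has already been chosen is necessarily in a used zone, so in effect this is the same "distinct node" test as before. The sketch below is a variant rather than the original code. It runs on a smaller, seeded ring and compares that behavior with a stricter ``or`` test that also skips unused nodes in already-used zones. The ``strict`` flag and helper names are assumptions made for this comparison.

```python
from random import Random

REPLICAS = 3
NODE_COUNT = 256
ZONE_COUNT = 16
PARTITION_COUNT = 2 ** 12

node2zone = [node % ZONE_COUNT for node in range(NODE_COUNT)]
part2node = [part % NODE_COUNT for part in range(PARTITION_COUNT)]
Random(0).shuffle(part2node)  # fixed seed so runs are repeatable


def replicas_for(part, strict):
    """Return (node_ids, zones) for a partition.

    strict=False mirrors the `and` test in the listing above;
    strict=True skips a candidate if its node OR its zone is already used.
    """
    node_ids = [part2node[part]]
    zones = [node2zone[node_ids[0]]]
    for _ in range(1, REPLICAS):
        while True:
            node = part2node[part]
            node_used = node in node_ids
            zone_used = node2zone[node] in zones
            if strict:
                skip = node_used or zone_used
            else:
                skip = node_used and zone_used
            if not skip:
                break
            part = (part + 1) % PARTITION_COUNT
        node_ids.append(part2node[part])
        zones.append(node2zone[node_ids[-1]])
    return node_ids, zones


def shared_zone_count(strict):
    """Number of partitions whose replicas do not span REPLICAS zones."""
    return sum(1 for part in range(PARTITION_COUNT)
               if len(set(replicas_for(part, strict)[1])) < REPLICAS)


print('and-test: %d partitions share a zone' % shared_zone_count(False))
print('or-test:  %d partitions share a zone' % shared_zone_count(True))
```

With the ``or`` test every partition's replicas land in three different zones. The walk can take a few more steps, and the distribution numbers would differ from the output shown above.
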

There's a completely different, and quite common, way of accomplishing
these same requirements. This alternate method doesn't use partitions
at all, but instead just assigns anchors to the nodes within the hash
space. Finding the first node for a given hash just involves walking
this anchor ring for the next node, and finding additional nodes works
much as before. To attain the equivalent of our virtual nodes,
each real node is assigned multiple anchors.

.. code-block:: python

  from bisect import bisect_left
  from hashlib import md5
  from struct import unpack_from

  REPLICAS = 3
  NODE_COUNT = 256
  ZONE_COUNT = 16
  DATA_ID_COUNT = 10000000
  VNODE_COUNT = 100

  node2zone = []
  while len(node2zone) < NODE_COUNT:
      zone = 0
      while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
          node2zone.append(zone)
          zone += 1
  hash2index = []
  index2node = []
  for node in xrange(NODE_COUNT):
      for vnode in xrange(VNODE_COUNT):
          hsh = unpack_from('>I', md5(str(node)).digest())[0]
          index = bisect_left(hash2index, hsh)
          if index > len(hash2index):
              index = 0
          hash2index.insert(index, hsh)
          index2node.insert(index, node)
  node_counts = [0] * NODE_COUNT
  zone_counts = [0] * ZONE_COUNT
  for data_id in xrange(DATA_ID_COUNT):
      data_id = str(data_id)
      hsh = unpack_from('>I', md5(str(data_id)).digest())[0]
      index = bisect_left(hash2index, hsh)
      if index >= len(hash2index):
          index = 0
      node_ids = [index2node[index]]
      zones = [node2zone[node_ids[0]]]
      node_counts[node_ids[0]] += 1
      zone_counts[zones[0]] += 1
      for replica in xrange(1, REPLICAS):
          while index2node[index] in node_ids and \
                  node2zone[index2node[index]] in zones:
              index += 1
              if index >= len(hash2index):
                  index = 0
          node_ids.append(index2node[index])
          zones.append(node2zone[node_ids[-1]])
          node_counts[node_ids[-1]] += 1
          zone_counts[zones[-1]] += 1
  desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
  print '%d: Desired data ids per node' % desired_count
  max_count = max(node_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids on one node, %.02f%% over' % \
      (max_count, over)
  min_count = min(node_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids on one node, %.02f%% under' % \
      (min_count, under)
  desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
  print '%d: Desired data ids per zone' % desired_count
  max_count = max(zone_counts)
  over = 100.0 * (max_count - desired_count) / desired_count
  print '%d: Most data ids in one zone, %.02f%% over' % \
      (max_count, over)
  min_count = min(zone_counts)
  under = 100.0 * (desired_count - min_count) / desired_count
  print '%d: Least data ids in one zone, %.02f%% under' % \
      (min_count, under)

::

  117186: Desired data ids per node
  351282: Most data ids on one node, 199.76% over
  15965: Least data ids on one node, 86.38% under
  1875000: Desired data ids per zone
  2248496: Most data ids in one zone, 19.92% over
  1378013: Least data ids in one zone, 26.51% under

This test took over 15 minutes to run! Unfortunately, this method also
gives much less control over the distribution. To get better distribution,
you have to add more virtual nodes, which eats up more memory and takes
even more time to build the ring and perform distinct node lookups. The
most common operation, data id lookup, can be improved (by predetermining
each virtual node's failover nodes, for instance) but it starts off so
far behind our first approach that we'll just stick with that.

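
One detail of the anchor listing above is worth a look: each anchor hash is computed from ``str(node)`` alone, so all of a node's anchors share a single hash value. That may account for part of the uneven distribution shown. The sketch below only counts distinct anchor values and does not rerun the experiment. The ``'%d-%d'`` key format that mixes in the virtual node number is an assumption for illustration, not something from the original posts.

```python
from hashlib import md5
from struct import unpack_from

NODE_COUNT = 256
VNODE_COUNT = 100


def hash32(key):
    """First four bytes of the MD5 digest as an unsigned 32-bit integer."""
    return unpack_from('>I', md5(key.encode('utf-8')).digest())[0]


# Anchors hashed from the node id only, as in the listing above.
anchors_node_only = set(
    hash32(str(node))
    for node in range(NODE_COUNT) for vnode in range(VNODE_COUNT))

# Hypothetical variant: hash the node id together with the vnode number.
anchors_with_vnode = set(
    hash32('%d-%d' % (node, vnode))
    for node in range(NODE_COUNT) for vnode in range(VNODE_COUNT))

print('distinct anchors, node id only:   %d' % len(anchors_node_only))
print('distinct anchors, node and vnode: %d' % len(anchors_with_vnode))
```

With one distinct anchor per node, the ring behaves as if it had no virtual nodes at all. Spreading the anchors would be expected to even things out, though the memory and lookup costs discussed above would still apply.
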
In the next part of this series, we'll start to wrap all this up into
|
||
a useful Python module.
|
||
|
||
Part 5
|
||
======
|
||
In Part 4 of this series, we ended up with a multiple copy, distinctly
|
||
zoned ring. Or at least the start of it. In this final part we'll package
|
||
the code up into a useable Python module and then add one last feature.
|
||
First, let's separate the ring itself from the building of the data for
|
||
the ring and its testing.
|
||
|
||
.. code-block:: python
|
||
|
||
from array import array
|
||
from hashlib import md5
|
||
from random import shuffle
|
||
from struct import unpack_from
|
||
from time import time
|
||
|
||
class Ring(object):
|
||
|
||
def __init__(self, nodes, part2node, replicas):
|
||
self.nodes = nodes
|
||
self.part2node = part2node
|
||
self.replicas = replicas
|
||
partition_power = 1
|
||
while 2 ** partition_power < len(part2node):
|
||
partition_power += 1
|
||
if len(part2node) != 2 ** partition_power:
|
||
raise Exception("part2node's length is not an "
|
||
"exact power of 2")
|
||
self.partition_shift = 32 - partition_power
|
||
|
||
def get_nodes(self, data_id):
|
||
data_id = str(data_id)
|
||
part = unpack_from('>I',
|
||
md5(data_id).digest())[0] >> self.partition_shift
|
||
node_ids = [self.part2node[part]]
|
||
zones = [self.nodes[node_ids[0]]]
|
||
for replica in xrange(1, self.replicas):
|
||
while self.part2node[part] in node_ids and \
|
||
self.nodes[self.part2node[part]] in zones:
|
||
part += 1
|
||
if part >= len(self.part2node):
|
||
part = 0
|
||
node_ids.append(self.part2node[part])
|
||
zones.append(self.nodes[node_ids[-1]])
|
||
return [self.nodes[n] for n in node_ids]
|
||
|
||
def build_ring(nodes, partition_power, replicas):
|
||
begin = time()
|
||
part2node = array('H')
|
||
for part in xrange(2 ** partition_power):
|
||
part2node.append(part % len(nodes))
|
||
shuffle(part2node)
|
||
ring = Ring(nodes, part2node, replicas)
|
||
print '%.02fs to build ring' % (time() - begin)
|
||
return ring
|
||
|
||
def test_ring(ring):
|
||
begin = time()
|
||
DATA_ID_COUNT = 10000000
|
||
node_counts = {}
|
||
zone_counts = {}
|
||
for data_id in xrange(DATA_ID_COUNT):
|
||
for node in ring.get_nodes(data_id):
|
||
node_counts[node['id']] = \
|
||
node_counts.get(node['id'], 0) + 1
|
||
zone_counts[node['zone']] = \
|
||
zone_counts.get(node['zone'], 0) + 1
|
||
print '%ds to test ring' % (time() - begin)
|
||
desired_count = \
|
||
DATA_ID_COUNT / len(ring.nodes) * REPLICAS
|
||
print '%d: Desired data ids per node' % desired_count
|
||
max_count = max(node_counts.itervalues())
|
||
over = \
|
||
100.0 * (max_count - desired_count) / desired_count
|
||
print '%d: Most data ids on one node, %.02f%% over' % \
|
||
(max_count, over)
|
||
min_count = min(node_counts.itervalues())
|
||
under = \
|
||
100.0 * (desired_count - min_count) / desired_count
|
||
print '%d: Least data ids on one node, %.02f%% under' % \
|
||
(min_count, under)
|
||
zone_count = \
|
||
len(set(n['zone'] for n in ring.nodes.itervalues()))
|
||
desired_count = \
|
||
DATA_ID_COUNT / zone_count * ring.replicas
|
||
print '%d: Desired data ids per zone' % desired_count
|
||
max_count = max(zone_counts.itervalues())
|
||
over = \
|
||
100.0 * (max_count - desired_count) / desired_count
|
||
print '%d: Most data ids in one zone, %.02f%% over' % \
|
||
(max_count, over)
|
||
min_count = min(zone_counts.itervalues())
|
||
under = \
|
||
100.0 * (desired_count - min_count) / desired_count
|
||
print '%d: Least data ids in one zone, %.02f%% under' % \
|
||
(min_count, under)
|
||
|
||
if __name__ == '__main__':
|
||
PARTITION_POWER = 16
|
||
REPLICAS = 3
|
||
NODE_COUNT = 256
|
||
ZONE_COUNT = 16
|
||
nodes = {}
|
||
while len(nodes) < NODE_COUNT:
|
||
zone = 0
|
||
while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
|
||
node_id = len(nodes)
|
||
nodes[node_id] = {'id': node_id, 'zone': zone}
|
||
zone += 1
|
||
ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
|
||
test_ring(ring)
|
||
|
||
::

  0.06s to build ring
  82s to test ring
  117186: Desired data ids per node
  118773: Most data ids on one node, 1.35% over
  115801: Least data ids on one node, 1.18% under
  1875000: Desired data ids per zone
  1878339: Most data ids in one zone, 0.18% over
  1869914: Least data ids in one zone, 0.27% under

It takes a bit longer to test our ring, but that's mostly because of
the switch to dictionaries from arrays for various items. Having node
dictionaries is nice because you can attach any node information you
want directly there (ip addresses, tcp ports, drive paths, etc.). But
we're still on track for further testing; our distribution is still good.

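As a quick check on the output above, the desired counts and the
over/under percentages can be reproduced by hand. This is only a sketch:
``percent_off`` is a helper name made up for this example, and ``//`` is
used so the integer division behaves the same under Python 2 and Python 3.

.. code-block:: python

  DATA_ID_COUNT = 10000000
  REPLICAS = 3
  NODE_COUNT = 256
  ZONE_COUNT = 16

  # Same arithmetic as test_ring: integer division first, then
  # multiply by the replica count.
  desired_per_node = DATA_ID_COUNT // NODE_COUNT * REPLICAS
  desired_per_zone = DATA_ID_COUNT // ZONE_COUNT * REPLICAS


  def percent_off(count, desired):
      # Hypothetical helper: distance from the desired count, in percent.
      return 100.0 * abs(count - desired) / desired


  print(desired_per_node)  # 117186
  print(desired_per_zone)  # 1875000
  print('%.02f%% over' % percent_off(118773, desired_per_node))  # 1.35% over
  print('%.02f%% under' % percent_off(115801, desired_per_node))  # 1.18% under

The node and zone counts plugged in here are the ones from the run shown
above; a different run will shuffle differently and give other values.
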
Now, let's add one last feature to our ring: the concept of weights.
Weights are useful because the nodes you add later in a ring's life are
likely to have more capacity than those you have at the outset. For this
test, we'll make half our nodes have twice the weight. We'll have to
change build_ring to give more partitions to the nodes with more weight
and we'll change test_ring to take into account these weights. Since
we've changed so much I'll just post the entire module again:

.. code-block:: python

  from array import array
  from hashlib import md5
  from random import shuffle
  from struct import unpack_from
  from time import time

  class Ring(object):

      def __init__(self, nodes, part2node, replicas):
          self.nodes = nodes
          self.part2node = part2node
          self.replicas = replicas
          # Recover the partition power from the table length.
          partition_power = 1
          while 2 ** partition_power < len(part2node):
              partition_power += 1
          if len(part2node) != 2 ** partition_power:
              raise Exception("part2node's length is not an "
                              "exact power of 2")
          self.partition_shift = 32 - partition_power

      def get_nodes(self, data_id):
          data_id = str(data_id)
          # The top bits of the first 4 bytes of the MD5 digest
          # select the partition.
          part = unpack_from('>I',
             md5(data_id).digest())[0] >> self.partition_shift
          node_ids = [self.part2node[part]]
          zones = [self.nodes[node_ids[0]]['zone']]
          for replica in xrange(1, self.replicas):
              # Walk the ring until we reach a node that is neither
              # already chosen nor in an already used zone.
              while self.part2node[part] in node_ids or \
                      self.nodes[self.part2node[part]]['zone'] in zones:
                  part += 1
                  if part >= len(self.part2node):
                      part = 0
              node_ids.append(self.part2node[part])
              zones.append(self.nodes[node_ids[-1]]['zone'])
          return [self.nodes[n] for n in node_ids]

  def build_ring(nodes, partition_power, replicas):
      begin = time()
      parts = 2 ** partition_power
      total_weight = \
          float(sum(n['weight'] for n in nodes.itervalues()))
      # Each node's share of partitions is proportional to its weight.
      for node in nodes.itervalues():
          node['desired_parts'] = \
              parts / total_weight * node['weight']
      part2node = array('H')
      for part in xrange(parts):
          # First hand partitions to nodes still owed a whole one.
          for node in nodes.itervalues():
              if node['desired_parts'] >= 1:
                  node['desired_parts'] -= 1
                  part2node.append(node['id'])
                  break
          else:
              # Only fractional shares remain; give the partition
              # to a node with a non-negative remainder.
              for node in nodes.itervalues():
                  if node['desired_parts'] >= 0:
                      node['desired_parts'] -= 1
                      part2node.append(node['id'])
                      break
      shuffle(part2node)
      ring = Ring(nodes, part2node, replicas)
      print '%.02fs to build ring' % (time() - begin)
      return ring

  def test_ring(ring):
      begin = time()
      DATA_ID_COUNT = 10000000
      node_counts = {}
      zone_counts = {}
      for data_id in xrange(DATA_ID_COUNT):
          for node in ring.get_nodes(data_id):
              node_counts[node['id']] = \
                  node_counts.get(node['id'], 0) + 1
              zone_counts[node['zone']] = \
                  zone_counts.get(node['zone'], 0) + 1
      print '%ds to test ring' % (time() - begin)
      total_weight = float(sum(n['weight'] for n in
                               ring.nodes.itervalues()))
      # Per-node balance, relative to each node's weighted share.
      max_over = 0
      max_under = 0
      for node in ring.nodes.itervalues():
          desired = DATA_ID_COUNT * ring.replicas * \
              node['weight'] / total_weight
          diff = node_counts[node['id']] - desired
          if diff > 0:
              over = 100.0 * diff / desired
              if over > max_over:
                  max_over = over
          else:
              under = 100.0 * (-diff) / desired
              if under > max_under:
                  max_under = under
      print '%.02f%% max node over' % max_over
      print '%.02f%% max node under' % max_under
      # Per-zone balance, relative to each zone's total weight.
      max_over = 0
      max_under = 0
      for zone in set(n['zone'] for n in
                      ring.nodes.itervalues()):
          zone_weight = sum(n['weight'] for n in
              ring.nodes.itervalues() if n['zone'] == zone)
          desired = DATA_ID_COUNT * ring.replicas * \
              zone_weight / total_weight
          diff = zone_counts[zone] - desired
          if diff > 0:
              over = 100.0 * diff / desired
              if over > max_over:
                  max_over = over
          else:
              under = 100.0 * (-diff) / desired
              if under > max_under:
                  max_under = under
      print '%.02f%% max zone over' % max_over
      print '%.02f%% max zone under' % max_under

  if __name__ == '__main__':
      PARTITION_POWER = 16
      REPLICAS = 3
      NODE_COUNT = 256
      ZONE_COUNT = 16
      nodes = {}
      # Zones are assigned round-robin; odd node ids get weight 2.0.
      while len(nodes) < NODE_COUNT:
          zone = 0
          while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
              node_id = len(nodes)
              nodes[node_id] = {'id': node_id, 'zone': zone,
                                'weight': 1.0 + (node_id % 2)}
              zone += 1
      ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
      test_ring(ring)

::

  0.88s to build ring
  86s to test ring
  1.66% max node over
  1.46% max node under
  0.28% max zone over
  0.23% max zone under

So things are still good, even though we have differently weighted nodes.
I ran another test with this code using random weights from 1 to 100 and
got over/under values for nodes of 7.35%/18.12% and zones of 0.24%/0.22%,
still pretty good considering the crazy weight ranges.

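For anyone who wants to try a random-weight run like that, here is a
minimal sketch of building such a node set. It rests on assumptions: the
helper names ``make_weighted_nodes`` and ``desired_parts`` are made up for
this example, and picking uniformly distributed whole-number weights with
``randint`` is a guess at one reasonable way to do it, not a record of the
run reported above. The second helper just repeats the share calculation
from build_ring so you can see how the weights translate into partitions.

.. code-block:: python

  from random import randint


  def make_weighted_nodes(node_count, zone_count,
                          min_weight=1, max_weight=100):
      # Zones are assigned round-robin, matching the __main__ block;
      # weights are random whole numbers stored as floats.
      nodes = {}
      for node_id in range(node_count):
          nodes[node_id] = {
              'id': node_id,
              'zone': node_id % zone_count,
              'weight': float(randint(min_weight, max_weight))}
      return nodes


  def desired_parts(nodes, parts):
      # Same proportional share that build_ring computes per node.
      total_weight = float(sum(n['weight'] for n in nodes.values()))
      return dict((n['id'], parts / total_weight * n['weight'])
                  for n in nodes.values())


  nodes = make_weighted_nodes(256, 16)
  shares = desired_parts(nodes, 2 ** 16)
  print('%.02f to %.02f partitions per node' %
        (min(shares.values()), max(shares.values())))

Passing a dictionary like this to build_ring in place of the one built in
the ``__main__`` block should be enough to repeat the experiment, though
the percentages will differ from run to run.
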
Summary
=======
Hopefully this series has been a good introduction to building a ring.
This code is essentially how the OpenStack Swift ring works, except that
Swift's ring has lots of additional optimizations, such as storing each
replica assignment separately, and lots of extra features for building,
validating, and otherwise working with rings.

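To give a rough idea of what "storing each replica assignment separately"
can look like, here is a minimal sketch. It is an illustration only and
not Swift's actual code: the function names are made up, and weights and
zones are left out to keep it short. The point is just that with one
partition-to-node array per replica, a lookup becomes a plain index
instead of a walk around the ring.

.. code-block:: python

  from array import array
  from random import sample


  def build_replica2part2node(node_ids, partition_power, replicas):
      # One array per replica; entry [replica][part] is a node id.
      parts = 2 ** partition_power
      replica2part2node = [array('H') for _ in range(replicas)]
      for part in range(parts):
          # Pick distinct nodes for this partition up front.
          chosen = sample(node_ids, replicas)
          for replica in range(replicas):
              replica2part2node[replica].append(chosen[replica])
      return replica2part2node


  def get_node_ids(replica2part2node, part):
      # One index per replica; no walking needed at lookup time.
      return [part2node[part] for part2node in replica2part2node]


  table = build_replica2part2node(list(range(8)), 4, 3)
  print(get_node_ids(table, 0))

Since the nodes are sampled at random, the ids printed will vary from run
to run, but the three ids for any one partition are always distinct.
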