resurrect gholt blog posts on building consistent hashing ring

These are blog posts authored by Greg Holt (gholt) and used with
permission to add here. The content was only reformatted as RST,
with prose lines wrapped at 70 characters.

Change-Id: I7aa47c24b5019aa598ee005e01612a49514da25f
Paul Dardeau 2016-05-09 20:28:42 +00:00, committed by John Dickinson
parent 2bf5eb775f
commit cb8cda6588
3 changed files with 960 additions and 0 deletions


@@ -58,6 +58,7 @@ Overview and Concepts
    crossdomain
    overview_erasure_code
    overview_backing_store
    ring_background
    associated_projects

Developer Documentation


@@ -426,3 +426,5 @@ for rings that were close to balanceable, like 3 machines with 60TB, 60TB, and
didn't always get it. After that, overload was added to the ring builder so
that operators could choose a balance between dispersion and device weights.
In time the overload concept was improved and made more accurate.
For more background on consistent hashing rings, please see :doc:`ring_background`.


@@ -0,0 +1,957 @@
==================================
Building a Consistent Hashing Ring
==================================

---------------------
Authored by Greg Holt
---------------------

This is a compilation of five posts I made earlier discussing how to
build a consistent hashing ring. The posts seemed to be accessed
quite frequently, so I've gathered them all here on one page for
easier reading.

Part 1
======

“Consistent Hashing” is a term used to describe a process where data
is distributed using a hashing algorithm to determine its location.
Using only the hash of the id of the data you can determine exactly
where that data should be. This mapping of hashes to locations is
usually termed a “ring”.

Probably the simplest hash is just a modulus of the id. For
instance, if all ids are numbers and you have two machines you wish
to distribute data to, you could just put all odd numbered ids on
one machine and even numbered ids on the other. Assuming you have a
balanced number of odd and even numbered ids, and a balanced data
size per id, your data would be balanced between the two machines.
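
For example, here is a minimal sketch of that two-machine modulus
scheme (the machine_for helper is just for illustration):

.. code-block:: python

    # Even ids land on machine 0, odd ids on machine 1.
    def machine_for(data_id, machine_count=2):
        return data_id % machine_count

    print machine_for(100)  # 0
    print machine_for(101)  # 1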

Since data ids are often textual names and not numbers, like paths
for files or URLs, it makes sense to use a “real” hashing algorithm
to convert the names to numbers first. Using MD5 for instance, the
hash of the name mom.png is 4559a12e3e8da7c2186250c2f292e3af and the
hash of dad.png is 096edcc4107e9e18d6a03a43b3853bea. Now, using the
modulus, we can place mom.png on the odd machine and dad.png on the
even one. Another benefit of using a hashing algorithm like MD5 is
that the resulting hashes have a known even distribution, meaning
your ids will be evenly distributed without worrying about keeping
the id values themselves evenly distributed.
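
As a quick sketch of that idea (node_for is just a throwaway helper
for illustration), hashing the name first lets us keep using the
modulus:

.. code-block:: python

    from hashlib import md5

    # Hash the textual name to a big integer, then take the modulus.
    def node_for(name, node_count=2):
        return int(md5(name).hexdigest(), 16) % node_count

    print node_for('mom.png')  # 1, since its hash above is odd
    print node_for('dad.png')  # 0, since its hash above is even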

Here is this approach in action at a larger scale:

.. code-block:: python

    from hashlib import md5
    from struct import unpack_from

    NODE_COUNT = 100
    DATA_ID_COUNT = 10000000

    node_counts = [0] * NODE_COUNT
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        # This just pulls part of the hash out as an integer
        hsh = unpack_from('>I', md5(data_id).digest())[0]
        node_id = hsh % NODE_COUNT
        node_counts[node_id] += 1
    desired_count = DATA_ID_COUNT / NODE_COUNT
    print '%d: Desired data ids per node' % desired_count
    max_count = max(node_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids on one node, %.02f%% over' % \
        (max_count, over)
    min_count = min(node_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids on one node, %.02f%% under' % \
        (min_count, under)

::

    100000: Desired data ids per node
    100695: Most data ids on one node, 0.69% over
    99073: Least data ids on one node, 0.93% under

So that's not bad at all; less than a percent over/under for
distribution per node. In the next part of this series we'll examine
where modulus distribution causes problems and how to improve our
ring to overcome them.

Part 2
======

In Part 1 of this series, we did a simple test of using the modulus
of a hash to locate data. We saw very good distribution, but that's
only part of the story. Distributed systems not only need to
distribute load, but they often also need to grow as more and more
data is placed in them.

So let's imagine we have a 100 node system up and running using our
previous algorithm, but it's starting to get full so we want to add
another node. When we add that 101st node to our algorithm we notice
that many ids now map to different nodes than they previously did.
We're going to have to shuffle a ton of data around our system to
get it all into place again.

Let's examine what's happened on a much smaller scale: just 2 nodes
again, node 0 gets even ids and node 1 gets odd ids. So data id 100
would map to node 0, data id 101 to node 1, data id 102 to node 0,
etc. This is simply node = id % 2. Now we add a third node (node 2)
for more space, so we want node = id % 3. So now data id 100 maps to
node id 1, data id 101 to node 2, and data id 102 to node 0. So we
have to move data for 2 of our 3 ids so they can be found again.

Let's examine this at a larger scale:

.. code-block:: python

    from hashlib import md5
    from struct import unpack_from

    NODE_COUNT = 100
    NEW_NODE_COUNT = 101
    DATA_ID_COUNT = 10000000

    moved_ids = 0
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        hsh = unpack_from('>I', md5(data_id).digest())[0]
        node_id = hsh % NODE_COUNT
        new_node_id = hsh % NEW_NODE_COUNT
        if node_id != new_node_id:
            moved_ids += 1
    percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
    print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

    9900989 ids moved, 99.01%

Wow, that's severe. We'd have to shuffle around 99% of our data just
to increase our capacity 1%! We need a new algorithm that combats
this behavior.

This is where the “ring” really comes in. We can assign ranges of
hashes directly to nodes and then use an algorithm that minimizes
the changes to those ranges. Back to our small scale, let's say our
ids range from 0 to 999. We have two nodes and we'll assign data ids
0-499 to node 0 and 500-999 to node 1. Later, when we add node 2, we
can take half the data ids from node 0 and half from node 1,
minimizing the amount of data that needs to move.

Let's examine this at a larger scale:

.. code-block:: python

    from bisect import bisect_left
    from hashlib import md5
    from struct import unpack_from

    NODE_COUNT = 100
    NEW_NODE_COUNT = 101
    DATA_ID_COUNT = 10000000

    node_range_starts = []
    for node_id in xrange(NODE_COUNT):
        node_range_starts.append(DATA_ID_COUNT /
                                 NODE_COUNT * node_id)
    new_node_range_starts = []
    for new_node_id in xrange(NEW_NODE_COUNT):
        new_node_range_starts.append(DATA_ID_COUNT /
                                     NEW_NODE_COUNT * new_node_id)
    moved_ids = 0
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        hsh = unpack_from('>I', md5(data_id).digest())[0]
        node_id = bisect_left(node_range_starts,
                              hsh % DATA_ID_COUNT) % NODE_COUNT
        new_node_id = bisect_left(new_node_range_starts,
                                  hsh % DATA_ID_COUNT) % NEW_NODE_COUNT
        if node_id != new_node_id:
            moved_ids += 1
    percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
    print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

    4901707 ids moved, 49.02%

Okay, that is better. But still, moving 50% of our data to add 1%
capacity is not very good. If we examine what happened more closely
we'll see an “accordion effect”. We shrank node 0's range a bit to
give to the new node, but that shifted all the other nodes' ranges
by the same amount.

We can minimize the change to a node's assigned range by assigning
several smaller ranges instead of the single broad range we were
using before. This can be done by creating “virtual nodes” for each
node. So 100 nodes might have 1000 virtual nodes. Let's examine how
that might work.

.. code-block:: python

    from bisect import bisect_left
    from hashlib import md5
    from struct import unpack_from

    NODE_COUNT = 100
    DATA_ID_COUNT = 10000000
    VNODE_COUNT = 1000

    vnode_range_starts = []
    vnode2node = []
    for vnode_id in xrange(VNODE_COUNT):
        vnode_range_starts.append(DATA_ID_COUNT /
                                  VNODE_COUNT * vnode_id)
        vnode2node.append(vnode_id % NODE_COUNT)
    new_vnode2node = list(vnode2node)
    new_node_id = NODE_COUNT
    NEW_NODE_COUNT = NODE_COUNT + 1
    vnodes_to_reassign = VNODE_COUNT / NEW_NODE_COUNT
    while vnodes_to_reassign > 0:
        for node_to_take_from in xrange(NODE_COUNT):
            for vnode_id, node_id in enumerate(new_vnode2node):
                if node_id == node_to_take_from:
                    new_vnode2node[vnode_id] = new_node_id
                    vnodes_to_reassign -= 1
                    break
            if vnodes_to_reassign <= 0:
                break
    moved_ids = 0
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        hsh = unpack_from('>I', md5(data_id).digest())[0]
        vnode_id = bisect_left(vnode_range_starts,
                               hsh % DATA_ID_COUNT) % VNODE_COUNT
        node_id = vnode2node[vnode_id]
        new_node_id = new_vnode2node[vnode_id]
        if node_id != new_node_id:
            moved_ids += 1
    percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
    print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

    90423 ids moved, 0.90%

There we go, we added 1% capacity and only moved 0.9% of existing
data. The vnode_range_starts list seems a bit out of place though.
Its values are calculated and never change for the lifetime of the
cluster, so let's optimize that out.

.. code-block:: python

    from hashlib import md5
    from struct import unpack_from

    NODE_COUNT = 100
    DATA_ID_COUNT = 10000000
    VNODE_COUNT = 1000

    vnode2node = []
    for vnode_id in xrange(VNODE_COUNT):
        vnode2node.append(vnode_id % NODE_COUNT)
    new_vnode2node = list(vnode2node)
    new_node_id = NODE_COUNT
    vnodes_to_reassign = VNODE_COUNT / (NODE_COUNT + 1)
    while vnodes_to_reassign > 0:
        for node_to_take_from in xrange(NODE_COUNT):
            for vnode_id, node_id in enumerate(new_vnode2node):
                if node_id == node_to_take_from:
                    new_vnode2node[vnode_id] = new_node_id
                    vnodes_to_reassign -= 1
                    break
            if vnodes_to_reassign <= 0:
                break
    moved_ids = 0
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        hsh = unpack_from('>I', md5(data_id).digest())[0]
        vnode_id = hsh % VNODE_COUNT
        node_id = vnode2node[vnode_id]
        new_node_id = new_vnode2node[vnode_id]
        if node_id != new_node_id:
            moved_ids += 1
    percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
    print '%d ids moved, %.02f%%' % (moved_ids, percent_moved)

::

    89841 ids moved, 0.90%

There we go. In the next part of this series, we'll further examine
the algorithm's limitations and how to improve on it.

Part 3
======

In Part 2 of this series, we reached an algorithm that performed
well even when adding new nodes to the cluster. We used 1000 virtual
nodes that could be independently assigned to nodes, allowing us to
minimize the amount of data moved when a node was added.

The number of virtual nodes puts a cap on how many real nodes you
can have. For example, if you have 1000 virtual nodes and you try to
add a 1001st real node, you can't assign a virtual node to it
without leaving another real node with no assignment, leaving you
with just 1000 active real nodes still.

Unfortunately, the number of virtual nodes created at the beginning
can never change for the life of the cluster without a lot of
careful work. For example, you could double the virtual node count
by splitting each existing virtual node in half and assigning both
halves to the same real node. However, if the real node uses the
virtual node's id to optimally store the data (for example, all data
might be stored in /[virtual node id]/[data id]) it would have to
move data around locally to reflect the change. And it would have to
resolve data using both the new and old locations while the moves
were taking place, making atomic operations difficult or impossible.
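
For illustration, the split itself is trivial; it's the local data
movement just described that makes it expensive. Here is a sketch of
doubling the mapping (variable names follow the earlier examples):

.. code-block:: python

    # Each old vnode becomes two new vnodes assigned to the same
    # node; old vnode v covers what new vnodes 2v and 2v+1 cover.
    vnode2node = [0, 1, 2]  # tiny example mapping
    doubled_vnode2node = []
    for node in vnode2node:
        doubled_vnode2node.extend([node, node])
    print doubled_vnode2node  # [0, 0, 1, 1, 2, 2]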

Let's continue with the assumption that changing the virtual node
count is more work than it's worth, but keep in mind that some
applications might be fine with this.

The easiest way to deal with this limitation is to make the limit
high enough that it won't matter. For instance, if we decide our
cluster will never exceed 60,000 real nodes, we can just make 60,000
virtual nodes.

Also, we should include in our calculations the relative size of our
nodes. For instance, a year from now we might have real nodes that
can handle twice the capacity of our current nodes. So we'd want to
assign twice the virtual nodes to those future nodes, so maybe we
should raise our virtual node estimate to 120,000.

A good rule to follow might be to calculate 100 virtual nodes to
each real node at maximum capacity. This would allow you to alter
the load on any given node by 1%, even at max capacity, which is
pretty fine tuning. So now we're at 6,000,000 virtual nodes for a
max capacity cluster of 60,000 real nodes.

6 million virtual nodes seems like a lot, and it might seem like
we'd use up way too much memory. But the only structure this affects
is the virtual node to real node mapping. The base amount of memory
required would be 6 million times 2 bytes (to store a real node id
from 0 to 65,535). 12 megabytes of memory just isn't that much to
use these days.

Even with all the overhead of flexible data types, things aren't
that bad. I changed the code from the previous part in this series
to have 60,000 real and 6,000,000 virtual nodes, changed the list to
an array('H'), and Python topped out at 27M of resident memory, and
that includes two rings.
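
As a rough check of that memory estimate, here is a sketch of just
the mapping structure itself (exact sizes vary by platform):

.. code-block:: python

    from array import array

    NODE_COUNT = 65536
    VNODE_COUNT = 6000000

    vnode2node = array('H', (vnode_id % NODE_COUNT
                             for vnode_id in xrange(VNODE_COUNT)))
    # 2 bytes per entry: about 12 million bytes of raw mapping.
    print vnode2node.itemsize * len(vnode2node)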

To change terminology a bit, we're going to start calling these
virtual nodes “partitions”. This will make it a bit easier to
discern between the two types of nodes we've been talking about so
far. Also, it makes sense to talk about partitions as they are
really just unchanging sections of the hash space.

We're also going to always keep the partition count a power of two.
This makes it easy to just use bit manipulation on the hash to
determine the partition rather than modulus. It isn't much faster,
but it is a little.
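
For example, with a made-up 32-bit hash value, keeping the top bits
via a shift gives the same partition as dividing by a power of two:

.. code-block:: python

    PARTITION_POWER = 23
    PARTITION_SHIFT = 32 - PARTITION_POWER

    hsh = 0x6b43e28c  # any 32-bit hash value
    part = hsh >> PARTITION_SHIFT  # keep top PARTITION_POWER bits
    assert part == hsh // (2 ** PARTITION_SHIFT)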

So, here's our updated ring code, using 8,388,608 (2 ** 23)
partitions and 65,536 nodes. We've upped the sample data id set and
checked the distribution to make sure we haven't broken anything.

.. code-block:: python

    from array import array
    from hashlib import md5
    from struct import unpack_from

    PARTITION_POWER = 23
    PARTITION_SHIFT = 32 - PARTITION_POWER
    NODE_COUNT = 65536
    DATA_ID_COUNT = 100000000

    part2node = array('H')
    for part in xrange(2 ** PARTITION_POWER):
        part2node.append(part % NODE_COUNT)
    node_counts = [0] * NODE_COUNT
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        part = unpack_from('>I',
            md5(data_id).digest())[0] >> PARTITION_SHIFT
        node_id = part2node[part]
        node_counts[node_id] += 1
    desired_count = DATA_ID_COUNT / NODE_COUNT
    print '%d: Desired data ids per node' % desired_count
    max_count = max(node_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids on one node, %.02f%% over' % \
        (max_count, over)
    min_count = min(node_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids on one node, %.02f%% under' % \
        (min_count, under)

::

    1525: Desired data ids per node
    1683: Most data ids on one node, 10.36% over
    1360: Least data ids on one node, 10.82% under

Hmm. +10% seems a bit high, but I reran with 65,536 partitions and
256 nodes and got +0.4%, so it's just that our sample size (100m) is
too small for our number of partitions (8m). It'll take way too long
to run experiments with an even larger sample size, so let's reduce
back down to these lesser numbers. (To be certain, I reran at the
full version with a 10 billion data id sample set and got +1%, but
it took 6.5 hours to run.)

In the next part of this series, we'll talk about how to increase
the durability of our data in the cluster.

Part 4
======

In Part 3 of this series, we just further discussed partitions
(virtual nodes) and cleaned up our code a bit based on that. Now,
let's talk about how to increase the durability and availability of
our data in the cluster.

For many distributed data stores, durability is quite important.
Either RAID arrays or individually distinct copies of data are
required. While RAID will increase the durability, it does nothing
to increase the availability; if the RAID machine crashes, the data
may be safe but inaccessible until repairs are done. If we keep
distinct copies of the data on different machines and a machine
crashes, the other copies will still be available while we repair
the broken machine.

An easy way to gain this multiple copy durability/availability is
to just use multiple rings and groups of nodes. For instance, to
achieve the industry standard of three copies, you'd split the nodes
into three groups and each group would have its own ring and each
would receive a copy of each data item. This can work well enough,
but has the drawback that expanding capacity requires adding three
nodes at a time and that losing one node essentially lowers capacity
by three times that node's capacity.

Instead, let's use a different, but common, approach of meeting our
requirements with a single ring. This can be done by walking the
ring from the starting point and looking for additional distinct
nodes. Here's code that supports a variable number of replicas (set
to 3 for testing):

.. code-block:: python

    from array import array
    from hashlib import md5
    from struct import unpack_from

    REPLICAS = 3
    PARTITION_POWER = 16
    PARTITION_SHIFT = 32 - PARTITION_POWER
    PARTITION_MAX = 2 ** PARTITION_POWER - 1
    NODE_COUNT = 256
    DATA_ID_COUNT = 10000000

    part2node = array('H')
    for part in xrange(2 ** PARTITION_POWER):
        part2node.append(part % NODE_COUNT)
    node_counts = [0] * NODE_COUNT
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        part = unpack_from('>I',
            md5(data_id).digest())[0] >> PARTITION_SHIFT
        node_ids = [part2node[part]]
        node_counts[node_ids[0]] += 1
        for replica in xrange(1, REPLICAS):
            while part2node[part] in node_ids:
                part += 1
                if part > PARTITION_MAX:
                    part = 0
            node_ids.append(part2node[part])
            node_counts[node_ids[-1]] += 1
    desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
    print '%d: Desired data ids per node' % desired_count
    max_count = max(node_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids on one node, %.02f%% over' % \
        (max_count, over)
    min_count = min(node_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids on one node, %.02f%% under' % \
        (min_count, under)

::

    117186: Desired data ids per node
    118133: Most data ids on one node, 0.81% over
    116093: Least data ids on one node, 0.93% under

That's pretty good; less than 1% over/under. While this works well,
there are a couple of problems.

First, because of how we've initially assigned the partitions to
nodes, all the partitions for a given node have their extra copies
on the same other two nodes. The problem here is that when a machine
fails, the load on these other nodes will jump by that amount. It'd
be better if we initially shuffled the partition assignment to
distribute the failover load better.

The other problem is a bit harder to explain, but deals with
physical separation of machines. Imagine you can only put 16
machines in a rack in your datacenter. The 256 nodes we've been
using would fill 16 racks. With our current code, if a rack goes out
(power problem, network issue, etc.) there is a good chance some
data will have all three copies in that rack, becoming inaccessible.
We can fix this shortcoming by adding the concept of zones to our
nodes, and then ensuring that replicas are stored in distinct zones.

.. code-block:: python

    from array import array
    from hashlib import md5
    from random import shuffle
    from struct import unpack_from

    REPLICAS = 3
    PARTITION_POWER = 16
    PARTITION_SHIFT = 32 - PARTITION_POWER
    PARTITION_MAX = 2 ** PARTITION_POWER - 1
    NODE_COUNT = 256
    ZONE_COUNT = 16
    DATA_ID_COUNT = 10000000

    node2zone = []
    while len(node2zone) < NODE_COUNT:
        zone = 0
        while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
            node2zone.append(zone)
            zone += 1
    part2node = array('H')
    for part in xrange(2 ** PARTITION_POWER):
        part2node.append(part % NODE_COUNT)
    shuffle(part2node)
    node_counts = [0] * NODE_COUNT
    zone_counts = [0] * ZONE_COUNT
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        part = unpack_from('>I',
            md5(data_id).digest())[0] >> PARTITION_SHIFT
        node_ids = [part2node[part]]
        zones = [node2zone[node_ids[0]]]
        node_counts[node_ids[0]] += 1
        zone_counts[zones[0]] += 1
        for replica in xrange(1, REPLICAS):
            # Keep walking while the candidate node or its zone has
            # already been used for this data id.
            while (part2node[part] in node_ids or
                   node2zone[part2node[part]] in zones):
                part += 1
                if part > PARTITION_MAX:
                    part = 0
            node_ids.append(part2node[part])
            zones.append(node2zone[node_ids[-1]])
            node_counts[node_ids[-1]] += 1
            zone_counts[zones[-1]] += 1
    desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
    print '%d: Desired data ids per node' % desired_count
    max_count = max(node_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids on one node, %.02f%% over' % \
        (max_count, over)
    min_count = min(node_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids on one node, %.02f%% under' % \
        (min_count, under)
    desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
    print '%d: Desired data ids per zone' % desired_count
    max_count = max(zone_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids in one zone, %.02f%% over' % \
        (max_count, over)
    min_count = min(zone_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids in one zone, %.02f%% under' % \
        (min_count, under)

::

    117186: Desired data ids per node
    118782: Most data ids on one node, 1.36% over
    115632: Least data ids on one node, 1.33% under
    1875000: Desired data ids per zone
    1878533: Most data ids in one zone, 0.19% over
    1869070: Least data ids in one zone, 0.32% under

So the shuffle and zone distinctions affected our distribution some,
but it's still definitely good enough. This test took about 64
seconds to run on my machine.

There's a completely alternate, and quite common, way of
accomplishing these same requirements. This alternate method doesn't
use partitions at all, but instead just assigns anchors to the nodes
within the hash space. Finding the first node for a given hash just
involves walking this anchor ring for the next node, and finding
additional nodes works similarly to before. To attain the equivalent
of our virtual nodes, each real node is assigned multiple anchors.

.. code-block:: python

    from bisect import bisect_left
    from hashlib import md5
    from struct import unpack_from

    REPLICAS = 3
    NODE_COUNT = 256
    ZONE_COUNT = 16
    DATA_ID_COUNT = 10000000
    VNODE_COUNT = 100

    node2zone = []
    while len(node2zone) < NODE_COUNT:
        zone = 0
        while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
            node2zone.append(zone)
            zone += 1
    hash2index = []
    index2node = []
    for node in xrange(NODE_COUNT):
        for vnode in xrange(VNODE_COUNT):
            hsh = unpack_from('>I', md5(str(node)).digest())[0]
            index = bisect_left(hash2index, hsh)
            if index > len(hash2index):
                index = 0
            hash2index.insert(index, hsh)
            index2node.insert(index, node)
    node_counts = [0] * NODE_COUNT
    zone_counts = [0] * ZONE_COUNT
    for data_id in xrange(DATA_ID_COUNT):
        data_id = str(data_id)
        hsh = unpack_from('>I', md5(data_id).digest())[0]
        index = bisect_left(hash2index, hsh)
        if index >= len(hash2index):
            index = 0
        node_ids = [index2node[index]]
        zones = [node2zone[node_ids[0]]]
        node_counts[node_ids[0]] += 1
        zone_counts[zones[0]] += 1
        for replica in xrange(1, REPLICAS):
            while (index2node[index] in node_ids or
                   node2zone[index2node[index]] in zones):
                index += 1
                if index >= len(hash2index):
                    index = 0
            node_ids.append(index2node[index])
            zones.append(node2zone[node_ids[-1]])
            node_counts[node_ids[-1]] += 1
            zone_counts[zones[-1]] += 1
    desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
    print '%d: Desired data ids per node' % desired_count
    max_count = max(node_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids on one node, %.02f%% over' % \
        (max_count, over)
    min_count = min(node_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids on one node, %.02f%% under' % \
        (min_count, under)
    desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS
    print '%d: Desired data ids per zone' % desired_count
    max_count = max(zone_counts)
    over = 100.0 * (max_count - desired_count) / desired_count
    print '%d: Most data ids in one zone, %.02f%% over' % \
        (max_count, over)
    min_count = min(zone_counts)
    under = 100.0 * (desired_count - min_count) / desired_count
    print '%d: Least data ids in one zone, %.02f%% under' % \
        (min_count, under)

::

    117186: Desired data ids per node
    351282: Most data ids on one node, 199.76% over
    15965: Least data ids on one node, 86.38% under
    1875000: Desired data ids per zone
    2248496: Most data ids in one zone, 19.92% over
    1378013: Least data ids in one zone, 26.51% under

This test took over 15 minutes to run! Unfortunately, this method
also gives much less control over the distribution. To get better
distribution, you have to add more virtual nodes, which eats up more
memory and takes even more time to build the ring and perform
distinct node lookups. The most common operation, data id lookup,
can be improved (by predetermining each virtual node's failover
nodes, for instance) but it starts off so far behind our first
approach that we'll just stick with that.
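
As a sketch of that precomputation idea (build_failover is a
hypothetical helper, not part of the code above), we can walk the
anchor ring once up front and remember each anchor's distinct
successor nodes, so a data id lookup becomes one bisect plus one
table read:

.. code-block:: python

    def build_failover(index2node, replicas):
        # Assumes at least `replicas` distinct nodes on the ring.
        failover = []
        for index in xrange(len(index2node)):
            node_ids = [index2node[index]]
            i = index
            while len(node_ids) < replicas:
                i = (i + 1) % len(index2node)
                if index2node[i] not in node_ids:
                    node_ids.append(index2node[i])
            failover.append(node_ids)
        return failover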

In the next part of this series, we'll start to wrap all this up
into a useful Python module.

Part 5
======

In Part 4 of this series, we ended up with a multiple copy,
distinctly zoned ring. Or at least the start of it. In this final
part we'll package the code up into a usable Python module and then
add one last feature.

First, let's separate the ring itself from the building of the data
for the ring and its testing.

.. code-block:: python

    from array import array
    from hashlib import md5
    from random import shuffle
    from struct import unpack_from
    from time import time


    class Ring(object):

        def __init__(self, nodes, part2node, replicas):
            self.nodes = nodes
            self.part2node = part2node
            self.replicas = replicas
            partition_power = 1
            while 2 ** partition_power < len(part2node):
                partition_power += 1
            if len(part2node) != 2 ** partition_power:
                raise Exception("part2node's length is not an "
                                "exact power of 2")
            self.partition_shift = 32 - partition_power

        def get_nodes(self, data_id):
            data_id = str(data_id)
            part = unpack_from('>I',
                md5(data_id).digest())[0] >> self.partition_shift
            node_ids = [self.part2node[part]]
            zones = [self.nodes[node_ids[0]]['zone']]
            for replica in xrange(1, self.replicas):
                # Keep walking while the candidate node or its zone
                # has already been used for this data id.
                while (self.part2node[part] in node_ids or
                       self.nodes[self.part2node[part]]['zone']
                       in zones):
                    part += 1
                    if part >= len(self.part2node):
                        part = 0
                node_ids.append(self.part2node[part])
                zones.append(self.nodes[node_ids[-1]]['zone'])
            return [self.nodes[n] for n in node_ids]


    def build_ring(nodes, partition_power, replicas):
        begin = time()
        part2node = array('H')
        for part in xrange(2 ** partition_power):
            part2node.append(part % len(nodes))
        shuffle(part2node)
        ring = Ring(nodes, part2node, replicas)
        print '%.02fs to build ring' % (time() - begin)
        return ring


    def test_ring(ring):
        begin = time()
        DATA_ID_COUNT = 10000000
        node_counts = {}
        zone_counts = {}
        for data_id in xrange(DATA_ID_COUNT):
            for node in ring.get_nodes(data_id):
                node_counts[node['id']] = \
                    node_counts.get(node['id'], 0) + 1
                zone_counts[node['zone']] = \
                    zone_counts.get(node['zone'], 0) + 1
        print '%ds to test ring' % (time() - begin)
        desired_count = \
            DATA_ID_COUNT / len(ring.nodes) * ring.replicas
        print '%d: Desired data ids per node' % desired_count
        max_count = max(node_counts.itervalues())
        over = \
            100.0 * (max_count - desired_count) / desired_count
        print '%d: Most data ids on one node, %.02f%% over' % \
            (max_count, over)
        min_count = min(node_counts.itervalues())
        under = \
            100.0 * (desired_count - min_count) / desired_count
        print '%d: Least data ids on one node, %.02f%% under' % \
            (min_count, under)
        zone_count = \
            len(set(n['zone'] for n in ring.nodes.itervalues()))
        desired_count = \
            DATA_ID_COUNT / zone_count * ring.replicas
        print '%d: Desired data ids per zone' % desired_count
        max_count = max(zone_counts.itervalues())
        over = \
            100.0 * (max_count - desired_count) / desired_count
        print '%d: Most data ids in one zone, %.02f%% over' % \
            (max_count, over)
        min_count = min(zone_counts.itervalues())
        under = \
            100.0 * (desired_count - min_count) / desired_count
        print '%d: Least data ids in one zone, %.02f%% under' % \
            (min_count, under)


    if __name__ == '__main__':
        PARTITION_POWER = 16
        REPLICAS = 3
        NODE_COUNT = 256
        ZONE_COUNT = 16
        nodes = {}
        while len(nodes) < NODE_COUNT:
            zone = 0
            while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
                node_id = len(nodes)
                nodes[node_id] = {'id': node_id, 'zone': zone}
                zone += 1
        ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
        test_ring(ring)

::

    0.06s to build ring
    82s to test ring
    117186: Desired data ids per node
    118773: Most data ids on one node, 1.35% over
    115801: Least data ids on one node, 1.18% under
    1875000: Desired data ids per zone
    1878339: Most data ids in one zone, 0.18% over
    1869914: Least data ids in one zone, 0.27% under

It takes a bit longer to test our ring, but that's mostly because of
the switch to dictionaries from arrays for various items. Having
node dictionaries is nice because you can attach any node
information you want directly there (ip addresses, tcp ports, drive
paths, etc.). But we're still on track for further testing; our
distribution is still good.

Now, let's add our one last feature to our ring: the concept of
weights. Weights are useful because the nodes you add later in a
ring's life are likely to have more capacity than those you have at
the outset. For this test, we'll make half our nodes have twice the
weight. We'll have to change build_ring to give more partitions to
the nodes with more weight and we'll change test_ring to take into
account these weights. Since we've changed so much I'll just post
the entire module again:

.. code-block:: python

    from array import array
    from hashlib import md5
    from random import shuffle
    from struct import unpack_from
    from time import time


    class Ring(object):

        def __init__(self, nodes, part2node, replicas):
            self.nodes = nodes
            self.part2node = part2node
            self.replicas = replicas
            partition_power = 1
            while 2 ** partition_power < len(part2node):
                partition_power += 1
            if len(part2node) != 2 ** partition_power:
                raise Exception("part2node's length is not an "
                                "exact power of 2")
            self.partition_shift = 32 - partition_power

        def get_nodes(self, data_id):
            data_id = str(data_id)
            part = unpack_from('>I',
                md5(data_id).digest())[0] >> self.partition_shift
            node_ids = [self.part2node[part]]
            zones = [self.nodes[node_ids[0]]['zone']]
            for replica in xrange(1, self.replicas):
                # Keep walking while the candidate node or its zone
                # has already been used for this data id.
                while (self.part2node[part] in node_ids or
                       self.nodes[self.part2node[part]]['zone']
                       in zones):
                    part += 1
                    if part >= len(self.part2node):
                        part = 0
                node_ids.append(self.part2node[part])
                zones.append(self.nodes[node_ids[-1]]['zone'])
            return [self.nodes[n] for n in node_ids]


    def build_ring(nodes, partition_power, replicas):
        begin = time()
        parts = 2 ** partition_power
        total_weight = \
            float(sum(n['weight'] for n in nodes.itervalues()))
        for node in nodes.itervalues():
            node['desired_parts'] = \
                parts / total_weight * node['weight']
        part2node = array('H')
        for part in xrange(parts):
            for node in nodes.itervalues():
                if node['desired_parts'] >= 1:
                    node['desired_parts'] -= 1
                    part2node.append(node['id'])
                    break
            else:
                for node in nodes.itervalues():
                    if node['desired_parts'] >= 0:
                        node['desired_parts'] -= 1
                        part2node.append(node['id'])
                        break
        shuffle(part2node)
        ring = Ring(nodes, part2node, replicas)
        print '%.02fs to build ring' % (time() - begin)
        return ring


    def test_ring(ring):
        begin = time()
        DATA_ID_COUNT = 10000000
        node_counts = {}
        zone_counts = {}
        for data_id in xrange(DATA_ID_COUNT):
            for node in ring.get_nodes(data_id):
                node_counts[node['id']] = \
                    node_counts.get(node['id'], 0) + 1
                zone_counts[node['zone']] = \
                    zone_counts.get(node['zone'], 0) + 1
        print '%ds to test ring' % (time() - begin)
        total_weight = float(sum(n['weight'] for n in
                                 ring.nodes.itervalues()))
        max_over = 0
        max_under = 0
        for node in ring.nodes.itervalues():
            desired = DATA_ID_COUNT * ring.replicas * \
                node['weight'] / total_weight
            diff = node_counts[node['id']] - desired
            if diff > 0:
                over = 100.0 * diff / desired
                if over > max_over:
                    max_over = over
            else:
                under = 100.0 * (-diff) / desired
                if under > max_under:
                    max_under = under
        print '%.02f%% max node over' % max_over
        print '%.02f%% max node under' % max_under
        max_over = 0
        max_under = 0
        for zone in set(n['zone'] for n in
                        ring.nodes.itervalues()):
            zone_weight = sum(n['weight'] for n in
                              ring.nodes.itervalues()
                              if n['zone'] == zone)
            desired = DATA_ID_COUNT * ring.replicas * \
                zone_weight / total_weight
            diff = zone_counts[zone] - desired
            if diff > 0:
                over = 100.0 * diff / desired
                if over > max_over:
                    max_over = over
            else:
                under = 100.0 * (-diff) / desired
                if under > max_under:
                    max_under = under
        print '%.02f%% max zone over' % max_over
        print '%.02f%% max zone under' % max_under


    if __name__ == '__main__':
        PARTITION_POWER = 16
        REPLICAS = 3
        NODE_COUNT = 256
        ZONE_COUNT = 16
        nodes = {}
        while len(nodes) < NODE_COUNT:
            zone = 0
            while zone < ZONE_COUNT and len(nodes) < NODE_COUNT:
                node_id = len(nodes)
                nodes[node_id] = {'id': node_id, 'zone': zone,
                                  'weight': 1.0 + (node_id % 2)}
                zone += 1
        ring = build_ring(nodes, PARTITION_POWER, REPLICAS)
        test_ring(ring)

::

    0.88s to build ring
    86s to test ring
    1.66% max node over
    1.46% max node under
    0.28% max zone over
    0.23% max zone under

So things are still good, even though we have differently weighted
nodes. I ran another test with this code using random weights from 1
to 100 and got over/under values for nodes of 7.35%/18.12% and zones
of 0.24%/0.22%, still pretty good considering the crazy weight
ranges.

Summary
=======

Hopefully this series has been a good introduction to building a
ring. This code is essentially how the OpenStack Swift ring works,
except that Swift's ring has lots of additional optimizations, such
as storing each replica assignment separately, and lots of extra
features for building, validating, and otherwise working with rings.