diff --git a/doc/source/index.rst b/doc/source/index.rst index 8f045cfb18..c648d0af4f 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -58,6 +58,7 @@ Overview and Concepts crossdomain overview_erasure_code overview_backing_store + ring_background associated_projects Developer Documentation diff --git a/doc/source/overview_ring.rst b/doc/source/overview_ring.rst index a1a72d8508..181b2f143c 100644 --- a/doc/source/overview_ring.rst +++ b/doc/source/overview_ring.rst @@ -426,3 +426,5 @@ for rings that were close to balanceable, like 3 machines with 60TB, 60TB, and didn't always get it. After that, overload was added to the ring builder so that operators could choose a balance between dispersion and device weights. In time the overload concept was improved and made more accurate. + +For more background on consistent hashing rings, please see :doc:`ring_background`. diff --git a/doc/source/ring_background.rst b/doc/source/ring_background.rst new file mode 100644 index 0000000000..ed282401b3 --- /dev/null +++ b/doc/source/ring_background.rst @@ -0,0 +1,957 @@ +================================== +Building a Consistent Hashing Ring +================================== + +--------------------- +Authored by Greg Holt +--------------------- + +This is a compilation of five posts I made earlier discussing how to build +a consistent hashing ring. The posts seemed to be accessed quite frequently, +so I've gathered them all here on one page for easier reading. + +Part 1 +====== +“Consistent Hashing” is a term used to describe a process where data is +distributed using a hashing algorithm to determine its location. Using +only the hash of the id of the data, you can determine exactly where that +data should be. This mapping of hashes to locations is usually termed a +“ring”. + +Probably the simplest hash is just a modulus of the id. For instance, if +all ids are numbers and you have two machines you wish to distribute data +to, you could just put all odd numbered ids on one machine and even numbered +ids on the other. Assuming you have a balanced number of odd and even +numbered ids, and a balanced data size per id, your data would be balanced +between the two machines. + +Since data ids are often textual names and not numbers, like paths for +files or URLs, it makes sense to use a “real” hashing algorithm to convert +the names to numbers first. Using MD5 for instance, the hash of the name +‘mom.png’ is ‘4559a12e3e8da7c2186250c2f292e3af’ and the hash of ‘dad.png’ +is ‘096edcc4107e9e18d6a03a43b3853bea’. Now, using the modulus, we can +place ‘mom.png’ on the odd machine and ‘dad.png’ on the even one. Another +benefit of using a hashing algorithm like MD5 is that the resulting hashes +have a known even distribution, meaning your ids will be evenly distributed +without worrying about keeping the id values themselves evenly distributed. + +Here is a simple example of this in action: + +.. 
code-block:: python + + from hashlib import md5 + from struct import unpack_from + + NODE_COUNT = 100 + DATA_ID_COUNT = 10000000 + + node_counts = [0] * NODE_COUNT + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + # This just pulls part of the hash out as an integer + hsh = unpack_from('>I', md5(data_id).digest())[0] + node_id = hsh % NODE_COUNT + node_counts[node_id] += 1 + desired_count = DATA_ID_COUNT / NODE_COUNT + print '%d: Desired data ids per node' % desired_count + max_count = max(node_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids on one node, %.02f%% over' % \ + (max_count, over) + min_count = min(node_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids on one node, %.02f%% under' % \ + (min_count, under) + +:: + + 100000: Desired data ids per node + 100695: Most data ids on one node, 0.69% over + 99073: Least data ids on one node, 0.93% under + +So that’s not bad at all; less than a percent over/under for distribution +per node. In the next part of this series we’ll examine where modulus +distribution causes problems and how to improve our ring to overcome them. + +Part 2 +====== +In Part 1 of this series, we did a simple test of using the modulus of a +hash to locate data. We saw very good distribution, but that’s only part +of the story. Distributed systems not only need to distribute load, but +they often also need to grow as more and more data is placed in it. + +So let’s imagine we have a 100 node system up and running using our +previous algorithm, but it’s starting to get full so we want to add +another node. When we add that 101st node to our algorithm we notice +that many ids now map to different nodes than they previously did. +We’re going to have to shuffle a ton of data around our system to get +it all into place again. + +Let’s examine what’s happened on a much smaller scale: just 2 nodes +again, node 0 gets even ids and node 1 gets odd ids. So data id 100 +would map to node 0, data id 101 to node 1, data id 102 to node 0, etc. +This is simply node = id % 2. Now we add a third node (node 2) for more +space, so we want node = id % 3. So now data id 100 maps to node id 1, +data id 101 to node 2, and data id 102 to node 0. So we have to move +data for 2 of our 3 ids so they can be found again. + +Let’s examine this at a larger scale: + +.. code-block:: python + + from hashlib import md5 + from struct import unpack_from + + NODE_COUNT = 100 + NEW_NODE_COUNT = 101 + DATA_ID_COUNT = 10000000 + + moved_ids = 0 + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + hsh = unpack_from('>I', md5(str(data_id)).digest())[0] + node_id = hsh % NODE_COUNT + new_node_id = hsh % NEW_NODE_COUNT + if node_id != new_node_id: + moved_ids += 1 + percent_moved = 100.0 * moved_ids / DATA_ID_COUNT + print '%d ids moved, %.02f%%' % (moved_ids, percent_moved) + +:: + + 9900989 ids moved, 99.01% + +Wow, that’s severe. We’d have to shuffle around 99% of our data just +to increase our capacity 1%! We need a new algorithm that combats this +behavior. + +This is where the “ring” really comes in. We can assign ranges of hashes +directly to nodes and then use an algorithm that minimizes the changes +to those ranges. Back to our small scale, let’s say our ids range from 0 +to 999. We have two nodes and we’ll assign data ids 0–499 to node 0 and +500–999 to node 1. 
Later, when we add node 2, we can take half the data +ids from node 0 and half from node 1, minimizing the amount of data that +needs to move. + +Let's examine this at a larger scale: + +.. code-block:: python + + from bisect import bisect_left + from hashlib import md5 + from struct import unpack_from + + NODE_COUNT = 100 + NEW_NODE_COUNT = 101 + DATA_ID_COUNT = 10000000 + + node_range_starts = [] + for node_id in xrange(NODE_COUNT): + node_range_starts.append(DATA_ID_COUNT / + NODE_COUNT * node_id) + new_node_range_starts = [] + for new_node_id in xrange(NEW_NODE_COUNT): + new_node_range_starts.append(DATA_ID_COUNT / + NEW_NODE_COUNT * new_node_id) + moved_ids = 0 + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + hsh = unpack_from('>I', md5(str(data_id)).digest())[0] + node_id = bisect_left(node_range_starts, + hsh % DATA_ID_COUNT) % NODE_COUNT + new_node_id = bisect_left(new_node_range_starts, + hsh % DATA_ID_COUNT) % NEW_NODE_COUNT + if node_id != new_node_id: + moved_ids += 1 + percent_moved = 100.0 * moved_ids / DATA_ID_COUNT + print '%d ids moved, %.02f%%' % (moved_ids, percent_moved) + +:: + + 4901707 ids moved, 49.02% + +Okay, that is better. But still, moving 50% of our data to add 1% capacity +is not very good. If we examine what happened more closely, we'll see an +“accordion effect”. We shrank node 0's range a bit to give to the +new node, but that shifted all the other nodes' ranges by the same amount. + +We can minimize the change to a node's assigned range by assigning several +smaller ranges instead of the single broad range we were using before. This can +be done by creating “virtual nodes” for each node. So 100 nodes might have +1000 virtual nodes. Let's examine how that might work. + +.. code-block:: python + + from bisect import bisect_left + from hashlib import md5 + from struct import unpack_from + + NODE_COUNT = 100 + DATA_ID_COUNT = 10000000 + VNODE_COUNT = 1000 + + vnode_range_starts = [] + vnode2node = [] + for vnode_id in xrange(VNODE_COUNT): + vnode_range_starts.append(DATA_ID_COUNT / + VNODE_COUNT * vnode_id) + vnode2node.append(vnode_id % NODE_COUNT) + new_vnode2node = list(vnode2node) + new_node_id = NODE_COUNT + NEW_NODE_COUNT = NODE_COUNT + 1 + vnodes_to_reassign = VNODE_COUNT / NEW_NODE_COUNT + while vnodes_to_reassign > 0: + for node_to_take_from in xrange(NODE_COUNT): + for vnode_id, node_id in enumerate(new_vnode2node): + if node_id == node_to_take_from: + new_vnode2node[vnode_id] = new_node_id + vnodes_to_reassign -= 1 + break + if vnodes_to_reassign <= 0: + break + moved_ids = 0 + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + hsh = unpack_from('>I', md5(str(data_id)).digest())[0] + vnode_id = bisect_left(vnode_range_starts, + hsh % DATA_ID_COUNT) % VNODE_COUNT + node_id = vnode2node[vnode_id] + new_node_id = new_vnode2node[vnode_id] + if node_id != new_node_id: + moved_ids += 1 + percent_moved = 100.0 * moved_ids / DATA_ID_COUNT + print '%d ids moved, %.02f%%' % (moved_ids, percent_moved) + +:: + + 90423 ids moved, 0.90% + +There we go: we added 1% capacity and only moved 0.9% of existing data. +The vnode_range_starts list seems a bit out of place though. Its values +are calculated and never change for the lifetime of the cluster, so let's +optimize that out. + +.. 
code-block:: python + + from bisect import bisect_left + from hashlib import md5 + from struct import unpack_from + + NODE_COUNT = 100 + DATA_ID_COUNT = 10000000 + VNODE_COUNT = 1000 + + vnode2node = [] + for vnode_id in xrange(VNODE_COUNT): + vnode2node.append(vnode_id % NODE_COUNT) + new_vnode2node = list(vnode2node) + new_node_id = NODE_COUNT + vnodes_to_reassign = VNODE_COUNT / (NODE_COUNT + 1) + while vnodes_to_reassign > 0: + for node_to_take_from in xrange(NODE_COUNT): + for vnode_id, node_id in enumerate(new_vnode2node): + if node_id == node_to_take_from: + new_vnode2node[vnode_id] = new_node_id + vnodes_to_reassign -= 1 + break + if vnodes_to_reassign <= 0: + break + moved_ids = 0 + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + hsh = unpack_from('>I', md5(str(data_id)).digest())[0] + vnode_id = hsh % VNODE_COUNT + node_id = vnode2node[vnode_id] + new_node_id = new_vnode2node[vnode_id] + if node_id != new_node_id: + moved_ids += 1 + percent_moved = 100.0 * moved_ids / DATA_ID_COUNT + print '%d ids moved, %.02f%%' % (moved_ids, percent_moved) + +:: + + 89841 ids moved, 0.90% + +There we go. In the next part of this series, we'll further examine the +algorithm's limitations and how to improve on it. + +Part 3 +====== +In Part 2 of this series, we reached an algorithm that performed well +even when adding new nodes to the cluster. We used 1000 virtual nodes +that could be independently assigned to nodes, allowing us to minimize +the amount of data moved when a node was added. + +The number of virtual nodes puts a cap on how many real nodes you can +have. For example, if you have 1000 virtual nodes and you try to add a +1001st real node, you can't assign a virtual node to it without leaving +another real node with no assignment, leaving you with just 1000 active +real nodes still. + +Unfortunately, the number of virtual nodes created at the beginning can +never change for the life of the cluster without a lot of careful work. +For example, you could double the virtual node count by splitting each +existing virtual node in half and assigning both halves to the same real +node. However, if the real node uses the virtual node's id to optimally +store the data (for example, all data might be stored in /[virtual node +id]/[data id]), it would have to move data around locally to reflect the +change. And it would have to resolve data using both the new and old +locations while the moves were taking place, making atomic operations +difficult or impossible. + +Let's continue with the assumption that changing the virtual node +count is more work than it's worth, but keep in mind that some applications +might be fine with this. + +The easiest way to deal with this limitation is to make the limit high +enough that it won't matter. For instance, if we decide our cluster will +never exceed 60,000 real nodes, we can just make 60,000 virtual nodes. + +Also, we should include in our calculations the relative size of our +nodes. For instance, a year from now we might have real nodes that can +handle twice the capacity of our current nodes. So we'd want to assign +twice as many virtual nodes to those future nodes, so maybe we should raise +our virtual node estimate to 120,000. + +A good rule to follow might be to plan for 100 virtual nodes for each +real node at maximum capacity. This would allow you to alter the load +on any given node by 1%, even at max capacity, which is pretty fine +tuning. So now we're at 6,000,000 virtual nodes for a max capacity cluster +of 60,000 real nodes. 
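+
+To make that sizing rule concrete, here is a quick back-of-the-envelope
+sketch in the same Python 2 style as the rest of this page. The constant
+names are just illustrative; they aren't part of the original posts:
+
+.. code-block:: python
+
+    MAX_NODE_COUNT = 60000   # the most real nodes we ever expect to run
+    VNODES_PER_NODE = 100    # rule of thumb: ~100 virtual nodes per real node
+
+    VNODE_COUNT = MAX_NODE_COUNT * VNODES_PER_NODE
+    print '%d virtual nodes' % VNODE_COUNT
+    # Reassigning a single virtual node shifts about 1% of a node's load,
+    # even when the cluster is at maximum capacity.
+    print '%.02f%% of a node per virtual node' % (100.0 / VNODES_PER_NODE)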
+ +6 million virtual nodes seems like a lot, and it might seem like we’d +use up way too much memory. But the only structure this affects is the +virtual node to real node mapping. The base amount of memory required +would be 6 million times 2 bytes (to store a real node id from 0 to +65,535). 12 megabytes of memory just isn’t that much to use these days. + +Even with all the overhead of flexible data types, things aren’t that +bad. I changed the code from the previous part in this series to have +60,000 real and 6,000,000 virtual nodes, changed the list to an array(‘H’), +and python topped out at 27m of resident memory – and that includes two +rings. + +To change terminology a bit, we’re going to start calling these virtual +nodes “partitions”. This will make it a bit easier to discern between the +two types of nodes we’ve been talking about so far. Also, it makes sense +to talk about partitions as they are really just unchanging sections +of the hash space. + +We’re also going to always keep the partition count a power of two. This +makes it easy to just use bit manipulation on the hash to determine the +partition rather than modulus. It isn’t much faster, but it is a little. +So, here’s our updated ring code, using 8,388,608 (2 ** 23) partitions +and 65,536 nodes. We’ve upped the sample data id set and checked the +distribution to make sure we haven’t broken anything. + +.. code-block:: python + + from array import array + from hashlib import md5 + from struct import unpack_from + + PARTITION_POWER = 23 + PARTITION_SHIFT = 32 - PARTITION_POWER + NODE_COUNT = 65536 + DATA_ID_COUNT = 100000000 + + part2node = array('H') + for part in xrange(2 ** PARTITION_POWER): + part2node.append(part % NODE_COUNT) + node_counts = [0] * NODE_COUNT + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + part = unpack_from('>I', + md5(str(data_id)).digest())[0] >> PARTITION_SHIFT + node_id = part2node[part] + node_counts[node_id] += 1 + desired_count = DATA_ID_COUNT / NODE_COUNT + print '%d: Desired data ids per node' % desired_count + max_count = max(node_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids on one node, %.02f%% over' % \ + (max_count, over) + min_count = min(node_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids on one node, %.02f%% under' % \ + (min_count, under) + +:: + + 1525: Desired data ids per node + 1683: Most data ids on one node, 10.36% over + 1360: Least data ids on one node, 10.82% under + +Hmm. +–10% seems a bit high, but I reran with 65,536 partitions and +256 nodes and got +–0.4% so it’s just that our sample size (100m) is +too small for our number of partitions (8m). It’ll take way too long +to run experiments with an even larger sample size, so let’s reduce +back down to these lesser numbers. (To be certain, I reran at the full +version with a 10 billion data id sample set and got +–1%, but it took +6.5 hours to run.) + +In the next part of this series, we’ll talk about how to increase the +durability of our data in the cluster. + +Part 4 +====== +In Part 3 of this series, we just further discussed partitions (virtual +nodes) and cleaned up our code a bit based on that. Now, let’s talk +about how to increase the durability and availability of our data in the +cluster. + +For many distributed data stores, durability is quite important. Either +RAID arrays or individually distinct copies of data are required. 
While +RAID will increase the durability, it does nothing to increase the +availability – if the RAID machine crashes, the data may be safe but +inaccessible until repairs are done. If we keep distinct copies of the +data on different machines and a machine crashes, the other copies will +still be available while we repair the broken machine. + +An easy way to gain this multiple copy durability/availability is to +just use multiple rings and groups of nodes. For instance, to achieve +the industry standard of three copies, you’d split the nodes into three +groups and each group would have its own ring and each would receive a +copy of each data item. This can work well enough, but has the drawback +that expanding capacity requires adding three nodes at a time and that +losing one node essentially lowers capacity by three times that node’s +capacity. + +Instead, let’s use a different, but common, approach of meeting our +requirements with a single ring. This can be done by walking the ring +from the starting point and looking for additional distinct nodes. +Here’s code that supports a variable number of replicas (set to 3 for +testing): + +.. code-block:: python + + from array import array + from hashlib import md5 + from struct import unpack_from + + REPLICAS = 3 + PARTITION_POWER = 16 + PARTITION_SHIFT = 32 - PARTITION_POWER + PARTITION_MAX = 2 ** PARTITION_POWER - 1 + NODE_COUNT = 256 + DATA_ID_COUNT = 10000000 + + part2node = array('H') + for part in xrange(2 ** PARTITION_POWER): + part2node.append(part % NODE_COUNT) + node_counts = [0] * NODE_COUNT + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + part = unpack_from('>I', + md5(str(data_id)).digest())[0] >> PARTITION_SHIFT + node_ids = [part2node[part]] + node_counts[node_ids[0]] += 1 + for replica in xrange(1, REPLICAS): + while part2node[part] in node_ids: + part += 1 + if part > PARTITION_MAX: + part = 0 + node_ids.append(part2node[part]) + node_counts[node_ids[-1]] += 1 + desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS + print '%d: Desired data ids per node' % desired_count + max_count = max(node_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids on one node, %.02f%% over' % \ + (max_count, over) + min_count = min(node_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids on one node, %.02f%% under' % \ + (min_count, under) + +:: + + 117186: Desired data ids per node + 118133: Most data ids on one node, 0.81% over + 116093: Least data ids on one node, 0.93% under + +That’s pretty good; less than 1% over/under. While this works well, +there are a couple of problems. + +First, because of how we’ve initially assigned the partitions to nodes, +all the partitions for a given node have their extra copies on the same +other two nodes. The problem here is that when a machine fails, the load +on these other nodes will jump by that amount. It’d be better if we +initially shuffled the partition assignment to distribute the failover +load better. + +The other problem is a bit harder to explain, but deals with physical +separation of machines. Imagine you can only put 16 machines in a rack +in your datacenter. The 256 nodes we’ve been using would fill 16 racks. +With our current code, if a rack goes out (power problem, network issue, +etc.) there is a good chance some data will have all three copies in that +rack, becoming inaccessible. 
We can fix this shortcoming by adding the +concept of zones to our nodes, and then ensuring that replicas are stored +in distinct zones. + +.. code-block:: python + + from array import array + from hashlib import md5 + from random import shuffle + from struct import unpack_from + + REPLICAS = 3 + PARTITION_POWER = 16 + PARTITION_SHIFT = 32 - PARTITION_POWER + PARTITION_MAX = 2 ** PARTITION_POWER - 1 + NODE_COUNT = 256 + ZONE_COUNT = 16 + DATA_ID_COUNT = 10000000 + + node2zone = [] + while len(node2zone) < NODE_COUNT: + zone = 0 + while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT: + node2zone.append(zone) + zone += 1 + part2node = array('H') + for part in xrange(2 ** PARTITION_POWER): + part2node.append(part % NODE_COUNT) + shuffle(part2node) + node_counts = [0] * NODE_COUNT + zone_counts = [0] * ZONE_COUNT + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + part = unpack_from('>I', + md5(str(data_id)).digest())[0] >> PARTITION_SHIFT + node_ids = [part2node[part]] + zones = [node2zone[node_ids[0]]] + node_counts[node_ids[0]] += 1 + zone_counts[zones[0]] += 1 + for replica in xrange(1, REPLICAS): + while part2node[part] in node_ids and \ + node2zone[part2node[part]] in zones: + part += 1 + if part > PARTITION_MAX: + part = 0 + node_ids.append(part2node[part]) + zones.append(node2zone[node_ids[-1]]) + node_counts[node_ids[-1]] += 1 + zone_counts[zones[-1]] += 1 + desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS + print '%d: Desired data ids per node' % desired_count + max_count = max(node_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids on one node, %.02f%% over' % \ + (max_count, over) + min_count = min(node_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids on one node, %.02f%% under' % \ + (min_count, under) + desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS + print '%d: Desired data ids per zone' % desired_count + max_count = max(zone_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids in one zone, %.02f%% over' % \ + (max_count, over) + min_count = min(zone_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids in one zone, %.02f%% under' % \ + (min_count, under) + +:: + + 117186: Desired data ids per node + 118782: Most data ids on one node, 1.36% over + 115632: Least data ids on one node, 1.33% under + 1875000: Desired data ids per zone + 1878533: Most data ids in one zone, 0.19% over + 1869070: Least data ids in one zone, 0.32% under + +So the shuffle and zone distinctions affected our distribution some, +but still definitely good enough. This test took about 64 seconds to +run on my machine. + +There’s a completely alternate, and quite common, way of accomplishing +these same requirements. This alternate method doesn’t use partitions +at all, but instead just assigns anchors to the nodes within the hash +space. Finding the first node for a given hash just involves walking +this anchor ring for the next node, and finding additional nodes works +similarly as before. To attain the equivalent of our virtual nodes, +each real node is assigned multiple anchors. + +.. 
code-block:: python + + from bisect import bisect_left + from hashlib import md5 + from struct import unpack_from + + REPLICAS = 3 + NODE_COUNT = 256 + ZONE_COUNT = 16 + DATA_ID_COUNT = 10000000 + VNODE_COUNT = 100 + + node2zone = [] + while len(node2zone) < NODE_COUNT: + zone = 0 + while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT: + node2zone.append(zone) + zone += 1 + hash2index = [] + index2node = [] + for node in xrange(NODE_COUNT): + for vnode in xrange(VNODE_COUNT): + hsh = unpack_from('>I', md5(str(node)).digest())[0] + index = bisect_left(hash2index, hsh) + if index > len(hash2index): + index = 0 + hash2index.insert(index, hsh) + index2node.insert(index, node) + node_counts = [0] * NODE_COUNT + zone_counts = [0] * ZONE_COUNT + for data_id in xrange(DATA_ID_COUNT): + data_id = str(data_id) + hsh = unpack_from('>I', md5(str(data_id)).digest())[0] + index = bisect_left(hash2index, hsh) + if index >= len(hash2index): + index = 0 + node_ids = [index2node[index]] + zones = [node2zone[node_ids[0]]] + node_counts[node_ids[0]] += 1 + zone_counts[zones[0]] += 1 + for replica in xrange(1, REPLICAS): + while index2node[index] in node_ids and \ + node2zone[index2node[index]] in zones: + index += 1 + if index >= len(hash2index): + index = 0 + node_ids.append(index2node[index]) + zones.append(node2zone[node_ids[-1]]) + node_counts[node_ids[-1]] += 1 + zone_counts[zones[-1]] += 1 + desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS + print '%d: Desired data ids per node' % desired_count + max_count = max(node_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids on one node, %.02f%% over' % \ + (max_count, over) + min_count = min(node_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids on one node, %.02f%% under' % \ + (min_count, under) + desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS + print '%d: Desired data ids per zone' % desired_count + max_count = max(zone_counts) + over = 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids in one zone, %.02f%% over' % \ + (max_count, over) + min_count = min(zone_counts) + under = 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids in one zone, %.02f%% under' % \ + (min_count, under) + +:: + + 117186: Desired data ids per node + 351282: Most data ids on one node, 199.76% over + 15965: Least data ids on one node, 86.38% under + 1875000: Desired data ids per zone + 2248496: Most data ids in one zone, 19.92% over + 1378013: Least data ids in one zone, 26.51% under + +This test took over 15 minutes to run! Unfortunately, this method also +gives much less control over the distribution. To get better distribution, +you have to add more virtual nodes, which eats up more memory and takes +even more time to build the ring and perform distinct node lookups. The +most common operation, data id lookup, can be improved (by predetermining +each virtual nodes’ failover nodes, for instance) but it starts off so +far behind our first approach that we’ll just stick with that. + +In the next part of this series, we’ll start to wrap all this up into +a useful Python module. + +Part 5 +====== +In Part 4 of this series, we ended up with a multiple copy, distinctly +zoned ring. Or at least the start of it. In this final part we’ll package +the code up into a useable Python module and then add one last feature. +First, let’s separate the ring itself from the building of the data for +the ring and its testing. + +.. 
code-block:: python + + from array import array + from hashlib import md5 + from random import shuffle + from struct import unpack_from + from time import time + + class Ring(object): + + def __init__(self, nodes, part2node, replicas): + self.nodes = nodes + self.part2node = part2node + self.replicas = replicas + partition_power = 1 + while 2 ** partition_power < len(part2node): + partition_power += 1 + if len(part2node) != 2 ** partition_power: + raise Exception("part2node's length is not an " + "exact power of 2") + self.partition_shift = 32 - partition_power + + def get_nodes(self, data_id): + data_id = str(data_id) + part = unpack_from('>I', + md5(data_id).digest())[0] >> self.partition_shift + node_ids = [self.part2node[part]] + zones = [self.nodes[node_ids[0]]] + for replica in xrange(1, self.replicas): + while self.part2node[part] in node_ids and \ + self.nodes[self.part2node[part]] in zones: + part += 1 + if part >= len(self.part2node): + part = 0 + node_ids.append(self.part2node[part]) + zones.append(self.nodes[node_ids[-1]]) + return [self.nodes[n] for n in node_ids] + + def build_ring(nodes, partition_power, replicas): + begin = time() + part2node = array('H') + for part in xrange(2 ** partition_power): + part2node.append(part % len(nodes)) + shuffle(part2node) + ring = Ring(nodes, part2node, replicas) + print '%.02fs to build ring' % (time() - begin) + return ring + + def test_ring(ring): + begin = time() + DATA_ID_COUNT = 10000000 + node_counts = {} + zone_counts = {} + for data_id in xrange(DATA_ID_COUNT): + for node in ring.get_nodes(data_id): + node_counts[node['id']] = \ + node_counts.get(node['id'], 0) + 1 + zone_counts[node['zone']] = \ + zone_counts.get(node['zone'], 0) + 1 + print '%ds to test ring' % (time() - begin) + desired_count = \ + DATA_ID_COUNT / len(ring.nodes) * REPLICAS + print '%d: Desired data ids per node' % desired_count + max_count = max(node_counts.itervalues()) + over = \ + 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids on one node, %.02f%% over' % \ + (max_count, over) + min_count = min(node_counts.itervalues()) + under = \ + 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids on one node, %.02f%% under' % \ + (min_count, under) + zone_count = \ + len(set(n['zone'] for n in ring.nodes.itervalues())) + desired_count = \ + DATA_ID_COUNT / zone_count * ring.replicas + print '%d: Desired data ids per zone' % desired_count + max_count = max(zone_counts.itervalues()) + over = \ + 100.0 * (max_count - desired_count) / desired_count + print '%d: Most data ids in one zone, %.02f%% over' % \ + (max_count, over) + min_count = min(zone_counts.itervalues()) + under = \ + 100.0 * (desired_count - min_count) / desired_count + print '%d: Least data ids in one zone, %.02f%% under' % \ + (min_count, under) + + if __name__ == '__main__': + PARTITION_POWER = 16 + REPLICAS = 3 + NODE_COUNT = 256 + ZONE_COUNT = 16 + nodes = {} + while len(nodes) < NODE_COUNT: + zone = 0 + while zone < ZONE_COUNT and len(nodes) < NODE_COUNT: + node_id = len(nodes) + nodes[node_id] = {'id': node_id, 'zone': zone} + zone += 1 + ring = build_ring(nodes, PARTITION_POWER, REPLICAS) + test_ring(ring) + +:: + + 0.06s to build ring + 82s to test ring + 117186: Desired data ids per node + 118773: Most data ids on one node, 1.35% over + 115801: Least data ids on one node, 1.18% under + 1875000: Desired data ids per zone + 1878339: Most data ids in one zone, 0.18% over + 1869914: Least data ids in one zone, 0.27% under + +It takes a bit 
longer to test our ring, but that’s mostly because of +the switch to dictionaries from arrays for various items. Having node +dictionaries is nice because you can attach any node information you +want directly there (ip addresses, tcp ports, drive paths, etc.). But +we’re still on track for further testing; our distribution is still good. + +Now, let’s add our one last feature to our ring: the concept of weights. +Weights are useful because the nodes you add later in a ring’s life are +likely to have more capacity than those you have at the outset. For this +test, we’ll make half our nodes have twice the weight. We’ll have to +change build_ring to give more partitions to the nodes with more weight +and we’ll change test_ring to take into account these weights. Since +we’ve changed so much I’ll just post the entire module again: + +.. code-block:: python + + from array import array + from hashlib import md5 + from random import shuffle + from struct import unpack_from + from time import time + + class Ring(object): + + def __init__(self, nodes, part2node, replicas): + self.nodes = nodes + self.part2node = part2node + self.replicas = replicas + partition_power = 1 + while 2 ** partition_power < len(part2node): + partition_power += 1 + if len(part2node) != 2 ** partition_power: + raise Exception("part2node's length is not an " + "exact power of 2") + self.partition_shift = 32 - partition_power + + def get_nodes(self, data_id): + data_id = str(data_id) + part = unpack_from('>I', + md5(data_id).digest())[0] >> self.partition_shift + node_ids = [self.part2node[part]] + zones = [self.nodes[node_ids[0]]] + for replica in xrange(1, self.replicas): + while self.part2node[part] in node_ids and \ + self.nodes[self.part2node[part]] in zones: + part += 1 + if part >= len(self.part2node): + part = 0 + node_ids.append(self.part2node[part]) + zones.append(self.nodes[node_ids[-1]]) + return [self.nodes[n] for n in node_ids] + + def build_ring(nodes, partition_power, replicas): + begin = time() + parts = 2 ** partition_power + total_weight = \ + float(sum(n['weight'] for n in nodes.itervalues())) + for node in nodes.itervalues(): + node['desired_parts'] = \ + parts / total_weight * node['weight'] + part2node = array('H') + for part in xrange(2 ** partition_power): + for node in nodes.itervalues(): + if node['desired_parts'] >= 1: + node['desired_parts'] -= 1 + part2node.append(node['id']) + break + else: + for node in nodes.itervalues(): + if node['desired_parts'] >= 0: + node['desired_parts'] -= 1 + part2node.append(node['id']) + break + shuffle(part2node) + ring = Ring(nodes, part2node, replicas) + print '%.02fs to build ring' % (time() - begin) + return ring + + def test_ring(ring): + begin = time() + DATA_ID_COUNT = 10000000 + node_counts = {} + zone_counts = {} + for data_id in xrange(DATA_ID_COUNT): + for node in ring.get_nodes(data_id): + node_counts[node['id']] = \ + node_counts.get(node['id'], 0) + 1 + zone_counts[node['zone']] = \ + zone_counts.get(node['zone'], 0) + 1 + print '%ds to test ring' % (time() - begin) + total_weight = float(sum(n['weight'] for n in + ring.nodes.itervalues())) + max_over = 0 + max_under = 0 + for node in ring.nodes.itervalues(): + desired = DATA_ID_COUNT * REPLICAS * \ + node['weight'] / total_weight + diff = node_counts[node['id']] - desired + if diff > 0: + over = 100.0 * diff / desired + if over > max_over: + max_over = over + else: + under = 100.0 * (-diff) / desired + if under > max_under: + max_under = under + print '%.02f%% max node over' % max_over + print 
'%.02f%% max node under' % max_under + max_over = 0 + max_under = 0 + for zone in set(n['zone'] for n in + ring.nodes.itervalues()): + zone_weight = sum(n['weight'] for n in + ring.nodes.itervalues() if n['zone'] == zone) + desired = DATA_ID_COUNT * REPLICAS * \ + zone_weight / total_weight + diff = zone_counts[zone] - desired + if diff > 0: + over = 100.0 * diff / desired + if over > max_over: + max_over = over + else: + under = 100.0 * (-diff) / desired + if under > max_under: + max_under = under + print '%.02f%% max zone over' % max_over + print '%.02f%% max zone under' % max_under + + if __name__ == '__main__': + PARTITION_POWER = 16 + REPLICAS = 3 + NODE_COUNT = 256 + ZONE_COUNT = 16 + nodes = {} + while len(nodes) < NODE_COUNT: + zone = 0 + while zone < ZONE_COUNT and len(nodes) < NODE_COUNT: + node_id = len(nodes) + nodes[node_id] = {'id': node_id, 'zone': zone, + 'weight': 1.0 + (node_id % 2)} + zone += 1 + ring = build_ring(nodes, PARTITION_POWER, REPLICAS) + test_ring(ring) + +:: + + 0.88s to build ring + 86s to test ring + 1.66% max over + 1.46% max under + 0.28% max zone over + 0.23% max zone under + +So things are still good, even though we have differently weighted nodes. +I ran another test with this code using random weights from 1 to 100 and +got over/under values for nodes of 7.35%/18.12% and zones of 0.24%/0.22%, +still pretty good considering the crazy weight ranges. + +Summary +======= +Hopefully this series has been a good introduction to building a ring. +This code is essentially how the OpenStack Swift ring works, except that +Swift’s ring has lots of additional optimizations, such as storing each +replica assignment separately, and lots of extra features for building, +validating, and otherwise working with rings.
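+
+As a closing illustration, here is a minimal sketch of how the final module
+from Part 5 might be exercised. It assumes that code has been saved as a
+module named ``ringbuilder`` (a hypothetical name, not something shipped with
+Swift) and keeps to the Python 2 style used throughout this page:
+
+.. code-block:: python
+
+    from ringbuilder import build_ring
+
+    # Eight nodes spread across four zones, all with equal weight
+    nodes = {}
+    while len(nodes) < 8:
+        zone = 0
+        while zone < 4 and len(nodes) < 8:
+            node_id = len(nodes)
+            nodes[node_id] = {'id': node_id, 'zone': zone,
+                              'weight': 1.0}
+            zone += 1
+    # 2 ** 10 = 1024 partitions, 3 replicas
+    ring = build_ring(nodes, 10, 3)
+    # Look up the nodes responsible for a given data id
+    for node in ring.get_nodes('account/container/object'):
+        print node['id'], node['zone']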