Tolerating Node Failures


While the content of this book is still valid, the code may not run with the latest versions of the tools and libraries. For an updated version of the code, check the Riak Core Tutorial.

Computers cannot be trusted, so we may want to run our commands on more than one vnode and wait for a subset (or all of them) to finish before considering the operation successful. To do this, when a command is run we will send it to a number of vnodes, let’s call it N, and wait for a number of them to succeed, let’s call it W.

To do this we need something similar to what we did with coverage calls: we set up a process that sends the command to a number of vnodes and accumulates the responses, or times out if it takes too long, and then sends the result back to the caller. We also need a supervisor for it, and we need to register this supervisor in our main supervisor tree.

Here is a diagram of how it works:

+------+    +---------+    +---------+    +---------+              +------+
|      |    |         |    |         |    |         |remaining = 0 |      |
| Init +--->| Prepare +--->| Execute +--->| Waiting +------------->| Stop |
|      |    |         |    |         |    |         |              |      |
+------+    +---------+    +---------+    +-------+-+              +------+
                                              ^   | |
                                              |   | |        +---------+
                                              +---+ +------->|         |
                                                             | Timeout |
                                      remaining > 0  timeout |         |
                                                             +---------+

Quorum Based Writes and Deletes

To implement quorum-based writes and deletes we will introduce two new modules: a gen_fsm implementation called tanodb_write_fsm and its supervisor, tanodb_write_fsm_sup. The supervisor uses the plain supervisor behavior, so I won’t go into details here other than observing that we add it to the supervisor hierarchy as we did with the coverage supervisor. The gen_fsm is the interesting one.

In tanodb_write_fsm:write/6 and tanodb_write_fsm:delete/4 we ask the supervisor to start a new process, which calls tanodb_write_fsm:start_link, which in turn calls tanodb_write_fsm:init/1. This function initializes the state and moves the state machine to the prepare state.

The state of the fsm contains the following fields:

  • req_id: Identifier for this request
  • Process Id of the process that made the request
  • n: Number of vnodes to send the request to
  • w: Minimum number of responses to consider the request successful
  • The key ({Bucket, Key}) that will be used for the operation
  • action: An atom identifying the operation type, it can be write or delete
  • data: If action is write then data is the value to write, if it’s delete then it’s not used
  • preflist: A riak_core preflist
  • num_w: Counter for the current number of responses
  • accum: List of current response values, this field was introduced in the next commit

When we move to the prepare state we build the list of nodes we are going to send the request to, using the value of n. We store that list in the preflist field and move to the execute state.

In the execute state we build the command we want to send, depending on the value of the action field, and execute it. Then we move to the waiting state.

In the waiting state, each time we receive a result we increment num_w and add the new response to accum. When num_w equals w we send the accumulated results to the requester along with the req_id, so it can distinguish this reply from others using a selective receive.
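The counting done in the waiting state can be sketched without gen_fsm. This is a minimal sketch, not tanodb's actual code: the module name quorum_sketch and the functions new/1 and add_response/2 are hypothetical, and only the w, num_w and accum fields mirror the state described above.

```erlang
-module(quorum_sketch).
-export([new/1, add_response/2]).

%% Hypothetical, trimmed-down state: only the fields the waiting state needs.
-record(state, {w :: pos_integer(),
                num_w = 0 :: non_neg_integer(),
                accum = [] :: [term()]}).

%% Build a state that waits for W responses.
new(W) when is_integer(W), W > 0 ->
    #state{w = W}.

%% Record one vnode response. Returns {done, Responses} once W responses
%% have arrived (in arrival order), otherwise {waiting, NewState}.
add_response(Response, State = #state{w = W, num_w = NumW0, accum = Accum0}) ->
    NumW = NumW0 + 1,
    Accum = [Response | Accum0],
    case NumW =:= W of
        true -> {done, lists:reverse(Accum)};
        false -> {waiting, State#state{num_w = NumW, accum = Accum}}
    end.
```

In a real fsm the {done, Responses} branch is where the reply would be sent to the requester together with the request identifier.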

In the tanodb module, the changes are to call the delete and write functions of the write_fsm module and then do a selective receive waiting for the req_id we sent. If the response doesn’t arrive within Timeout milliseconds we return an error.

The changes in the tanodb_vnode module are that the put and delete commands now receive an extra argument, ReqId, which is returned in the reply.

Relevant code from tanodb.erl:

delete(Key) ->
    ReqID = make_ref(),
    Timeout = 5000,
    tanodb_write_fsm:delete(?N, Key, self(), ReqID),
    wait_for_reqid(ReqID, Timeout).

put(Key, Value) ->
    ReqID = make_ref(),
    Timeout = 5000,
    tanodb_write_fsm:write(?N, ?W, Key, Value, self(), ReqID),
    wait_for_reqid(ReqID, Timeout).
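The wait_for_reqid/2 function called above is not shown in this excerpt. A minimal sketch of such a selective receive could look like the following. It assumes replies arrive as {ReqID, Result} tuples, which is an assumption about the message shape, and the module name wait_sketch is hypothetical.

```erlang
-module(wait_sketch).
-export([wait_for_reqid/2]).

%% Block until a message tagged with ReqID arrives or Timeout (in
%% milliseconds) elapses. Assumes replies are sent as {ReqID, Result}.
%% Because ReqID is already bound, the pattern only matches our own reply;
%% any other message stays in the mailbox.
wait_for_reqid(ReqID, Timeout) ->
    receive
        {ReqID, Result} -> Result
    after Timeout ->
        {error, timeout}
    end.
```

Since make_ref() returns a reference that is unique among connected nodes, a late reply to an earlier request that already timed out cannot be mistaken for the reply to a newer one.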

Test it

We start by listing the bucket’s keys to make sure it’s empty:

(tanodb@> lists:filter(fun ({_, _, []}) -> false;
                                      (_) -> true
                                  end,
                                  element(2, tanodb:keys(<<"mybucket">>))).


Then we put a value on that bucket:

(tanodb@> tanodb:put({<<"mybucket">>, <<"k1">>}, 42).


Now we list the keys again, but this time there’s something different: 3 vnodes report that they have the key k1. This means that our put wrote to 3 vnodes instead of 1 as before.

(tanodb@> lists:filter(fun ({_, _, []}) -> false;
                                      (_) -> true
                                  end,
                                  element(2, tanodb:keys(<<"mybucket">>))).

  'tanodb@', [<<"k1">>]},
  'tanodb@', [<<"k1">>]},
  'tanodb@', [<<"k1">>]}]

Let’s delete that key to check that it is deleted from all 3 vnodes:

(tanodb@> tanodb:delete({<<"mybucket">>, <<"k1">>}).

            {{<<"mybucket">>,<<"k1">>}, {{<<"mybucket">>,<<"k1">>},42}}}]}

Listing the keys from the bucket shows that the key went away from all vnodes:

(tanodb@> lists:filter(fun ({_, _, []}) -> false;
                                      (_) -> true
                                  end,
                                  element(2, tanodb:keys(<<"mybucket">>))).



Handoff

With quorum-based writes we are halfway there: our values are written to more than one vnode. But if a node dies and another takes over its work, or if we add a new node and the vnodes must be rebalanced, we need to handle handoff.

The reasons to start a handoff are:

  • A ring update event for a ring that all other nodes have already seen.
  • A secondary vnode is idle for a period of time and the primary, original owner of the partition is up again.

When this happens, riak_core informs the vnode that handoff is starting by calling handoff_starting. If it returns false the handoff is cancelled; if it returns true, riak_core calls is_empty, which must return false if the vnode has something to hand off (it’s not empty) or true if the vnode is empty. In our case we ask for the first element of the ets table, and if it’s the special value '$end_of_table' we know the table is empty.

If is_empty returns true the handoff is considered finished. If it returns false, handle_handoff_command is called with an opaque structure as its first parameter. That structure contains two fields we are interested in, foldfun and acc0, which can be unpacked with a macro like this:

handle_handoff_command(?FOLD_REQ{foldfun=Fun, acc0=Acc0}, _Sender, State) ->

The FOLD_REQ macro is defined in the riak_core_vnode.hrl header file which we include.

This function must iterate through all the keys it stores and for each of them call foldfun with the key as first argument, the value as second argument and the latest acc0 value as third.

The result of each call is the new accumulator value that you must pass to the next call to foldfun. The last accumulator value must be returned by handle_handoff_command.
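The accumulator threading described above can be tried outside riak_core. In the sketch below the module name fold_sketch and the Collect function are hypothetical stand-ins: Collect only gathers the pairs in a list, whereas the fold function supplied by riak_core is the one that ships each pair to the receiving vnode.

```erlang
-module(fold_sketch).
-export([fold_table/3, demo/0]).

%% Call FoldFun(Key, Value, Acc) for every {Key, Value} entry in the ets
%% table, passing each result on as the accumulator of the next call and
%% returning the final accumulator.
fold_table(FoldFun, Acc0, Table) ->
    ets:foldl(fun({Key, Value}, AccIn) ->
                      FoldFun(Key, Value, AccIn)
              end, Acc0, Table).

%% Fill a throwaway table and fold over it with a collecting function.
demo() ->
    Table = ets:new(fold_sketch_table, [set]),
    true = ets:insert(Table, [{k1, 1}, {k2, 2}]),
    Collect = fun(Key, Value, Acc) -> [{Key, Value} | Acc] end,
    Result = fold_table(Collect, [], Table),
    true = ets:delete(Table),
    %% A set table has no defined traversal order, so sort before returning.
    lists:sort(Result).
```

Calling fold_sketch:demo() returns [{k1,1},{k2,2}].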

Each time we call Fun(Key, Entry, AccIn0), riak_core sends that entry to the new vnode. Before sending, it encodes the data by calling encode_handoff_item(Key, Value), which is where you decide how the data is serialized.

When the new vnode receives the value it must decode it and do something with it. This happens in handle_handoff_data, where we decode the received data and do the appropriate thing with it.

Once all the key/value pairs have been sent, handoff_finished is called and then delete, so we can clean up the data on the old vnode.

You can decide how to handle other commands sent to the vnode while the handoff is running. You can choose one of the following:

  • Handle it in the current vnode
  • Forward it to the vnode we are handing off
  • Drop it

What to do depends on the design of your app; all of them have tradeoffs.

The signature of all the responses is:

-callback handle_handoff_command(Request::term(), Sender::sender(), ModState::term()) ->
    {reply, Reply::term(), NewModState::term()} |
    {noreply, NewModState::term()} |
    {async, Work::function(), From::sender(), NewModState::term()} |
    {forward, NewModState::term()} |
    {drop, NewModState::term()} |
    {stop, Reason::term(), NewModState::term()}.
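As an illustration of how these return values map to the three options, here is a minimal sketch of one possible policy: answer reads locally, forward writes to the vnode receiving the handoff, and drop everything else. The request shapes ({get, Key} and {put, Key, Value}), the map-based state and the module name are all hypothetical, they are not tanodb's actual commands.

```erlang
-module(handoff_policy_sketch).
-export([handle_during_handoff/2]).

%% Hypothetical request shapes and state; a real vnode would match its own
%% commands and use its own state record.

%% Option 1: handle it in the current vnode (reads are answered locally).
handle_during_handoff({get, Key}, State = #{data := Data}) ->
    {reply, maps:get(Key, Data, not_found), State};
%% Option 2: forward it to the vnode we are handing off to.
handle_during_handoff({put, _Key, _Value}, State) ->
    {forward, State};
%% Option 3: drop anything we do not recognize.
handle_during_handoff(_Other, State) ->
    {drop, State}.
```

One tradeoff of this particular policy is that a local read may miss a write that was forwarded a moment earlier, which may or may not be acceptable depending on the app.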

A diagram of the flow is as follows:

+-----------+      +----------+        +----------+
|           | true |          | false  |          |
| Starting  +------> is_empty +--------> fold_req |
|           |      |          |        |          |
+-----+-----+      +----+-----+        +----+-----+
      |                 |                   |
      | false           | true              | ok
      |                 |                   |
+-----v-----+           |              +----v-----+     +--------+
|           |           |              |          |     |        |
| Cancelled |           +--------------> finished +-----> delete |
|           |                          |          |     |        |
+-----------+                          +----------+     +--------+

Relevant code from tanodb_vnode.erl:

handle_handoff_command(?FOLD_REQ{foldfun=FoldFun, acc0=Acc0}, _Sender,
                       State=#state{partition=Partition, table_name=TableName}) ->
    lager:info("fold req ~p", [Partition]),
    AccFinal = ets:foldl(fun ({Key, Val}, AccIn) ->
                                 lager:info("fold fun ~p: ~p", [Key, Val]),
                                 FoldFun(Key, Val, AccIn)
                         end, Acc0, TableName),
    {reply, AccFinal, State};

is_empty(State=#state{table_name=TableName, partition=Partition}) ->
    IsEmpty = (ets:first(TableName) =:= '$end_of_table'),
    lager:info("is_empty ~p: ~p", [Partition, IsEmpty]),
    {IsEmpty, State}.

encode_handoff_item(Key, Value) ->
    term_to_binary({Key, Value}).

handle_handoff_data(BinData, State=#state{table_name=TableName}) ->
    TermData = binary_to_term(BinData),
    lager:info("handoff data received ~p", [TermData]),
    {Key, Value} = TermData,
    ets:insert(TableName, {Key, Value}),
    {reply, ok, State}.

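delete(State=#state{table_name=TableName, partition=Partition}) ->
    lager:info("delete ~p", [Partition]),
    % Clear the local copy of the data now that it lives on the new owner
    ets:delete_all_objects(TableName),
    {ok, State}.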

Test it

To test it we will first start a devrel node and put some values, then join two other nodes and watch the handoff happen on the console.

To make sure the nodes don’t know about each other, in case you already played with clustering, we will start by removing the devrel builds:

rm -rf _build/dev*

And build the nodes again:

make devrel

Now we will start the first node and connect to its console:

make dev1-console

We generate a list of some numbers:

(tanodb1@> Nums = lists:seq(1, 10).


And with it create some bucket names:

(tanodb1@> Buckets = lists:map(fun (N) ->
(tanodb1@>             list_to_binary("bucket-" ++ integer_to_list(N))
(tanodb1@>           end, Nums).


And some key names:

(tanodb1@> Keys = lists:map(fun (N) ->
(tanodb1@>          list_to_binary("key-" ++ integer_to_list(N))
(tanodb1@>        end, Nums).


We create a function to generate a value from a bucket and a key:

(tanodb1@> GenValue = fun (Bucket, Key) ->
(tanodb1@>                    [{bucket, Bucket}, {key, Key}]
(tanodb1@>            end.


And then put some values to the buckets and keys we created:

(tanodb1@> lists:foreach(fun (Bucket) ->
(tanodb1@>   lists:foreach(fun (Key) ->
(tanodb1@>     Val = GenValue(Bucket, Key),
(tanodb1@>       tanodb:put({Bucket, Key}, Val)
(tanodb1@>     end, Keys)
(tanodb1@> end, Buckets).


Now that we have some data let’s start the other two nodes:

make dev2-console

In yet another shell:

make dev3-console

This part should remind you of the first chapter:

make devrel-join
Success: staged join request for 'tanodb2@' to 'tanodb1@'
Success: staged join request for 'tanodb3@' to 'tanodb1@'
make devrel-cluster-plan
=============================== Staged Changes =========================
Action         Details(s)
join           'tanodb2@'
join           'tanodb3@'

NOTE: Applying these changes will result in 1 cluster transition

                         After cluster transition 1/1

================================= Membership ===========================
Status     Ring    Pending    Node
valid     100.0%     34.4%    'tanodb1@'
valid       0.0%     32.8%    'tanodb2@'
valid       0.0%     32.8%    'tanodb3@'
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0

WARNING: Not all replicas will be on distinct nodes

Transfers resulting from cluster changes: 42
  21 transfers from 'tanodb1@' to 'tanodb3@'
  21 transfers from 'tanodb1@' to 'tanodb2@'
make devrel-cluster-commit
Cluster changes committed

On the nodes’ consoles you should see logs like the following. I will paste just a few as an example.

On the sending side:

00:17:24.240 [info] Starting ownership transfer of tanodb_vnode from
'tanodb1@' 1118962191081472546749696200048404186924073353216 to
'tanodb2@' 1118962191081472546749696200048404186924073353216

00:17:24.240 [info] fold req 1118962191081472546749696200048404186924073353216
00:17:24.240 [info] fold fun {<<"bucket-1">>,<<"key-1">>}:


00:17:24.241 [info] fold fun {<<"bucket-7">>,<<"key-8">>}:

00:17:24.281 [info] ownership transfer of tanodb_vnode from
'tanodb1@' 1118962191081472546749696200048404186924073353216 to
'tanodb2@' 1118962191081472546749696200048404186924073353216
    completed: sent 575.00 B bytes in 7 of 7 objects in 0.04 seconds
    (13.67 KB/second)

00:17:24.280 [info] handoff finished

00:17:24.285 [info] delete

On the receiving side:

00:13:59.641 [info] handoff starting

00:13:59.641 [info] is_empty
    182687704666362864775460604089535377456991567872: true

00:14:34.259 [info] Receiving handoff data for partition
    tanodb_vnode:68507889249886074290797726533575766546371837952 from

00:14:34.296 [info] handoff data received


00:14:34.297 [info] handoff data received

00:14:34.298 [info] Handoff receiver for partition
    68507889249886074290797726533575766546371837952 exited after
    processing 5 objects from {"",47440}