Mariano Guerra
RICON 2015 | San Francisco | November 4-6th, 2015
Overview of Little Riak Core Book
Step-by-step creation of tanodb, an in-memory key-value store with a REST API
Will go fast
Don't worry, it's all written down in the book
Provide all the tools, features and concepts
So you can focus on the distributed part
Argentinian Software Engineer
Python, Clojure[script], Erlang, Javascript, Efene ;)
Co-Founder of Event Fabric
Build Custom Real Time Dashboards on the Web without Code
Node: Physical Instance
Cluster: Group of Nodes
VNode: Virtual Node abstraction; 1 Node can have more than 1 VNode
Ring: Group of VNodes (64 by default)
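To make the ring concrete, here is a small Python sketch of how a {Bucket, Key} pair lands on one of the 64 partitions. This is a conceptual model, not riak_core's actual code (which hashes the Erlang term with SHA-1, not the raw bytes):

```python
import hashlib

RING_SIZE = 64                 # vnodes/partitions in the ring (riak_core default)
INC = 2 ** 160 // RING_SIZE    # width of each partition's slice of the hash space

def partition_for(bucket: bytes, key: bytes) -> int:
    # Hash the pair into a 160-bit integer space and map it to the partition
    # that owns that slice; each partition ID is a multiple of INC.
    h = int.from_bytes(hashlib.sha1(bucket + key).digest(), "big")
    return (h // INC) * INC

p = partition_for(b"mybucket", b"k1")
assert p % INC == 0 and 0 <= p < 2 ** 160
```

The same key always maps to the same partition, which is what lets any node route a request without coordination.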
rebar3: build tool
relx: rebar3 release
cuttlefish: config generation and validation
rebar3_run: rebar3 run
rebar3_template_riak_core: project template
Install Template
mkdir -p ~/.config/rebar3/templates
git clone https://github.com/marianoguerra/rebar3_template_riak_core/ \
    ~/.config/rebar3/templates/rebar3_template_riak_core
Create Project
rebar3 new rebar3_riak_core name=tanodb
===> Writing tanodb/.gitignore
===> Writing tanodb/rebar.config
===> Writing tanodb/apps/tanodb/src/tanodb.app.src
===> Writing tanodb/apps/tanodb/src/tanodb.erl
===> Writing tanodb/apps/tanodb/src/tanodb_app.erl
===> Writing tanodb/apps/tanodb/src/tanodb_sup.erl
===> Writing tanodb/apps/tanodb/src/tanodb_console.erl
===> Writing tanodb/apps/tanodb/src/tanodb_vnode.erl
===> Writing tanodb/README.rst
===> Writing tanodb/Makefile
===> Writing tanodb/.editorconfig
===> Writing tanodb/config/nodetool
===> Writing tanodb/config/extended_bin
===> Writing tanodb/config/admin_bin
===> Writing tanodb/config/config.schema
===> Writing tanodb/config/advanced.config
===> Writing tanodb/config/sys.config
===> Writing tanodb/config/vars.config
===> Writing tanodb/config/vars_dev1.config
===> Writing tanodb/config/vars_dev2.config
===> Writing tanodb/config/vars_dev3.config
===> Writing tanodb/config/vm.args
===> Writing tanodb/config/dev1_vm.args
===> Writing tanodb/config/dev2_vm.args
===> Writing tanodb/config/dev3_vm.args
cd tanodb
rebar3 release
rebar3 run
_build/default/
(tanodb@127.0.0.1)1> tanodb:ping().
{pong,1347321821914426127719021955160323408745312813056}
(tanodb@127.0.0.1)1> q().
From now on, partition IDs will be abbreviated like 13..., but keep in mind they are actually that long
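Those huge numbers are not arbitrary: with the default ring size of 64, every partition ID is a multiple of 2^160/64. A quick Python check on the pong value above:

```python
RING_SIZE = 64
INC = 2 ** 160 // RING_SIZE

# The pong value returned by tanodb:ping() is a partition ID: a point in the
# 160-bit hash space, always an exact multiple of 2^160 / 64.
pong = 1347321821914426127719021955160323408745312813056
assert pong % INC == 0
assert pong // INC == 59   # i.e. this is partition number 59 of 64 on the ring
```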
make devrel
_build/dev1
_build/dev2
_build/dev3
make dev1-console
make dev2-console
make dev3-console
make devrel-join
make devrel-status
================================= Membership ===============
Status     Ring    Pending    Node
------------------------------------------------------------
valid     100.0%      --      'tanodb1@127.0.0.1'
joining     0.0%      --      'tanodb2@127.0.0.1'
joining     0.0%      --      'tanodb3@127.0.0.1'
------------------------------------------------------------
Valid:1 / Leaving:0 / Exiting:0 / Joining:2 / Down:0
make devrel-cluster-plan
=============================== Staged Changes ==============
Action         Details(s)
-------------------------------------------------------------
join           'tanodb2@127.0.0.1'
join           'tanodb3@127.0.0.1'
-------------------------------------------------------------

NOTE: Applying these changes will result in 1 cluster transition
#############################################################
                After cluster transition 1/1
#############################################################

================================= Membership ================
Status     Ring    Pending    Node
-------------------------------------------------------------
valid     100.0%    34.4%    'tanodb1@127.0.0.1'
valid       0.0%    32.8%    'tanodb2@127.0.0.1'
valid       0.0%    32.8%    'tanodb3@127.0.0.1'
-------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0

WARNING: Not all replicas will be on distinct nodes

Transfers resulting from cluster changes: 42
  21 transfers from 'tanodb1@127.0.0.1' to 'tanodb3@127.0.0.1'
  21 transfers from 'tanodb1@127.0.0.1' to 'tanodb2@127.0.0.1'
make devrel-cluster-commit
Cluster changes committed
make devrel-status
================================= Membership =============
Status     Ring    Pending    Node
----------------------------------------------------------
valid      75.0%    34.4%    'tanodb1@127.0.0.1'
valid       9.4%    32.8%    'tanodb2@127.0.0.1'
valid       7.8%    32.8%    'tanodb3@127.0.0.1'
----------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
================================= Membership ==============
Status     Ring    Pending    Node
-----------------------------------------------------------
valid      34.4%      --      'tanodb1@127.0.0.1'
valid      32.8%      --      'tanodb2@127.0.0.1'
valid      32.8%      --      'tanodb3@127.0.0.1'
-----------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
make devrel-stop
rebar3 as prod release
mkdir vm-ubuntu-1504
cd vm-ubuntu-1504
Vagrantfile:
Vagrant.configure(2) do |config|
  config.vm.box = "ubuntu/vivid64"
  config.vm.provider "virtualbox" do |vb|
    vb.memory = "1024"
  end
end
vagrant up
cd _build/prod/rel
tar -czf tanodb.tgz tanodb
cd -
mv _build/prod/rel/tanodb.tgz vm-ubuntu-1504

export TERM=xterm
vagrant ssh

cp /vagrant/tanodb.tgz .
tar -xzf tanodb.tgz
./tanodb/bin/tanodb console
Add Route in tanodb_app
Create tanodb_http_ping cowboy rest handler
rebar3 release
rebar3 run
http localhost:8080/ping
HTTP/1.1 200 OK
content-length: 59
content-type: application/json

{
    "pong": "981946412581700398168100746981252653831329677312"
}
_build/default/rel/tanodb/etc/tanodb.conf
## Enable/Disable HTTP API
## Default: yes
## Acceptable values:
##   - yes or no
http.enabled = yes

## port to listen to for HTTP API
## Default: 8080
## Acceptable values:
##   - an integer
http.port = 8080

## number of acceptors to use for HTTP API
## Default: 100
## Acceptable values:
##   - an integer
http.acceptors = 100
http localhost:8081/ping
exometer (same as riak_core)
Our metrics and riak_core metrics
(tanodb@127.0.0.1)1> tanodb_metrics:all().
[{tanodb,[ ...
 {core,[{ping,[{count,0},{one,0}]}]}]

(tanodb@127.0.0.1)2> tanodb:ping().
{pong,593735040165679310520246963290989976735222595584}

(tanodb@127.0.0.1)3> tanodb_metrics:all().
[{tanodb,[ ...
 {core,[{ping,[{count,1},{one,1}]}]}]
We use recon
(tanodb@127.0.0.1)1> tanodb_metrics:all().
[{tanodb,[...
 {node,[{abs,[{process_count,377},
              {run_queue,0},
              {error_logger_queue_len,0},
              {memory_total,30418240},
              {memory_procs,11745496},
              {memory_atoms,458994},
              {memory_bin,232112},
              {memory_ets,1470872}]},
        {inc,[{bytes_in,11737},
              {bytes_out,2470},
              {gc_count,7},
              {gc_words_reclaimed,29948},
              {reductions,2601390},
              {scheduler_usage,[{1,0.9291112866248371},
                                {2,0.04754016011809648},
                                {3,0.04615958261183974},
                                {4,0.03682005933534583}]}]}]},
 {core,[{ping,[{count,0},{one,0}]}]}]
cowboy_exometer uses the cowboy middleware and response hooks features
(tanodb@127.0.0.1)1> tanodb_metrics:all().
[{tanodb,[ ...
 {http,[{resp,[{by_code,[{200,[{count,1},{one,1}]},
                         {201,[{count,0},{one,0}]},
                         {202,[{count,0},{one,0}]},
                         ...
                         {400,[...]},
                         {401,...},
                         {...}|...]}]},
        {req,[{time,[{<<"ping">>,
                      [{n,3},
                       {mean,44126},
                       {median,44126},
                       {min,44126},
                       {max,44126},
                       {50,0},
                       {75,44126},
                       {90,44126},
                       {95,44126},
                       {99,44126},
                       {999,44126}]}]},
              {active,[{value,0},{ms_since_reset,11546}]},
              {count,[{<<"ping">>,[{count,1},{one,1}]}]}]}]},
 ...]
$ http localhost:8080/metrics
HTTP/1.1 200 OK
content-type: application/json

{
    "core": {
        "ping": {"count": 2, "one": 1}
    },
    "http": {
        "req": {
            "active": {"ms_since_reset": 279958, "value": 1},
            "count": {
                "metrics": {"count": 1, "one": 0},
                "ping": {"count": 2, "one": 1}
            },
            "time": {
                "metrics": {"50": 0, "75": 0, "90": 0, "95": 0, "99": 0,
                            "999": 0, "max": 0, "mean": 0, "median": 0,
                            "min": 0, "n": 0},
                "ping": {"50": 0, "75": 349, "90": 349, "95": 349, "99": 349,
                         "999": 349, "max": 349, "mean": 349, "median": 349,
                         "min": 349, "n": 3}
            }
        },
        "resp": {
            "by_code": {
                "200": {"count": 3, "one": 1},
                "201": {"count": 0, "one": 0},
                ...
                "400": {"count": 0, "one": 0},
                "401": {"count": 0, "one": 0},
                ...
                "404": {"count": 0, "one": 0},
                ...
                "500": {"count": 0, "one": 0},
                ...
            }
        }
    },
    "node": { ... },
    "tanodb": { ... }
}
riak_core provides riak_core_security module
Roles: Users and Groups
Permissions
Grants
Resources
Cowboy rest handler
Utility library
REST API for riak_core_security
As a library
clojurescript + om.next + bootstrap
Web Admin for riak_core_security
Uses rcs_cowboy
ping() ->
    tanodb_metrics:core_ping(),
    DocIdx = riak_core_util:chash_key({<<"ping">>, term_to_binary(os:timestamp())}),
    PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, tanodb),
    [{IndexNode, _Type}] = PrefList,
    riak_core_vnode_master:sync_spawn_command(IndexNode, ping, tanodb_vnode_master).
(tanodb@127.0.0.1)1> DocIdx = riak_core_util:chash_key({<<"ping">>, term_to_binary(os:timestamp())}).
<<126,9,218,77,97,108,38,92,0,155,160,26,161,3,200,87,134,213,167,168>>

(tanodb@127.0.0.1)2> PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, tanodb).
[{{73..., 'tanodb@127.0.0.1'}, primary}]

(tanodb@127.0.0.1)3> PrefList2 = riak_core_apl:get_primary_apl(DocIdx, 2, tanodb).
[{{73..., 'tanodb@127.0.0.1'}, primary},
 {{75..., 'tanodb@127.0.0.1'}, primary}]

(tanodb@127.0.0.1)5> [{IndexNode, _Type}] = PrefList.
[{{73..., 'tanodb@127.0.0.1'}, primary}]

(tanodb@127.0.0.1)6> riak_core_vnode_master:sync_spawn_command(IndexNode, ping, tanodb_vnode_master).
{pong,73...}

(tanodb@127.0.0.1)7> [{IndexNode1, _Type1}, {IndexNode2, _Type2}] = PrefList2.
[{{73..., 'tanodb@127.0.0.1'}, primary},
 {{75..., 'tanodb@127.0.0.1'}, primary}]

(tanodb@127.0.0.1)9> riak_core_vnode_master:sync_spawn_command(IndexNode2, ping, tanodb_vnode_master).
{pong,75...}
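Conceptually, get_primary_apl walks the ring clockwise from the hashed key and takes the next N partitions, which is why PrefList2 above starts with the same partition as PrefList. A Python sketch under the assumption of a 64-partition ring (the real function also maps each partition to its owner node and skips unavailable ones):

```python
RING_SIZE = 64
INC = 2 ** 160 // RING_SIZE
RING = [i * INC for i in range(RING_SIZE)]   # partition IDs in ring order

def preference_list(doc_idx: int, n: int):
    # Start at the first partition boundary after the hashed key, then take
    # the next n partitions clockwise, wrapping around the ring.
    first = (doc_idx // INC + 1) % RING_SIZE
    return [RING[(first + i) % RING_SIZE] for i in range(n)]

pl1 = preference_list(123456789, 1)
pl2 = preference_list(123456789, 2)
assert pl1 == pl2[:1]                 # growing N extends the same list
assert pl2[1] - pl2[0] == INC         # consecutive partitions on the ring
```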
-module(tanodb_vnode).
-behaviour(riak_core_vnode).

-export([start_vnode/1,
         init/1,
         terminate/2,
         handle_command/3,
         is_empty/1,
         delete/1,
         handle_handoff_command/3,
         handoff_starting/2,
         handoff_cancelled/1,
         handoff_finished/2,
         handle_handoff_data/2,
         encode_handoff_item/2,
         handle_coverage/4,
         handle_exit/3]).
-record(state, {partition}).

init([Partition]) ->
    {ok, #state{partition=Partition}}.

handle_command(ping, _Sender, State) ->
    {reply, {pong, State#state.partition}, State};
Add a function on tanodb.erl
Add a new clause to handle_command
Add metrics of course :)
Get
Put
Delete
get(Key) ->
    tanodb_metrics:core_get(),
    send_to_one(Key, {get, Key}).

delete(Key) ->
    tanodb_metrics:core_delete(),
    send_to_one(Key, {delete, Key}).

put(Key, Value) ->
    tanodb_metrics:core_put(),
    send_to_one(Key, {put, Key, Value}).
% private functions
send_to_one(Key, Cmd) ->
    DocIdx = riak_core_util:chash_key(Key),
    PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, tanodb),
    [{IndexNode, _Type}] = PrefList,
    riak_core_vnode_master:sync_spawn_command(IndexNode, Cmd, tanodb_vnode_master).
handle_command({put, Key, Value}, _Sender,
               State=#state{table_name=TableName, partition=Partition}) ->
    ets:insert(TableName, {Key, Value}),
    {reply, {ok, Partition}, State};

handle_command({get, Key}, _Sender,
               State=#state{table_name=TableName, partition=Partition}) ->
    case ets:lookup(TableName, Key) of
        [] ->
            {reply, {not_found, Partition, Key}, State};
        [Value] ->
            {reply, {found, Partition, {Key, Value}}, State}
    end;

handle_command({delete, Key}, _Sender,
               State=#state{table_name=TableName, partition=Partition}) ->
    case ets:lookup(TableName, Key) of
        [] ->
            {reply, {not_found, Partition, Key}, State};
        [Value] ->
            true = ets:delete(TableName, Key),
            {reply, {found, Partition, {Key, Value}}, State}
    end;
Get not found
(tanodb@127.0.0.1)2> tanodb:get({<<"mybucket">>, <<"k1">>}). {not_found,22..., {<<"mybucket">>,<<"k1">>}}
Put
(tanodb@127.0.0.1)3> tanodb:put({<<"mybucket">>, <<"k1">>}, 42). {ok,22...}
Get found
(tanodb@127.0.0.1)3> tanodb:get({<<"mybucket">>, <<"k1">>}). {found,22..., {{<<"mybucket">>,<<"k1">>},{{<<"mybucket">>,<<"k1">>},42}}}
Delete
(tanodb@127.0.0.1)4> tanodb:delete({<<"mybucket">>, <<"k1">>}). {found,22..., {{<<"mybucket">>,<<"k1">>},{{<<"mybucket">>,<<"k1">>},42}}}
Get not found
(tanodb@127.0.0.1)5> tanodb:get({<<"mybucket">>, <<"k1">>}). {not_found,22..., {<<"mybucket">>,<<"k1">>}}
$ http localhost:8080/store/mybucket/bob
HTTP/1.1 404 Not Found
content-length: 0
content-type: application/json
$ http post localhost:8080/store/mybucket/bob name=bob color=yellow
HTTP/1.1 204 No Content
content-length: 0
content-type: application/json
$ http localhost:8080/store/mybucket/bob
HTTP/1.1 200 OK
content-length: 31
content-type: application/json

{
    "color": "yellow",
    "name": "bob"
}
$ http delete localhost:8080/store/mybucket/bob
HTTP/1.1 204 No Content
content-length: 0
content-type: application/json
$ http localhost:8080/store/mybucket/bob
HTTP/1.1 404 Not Found
content-length: 0
content-type: application/json
$ http delete localhost:8080/store/mybucket/bob
HTTP/1.1 404 Not Found
content-length: 0
content-type: application/json
Problem: List Keys from a Bucket
Init -> Process Results (until #vnodes) -> Finish (ok/error/timeout)
coverage_fsm and coverage_fsm_sup
Need to register coverage_fsm_sup in the supervisor tree
keys(Bucket) ->
    tanodb_metrics:core_keys(),
    Timeout = 5000,
    tanodb_coverage_fsm:start({keys, Bucket}, Timeout).
handle_coverage({keys, Bucket}, _KeySpaces, {_, RefId, _},
                State=#state{table_name=TableName}) ->
    Keys0 = ets:match(TableName, {{Bucket, '$1'}, '_'}),
    Keys = lists:map(fun first/1, Keys0),
    {reply, {RefId, Keys}, State};
(tanodb@127.0.0.1)1> tanodb:keys(<<"mybucket">>).
{ok,[{13..., 'tanodb@127.0.0.1',[]},
     ...
     {95..., 'tanodb@127.0.0.1',...},
     {41...,...},
     {...}|...]}
Shape
{ok, [{Partition, Node, ListOfKeys}*64]}
Put one value
(tanodb@127.0.0.1)2> tanodb:put({<<"mybucket">>, <<"k1">>}, 42). {ok,22...}
Get and Filter
(tanodb@127.0.0.1)3> lists:filter(fun ({_, _, []}) -> false;
                                      (_) -> true
                                  end, element(2, tanodb:keys(<<"mybucket">>))).
[{22..., 'tanodb@127.0.0.1', [<<"k1">>]}]
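Client code typically flattens that {ok, [{Partition, Node, Keys}]} shape into a plain key list. A hypothetical Python sketch of the merge, mirroring the lists:filter/2 call above (most of the 64 vnodes reply with an empty list):

```python
def merge_keys(coverage_result):
    # coverage_result is ("ok", [(partition, node, keys), ...]), one entry
    # per vnode; collect the non-empty key lists into a single sorted list.
    status, per_vnode = coverage_result
    assert status == "ok"
    keys = []
    for _partition, _node, ks in per_vnode:
        keys.extend(ks)
    return sorted(set(keys))

result = ("ok", [(13, "tanodb1", []), (22, "tanodb1", ["k1"]), (95, "tanodb1", [])])
assert merge_keys(result) == ["k1"]
```

Deduplication matters once values are replicated to N vnodes, since the same key then shows up in N coverage replies.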
$ http localhost:8080/store/mybucket
HTTP/1.1 200 OK
content-length: 2
content-type: application/json

[]
$ http post localhost:8080/store/mybucket/bob name=bob color=yellow
HTTP/1.1 204 No Content
content-length: 0
content-type: application/json
$ http localhost:8080/store/mybucket
HTTP/1.1 200 OK
content-length: 7
content-type: application/json

[
    "bob"
]
$ http post localhost:8080/store/mybucket/patrick name=patrick color=pink
HTTP/1.1 204 No Content
content-length: 0
content-type: application/json
$ http localhost:8080/store/mybucket
HTTP/1.1 200 OK
content-length: 17
content-type: application/json

[
    "bob",
    "patrick"
]
If a node goes down, all the values stored on its vnodes are lost
Write each value to N vnodes
Consider an operation successful once at least W vnodes reply ok
FSM and supervisor
Receives Operation, N and W
Does Request to N, waits for W replies and accumulates results
OK on success, failure on error or timeout
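The accumulate/decide step above can be sketched in Python (a conceptual model of the quorum logic only, not the FSM's actual process mechanics; names are illustrative):

```python
def wait_for_w(replies, n: int, w: int):
    # replies is what came back before the timer fired, at most n entries.
    # Success needs at least w acks; if every vnode answered without reaching
    # the quorum it is an error, otherwise we were still waiting: timeout.
    acks = [r for r in replies if r[0] == "ok"]
    if len(acks) >= w:
        return ("ok", acks)
    if len(replies) == n:
        return ("error", replies)
    return ("timeout", replies)

assert wait_for_w([("ok", 27), ("ok", 25)], n=3, w=2)[0] == "ok"
assert wait_for_w([("error", 27)], n=3, w=2)[0] == "timeout"
```

Note that with W < N the operation can succeed even while some replicas are down, which is the availability/consistency trade-off the FSM encodes.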
+------+   +---------+   +---------+   +---------+               +------+
|      |   |         |   |         |   |         | remaining = 0 |      |
| Init +-->| Prepare +-->| Execute +-->| Waiting +-------------->| Stop |
|      |   |         |   |         |   |         |               |      |
+------+   +---------+   +---------+   +--+---+--+               +------+
                                          |   ^
                           remaining > 0  +---+
                                          |
                                  timeout |
                                          v
                                    +---------+
                                    |         |
                                    | Timeout |
                                    |         |
                                    +---------+
delete(Key) ->
    tanodb_metrics:core_delete(),
    ReqID = make_ref(),
    Timeout = 5000,
    tanodb_write_fsm:delete(?N, Key, self(), ReqID),
    wait_for_reqid(ReqID, Timeout).
put(Key, Value) ->
    tanodb_metrics:core_put(),
    ReqID = make_ref(),
    Timeout = 5000,
    tanodb_write_fsm:write(?N, ?W, Key, Value, self(), ReqID),
    wait_for_reqid(ReqID, Timeout).
The only change to the VNode code is to return the provided ReqID
(tanodb@127.0.0.1)2> tanodb:put({<<"mybucket">>, <<"k1">>}, 42).
{ok,[{ok,27...},
     {ok,25...},
     {ok,22...}]}

(tanodb@127.0.0.1)3> lists:filter(fun ({_, _, []}) -> false;
                                      (_) -> true
                                  end, element(2, tanodb:keys(<<"mybucket">>))).
[{25..., 'tanodb@127.0.0.1', [<<"k1">>]},
 {27..., 'tanodb@127.0.0.1', [<<"k1">>]},
 {22..., 'tanodb@127.0.0.1', [<<"k1">>]}]

(tanodb@127.0.0.1)4> tanodb:delete({<<"mybucket">>, <<"k1">>}).
{ok,[{found,27..., {{<<"mybucket">>,<<"k1">>},{{<<"mybucket">>,<<"k1">>},42}}},
     {found,22..., {{<<"mybucket">>,<<"k1">>},{{<<"mybucket">>,<<"k1">>},42}}},
     {found,25..., {{<<"mybucket">>,<<"k1">>},{{<<"mybucket">>,<<"k1">>},42}}}]}

(tanodb@127.0.0.1)5> lists:filter(fun ({_, _, []}) -> false;
                                      (_) -> true
                                  end, element(2, tanodb:keys(<<"mybucket">>))).
[]
Handoff happens when:
There is a ring update event for a ring that all other nodes have already seen
A secondary vnode is idle for a period of time and the primary, original owner of the partition is up again.
Handling commands during handoff:
Handle it in the current vnode
Forward it to the vnode we are handing off
Drop it
+-----------+        +----------+         +----------+
|           | true   |          | false   |          |
| Starting  +------->| is_empty +-------->| fold_req |
|           |        |          |         |          |
+-----+-----+        +----+-----+         +----+-----+
      |                   |                    |
      | false             | true               | ok
      v                   |                    |
+-----------+             |               +----v-----+        +--------+
|           |             |               |          |        |        |
| Cancelled |             +-------------->| finished +------->| delete |
|           |                             |          |        |        |
+-----------+                             +----------+        +--------+
Fold Request:
handle_handoff_command(?FOLD_REQ{foldfun=FoldFun, acc0=Acc0}, _Sender,
                       State=#state{partition=Partition, table_name=TableName}) ->
    lager:info("fold req ~p", [Partition]),
    AccFinal = ets:foldl(fun ({Key, Val}, AccIn) ->
                                 lager:info("fold fun ~p: ~p", [Key, Val]),
                                 FoldFun(Key, Val, AccIn)
                         end, Acc0, TableName),
    {reply, AccFinal, State};
Is Empty:
is_empty(State=#state{table_name=TableName, partition=Partition}) ->
    IsEmpty = (ets:first(TableName) =:= '$end_of_table'),
    lager:info("is_empty ~p: ~p", [Partition, IsEmpty]),
    {IsEmpty, State}.
Encode:
encode_handoff_item(Key, Value) ->
    term_to_binary({Key, Value}).
Receive/Decode:
handle_handoff_data(BinData, State=#state{table_name=TableName}) ->
    TermData = binary_to_term(BinData),
    lager:info("handoff data received ~p", [TermData]),
    {Key, Value} = TermData,
    ets:insert(TableName, {Key, Value}),
    {reply, ok, State}.
Delete:
delete(State=#state{table_name=TableName, partition=Partition}) ->
    lager:info("delete ~p", [Partition]),
    ets:delete(TableName),
    {ok, State}.
Authentication/Authorization
lager backend
sends log metrics by level to exometer