Saturday, January 23, 2010

Distributed cache in Erlang

Implementing distributed cache in Erlang is relatively simple task because concurrency, distribution and failover mechanisms are built in the language. In fact, it's so simple that this task is a part of Erlang tutorial. Here I want to show the complete solution which is only 100 lines of code.

I'm going to implement the cache as a typical Erlang server application, that means set of three modules: server, supervisor and application. As underlined storage I'm using Mnesia database which is a part of standard Erlang distribution. It doesn't probably give you the best performance, but it does provide automatic replication. The cache is deployed on three nodes, each node on a separate machine.



Clients will connect to in-memory slave nodes, the master node is dedicated to persistence.

Configure Erlang cluster


Create file .erlang.cookie containing one line with random text. Copy this file to every machine in a cluster to home directory of the user who will start Erlang VM. Make sure this file has unix permissions 600.

Check /etc/hosts on every box to verify that every machine knows others by name.

Set up Mnesia database


Open terminals on all machines and enter Erlang prompt
ubuntu$ erl -sname master
Erlang R13B01 (erts-5.7.2) [source] [rq:1] [async-threads:0] [kernel-poll:false]
Eshell V5.7.2 (abort with ^G)

macBook$ erl -sname slave1
Erlang R13B02 (erts-5.7.3) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]
Eshell V5.7.3 (abort with ^G)

iMac$ erl -sname slave2
Erlang R13B03 (erts-5.7.4) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]
Eshell V5.7.4 (abort with ^G)

From one machine ping other two
(slave1@macBook)1> net_adm:ping(master@ubuntu).
pong
(slave1@macBook)2> net_adm:ping(slave2@iMac).
pong

Create database configuration
(slave1@macBook)3> mnesia:create_schema([slave1@macBook, slave2@iMac, master@ubuntu]).
ok

Start database on all nodes
(master@ubuntu)1> application:start(mnesia).
ok
(slave1@macBook)4> application:start(mnesia).
ok
(slave2@iMac)1> application:start(mnesia).
ok

Create cache table
(slave1@macBook)5> rd(mycache, {key, value}).
mycache
(slave1@macBook)6> mnesia:create_table(mycache, [{attributes, record_info(fields, mycache)},
{disc_only_copies, [master@ubuntu]}, {ram_copies, [slave1@macBook, slave2@iMac]}]).

{atomic,ok}

Stop database and quit Erlang VM
(slave1@macBook)7> application:stop(mnesia).
ok
(slave2@iMac)2> application:stop(mnesia).
ok
(master@ubuntu)2> application:stop(mnesia).
ok

Implement Erlang application


The main module of this application is mycache.erl
-module(mycache).
-export([start/0, stop/0]).
-export([put/2, get/1, remove/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2]).
-behaviour(gen_server).
-include("mycache.hrl").

% Start/stop functions

start() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

stop() ->
gen_server:cast(?MODULE, stop).

% Functional interface

put(Key, Value) ->
gen_server:call(?MODULE, {put, Key, Value}).

get(Key) ->
gen_server:call(?MODULE, {get, Key}).

remove(Key) ->
gen_server:call(?MODULE, {remove, Key}).

% Callback functions

init(_) ->
application:start(mnesia),
mnesia:wait_for_tables([mycache], infinity),
{ok, []}.

terminate(_Reason, _State) ->
application:stop(mnesia).

handle_cast(stop, State) ->
{stop, normal, State}.

handle_call({put, Key, Value}, _From, State) ->
Rec = #mycache{key = Key, value = Value},
F = fun() ->
case mnesia:read(mycache, Key) of
[] ->
mnesia:write(Rec),
null;
[#mycache{value = OldValue}] ->
mnesia:write(Rec),
OldValue
end
end,
{atomic, Result} = mnesia:transaction(F),
{reply, Result, State};

handle_call({get, Key}, _From, State) ->
case mnesia:dirty_read({mycache, Key}) of
[#mycache{value = Value}] -> {reply, Value, []};
_ -> {reply, null, State}
end;

handle_call({remove, Key}, _From, State) ->
F = fun() ->
case mnesia:read(mycache, Key) of
[] -> null;
[#mycache{value = Value}] ->
mnesia:delete({mycache, Key}),
Value
end
end,
{atomic, Result} = mnesia:transaction(F),
{reply, Result, State}.

It implements Erlang generic server behaviour and provides three client functions – put, get, remove – with the same signature as similar methods in java.util.Map interface.

Next file is a supervisor for the cache, mycache_sup.erl
-module(mycache_sup).
-export([start/0]).
-export([init/1]).
-behaviour(supervisor).

start() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init(_) ->
MycacheWorker = {mycache, {mycache, start, []}, permanent, 30000, worker, [mycache, mnesia]},
{ok, {{one_for_all, 5, 3600}, [MycacheWorker]}}.

It's going to monitor the main cache process and restart it in case of crash.

Next file, mycache_app.erl, provides methods to start and stop our cache gracefully within Erlang VM
-module(mycache_app).
-export([start/2, stop/1]).
-behaviour(application).

start(_Type, _StartArgs) ->
mycache_sup:start().

stop(_State) ->
ok.

Create application descriptor, mycache.app
{application, mycache,
[{description, "Distributed cache"},
{vsn, "1.0"},
{modules, [mycache, mycache_sup, mycache_app]},
{registered, [mycache, mycache_sup]},
{applications, [kernel, stdlib]},
{env, []},
{mod, {mycache_app, []}}]}.

The last module is optional, it provides a quick way to load our application on VM startup
-module(mycache_boot).
-export([start/0]).

start() ->
application:start(mycache).

That's it. Compile all these modules and copy binaries to all machines in the cluster. Place the binaries in the same folder you created Mnesia configuration.

Run Erlang application


Start Erlang VMs and load the application
ubuntu$ erl -sname master -s mycache_boot
Erlang R13B01 (erts-5.7.2) [source] [rq:1] [async-threads:0] [kernel-poll:false]

macBook$ erl -sname slave1 -s mycache_boot
Erlang R13B02 (erts-5.7.3) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]

iMac$ erl -sname slave2 -s mycache_boot
Erlang R13B03 (erts-5.7.4) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]

The cache is ready. You can start using it
(slave1@macBook)1> mycache:put("mykey", "myvalue").
null
(slave2@iMac)1> mycache:get("mykey").
"myvalue"
(master@ubuntu)1> mycache:put("mykey", "newvalue").
"myvalue"
(slave1@macBook)2> mycache:remove("mykey").
"newvalue"
(master@ubuntu)2> mycache:get("mykey").
null

It works! So, what do we actually achieve here with about 100 lines of Erlang code and bit of scripting?

• Distribution I run the app on three physical boxes, and it's transparent for the clients.
• Scaleability To add a new node to the cluster is just a matter of Mnesia re-configuration and copying of binary files to the new box.
• Concurrency Write and remove operations are transactional, and because of concurrent nature of Erlang itself our data is consistent and can be accessed by thousands of client processes.
• Fault tolerance Try to kill mycache process inside Erlang VM; it will be restarted automatically by supervisor and data will be replicated from other nodes to the new process.
• Persistence is optional and provided by Mnesia module.

All these benefits are given for free by Erlang/OTP, and it's not the end.

Call Erlang cache from Java


There are several ways of integrating Erlang applications with other languages. For Java the most convenient one is JInterface library. Here is the implementation of java.util.Map interface that communicates with the cache application we've just developed
import com.ericsson.otp.erlang.*;

public class ErlStringMap implements Map<String, String> {

private final OtpSelf self;
private final OtpPeer other;
private final String cacheModule;

public ErlStringMap(String client, String cookie, String serverNode, String cacheModule) {
try {
self = new OtpSelf(client, cookie);
other = new OtpPeer(serverNode);
this.cacheModule = cacheModule;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

public String put(String key, String value) {
return remoteCall("put", key, value);
}

public String get(Object key) {
return remoteCall("get", (String) key);
}

public String remove(Object key) {
return remoteCall("remove", (String) key);
}

private String remoteCall(String method, String... args) {
try {
OtpConnection connection = self.connect(other);
connection.sendRPC(cacheModule, method, stringsToErlangStrings(args));
OtpErlangObject received = connection.receiveRPC();
connection.close();
return parse(received);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

private OtpErlangObject[] stringsToErlangStrings(String[] strings) {
OtpErlangObject[] result = new OtpErlangObject[strings.length];
for (int i = 0; i < strings.length; i++) result[i] = new OtpErlangString(strings[i]);
return result;
}

private String parse(OtpErlangObject otpObj) {
if (otpObj instanceof OtpErlangAtom) {
OtpErlangAtom atom = (OtpErlangAtom) otpObj;
if (atom.atomValue().equals("null")) return null;
else throw new IllegalArgumentException("Only atom null is supported");

} else if (otpObj instanceof OtpErlangString) {
OtpErlangString str = (OtpErlangString) otpObj;
return str.stringValue();
}
throw new IllegalArgumentException("Unexpected type " + otpObj.getClass().getName());
}

// Other methods are omitted
}

Now from the Java application we can use our distributed cache same way we are using HashMap
String cookie = FileUtils.readFileToString(new File("/Users/andrey/.erlang.cookie"));
Map<String, String> map = new ErlStringMap("client1", cookie, "slave1@macBook", "mycache");
map.put("foo", "bar")

Performance?


Let's deploy Erlang and Java nodes following this topology



Here is the speed of the cache operations I get in the Java client:

write 30.385 ms
read 1.23 ms
delete 21.665 ms

If we remove network, i.e. move all VMs, Java and Erlang, to the same box, we'll get the following performance:

write 2.091 ms
read 1.35 ms
delete 2.057 ms

And if we also disable persistence, the numbers will be

write 1.75 ms
read 1.38 ms
delete 1.75 ms

As you can see, performance is not the best, but keep in mind that the purpose of this post is not to build production ready cache application, but show the power of Erlang/OTP in building distributed fault-tolerant systems. As an exercise, try to implement the same functionality using JDK only.

Resources

• Source code used in the blog.

• Upcoming book where authors seem to implement similar application.