## Saturday, January 23, 2010

### Distributed cache in Erlang

Implementing a distributed cache in Erlang is a relatively simple task, because concurrency, distribution and failover mechanisms are built into the language. In fact, it's so simple that this task is part of the Erlang tutorial. Here I want to show a complete solution, which is only about 100 lines of code.

I'm going to implement the cache as a typical Erlang server application, which means a set of three modules: server, supervisor and application. As the underlying storage I'm using the Mnesia database, which is part of the standard Erlang distribution. It probably doesn't give you the best performance, but it does provide automatic replication. The cache is deployed on three nodes, each node on a separate machine.

Clients connect to the in-memory slave nodes; the master node is dedicated to persistence.

### Configure Erlang cluster

Create a file named .erlang.cookie containing a single line of random text. Copy it to every machine in the cluster, into the home directory of the user who will start the Erlang VM. Make sure the file has Unix permissions 600.

Check /etc/hosts on every box to verify that every machine knows the others by name.

### Set up Mnesia database

Open a terminal on each machine and start an Erlang shell:

```
ubuntu$ erl -sname master
Erlang R13B01 (erts-5.7.2) [source] [rq:1] [async-threads:0] [kernel-poll:false]

Eshell V5.7.2 (abort with ^G)

macBook$ erl -sname slave1
Erlang R13B02 (erts-5.7.3) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]

Eshell V5.7.3  (abort with ^G)

iMac$ erl -sname slave2
Erlang R13B03 (erts-5.7.4) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]

Eshell V5.7.4 (abort with ^G)
```

From one machine, ping the other two:

```
(slave1@macBook)1> net_adm:ping(master@ubuntu).
pong
(slave1@macBook)2> net_adm:ping(slave2@iMac).
pong
```

Create the database configuration:

```
(slave1@macBook)3> mnesia:create_schema([slave1@macBook, slave2@iMac, master@ubuntu]).
ok
```

Start the database on all nodes:

```
(master@ubuntu)1> application:start(mnesia).
ok
(slave1@macBook)4> application:start(mnesia).
ok
(slave2@iMac)1> application:start(mnesia).
ok
```

Create the cache table:

```
(slave1@macBook)5> rd(mycache, {key, value}).
mycache
(slave1@macBook)6> mnesia:create_table(mycache, [{attributes, record_info(fields, mycache)},
                       {disc_only_copies, [master@ubuntu]},
                       {ram_copies, [slave1@macBook, slave2@iMac]}]).
{atomic,ok}
```

Stop the database and quit the Erlang VM:

```
(slave1@macBook)7> application:stop(mnesia).
ok
(slave2@iMac)2> application:stop(mnesia).
ok
(master@ubuntu)2> application:stop(mnesia).
ok
```

### Implement Erlang application

The main module of this application is mycache.erl:

```erlang
-module(mycache).
-export([start/0, stop/0]).
-export([put/2, get/1, remove/1]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2]).
-behaviour(gen_server).
-include("mycache.hrl").

% Start/stop functions

start() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

stop() ->
    gen_server:cast(?MODULE, stop).

% Functional interface

put(Key, Value) ->
    gen_server:call(?MODULE, {put, Key, Value}).

get(Key) ->
    gen_server:call(?MODULE, {get, Key}).

remove(Key) ->
    gen_server:call(?MODULE, {remove, Key}).

% Callback functions

init(_) ->
    application:start(mnesia),
    mnesia:wait_for_tables([mycache], infinity),
    {ok, []}.

terminate(_Reason, _State) ->
    application:stop(mnesia).

handle_cast(stop, State) ->
    {stop, normal, State}.

% put: write the record and reply with the previous value, or null if there was none
handle_call({put, Key, Value}, _From, State) ->
    Rec = #mycache{key = Key, value = Value},
    F = fun() ->
            case mnesia:read(mycache, Key) of
                [] ->
                    mnesia:write(Rec),
                    null;
                [#mycache{value = OldValue}] ->
                    mnesia:write(Rec),
                    OldValue
            end
        end,
    {atomic, Result} = mnesia:transaction(F),
    {reply, Result, State};
% get: dirty read outside a transaction, reply with the value or null
handle_call({get, Key}, _From, State) ->
    case mnesia:dirty_read({mycache, Key}) of
        [#mycache{value = Value}] ->
            {reply, Value, State};
        _ ->
            {reply, null, State}
    end;
% remove: delete the record and reply with the removed value, or null
handle_call({remove, Key}, _From, State) ->
    F = fun() ->
            case mnesia:read(mycache, Key) of
                [] ->
                    null;
                [#mycache{value = Value}] ->
                    mnesia:delete({mycache, Key}),
                    Value
            end
        end,
    {atomic, Result} = mnesia:transaction(F),
    {reply, Result, State}.
```

It implements the Erlang generic server behaviour (gen_server) and provides three client functions, put, get and remove, with the same signatures as the corresponding methods of the java.util.Map interface. The included header mycache.hrl is not listed here; judging by the record used above, it presumably contains just the record definition, `-record(mycache, {key, value}).`

The next file is a supervisor for the cache, mycache_sup.erl:

```erlang
-module(mycache_sup).
-export([start/0]).
-export([init/1]).
-behaviour(supervisor).

start() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init(_) ->
    MycacheWorker = {mycache, {mycache, start, []}, permanent, 30000, worker, [mycache, mnesia]},
    {ok, {{one_for_all, 5, 3600}, [MycacheWorker]}}.
```

It monitors the main cache process and restarts it if it crashes.

The next file, mycache_app.erl, provides functions to start and stop our cache gracefully within the Erlang VM:

```erlang
-module(mycache_app).
-export([start/2, stop/1]).
-behaviour(application).

start(_Type, _StartArgs) ->
    mycache_sup:start().

stop(_State) ->
    ok.
```

Create the application descriptor, mycache.app:

```erlang
{application, mycache,
 [{description, "Distributed cache"},
  {vsn, "1.0"},
  {modules, [mycache, mycache_sup, mycache_app]},
  {registered, [mycache, mycache_sup]},
  {applications, [kernel, stdlib]},
  {env, []},
  {mod, {mycache_app, []}}]}.
```

The last module is optional; it provides a quick way to load our application on VM startup:

```erlang
-module(mycache_boot).
-export([start/0]).

start() ->
    application:start(mycache).
```

That's it. Compile all these modules and copy the binaries to all machines in the cluster. Place the binaries in the same folder in which you created the Mnesia configuration.

### Run Erlang application

Start the Erlang VMs and load the application:

```
ubuntu$ erl -sname master -s mycache_boot
Erlang R13B01 (erts-5.7.2) [source] [rq:1] [async-threads:0] [kernel-poll:false]

macBook$ erl -sname slave1 -s mycache_boot
Erlang R13B02 (erts-5.7.3) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]

iMac$ erl -sname slave2 -s mycache_boot
Erlang R13B03 (erts-5.7.4) [source] [smp:2:2] [rq:2] [async-threads:0] [kernel-poll:false]
```

The cache is ready. You can start using it:

```
(slave1@macBook)1> mycache:put("mykey", "myvalue").
null
(slave2@iMac)1> mycache:get("mykey").
"myvalue"
(master@ubuntu)1> mycache:put("mykey", "newvalue").
"myvalue"
(slave1@macBook)2> mycache:remove("mykey").
"newvalue"
(master@ubuntu)2> mycache:get("mykey").
null
```

It works! So, what have we actually achieved here with about 100 lines of Erlang code and a bit of scripting?

• Distribution: I run the app on three physical boxes, and this is transparent to the clients.
• Scalability: Adding a new node to the cluster is just a matter of reconfiguring Mnesia and copying the binary files to the new box.
• Concurrency: Write and remove operations are transactional, and because of the concurrent nature of Erlang itself our data stays consistent and can be accessed by thousands of client processes.
• Fault tolerance: Try killing the mycache process inside the Erlang VM; the supervisor restarts it automatically, and the data is replicated from the other nodes to the new process.
• Persistence: It is optional and is provided by Mnesia.

All these benefits come for free with Erlang/OTP, and that's not all.

### Call Erlang cache from Java

There are several ways of integrating Erlang applications with other languages. For Java, the most convenient one is the JInterface library. Here is an implementation of the java.util.Map interface that communicates with the cache application we've just developed:

```java
import java.util.Map;

import com.ericsson.otp.erlang.*;

public class ErlStringMap implements Map<String, String> {

    private final OtpSelf self;
    private final OtpPeer other;
    private final String cacheModule;

    public ErlStringMap(String client, String cookie, String serverNode, String cacheModule) {
        try {
            self = new OtpSelf(client, cookie);
            other = new OtpPeer(serverNode);
            this.cacheModule = cacheModule;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public String put(String key, String value) {
        return remoteCall("put", key, value);
    }

    public String get(Object key) {
        return remoteCall("get", (String) key);
    }

    public String remove(Object key) {
        return remoteCall("remove", (String) key);
    }

    // Opens a new connection for every call, sends the RPC and parses the reply
    private String remoteCall(String method, String... args) {
        try {
            OtpConnection connection = self.connect(other);
            connection.sendRPC(cacheModule, method, stringsToErlangStrings(args));
            OtpErlangObject received = connection.receiveRPC();
            connection.close();
            return parse(received);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    private OtpErlangObject[] stringsToErlangStrings(String[] strings) {
        OtpErlangObject[] result = new OtpErlangObject[strings.length];
        for (int i = 0; i < strings.length; i++) result[i] = new OtpErlangString(strings[i]);
        return result;
    }

    // The Erlang side replies with the atom null or with a string value
    private String parse(OtpErlangObject otpObj) {
        if (otpObj instanceof OtpErlangAtom) {
            OtpErlangAtom atom = (OtpErlangAtom) otpObj;
            if (atom.atomValue().equals("null")) return null;
            else throw new IllegalArgumentException("Only atom null is supported");
        } else if (otpObj instanceof OtpErlangString) {
            OtpErlangString str = (OtpErlangString) otpObj;
            return str.stringValue();
        }
        throw new IllegalArgumentException("Unexpected type " + otpObj.getClass().getName());
    }

    // Other methods are omitted
}
```

Now, from the Java application, we can use our distributed cache the same way we use a HashMap:

```java
// FileUtils comes from Apache Commons IO
String cookie = FileUtils.readFileToString(new File("/Users/andrey/.erlang.cookie"));
Map<String, String> map = new ErlStringMap("client1", cookie, "slave1@macBook", "mycache");
map.put("foo", "bar");
```
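Because the client code depends only on the java.util.Map interface, it can be exercised without a running cluster. The following is a minimal sketch, not part of the original source: the class name `MapContractDemo` and the helper `replay` are hypothetical, and a plain `HashMap` stands in for `ErlStringMap`. It replays the Erlang shell session from the previous section and shows the return values a Map-style cache is expected to produce.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class MapContractDemo {

    /**
     * Replays the put/get/put/remove/get sequence from the Erlang shell session
     * against any Map implementation and returns the five results in order.
     */
    static String[] replay(Map<String, String> map) {
        return new String[] {
            map.put("mykey", "myvalue"),   // no previous value
            map.get("mykey"),              // value just written
            map.put("mykey", "newvalue"),  // returns the previous value
            map.remove("mykey"),           // returns the removed value
            map.get("mykey")               // key is gone
        };
    }

    public static void main(String[] args) {
        // HashMap is only a stand-in (assumption); an ErlStringMap could be passed instead.
        System.out.println(Arrays.toString(replay(new HashMap<>())));
        // prints "[null, myvalue, myvalue, newvalue, null]"
    }
}
```

The results match what the Erlang shell printed, with the atom null on the Erlang side corresponding to a Java null.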

### Performance?

Let's deploy Erlang and Java nodes following this topology

Here is the speed of the cache operations I get in the Java client:

write 30.385 ms
delete 21.665 ms

If we remove the network, i.e. move all the VMs, both Java and Erlang, onto the same box, we get the following performance:

write 2.091 ms
delete 2.057 ms

And if we also disable persistence, the numbers become:

write 1.75 ms
delete 1.75 ms
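The measurement harness itself is not shown in this post. As a rough sketch of how per-operation averages like the ones above could be collected, the code below times a batch of operations against any `Map<String, String>`. The class name `CacheBenchmark`, its method names and the operation count are assumptions made for illustration, and a `HashMap` is used as a stand-in so the example runs without an Erlang cluster.

```java
import java.util.HashMap;
import java.util.Map;

class CacheBenchmark {

    /** Average wall-clock time of one put, in milliseconds, over the given number of operations. */
    public static double averagePutMillis(Map<String, String> cache, int operations) {
        long start = System.nanoTime();
        for (int i = 0; i < operations; i++) {
            cache.put("key" + i, "value" + i);
        }
        return (System.nanoTime() - start) / 1_000_000.0 / operations;
    }

    /** Average wall-clock time of one remove, in milliseconds, for the keys written by averagePutMillis. */
    public static double averageRemoveMillis(Map<String, String> cache, int operations) {
        long start = System.nanoTime();
        for (int i = 0; i < operations; i++) {
            cache.remove("key" + i);
        }
        return (System.nanoTime() - start) / 1_000_000.0 / operations;
    }

    public static void main(String[] args) {
        // Stand-in map (assumption); replace with an ErlStringMap to measure the real cache.
        Map<String, String> cache = new HashMap<>();
        int operations = 10_000; // arbitrary sample size chosen for this sketch
        System.out.printf("write  %.3f ms%n", averagePutMillis(cache, operations));
        System.out.printf("delete %.3f ms%n", averageRemoveMillis(cache, operations));
    }
}
```

A simple loop like this includes JVM warm-up and, for the real client, the cost of opening a connection on every call, so the figures it produces are only indicative.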

As you can see, the performance is not the best, but keep in mind that the purpose of this post is not to build a production-ready cache application, but to show the power of Erlang/OTP in building distributed fault-tolerant systems. As an exercise, try to implement the same functionality using only the JDK.

### Resources

• Source code used in this post.

• An upcoming book in which the authors seem to implement a similar application.

David Dossot said...

Andrey: thanks for sharing this!

You may want to detail further why the performances are not stellar?

As far as I can see, the first bottleneck is the gen_server process in-box that basically serializes all calls to your cache. You could switch to a gen_rpc and have one process per incoming call, leading to better concurrency.

Of course, you would then hit the next bottleneck: Mnesia transactions, which are serialized. A strategy here would be to create n-tables and hash your keys on these tables to spread concurrency issues on these n-buckets.

Finally the Java client itself can probably be improved. I don't know for example if open the connection on each call is the best approach.

Again, thanks for this detailed and in-depth post.

D.

PS. I find your code to be production grade. As you said, it exhibits scalability and fault tolerance so, IMO, it's shippable ;-)

Andrey Paramonov said...

Thank you David for your comment.

When I said 'not the best' I meant that performance might be better since I didn't do any optimization, and you pointed out possible ways to improve it. But in terms of numbers I didn't actually check what's the performance of other caching frameworks. Now I found this post comparing some of them:

http://javalandscape.blogspot.com/2009/05/intro-to-cachingcaching-algorithms-and.html

For example, the performance of single-node co-located in-memory JBossCache is: Write 110.12 ms, Read 3.26 ms. Do I understand correctly that those numbers are per operation? If so, then performance of Erlang cache is actually really good! I should probably update the last section of the blog post.

Andrey

Andrey Paramonov said...

The numbers in the article I've mentioned are actually in microseconds. Unfortunately I cannot find performance test results for distributed persistent JBossCache.
