Elixir OTP applications on Kubernetes

Chirag Singh Toor
Published in DSC Engineering
Dec 12, 2018 · 10 min read


OTP (Open Telecom Platform) provides rock-solid foundations for building concurrent and fault-tolerant applications in Elixir. By building on top of GenServers and Supervision trees you not only get great performance, but also a modularized application. However, if you’re just starting to integrate Elixir into your stack, your production infrastructure is probably not geared towards OTP applications out of the box.

For example, at Dollar Shave Club we use Elixir for a transactional messaging service (email, SMS). When customers add items to their DSC plan we email them a summary, but to avoid blasting them with several emails we aggregate their add actions into a single delivery. Our use case fits well with GenServer and DynamicSupervisor. A GenServer could be spawned for each customer to keep track of their added products and handle the timer logic, but all of that data would be stored in memory. We deploy on Kubernetes, so as soon as any Pod is torn down that in-memory data is blown away.

deployments in Kubernetes tear down Pods, so in-memory state is lost

We have been exploring how to better integrate OTP and Kubernetes and got some valuable insight at ElixirConf 2018. In this article we’ll discuss why we use Elixir/OTP, walk step by step through setting up a demo Elixir application that handles graceful Node shutdown, and then integrate it with Kubernetes using minikube.

This article assumes you have Elixir 1.7, minikube, and kubectl installed, along with a basic understanding of Kubernetes and OTP (GenServer, Supervisor, Registry).

Why Elixir and OTP?

Take the messaging service discussed above: OTP is a great fit because there is a clear separation between inputs. Inputs are split by customer, so any issue processing one customer should not affect the others. On the Erlang VM (BEAM) a separate Process can be used for each customer, providing concurrency and containing any errors from processing one customer to only that one Process. OTP provides out-of-the-box utilities that build on top of these BEAM processes.

The options for working with OTP are Erlang and Elixir, and between the two we prefer Elixir’s syntax. Elixir also has a growing community, some fantastic tools (mix), and is overall a joy to work with (pattern matching, the pipe operator).

Demo application: ExCluster

The demo application is an order handling service. Each order belongs to a customer and stores a list of integers as the order contents.

Create a new application: mix new ex_cluster

Update mix.exs to set up an application callback module, and start a Supervision tree with a DynamicSupervisor for orders:

mix.exs
lib/ex_cluster.ex
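
Roughly, those two files look like the following sketch. The names ExCluster and ExCluster.OrderSupervisor are the ones that show up in the iex sessions later in this article; everything else is illustrative rather than the exact source.

# mix.exs: point OTP at an application callback module
def application do
  [
    extra_applications: [:logger],
    mod: {ExCluster, []}
  ]
end

# lib/ex_cluster.ex: the application callback starts a Supervision tree
# containing a DynamicSupervisor that will own the order processes
defmodule ExCluster do
  use Application
  require Logger

  def start(_type, _args) do
    children = [
      {DynamicSupervisor, name: ExCluster.OrderSupervisor, strategy: :one_for_one}
    ]

    Logger.info("ExCluster application started")
    Supervisor.start_link(children, strategy: :one_for_one, name: ExCluster.Supervisor)
  end
end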

Model orders in the application with a GenServer whose internal state is the list of order contents:

lib/ex_cluster/order.ex
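
A first pass at the order GenServer might look like the sketch below; the "Starting Order for ..." log line matches the output later in this section, the rest is illustrative.

# lib/ex_cluster/order.ex: one GenServer per customer, holding the list of
# order contents as its state
defmodule ExCluster.Order do
  use GenServer
  require Logger

  def start_link(customer) do
    GenServer.start_link(__MODULE__, customer)
  end

  def init(customer) do
    Logger.info("Starting Order for #{customer}")
    {:ok, []}
  end
end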

Run the application with iex -S mix and create orders:

$ iex -S mix
Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]
16:04:29.435 [info] ExCluster application started
Interactive Elixir (1.7.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> DynamicSupervisor.start_child(ExCluster.OrderSupervisor, { ExCluster.Order, "John" })
16:04:43.697 [info] Starting Order for John
{:ok, #PID<0.148.0>}

Note the returned PID (the identifier for the underlying BEAM process); the application needs to be able to associate it with a customer. Say input flows in like this:

Customer "John", created order
Customer "Karan", created order
Customer "Karan", added to order items: [2, 3]
Customer "John", added to order items: [1, 2]

Which PID should be used when a customer adds items to an order? This is where a Registry comes into play, providing:

local, decentralized and scalable key-value process storage.

In this case it is a key-value store from customer to order PID. Add one to the Supervision tree:

lib/ex_cluster.ex
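
The change to the Supervision tree is small; a sketch (the Registry name ExCluster.OrderRegistry is an assumption, but it must be a unique-keyed Registry):

# lib/ex_cluster.ex: add a Registry mapping customer names to order PIDs
children = [
  {Registry, keys: :unique, name: ExCluster.OrderRegistry},
  {DynamicSupervisor, name: ExCluster.OrderSupervisor, strategy: :one_for_one}
]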

Update the order GenServer to use the Registry, and add functions for adding to an order and getting its contents:

lib/ex_cluster/order.ex
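
Sketched out, the order GenServer now registers itself under the customer name using the standard via-tuple pattern. The function names add/2 and contents/1 match the iex session below; the internals are illustrative.

# lib/ex_cluster/order.ex: register each order under its customer name,
# expose add/2 and contents/1
defmodule ExCluster.Order do
  use GenServer
  require Logger

  def start_link(customer) do
    GenServer.start_link(__MODULE__, customer, name: via_registry(customer))
  end

  def add(customer, new_contents) do
    GenServer.cast(via_registry(customer), {:add, new_contents})
  end

  def contents(customer) do
    GenServer.call(via_registry(customer), :contents)
  end

  def init(customer) do
    Logger.info("Starting Order for #{customer}")
    {:ok, []}
  end

  def handle_cast({:add, new_contents}, contents) do
    {:noreply, contents ++ new_contents}
  end

  def handle_call(:contents, _from, contents) do
    {:reply, contents, contents}
  end

  # look up (or register) the order process by customer name
  defp via_registry(customer) do
    {:via, Registry, {ExCluster.OrderRegistry, customer}}
  end
end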

Now the application can manage multiple orders:

iex(1)> DynamicSupervisor.start_child(ExCluster.OrderSupervisor, { ExCluster.Order, "John" })
13:17:26.344 [info] Starting Order for John
{:ok, #PID<0.136.0>}
iex(2)> DynamicSupervisor.start_child(ExCluster.OrderSupervisor, { ExCluster.Order, "Karan" })
13:17:31.503 [info] Starting Order for Karan
{:ok, #PID<0.138.0>}
iex(3)> ExCluster.Order.add("Karan", [2, 3])
:ok
iex(4)> ExCluster.Order.add("John", [1, 2])
:ok
iex(5)> ExCluster.Order.contents("John")
[1, 2]
iex(6)> ExCluster.Order.contents("Karan")
[2, 3]

If this were deployed to Kubernetes, any deployment would cause existing orders to be blown away since they live in memory. To avoid dropping order data, the application needs to hook into the deployment lifecycle.

SIGTERM, Distributed Elixir, and Horde

During a deploy, new Pods are spun up and old Pods receive a SIGTERM. This gives the old Pods a short window of time (30 seconds by default) to clean up. During this time orders need to move from the terminating Pod to another Pod. Similar to how a Supervisor restarts children locally, a global Supervisor can be used to restart them across Pods.

diagram explaining global Supervisor and deployment lifecycle hookup

Horde is an Elixir library that provides this functionality. Using Distributed Elixir, all running instances of the application (Nodes) can be connected together into a Cluster. Horde.Registry and Horde.Supervisor can then be used to have a Registry and a Supervisor act across the Cluster in a distributed fashion.

This may seem very complicated, but this is where the beauty of the Erlang VM and message passing between processes really shines. Once connected in a Cluster, messages can be passed between processes across machines the same way they are passed between locally running processes. The machine-to-machine communication is abstracted away, so the application logic itself does not get much more complex.

Add Horde ({:horde, "~> 0.3.0"} in deps) and then, instead of the standard Registry and Supervisor, use the Horde implementations:

lib/ex_cluster.ex
lib/ex_cluster/order.ex
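
The swap is mostly mechanical; roughly as follows. Horde's exact child-spec options have shifted between releases, so treat this as a 0.3-era sketch rather than the exact source.

# lib/ex_cluster.ex: distributed Registry and Supervisor from Horde
children = [
  {Horde.Registry, [name: ExCluster.OrderRegistry]},
  {Horde.Supervisor, [name: ExCluster.OrderSupervisor, strategy: :one_for_one]}
]

# lib/ex_cluster/order.ex: only the registration changes, orders now register
# through Horde.Registry instead of the local Registry
defp via_registry(customer) do
  {:via, Horde.Registry, {ExCluster.OrderRegistry, customer}}
end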

Note: before starting the Supervision tree the application connects all Nodes into a Cluster (via Node.connect/1). This leverages Distributed Elixir and is a building block for Horde. When ExCluster.HordeConnector starts up, it goes through all other Nodes in the Cluster (Node.list()) and joins the Supervisors and Registries together, making them work across machines.
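
A sketch of that connector: the NODES handling matches how the application is launched below, and the join calls use Horde 0.3's Horde.Cluster.join_hordes/2 (later Horde versions replaced this with set_members/2).

# lib/ex_cluster/horde_connector.ex: run once at startup (for example from a
# Task in the Supervision tree, after the Horde children). Connect the Nodes
# listed in NODES, then join this Node's Horde Supervisor and Registry with
# the copies running on every connected Node.
defmodule ExCluster.HordeConnector do
  def connect do
    System.get_env("NODES")
    |> to_string()
    |> String.split(",", trim: true)
    |> Enum.each(fn node_name -> Node.connect(String.to_atom(node_name)) end)

    Enum.each(Node.list(), fn node ->
      Horde.Cluster.join_hordes(ExCluster.OrderSupervisor, {ExCluster.OrderSupervisor, node})
      Horde.Cluster.join_hordes(ExCluster.OrderRegistry, {ExCluster.OrderRegistry, node})
    end)
  end
end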

For the Node connections to work, two VM flags need to be supplied when running the application: name and cookie. The name is associated with the current Node and must be unique in the Cluster, while the cookie is a secret value that must be consistent across all Nodes. See this in action by spinning up a 3-Node Cluster:

# Terminal 1
ERL_FLAGS="-name count1@127.0.0.1 -setcookie cookie" NODES="count2@127.0.0.1,count3@127.0.0.1" iex -S mix
# Terminal 2
ERL_FLAGS="-name count2@127.0.0.1 -setcookie cookie" NODES="count1@127.0.0.1,count3@127.0.0.1" iex -S mix
# Terminal 3
ERL_FLAGS="-name count3@127.0.0.1 -setcookie cookie" NODES="count1@127.0.0.1,count2@127.0.0.1" iex -S mix

Run Node.list() in any of the 3 terminals; the output will be a list with the other two Node names. For example, from Terminal 1 the output is:

[:"count2@127.0.0.1", :"count3@127.0.0.1"]

This means all 3 Nodes are connected in a Cluster. You can see the global Supervisor in action by setting up an order in Terminal 1 and then gracefully terminating that Node (:init.stop()):

iex(count1@127.0.0.1)2> Horde.Supervisor.start_child(ExCluster.OrderSupervisor, { ExCluster.Order, "John" })
13:51:57.125 [info] Starting Order for John
{:ok, #PID<0.220.0>}
iex(count1@127.0.0.1)3> ExCluster.Order.add("John", [1, 2])
:ok
iex(count1@127.0.0.1)4> ExCluster.Order.contents("John")
[1, 2]
iex(count1@127.0.0.1)5> :init.stop()
:ok

Look at the output from the other Nodes; on one of them the output will be:

iex(count2@127.0.0.1)1>
13:53:48.001 [debug] Found 1 processes on dead nodes
13:53:48.088 [info] Starting Order for John

The OrderSupervisor restarted John’s order on another Node!

However, there is one thing left to do. The order for John was restarted, but the internal state of the order (its contents) was not carried over. Get the contents of John’s order from the remaining Nodes:

iex(count2@127.0.0.1)2> ExCluster.Order.contents("John")
[]

It’s an empty list. The order GenServer state needs to be handed off to the restarted process. This will need to be implemented in the application itself as it is not a part of Horde.

Cross-Node State Handoff with DeltaCrdt

A hook into the order GenServer’s termination is needed so the internal state can be stored temporarily. Then, on restart, the new process can pick it up and resume processing.

This can be done by setting a flag (trapping exits) in the order GenServer’s initialization so that its terminate callback runs when it is going down:

lib/ex_cluster/order.ex
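
A sketch of that change. Trapping exits is what lets the Supervisor's shutdown signal trigger terminate/2; widening the state to {customer, contents} so terminate/2 knows whose order it holds is an assumption beyond the earlier sketch.

# lib/ex_cluster/order.ex: trap exits so terminate/2 runs on shutdown
def init(customer) do
  Process.flag(:trap_exit, true)
  Logger.info("Starting Order for #{customer}")
  {:ok, {customer, []}}
end

def terminate(_reason, {customer, _order_contents}) do
  # the actual handoff of the contents goes here (next section)
  Logger.info("Order for #{customer} terminating")
  :ok
end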

In terminate, the order contents need to be stored externally, as the current Pod is about to go away. This could be done with something like Redis, but let’s stick to Distributed Elixir solutions running across the Cluster to avoid external dependencies. The application will use conflict-free replicated data types (CRDTs), which:

are eventually guaranteed to converge globally, meaning that in the absence of network partitions, the data will eventually be the same at every replica.

CRDTs are used internally in the implementations of both Horde and Phoenix Presence, and you can find a good intro article here.

A CRDT can be used as a distributed, temporary caching mechanism that is synced across the Cluster. When an order GenServer restarts on another Pod, it can look into this cache for existing order contents. Set this up by adding a new StateHandoff GenServer to the Supervision tree, with a CRDT as its internal state. Similar to the Horde Registry and Supervisor, it will need to be connected to all other StateHandoff CRDTs running across the Cluster:

lib/ex_cluster/state_handoff.ex
lib/ex_cluster.ex
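
Here is a sketch of the StateHandoff GenServer: it owns a DeltaCrdt replica and exposes store, pickup, and join functions. The exact DeltaCrdt calls vary by library version; this assumes one exposing start_link/1, set_neighbours/2, mutate/3 and read/1, and all names besides ExCluster.StateHandoff are illustrative.

# lib/ex_cluster/state_handoff.ex: a GenServer wrapping a DeltaCrdt replica
defmodule ExCluster.StateHandoff do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # wire this Node's replica to the one owned by StateHandoff on another Node
  def join(other_node) do
    GenServer.call(__MODULE__, {:join, other_node})
  end

  def store(customer, order_contents) do
    GenServer.call(__MODULE__, {:store, customer, order_contents})
  end

  def pickup(customer) do
    GenServer.call(__MODULE__, {:pickup, customer})
  end

  def init(_opts) do
    # the CRDT process holds an add-wins, last-write-wins map
    {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
    {:ok, crdt}
  end

  # exchange CRDT pids with the remote StateHandoff; once each replica knows
  # the other as a neighbour, DeltaCrdt replicates the map between the Nodes
  def handle_call({:join, other_node}, _from, crdt) do
    remote_crdt = GenServer.call({__MODULE__, other_node}, {:ack_join, crdt})
    DeltaCrdt.set_neighbours(crdt, [remote_crdt])
    {:reply, :ok, crdt}
  end

  def handle_call({:ack_join, remote_crdt}, _from, crdt) do
    DeltaCrdt.set_neighbours(crdt, [remote_crdt])
    {:reply, crdt, crdt}
  end

  def handle_call({:store, customer, order_contents}, _from, crdt) do
    DeltaCrdt.mutate(crdt, :add, [customer, order_contents])
    {:reply, :ok, crdt}
  end

  def handle_call({:pickup, customer}, _from, crdt) do
    contents = DeltaCrdt.read(crdt) |> Map.get(customer, [])
    {:reply, contents, crdt}
  end
end

In lib/ex_cluster.ex the only changes are adding ExCluster.StateHandoff to the children list and having ExCluster.HordeConnector call ExCluster.StateHandoff.join/1 for each Node in Node.list(), mirroring how the Horde pieces are joined.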

This adds a replicated, eventually consistent key-value store to the application. To finish off, integrate it into the order GenServer:

lib/ex_cluster/order.ex
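
Sketched against the StateHandoff above, the order GenServer picks contents up on start and stores them on shutdown:

# lib/ex_cluster/order.ex: pick up handed-off contents on start, store on stop
def init(customer) do
  Process.flag(:trap_exit, true)
  Logger.info("Starting Order for #{customer}")
  order_contents = ExCluster.StateHandoff.pickup(customer)
  {:ok, {customer, order_contents}}
end

def terminate(_reason, {customer, order_contents}) do
  ExCluster.StateHandoff.store(customer, order_contents)
  :ok
end

Keep in mind that the CRDT is only eventually consistent, so a freshly restarted order could in principle race the replication of its handed-off contents.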

Take it out for a spin: run multiple Nodes, create an order, add some contents, then terminate:

iex(count1@127.0.0.1)1> Horde.Supervisor.start_child(ExCluster.OrderSupervisor, { ExCluster.Order, "John" })
15:28:05.211 [info] Starting Order for John
{:ok, #PID<0.238.0>}
iex(count1@127.0.0.1)2> ExCluster.Order.add("John", [1, 2])
:ok
iex(count1@127.0.0.1)3> ExCluster.Order.contents("John")
[1, 2]
iex(count1@127.0.0.1)4> :init.stop()
:ok

Check on the order contents for John’s restarted order:

iex(count2@127.0.0.1)1>
15:28:34.463 [debug] Found 1 processes on dead nodes
15:28:34.554 [info] Starting Order for John
iex(count2@127.0.0.1)2> ExCluster.Order.contents("John")
[1, 2]

There you have it, a complete migration of an order from one Node to another!

Dynamic Cluster membership with LibCluster and Kubernetes

Until now, a fixed list of NODES has been specified via the environment when the application starts up. This works locally, but not in production, where Cluster membership is dynamic as Pods get swapped in and out. Fortunately there is already a solution in the Elixir community for managing dynamic Cluster membership: LibCluster.

Before getting into the LibCluster details, set up Distillery for building releases by updating the mix.exs deps:

mix.exs
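
The deps list at this point looks roughly like this (the libcluster and distillery version numbers are what was current at the time of writing and may need updating):

# mix.exs
defp deps do
  [
    {:horde, "~> 0.3.0"},
    {:libcluster, "~> 3.0"},
    {:distillery, "~> 2.0"}
  ]
end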

Run mix release.init to configure a Distillery release. Update the rel/vm.args file to configure the name and cookie for the Node:

rel/vm.args
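
A sketch of the vm.args; Distillery performs the ${...} substitution at boot when REPLACE_OS_VARS=true is set in the environment:

# rel/vm.args
-name ex_cluster@${MY_POD_IP}
-setcookie ${NODE_COOKIE}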

The ${...} syntax tells Distillery to substitute the runtime env variables MY_POD_IP and NODE_COOKIE when the Pod is run. Add a Dockerfile:

Dockerfile
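
A minimal Dockerfile sketch, assuming the release is built inside the official Elixir image; the later kubectl exec commands rely on the release living under _build/prod/rel/ex_cluster:

# Dockerfile
FROM elixir:1.7

ENV MIX_ENV=prod REPLACE_OS_VARS=true
WORKDIR /app
COPY . .

# build a Distillery release (Distillery 2.1 renamed this task to mix distillery.release)
RUN mix local.hex --force && \
    mix local.rebar --force && \
    mix deps.get && \
    mix release

# run the release in the foreground so the container stays up
CMD ["_build/prod/rel/ex_cluster/bin/ex_cluster", "foreground"]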

Add two Kubernetes manifests. The first is a Deployment, which uses the downward API to grab each individual Pod’s IP and pass it through the environment:

k8s/deployment.yaml
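
The interesting part of the Deployment manifest is the env block; a sketch follows (the image name and replica count match the minikube steps below, the rest is illustrative):

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ex-cluster
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ex-cluster
  template:
    metadata:
      labels:
        app: ex-cluster
    spec:
      containers:
        - name: ex-cluster
          image: ex_cluster:local
          imagePullPolicy: Never   # use the image built into minikube's Docker daemon
          env:
            - name: MY_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP   # downward API: this Pod's IP
            - name: NODE_COOKIE
              value: "cookie"               # must be identical on every Pod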

The second is a Kubernetes Headless Service, which provides an internal DNS name that resolves to the IPs of all Pods currently deployed:

k8s/service-headless.yaml
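
A sketch of the Headless Service; clusterIP: None is what makes a DNS lookup on the service name return every Pod IP instead of a single virtual IP (the epmd port here is an assumption):

# k8s/service-headless.yaml
apiVersion: v1
kind: Service
metadata:
  name: ex-cluster
spec:
  clusterIP: None
  selector:
    app: ex-cluster
  ports:
    - name: epmd
      port: 4369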

The Node names are specified as ex_cluster@${MY_POD_IP}, and the downward API provides a unique MY_POD_IP via the env, giving each Pod a unique name. The cookie is configurable via the manifests and consistent across all Nodes, satisfying the clustering requirements.

Integrate LibCluster as follows:

lib/ex_cluster.ex
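
The integration is one extra child at the top of the Supervision tree; a sketch (the supervisor name ExCluster.ClusterSupervisor is an assumption):

# lib/ex_cluster.ex: start libcluster with topologies pulled from config
topologies = Application.get_env(:libcluster, :topologies, [])

children = [
  {Cluster.Supervisor, [topologies, [name: ExCluster.ClusterSupervisor]]}
  # plus the StateHandoff, Horde.Registry, Horde.Supervisor and HordeConnector
  # children from the earlier sections
]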

And configure LibCluster topologies by environment:

config/prod.exs
config/test.exs
config/dev.exs
lib/lib_cluster/local_strategy.ex
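
For production, the topology points LibCluster at the Headless Service through its Kubernetes DNS strategy; a sketch (service and application_name must match the manifests and release name, and the polling interval is set to the 3 seconds mentioned below):

# config/prod.exs
config :libcluster,
  topologies: [
    ex_cluster: [
      strategy: Cluster.Strategy.Kubernetes.DNS,
      config: [
        service: "ex-cluster",
        application_name: "ex_cluster",
        polling_interval: 3_000
      ]
    ]
  ]

The dev and test configs instead point at the custom LocalStrategy module, which simply reads the NODES env variable at runtime and connects to each name it finds there.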

In production, LibCluster polls the Headless Service DNS every 3 seconds to check on Pod deployments and automatically update Cluster membership.

Remember, on application start the Registry, Supervisor, and StateHandoff GenServers are connected to all other copies running in the Cluster (via ExCluster.HordeConnector). This ensures that as Pods come and go, the cross-Pod connections are kept up to date.

In development, a custom LocalStrategy is used that builds a Cluster based on the runtime env variable NODES, just like we have been doing when running locally.

Running in Kubernetes with minikube

After starting minikube, point your shell at the minikube Docker daemon and then build and tag the container image so minikube can find it locally:

$ eval $(minikube docker-env)
$ docker build -t ex_cluster:local .

Create the service and deployment:

$ kubectl create -f k8s/service-headless.yaml
$ kubectl create -f k8s/deployment.yaml

You should be able to see 2 Pods running:

$ kubectl get pods
NAME                          READY     STATUS    RESTARTS   AGE
ex-cluster-5b45c8577b-jltm9   1/1       Running   0          5s
ex-cluster-5b45c8577b-ztn4p   1/1       Running   0          5s

Exec into one, attach to the remote console, and create an order:

$ kubectl exec -it ex-cluster-5b45c8577b-jltm9 /bin/bash
[in the pod shell]$ _build/prod/rel/ex_cluster/bin/ex_cluster remote_console
[should now be attached to iex on the Pod]
iex(ex_cluster@172.17.0.6)1> Horde.Supervisor.start_child(ExCluster.OrderSupervisor, { ExCluster.Order, "John" })
iex(ex_cluster@172.17.0.6)3> ExCluster.Order.add("John", [1,2,3])
:ok
iex(ex_cluster@172.17.0.6)4> ExCluster.Order.contents("John")
[1, 2, 3]

Terminate the Pod where the order is running:

$ kubectl delete pod ex-cluster-5b45c8577b-jltm9

A new Pod should have been automatically spun up, with the terminated Pod now going down. Exec into one of the two running Pods and check on the order:

$ kubectl get pods
NAME                          READY     STATUS        RESTARTS   AGE
ex-cluster-5b45c8577b-8cg9c   1/1       Running       0          3m
ex-cluster-5b45c8577b-d4cm8   1/1       Running       0          5s
ex-cluster-5b45c8577b-fkfj2   1/1       Terminating   0          3m
$ kubectl exec -it ex-cluster-5b45c8577b-d4cm8 /bin/bash
[in the pod shell]$ _build/prod/rel/ex_cluster/bin/ex_cluster remote_console
[should now be attached to iex on the Pod]
iex(ex_cluster@172.17.0.7)1> ExCluster.Order.contents("John")
[1, 2, 3]

There you go, the order was preserved through the Pod teardown!

A full repo of this demo application is available here.

Conclusion

With this setup there are a few scenarios to consider:

  • In-memory order handling still has limits: How large are your inputs? How many come in at a time? How long do they stay around on average?
  • Non-graceful termination, although unlikely, is possible. StateHandoff will not kick in there, so how will you recover? (Have a replayable input source like Kafka?)
  • Horde.Supervisor does not guarantee that a process is started on the calling machine. Internally, Horde has its own way of balancing processes across the Cluster. See the Horde docs for more.

Even with these concerns, we have made headway into integrating OTP with Kubernetes. As the Elixir community continues to mature I believe some of these concerns will be better addressed.

Much of this setup was inspired by Daniel Azuma’s talk and example application presented at ElixirConf 2018; you can view it here and find more info here. The StateHandoff implementation came out of reading the Horde internals. Horde is very well written and documented (thank you, Derek Kraan) and worth looking at if you find yourself doing this kind of work.
