How are you going to make friends if no-one knows you exist? That can be a real problem for some Elixir nodes, for example a few Raspberry Pi Zeroes running Nerves on a local network. One answer is to use the power of multicasting. I will walk you through an easy implementation. You might want to grab a clone of the example repo for this post here.

Multicasting is sending a message out to everyone who is listening on the local network [1], as opposed to sending messages to a specific destination. Specifically, it means sending UDP messages to an address in the 224.0.0.0/4 range [2] on a particular port; to receive those messages, join the multicast group and listen on the same port.
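To make the address range concrete, here is a minimal sketch that checks whether an IPv4 tuple falls inside 224.0.0.0/4. The module and function names are made up for illustration and are not part of the example repo.

```elixir
defmodule MulticastAddress do
  # A /4 prefix fixes the top four bits of the address. For multicast they
  # are 1110, which covers first octets 224 through 239.
  def multicast?({a, b, c, d})
      when a in 224..239 and b in 0..255 and c in 0..255 and d in 0..255,
      do: true

  # Anything else (other ranges, malformed tuples) is not a multicast address.
  def multicast?(_other), do: false
end
```

Under this check, MulticastAddress.multicast?({239, 2, 3, 4}) is true, while an ordinary LAN address such as {192, 168, 0, 11} is not.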

Basic multicasting GenServer

Let’s get started with a GenServer that will both listen for, and broadcast, multicast messages.

defmodule Multicasting.BroadcasterReceiver do
  use GenServer
  # Logger.info/1 is a macro, so the module has to require Logger
  require Logger

  @port (case Mix.env() do
           :test -> 49_002
           _ -> 49_001
         end)
  
  @active 1
  @multicast_group_ip {239, 2, 3, 4}
  @udp_options [
    :binary,
    active: @active,
    add_membership: {@multicast_group_ip, {0, 0, 0, 0}},
    multicast_loop: true
  ]

  
  def start_link(_) do
    GenServer.start_link(__MODULE__, {}, name: __MODULE__)
  end

  def init(_opts) do
    {:ok, socket} = :gen_udp.open(@port, @udp_options)
    send(self(), :broadcast)
    {:ok, %{socket: socket}}
  end

This is a standard, named GenServer that we can add to the supervision tree. On initialisation it opens a UDP socket on @port with @udp_options, using the open/2 function from the Erlang module :gen_udp. As with TCP, we just need to choose a port above 1023. We pick a different port for tests so that we can run mix test while the application is also running in dev mode.

It’s worth going through the @udp_options in detail.

  • :binary: We will send and receive using Erlang binary as opposed to a list of bytes.
  • active: 1: Received messages on the port will be sent as messages to this process, to a maximum of 1 message, at which point the socket will switch to passive mode. I will go into this in more detail later. (Spoiler: it gets switched back to active immediately.)
  • add_membership: {{239, 2, 3, 4}, {0, 0, 0, 0}}: Join the multicast group on IP address “239.2.3.4” using our local IP address “0.0.0.0”. “0.0.0.0” is the wildcard address: rather than naming a specific interface, we leave the choice to the operating system.
  • multicast_loop: true: also receive the messages that are sent from this socket. This is quite useful, to confirm that everything is set up ok. Again, I will elaborate on this later.

The init/1 function also sends a :broadcast message to its own process. We could be modern and use the handle_continue/2 callback, but as we are setting up a repeating series of broadcasts, a message is simpler.
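For comparison, here is a rough sketch of what the handle_continue/2 variant might look like. The module name, the short interval, and the owner process standing in for the real UDP send are all assumptions made so the snippet runs on its own.

```elixir
defmodule ContinueSketch do
  use GenServer

  # Hypothetical short interval so the effect is visible quickly.
  @interval 100

  def start_link(owner), do: GenServer.start_link(__MODULE__, owner)

  @impl true
  def init(owner) do
    # Defer the first broadcast until init/1 has returned.
    {:ok, %{owner: owner}, {:continue, :first_broadcast}}
  end

  @impl true
  def handle_continue(:first_broadcast, state) do
    broadcast(state)
    {:noreply, state}
  end

  # The repeats still arrive as ordinary messages, so this clause is needed too.
  @impl true
  def handle_info(:broadcast, state) do
    broadcast(state)
    {:noreply, state}
  end

  # Stand-in for the real UDP send: schedule the next round, notify the owner.
  defp broadcast(%{owner: owner}) do
    Process.send_after(self(), :broadcast, @interval)
    send(owner, :broadcast_sent)
  end
end
```

Both a handle_continue/2 clause and a handle_info/2 clause end up doing the same thing, which is why the message-based version used in this post gets away with a single callback: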

  @broadcast_interval 15_000    
  @message_prefix "multitastic"

  def handle_info(:broadcast, %{socket: socket} = state) do
    Process.send_after(self(), :broadcast, @broadcast_interval)
    :ok = :gen_udp.send(socket, @multicast_group_ip, @port, "#{@message_prefix}#{hostname()}")
    {:noreply, state}
  end

  defp hostname do
    {:ok, name} = :inet.gethostname()
    List.to_string(name)
  end

A message containing the OS hostname, prefixed with “multitastic”, is broadcast to all interested parties: those that have joined the multicast group and are listening on the same port.

As we are an interested party ourselves, with a socket in active mode, the message is picked up by

  def handle_info({:udp, _port, ip, _port_number, @message_prefix <> hostname}, state) do
    Logger.info("Broadcast received from #{hostname} on #{format_ip(ip)}")
    {:noreply, state}
  end

  defp format_ip(ip_tuple) do
    ip_tuple |> Tuple.to_list() |> Enum.join(".")
  end
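The prefix matching can be tried on its own, outside the GenServer. The sketch below uses a hypothetical MessageSketch module; the catch-all decode/1 clause is an addition for illustration, since the handler above only matches payloads that start with the prefix.

```elixir
defmodule MessageSketch do
  @message_prefix "multitastic"

  # Builds the payload the same way the broadcast does.
  def encode(hostname), do: @message_prefix <> hostname

  # Binary pattern matching strips the known prefix and binds the rest.
  def decode(@message_prefix <> hostname), do: {:ok, hostname}

  # Anything without the prefix is rejected rather than raising.
  def decode(_other), do: :error

  # Same formatting as format_ip/1 in the post.
  def format_ip(ip_tuple), do: ip_tuple |> Tuple.to_list() |> Enum.join(".")
end
```

A similar catch-all handle_info/2 clause in the GenServer may be worth considering, so that an unrelated datagram arriving on the same group and port does not crash the process with a FunctionClauseError.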

Remember that we set active: 1 in the UDP options, which means that as soon as we receive a multicast message the socket switches to passive mode and no more are delivered? This is to prevent a badly configured, or just naughty, sender from overwhelming the receiver with messages: if the messages were coming in faster than we could process them, the process’s message queue would keep on growing and bad things would happen (tm). The solution from Learn You Some Erlang is to flip-flop between active and passive mode. Helpfully (on purpose) a message is sent to the controlling process when the socket flips to passive, and we can switch back to active using :inet.setopts/2.

  def handle_info({:udp_passive, _}, %{socket: socket} = state) do
    :inet.setopts(socket, active: @active)
    {:noreply, state}
  end

Because we only revert to active once we have worked through the queue as far as that message, the process’s message queue cannot grow by much. Note that in this example we are using a super-conservative value of 1 for the number of messages received in active mode. It probably could (should) be a bit higher.
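To see the active: N behaviour in isolation, the following sketch sends a few datagrams to itself over plain loopback UDP (no multicast involved) and records what lands in the caller's mailbox. The module name, the loopback address, and the 200 ms collection window are assumptions for the demonstration.

```elixir
defmodule ActiveNSketch do
  @loopback {127, 0, 0, 1}

  # Opens a socket with `active: n`, sends `count` datagrams to itself, and
  # returns the events delivered to the calling process, in arrival order.
  def run(n, count) do
    # Port 0 asks the OS for any free port.
    {:ok, socket} = :gen_udp.open(0, [:binary, active: n, ip: @loopback])
    {:ok, port} = :inet.port(socket)

    for i <- 1..count do
      :ok = :gen_udp.send(socket, @loopback, port, "msg#{i}")
    end

    events = collect(socket, [])
    :gen_udp.close(socket)
    events
  end

  # Gathers socket messages until nothing arrives for 200 ms.
  defp collect(socket, acc) do
    receive do
      {:udp, ^socket, _ip, _port, data} -> collect(socket, [{:data, data} | acc])
      {:udp_passive, ^socket} -> collect(socket, [:passive | acc])
    after
      200 -> Enum.reverse(acc)
    end
  end
end
```

With ActiveNSketch.run(2, 3), one would expect two data events and a passive notification; the third datagram stays in the socket's buffer because nothing switches the socket back to active.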

The full module, so far, is here. All that remains is to add it to the application.

defmodule Multicasting.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      Multicasting.BroadcasterReceiver
    ]

    opts = [strategy: :one_for_one, name: Multicasting.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Now we can run iex -S mix and see our node’s own presence.

18:36:33.885 [info]  Broadcast received from caesaraugustus on 192.168.0.11
iex(1)> 
18:36:48.885 [info]  Broadcast received from caesaraugustus on 192.168.0.11
 
18:37:03.886 [info]  Broadcast received from caesaraugustus on 192.168.0.11

It’s still a pretty lonely life, though. If we try and run another node we get


18:38:24.599 [info]  Application multicasting exited: Multicasting.Application.start(:normal, []) returned an error: shutdown: failed to start child: Multicasting.BroadcasterReceiver
    ** (EXIT) an exception was raised:
        ** (MatchError) no match of right hand side value: {:error, :eaddrinuse}
            (multicasting 0.1.0) lib/multicasting/broadcaster_receiver.ex:36: Multicasting.BroadcasterReceiver.init/1
            (stdlib 3.13.2) gen_server.erl:417: :gen_server.init_it/2
            (stdlib 3.13.2) gen_server.erl:385: :gen_server.init_it/6

By default, only one OS process can bind to a port at a time. 😿😿😿😿😿😿😿😿😿

Cheer up though. If you did clone the example repository, I sneaked in a Dockerfile, so instead of iex -S mix you can run

 docker build -t multicast . && docker run -it multicast iex -S mix

Run that from a few terminals and you can have a party 🥂🍾🎆.


18:46:34.764 [info]  Broadcast received from 172baf8436fc on 172.17.0.4
iex(1)> 
18:46:38.989 [info]  Broadcast received from 075bc1221641 on 172.17.0.2
 
18:46:47.469 [info]  Broadcast received from 564d411984a5 on 172.17.0.3
 
18:46:49.762 [info]  Broadcast received from 172baf8436fc on 172.17.0.4
 
18:46:54.024 [info]  Broadcast received from 075bc1221641 on 172.17.0.2

Adding a touch of resilience

Using similar code on a few Nerves installations, I found that occasionally a node would remain isolated - neither receiving nor sending multicast messages on the local network. Killing the process, and having the supervision tree restart it, solved the problem. My hypothesis is that it is a timing issue - the socket being created before the WiFi connection was established.

A possible solution would be to subscribe to VintageNet for networking updates and take appropriate action, such as re-opening the socket. I dislike this approach for a few reasons. One is that it couples slightly higher level code to a specifically Nerves-flavoured network implementation; it would make running the application on my development machine more awkward. Another is that, in my experience, debugging and confirming fixes for intermittent network issues is horribly painful. It is so hard to be sure the problem has been fixed, rather than it randomly not happening.

Let’s add a new GenServer that will die if not touched every so often.

defmodule Multicasting.Tick do
  use GenServer

  def start_link(opts) do
    {timeout, opts} = Keyword.pop!(opts, :timeout)
    GenServer.start_link(__MODULE__, timeout, opts)
  end

  def init(timeout) do
    {:ok, %{timeout: timeout}, timeout}
  end

  def tick(server) do
    GenServer.cast(server, :tick)
  end

  def handle_cast(:tick, %{timeout: timeout} = s) do
    {:noreply, s, timeout}
  end

  def handle_info(:timeout, s) do
    {:stop, :normal, s}
  end
end

Calling tick/1 before the configured timeout elapses keeps the process alive. Now let’s integrate both with a supervisor.
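The keep-alive behaviour can be checked without any networking. The sketch below repeats a condensed stand-in for the Tick module (TickSketch, a hypothetical name, started unsupervised) so that it runs on its own, and TickDemo is an illustrative helper rather than anything from the repo.

```elixir
defmodule TickSketch do
  use GenServer

  # Condensed stand-in for Multicasting.Tick; the state is just the timeout.
  def start(timeout), do: GenServer.start(__MODULE__, timeout)
  def tick(server), do: GenServer.cast(server, :tick)

  @impl true
  def init(timeout), do: {:ok, timeout, timeout}

  # Every tick re-arms the timeout.
  @impl true
  def handle_cast(:tick, timeout), do: {:noreply, timeout, timeout}

  # No message arrived within the timeout, so stop.
  @impl true
  def handle_info(:timeout, timeout), do: {:stop, :normal, timeout}
end

defmodule TickDemo do
  # Ticks `ticks` times, `gap` ms apart, then stops ticking. Returns whether
  # the process survived the ticking phase and the reason it later exited.
  def run(timeout, ticks, gap) do
    {:ok, pid} = TickSketch.start(timeout)
    ref = Process.monitor(pid)

    for _ <- 1..ticks do
      Process.sleep(gap)
      TickSketch.tick(pid)
    end

    alive_after_ticks = Process.alive?(pid)

    exit_reason =
      receive do
        {:DOWN, ^ref, :process, ^pid, reason} -> reason
      after
        timeout * 3 -> :still_running
      end

    {alive_after_ticks, exit_reason}
  end
end
```

Calling TickDemo.run(200, 6, 50) ticks for about 300 ms, longer than the 200 ms timeout, so the process should still be alive at that point and then exit with reason :normal once the ticks stop.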

defmodule Multicasting.BroadcasterReceiverSupervisor do
  use Supervisor

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    children = [
      {Multicasting.Tick, [timeout: 35_000, name: :broadcaster_receiver_tick]},
      Multicasting.BroadcasterReceiver
    ]
    Supervisor.init(children, strategy: :one_for_all)
  end
end

The Tick is configured with a 35 second timeout. The supervision strategy is :one_for_all, so that when one process dies (i.e. the Tick) they are all restarted. (:rest_for_one would also work here, as the Tick is started first.)

We swap in that supervisor for the BroadcasterReceiver in the application supervisor.

  def start(_type, _args) do
    children = [
      Multicasting.BroadcasterReceiverSupervisor
    ]

    opts = [strategy: :one_for_one, name: Multicasting.Supervisor]
    Supervisor.start_link(children, opts)
  end

In the BroadcasterReceiver we call Multicasting.Tick.tick/1 on receipt of a message. Even if there are no other friends on the network, we will still receive a message every 15 seconds, because the multicast_loop: true option means we are listening to ourselves.

  def handle_info({:udp, _port, ip, _port_number, @message_prefix <> hostname}, state) do
    Multicasting.Tick.tick(:broadcaster_receiver_tick)
    Logger.info("Broadcast received from #{hostname} on #{format_ip(ip)}")
    {:noreply, state}
  end

Using this strategy to check that multicasting is working means we cannot set multicast_loop: false, unless we are comfortable with the processes restarting every 35 seconds whenever no other node is on the network (though that might not be too bad). If we do not want to act on our own messages, it may be better to filter them out here, in the application code.
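One way such filtering might look is to compare the hostname carried in the message with our own. This is a sketch under the assumption that hostnames are unique on the network; the module and function names are hypothetical.

```elixir
defmodule SelfFilterSketch do
  # Returns this machine's hostname as a binary, like hostname/0 in the post.
  def own_hostname do
    {:ok, name} = :inet.gethostname()
    List.to_string(name)
  end

  # True when the sender is some other host. The second argument exists only
  # to make the function easy to try with made-up names.
  def from_peer?(sender, own \\ own_hostname()), do: sender != own
end
```

In the receiving handle_info/2 clause, the Tick would still be touched for every message, including our own, while logging or any peer bookkeeping would only happen when from_peer?/1 returns true.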

Caution

I probably don’t need to say this, but this is not secure communication. Broadcasting on a network means that any node, friendly or not, can listen in and/or spoof the messages. Be cautious.

References


  1. Effectively the local network. In theory you can broadcast your message over more than the local network by increasing the multicast_ttl value to greater than 1, but I have no idea how this would really work on a NAT’d network.

  2. IPv4 address. While I know IPv6 multicasting exists, that is all I know about it.