Membrane Media Processing & LiveView

Underjord is a tiny, wholesome team doing Elixir consulting and contract work. If you like the writing you should really try the code. See our services for more information.

Membrane Framework allows processing media streams in a compelling, high-level way with Elixir. I'd been wanting to work with it for a while and finally built a small thing with it around my most recent livestream. During the live stream we didn't get it all the way to where I wanted. This post resolves that.

At the end of the post there is also a video showing what the thing does. The idea was to do something better for the visuals. Along the way I also ended up fixing an overflowing buffer.

The project along with commits and my experimental branch for fixing it without actually having a microphone are available at the lawik/media repo.

The first pipeline iteration

With this commit I had an initial working pipeline; this was my baseline going into the live stream.

Let’s see it:

elixir lib/media/stream_to_file.ex
defmodule Media.StreamToFile do
  use Membrane.Pipeline

  alias Membrane.{File, FFmpeg, MP3.MAD, MP3.Lame, PortAudio, Time}
  alias Membrane.Element.Tee
  alias Membrane.Caps.Audio.Raw

  @impl true
  def handle_init(output_directory) do
    Elixir.File.mkdir_p!(output_directory)

    children = [
      mic_input: PortAudio.Source,
      converter: %FFmpeg.SWResample.Converter{
        input_caps: %Raw{channels: 2, format: :s16le, sample_rate: 48_000},
        output_caps: %Raw{channels: 2, format: :s32le, sample_rate: 44_100}
      },
      splitter: Tee.Master,
      encoder: Lame.Encoder,
      raw_output: %File.Sink{location: Path.join(output_directory, "out.raw")},
      encoded_output: %File.Sink{location: Path.join(output_directory, "out.mp3")}
    ]

    links = [
      link(:mic_input) |> to(:converter) |> to(:splitter),
      link(:splitter) |> via_out(:master) |> to(:raw_output),
      link(:splitter) |> via_out(:copy) |> to(:encoder) |> to(:encoded_output)
    ]

    {{:ok, spec: %ParentSpec{children: children, links: links}}, %{}}
  end
end

What it do you say? What it do indeed. It uses the cross-platform audio device wrangler PortAudio to grab the microphone input as the first stage. That stage produces a stream of media, so it's a source. It throws the stream at a converter based on FFmpeg to convert it into a format the later stages can accept. Then it duplicates the stream into a master copy and a .. copy .. copy. It feeds the master to a raw file output and the copy? The copy goes into the LAME encoder to be turned into an MP3 and then on to a file. The file outputs consume a stream as an endpoint; those are sinks. Source and sink.

So we define the different stages as children and then we connect them using the links. Then we can run this. The Media module shows how that can be done. start_link/1 tells me this very much is a GenServer-style process that can be handled with your average OTP approaches. I'll add it to the application for automatic running later.

elixir lib/media.ex
defmodule Media do
  def record_to_file(path) do
    {:ok, pid} = Media.StreamToFile.start_link(path)
    Media.StreamToFile.play(pid)
    {:ok, pid}
  end

  def stop_to_file(pid) do
    Media.StreamToFile.stop_and_terminate(pid)
  end
end
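As for the automatic running I mentioned: a minimal sketch of what that could look like in the generated lib/media/application.ex. This is my hypothetical wiring, not code from the repo; the exact child spec shape depends on your Membrane version, and "recordings" is a placeholder output directory.

```elixir
defmodule Media.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Phoenix.PubSub, name: Media.PubSub},
      MediaWeb.Endpoint,
      # Hypothetical: start the pipeline alongside the web stack using a
      # generic child spec; you would still need to call play/1 on it
      # (or let something like Media.ensure_playing/0 handle that).
      %{
        id: Media.StreamToFile,
        start: {Media.StreamToFile, :start_link, ["recordings"]}
      }
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Media.Supervisor)
  end
end
```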

If you call Media.record_to_file/1 it will:

  • Grab your default input audio device
  • Resample it for encoding
  • Duplicate it to two streams
  • Write a raw PCM format file from the stream
  • Encode an MP3 of the audio
  • Write the encoded MP3 file

Neat thing: turns out MP3s are quite the space saver, 9–10 MB of raw audio turned into ~600 KB if I recall correctly. The stream VOD likely has the exact numbers.

The second pipeline

Alright. Then I wanted to do more cool stuff, such as acting on the live data as it came in and showing something of it in LiveView. So I ended up ripping most of this out to get a better working example, including stopping all that file writing, as my poor machine was trying to encode video at the time.

So this code was changed into a mic_input, an audiometer and a fake sink. It also introduces handle_notification/4 to deal with messages sent by the pipeline as part of the audiometer.

elixir lib/media/stream_to_file.ex
defmodule Media.StreamToFile do
  use Membrane.Pipeline

  alias Membrane.PortAudio
  alias Membrane.Audiometer.Peakmeter
  alias Membrane.Element.Fake

  @impl true
  def handle_init(output_directory) do
    Elixir.File.mkdir_p!(output_directory)
    Process.register(self(), :default_stream)

    children = [
      mic_input: PortAudio.Source,
      audiometer: %Peakmeter{interval: Membrane.Time.milliseconds(50)},
      sink: Fake.Sink.Buffers
    ]

    links = [
      link(:mic_input) |> to(:audiometer) |> to(:sink)
    ]

    {{:ok, spec: %ParentSpec{children: children, links: links}}, %{}}
  end

  @impl true
  def handle_notification({:amplitudes, channels}, _element, _context, state) do
    IO.inspect(channels, label: "amplitude")
    Phoenix.PubSub.broadcast!(Media.PubSub, "audio", {:amplitudes, channels})
    {:ok, state}
  end

  def handle_notification(_any, _element, _context, state) do
    {:ok, state}
  end
end

The updated module is above and was current as of this commit.

The LiveView is modified from calling mix phx.new media --live and editing lib/media_web/live/page_live.ex like any good quick hack.

elixir lib/media_web/live/page_live.ex
defmodule MediaWeb.PageLive do
  use MediaWeb, :live_view

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket) do
      Phoenix.PubSub.subscribe(Media.PubSub, "audio")
      Media.ensure_playing()
    end

    {:ok, assign(socket, channels: [])}
  end

  @impl true
  def handle_info({:amplitudes, channels}, socket) do
    channels =
      Enum.map(channels, fn negative_db ->
        case negative_db do
          :infinity ->
            0

          num ->
            100 - round(num * -1.0)
        end
      end)

    {:noreply, assign(socket, channels: channels)}
  end
end

It subscribes to the events we send in the pipeline. It mangles the numbers and it updates the state. I love a good handle_info/2, screw events, who needs em. Users shouldn’t interact with my stuff, my stuff interacts with users! I kid, but my most interesting uses of LiveView rarely require the user to trigger the behavior, stuff just goes. I find it super satisfying.
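That number mangling can be pulled out into a pure function, which makes the mapping easier to read and test. This extraction is mine, not code from the repo:

```elixir
defmodule AmplitudeMapping do
  # Peakmeter reports amplitudes as negative dB values, where 0 dB is the
  # loudest and :infinity stands in for silence. Map them onto a 0..100
  # percentage suitable for a bar width.
  def to_percent(:infinity), do: 0

  def to_percent(negative_db) when is_number(negative_db) do
    100 - round(negative_db * -1.0)
  end
end
```

So -20 dB becomes an 80% wide bar and silence collapses to 0.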

The template side is very dense and simplistic, the video at the end covers my slight style tweaking.

leex lib/media_web/live/page_live.html.leex
    <%= for amplitude <- @channels do %>
    <div class="amp-bar" style="width: <%= amplitude %>%;">
    </div>
    <% end %>

That’s essentially it. There is some additional plumbing just to make sure everything is started and running, so clone and run the project to try it.
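Part of that plumbing is the Media.ensure_playing/0 the LiveView calls in mount. It isn't shown above, but since the pipeline registers itself under :default_stream in handle_init, a sketch could look like this. This is my guess at the shape, not the repo's exact code, and "recordings" is a placeholder path:

```elixir
defmodule Media do
  # Hypothetical sketch: start and play the pipeline only if the
  # :default_stream name (registered in handle_init) isn't taken yet.
  def ensure_playing do
    case Process.whereis(:default_stream) do
      nil ->
        {:ok, pid} = Media.StreamToFile.start_link("recordings")
        Media.StreamToFile.play(pid)
        {:ok, pid}

      pid ->
        {:ok, pid}
    end
  end
end
```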

A brief video of how it looks and how I solve problems under unexpected circumstances is available below, and also on my YouTube channel.
