Liam White
Parsing WebSocket frames faster: a deep dive

Recently, I was trying to study some traffic patterns to figure out what kept tripping my firewall rules. There wasn't any immediate urgency, so I took the opportunity to work on my LiveView skills by making a web-based histogram visualizer for my Nginx HTTP logs.

The core idea was that I would stream log data into a web page, and generate analytics on the fly for each connected socket. Because the log files are large (up to 1GiB), it's important that the stream is fast and responsive, and doesn't waste time writing into a file when it could be parsing directly into memory.

Act I - LiveView

Already, this seems like it might end up going off the rails in terms of effort, but let's give it a try anyway.

Uploading a file without making a disk copy sounds like a job for a Phoenix.LiveView.UploadWriter. I'll use a broker process to mediate the connection between a Stream consuming data in the backend, and the frontend LiveView upload process producing data. On the backend, I'll have a server process receiving the raw upload data sent over mailbox messages, and on the frontend, I'll have a form with a live_file_input responsible for sending the data.

I have included the code here, but for the sake of brevity and advancing the narrative, it is collapsed by default.

Click to expand

Let's define the broker module. When the writer end calls, it will receive the pid of the reader, and the reader will receive the pid of the writer. Then the broker process will cleanly exit, as it is no longer needed.

defmodule LogParseWeb.Stream.Broker do
  @moduledoc false
  use GenServer

  def register_writer(server) do
    GenServer.call(server, {:register, :writer}, :infinity)
  end

  def register_reader(server) do
    GenServer.call(server, {:register, :reader}, :infinity)
  end

  @impl true
  def init(_) do
    {:ok, writer: nil, reader: nil}
  end

  @impl true
  def handle_call({:register, type}, from, state) do
    state
    |> Keyword.put(type, from)
    |> maybe_satisfy()
  end

  defp maybe_satisfy(state) do
    with {wpid, _} = writer <- Keyword.get(state, :writer),
         {rpid, _} = reader <- Keyword.get(state, :reader) do
      GenServer.reply(writer, rpid)
      GenServer.reply(reader, wpid)

      {:stop, :normal, state}
    else
      _ ->
        {:noreply, state}
    end
  end
end

I did not add a timeout to this demonstration, but if you use this in production code, you need to add a timeout that causes the process to shut down so that it does not leak.


Now, let's define the common functions that will be used for the processes to communicate. I have a message for sending data, a reply, and a message for closing the connection. After sending data from the writer to the reader, the writer needs to wait, as a form of back pressure so that the incoming data does not arrive faster than the reader can process it.

defmodule LogParseWeb.Stream do
  @timeout 60_000

  @doc false
  def push(reader, data) do
    send(reader, {:data, data})

    receive do
      :data_ack ->
        :ok
    after
      @timeout ->
        raise_timeout_error()
    end
  end

  @doc false
  def close(reader) do
    send(reader, :close)
  end

  @doc false
  def pull(writer) do
    receive do
      {:data, data} ->
        send(writer, :data_ack)
        {:data, data}

      :close ->
        :close
    after
      @timeout ->
        raise_timeout_error()
    end
  end

  defp raise_timeout_error do
    raise "Operation timed out after #{@timeout} ms"
  end
end

Now the reader module, for the backend, using Stream.resource/3:

defmodule LogParseWeb.Stream.Reader do
  @moduledoc false

  def new(broker: broker) do
    Stream.resource(
      fn -> LogParseWeb.Stream.Broker.register_reader(broker) end,
      fn writer ->
        case LogParseWeb.Stream.pull(writer) do
          {:data, value} ->
            {[value], writer}

          :close ->
            {:halt, writer}
        end
      end,
      fn _writer -> nil end
    )
  end
end

And the UploadWriter, which sends the data from the frontend into the Stream:

defmodule LogParseWeb.Stream.Writer do
  @moduledoc false
  @behaviour Phoenix.LiveView.UploadWriter

  @impl true
  def init(broker: broker) do
    reader = Broker.register_writer(broker)
    {:ok, reader}
  end

  @impl true
  def meta(_state), do: %{}

  @impl true
  def write_chunk(data, state) do
    LogParseWeb.Stream.push(state, data)
    {:ok, state}
  end

  @impl true
  def close(state, _reason) do
    LogParseWeb.Stream.close(state)
    {:ok, state}
  end
end

Using this, I can then finally define the connection between the reader and writer in the LiveView's mount callback, passing the reader into the log context:

defmodule LogParseWeb.LogLive.Index do
  use LogParseWeb, :live_view

  alias LogParseWeb.Stream.Broker
  alias LogParseWeb.Stream.Reader
  alias LogParseWeb.Stream.Writer
  alias LogParse.Logs

  @impl true
  def mount(_params, _session, socket) do
    {:ok, broker} = GenServer.start_link(Broker, [])
    writer = {Writer, broker: broker}
    reader = Reader.new(broker: broker)
    {:ok, database} = Logs.create_log(reader)

    {:ok,
     socket
     |> assign(:database, database)
     |> assign(:upload_complete, false)
     |> allow_upload(:log_file,
       accept: :any,
       auto_upload: true,
       max_file_size: 999_999_999_999,
       writer: fn _name, _entry, _socket -> writer end
     )}
  end

  # ...
end

The form on the frontend looks like this:

<form
  class={if(Enum.any?(@uploads.log_file.entries), do: "hidden")}
  id="upload-form"
  phx-submit="save"
  phx-change="validate"
>
  <section
    phx-drop-target={@uploads.log_file.ref}
    class={[
      "hover:border-blue-500 hover:border-solid hover:bg-white hover:text-blue-500",
      "group rounded-md w-full",
      "flex flex-col items-center justify-center",
      "border-2 border-dashed border-slate-300",
      "text-sm leading-6 text-slate-900 font-medium py-3"
    ]}
  >
    <span class="select-none">
      <%= gettext("Drag and drop a file here or double click to upload") %>
    </span>
    <.live_file_input upload={@uploads.log_file} />
  </section>
</form>

<div :for={entry <- @uploads.log_file.entries}>
  <div class="flex justify-between mb-1">
    <span class="text-base font-medium"><%= gettext("Processing upload...") %></span>
    <span class="text-sm font-medium"><%= entry.progress %>%</span>
  </div>
  <div class="w-full bg-slate-200 rounded-full h-2.5">
    <div class="bg-blue-600 h-2.5 rounded-full" style={"width: #{entry.progress}%"}></div>
  </div>
</div>

And on the context side, I'll need to implement creating a log:

defmodule LogParse.Logs do
  @moduledoc """
  The Logs context.
  """

  alias LogParse.Logs.Server

  def create_log(receiver) do
    GenServer.start_link(Server, receiver)
  end
end

And what the server should do:

defmodule LogParse.Logs.Server do
  use GenServer

  defstruct is_initialized?: false

  @impl true
  def init(receiver) do
    # Begin receiving the stream immediately
    send(self(), {:receive, receiver})

    {:ok, %__MODULE__{}}
  end

  @impl true
  def handle_call(_request, _from, state) do
    {:reply, :ok, state}
  end

  @impl true
  def handle_info({:receive, receiver}, state) do
    # Just for demonstration
    :ok = Stream.run(receiver)

    {:noreply, state}
  end
end

That was a lot of code to implement such a small amount of functionality, and it took me about a day to refine and implement it this way. So, how well does it function when you select a file on the web?

Not great. Because I am not actually doing any log file parsing yet, this serves as a benchmark for how well this approach could theoretically function. I benchmarked it at around 21MB/s in Firefox and around 25MB/s in Chromium, based on the speed of uploading a 1GiB file. So what's going on?

Since Chromium seemed like the more promising of the two browsers, I decided to start there. Some quick benchmarking with the DevTools reveals a huge number of approximately 64 kilobyte chunks are being sent over the WebSocket connection.

Fortunately, this is easy to tune. allow_upload/3 accepts a chunk_size option, which I can then revise up to a more reasonable 1MiB. And this does indeed increase the upload speed to about 52MB/s. Nice.

    {:ok,
     socket
     |> assign(:database, database)
     |> assign(:upload_complete, false)
     |> allow_upload(:log_file,
       accept: :any,
       auto_upload: true,
       max_file_size: 999_999_999_999,
       chunk_size: 1_048_576,
       writer: fn _name, _entry, _socket -> writer end
     )}

52MB/s seems kind of low, though. A more reasonable target for sending packets to localhost would be 1GB/s, since my solid state drive is more than capable of this performance. Let's profile the JS again and see if there's anything preventing us from uploading the file any faster…

  readNextChunk(){
    let reader = new window.FileReader()
    let blob = this.entry.file.slice(this.offset, this.chunkSize + this.offset)
    reader.onload = (e) => {
      if(e.target.error === null){
        this.offset += e.target.result.byteLength
        this.pushChunk(e.target.result)
      } else {
        return logError("Read error: " + e.target.error)
      }
    }
    reader.readAsArrayBuffer(blob)
  }

  pushChunk(chunk){
    if(!this.uploadChannel.isJoined()){ return }
    this.uploadChannel.push("chunk", chunk)
      .receive("ok", () => {
        this.entry.progress((this.offset / this.entry.file.size) * 100)
        if(!this.isDone()){
          this.chunkTimer = setTimeout(() => this.readNextChunk(), this.liveSocket.getLatencySim() || 0)
        }
      })
      .receive("error", ({reason}) => this.error(reason))
  }

*eye twitch*

Okay, so first of all, besides the absolutely disagreeable formatting for this script, which I will try my best not to continue to comment on, every time a frame is uploaded, the code creates a new FileReader to read the next part of the file into an ArrayBuffer. But why would anyone need to do that? WebSocket.prototype.send allows directly sending a Blob - which file.slice() would return, without needing to waste time and memory converting this to an ArrayBuffer.

Is there some issue with wherever uploadChannel.push() is defined that makes it so that it can't accept a Blob? Let's look at how it handles ArrayBuffer messages:

  encode(msg, callback){
    if(msg.payload.constructor === ArrayBuffer){
      return callback(this.binaryEncode(msg))
    } else {
      let payload = [msg.join_ref, msg.ref, msg.topic, msg.event, msg.payload]
      return callback(JSON.stringify(payload))
    }
  },

  binaryEncode(message){
    let {join_ref, ref, event, topic, payload} = message
    let metaLength = this.META_LENGTH + join_ref.length + ref.length + topic.length + event.length
    let header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
    let view = new DataView(header)
    let offset = 0

    view.setUint8(offset++, this.KINDS.push) // kind
    view.setUint8(offset++, join_ref.length)
    view.setUint8(offset++, ref.length)
    view.setUint8(offset++, topic.length)
    view.setUint8(offset++, event.length)
    Array.from(join_ref, char => view.setUint8(offset++, char.charCodeAt(0)))
    Array.from(ref, char => view.setUint8(offset++, char.charCodeAt(0)))
    Array.from(topic, char => view.setUint8(offset++, char.charCodeAt(0)))
    Array.from(event, char => view.setUint8(offset++, char.charCodeAt(0)))

    var combined = new Uint8Array(header.byteLength + payload.byteLength)
    combined.set(new Uint8Array(header), 0)
    combined.set(new Uint8Array(payload), header.byteLength)

    return combined.buffer
  },

*more furious eye twitching*

Okay, so now there isn't just a single copy of the data being uploaded over the socket, there are TWO copies. The second is created and assigned into the combined Uint8Array. Would this serialization work if I just changed this to use a Blob? Let's try:

  encode(msg, callback){
    if(msg.payload.constructor === ArrayBuffer || msg.payload.constructor === Blob){
      return callback(this.binaryEncode(msg))
    } else {
      let payload = [msg.join_ref, msg.ref, msg.topic, msg.event, msg.payload]
      return callback(JSON.stringify(payload))
    }
  },

  binaryEncode(message){
    let {join_ref, ref, event, topic, payload} = message
    let metaLength = this.META_LENGTH + join_ref.length + ref.length + topic.length + event.length
    let header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
    let view = new DataView(header)
    let offset = 0

    view.setUint8(offset++, this.KINDS.push) // kind
    view.setUint8(offset++, join_ref.length)
    view.setUint8(offset++, ref.length)
    view.setUint8(offset++, topic.length)
    view.setUint8(offset++, event.length)
    Array.from(join_ref, char => view.setUint8(offset++, char.charCodeAt(0)))
    Array.from(ref, char => view.setUint8(offset++, char.charCodeAt(0)))
    Array.from(topic, char => view.setUint8(offset++, char.charCodeAt(0)))
    Array.from(event, char => view.setUint8(offset++, char.charCodeAt(0)))

    return new Blob([header, payload])
  },

Not the prettiest, but now the code can accept Blob messages, and from some basic testing and a fix for the longpoll fallback, it seems to work. Now, let's fix the readNextChunk and pushChunk code to avoid copying the file data:

  readNextChunk(){
    let blob = this.entry.file.slice(this.offset, this.chunkSize + this.offset)
    this.offset += blob.size
    return this.pushChunk(blob)
  }

  pushChunk(chunk){
    if(!this.uploadChannel.isJoined()){ return }

    return new Promise((resolve, reject) => {
      this.uploadChannel.push("chunk", chunk)
        .receive("ok", () => {
          this.entry.progress((this.offset / this.entry.file.size) * 100)
          if(!this.isDone()){
            resolve(this.simulateLatency())
          }
        })
        .receive("error", ({reason}) => reject(reason))
    })
  }

And after some quick testing, I can also confirm and submit this fix as well. But is it faster? Oh yes.

Browser Before After
Chromium 51MB/s 112MB/s

Since Chromium is so much faster after fixing everything, then Firefox should be faster too, right? Right?

Browser Before After
Chromium 51MB/s 112MB/s
Firefox 21MB/s 7MB/s

What??

Act II - Bandit

Since I had kind of made all of the performance optimizations at once, and not really stopped to retest Firefox along the way, I was stumped. Firefox's developer tools are a lot laggier and buggier than Chromium's DevTools, but I was able to eventually figure out the profiler, and determined that I was now entirely bottlenecked on the server. It was taking more than 50ms to respond between chunks!

Fortunately, I had only made one relevant server change - the chunk_size. Was that the culprit? Let's try setting it back to 64K:

Browser 1MiB chunks 64K chunks
Firefox 7MB/s 32MB/s

At this point I was feeling officially clueless, so I hooked up eprof to the server to try to figure out what was going on. The traces from eprof showed that all the time was being spent inside the web server library, Bandit.

Okay, that makes sense, it's not like my application is doing anything. But wait, 96% of the call traces are spent, exclusive, on one function, Bandit.WebSocket.Handler.handle_data/3. Okay, what does that function look like? Here:

@impl ThousandIsland.Handler
def handle_data(data, socket, state) do
  (state.buffer <> data)
  |> Stream.unfold(
    &Frame.deserialize(&1, Keyword.get(state.connection.opts, :max_frame_size, 0))
  )
  |> Enum.reduce_while({:continue, state}, fn
    {:ok, frame}, {:continue, state} ->
      case Connection.handle_frame(frame, socket, state.connection) do
        {:continue, connection} ->
          {:cont, {:continue, %{state | connection: connection, buffer: <<>>}}}

        {:close, connection} ->
          {:halt, {:close, %{state | connection: connection, buffer: <<>>}}}

        {:error, reason, connection} ->
          {:halt, {:error, reason, %{state | connection: connection, buffer: <<>>}}}
      end

    {:more, rest}, {:continue, state} ->
      {:halt, {:continue, %{state | buffer: rest}}}

    {:error, message}, {:continue, state} ->
      {:halt, {:error, {:deserializing, message}, state}}
  end)
end

Since this was an exclusive trace, this meant that all the time was spent inside this function, and not any of the functions it was calling like Stream.unfold/2, Frame.deserialize/2, or Enum.reduce_while/3. That really only left one candidate line, (state.buffer <> data), which concatenates the input data from the socket with the existing data until a new frame can be produced. But wait, binary concatenation is supposed to be optimized in OTP, so that can't be it…

Just for giggles, let's try Cowboy. Cowboy is a HTTP and WebSocket server historically designed for the Erlang ecosystem, which was adapted later in its life for use by Plug applications such as Phoenix.

Cowboy was the default HTTP server for Phoenix for many years until the release of Bandit, a pure-Elixir HTTP and WebSocket server with fewer dependencies. New Phoenix projects, like the one I created for this log visualizer, are created with Bandit as the server instead for this reason, but I can easily switch it back for testing. And what happens?

Adapter Browser 1MiB chunks 64K chunks
Bandit Firefox 7MB/s 32MB/s
Cowboy Firefox 104MB/s 57MB/s

Well, it seems like the problem really is Bandit. Running eprof on Cowboy shows no significant hot spots when uploading files over LiveView, so this is probably about as good as it will get, and I'm not very good at writing Erlang anyway.

But I couldn't let that performance hotspot in Bandit go.

Act III - The minimal example

Let's make an example program that proves the problem is in Bandit. So far, I don't know if LiveView is somehow interfering with these performance results, so I need to be sure. Fortunately, this is really easy, because the Bandit documentation comes with an example for how to make a WebSocket server:

Mix.install([:bandit, :websock_adapter])

defmodule ParseDemoServer do
  def init(options) do
    {:ok, options}
  end

  def handle_in({_data, [opcode: :binary]}, state) do
    {:reply, :ok, {:text, "ok"}, state}
  end

  def terminate(_reason, state) do
    {:ok, state}
  end
end

defmodule Router do
  use Plug.Router

  plug Plug.Logger
  plug :match
  plug :dispatch

  get "/" do
    conn
    |> put_resp_content_type("text/html")
    |> send_resp(200, """
    """)
  end

  get "/websocket" do
    conn
    |> WebSockAdapter.upgrade(ParseDemoServer, [], timeout: 60_000)
    |> halt()
  end

  match _ do
    send_resp(conn, 404, "not found")
  end
end

require Logger
webserver = {Bandit, plug: Router, scheme: :http, port: 4000}
{:ok, _} = Supervisor.start_link([webserver], strategy: :one_for_one)
Logger.info("Plug now running on localhost:4000")
Process.sleep(:infinity)

And now I just need to write a script that will connect to the WebSocket and try to send data as fast as possible. I'll use SubtleCrypto to encrypt a block of zeroes, to avoid the possibility of compression messing with the data being sent.

const size = 1 << 20; // 1 MiB
async function nonCryptographicRandomBlock() {
  const key = await crypto.subtle.generateKey(
    {
      name: 'AES-CTR',
      length: 128
    },
    false,
    ['encrypt']
  );

  const buffer = new Uint8Array(size);
  const counter = new Uint8Array(16);

  return crypto.subtle.encrypt(
    {
      name: 'AES-CTR',
      counter: counter,
      length: 64
    },
    key,
    buffer
  );
}

const socket = new WebSocket('ws://localhost:4000/websocket');
const performanceIndicator = document.getElementById('performance');

let messageStart = performance.now();

async function sendMessage() {
  const difference = performance.now() - messageStart;
  const throughput = (size / (difference * 1000));
  performanceIndicator.innerText = `Last request: ${Math.round(difference)} ms, ${Math.round(throughput)} MB/s`;

  socket.send(await nonCryptographicRandomBlock());
  messageStart = performance.now();
}

socket.addEventListener('open', sendMessage);
socket.addEventListener('message', sendMessage);

By running this with elixir demo_server.exs, and opening the webpage in Firefox, I once again see 7MB/s. And when changing the server to plug_cowboy, it's back up over 100MB/s. So I've proved it's in Bandit and can file a report.

Act IV - Binary concatenation

Since there's only one reasonable place this performance bottleneck could be coming from, it makes sense to drill into a little more.

Experienced Elixir and Erlang programmers generally would avoid using binary concatenation in performance-sensitive code, because it is not always possible to ensure that the concatenation can be optimized to an append. Instead, the typical pattern is to use iodata - a recursive list data structure that is comprised of other lists or binaries. iodata can be efficiently streamed out into a single binary on demand at a later point.

For fun, let's calculate the worst case behavior with binary concatention.

No wonder Bandit was taking more than 50ms to respond to each chunk.

What happens if I super crudely refactor this code to use iodata? Well…

Browser Before After
Chromium 202MB/s 291MB/s
Firefox 7MB/s 210MB/s

I think it's safe to say that there isn't anything wrong with 1MiB chunk sizes anymore.

After more refactoring into a presentable state, this ended up making it in and all users of Bandit will benefit from this change when v1.5.8 is released.

Act V - The limit

While my upload performance is fixed, I'm still a ways off from my performance target of 1GiB/s. Based on the relatively even spread of functions in eprof, it looks like the Erlang runtime is limiting the performance the most now. 291MB/s is certainly not bad, and is a throughput of over 2Gbps, which is quite fast for production servers. But I'm running my code on localhost, and this kind of connection really should be able to do better than that.

The warp Rust HTTP server has an example of a two-ended WebSocket connection. I hack it up to include my performance testing JS from earlier, and otherwise turn it into a server that simply responds ok to all binary frames:

async fn user_connected(ws: WebSocket) {
    // Split the socket into a sender and receive of messages.
    let (mut user_ws_tx, mut user_ws_rx) = ws.split();

    // Return a `Future` that is basically a state machine managing
    // this specific user's connection.

    // Every time the user sends a message, broadcast it to
    // all other users...
    while let Some(result) = user_ws_rx.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("websocket recv error: {}", e);
                break;
            }
        };

        if msg.is_binary() {
            user_ws_tx
                .send(Message::binary(b"ok"))
                .unwrap_or_else(|e| {
                    eprintln!("websocket send error: {}", e);
                })
                .await
        }
    }

    // user_ws_rx stream will keep processing as long as the user stays
    // connected. Once they disconnect, then...
}

And now, for the final results…

Browser Throughput
Chromium 783MB/s
Firefox 428MB/s

It seems like Chromium is far and away the winner in this no-holds-barred sample. Both the browsers are now using 100% of a single CPU core sending data over the socket, so I am fully bottlenecked by the browser. The server still has room to spare, so this should be close to the theoretical limit for how fast you can go with WebSockets right now.