Recently, I was trying to study some traffic patterns to figure out what kept tripping my firewall rules. There wasn't any immediate urgency, so I took the opportunity to work on my LiveView skills by making a web-based histogram visualizer for my Nginx HTTP logs.
The core idea was that I would stream log data into a web page, and generate analytics on the fly for each connected socket. Because the log files are large (up to 1GiB), it's important that the stream is fast and responsive, and doesn't waste time writing into a file when it could be parsing directly into memory.
Act I - LiveView
Already, this seems like it might end up going off the rails in terms of effort, but let's give it a try anyway.
Uploading a file without making a disk copy sounds like a job for a Phoenix.LiveView.UploadWriter. I'll use a broker process to mediate the connection between a Stream consuming data in the backend and the frontend LiveView upload process producing data. On the backend, I'll have a server process receiving the raw upload data sent over mailbox messages, and on the frontend, I'll have a form with a live_file_input responsible for sending the data.
I have included the code here, but for the sake of brevity and advancing the narrative, it is collapsed by default.
Let's define the broker module. When the writer end calls, it will receive the pid of the reader, and the reader will receive the pid of the writer. Then the broker process will cleanly exit, as it is no longer needed.
defmodule LogParseWeb.Stream.Broker do
  @moduledoc false

  use GenServer

  def register_writer(server) do
    GenServer.call(server, {:register, :writer}, :infinity)
  end

  def register_reader(server) do
    GenServer.call(server, {:register, :reader}, :infinity)
  end

  @impl true
  def init(_) do
    {:ok, writer: nil, reader: nil}
  end

  @impl true
  def handle_call({:register, type}, from, state) do
    state
    |> Keyword.put(type, from)
    |> maybe_satisfy()
  end

  defp maybe_satisfy(state) do
    with {wpid, _} = writer <- Keyword.get(state, :writer),
         {rpid, _} = reader <- Keyword.get(state, :reader) do
      GenServer.reply(writer, rpid)
      GenServer.reply(reader, wpid)
      {:stop, :normal, state}
    else
      _ ->
        {:noreply, state}
    end
  end
end
I did not add a timeout to this demonstration, but if you use this in production code, you need to add a timeout that causes the process to shut down so that it does not leak.
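As a sketch of one way to do that, relying only on the standard GenServer timeout mechanism and not on anything specific to this demo, init/1 can return a timeout so the broker stops itself if nobody registers in time:

# Hypothetical hardening of the broker, not part of the demo code above.
@impl true
def init(_) do
  # If no register_* call arrives within a minute, :timeout is delivered.
  {:ok, [writer: nil, reader: nil], :timer.minutes(1)}
end

@impl true
def handle_info(:timeout, state) do
  # Nobody showed up in time; shut down instead of leaking.
  {:stop, :normal, state}
end

The same timeout element can be added to the {:noreply, state} return in maybe_satisfy/1 to also cover the case where only one side ever registers.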
Now, let's define the common functions that will be used for the processes to communicate. I have a message for sending data, a reply, and a message for closing the connection. After sending data from the writer to the reader, the writer needs to wait, as a form of back pressure so that the incoming data does not arrive faster than the reader can process it.
defmodule LogParseWeb.Stream do
  @timeout 60_000

  @doc false
  def push(reader, data) do
    send(reader, {:data, data})

    receive do
      :data_ack ->
        :ok
    after
      @timeout ->
        raise_timeout_error()
    end
  end

  @doc false
  def close(reader) do
    send(reader, :close)
  end

  @doc false
  def pull(writer) do
    receive do
      {:data, data} ->
        send(writer, :data_ack)
        {:data, data}

      :close ->
        :close
    after
      @timeout ->
        raise_timeout_error()
    end
  end

  defp raise_timeout_error do
    raise "Operation timed out after #{@timeout} ms"
  end
end
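To make the back-pressure handshake concrete, here is a quick IEx-style check of the protocol (the processes here are purely for illustration):

reader = self()

# The writer blocks in push/2 until the reader acknowledges the chunk.
writer = spawn(fn -> LogParseWeb.Stream.push(reader, "hello") end)

LogParseWeb.Stream.pull(writer)
#=> {:data, "hello"}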
Now the reader module, for the backend, using Stream.resource/3:
defmodule LogParseWeb.Stream.Reader do
  @moduledoc false

  def new(broker: broker) do
    Stream.resource(
      fn -> LogParseWeb.Stream.Broker.register_reader(broker) end,
      fn writer ->
        case LogParseWeb.Stream.pull(writer) do
          {:data, value} ->
            {[value], writer}

          :close ->
            {:halt, writer}
        end
      end,
      fn _writer -> nil end
    )
  end
end
And the UploadWriter, which sends the data from the frontend into the Stream:
defmodule LogParseWeb.Stream.Writer do
  @moduledoc false

  @behaviour Phoenix.LiveView.UploadWriter

  alias LogParseWeb.Stream.Broker

  @impl true
  def init(broker: broker) do
    reader = Broker.register_writer(broker)
    {:ok, reader}
  end

  @impl true
  def meta(_state), do: %{}

  @impl true
  def write_chunk(data, state) do
    LogParseWeb.Stream.push(state, data)
    {:ok, state}
  end

  @impl true
  def close(state, _reason) do
    LogParseWeb.Stream.close(state)
    {:ok, state}
  end
end
Using this, I can then finally define the connection between the reader and writer in the LiveView's mount callback, passing the reader into the log context:
defmodule LogParseWeb.LogLive.Index do
  use LogParseWeb, :live_view

  alias LogParseWeb.Stream.Broker
  alias LogParseWeb.Stream.Reader
  alias LogParseWeb.Stream.Writer

  alias LogParse.Logs

  @impl true
  def mount(_params, _session, socket) do
    {:ok, broker} = GenServer.start_link(Broker, [])
    writer = {Writer, broker: broker}
    reader = Reader.new(broker: broker)

    {:ok, database} = Logs.create_log(reader)

    {:ok,
     socket
     |> assign(:database, database)
     |> assign(:upload_complete, false)
     |> allow_upload(:log_file,
       accept: :any,
       auto_upload: true,
       max_file_size: 999_999_999_999,
       writer: fn _name, _entry, _socket -> writer end
     )}
  end

  # ...
end
The form on the frontend looks like this:
<form
  class={if(Enum.any?(@uploads.log_file.entries), do: "hidden")}
  id="upload-form"
  phx-submit="save"
  phx-change="validate"
>
  <section
    phx-drop-target={@uploads.log_file.ref}
    class={[
      "hover:border-blue-500 hover:border-solid hover:bg-white hover:text-blue-500",
      "group rounded-md w-full",
      "flex flex-col items-center justify-center",
      "border-2 border-dashed border-slate-300",
      "text-sm leading-6 text-slate-900 font-medium py-3"
    ]}
  >
    <span class="select-none">
      <%= gettext("Drag and drop a file here or double click to upload") %>
    </span>
    <.live_file_input upload={@uploads.log_file} />
  </section>
</form>

<div :for={entry <- @uploads.log_file.entries}>
  <div class="flex justify-between mb-1">
    <span class="text-base font-medium"><%= gettext("Processing upload...") %></span>
    <span class="text-sm font-medium"><%= entry.progress %>%</span>
  </div>
  <div class="w-full bg-slate-200 rounded-full h-2.5">
    <div class="bg-blue-600 h-2.5 rounded-full" style={"width: #{entry.progress}%"}></div>
  </div>
</div>
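The LiveView also needs handle_event callbacks for the validate and save events referenced by the form. A minimal sketch of what they might look like; a real version would likely also consume or clear the finished entries:

# Hypothetical minimal handlers for the form events above.
@impl true
def handle_event("validate", _params, socket), do: {:noreply, socket}

@impl true
def handle_event("save", _params, socket) do
  {:noreply, assign(socket, :upload_complete, true)}
end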
And on the context side, I'll need to implement creating a log:
defmodule LogParse.Logs do
  @moduledoc """
  The Logs context.
  """

  alias LogParse.Logs.Server

  def create_log(receiver) do
    GenServer.start_link(Server, receiver)
  end
end
And what the server should do:
defmodule LogParse.Logs.Server do
  use GenServer

  defstruct is_initialized?: false

  @impl true
  def init(receiver) do
    # Begin receiving the stream immediately
    send(self(), {:receive, receiver})
    {:ok, %__MODULE__{}}
  end

  @impl true
  def handle_call(_request, _from, state) do
    {:reply, :ok, state}
  end

  @impl true
  def handle_info({:receive, receiver}, state) do
    # Just for demonstration
    :ok = Stream.run(receiver)
    {:noreply, state}
  end
end
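Stream.run/1 just drains the stream; a real consumer would do something with the bytes. A rough sketch of re-chunking the raw binary chunks into lines, where parse_log_line/1 is a hypothetical parser and not part of this post:

receiver
|> Stream.transform("", fn chunk, leftover ->
  # Split the accumulated bytes on newlines; the final element may be a
  # partial line, so carry it over into the next chunk.
  [partial | complete] = String.split(leftover <> chunk, "\n") |> Enum.reverse()
  {Enum.reverse(complete), partial}
end)
|> Stream.each(&parse_log_line/1)
|> Stream.run()

(The trailing partial line is dropped in this sketch.)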
That was a lot of code to implement such a small amount of functionality, and it took me about a day to refine and implement it this way. So, how well does it function when you select a file on the web?
Not great. Because I am not actually doing any log file parsing yet, this serves as a benchmark for how well this approach could theoretically function. I benchmarked it at around 21MB/s in Firefox and around 25MB/s in Chromium, based on the speed of uploading a 1GiB file. So what's going on?
Since Chromium seemed like the more promising of the two browsers, I decided to start there. Some quick benchmarking with the DevTools reveals that a huge number of approximately 64-kilobyte chunks are being sent over the WebSocket connection.
Fortunately, this is easy to tune. allow_upload/3 accepts a chunk_size option, which I can then revise up to a more reasonable 1MiB. And this does indeed increase the upload speed to about 52MB/s. Nice.
{:ok,
 socket
 |> assign(:database, database)
 |> assign(:upload_complete, false)
 |> allow_upload(:log_file,
   accept: :any,
   auto_upload: true,
   max_file_size: 999_999_999_999,
   chunk_size: 1_048_576,
   writer: fn _name, _entry, _socket -> writer end
 )}
52MB/s seems kind of low, though. A more reasonable target for sending packets to localhost would be 1GB/s, since my solid state drive is more than capable of this performance. Let's profile the JS again and see if there's anything preventing us from uploading the file any faster…
readNextChunk(){
  let reader = new window.FileReader()
  let blob = this.entry.file.slice(this.offset, this.chunkSize + this.offset)
  reader.onload = (e) => {
    if(e.target.error === null){
      this.offset += e.target.result.byteLength
      this.pushChunk(e.target.result)
    } else {
      return logError("Read error: " + e.target.error)
    }
  }
  reader.readAsArrayBuffer(blob)
}

pushChunk(chunk){
  if(!this.uploadChannel.isJoined()){ return }
  this.uploadChannel.push("chunk", chunk)
    .receive("ok", () => {
      this.entry.progress((this.offset / this.entry.file.size) * 100)
      if(!this.isDone()){
        this.chunkTimer = setTimeout(() => this.readNextChunk(), this.liveSocket.getLatencySim() || 0)
      }
    })
    .receive("error", ({reason}) => this.error(reason))
}
*eye twitch*
Okay, so first of all, besides the absolutely disagreeable formatting of this script, which I will try my best not to continue to comment on: every time a frame is uploaded, the code creates a new FileReader to read the next part of the file into an ArrayBuffer. But why would anyone need to do that? WebSocket.prototype.send allows directly sending a Blob - which is exactly what file.slice() returns - without needing to waste time and memory converting it to an ArrayBuffer.
Is there some issue with wherever uploadChannel.push() is defined that makes it so that it can't accept a Blob? Let's look at how it handles ArrayBuffer messages:
encode(msg, callback){
  if(msg.payload.constructor === ArrayBuffer){
    return callback(this.binaryEncode(msg))
  } else {
    let payload = [msg.join_ref, msg.ref, msg.topic, msg.event, msg.payload]
    return callback(JSON.stringify(payload))
  }
},

binaryEncode(message){
  let {join_ref, ref, event, topic, payload} = message
  let metaLength = this.META_LENGTH + join_ref.length + ref.length + topic.length + event.length
  let header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
  let view = new DataView(header)
  let offset = 0

  view.setUint8(offset++, this.KINDS.push) // kind
  view.setUint8(offset++, join_ref.length)
  view.setUint8(offset++, ref.length)
  view.setUint8(offset++, topic.length)
  view.setUint8(offset++, event.length)
  Array.from(join_ref, char => view.setUint8(offset++, char.charCodeAt(0)))
  Array.from(ref, char => view.setUint8(offset++, char.charCodeAt(0)))
  Array.from(topic, char => view.setUint8(offset++, char.charCodeAt(0)))
  Array.from(event, char => view.setUint8(offset++, char.charCodeAt(0)))

  var combined = new Uint8Array(header.byteLength + payload.byteLength)
  combined.set(new Uint8Array(header), 0)
  combined.set(new Uint8Array(payload), header.byteLength)

  return combined.buffer
},
*more furious eye twitching*
Okay, so now there isn't just a single copy of the data being uploaded over the socket, there are TWO copies. The second is created and assigned into the combined Uint8Array. Would this serialization work if I just changed this to use a Blob? Let's try:
encode(msg, callback){
  if(msg.payload.constructor === ArrayBuffer || msg.payload.constructor === Blob){
    return callback(this.binaryEncode(msg))
  } else {
    let payload = [msg.join_ref, msg.ref, msg.topic, msg.event, msg.payload]
    return callback(JSON.stringify(payload))
  }
},

binaryEncode(message){
  let {join_ref, ref, event, topic, payload} = message
  let metaLength = this.META_LENGTH + join_ref.length + ref.length + topic.length + event.length
  let header = new ArrayBuffer(this.HEADER_LENGTH + metaLength)
  let view = new DataView(header)
  let offset = 0

  view.setUint8(offset++, this.KINDS.push) // kind
  view.setUint8(offset++, join_ref.length)
  view.setUint8(offset++, ref.length)
  view.setUint8(offset++, topic.length)
  view.setUint8(offset++, event.length)
  Array.from(join_ref, char => view.setUint8(offset++, char.charCodeAt(0)))
  Array.from(ref, char => view.setUint8(offset++, char.charCodeAt(0)))
  Array.from(topic, char => view.setUint8(offset++, char.charCodeAt(0)))
  Array.from(event, char => view.setUint8(offset++, char.charCodeAt(0)))

  return new Blob([header, payload])
},
Not the prettiest, but now the code can accept Blob messages, and from some basic testing and a fix for the longpoll fallback, it seems to work. Now, let's fix the readNextChunk and pushChunk code to avoid copying the file data:
readNextChunk(){
  let blob = this.entry.file.slice(this.offset, this.chunkSize + this.offset)
  this.offset += blob.size
  return this.pushChunk(blob)
}

pushChunk(chunk){
  if(!this.uploadChannel.isJoined()){ return }
  return new Promise((resolve, reject) => {
    this.uploadChannel.push("chunk", chunk)
      .receive("ok", () => {
        this.entry.progress((this.offset / this.entry.file.size) * 100)
        if(!this.isDone()){
          resolve(this.simulateLatency())
        }
      })
      .receive("error", ({reason}) => reject(reason))
  })
}
And after some quick testing, I can confirm and submit this fix as well. But is it faster? Oh yes.
Browser | Before | After |
---|---|---|
Chromium | 51MB/s | 112MB/s |
Since Chromium is so much faster after fixing everything, Firefox should be faster too, right? Right?
Browser | Before | After |
---|---|---|
Chromium | 51MB/s | 112MB/s |
Firefox | 21MB/s | 7MB/s |
What??
Act II - Bandit
Since I had kind of made all of the performance optimizations at once, and not really stopped to retest Firefox along the way, I was stumped. Firefox's developer tools are a lot laggier and buggier than Chromium's, but I eventually figured out the profiler and determined that I was now entirely bottlenecked on the server. It was taking more than 50ms to respond between chunks!
Fortunately, I had only made one relevant server change - the chunk_size. Was that the culprit? Let's try setting it back to 64K:
Browser | 1MiB chunks | 64K chunks |
---|---|---|
Firefox | 7MB/s | 32MB/s |
At this point I was feeling officially clueless, so I hooked up eprof to the server to try to figure out what was going on. The traces from eprof showed that all the time was being spent inside the web server library, Bandit.
Okay, that makes sense; it's not like my application is doing anything. But wait, 96% of the trace time is spent, exclusive, in one function: Bandit.WebSocket.Handler.handle_data/3. Okay, what does that function look like? Here:
@impl ThousandIsland.Handler
def handle_data(data, socket, state) do
  (state.buffer <> data)
  |> Stream.unfold(
    &Frame.deserialize(&1, Keyword.get(state.connection.opts, :max_frame_size, 0))
  )
  |> Enum.reduce_while({:continue, state}, fn
    {:ok, frame}, {:continue, state} ->
      case Connection.handle_frame(frame, socket, state.connection) do
        {:continue, connection} ->
          {:cont, {:continue, %{state | connection: connection, buffer: <<>>}}}

        {:close, connection} ->
          {:halt, {:close, %{state | connection: connection, buffer: <<>>}}}

        {:error, reason, connection} ->
          {:halt, {:error, reason, %{state | connection: connection, buffer: <<>>}}}
      end

    {:more, rest}, {:continue, state} ->
      {:halt, {:continue, %{state | buffer: rest}}}

    {:error, message}, {:continue, state} ->
      {:halt, {:error, {:deserializing, message}, state}}
  end)
end
Since this was an exclusive trace, this meant that all the time was spent inside this function itself, and not in any of the functions it was calling, like Stream.unfold/2, Frame.deserialize/2, or Enum.reduce_while/3. That really only left one candidate line, (state.buffer <> data), which concatenates the input data from the socket with the existing data until a new frame can be produced. But wait, binary concatenation is supposed to be optimized in OTP, so that can't be it…
Just for giggles, let's try Cowboy. Cowboy is an HTTP and WebSocket server historically designed for the Erlang ecosystem, which was adapted later in its life for use by Plug applications such as Phoenix.
Cowboy was the default HTTP server for Phoenix for many years until the release of Bandit, a pure-Elixir HTTP and WebSocket server with fewer dependencies. New Phoenix projects, like the one I created for this log visualizer, now use Bandit as the server for this reason, but I can easily switch back to Cowboy for testing. And what happens?
Adapter | Browser | 1MiB chunks | 64K chunks |
---|---|---|---|
Bandit | Firefox | 7MB/s | 32MB/s |
Cowboy | Firefox | 104MB/s | 57MB/s |
Well, it seems like the problem really is Bandit. Running eprof on Cowboy shows no significant hot spots when uploading files over LiveView, so this is probably about as good as it will get, and I'm not very good at writing Erlang anyway.
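For reference, attaching eprof doesn't require anything fancy; the standard :eprof API is enough (which set of pids you profile is up to you):

:eprof.start()
:eprof.start_profiling(Process.list())  # profile every process on the node
# ... push some upload traffic through the socket ...
:eprof.stop_profiling()
:eprof.analyze(:total)                  # print time per function, aggregated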
But I couldn't let that performance hotspot in Bandit go.
Act III - The minimal example
Let's make an example program that proves the problem is in Bandit. So far, I don't know if LiveView is somehow interfering with these performance results, so I need to be sure. Fortunately, this is really easy, because the Bandit documentation comes with an example for how to make a WebSocket server:
Mix.install([:bandit, :websock_adapter])

defmodule ParseDemoServer do
  def init(options) do
    {:ok, options}
  end

  def handle_in({_data, [opcode: :binary]}, state) do
    {:reply, :ok, {:text, "ok"}, state}
  end

  def terminate(_reason, state) do
    {:ok, state}
  end
end

defmodule Router do
  use Plug.Router

  plug Plug.Logger
  plug :match
  plug :dispatch

  get "/" do
    conn
    |> put_resp_content_type("text/html")
    |> send_resp(200, """
    <!-- page body: a performance indicator element and the benchmark script below -->
    """)
  end

  get "/websocket" do
    conn
    |> WebSockAdapter.upgrade(ParseDemoServer, [], timeout: 60_000)
    |> halt()
  end

  match _ do
    send_resp(conn, 404, "not found")
  end
end

require Logger

webserver = {Bandit, plug: Router, scheme: :http, port: 4000}
{:ok, _} = Supervisor.start_link([webserver], strategy: :one_for_one)
Logger.info("Plug now running on localhost:4000")

Process.sleep(:infinity)
And now I just need to write a script that will connect to the WebSocket and try to send data as fast as possible. I'll use SubtleCrypto to encrypt a block of zeroes, to avoid the possibility of compression messing with the data being sent.
const size = 1 << 20; // 1 MiB

async function nonCryptographicRandomBlock() {
  const key = await crypto.subtle.generateKey(
    {
      name: 'AES-CTR',
      length: 128
    },
    false,
    ['encrypt']
  );
  const buffer = new Uint8Array(size);
  const counter = new Uint8Array(16);
  return crypto.subtle.encrypt(
    {
      name: 'AES-CTR',
      counter: counter,
      length: 64
    },
    key,
    buffer
  );
}

const socket = new WebSocket('ws://localhost:4000/websocket');
const performanceIndicator = document.getElementById('performance');
let messageStart = performance.now();

async function sendMessage() {
  const difference = performance.now() - messageStart;
  const throughput = (size / (difference * 1000));
  performanceIndicator.innerText = `Last request: ${Math.round(difference)} ms, ${Math.round(throughput)} MB/s`;
  socket.send(await nonCryptographicRandomBlock());
  messageStart = performance.now();
}

socket.addEventListener('open', sendMessage);
socket.addEventListener('message', sendMessage);
By running this with elixir demo_server.exs and opening the webpage in Firefox, I once again see 7MB/s. And when changing the server to plug_cowboy, it's back up over 100MB/s. So I've proved it's in Bandit and can file a report.
Act IV - Binary concatenation
Since there's only one reasonable place this performance bottleneck could be coming from, it makes sense to drill into it a little more.
Experienced Elixir and Erlang programmers generally avoid binary concatenation in performance-sensitive code, because it is not always possible to ensure that the concatenation can be optimized to an append. Instead, the typical pattern is to use iodata - a recursive list data structure composed of other lists and binaries. iodata can be efficiently flattened into a single binary on demand at a later point.
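As a tiny illustration of the difference (nothing Bandit-specific about it), building up a buffer as iodata defers all the copying to a single final pass:

chunks = List.duplicate(:binary.copy(<<0>>, 1460), 719)

# Repeated concatenation: each step may copy everything accumulated so far.
_binary = Enum.reduce(chunks, <<>>, fn chunk, acc -> acc <> chunk end)

# iodata: appending just builds a list; one copy happens at the very end.
iodata = Enum.reduce(chunks, [], fn chunk, acc -> [acc, chunk] end)
_binary = IO.iodata_to_binary(iodata)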
For fun, let's calculate the worst-case behavior with binary concatenation (a quick check of the arithmetic follows the list):
- Bandit will receive 1460 bytes in handle_data on each transmission, the maximum segment size for TCP over standard Ethernet
- Given a frame payload size of 1,048,576 bytes (1 MiB), this will result in 1048576/1460 = 719 calls to handle_data
- Each call will allocate a binary that is 1460 bytes longer than the previous one
- Applying the series formula for natural numbers, that works out to (1460)(719)(719 + 1) / 2, or 377,906,400 bytes of binaries allocated and garbage collected to receive a single 1MiB frame
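That arithmetic is easy to double-check in IEx:

iex> calls = div(1_048_576, 1460) + 1
719
iex> div(1460 * calls * (calls + 1), 2)
377906400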
No wonder Bandit was taking more than 50ms to respond to each chunk.
What happens if I super crudely refactor this code to use iodata? Well…
Browser | Before | After |
---|---|---|
Chromium | 202MB/s | 291MB/s |
Firefox | 7MB/s | 210MB/s |
I think it's safe to say that there isn't anything wrong with 1MiB chunk sizes anymore.
After more refactoring into a presentable state, this fix ended up making it in, and all users of Bandit will benefit from the change when v1.5.8 is released.
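For the curious, the rough shape of the idea looks something like this. This is a sketch of my own rather than the patch that actually shipped; the buffered_bytes and required_bytes fields and the deserialize_and_handle_frames/3 helper are illustrative stand-ins:

@impl ThousandIsland.Handler
def handle_data(data, socket, state) do
  # Appending to iodata is O(1): no bytes are copied on this hot path.
  buffer = [state.buffer, data]
  buffered_bytes = state.buffered_bytes + byte_size(data)

  if buffered_bytes < state.required_bytes do
    # Not enough data for the frame announced in the header; keep accumulating.
    {:continue, %{state | buffer: buffer, buffered_bytes: buffered_bytes}}
  else
    # Flatten exactly once, now that a whole frame is known to be present.
    buffer
    |> IO.iodata_to_binary()
    |> deserialize_and_handle_frames(socket, state)
  end
end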
Act V - The limit
While my upload performance is fixed, I'm still a ways off from my performance target of 1GB/s. Based on the relatively even spread of functions in eprof, it looks like the Erlang runtime itself is the limiting factor now. 291MB/s is certainly not bad - it's a throughput of over 2Gbps, which is quite fast for a production server. But I'm running my code on localhost, and this kind of connection really should be able to do better than that.
The warp Rust HTTP server has an example of a two-ended WebSocket connection. I hack it up to include my performance-testing JS from earlier, and otherwise turn it into a server that simply responds ok to all binary frames:
async fn user_connected(ws: WebSocket) {
    // Split the socket into a sender and receive of messages.
    let (mut user_ws_tx, mut user_ws_rx) = ws.split();

    // Return a `Future` that is basically a state machine managing
    // this specific user's connection.
    // Every time the user sends a message, broadcast it to
    // all other users...
    while let Some(result) = user_ws_rx.next().await {
        let msg = match result {
            Ok(msg) => msg,
            Err(e) => {
                eprintln!("websocket recv error: {}", e);
                break;
            }
        };

        if msg.is_binary() {
            user_ws_tx
                .send(Message::binary(b"ok"))
                .unwrap_or_else(|e| {
                    eprintln!("websocket send error: {}", e);
                })
                .await
        }
    }

    // user_ws_rx stream will keep processing as long as the user stays
    // connected. Once they disconnect, then...
}
And now, for the final results…
Browser | Throughput |
---|---|
Chromium | 783MB/s |
Firefox | 428MB/s |
It seems like Chromium is far and away the winner in this no-holds-barred sample. Both browsers are now using 100% of a single CPU core sending data over the socket, so I am fully bottlenecked by the browser. The server still has room to spare, so this should be close to the theoretical limit for how fast you can go with WebSockets right now.