One Billion Record Challenge in Elixir - faster

Update 31. Jan 2024: Clarify that binaries sent between processes are not copied!

Last week, I stumbled across the blog post "Processing very large text files in Elixir". The author goes through the challenge of processing a large file of one billion lines in a semicolon-separated format like the following, where each city can occur multiple times:

Hamburg;12.0
Cracow;12.6
Bridgetown;26.9
Istanbul;6.2
Istanbul;23.0
Hamburg;16.0

The task is to output the min, max and mean temperature for each city, and to do it fast, and of course in Elixir!

In this article, I want to quickly present my approach and how it performs. Spoiler: It processes 1bn lines with 10k different cities in 40 seconds using 16 vCPUs.

My Approach

You can find my approach in this git repository. It uses pure Elixir and no external dependencies.

When it comes to performance optimization, you should generally look at the following three points:

  • Constant factors
  • Divide and conquer
  • Memory allocations

Constant factors

An often neglected topic, especially in academia! But this is where you can easily get 2x improvements (or more), without architectural changes.

So for instance, when parsing the station (or city) name, I first scan over the bytes until I find the delimiter. Then I use a binary pattern (or binary_part) to extract the binary:

defp parse_station(data) do
  len = parse_station_length(data, 0)
  <<station::binary-size(len), ";", rest::binary>> = data
  {station, rest}
end

defp parse_station_length(<<";", _rest::binary>>, len), do: len
defp parse_station_length(<<_ch, rest::binary>>, len), do: parse_station_length(rest, len + 1)

A naive approach would just use:

[station, rest] = :binary.split(data, ";")

This trivial change alone would increase total runtime by 25% from 40 to 50 seconds.
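For comparison, here is a minimal sketch of the binary_part variant mentioned above. The module name StationSketch is made up for this example, and the function is public only so it can be called directly; the scanning logic is the same as in the snippet above.

```elixir
defmodule StationSketch do
  # Hypothetical module name; same scan-then-slice idea as parse_station above.

  # Returns {station, rest}, where rest starts right after the ";".
  def parse_station(data) do
    len = station_length(data, 0)
    station = binary_part(data, 0, len)
    rest = binary_part(data, len + 1, byte_size(data) - len - 1)
    {station, rest}
  end

  # Counts the bytes in front of the first ";".
  defp station_length(<<";", _rest::binary>>, len), do: len
  defp station_length(<<_ch, rest::binary>>, len), do: station_length(rest, len + 1)
end

StationSketch.parse_station("Hamburg;12.0\n")
# → {"Hamburg", "12.0\n"}
```

Both variants produce sub-binaries that point into the original data, so no station name is copied while parsing.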

Next, for parsing the temperature, I use fixed-point arithmetic. So “12.3” becomes 123:

defp parse_temp(data, temp \\ 0, sign \\ 1)
defp parse_temp(<<"-", rest::binary>>, temp, sign), do: parse_temp(rest, temp, -1 * sign)
defp parse_temp(<<".", rest::binary>>, temp, sign), do: parse_temp(rest, temp, sign)
defp parse_temp(<<"\n", rest::binary>>, temp, sign), do: {sign * temp, rest}

defp parse_temp(<<ch, rest::binary>>, temp, sign) when ch in ?0..?9 do
  parse_temp(rest, temp * 10 + (ch - ?0), sign)
end

This alone improves performance by a factor of 2 (40 vs 80 seconds) over using Float.parse naively as in:

defp parse_temp(data) do
  {temp, <<"\n", rest::binary>>} = Float.parse(data)
  {trunc(temp * 10), rest}
end
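To make the fixed-point representation concrete, here is a small sketch. FixedPointSketch is a made-up module name; parse/1 repeats the clauses of the fast parse_temp above behind a public wrapper, and format/1 is my assumption of how tenths of a degree could be turned back into text for the output.

```elixir
defmodule FixedPointSketch do
  # Hypothetical module; parse/1 follows the fast parse_temp clauses above.

  # Parses e.g. "-12.3\n..." into {-123, "..."} (tenths of a degree).
  def parse(data), do: parse(data, 0, 1)

  defp parse(<<"-", rest::binary>>, temp, sign), do: parse(rest, temp, -1 * sign)
  defp parse(<<".", rest::binary>>, temp, sign), do: parse(rest, temp, sign)
  defp parse(<<"\n", rest::binary>>, temp, sign), do: {sign * temp, rest}

  defp parse(<<ch, rest::binary>>, temp, sign) when ch in ?0..?9 do
    parse(rest, temp * 10 + (ch - ?0), sign)
  end

  # Assumption: formats tenths of a degree with exactly one decimal place.
  def format(tenths) do
    sign = if tenths < 0, do: "-", else: ""
    abs_tenths = abs(tenths)
    "#{sign}#{div(abs_tenths, 10)}.#{rem(abs_tenths, 10)}"
  end
end

FixedPointSketch.parse("-12.3\nHamburg;16.0\n")
# → {-123, "Hamburg;16.0\n"}
FixedPointSketch.format(-123)
# → "-12.3"
```

Working with integers until the very end also means min, max and sum never pick up floating-point rounding errors.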

Now, with all this in place, parsing all lines of a block really becomes trivial:

defp parse_lines(<<>>, _cb), do: nil

defp parse_lines(data, cb) do
  {station, rest} = parse_station(data)
  {temp, rest} = parse_temp(rest)
  cb.(station, temp)
  parse_lines(rest, cb)
end

Here we see that I pass in a callback cb. This gets called for every parsed record. Within this callback, the ETS table is updated.

Divide and Conquer

For the 1BRC challenge, it’s obvious that we have to break up the huge file into blocks of lines. Breaking up a file containing lines is a bit tricky as lines are variably sized. That’s why in my approach, I first analyse the file and determine break points (offsets into the file) at which we can split the file and still have valid blocks of lines.

Let's say we want to break the file up into blocks of roughly 8 MiB in size, and let's assume a maximum line length of 1024 bytes. With that in mind, we first read 1024 bytes starting at offset 8*1024*1024 and scan them for a newline. The number of bytes up to and including the newline, plus 8*1024*1024, gives our first break point. Then we determine the second break point near offset 16*1024*1024 in the same way. Rinse and repeat until we have found all remaining break points.
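The search for break points could be sketched roughly as follows. This is not the code from my repository: BreakPointSketch and its function names are made up for this example, and for simplicity it probes relative to the previous break point instead of at fixed multiples of the block size. It returns {offset, length} pairs that each cover whole lines.

```elixir
defmodule BreakPointSketch do
  # Hypothetical module, written for illustration only.
  @max_line 1024

  # Returns a list of {offset, length} tuples, each covering whole lines.
  def blocks(path, block_size \\ 8 * 1024 * 1024) do
    %File.Stat{size: size} = File.stat!(path)
    {:ok, fd} = :file.open(path, [:read, :raw, :binary])

    try do
      find(fd, size, block_size, 0, [])
    after
      :file.close(fd)
    end
  end

  defp find(_fd, size, _block_size, start, acc) when start >= size, do: Enum.reverse(acc)

  defp find(fd, size, block_size, start, acc) do
    probe = start + block_size

    stop =
      if probe >= size do
        # Less than one block left: the last block runs to the end of the file.
        size
      else
        # Read a small window at the probe offset and cut right after its first newline.
        {:ok, window} = :file.pread(fd, probe, @max_line)

        case :binary.match(window, "\n") do
          {pos, 1} -> probe + pos + 1
          :nomatch -> size
        end
      end

    find(fd, size, block_size, stop, [{start, stop - start} | acc])
  end
end
```

Only one small window per block is read in this phase, so finding all break points is cheap compared to parsing the file.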

In my code, the function break_file_into_blocks_of_lines! in module OBRC.FileUtils is responsible for breaking up the file into blocks of lines. It returns a list of functions, each of which reads in its part of the file when called. So for instance, to read the third block of lines, you could write:

"measurements.txt"
|> OBRC.FileUtils.break_file_into_blocks_of_lines!()
|> Enum.fetch!(2)
|> apply([])

The next important design decision is to use a worker pool instead of Task.async_stream. The latter spawns “worker” processes for you and is a very convenient way to distribute work to multiple cores. In our case, we want to use ETS (Erlang Term Storage) per worker, and for that we need full control of each worker. For that I quickly hacked OBRC.WorkerPool with function process_in_parallel/2. It spawns workers and passes a request_work function to each worker function. So, each worker runs in its own process and can request work by calling request_work. It is important to note that we do not pass the line data between the pool management process and each individual worker. Instead we pass a function that, when called, will read the data in. This way, we avoid copying data between processes.
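A stripped-down version of such a pool might look like the sketch below. It is not OBRC.WorkerPool: PoolSketch and PoolSketchDemo are made-up names, the explicit worker-count argument is an addition for the example, and an Agent stands in for the pool management process.

```elixir
defmodule PoolSketch do
  # Hypothetical stand-in for a worker pool; all names here are illustrative.

  # work_items: a list of zero-arity functions (lazy blocks of lines).
  # worker_fun: receives request_work and returns the worker's result.
  def process_in_parallel(work_items, worker_fun, workers \\ System.schedulers_online()) do
    {:ok, queue} = Agent.start_link(fn -> work_items end)

    # Pops one item off the shared queue, or returns nil once it is empty.
    request_work = fn ->
      Agent.get_and_update(queue, fn
        [] -> {nil, []}
        [item | rest] -> {item, rest}
      end)
    end

    results =
      1..workers
      |> Enum.map(fn _ -> Task.async(fn -> worker_fun.(request_work) end) end)
      |> Task.await_many(:infinity)

    Agent.stop(queue)
    results
  end
end

defmodule PoolSketchDemo do
  # Toy worker: sums up the sizes of all blocks it manages to grab.
  def worker(request_work, acc \\ 0) do
    case request_work.() do
      nil -> acc
      read_block -> worker(request_work, acc + byte_size(read_block.()))
    end
  end
end

items = for s <- ["a;1.0\n", "bb;2.0\n", "ccc;3.0\n"], do: fn -> s end
PoolSketch.process_in_parallel(items, &PoolSketchDemo.worker/1, 2) |> Enum.sum()
# → 21
```

Only the small closures travel through the queue; the block itself is read inside the worker that calls read_block.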

Each worker first sets up a local ETS table (using :set), then requests a lazy block of lines, reads that in as a binary, parses it, and for each {station, temperature} record, updates the ETS table while maintaining statistics like min temperature, max temperature, sum of temperatures and number of entries per station. Note that using a mutable ETS table is much more efficient than an immutable Map.

Then each worker requests the next block until no more blocks are left. Once no more work is available, each worker converts its private ETS table into a Map, deletes its private ETS table and returns the Map. Note that as this code runs exactly once per worker, I do not really care much about its performance.
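The per-worker bookkeeping described in the last two paragraphs could be sketched like this. StatsSketch and the tuple layout {station, min, max, sum, count} are assumptions made for this example; my actual code stores the values in a more compact form.

```elixir
defmodule StatsSketch do
  # Hypothetical module; the tuple layout is an assumption for illustration.

  # A private table can only be accessed by the process that created it.
  def new_table, do: :ets.new(:stats, [:set, :private])

  # Usable as the per-record callback, e.g. &StatsSketch.update(table, &1, &2).
  def update(table, station, temp) do
    case :ets.lookup(table, station) do
      [] ->
        :ets.insert(table, {station, temp, temp, temp, 1})

      [{_station, lo, hi, sum, count}] ->
        :ets.insert(table, {station, min(lo, temp), max(hi, temp), sum + temp, count + 1})
    end
  end

  # Runs once per worker: turns the table into a map and deletes the table.
  def to_map(table) do
    map =
      Map.new(:ets.tab2list(table), fn {station, lo, hi, sum, count} ->
        {station, {lo, hi, sum, count}}
      end)

    :ets.delete(table)
    map
  end
end

table = StatsSketch.new_table()
StatsSketch.update(table, "Hamburg", 120)
StatsSketch.update(table, "Hamburg", 160)
StatsSketch.to_map(table)
# → %{"Hamburg" => {120, 160, 280, 2}}
```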

Back in the main function, we collect all async results, sequentially merge the Maps appropriately and generate the output. Done!
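Merging the per-worker maps is a job for Map.merge/3. The sketch below assumes that each map value is a {min, max, sum, count} tuple in tenths of a degree; MergeSketch and that layout are made up for this example.

```elixir
defmodule MergeSketch do
  # Hypothetical module; assumes values of the form {min, max, sum, count}.

  # Folds a list of per-worker maps into one map.
  def merge(maps) do
    Enum.reduce(maps, %{}, fn map, acc ->
      Map.merge(acc, map, fn _station, {lo1, hi1, sum1, n1}, {lo2, hi2, sum2, n2} ->
        {min(lo1, lo2), max(hi1, hi2), sum1 + sum2, n1 + n2}
      end)
    end)
  end

  # Mean temperature in degrees, computed from the fixed-point sum.
  def mean({_lo, _hi, sum, n}), do: sum / n / 10
end

a = %{"Hamburg" => {120, 160, 280, 2}}
b = %{"Hamburg" => {100, 130, 230, 2}, "Cracow" => {126, 126, 126, 1}}
MergeSketch.merge([a, b])
# → %{"Cracow" => {126, 126, 126, 1}, "Hamburg" => {100, 160, 510, 4}}
```

Because sum and count are kept separately, the mean only has to be computed once per station, after all workers have been merged.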

Memory Allocation

The key here is to avoid sending too much data between processes. For example, a naive approach would use code like this:

filename
|> File.stream!()
|> Stream.chunk_every(10_000)
|> Task.async_stream(&process(&1))
|> ...

The big issue here is that the main process reads in all data line by line and then transfers chunks of lines to our worker processes. Note that the whole chunk of data gets copied when sent to another process! This creates lots of garbage in the main process and the same amount of garbage in the worker processes.

Update 31. Jan 2024: To clarify: Large binaries (strings of more than 64 bytes) sent between processes are not copied by the runtime system; they are reference-counted and only a small reference is passed along. Smaller binaries and any other data type are copied to the heap of the receiving process and put into its mailbox. It’s yet unclear to me how lists of binaries are handled when being sent across process boundaries, but I am assuming that sending a list of binaries in Elixir is quite performant. So, the above pattern is likely quite performant. One thing to research is the effect on memory consumption when cutting a huge binary into many smaller binaries. It seems that Elixir and Erlang will efficiently avoid copying the underlying data and just create references into the huge binary with offset and length. But that of course keeps the huge binary in memory. Quoting the Erlang documentation of referenced_byte_size:

If a binary references a larger binary (often described as being a subbinary), it can be useful to get the size of the referenced binary. This function can be used in a program to trigger the use of copy/1. By copying a binary, one can dereference the original, possibly large, binary that a smaller binary is a reference to.
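The effect described in the quote can be observed with a few lines of Elixir. This is only an illustration with made-up sample data, not code from my solution:

```elixir
# A large binary, well above the 64-byte limit for heap binaries.
big = :binary.copy("Hamburg;12.0\n", 100_000)

# Pattern matching creates a sub-binary that points into big.
<<station::binary-size(7), _rest::binary>> = big

byte_size(station)
# → 7
:binary.referenced_byte_size(station) == byte_size(big)
# → true

# An explicit copy detaches the seven bytes from the large binary.
detached = :binary.copy(station)
:binary.referenced_byte_size(detached)
# → 7
```

As long as station is alive, the whole of big stays in memory; after the copy, only the detached seven bytes need to be kept.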

In my approach all data is read within the worker processes - it never crosses process boundaries - shared nothing.

Results

On a cloud server with 16 virtual CPUs, my approach can process one billion lines (with 10k different city names) in around 40 seconds. That’s about 1.5 million lines per vCPU per second. Not too bad for an interpreted language like Elixir. As a reference, by using the Explorer library, which is backed by some Rust code, I was able to process the same dataset in about 20 seconds. I’d love to run my code on a 128-core machine, just to see how it scales.

Final thoughts

Note that some high-performance Java solutions really use terribly unsafe code and are barely readable at the same time. Some other Erlang solutions use hashing for the city names and fail to work with arbitrary city names (fail for 10k different city names). I think the goal is not to optimize too much for one concrete benchmark, but to provide a generic solution that works with all inputs.

My code is arguably readable, at least for someone with an Elixir background, and the largest parts are reusable (splitting a file into blocks of lines, worker pool). And still, without using any dependencies, it only weighs in at 288 lines of code. Recently, I added some micro-optimizations concerning the values stored in the ETS table (encoding min/max in one value, and sum/cnt in another) in order to gain 10% in performance. This added another 20-30 lines and some complexity.