Interesting Bug when Using Anonymous Functions with Task.Supervisor

Elle Imhoff

Engineer

This is the tale of cool bugs.

Working on BlockScout for POA Network, we’ve moved beyond indexing just POA Network’s core and test chains and started to test that BlockScout can index Ethereum Mainnet and Ethereum Classic (ETC). The goal of BlockScout is to have an open-source alternative to the closed-source Etherscan that the majority of the Ethereum ecosystem is currently dependent on.

While testing against Ethereum Classic, we ran into a bug where one part of the Indexer OTP app, Indexer.InternalTransactionFetcher, kept timing out on a GenServer.call.

04:40:55.756 [error] Task #PID<0.18499.21> started from #PID<0.18448.21> terminating
** (stop) exited in: GenServer.call(Indexer.InternalTransactionFetcher, {:buffer, [...]}, 10000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:924: GenServer.call/3
    (indexer) lib/indexer/block_fetcher/catchup.ex:140: Indexer.BlockFetcher.Catchup.async_import_remaining_block_data/2
    (indexer) lib/indexer/block_fetcher/catchup.ex:107: Indexer.BlockFetcher.Catchup.import/2
    (indexer) lib/indexer/block_fetcher.ex:114: Indexer.BlockFetcher.fetch_and_import_range/2
    (indexer) lib/indexer/block_fetcher/catchup.ex:167: Indexer.BlockFetcher.Catchup.fetch_and_import_range_from_sequence/3
    (elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &:erlang.apply/2
    Args: [#Function<8.121428738/1 in Indexer.BlockFetcher.Catchup.stream_fetch_and_import/2>, [5569266..5569257]]

Because of the awesomeness that is OTP supervision trees, these timeouts weren’t killing the app. The timeouts were however making the indexing very slow. This part of the tree had to restart, try again, make some progress, and then hit timeouts again.

Tracing the code, the following line in the stacktrace:

(indexer) lib/indexer/block_fetcher/catchup.ex:140: Indexer.BlockFetcher.Catchup.async_import_remaining_block_data/2

corresponded to:

blockscout/apps/indexer/lib/indexer/block_fetcher/catchup.ex Line 140 in d106a2c

|> InternalTransactionFetcher.async_fetch(10_000)

This means that the async_fetch is taking longer than 10 seconds (10_000 ms = 10 s).

Internally, the timeout is used for this GenServer.call:

blockscout/apps/indexer/lib/indexer/internal_transaction_fetcher.ex Lines 42 to 47 in d106a2c

@spec async_fetch([%{required(:block_number) => Block.block_number(), required(:hash) => Hash.Full.t()}]) :: :ok 
def async_fetch(transactions_fields, timeout \\ 5000) when is_list(transactions_fields) do 
  params_list = Enum.map(transactions_fields, &transaction_fields_to_params/1) 
 
  BufferedTask.buffer(__MODULE__, params_list, timeout) 
end 

blockscout/apps/indexer/lib/indexer/buffered_task.ex Lines 157 to 159 in d106a2c

def buffer(server, entries, timeout \\ 5000) when is_list(entries) do
  GenServer.call(server, {:buffer, entries}, timeout)
end

When the call is received in the server, it buffers the entries by prepending them to the state’s current_buffer:

blockscout/apps/indexer/lib/indexer/buffered_task.ex Lines 266 to 268 in d106a2c

def handle_call({:buffer, entries}, _from, state) do 
  {:reply, :ok, buffer_entries(state, entries)} 
end 

blockscout/apps/indexer/lib/indexer/buffered_task.ex Lines 288 to 292 in d106a2c

defp buffer_entries(state, []), do: state 
 
defp buffer_entries(state, entries) do 
  %{state | current_buffer: [entries | state.current_buffer]} 
end 

This is a simple prepend to a list, which is fast, so the only thing that could take a lot of time is if there is too much data being transferred between processes.

However, it could also be that the handle_call/3 is not even called yet and instead, the InternalTransactionFetcher is too busy doing something else. More investigation is needed.
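The second possibility can be reproduced in isolation. The following is a minimal sketch, not BlockScout code: BusyServer and TimeoutDemo are hypothetical modules, and the sleep stands in for whatever keeps the server from reading its mailbox. The handle_call/3 clause is a cheap prepend, yet the caller still exits with a timeout because the server has not reached the message yet.

```elixir
defmodule BusyServer do
  # Hypothetical GenServer used only for illustration; not part of BlockScout.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)

  @impl true
  def init(buffer), do: {:ok, buffer}

  # Simulates slow work that keeps the server from reading its mailbox.
  @impl true
  def handle_cast({:sleep, ms}, buffer) do
    Process.sleep(ms)
    {:noreply, buffer}
  end

  # The call itself is a cheap prepend, like buffer_entries/2.
  @impl true
  def handle_call({:buffer, entries}, _from, buffer) do
    {:reply, :ok, [entries | buffer]}
  end
end

defmodule TimeoutDemo do
  def run do
    {:ok, pid} = BusyServer.start_link()
    GenServer.cast(pid, {:sleep, 300})

    # The server is still sleeping, so this call gives up after 100 ms
    # even though handling it would take almost no time.
    try do
      GenServer.call(pid, {:buffer, [1, 2, 3]}, 100)
    catch
      :exit, {:timeout, _} -> :timed_out
    end
  end
end
```

Calling TimeoutDemo.run() returns :timed_out. The server is not harmed by the caller giving up; it simply processes the buffered call later.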

Tracing

At this point, I couldn’t be sure if the GenServer.call never returned or if it just took longer than 10 seconds to return. To determine which was the case, I turned on tracing. The InternalTransactionFetcher is a callback module for BufferedTask, which is built on top of GenServer. GenServer supports the :debug option, which can turn on :sys.trace when the GenServer is started. These options can be passed when the BufferedTask is started as a worker, but first we needed to allow all GenServer options to be passed through.

Turning on tracing for the InternalTransactionFetcher shows that the InternalTransactionFetcher eventually responds to the request, so it is simply the timeout being too short.
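As a rough illustration of the tracing setup, the sketch below uses a hypothetical TracedBuffer module in place of BufferedTask. The only point it makes is that forwarding all options to GenServer.start_link/3 lets the caller pass debug: [:trace], and that :sys.trace/2 can toggle the same tracing on a process that is already running.

```elixir
defmodule TracedBuffer do
  # Hypothetical stand-in for BufferedTask; only the option pass-through matters.
  use GenServer

  # All options are forwarded to GenServer.start_link/3, so callers can pass
  # `debug: [:trace]` in addition to options such as `:name`.
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, [], opts)

  @impl true
  def init(buffer), do: {:ok, buffer}

  @impl true
  def handle_call({:buffer, entries}, _from, buffer) do
    {:reply, :ok, [entries | buffer]}
  end
end

defmodule TraceDemo do
  def run do
    # Tracing enabled at start: each incoming call and outgoing reply is printed.
    {:ok, pid} = TracedBuffer.start_link(debug: [:trace])
    :ok = GenServer.call(pid, {:buffer, [1]})

    # Tracing can also be switched off (or on) for a running process.
    :ok = :sys.trace(pid, false)
    GenServer.call(pid, {:buffer, [2]})
  end
end
```

With tracing on, the printed lines show when a call arrives and when the reply is sent, which is enough to tell a slow reply apart from a reply that never comes.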

At this point, there were three paths forward:

  1. Simply increase the timeout. Since the timeout is 10 seconds now, anything that hit the timeout before will of course take longer than 10 seconds.
  2. Change from GenServer.call to GenServer.cast.
  3. Investigate why InternalTransactionFetcher is taking 10 seconds to service the request.

It wasn’t obvious to me why 10 seconds wasn’t enough, since the call should involve only a simple copy of a list in a GenServer.call message, so I didn’t feel like (1) would address the actual root problem. I was worried that (2) would lead to mailbox flooding, as it would remove backpressure. That left (3): figure out why InternalTransactionFetcher needs more than 10 seconds. It was the harder path, but better long-term.

Profiling

I couldn’t remember how to configure tracing of calls to get call timing stats, so instead, I decided to use :eprof. Completely unexpectedly, :erlang.send/3 took the most time!

Function                                      Calls      %  Time (µs)  µs / Call
Indexer.BufferedTask.buffer_entries/2          3145   0.16      98186      31.22
Elixir.Enum."-chunk_while/4-fun-0-"/3        213086   0.16      99449       0.47
:lists.do_flatten/2                          219409   0.19     119474       0.54
Stream.Reducers."-chunk_every/5-fun-0-"/5    213086   0.23     146141       0.69
:lists.reverse/2                              21407   0.27     169811       7.93
:gen_server.try_dispatch/3                     4568   0.61     377158      82.57
:queue.len/1                                   2250   3.40    2117414     941.07
:erlang.send/3                                 2250  94.28   58679566   26079.81
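For readers who have not used :eprof, a table like the one above can be produced roughly as follows. This is a sketch under assumptions: EprofDemo and its work/0 function are hypothetical, and it profiles a single function call rather than a running GenServer. It also assumes the OTP tools application, which provides :eprof, is installed.

```elixir
defmodule EprofDemo do
  # Hypothetical workload; any function can be profiled the same way.
  def work, do: 1..10_000 |> Enum.map(&(&1 * 2)) |> Enum.sum()

  def run do
    # :eprof.start/0 reports an error tuple if the profiler is already running.
    case :eprof.start() do
      {:ok, _pid} -> :ok
      {:error, {:already_started, _pid}} -> :ok
    end

    # Runs the function under the profiler and returns its result.
    {:ok, result} = :eprof.profile(fn -> work() end)

    # Prints calls, percentage of time, total time, and time per call.
    :eprof.analyze(:total)
    :eprof.stop()
    result
  end
end
```

The rows are printed in ascending order of time, so the most expensive function is the last line, as in the table above.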

Since sends dominated the profile by so much, I wanted to check if we were backing up processing messages. Using :observer, I was able to get the stack trace from the named Indexer.InternalTransactionFetcher process. The Stack Trace tab often showed Indexer.InternalTransactionFetcher busy spawning the next batch.

Process info for Indexer.InternalTransactionFetcher

Determining the cause of the slow send was going to be difficult though, because Indexer.TaskSupervisor was a Task.Supervisor used for all Tasks in Indexer, not just for Indexer.InternalTransactionFetcher’s Tasks. I decided to restructure the supervision tree, so that each “fetcher” that spawned Tasks would have its own Task.Supervisor and I would be able to attribute memory usage to specific fetchers and ensure that other fetchers weren’t slowing down Indexer.InternalTransactionFetcher talking to Indexer.TaskSupervisor.

Before:

Indexer supervision tree before

After:

Indexer supervision tree after

With Indexer.InternalTransaction.Fetcher using its own Task.Supervisor, Indexer.InternalTransaction.TaskSupervisor, I was able to see that Indexer.InternalTransaction.Fetcher alone was the problem, as the timeout still occurred.

When there are performance issues, the easiest thing to do with :observer is:

  1. Go to the “Processes” tab.
  2. Sort columns to find outliers.
    • Click the “Memory” column to see if any process has an excessive memory usage.
    • Click the “MsgQ” column to see if any process has a non-empty message queue. It is rare that messages stay in queues long enough to be visible in Observer’s GUI.

:observer Processes table
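When a GUI is not available, for example on a remote node reached through a plain shell, the same two sorts can be approximated with Process.info/2. The ProcessOutliers module below is a hypothetical helper written for this post's scenario, not something that ships with Elixir or BlockScout.

```elixir
defmodule ProcessOutliers do
  # Returns the top `count` processes sorted descending by `key`,
  # where `key` is :memory (bytes) or :message_queue_len.
  def top(key, count \\ 5) when key in [:memory, :message_queue_len] do
    Process.list()
    |> Enum.map(fn pid ->
      # Process.info/2 returns nil if the process exited in the meantime.
      case Process.info(pid, [:registered_name, :memory, :message_queue_len]) do
        nil -> nil
        info -> Map.new([{:pid, pid} | info])
      end
    end)
    |> Enum.reject(&is_nil/1)
    |> Enum.sort_by(&Map.fetch!(&1, key), :desc)
    |> Enum.take(count)
  end
end
```

ProcessOutliers.top(:memory) corresponds to clicking the “Memory” column, and ProcessOutliers.top(:message_queue_len) to clicking “MsgQ”. Each result is a map with :pid, :registered_name, :memory, and :message_queue_len keys.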

In our case, Indexer.InternalTransaction.TaskSupervisor was using 31 MB (!) of memory after only a few minutes and unlike everything else in the system, Indexer.InternalTransaction.Fetcher had a message queue. When I let it run for longer, the memory just kept growing as did the message queue. Whatever was causing the memory leak in Indexer.InternalTransaction.TaskSupervisor was slowing down Indexer.InternalTransactionFetcher to make it have a message queue. The memory leak is the root cause. If I fixed that I would probably fix the message queue and therefore, the original timeout that was reported in the bug.

I had no idea why Indexer.InternalTransaction.TaskSupervisor should really have any noticeable amount of memory, since each Task it is supervising should be a batch of only 10 JSONRPC requests. I added some print statements in Indexer.InternalTransactionFetcher and Indexer.BufferedTask and they both showed just 10 JSONRPC requests of reasonable size.

Grasping at any efficiency problems I could, I was thinking it might be a problem with REFC binaries, because we’re using a lot of JSON parsing. However, it was unlikely that the library would have a leak problem like this, as it would have affected other people as well. That left a problem with how data was transferred to the Task.Supervisor, so I checked the Erlang Efficiency Guide’s Processes chapter for something we were doing wrong.

I was drawn to the Loss of Sharing section:

Shared subterms are not preserved in the following cases:

  • When a term is sent to another process
  • When a term is passed as the initial process arguments in the spawn call
  • When a term is stored in an Ets table
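The effect of losing sharing can be measured with :erts_debug.size/1, which counts shared subterms once, and :erts_debug.flat_size/1, which counts the term as it would be after being copied. Both are debugging aids in OTP rather than part of a stable public API, and SharingDemo is a hypothetical module for illustration only.

```elixir
defmodule SharingDemo do
  def sizes do
    # Built at runtime so the list lives on the process heap.
    shared = Enum.to_list(1..1_000)

    # The same list is referenced twice, so in memory it exists only once.
    term = {shared, shared}

    # Sizes are in machine words.
    %{shared: :erts_debug.size(term), flat: :erts_debug.flat_size(term)}
  end
end
```

The :flat value comes out at roughly twice the :shared value, because the list is duplicated once per reference when the term is flattened for sending.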

So my hypothesis was that the batches had shared subterms that were getting duplicated. We also were using Task.Supervisor.async_nolink/3 with an anonymous function.

blockscout/apps/indexer/lib/indexer/buffered_task.ex Lines 379 to 381 in 6aaa300

Task.Supervisor.async_nolink(state.task_supervisor, fn -> 
  {:performed, state.callback_module.run(batch, retries, state.callback_module_state)} 
end) 

I was worried this anonymous function’s closure was somehow leading to the memory bloat – that it was grabbing too much from the Indexer.InternalTransactionFetcher as the environment for the closure – so, I decided to switch to Task.Supervisor.async_nolink/5.

blockscout/apps/indexer/lib/indexer/buffered_task.ex Lines 379 to 383 in 5b7872f

Task.Supervisor.async_nolink(state.task_supervisor, state.callback_module, :run, [ 
  batch, 
  retries, 
  state.callback_module_state 
]) 

It worked!

After the change, Indexer.InternalTransaction.TaskSupervisor disappears from the top memory usage and Indexer.InternalTransaction.Fetcher no longer has a MsgQ.

I fixed the bug, but I wasn’t satisfied: I wanted to know why using a closure led to such problems. Thankfully, on the BlockScout project, POA Network also hired Plataformatec to work on another section of the application, so I could ask José Valim to explain why this bug happened.

@KronicDeth

@josevalim why does the memory grow when using Task.Supervisor.async_nolink(supervisor, fun), but not Task.Supervisor.async_nolink(supervisor, module, function, args)? Is this related to sending closures between processes? If so, why does the closure size grow when sampled before the async_nolink call.

@josevalim

@KronicDeth Note that you are passing the whole state to the closure and you are not doing that if you are using a MFA. Can you measure the size of state+batch+retries alongside each closure?

@KronicDeth

O, 🤦‍♂️, of course, the closure captures state because the old code wasn't pulling the state fields into separate locals and the state has the big honky :queue in it and even though we don't use the state.buffer :queue, it still is there in the closure being flattened.

So, the root cause of the bug and the memory leak was that Indexer.BufferedTask extracted the callback_module and callback_module_state fields from state inside of the anonymous function, so the entire state of the Indexer.BufferedTask was captured by the closure. That state contained the entire :queue of future work, so every time Indexer.BufferedTask.spawn_next_batch sent a batch of 10 requests to the Task.Supervisor, it also copied the entire queue of all the future work.
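The difference between the two closure shapes can be measured directly. The sketch below is illustrative only: ClosureDemo is a hypothetical module, its state is a two-field map rather than the real BufferedTask state, and :erts_debug.flat_size/1 is a debugging aid that reports the size in words a term would have once copied to another process.

```elixir
defmodule ClosureDemo do
  # Hypothetical state shaped loosely like a BufferedTask state: a small
  # field the task needs plus a large queue it does not.
  def state do
    %{callback_module: String, queue: :queue.from_list(Enum.to_list(1..10_000))}
  end

  # Accessing `state.callback_module` inside the fn captures all of `state`,
  # including the queue.
  def capturing(state, batch) do
    fn -> {state.callback_module, batch} end
  end

  # Reading the field first means only the small values are captured.
  def minimal(state, batch) do
    callback_module = state.callback_module
    fn -> {callback_module, batch} end
  end

  # Size in words of a term as it would be once copied to another process.
  def words(term), do: :erts_debug.flat_size(term)
end
```

With this state, ClosureDemo.words(ClosureDemo.capturing(state, batch)) is in the tens of thousands of words, while the minimal/2 version stays at a few dozen. Both functions return the same value when called; only the captured environment differs.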

To me that still left the question of why the memory was attributed to the Task.Supervisor instead of the individual Tasks. Thankfully, we have some OTP experts here at DockYard too and Paul Schoenfelder was able to explain why the Task.Supervisor would use memory instead of the Task only.

KronicDeth

Shouldn't Task.Supervisor drop the closure once the tasks starts though?

Paul Schoenfelder

To answer this question, the closure is kept as part of the child spec until the process terminates, unless it is to be restarted. So at a minimum the closure lasts as long as the Task itself does

For permanent restart procs, the child spec is never released, unless you manually terminate/delete the child

This is one of the reasons why you have to be really careful what initial arguments you pass to children managed by a supervisor, since they could be held indefinitely, and in some cases cause large binaries and such to be uncollectable for as long as the supervisor remains alive

elbow-jason

This is really interesting

Takeaway

When using anonymous functions with Task.Supervisor.async_nolink or any other function that runs the anonymous function in another process, make sure that you don’t capture more than is absolutely needed to do the work in the process. Most importantly, don’t access state.field inside the anonymous function, as that forces the whole state into the closure’s environment. When the anonymous function passes through a Supervisor, as is the case with the DynamicSupervisor that powers Task.Supervisor, memory can be held in the copy of the child spec for the anonymous function’s Task.

You can detect this type of problem by using :observer:

  • If your Task.Supervisors have memory measured in tens of MBs instead of KBs in the Memory column of the Processes table.
  • If your Task.Supervisor.async_nolink callers have a non-zero MsgQ.

But, at the very beginning this can all just appear as a simple proximate cause:

  • If your Task.Supervisor.async_nolink callers are timing out.

Epilogue

Once the changes were in, I could see that memory usage for the Task.Supervisor and message queue length were all fixed, which fixed the timeouts in the original bug, but what I wasn’t expecting was the speed improvement. Andrew Cravenho from POA Network reported that he was able to index all blocks on POA’s Sokol test chain in 1 hour. Our previous best time was 5.5 hours a few months ago when the chain was shorter, so not only did we eliminate timeouts, queues, and memory leaks, we made the overall performance 5x faster.

This bug was in the project for almost its entire history, but it never became a problem until we tested against a specific dataset (Ethereum Classic) that made the problem bad enough to trigger timeouts. Even if you don’t have timeouts, it is still worth checking in :observer to see if you have any processes that are outliers for memory or MsgQ; they may indicate a hidden bug in your code that OTP is compensating for.

