Some years ago, I was working on a Rails application that made multiple HTTP requests to a third party in one of its use cases. The initial implementation can be drafted with the following pseudo-code:

def calculate_scores(resources)
  ActiveRecord::Base.transaction do
    resources.each do |resource|
      request_body = prepare_request_body(resource)
      response = HTTParty.post(fake_url, body: request_body)
      resource.update(score: response.parsed_response["score"])
    end
  end
end

This code just worked, and it was okay for a while. However, the day came when we had to rethink this code with performance in mind. The number of objects in the resources array grew bigger and bigger, and the method started taking too long.

I have to clarify that the method prepare_request_body basically reads multiple objects from the database, and generates a structured JSON with information from them. The resource#update method, on the contrary, updates a certain record in the database.

One of the ideas that came to mind immediately was parallelizing this method. Most of the time was spent on HTTP requests (both prepare_request_body and resource#update took no time), so I thought using multiple threads would be a good idea (as GIL would not be a problem). I quickly update the method to look like this:

def calculate_scores(resources)
  threads = []
  ActiveRecord::Base.transaction do
    resources.each do |resource|
      threads << Thread.new do
        request_body = prepare_request_body(resource)
        response = HTTParty.post(fake_url, body: request_body)
        resource.update(score: response.parsed_response["score"])
      end
    end
  end

  threads.each(&:join)
end

However, I started getting an ActiveRecord::ConnectionTimeoutError. After a quick research, I realized that each one of the threads was taking one connection from the connection pool, which quickly ran out of connections. This fact had two fatal consequences that I had overlooked:

  1. This process was using all the connections in the pool, so it was possible that other threads in the same process (sharing the connection pool) would not have any connections available.
  2. Since every thread was using an independent connection, these queries were running outside of the transaction that surrounds the method execution.

After googling for a while, I could not find a way to share a single database connection between threads, so I had to change my approach.

Splitting sequential and parallel code

After analyzing the code a little bit more, I realized that the part that I really wanted to parallelize was the HTTP request itself. I was not very concerned about building the request bodies or updating the resources sequentially, as those operations were not that costly.

With this idea in mind, the next iteration looked like this:

def calculate_scores(resources)
  threads = []
  responses = Concurrent::Hash.new

  ActiveRecord::Base.transaction do
    bodies = resources.map { |r| [r.id, prepare_request_body(r)] }.to_h

    bodies.each do |id, body|
      threads << Thread.new do
        responses[id] = HTTParty.post(fake_url, body: body)
      end
    end

    threads.each(&:join)
  end

  responses.each do |id, response|
    # Assuming Resource < ActiveRecord::Base
    Resource.update(id, score: response.parsed_response["score"])
  end
end

Three things are happening in the code snippet above:

  1. All request bodies are generated, sequentially, in the main thread. For that reason, the same database connection is used all the time.
  2. The code executed within the Thread.new block does not touch the database, neither for reading nor for writing.
  3. Once all responses have been obtained (all threads have been joined), the main thread sequentially updates the resource objects accordingly.

Note: The responses hash uses Concurrent::Hash from concurrent-ruby as it is going to be updated concurrently by multiple threads.

A real-world scenario

The scenario depicted above is probably the easiest that I could come up with to illustrate the point of splitting parallel and sequential code. However, my real-world scenario was significantly more complex.

Let’s say that I had a service, ResourcePublisher that makes multiple parallel HTTP calls to update third parties with the status of the just-updated resource object:

class ResourcePublisher
  private attr_reader :resource
  def initialize(resource)
    @resource = resource
  end

  def call
    preload
    threads = []
    threads << Thread.new { notify_partner_a }
    threads << Thread.new { notify_partner_b }
    threads.each(&:join)
  end

  private

  def preload
    # Bring the required resource associations to memory so 
    # the threads don't need to access the database
  end

  def notify_partner_a
    HTTParty.post("partner-a.com/updates", partner_a_body)
  end

  def notify_partner_b
    HTTParty.post("partner-a.com/updates", partner_b_body)
  end

  def partner_a_body
    {foo: resource.full_name}.to_json
  end

  def partner_b_body
    {foo: resource.full_name}.to_xml
  end
end

This class was working fine until I had to call it multiple times to parallelize the processing of multiple resource objects. In this case, it was not OK just to do:

def parallel_update(resources)
  threads = []
  resources.each do |resource|
    threads << Thread.new do
      ResourcePublisher.new(resource).call
    end
  end

  threads.each(&:join)
end

because the ResourcePublisher class itself was reading from the database, which is something that we disallowed in our original analysis. I considered extracting the body generation methods to a separate class, but I felt that the fact that I am using multiple threads instead of one thread did not change the purpose of the class significantly, it was just an implementation detail.

I finally made the preload method public to be called from the main thread, and used memoization in the methods that access the database:

class ResourcePublisher
  private attr_reader :resource
  def initialize(resource)
    @resource = resource
  end

  def preload
    # Bring the required resource associations to memory so 
    # the threads don't need to access the database
    self
  end

  def call
    threads = []
    threads << Thread.new { notify_partner_a }
    threads << Thread.new { notify_partner_b }
    threads.each(&:join)
  end

  private

  def notify_partner_a
    HTTParty.post("partner-a.com/updates", partner_a_body)
  end

  def notify_partner_b
    HTTParty.post("partner-a.com/updates", partner_b_body)
  end

  def partner_a_body
    @partner_a_body ||= {foo: resource.full_name}.to_json
  end

  def partner_b_body
    @partner_b_body ||= {foo: resource.full_name}.to_xml
  end
end

Then, I was able to call the services from multiple threads, which did not touch the database:

def parallel_update(resources)
  threads = []
  publishers = resources.map { ResourcePublisher.new(resource).preload }

  publishers.each do |publisher|
    threads << Thread.new do
      publisher.call
    end
  end

  threads.each(&:join)
end

Summary

This approach (which of course is very simplified to illustrate the point) allowed me to improve the performance of the method by parallelizing the most time-consuming parts while keeping things simple in the connection with the database (a single connection, a single transaction).