Some years ago, I was working on a Rails application that made multiple HTTP requests to a third party in one of its use cases. The initial implementation can be drafted with the following pseudo-code:
def calculate_scores(resources)
  ActiveRecord::Base.transaction do
    resources.each do |resource|
      request_body = prepare_request_body(resource)
      response = HTTParty.post(fake_url, body: request_body)
      resource.update(score: response.parsed_response["score"])
    end
  end
end
This code just worked, and it was okay for a while. However, the day came when we had to rethink this code with performance in mind. The number of objects in the resources array grew bigger and bigger, and the method started taking too long.
I should clarify that the prepare_request_body method reads multiple objects from the database and generates a structured JSON payload from them, while resource#update writes a single record back to the database.
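To make the discussion concrete, here is a hypothetical sketch of what prepare_request_body could look like; the owner and items associations and the payload shape are made up for illustration and are not the real production code.

def prepare_request_body(resource)
  # Hypothetical helper: every attribute below implies a read from the database.
  {
    resource_id: resource.id,
    owner_name: resource.owner.name,
    items: resource.items.map(&:attributes)
  }.to_json
end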
One of the ideas that came to mind immediately was parallelizing this method. Most of the time was spent on the HTTP requests themselves (both prepare_request_body and resource#update were comparatively cheap), so I thought using multiple threads would be a good fit, as the GIL is released during blocking I/O and would not be a bottleneck. I quickly updated the method to look like this:
def calculate_scores(resources)
  threads = []
  ActiveRecord::Base.transaction do
    resources.each do |resource|
      threads << Thread.new do
        request_body = prepare_request_body(resource)
        response = HTTParty.post(fake_url, body: request_body)
        resource.update(score: response.parsed_response["score"])
      end
    end
  end
  threads.each(&:join)
end
However, I started getting ActiveRecord::ConnectionTimeoutError errors. After some quick research, I realized that each thread was checking out its own connection from the connection pool, which quickly ran out of available connections. This had two fatal consequences that I had overlooked:
- This process was using all the connections in the pool, so it was possible that other threads in the same process (sharing the connection pool) would not have any connections available.
- Since every thread was using an independent connection, these queries were running outside of the transaction that surrounds the method execution.
After googling for a while, I could not find a way to share a single database connection between threads, so I had to change my approach.
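To see the first consequence in isolation, here is a minimal sketch (not code from the project) that reproduces the exhaustion with the default pool size of 5; the sleep simply stands in for the slow HTTP call that keeps each connection busy.

threads = 20.times.map do
  Thread.new do
    Resource.count  # checks out this thread's own connection from the pool
    sleep 10        # stands in for the slow HTTP call while the connection is held
  end
end

puts ActiveRecord::Base.connection_pool.stat
# => e.g. {size: 5, connections: 5, busy: 5, dead: 0, idle: 0, waiting: 15, checkout_timeout: 5.0}
# Threads that wait for a connection longer than checkout_timeout end up
# raising ActiveRecord::ConnectionTimeoutError.
threads.each(&:join)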
Splitting sequential and parallel code
After analyzing the code a little bit more, I realized that the part that I really wanted to parallelize was the HTTP request itself. I was not very concerned about building the request bodies or updating the resources sequentially, as those operations were not that costly.
With this idea in mind, the next iteration looked like this:
def calculate_scores(resources)
  threads = []
  responses = Concurrent::Hash.new
  ActiveRecord::Base.transaction do
    bodies = resources.map { |r| [r.id, prepare_request_body(r)] }.to_h
    bodies.each do |id, body|
      threads << Thread.new do
        responses[id] = HTTParty.post(fake_url, body: body)
      end
    end
    threads.each(&:join)
  end
  responses.each do |id, response|
    # Assuming Resource < ActiveRecord::Base
    Resource.update(id, score: response.parsed_response["score"])
  end
end
Three things are happening in the code snippet above:
- All request bodies are generated, sequentially, in the main thread. For that reason, the same database connection is used all the time.
- The code executed within the Thread.new block does not touch the database, neither for reading nor for writing.
- Once all responses have been obtained (all threads have been joined), the main thread sequentially updates the resource objects accordingly.
Note: The responses hash uses Concurrent::Hash from concurrent-ruby, as it is going to be updated concurrently by multiple threads.
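As a side note, a variant of the same idea (not the version I ended up using) is to avoid the shared hash altogether and collect each thread's return value with Thread#value, which joins the thread and returns the result of its block:

def calculate_scores(resources)
  responses = nil
  ActiveRecord::Base.transaction do
    bodies = resources.map { |r| [r.id, prepare_request_body(r)] }.to_h
    threads = bodies.map do |id, body|
      Thread.new { [id, HTTParty.post(fake_url, body: body)] }
    end
    responses = threads.map(&:value).to_h  # Thread#value joins and returns the block's result
  end
  responses.each do |id, response|
    Resource.update(id, score: response.parsed_response["score"])
  end
end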
A real-world scenario
The scenario depicted above is probably the easiest that I could come up with to illustrate the point of splitting parallel and sequential code. However, my real-world scenario was significantly more complex.
Let’s say that I had a service, ResourcePublisher, that makes multiple parallel HTTP calls to update third parties with the status of the just-updated resource object:
class ResourcePublisher
  private attr_reader :resource

  def initialize(resource)
    @resource = resource
  end

  def call
    preload
    threads = []
    threads << Thread.new { notify_partner_a }
    threads << Thread.new { notify_partner_b }
    threads.each(&:join)
  end

  private

  def preload
    # Bring the required resource associations to memory so
    # the threads don't need to access the database
  end

  def notify_partner_a
    HTTParty.post("https://partner-a.com/updates", body: partner_a_body)
  end

  def notify_partner_b
    HTTParty.post("https://partner-b.com/updates", body: partner_b_body)
  end

  def partner_a_body
    {foo: resource.full_name}.to_json
  end

  def partner_b_body
    {foo: resource.full_name}.to_xml
  end
end
This class was working fine until I had to call it multiple times to parallelize the processing of multiple resource objects. In this case, it was not OK to just do:
def parallel_update(resources)
  threads = []
  resources.each do |resource|
    threads << Thread.new do
      ResourcePublisher.new(resource).call
    end
  end
  threads.each(&:join)
end
because the ResourcePublisher class itself was reading from the database, which is something we had disallowed in our original analysis. I considered extracting the body-generation methods into a separate class, but I felt that using multiple threads instead of one did not change the purpose of the class significantly; it was just an implementation detail.
I finally made the preload method public so it could be called from the main thread, and used memoization in the methods that access the database:
class ResourcePublisher
  private attr_reader :resource

  def initialize(resource)
    @resource = resource
  end

  def preload
    # Bring the required resource associations to memory so
    # the threads don't need to access the database
    self
  end

  def call
    threads = []
    threads << Thread.new { notify_partner_a }
    threads << Thread.new { notify_partner_b }
    threads.each(&:join)
  end

  private

  def notify_partner_a
    HTTParty.post("https://partner-a.com/updates", body: partner_a_body)
  end

  def notify_partner_b
    HTTParty.post("https://partner-b.com/updates", body: partner_b_body)
  end

  def partner_a_body
    @partner_a_body ||= {foo: resource.full_name}.to_json
  end

  def partner_b_body
    @partner_b_body ||= {foo: resource.full_name}.to_xml
  end
end
Then, I was able to call the services from multiple threads, since call no longer touched the database:
def parallel_update(resources)
  threads = []
  publishers = resources.map { |resource| ResourcePublisher.new(resource).preload }
  publishers.each do |publisher|
    threads << Thread.new do
      publisher.call
    end
  end
  threads.each(&:join)
end
Summary
This approach (which of course is very simplified here to illustrate the point) allowed me to improve the performance of the method by parallelizing the most time-consuming parts, while keeping the database side simple: a single connection and a single transaction.