Ian Jones
Posted on March 18, 2020
I posted this question on Stack Overflow and thought I would get the dev.to community's perspective as well.
When we (egghead.io) release a course, we send an email blast to all of our users who have opted in to commercial communication. When this happens, our server starts returning 500s because requests are timing out. We use a Sidekiq worker (`BroadcastMessageSendWorker`) to create the jobs that send the email we have written. We use Sidekiq Pro so we can use its Batches feature (`Sidekiq::Batch`).
```ruby
ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'
```
```ruby
class BroadcastMessageSendWorker
  include Sidekiq::Worker

  def perform(message_guid)
    ActiveRecord::Base.connection_pool.with_connection do
      message = BroadcastMessage.find(message_guid)
      message.with_lock do
        return unless message.pending?

        message.pickup!

        if message.contacts.count == 0
          message.finish!
          return
        end

        batch = Sidekiq::Batch.new
        batch.on(:complete, self.class, 'guid' => message_guid)
        batch.jobs do
          # We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
          # will start blowing up. Instead, use an in-memory `seen` index
          seen = Set.new
          message.contacts.select(:id).find_in_batches do |contact_batch|
            args = contact_batch.pluck(:id).map do |contact_id|
              next unless seen.add?(contact_id) # add? returns nil if the object is already in the set
              [message_guid, contact_id]
            end
            Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
          end
        end
        message.update(batch_id: batch.bid)
      end
    end
  end

  def on_complete(_, options)
    message = BroadcastMessage.find(options['guid'])
    message.finish! if message.sending?
  end
end
```
We build an in-memory set to make sure we don't send the same email to a user twice. ScoutAPM is telling us that the `message.contacts.select(:id)` line is taking a long time (`contacts` joins our `users` table, so this is somewhat expected).
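Without Rails and Sidekiq, the dedup step in `perform` can be sketched in plain Ruby as follows. This is an illustration under assumptions, not the production worker. `each_slice` stands in for `find_in_batches`, and the contact IDs are a plain array. The payload hashes are collected and returned instead of being passed to `Sidekiq::Client.push_bulk`. `build_bulk_payloads` and `DELIVER_WORKER` are hypothetical names.

```ruby
require 'set'

# Hypothetical stand-in for the worker class name passed to push_bulk.
DELIVER_WORKER = 'BroadcastMessageDeliverWorker'

# Builds one push_bulk-style payload per batch of contact IDs, skipping any ID
# already seen in this or an earlier batch. Batches that end up empty are dropped.
def build_bulk_payloads(message_guid, contact_ids, batch_size: 1000)
  seen = Set.new
  payloads = contact_ids.each_slice(batch_size).map do |id_batch|
    fresh_ids = id_batch.select { |id| seen.add?(id) } # add? returns nil for duplicates
    { 'class' => DELIVER_WORKER, 'args' => fresh_ids.map { |id| [message_guid, id] } }
  end
  payloads.reject { |payload| payload['args'].empty? }
end

# Duplicate IDs (2 and 1) are spread across two batches of three.
build_bulk_payloads('msg-123', [1, 2, 2, 3, 1, 4], batch_size: 3).each do |payload|
  p payload['args']
end
# [["msg-123", 1], ["msg-123", 2]]
# [["msg-123", 3], ["msg-123", 4]]
```

`Set#add?` returns `nil` for an ID it has already seen, so duplicates are dropped even when they appear in different batches. The worker's `seen` set relies on this behavior. The trade-off is that the set holds every contact ID in memory for the whole job.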
Here is the `EXPLAIN ANALYZE` output for this query:
```
Subquery Scan on contacts (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
  Filter: (NOT (hashed SubPlan 1))
  -> CTE Scan on base_contacts (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
       CTE base_contacts
         -> Gather (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
              Workers Planned: 2
              Workers Launched: 2
              -> Parallel Hash Left Join (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
                   Hash Cond: (contacts_1.user_id = users.id)
                   Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
                   Rows Removed by Filter: 12924
                   -> Parallel Seq Scan on contacts contacts_1 (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
                        Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
                        Rows Removed by Filter: 108423
                        SubPlan 2
                          -> Seq Scan on mailkick_opt_outs mailkick_opt_outs_1 (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
                               Filter: (active AND (list IS NULL))
                               Rows Removed by Filter: 19576
                        SubPlan 3
                          -> Nested Loop (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
                               -> Seq Scan on broadcast_messages (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
                                    Filter: (signature = 'broadcast_message_signature'::text)
                                    Rows Removed by Filter: 63
                               -> Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
                                    Index Cond: (broadcast_message_id = broadcast_messages.id)
                                    Filter: ((user_type)::text = 'ClassType'::text)
                   -> Parallel Hash (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
                        Buckets: 131072 Batches: 8 Memory Usage: 3168kB
                        -> Parallel Seq Scan on users (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
  SubPlan 1
    -> Seq Scan on mailkick_opt_outs (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
         Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
         Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms
```
The `Parallel Seq Scan` is taking a lot of time, but I don't know how to speed it up.
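In the plan, `NOT (hashed SubPlan 2)` and `NOT (hashed SubPlan 3)` mean PostgreSQL runs each subquery once, loads the results into an in-memory hash table, and probes that table for every `contacts` row the scan reads. `loops=3` on those subplans reflects that each of the three processes (the leader plus two workers) builds its own copy. The Ruby sketch below mimics only the shape of that anti-join using `Set` lookups. The `Contact` struct, its `email` and `user_id` fields, and the sample data are hypothetical; the plan does not show which `contacts` columns are compared.

```ruby
require 'set'

# Hypothetical row shape; the real comparison columns are not visible in the plan.
Contact = Struct.new(:id, :email, :user_id)

# Mimics `NOT (hashed SubPlan 2) AND NOT (hashed SubPlan 3)`: each subquery
# result is materialized once into a hash-based set, then every scanned row
# is checked against both sets.
def filter_contacts(contacts, opted_out_emails, already_sent_user_ids)
  opt_out_set = Set.new(opted_out_emails)      # built once, like SubPlan 2
  sent_set    = Set.new(already_sent_user_ids) # built once, like SubPlan 3
  contacts.reject do |c|
    opt_out_set.include?(c.email) || sent_set.include?(c.user_id)
  end
end

contacts = [
  Contact.new(1, 'a@example.com', 10),
  Contact.new(2, 'b@example.com', 11),
  Contact.new(3, 'c@example.com', 12)
]
kept = filter_contacts(contacts, ['b@example.com'], [12])
puts kept.map(&:id).inspect # => [1]
```

Each probe is an average O(1) hash lookup, so the per-row check is cheap. In the plan, a large share of the `contacts` scan's time appears to go into building those tables before the first row comes out. The scan's startup time is about 273 ms, and SubPlan 3 alone takes about 143 ms per loop to return 325264 `ahoy_messages` rows.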
My first thought is to split this worker into different ranges of IDs and query the database at different times to reduce the load on the database. So instead of querying `message.contacts`, I would query `message.contacts.where('id >= 1 AND id <= 10000')`, then `message.contacts.where('id >= 10001 AND id <= 20000')`, and so on until we reach the max ID.
This feels naive. How do I either speed this query up or spread it out over time?
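The ID-range idea could look something like the plain-Ruby sketch below. It splits `min_id..max_id` into inclusive, non-overlapping ranges and gives each chunk a start delay so the queries are spread out over time. `id_chunks`, `chunk_size`, and `spacing_seconds` are hypothetical names and tuning knobs. In the real system, each chunk would presumably become its own job (for example, scheduled with Sidekiq's `perform_in`) that runs the narrower `message.contacts.where(...)` query.

```ruby
# Hypothetical helper: split min_id..max_id into inclusive ID ranges of
# chunk_size and give each chunk a start delay (seconds) to spread the queries out.
def id_chunks(min_id, max_id, chunk_size:, spacing_seconds:)
  raise ArgumentError, 'chunk_size must be positive' unless chunk_size.positive?

  (min_id..max_id).step(chunk_size).each_with_index.map do |low, index|
    high = [low + chunk_size - 1, max_id].min # inclusive upper bound
    { range: (low..high), delay: index * spacing_seconds }
  end
end

id_chunks(1, 25_000, chunk_size: 10_000, spacing_seconds: 60).each do |chunk|
  r = chunk[:range]
  # In a real job this might become (hypothetically):
  #   message.contacts.where('id >= ? AND id <= ?', r.first, r.last)
  puts "after #{chunk[:delay]}s: id >= #{r.first} AND id <= #{r.last}"
end
# after 0s: id >= 1 AND id <= 10000
# after 60s: id >= 10001 AND id <= 20000
# after 120s: id >= 20001 AND id <= 25000
```

Because the partition is on `id` itself, every duplicate of a given contact ID lands in the same chunk, so a per-chunk `seen` set would still catch duplicates. Whether each narrower query is actually cheaper depends on whether PostgreSQL can push the `id` condition down into the `base_contacts` CTE. Before PostgreSQL 12, CTEs are always materialized, so it may not.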
I also thought of adding a multi-column index on `users.can_contact` and `users.managed_subscription_id`, but haven't tried that yet.