Parallelizing aggregates in YugabyteDB
Franck Pachot
Posted on August 19, 2022
In the previous post we have seen a way to simulate a loose index scan with a recursive CTE and a covering range index. I used it to get the last value for each truck_id. Now I want to count the readings for each truck_id. This has to scan all rows. However, I can use the previous technique to generate the statements that do this in parallel.
Example
I'm using the truck_readings table and the truck_last_reading index created in the previous post, where I inserted 10000 readings for each of the 2000 trucks.
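For reference, the setup from the previous post looks roughly like this. The exact column list and primary key are my assumption here; only truck_id, ts and the row counts are used in this post:
-- assumed table definition (only truck_id and ts matter for this post)
create table truck_readings (
  truck_id bigint not null,
  ts timestamptz not null,
  reading float,
  primary key (truck_id, ts)
);
-- 10000 readings for each of the 2000 trucks
insert into truck_readings (truck_id, ts, reading)
select truck_id
 , now() - generate_series(1,10000) * interval '1 second'
 , random()
from generate_series(1,2000) truck_id;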
Note that I have created the index with 10 tablets (in real life, this is done by auto-splitting when the table grows):
create index truck_last_reading on truck_readings
( truck_id asc, ts asc)
split at values((100),(200),(300),(400),(500),(600),(700),(800),(1000));
Method 1: generating parallel statements
First, I use the recursive CTE technique to create a table which lists each truck_id and has a placeholder to put the count:
create table tmp_partial_count as
with recursive truck_last_reading as (
  (
    select
      last_truck_last_reading.truck_id
    from truck_readings last_truck_last_reading
    order by
      last_truck_last_reading.truck_id desc,
      last_truck_last_reading.ts desc
    limit 1
  )
  union all
  select
    next_truck_last_reading.truck_id
  from truck_last_reading, lateral
  (
    select truck_id from truck_readings
    where truck_id < truck_last_reading.truck_id
    order by truck_id desc, ts desc limit 1
  ) as next_truck_last_reading
)
select truck_id, null::int as partial_count
from truck_last_reading
;
This takes 2 seconds here:
...
SELECT 2000
Time: 2464.776 ms (00:02.465)
yugabyte=#
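At this point, tmp_partial_count has one row per truck_id with a null partial_count, which can be verified with a quick query:
select * from tmp_partial_count order by truck_id limit 3;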
Then I generate, from this table, the statements to count the readings for each truck_id and update them in my temporary table. I generate the psql statements to run in the background with &. I add a wait every 200 jobs to limit the number of concurrent connections:
\pset format unaligned
\pset tuples_only on
\pset footer off
\o tmp_partial_count.sh
select format($$
psql -d postgres://yugabyte:YugabyteDB@yb0.pachot.net:5433/yugabyte -c 'update tmp_partial_count
set partial_count=(
select count(*) from truck_readings where truck_id=%s
) where truck_id=%s' &
%s
$$
,truck_id,truck_id
,case when mod(row_number()over(), 200 )=0 then 'wait' end
)
from tmp_partial_count where partial_count is null
;
select 'wait';
\o
This generates the script in tmp_partial_count.sh:
...
yugabyte-# from tmp_partial_count where partial_count is null
yugabyte-# ;
select 'wait';
\o
Time: 47.225 ms
yugabyte=# select 'time wait';
Time: 11.529 ms
yugabyte=# \o
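Each generated line is a psql call that runs one UPDATE in the background; for a hypothetical truck_id 42 it looks like this (a wait is appended after every 200th statement):
psql -d postgres://yugabyte:YugabyteDB@yb0.pachot.net:5433/yugabyte -c 'update tmp_partial_count
set partial_count=(
select count(*) from truck_readings where truck_id=42
) where truck_id=42' &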
I generate the statements only for the rows that have no partial count yet, so the script can be re-generated to retry only the missing ones.
I can run this:
\! time sh tmp_partial_count.sh
This lasted just under a minute in my lab (a small lab, but the point is that it can scale):
...
UPDATE 1
UPDATE 1
UPDATE 1
real 0m55.229s
user 0m5.690s
sys 0m4.904s
yugabyte=#
I can check that my index is used with an Index Only Scan, which is why it scales (each parallel query reads a different range):
yugabyte=# explain (analyze, costs off)
update tmp_partial_count set partial_count=(
select count(*) from truck_readings
where truck_id=42
) where truck_id=42;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Update on tmp_partial_count (actual time=50.422..50.422 rows=0 loops=1)
InitPlan 1 (returns $0)
-> Aggregate (actual time=41.160..41.160 rows=1 loops=1)
-> Index Only Scan using truck_last_reading on truck_readings (actual time=4.255..40.626 rows=10000 loops=1)
Index Cond: (truck_id = 42)
Heap Fetches: 0
-> Seq Scan on tmp_partial_count (actual time=50.300..50.394 rows=1 loops=1)
Filter: (truck_id = 42)
Rows Removed by Filter: 1999
Planning Time: 0.091 ms
Execution Time: 53.597 ms
Peak Memory Usage: 24 kB
(12 rows)
Again, my lab is small here. With a large cluster, the index would probably be split (by range) into multiple tablets. This is automatic when auto-splitting is enabled.
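To verify how the index is currently split, the number of tablets can be queried (yb_table_properties should be available in this version, but check your release):
select num_tablets from yb_table_properties('truck_last_reading'::regclass);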
Finally, I check that I have all partial updates:
yugabyte=#
select count(*),count(partial_count),sum(partial_count)
from tmp_partial_count;
count | count | sum
-------+-------+----------
2000 | 2000 | 20000000
(1 row)
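From here, the partial counts can be queried like any other table, for example to look at the trucks with the most readings (they all have 10000 in this dataset):
select truck_id, partial_count
from tmp_partial_count
order by partial_count desc, truck_id
limit 5;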
Method 2: use Seq Scan parallelism
I'm writing this with YugabyteDB 2.15 and you can expect some optimizations in future versions to parallelize and push down the aggregates. First, let's check the execution plan for one query:
yugabyte=#
explain (costs off, analyze)
select truck_id, count(*) from truck_readings
where truck_id is not null
group by truck_id
;
QUERY PLAN
---------------------------------------------------------------------------------------
HashAggregate (actual time=65548.176..65548.601 rows=2000 loops=1)
Group Key: truck_id
-> Seq Scan on truck_readings (actual time=3.170..62186.109 rows=20000000 loops=1)
Planning Time: 3.297 ms
Execution Time: 65549.386 ms
Peak Memory Usage: 12665 kB
(6 rows)
This takes one minute. In my lab this is not much slower than my complex method above, but on a larger volume it would not scale, because one tablet is read at a time.
YugabyteDB can parallelize the Seq Scan to read the tablets in parallel (the default --ysql_select_parallelism=-1 makes it calculated from the number of tservers: [2 per server, bounded between 2 and 16](https://github.com/yugabyte/yugabyte-db/blob/2.15.2/src/yb/yql/pggate/pg_doc_op.cc#L762)), and I have 3 tservers in my lab. Because reading all rows in parallel would saturate the single YSQL process (the PostgreSQL backend), YugabyteDB parallelizes the Seq Scan only when there is an expression pushed down (Remote Filter). Here I force this with a dummy where truck_id is not null:
yugabyte=# set yb_enable_expression_pushdown=true;
yugabyte=#
explain (costs off, analyze)
select truck_id, count(*) from truck_readings
where truck_id is not null
group by truck_id
;
QUERY PLAN
---------------------------------------------------------------------------------------
HashAggregate (actual time=39623.390..39623.786 rows=2000 loops=1)
Group Key: truck_id
-> Seq Scan on truck_readings (actual time=4.386..34862.379 rows=20000000 loops=1)
Remote Filter: (truck_id IS NOT NULL)
Planning Time: 0.071 ms
Execution Time: 39624.001 ms
Peak Memory Usage: 417 kB
(7 rows)
Time: 39636.634 ms (00:39.637)
Here, with a much simpler method, I get better results. But remember that this depends on your server and your data. Reading the tablets was parallelized, but all rows have to be fetched and aggregated by a single backend, which means it is not as scalable as running queries in parallel from multiple connections.