Monday 25 September 2023

Evolution of Logical Replication

This blog post is about how Logical Replication has evolved over the years in PostgreSQL, what's in the latest release (16), and what's being worked on for future releases. Logical replication is a method of replicating data objects and their changes based upon their replication identity (usually a primary key). We use the term logical in contrast to physical replication, which uses exact block addresses and byte-by-byte replication. To learn more about Logical Replication in PostgreSQL, read the pgsql-docs.

The foundation of Logical Replication was laid in PostgreSQL-9.4, where we introduced Logical Decoding (which allows database changes to be streamed out in a customizable format), Replication Slots (which preserve resources like WAL files on the primary until they are no longer needed by standby servers), and Replica Identity (a table-level option to control operations for logical replication).

With PostgreSQL-9.5, we introduced features like tracking replication progress via origins, and commit timestamps that will allow conflict resolution in the future. Replication origins allow replication to restart from the point it had reached before the restart. They also help us avoid bi-directional loops in the replication setup, a capability introduced in the latest PostgreSQL 16 release.

With PostgreSQL-9.6, we added support for generic WAL messages for logical decoding. This feature allows extensions to insert data into the WAL stream that can be read by logical-decoding plugins.

With PostgreSQL-10.0, we introduced native Logical Replication using the publish/subscribe model. This allows more flexibility than physical replication does, including replication between different major versions of PostgreSQL and selective replication. In this release, we allowed Insert/Update/Delete operations on tables to be replicated. Going forward from here, we have enhanced this feature with each release.

With PostgreSQL-11.0, we allowed replication of the Truncate operation. Logical decoding started to use a generational memory allocator which is optimized for serial allocation/deallocation and leads to a reduction in memory usage for decoding.

With PostgreSQL-12.0, we allowed logical replication slots to be copied. This functionality is helpful when investigating logical replication issues and when a different output plugin is needed to consume changes.
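
For illustration, a slot can be copied into a new slot that uses a different output plugin with pg_copy_logical_replication_slot (the slot names below are hypothetical):

SELECT * FROM pg_copy_logical_replication_slot('mysub_slot', 'debug_slot', false, 'test_decoding');  -- example slot names only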

With PostgreSQL-13.0, we introduced publishing of partitioned tables, which allows all of a partitioned table's partitions to be replicated; prior to this, individual partitions had to be published. We also allowed the amount of memory used for logical decoding to be controlled via the GUC logical_decoding_work_mem. If you want to learn more about this parameter, read the blog. In addition, the WAL retained by replication slots can now be limited by the GUC max_slot_wal_keep_size.
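
As a minimal sketch (the values are illustrative, not recommendations), both parameters can be set and reloaded without a restart:

ALTER SYSTEM SET logical_decoding_work_mem = '128MB';  -- illustrative value
ALTER SYSTEM SET max_slot_wal_keep_size = '10GB';      -- illustrative value
SELECT pg_reload_conf();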

With PostgreSQL-14.0, we introduced streaming of in-progress transactions, which reduces the apply lag for large transactions. We also allowed decoding of prepared transactions, which means output plugins can consume changes at prepare time instead of waiting till commit, helping to reduce the lag. Additionally, we reduced the CPU usage and improved the decoding performance of transactions containing a lot of DDL. There is a long list of features introduced in 14.0; you can read the blog for more details.

With PostgreSQL-15.0, we allowed replication of prepared transactions in in-core logical replication, which, apart from reducing apply lag, forms the basis for building conflict-free logical replication. Users can also use row filters and column lists to send selective table data, and all the tables in a schema can be published. You can read the blog to learn more about the logical replication features introduced in 15.0.

Now let's discuss the features introduced in the latest PostgreSQL-16.0 release.

Prevent loops in bi-directional replication

Today, we are allowed to set up bi-directional replication, but it has problems: conflicts can occur, and such a setup can lead to bi-directional loops. This feature intends to solve the second problem. Let me share a simple example to show how loops can occur.

Publisher
CREATE TABLE mytbl(c1 int primary key);
CREATE PUBLICATION mypub FOR TABLE mytbl;

Subscriber
CREATE TABLE mytbl(c1 int primary key);
CREATE SUBSCRIPTION mysub CONNECTION 'dbname=postgres' PUBLICATION mypub;
CREATE PUBLICATION mypub FOR TABLE mytbl;

Publisher
CREATE SUBSCRIPTION mysub CONNECTION 'dbname=postgres port=5444' PUBLICATION mypub;
INSERT INTO mytbl VALUES(1);

Publisher's server LOG:
ERROR:  duplicate key value violates unique constraint "mytbl_pkey"
DETAIL: Key (c1)=(1) already exists.

We see this error because the subscriber node, after applying the change, sent it back to the publisher. Now, if there were no primary key defined on mytbl, we would have inserted the same row on the publisher again and sent it back to the subscriber. So, this would have led to an infinite loop between publisher and subscriber.

We have introduced origin filtering, which avoids this problem. In the above example, users need to execute the following statement on the publisher to avoid the loop.

ALTER SUBSCRIPTION mysub SET(origin=none);

The following is the syntax to use this feature:

CREATE SUBSCRIPTION sub1 CONNECTION ... PUBLICATION pub1 WITH (origin = none);

The valid values are 'none' and 'any', with the latter as the default. Setting the origin to 'none' means that the subscription will request the publisher to send only changes that don't have an origin. Setting the origin to 'any' means that the publisher sends changes regardless of their origin. For more information on this feature, you can read blogs [1][2].

Allowed logical decoding to be performed from the standby server

This requires wal_level = logical on both primary and standby. Let us see with an example:

postgres=# select pg_is_in_recovery();
 pg_is_in_recovery
-------------------
 t
(1 row)
The above shows that the following statements are executed on the standby.

postgres=# select * from pg_create_logical_replication_slot('slot_1', 'test_decoding', false, false);
 slot_name |    lsn
-----------+-----------
 slot_1    | 0/50001A0
(1 row)
postgres=# SELECT * FROM pg_logical_slot_get_changes('slot_1', NULL, NULL);
    lsn    | xid |                  data
-----------+-----+----------------------------------------
 0/5000250 | 734 | BEGIN 734
 0/5000250 | 734 | table public.t1: INSERT: c1[integer]:3
 0/5000348 | 734 | COMMIT 734
(3 rows)

This ability can be used for workload distribution, by allowing subscribers to subscribe from the standby when the primary is busy. We need to ensure that we can't decode from slots when the required data has been removed on the primary. So, we invalidate logical slots on the standby (a) when the required rows are removed on the primary, and (b) when the wal_level on the primary server is reduced to below logical. We can check the 'conflicting' field in pg_replication_slots to know if a slot has been invalidated due to a conflict.
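
For example, a quick check on the standby for invalidated slots could look like this:

SELECT slot_name, active, conflicting FROM pg_replication_slots;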

Perform operations with table owner's permission

The apply process can be configured to perform operations with the table owner's privileges instead of the subscription owner's privileges. This is configured by setting the option 'run_as_owner' to 'false', using the following syntax:

CREATE SUBSCRIPTION mysub CONNECTION … PUBLICATION mypub WITH (run_as_owner = false);

The subscription owner needs to be able to SET ROLE to each role that owns a replicated table. If the table owner doesn't have permission to SET ROLE to the subscription owner, SECURITY_RESTRICTED_OPERATION is imposed.

If the subscription has been configured with run_as_owner=true, then no user switching will occur. This also means that any user who owns a table into which replication is happening can execute arbitrary code with the privileges of the subscription owner.

The default value of 'run_as_owner' is false, which is generally more secure.

Non-superusers can create subscriptions

For that, the non-superuser must have been granted the pg_create_subscription role and is required to specify a password for authentication. Non-superusers must additionally have CREATE permission on the database in which the subscription is to be created.

A very basic example to show the above steps:

postgres=# create user u1;
CREATE ROLE
postgres=# Grant pg_create_subscription to u1;
GRANT ROLE
postgres=# Grant Create on database postgres to u1;
GRANT
postgres=> set session authorization u1;
SET
postgres=> create subscription sub1 connection 'dbname=postgres password=p1' publication pub1 with (connect=false);
WARNING:  subscription was created, but is not connected
HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
CREATE SUBSCRIPTION

Note that superusers can set password_required = false for non-superusers that own the subscription.

Large transactions can be applied in parallel

CREATE SUBSCRIPTION mysub CONNECTION …  PUBLICATION mypub WITH (streaming = parallel);

Performance improvement in the range of 25-40% has been observed (for further details, check here).

Each large transaction is assigned to one of the available workers, which reduces the apply lag by applying changes immediately instead of waiting till the whole transaction is received by the subscriber. The worker remains assigned until the transaction is completed. This preserves commit ordering and avoids file I/O in most cases, although we still need to spill to a file if no worker is available.

It is important to maintain commit order to avoid failures due to: (a) transaction dependencies - say we insert a row in the first transaction and update it in the second transaction on the publisher; allowing the subscriber to apply both in parallel can cause the update to fail; (b) deadlocks - allowing transactions that update the same set of rows/tables in the opposite order to be applied in parallel can lead to deadlocks.

max_parallel_apply_workers_per_subscription sets the maximum number of parallel apply workers per subscription.
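
For example (the value below is illustrative), the limit can be raised and picked up with a configuration reload; these workers are taken from the pool defined by max_logical_replication_workers:

ALTER SYSTEM SET max_parallel_apply_workers_per_subscription = 4;  -- illustrative value
SELECT pg_reload_conf();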

Logical replication can copy tables in binary format

CREATE SUBSCRIPTION mysub CONNECTION … PUBLICATION mypub WITH (binary = true);

Copying tables in binary format may reduce the time spent, depending on the column types. Prior to v16, this option only caused data changes to be replicated in binary format; the initial table copy was always done in text format. Binary copy is supported only when both publisher and subscriber are v16 or later.

Indexes other than PK and REPLICA IDENTITY can be used on the subscriber

Using REPLICA IDENTITY FULL on the publisher can lead to a full table scan per tuple change on the subscriber when no REPLICA IDENTITY or PK index is available there. This feature allows other indexes present on the subscriber table to be considered.

The index that can be used must be a btree index, not a partial index, and the leftmost field must be a column (not an expression) that references the remote relation column. These limitations help to keep the index scan similar to PK/RI index scans.

The performance improvement is proportional to the amount of data in the table.
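
As a minimal sketch (the table and index names are hypothetical), the setup could look like this:

-- Publisher: the table has no usable key, so fall back to REPLICA IDENTITY FULL
ALTER TABLE mytbl REPLICA IDENTITY FULL;

-- Subscriber: a plain btree index on an ordinary column can now be used
-- to locate rows while applying UPDATE/DELETE changes
CREATE INDEX mytbl_c1_idx ON mytbl (c1);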

The last topic I want to cover in this blog is the features in this area that are being worked on or discussed in the community for future releases.

What's next:

1. Upgrade of logical replication nodes. This will allow a seamless upgrade of logical replication nodes. One of the biggest challenges right now is that we don't migrate slots across versions, due to which users need to rebuild most of the setup after an upgrade. See one of the examples in the blog.

2. Synchronization of replication slots to allow failover. This will synchronize logical slots from the publisher to its physical standbys. With this feature, if the publisher goes down, we can promote its physical standby, and the subscribers can connect to it and continue working.

3. Replication of sequences.

4. DDL Replication. This is a relatively large feature but is one of the most requested features to allow seamless major version upgrades.

Monday 7 November 2022

Logical Replication Improvements in PostgreSQL-15

There are various areas in PostgreSQL like Partitioning, Logical Replication, Parallel Query, Vacuum, etc. which improve with each new version. In this blog, I'll summarize the various enhancements in Logical Replication that users can see in the recently released PostgreSQL 15. You can read about the enhancements in this area in the previous release in one of my earlier blogs.

Allow replication of prepared transactions:

In the last release, we allowed logical decoding of prepared transactions, and with this release, we added support for replicating prepared transactions in built-in logical replication. Previously, we sent the changes of a prepared transaction only once COMMIT PREPARED had been executed. Users can enable replication at PREPARE time with the following syntax:

CREATE PUBLICATION  mypub FOR ALL TABLES;

CREATE SUBSCRIPTION mysub CONNECTION 'dbname=postgres' PUBLICATION  mypub WITH (two_phase = true);

The key advantages of this feature are:

(a) Reduces the lag to replicate data by replicating it at PREPARE time instead of waiting till the COMMIT PREPARED

(b) This provides the base to build conflict-free logical replication, because if the prepare fails on a subscriber node then one can roll it back on the publisher node as well.

The key implementation points:

(a) The replication of prepared transactions is enabled once the initial sync for all the tables is finished.

(b) To avoid conflicts in the prepared transaction during APPLY, we use the prepare identifier as pg_gid_<subscriber-id>_<transaction-id>.

(c) It is not allowed to change this option with the ALTER SUBSCRIPTION command.

(d) ALTER SUBSCRIPTION REFRESH PUBLICATION is allowed with copy_data=false once two_phase is enabled for a subscription.

For a detailed description of this feature, see the blog.

Allow replication of all tables in the schema:

Previously, one needed to specify all the tables of a particular schema while creating a publication in order to publish all tables of that schema. Then, if the user later created more tables in that schema, they also had to be added to the publication separately, which was inconvenient. This feature makes it much easier by allowing users to specify just the schema name when they want all tables of the schema to be published. The syntax to specify a schema name (TABLES IN SCHEMA) is as follows:

CREATE PUBLICATION mypub FOR TABLES IN SCHEMA mysch;

CREATE PUBLICATION mypub FOR TABLE mytab, TABLES IN SCHEMA mysch;

Note that schemas can be specified along with individual tables from other schemas. Users can add schemas to existing publications with the following syntax:

ALTER PUBLICATION mypub ADD TABLES IN SCHEMA mysch;

Note that adding schemas to a publication that is already subscribed to by some subscribers will require an ALTER SUBSCRIPTION … REFRESH PUBLICATION action on the subscriber side in order to become effective.

For a detailed description of this feature, see the blog.

Allow specifying row filters for logical replication of tables:

This feature allows specifying an additional WHERE clause after each table in the publication definition. Rows that don't satisfy this WHERE clause will be filtered out. This allows a set of tables to be partially replicated. The row filter is per table. The WHERE clause must be enclosed in parentheses. Users can define row filters with the following command:

CREATE PUBLICATION mypub FOR TABLE mytab1 WHERE (c1 > 10 and c2 < 20), mytab2 WHERE (c3 LIKE 'bob');

Users are allowed to specify row filters for existing tables in publication with the command:

ALTER PUBLICATION mypub SET TABLE mytab1 WHERE (c1 > 10 and c2 < 20), mytab2 WHERE (c3 LIKE 'bob');

This can help distribute data among nodes, improve performance by sending data selectively, and hide some sensitive data.

Key points to note about this feature:

(a) The row filter WHERE clause for a table added to a publication that publishes UPDATES and/or DELETES must contain only columns that are covered by REPLICA IDENTITY.

(b) The row filter WHERE clause for a table added to a publication that publishes INSERT can use any column.

(c) Row filters are ignored for TRUNCATE TABLE commands.

(d) If the row filter evaluates to NULL, it is regarded as "false", i.e., the corresponding row won't be replicated.

(e) The WHERE clause only allows simple expressions that don't have user-defined functions, user-defined operators, user-defined types, user-defined collations, non-immutable built-in functions, or references to system columns.

(f) During initial table synchronization, only data that satisfies the row filters is copied to the subscriber.

(g) For partitioned tables, the publication parameter publish_via_partition_root determines if it uses the partition's row filter (if the parameter is false, the default) or the root partitioned table's row filter.

For a detailed description of this feature, see docs and blog.

Allow specifying column lists for logical replication of tables:

This feature allows specifying an optional column list when adding a table to logical replication. Columns not included in this list are not sent to the subscriber, allowing the schema on the subscriber to be a subset of the publisher schema. The choice of columns can be based on behavioral or performance reasons. Users can define column lists with the following syntax:

CREATE PUBLICATION mypub FOR TABLE mytab1 (c1, c2), mytab2 (c3);

Users are allowed to specify column lists for existing tables in publication with the command:

ALTER PUBLICATION mypub SET TABLE mytab1 (c1, c2);

Key points to note about this feature:

(a) If a publication publishes UPDATES and/or DELETES, any column list must include the table's replica identity columns.

(b) If a publication publishes only INSERT operations, then the column list may omit replica identity columns.

(c) Column lists are ignored for TRUNCATE TABLE commands.

(d) A column list can contain only simple column references.

(e) A column list can't be specified if the publication also publishes FOR TABLES IN SCHEMA.

(f) During initial data synchronization, only the published columns are copied.

(g) For partitioned tables, the publication parameter publish_via_partition_root determines which column list is used. If publish_via_partition_root is true, the root partitioned table's column list is used. Otherwise, if publish_via_partition_root is false (the default), each partition's column list is used.
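
Both the column list and the row filter attached to each published table can be inspected via pg_publication_tables; for example (a quick sanity check, assuming the mypub publication from the examples above):

SELECT pubname, tablename, attnames, rowfilter FROM pg_publication_tables WHERE pubname = 'mypub';  -- 'mypub' is the example publication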

For a detailed description of this feature, see docs and blog.

Allows logical replication to run as the owner of the subscription:

Previously, the subscription's apply process would run with the privileges of a superuser, but with PostgreSQL 15, it runs with the privileges of the subscription owner. This prevents logical replication workers from performing insert, update, delete, truncate, or copy commands on tables unless the subscription owner has permission to do so. Only superusers, roles with bypassrls, and table owners can replicate into tables with row-level security policies.

The purpose of this work is to allow subscriptions to be managed by non-superusers and protect servers with subscriptions from malicious activity on the publisher side.

Conflict Resolution:

Conflicts can happen for various reasons, like a PRIMARY KEY violation or a schema difference, while applying transactions on the subscriber. By default, PostgreSQL will keep retrying the operation on an error. Before PostgreSQL 15, users had the following options: (a) manually remove the conflicting data to allow replication to proceed, or (b) use pg_replication_origin_advance() to advance the LSN to a location beyond the failed transaction so that, on restart, replication starts from a point after the conflicting transaction.

Both of these methods are quite inconvenient. For option (a), users are forced to remove or change data on the subscriber even though they want the corresponding data from the publisher to be ignored. For option (b), users need to find the LSN of the failing transaction, probably by using pg_waldump or some other tool on the publisher side, and the origin information is not apparent either, as it was generated internally for the purpose of replication. Moreover, while using pg_replication_origin_advance(), if users mistakenly set the wrong LSN (either that of a future commit or of some operation in the middle of a transaction), the system can skip data it was not supposed to, leading to an inconsistent replica.

The other problem is that the system will keep retrying to apply the transaction even when it can't succeed without the user's intervention, and users don't have any way to stop it apart from manually disabling the subscription using ALTER SUBSCRIPTION mysub DISABLE;

In PostgreSQL 15, we made the use of pg_replication_origin_advance() easier by providing the required information and by adding a similar but more robust alternative. We also allow subscriptions to be disabled automatically on error.

We introduced a new subscription option 'disable_on_error', which allows a subscription to be automatically disabled if any errors are detected by subscription workers during data replication from the publisher. Users can specify this option either in the CREATE SUBSCRIPTION or in the ALTER SUBSCRIPTION command.

CREATE SUBSCRIPTION mysub CONNECTION '…' PUBLICATION mypub WITH (disable_on_error = true);

ALTER SUBSCRIPTION mysub SET (disable_on_error = true);

Then, we extended the error context information of subscription worker errors by adding: (a) the finish LSN, which indicates the commit_lsn for committed transactions and the prepare_lsn for prepared transactions; and (b) the replication origin name, which contains the name of the replication origin that keeps track of replication progress and is created automatically with the subscription definition.

The extended error context information can make the use of pg_replication_origin_advance() easier for users.

We also introduced a more robust way to skip a conflicting transaction by using the command: ALTER SUBSCRIPTION mysub SKIP (lsn = '0/1566D10'); This is more robust because it prevents users from setting a wrong LSN by performing checks against the specified LSN. We ensure that the specified LSN matches the finish LSN of the first transaction sent by the publisher after the restart, and that it is greater than the origin's current LSN. The first transaction successfully applied to the subscriber will clear the specified LSN.
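
Putting it together, a typical recovery sequence might look like the following (the subscription name and LSN are purely illustrative; the LSN would be taken from the worker's error context):

-- Skip the failing remote transaction reported in the error context,
-- then re-enable the subscription if disable_on_error had disabled it
ALTER SUBSCRIPTION mysub SKIP (lsn = '0/1566D10');
ALTER SUBSCRIPTION mysub ENABLE;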

For a detailed description of this feature, see blogs [1] and [2].

pg_stat_subscription_stats:

A new view that shows stats about errors that occurred during the application of logical replication changes or during initial table synchronization. See docs for more information on this.
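
For example, the per-subscription error counters can be checked with:

SELECT subname, apply_error_count, sync_error_count FROM pg_stat_subscription_stats;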

Communication improvements between publisher and subscriber:

PostgreSQL has made enhancements in communication to (a) prevent sending transaction BEGIN/END messages when all the transaction's data is filtered out, and (b) prevent replication from restarting due to timeouts while processing large transactions where most or all of the data is filtered out.

Before PostgreSQL 15, we used to send BEGIN/END messages for empty transactions (where all changes are skipped/filtered), which wastes CPU cycles and network bandwidth to build and transmit such messages. To avoid sending messages for empty transactions, we now send the BEGIN message only with the first change transmitted from publisher to subscriber, and the END (COMMIT) message is sent only when a BEGIN was sent. To avoid any delays in synchronous replication, we still send a keepalive message after skipping an empty transaction and process its feedback.

While processing long transactions where most of the changes are filtered out (say, because the particular operations are not published), the publisher doesn't send any communication to the subscriber, which then times out after a certain threshold, leading to a restart of replication. To fix this, we now periodically send keepalive messages in such cases.

For a detailed description of this work, see the blog.

I believe this is a good mix of improvements for logical replication in PostgreSQL 15 which will help users. Your feedback here or on PostgreSQL mailing lists is welcome!

Wednesday 15 September 2021

Logical Replication Improvements In PostgreSQL-14

In the upcoming release of PostgreSQL-14, we will see multiple enhancements in Logical Replication which I hope will further increase its usage. This blog is primarily to summarize and briefly explain all the enhancements in Logical Replication.

Decoding of large transactions:

Allow streaming of large in-progress transactions to subscribers. Before PostgreSQL-14, transactions were sent only at commit time, which leads to a large apply lag for large transactions. With this feature, the apply lag is reduced, and in certain scenarios that leads to a big performance win. I have explained this feature in detail in my previous blog.

Performance of logical decoding:

Reduced the CPU usage and improved the decoding performance of transactions having a lot of DDL. It has been observed that decoding of a transaction containing truncation of a table with 1000 partitions now finishes in about 1s, whereas before this work it used to take 4-5 minutes. Before explaining how we achieved this performance gain, let me briefly describe what an invalidation message is in PostgreSQL, as that is important to understand this optimization. These are messages to flush stale system cache entries in each backend session. We normally execute them at the command end in the backend which generated them and send them at the transaction end, via a shared queue, to other backends for processing. They are normally generated for insert/delete/update operations on system catalogs, which happen for DDL operations.

While decoding, we used to execute all the invalidations of the entire transaction at each command end, as we had no way of knowing which invalidations happened before that command. Due to this, transactions involving large amounts of DDL used to take more time and also led to high CPU usage. Now we know the specific invalidations at each command end, so we execute only the required invalidations. This work has been accomplished by commit d7eb52d718.

Initial table sync:

The initial table synchronization involves copying the initial snapshot of the table by the tablesync worker; the table is then brought up to a synchronized state with the main apply worker. This whole work used to be done in a single transaction using a temporary replication slot, which had major drawbacks: (a) the slot holds the WAL till the entire sync is complete; (b) any error during the sync phase rolls back the entire copy, which is painful for large copies; (c) there is a risk of exceeding the CID limit.

We made the below improvements to overcome these drawbacks:
(a) Allowed multiple transactions in the tablesync phase.
(b) Used permanent slots and origins to track the progress of tablesync.

This work has been explained in detail in the blog. This work has been accomplished by commit ce0fdbfe97.

Logical decoding of two-phase commits:

This will allow us to decode transactions at prepare time and send them to the output plugin instead of doing it at commit time. This allows plugins to decipher the transaction at prepare time and route it to another node if required. This has two advantages: (a) it allows two-phase distributed transactions across multiple nodes via logical replication, and (b) it reduces the apply lag by sending and replaying the transaction on another node at prepare time.

This work has been explained in detail in the blog. This work has been accomplished by commits [1][2][3].

Monitor logical decoding:

Replication slots are used to keep state about replication streams originating from this cluster. Their primary purpose is to prevent the premature removal of WAL. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.

We have added a system view pg_stat_replication_slots to report replication slot activity. It can be used to monitor the amount of data streamed to the output plugin or subscriber and the amount spilled to disk. Additionally, the user can monitor the total amount of transaction data decoded for sending to the output plugin while decoding changes from WAL for a slot; note that this includes data that is streamed and/or spilled. The function pg_stat_reset_replication_slot() resets slot statistics.

Example:

CREATE TABLE stats_test(data text); 
SET logical_decoding_work_mem to '64kB';
SELECT 'init' FROM pg_create_logical_replication_slot('slot_stats', 'test_decoding');
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
SELECT count(*) FROM pg_logical_slot_peek_changes('slot_stats', NULL, NULL, 'skip-empty-xacts', '1');

SELECT slot_name, spill_txns, spill_count, spill_bytes, total_txns, total_bytes FROM pg_stat_replication_slots; 
 slot_name  | spill_txns | spill_count | spill_bytes | total_txns | total_bytes
------------+------------+-------------+-------------+------------+-------------
 slot_stats |          1 |          12 |      763893 |          1 |      763893
(1 row) 

DROP TABLE stats_test; 

SELECT pg_drop_replication_slot('slot_stats'); 

Allow publications to be easily added and removed:

Previously, if the user needed to add or remove publications from a subscription, she had to mention all the existing publications along with it. Consider a case where a subscription is subscribed to two publications and we want to add an additional one: the user then needs to mention all three (the two previous and the one new) while doing ALTER SUBSCRIPTION. The same is explained with an example below:

Initial Subscription
CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1, mypub2;

Add a new Publication mypub3.
ALTER SUBSCRIPTION mysub SET PUBLICATION mypub1, mypub2, mypub3;

This could be inconvenient for users, especially if a subscription is subscribed to many existing publications. We have added a new way to make this easier by supporting ADD/DROP of individual publications. See the docs for the syntax. With the new way, in the above case, to add a new publication the user only needs to perform
ALTER SUBSCRIPTION mysub ADD PUBLICATION mypub3;
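
Similarly, a single publication can be removed without listing the remaining ones:
ALTER SUBSCRIPTION mysub DROP PUBLICATION mypub3;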

This work has been accomplished by commit  82ed7748b7.

Binary transfer mode:

This feature provides an option during Create/Alter Subscription to allow data from publishers to be sent in binary format. The default value of this option is false. Even when this option is enabled, only data types that have binary send and receive functions will be transferred in binary. When doing cross-version replication, if the subscriber lacks a binary receive function for the type, the data transfer will fail, and this option can't be used. This mode is generally faster. Example to enable binary mode:

CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1 WITH (binary = true);

This work has been accomplished by commit 9de77b5453.

Allow to get messages via pgoutput:

Provide a “messages” option to the pgoutput plugin. This allows logical decoding messages (i.e. generated via pg_logical_emit_message) to be sent to the slot consumer. This is useful for pgoutput plugin users that use it for Change Data Capture. An example of the same is given below:

SELECT pg_create_logical_replication_slot('pgout_slot','pgoutput');

SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message');

SELECT get_byte(data, 1), encode(substr(data, 24, 23), 'escape')  FROM pg_logical_slot_peek_binary_changes('pgout_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pgout_slot', 'messages', 'true') OFFSET 1 LIMIT 1;

 get_byte |         encode
----------+-------------------------
        1 | a transactional message
(1 row)

SELECT pg_drop_replication_slot('pgout_slot');

While getting changes, publication_names is not required for logical decoding messages but is specified just so that the function doesn't give an error. You can refer to the logical replication message formats in the PostgreSQL docs. This work has been accomplished by commit ac4645c015.

Saturday 17 July 2021

Logical Replication Of In-Progress Transactions

Logical Replication was introduced in PostgreSQL-10, and since then it has been improved with each version. Logical Replication is a method to replicate data selectively, unlike physical replication, where the data of the entire cluster is copied. It can be used to build multi-master or bi-directional replication solutions. One of its main differences compared with physical replication was that it allowed a transaction to be replicated only at commit time. This leads to an apply lag for large transactions, where we need to wait for the transaction to finish before transferring its data. In the upcoming PostgreSQL-14 release, we are introducing a mechanism to stream large in-progress transactions. We have seen replication performance improve by two or more times for large transactions due to this, especially because of early filtering. See the performance test results reported on hackers and in another blog on the same topic. This will reduce the apply lag to a good degree.

The first thing we needed for this feature was to decide when to start streaming the WAL content. One could ask: if we have such a technology, why not stream each change of a transaction separately as and when we retrieve it from WAL? But that would actually lead to sending much more data across the network, because we would need to send additional transaction information with each change so that the apply side can recognize the transaction to which the change belongs. To address this, in PostgreSQL-13 we introduced a new GUC parameter, logical_decoding_work_mem, which allows users to specify the maximum amount of memory to be used by logical decoding, beyond which some of the decoded changes are either written to local disk or streamed to the subscriber. The parameter is also used to control the memory used by logical decoding, as explained in the blog.

The next thing that prevented incremental decoding was the delay in finding the association between a subtransaction and its top-level XID. During logical decoding, we accumulate all changes along with their (sub)transaction. While sending the changes to the output plugin or streaming them to the other node, we need to combine all the changes that happened in the transaction, which requires us to find the association of each top-level transaction with its subtransactions. Before PostgreSQL-14, we built this association from the XLOG_XACT_ASSIGNMENT WAL record, which is normally logged after 64 subtransactions or at commit time, because those were the only two places where such an association appeared in the WAL. To find this association as it happens, we now also write the assignment info into WAL immediately, as part of the first WAL record for each subtransaction. This is done only when wal_level=logical to minimize the overhead.

Yet another thing required for incremental decoding was to process invalidations at each command end. The basic idea of invalidations is that they bring the caches (like the relation cache) up to date so that the next command uses the up-to-date schema. This is required to correctly decode WAL incrementally, as while decoding we use the relation attributes from the caches. For this, when wal_level=logical, we write invalidations into the WAL at the command end so that decoding can use this information. The invalidations are decoded and accumulated in the top-level transaction, and then executed during replay. This obviates the need to decode the invalidations as part of a commit record.

The previous paragraphs explained the enhancements required in the server infrastructure to allow incremental decoding. The next step was to provide APIs (stream methods) for out-of-core logical replication to stream large in-progress transactions. We added seven methods to the output plugin API to allow this: five required callbacks (stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb and stream_change_cb) and two optional callbacks (stream_message_cb and stream_truncate_cb). For details about these APIs, refer to the PostgreSQL docs.

When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded changes are transmitted, the transaction can be committed using the stream_commit_cb callback (or possibly aborted using the stream_abort_cb callback). One example sequence of streaming transaction may look like the following:

/* Change logical_decoding_work_mem to 64kB in the session */
postgres=# show logical_decoding_work_mem;
 logical_decoding_work_mem
---------------------------
 64kB
(1 row)
postgres=# CREATE TABLE stream_test(data text);
CREATE TABLE
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ?column?
----------
 init
(1 row)
postgres=# INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 500) g(i);
INSERT 0 500
postgres=# SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');
                       data
--------------------------------------------------
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 committing streamed transaction TXN 741
(505 rows)

The actual sequence of callback calls may be more complicated depending on the server operations. There may be blocks for multiple streamed transactions, some of the transactions may get aborted, etc.

Note that streaming is triggered when the total amount of changes decoded from the WAL (for all in-progress transactions) exceeds the limit defined by the logical_decoding_work_mem setting. At that point, the largest top-level transaction (measured by the amount of memory currently used for decoded changes) is selected and streamed. However, in some cases we still have to spill to disk even if streaming is enabled because we exceed the memory threshold but still have not decoded the complete tuple e.g., only decoded toast table insert but not the main table insert or decoded speculative insert but not the corresponding confirm record. However, as soon as we get the complete tuple we stream the transaction including the serialized changes.

While streaming in-progress transactions, concurrent aborts may cause failures when the output plugin (or the decoding of WAL records) consults catalogs (both system and user-defined). Let me explain this with an example. Suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, transaction 501 updates the catalog tuple, after which we have two tuples: (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). If 501 is aborted and some other transaction, say 502, updates the same catalog tuple, then the first tuple is changed to (xmin: 500, xmax: 502). The problem is that when we try to decode a tuple inserted/updated in 501 after the catalog update, we will see the catalog tuple with (xmin: 500, xmax: 502) as visible, because the deleting xid 502 is not visible to our snapshot. Decoding with that catalog tuple can lead to a wrong result or a crash. So, it is necessary to detect concurrent aborts to allow streaming of in-progress transactions. To detect a concurrent abort, during a catalog scan we check the status of the xid, and if it is aborted we report a specific error so that we can stop streaming the current transaction and discard the already streamed changes on such an error. We might have already streamed some of the changes for the aborted (sub)transaction, but that is fine, because when we decode the abort we will stream the abort message to truncate the changes on the subscriber.

To add support for streaming of in-progress transactions into the built-in logical replication, we need to primarily do four things:

(a) Extend the logical replication protocol to identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). Refer to PostgreSQL docs for the protocol details. 

(b) Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol.

(c) Modify the replication apply worker to properly handle streamed in-progress transactions by spilling the data to disk and then replaying it on commit.

(d) Provide a new option for streaming while creating a subscription.

The below example demonstrates how to set up the streaming via built-in logical replication:

Publisher node:

Set logical_decoding_work_mem = '64kB';
# Set up publication with some initial data

CREATE TABLE test_tab (a int primary key, b varchar);
INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar');
CREATE PUBLICATION tap_pub FOR TABLE test_tab;

Subscriber node:

CREATE TABLE test_tab (a int primary key, b varchar);
CREATE SUBSCRIPTION tap_sub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION tap_pub WITH (streaming = on);

Publisher Node:

# Ensure the corresponding replication slot is created on publisher node
select slot_name, plugin, slot_type from pg_replication_slots;
 slot_name |  plugin  | slot_type
-----------+----------+-----------
 tap_sub   | pgoutput | logical
(1 row)

# Confirm there are no streamed bytes yet
postgres=# SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           0 |            0 |            0
(1 row)

# Insert, update and delete enough rows to exceed the logical_decoding_work_mem (64kB) limit.
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;

# Confirm that streaming happened
SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           1 |           22 |      1444410
(1 row)

Subscriber Node:
# The streamed data is still not visible.
select * from test_tab;
 a |  b
---+-----
 1 | foo
 2 | bar
(2 rows)

Publisher Node:
# Commit the large transactions
Commit;

Subscriber Node:
# The data must be visible on the subscriber
select count(*) from test_tab;
 count
-------
  3334
(1 row)

This feature was proposed in 2017 and committed in 2020 as part of various commits 0bead9af48c55040ccd045fdc9738b7259736a6e, and 464824323e. It took a long time to complete this feature because of the various infrastructure pieces required to achieve this. I would really like to thank all the people involved in this feature especially Tomas Vondra who has initially proposed it and then Dilip Kumar who along with me had completed various remaining parts and made it a reality. Then also to other people like Neha Sharma, Mahendra Singh Thalor, Ajin Cherian, and Kuntal Ghosh who helped throughout the project to do reviews and various tests. Also, special thanks to Andres Freund and other community members who have suggested solutions to some of the key problems of this feature. Last but not least, thanks to EDB and Fujitsu's management who encouraged me and some of the other members to work on this feature.

Sunday 10 May 2020

Improved (auto)vacuum in PostgreSQL 13

Vacuum is one of the sub-systems in PostgreSQL which gets improved with each release.  I have checked the past five releases and each has quite a few improvements for vacuum.  Following the trend, there are a number of improvements in vacuum in the upcoming PostgreSQL release (v13) which are covered in this blog.

Improvement-1
---------------------
Vacuum will be allowed to process indexes in parallel. This means it can leverage multiple CPUs in order to perform index cleanup. This internally uses the background worker infrastructure of PostgreSQL to accomplish the work.  There is a new option PARALLEL which controls the parallelism used by vacuum.  Users can use this option to specify the number of workers used to perform the VACUUM command, which is limited by the minimum of (a) the number of indexes on the table and (b) max_parallel_maintenance_workers.  Parallelism for the VACUUM command is enabled by default, which means it will be used even if the user didn't specify the PARALLEL option, and it uses a number of workers equal to the number of indexes on the table being vacuumed.  We can disable parallelism for vacuum by specifying zero as the number of workers with the PARALLEL option.  An index can participate in parallel vacuum only if its size is greater than min_parallel_index_scan_size.

The PARALLEL option can't be used with the FULL option in the VACUUM command.  This feature isn't available via autovacuum; users need to use the VACUUM command to get its benefit.

There are a number of other blogs [1][2] written on this topic which show the benefit of this feature.  A recent blog published by EnterpriseDB shows that vacuum could be approximately 4 times faster when using 7 workers, especially when the relation is in dire need of vacuum; read that blog for more information about the test.

Here is a simple test to show the usage of this feature.
postgres=# create table pvac(c1 int, c2 text DEFAULT md5(random()::text), c3 text DEFAULT md5(random()::text));
CREATE TABLE
postgres=# create index pvac_1 on pvac(c1);
CREATE INDEX
postgres=# create index pvac_2 on pvac(c2);
CREATE INDEX
postgres=# create index pvac_3 on pvac(c3);
CREATE INDEX
postgres=# insert into pvac select i FROM generate_series(1,100000) as i;
INSERT 0 100000
postgres=# update pvac set c1=c1;
UPDATE 100000

postgres=# vacuum (parallel 4, verbose) pvac;
INFO:  vacuuming "public.pvac"
INFO:  launched 2 parallel vacuum workers for index vacuuming (planned: 2)
INFO:  scanned index "pvac_1" to remove 100000 row versions
..
..

In the above test, 2 parallel workers were used even though we specified 4. The reason is that the number of indexes is 3, the number of workers can't be more than the number of indexes, and it is further limited by max_parallel_maintenance_workers, as you can see with the command below.

postgres=# show max_parallel_maintenance_workers;
 max_parallel_maintenance_workers 
----------------------------------
 2
(1 row)

The other important thing we have ensured in this feature is that parallelism won't use more memory or I/O bandwidth than a non-parallel vacuum.

Improvement-2
---------------------
Allow inserts to trigger autovacuum activity.  This feature will be really helpful for insert-only tables, where an anti-wraparound vacuum could otherwise be the first vacuum the table ever receives, and such a run would take a really long time.  This allows heap pages to be set as all-visible, which then allows index-only scans to skip heap fetches, and reduces the work necessary when the table needs to be frozen.  This is controlled by two new GUCs and reloptions: autovacuum_vacuum_insert_threshold and autovacuum_vacuum_insert_scale_factor.
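
As a small illustration, the thresholds can also be tuned per table via the reloptions (the table name is taken from the example below and the values are only illustrative):

ALTER TABLE vac_ins SET (autovacuum_vacuum_insert_threshold = 500, autovacuum_vacuum_insert_scale_factor = 0.1);  -- illustrative values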

Let me demonstrate the benefit of this feature with the help of an example.  Start the server with autovacuum_vacuum_insert_threshold = -1 (one can edit the postgresql.conf file to change the value of this parameter or use the ALTER SYSTEM command).  By connecting with psql, we can execute the below commands to see the behavior.

postgres=# show autovacuum_vacuum_insert_threshold;
 autovacuum_vacuum_insert_threshold 
------------------------------------
 -1
(1 row)

postgres=# create table vac_ins(c1 int, c2 char(500));
CREATE TABLE
postgres=# create index idx_vac_ins on vac_ins(c1);
CREATE INDEX
postgres=# insert into vac_ins values(generate_series(1,20000),'aaaaaa');
INSERT 0 20000

After a few seconds, you can notice the below message in the server log, which shows that autovacuum has performed an analyze on the table.

LOG:  automatic analyze of table "postgres.public.vac_ins" system usage: CPU: user: 0.03 s, system: 0.11 s, elapsed: 0.15 s

After that, run the below command:

postgres=# explain (analyze) select c1 from vac_ins where c1 < 100;
                                                         QUERY PLAN                                                          
-----------------------------------------------------------------------------------------------------------------------------
 Index Only Scan using idx_vac_ins on vac_ins  (cost=0.29..16.02 rows=99 width=4) (actual time=0.019..0.092 rows=99 loops=1)
   Index Cond: (c1 < 100)
   Heap Fetches: 99
 Planning Time: 0.269 ms
 Execution Time: 0.129 ms
(5 rows)

Here, "Heap Fetches: 99" shows that the above query needs to visit the heap to fetch the required information even though it is present in the index and the scan type used is Index Only Scan.

postgres=# truncate vac_ins;
TRUNCATE TABLE

Now, restart the server with the default value of autovacuum_vacuum_insert_threshold and execute the below commands from psql to see how the new feature helps:

postgres=# show autovacuum_vacuum_insert_threshold;
 autovacuum_vacuum_insert_threshold 
------------------------------------
 1000
(1 row)

postgres=# insert into vac_ins values(generate_series(1,20000),'aaaaaa');
INSERT 0 20000

After a few seconds, you can notice the below message in the server log, which indicates that autovacuum has performed both vacuum and analyze on the table.
LOG:  automatic vacuum of table "postgres.public.vac_ins": index scans: 0
pages: 0 removed, 1334 remain, 0 skipped due to pins, 0 skipped frozen
...
LOG:  automatic analyze of table "postgres.public.vac_ins" system usage: CPU: user: 0.01 s, system: 0.15 s, elapsed: 0.21 s

After that, run the below command:

postgres=# explain (analyze) select c1 from vac_ins where c1 < 100;
                                                         QUERY PLAN                                                         
----------------------------------------------------------------------------------------------------------------------------
 Index Only Scan using idx_vac_ins on vac_ins  (cost=0.29..6.02 rows=99 width=4) (actual time=0.016..0.039 rows=99 loops=1)
   Index Cond: (c1 < 100)
   Heap Fetches: 0
 Planning Time: 0.166 ms
 Execution Time: 0.074 ms
(5 rows)

Here, "Heap Fetches: 0" shows that the above query doesn't need to visit the heap to fetch the required information. We can see that the execution time is reduced significantly in this case.

Improvement-3
---------------------
Allow an (auto)vacuum to display additional information about the heap or index in case of an error.  This feature could help users in case the database has some corruption.  For example, if one of the indexes on a relation has some corrupted data (due to bad hardware or some bug), it will let the user know the index information, and the user can take appropriate action, like REINDEX or maybe dropping and recreating that particular index, to overcome the problem.  In the case of the heap, it displays the block number for which the error occurred, which makes it much easier for users and developers to detect the problem.  In the worst case, if a particular block is corrupted in a table, users can remove all the rows of that particular block and the table can be used.

vacuum pvac;
ERROR:  error induced to demonstrate use of information
CONTEXT:  while scanning block 2469 of relation "public.pvac"

Next, I used "SELECT * FROM heap_page_items(get_raw_page('pvac', 2469));" to find the information for all line pointers in the page and then removed them using the below query.

postgres=# delete from pvac where ctid Between '(2469,1)' and '(2469,11)';
DELETE 11

Then, I ran vacuum again:

postgres=# vacuum pvac;
ERROR:  error induced to demonstrate use of information
CONTEXT:  while scanning block 2468 of relation "public.pvac"

Now, you can see the block number has changed from 2469 to 2468, which means the vacuum could proceed.  In this case, I manually induced the error by changing the code, so it occurs for every block, but in reality it would occur for some particular block(s), and once the user gets rid of those block(s), the table can be used again.  I don't want to say that is an ideal situation, but at least it allows users to proceed, and it can help developers narrow down the bug if there is one in the code.

Improvement-4
---------------------
Autovacuum will now log WAL usage statistics along with other information.  The WAL usage contains information on the total number of records, the number of full page images, and the total number of bytes.  The buffer usage and WAL usage stats combined give us the approximate I/O usage of a particular autovacuum run.  One example run's information is shown below; refer to the "WAL usage" line to see the newly added stats.

LOG:  automatic vacuum of table "postgres.public.pvac": index scans: 1
pages: 0 removed, 2470 remain, 0 skipped due to pins, 0 skipped frozen
tuples: 100000 removed, 100000 remain, 0 are dead but not yet removable, oldest xmin: 529
buffer usage: 9276 hits, 4 misses, 3 dirtied
avg read rate: 0.062 MB/s, avg write rate: 0.047 MB/s
system usage: CPU: user: 0.40 s, system: 0.00 s, elapsed: 0.50 s
WAL usage: 7909 records, 2 full page images, 2276323 bytes

Improvement-5
---------------------
Make vacuum buffer counters 64 bits wide to avoid overflow of buffer usage stats.  Without this feature, in extreme cases where tables are large enough for 4 billion buffer accesses to be a possibility, the stats displayed are meaningless.  See the below example of 'buffer usage' stats from pgsql-hackers:

LOG:  automatic vacuum of table "somtab.sf.foobar": index scans: 17
pages: 0 removed, 207650641 remain, 0 skipped due to pins, 13419403 skipped frozen
tuples: 141265419 removed, 3186614627 remain, 87783760 are dead but not yet removable
buffer usage: -2022059267 hits, -17141881 misses, 1252507767 dirtied
avg read rate: -0.043 MB/s, avg write rate: 3.146 MB/s
system usage: CPU 107819.92s/2932957.75u sec elapsed 3110498.10 sec

Improvement-6
---------------------
Add wait event 'VacuumDelay' to report on cost-based vacuum delay.  This will help us monitor [auto]vacuum throttling.  This is a small feature but quite important, as previously we had no way to monitor [auto]vacuum throttling, and it is not uncommon to see it in user environments.
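
For example, currently throttled (auto)vacuum backends can be spotted with a query like this:

SELECT pid, backend_type, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event = 'VacuumDelay';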

Wednesday 19 February 2020

Parallelism, what next?

This blog post is about the journey of parallelism in PostgreSQL till now and what is in store for the future.  Since PostgreSQL 9.6, where the first parallel query features arrived, each release has improved it.  Below is a brief overview of the parallel query features added in each release.

PG9.6 has added Parallel execution of sequential scans, joins, and aggregates.

PG10 has added (a) Support parallel B-tree index scans, (b) Support parallel bitmap heap scans, (c) Allow merge joins to be performed in parallel, (d) Allow non-correlated subqueries to be run in parallel, (e) Improve ability of parallel workers to return pre-sorted data and (f) Increase parallel query usage in procedural language functions.

PG11 has added (a) Allow parallel building of a btree index, (b) Allow hash joins to be performed in parallel using a shared hash table, (c) Allow parallelization of commands CREATE TABLE ... AS, SELECT INTO, and CREATE MATERIALIZED VIEW, (d) Allow UNION to run each SELECT in parallel if the individual SELECTs cannot be parallelized, (e) Allow partition scans to more efficiently use parallel workers, (f) Allow LIMIT to be passed to parallel workers, this allows workers to reduce returned results and use targeted index scans, (g) Allow single-evaluation queries, e.g. WHERE clause aggregate queries, and functions in the target list to be parallelized.

PG12 has added support for parallelized queries when in SERIALIZABLE isolation mode.

Now, let's look at the progress made in PG13 with respect to parallelism.  Some of the important advancements are:
(a) Parallel vacuum - This feature allows vacuum to leverage multiple CPUs in order to process indexes.  This enables us to perform index vacuuming and index cleanup with background workers.  It adds a PARALLEL option to the VACUUM command, with which the user can specify the number of workers to be used; that number is further limited by the number of indexes on the table.  Specifying zero workers disables parallelism, as shown in the sketch below.  For more information, see commit.
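
As a rough illustration (the table name and the worker count of 2 are arbitrary here):

-- use up to 2 workers for index vacuuming and index cleanup (capped by the number of indexes)
VACUUM (PARALLEL 2, VERBOSE) pvac;

-- PARALLEL 0 explicitly disables parallel vacuum
VACUUM (PARALLEL 0) pvac;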

(b) Improve EXPLAIN's handling of per-worker details.  This displays the worker information in a much better way.  A few visible side-effects, as mentioned in the commit, are:

* In text format, instead of something like

  Sort Method: external merge  Disk: 4920kB
  Worker 0:  Sort Method: external merge  Disk: 5880kB
  Worker 1:  Sort Method: external merge  Disk: 5920kB
  Buffers: shared hit=682 read=10188, temp read=1415 written=2101
  Worker 0:  actual time=130.058..130.324 rows=1324 loops=1
    Buffers: shared hit=337 read=3489, temp read=505 written=739
  Worker 1:  actual time=130.273..130.512 rows=1297 loops=1
    Buffers: shared hit=345 read=3507, temp read=505 written=744

you get

  Sort Method: external merge  Disk: 4920kB
  Buffers: shared hit=682 read=10188, temp read=1415 written=2101
  Worker 0:  actual time=130.058..130.324 rows=1324 loops=1
    Sort Method: external merge  Disk: 5880kB
    Buffers: shared hit=337 read=3489, temp read=505 written=739
  Worker 1:  actual time=130.273..130.512 rows=1297 loops=1
    Sort Method: external merge  Disk: 5920kB
    Buffers: shared hit=345 read=3507, temp read=505 written=744

(c) Avoid unnecessary shm (shared memory) writes in Parallel Hash Join.  This improves the performance of Parallel Hash Join by a significant amount on large many-core systems.  Though this work has been back-patched to v11, where Parallel Hash Join was introduced, I mention it here as it was done during PG13 development.  For more information, see commit.

What is being discussed for the future:
(a) Parallel grouping sets - PostgreSQL already supports parallel aggregation by aggregating in two stages. First, each process participating in the parallel portion of the query performs an aggregation step, producing a partial result for each group of which that process is aware. Second, the partial results are transferred to the leader via the Gather node. Finally, the leader re-aggregates the results across all workers in order to produce the final result.
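
To make this two-stage shape concrete, a simplified plan (costs omitted, worker count illustrative) for a plain parallel aggregate such as SELECT sum(sales) FROM items_sold would look roughly like:

Finalize Aggregate
  -> Gather
       Workers Planned: 2
       -> Partial Aggregate
            -> Parallel Seq Scan on items_sold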

Next, there has been a discussion in the community about parallelizing queries containing grouping sets in much the same way as we do parallel aggregation.
Basically, the aim is to parallelize queries like SELECT brand, size, sum(sales) FROM items_sold GROUP BY GROUPING SETS ((brand), (size), ());
This feature has been proposed for PG13, but is not yet committed.

(b) Parallel copy - We also want to parallelize the COPY command, in particular the "Copy <table_name> from .. ;" form (a representative example is shown below).  This will help improve bulk load operations in PostgreSQL.  Currently, we do a lot of work during the COPY command.  We read the file in 64KB chunks, find the line endings, and process the data line by line, where each line corresponds to one tuple.  We first form the tuple (as a value/null array) from that line, check whether it qualifies the WHERE condition, and if it does, perform the constraint check and a few other checks and finally store it in a local tuple array.  Once we reach 1000 tuples or have consumed 64KB (whichever occurs first), we insert them together, and then for each tuple we insert into the index(es) and execute after-row triggers.  The aim of this work is to parallelize as much as possible of the work done during the copy.  There is an ongoing discussion in the community on this topic.
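
For context, the kind of bulk load targeted here is a plain COPY ... FROM; the table, file path, and filter below are purely illustrative:

COPY lineitem FROM '/data/lineitem.csv' WITH (FORMAT csv)
    WHERE l_quantity < 24;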

There is a small caveat here: to achieve parallel copy, we need to work on the relation extension lock, because with the current locking, parallel workers would block each other while extending the relation.  There is already a discussion on this topic in the community.

(c) Parallel file_fdw - The proposed work in this area allows file_fdw to divide its scan up for parallel workers, much like a parallel seq scan.

There are more areas where parallelism can be used, like parallel DML (inserts, updates, deletes).  During a discussion with Thomas Munro, it came up that it would be beneficial if we could parallelize index creation and index scans for indexes other than btree, especially gin and gist.  Note that we already support parallel index scans and parallel index creation for btree.  I would not like to go into the details of these operations, as so far we haven't seen any proposals for them.  Similarly, we can improve a few things in our current parallel infrastructure: (a) as of now, parallel workers are created and destroyed for each query; instead, we could have a pool of parallel query workers, which would avoid the overhead of starting them for each query; (b) as of now, each worker can use up to work_mem of memory, which might increase the overall memory usage of a query.  We might want to improve this, but currently there is no proposal for it.

Friday 25 May 2018

Parallel Index Scans In PostgreSQL


There is a lot to say about parallelism in PostgreSQL. We have come a long way since I wrote my first post on this topic (Parallel Sequential Scans). Each of the past three releases (including PG-11, which is in beta) has had parallel query as a major feature, which in itself says how useful this feature is and how much work is being done on it. You can read more about parallel query in the PostgreSQL docs or in a blog post on this topic by my colleague Robert Haas. The intent of this blog post is to talk about parallel index scans, which were released in PostgreSQL 10. Currently, parallel scans are supported only for btree indexes.

To demonstrate how the feature works, here is an example of TPC-H Q-6 at scale factor 20 (approximately a 20GB database). Q6 is a forecasting revenue change query: it quantifies the amount of revenue increase that would have resulted from eliminating certain company-wide discounts in a given percentage range in a given year. Asking this type of "what if" query can be used to look for ways to increase revenues.

explain analyze
select sum(l_extendedprice * l_discount) as revenue
          from lineitem
          where l_shipdate >= date '1994-01-01' and
          l_shipdate < date '1994-01-01' + interval '1' year and
          l_discount between 0.02 - 0.01 and 0.02 + 0.01 and
          l_quantity < 24
          LIMIT 1;

Non-parallel version of plan
-------------------------------------
Limit
-> Aggregate
    -> Index Scan using idx_lineitem_shipdate on lineitem
         Index Cond: ((l_shipdate >= '1994-01-01'::date) AND (l_shipdate < '1995-01-01
         00:00:00'::timestamp without time zone) AND (l_discount >= 0.01) AND
         (l_discount <= 0.03)  AND  (l_quantity < '24'::numeric))
Planning Time: 0.406 ms
Execution Time: 35073.886 ms

Parallel version of plan
-------------------------------
Limit
-> Finalize Aggregate
    -> Gather
         Workers Planned: 2
         Workers Launched: 2
          -> Partial Aggregate
               -> Parallel Index Scan using idx_lineitem_shipdate on lineitem
                    Index Cond: ((l_shipdate >= '1994-01-01'::date) AND (l_shipdate < '1995-01-01 
                    00:00:00'::timestamp without time zone) AND (l_discount >= 0.01) AND
                    (l_discount <= 0.03) AND (l_quantity < '24'::numeric))
Planning Time: 0.420 ms
Execution Time: 15545.794 ms

We can see that the execution time is reduced by more than half for the parallel plan with two parallel workers. This query filters out many rows, and the work (CPU time) to do that is divided among the workers (and the leader), leading to the reduced time.

To further see the impact of the number of workers, we used a somewhat bigger dataset (scale_factor = 50). The setup has been done using a TPC-H-like benchmark for PostgreSQL. We have also created a few additional indexes on the columns l_shipmode, l_shipdate, o_orderdate, and o_comment.

Non-default parameter settings:
random_page_cost = seq_page_cost = 0.1
effective_cache_size = 10GB
shared_buffers = 8GB
work_mem = 1GB




The time is reduced almost linearly up to 8 workers and then reduces more slowly. A further increase in the number of workers won’t help unless the amount of data to scan increases.

We have further evaluated the parallel index scan feature for all the queries in the TPC-H benchmark and found that it is used in a number of queries, and the impact is positive (it reduced the execution time significantly). Below are the results for TPC-H at scale factor 20 with the number of parallel workers set to 2. The X-axis indicates the query (1: Q-6, 2: Q14, 3: Q18).


Under the Hood
The basic idea is quite similar to parallel heap scans, where each worker (including the leader whenever possible) scans a block (all the tuples in a block) and then gets the next block that needs to be scanned. The parallelism is implemented at the leaf level of the btree. The first worker to start a btree scan will scan until it reaches the leaf, and the others will wait until the first worker has reached the leaf. Once the first worker has read the leaf block, it sets the next block to be read and wakes one of the workers waiting to scan blocks; it then proceeds to scan tuples from the block it has read. Henceforth, each worker, after reading a block, sets the next block to be read and wakes up the next waiting worker. This continues until no more pages are left to scan, at which point we end the parallel scan and notify all the workers.

A new GUC min_parallel_index_scan_size has been introduced, which indicates the minimum amount of index data that must be scanned in order for a parallel scan to be considered. Users can try changing the value of this parameter to see whether a parallel index plan is effective for their queries. The number of parallel workers is decided based on the number of index pages to be scanned. The final cost of the parallel plan accounts for the fact that the cost (CPU cost) of processing the rows will be divided equally among the workers.
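
As an illustration of how one might experiment with these settings (the values are arbitrary and only affect the current session):

SHOW min_parallel_index_scan_size;           -- 512kB by default
SET min_parallel_index_scan_size = '128kB';  -- consider parallel index scans for smaller indexes too
SET max_parallel_workers_per_gather = 4;     -- allow more workers per Gather node
EXPLAIN
select sum(l_extendedprice * l_discount) as revenue
from lineitem
where l_shipdate >= date '1994-01-01';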

In the end, I would like to thank the people (Rahila Syed and Robert Haas) who were involved in this work (along with me) and my employer EnterpriseDB who has supported this work. I would also like to thank Rafia Sabih who helped me in doing performance testing for this blog.