Wednesday 15 September 2021

Logical Replication Improvements In PostgreSQL-14

In the upcoming release of PostgreSQL-14, we will see multiple enhancements in Logical Replication which I hope will further increase its usage. This blog is primarily to summarize and briefly explain all the enhancements in Logical Replication.

Decoding of large transactions:

Allow streaming large in-progress transactions to subscribers. Before PostgreSQL-14, transactions were streamed only at commit time, which led to a large apply lag for large transactions. With this feature, the apply lag is reduced, and in certain scenarios that leads to a big performance win. I have explained this feature in detail in my previous blog.
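As a rough sketch of how a subscriber opts in: streaming of in-progress transactions is off by default and is enabled per subscription with the streaming option. The subscription and publication names below are placeholders borrowed from the examples later in this post. The publisher starts streaming a transaction once its decoded changes exceed logical_decoding_work_mem.

```sql
-- Enable streaming when creating the subscription (names are placeholders).
CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1 WITH (streaming = on);

-- Or enable it on an existing subscription.
ALTER SUBSCRIPTION mysub SET (streaming = on);
```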

Performance of logical decoding:

Reduced the CPU usage and improved the decoding performance of transactions having a lot of DDLs. It has been observed that decoding of a transaction containing truncation of a table with 1000 partitions finishes in 1s, whereas before this work it used to take 4-5 minutes. Before explaining how we have achieved this performance gain, let me briefly describe what an invalidation message is in PostgreSQL, as that is important to understand this optimization. These are messages to flush invalid (stale) system cache entries in each backend session. We normally execute these at the command end in the backend which generated them and send them at the transaction end via a shared queue to other backends for processing. These are normally generated for insert/delete/update operations on system catalogs, which happen for DDL operations.

While decoding, we used to execute all the invalidations of an entire transaction at each command end, as we had no way of knowing which invalidations happened before that command. Due to this, transactions involving large amounts of DDLs used to take more time and also led to high CPU usage. But now we know the specific invalidations at each command end, so we execute only the required invalidations. This work has been accomplished by commit d7eb52d718.
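The scenario described above can be approximated with a sketch like the following. The table and slot names (part_test, inval_slot) are made up for illustration, and the script assumes wal_level = logical and enough lock table space for 1000 partitions. Actual timings will depend on hardware and configuration.

```sql
-- Hypothetical partitioned table with 1000 partitions.
CREATE TABLE part_test (id int) PARTITION BY RANGE (id);

DO $$
BEGIN
  FOR i IN 0..999 LOOP
    EXECUTE format(
      'CREATE TABLE part_test_%s PARTITION OF part_test FOR VALUES FROM (%s) TO (%s)',
      i, i * 10, (i + 1) * 10);
  END LOOP;
END $$;

-- Create the slot after the setup so that only the TRUNCATE is decoded.
SELECT 'init' FROM pg_create_logical_replication_slot('inval_slot', 'test_decoding');

-- A single DDL-heavy transaction: truncating the parent truncates every partition.
TRUNCATE part_test;

-- Decode the transaction; in psql, \timing can be used to measure this step.
SELECT count(*) FROM pg_logical_slot_get_changes('inval_slot', NULL, NULL);

-- Clean up.
SELECT pg_drop_replication_slot('inval_slot');
DROP TABLE part_test;
```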

Initial table sync:

The initial table synchronization involves copying the initial snapshot of the table by the table sync worker, after which the table is brought up to a synchronized state with the main apply worker. This whole work used to be done in a single transaction using a temporary replication slot, which has major drawbacks: (a) The slot will hold the WAL till the entire sync is complete. (b) Any error during the sync phase will roll back the entire copy, which is painful for large copies. (c) There is a risk of exceeding the CID limit.

We made the following improvements to overcome these drawbacks:
Allowed multiple transactions in tablesync phase.
Used permanent slots and origins to track the progress of tablesync.

This work has been explained in detail in the blog. This work has been accomplished by commit ce0fdbfe97.
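One way to observe the tablesync progress is to look at the per-table state on the subscriber and the slots on the publisher. This is only a monitoring sketch. The catalogs and columns used are standard, but what you see depends on when you run the queries.

```sql
-- On the subscriber: per-table synchronization state.
-- i = initialize, d = data is being copied, f = finished table copy,
-- s = synchronized, r = ready (normal replication).
SELECT srrelid::regclass AS table_name, srsubstate, srsublsn
FROM pg_subscription_rel;

-- On the publisher: replication slots, including any tablesync slots
-- that exist while a table is still being synchronized.
SELECT slot_name, temporary, active, restart_lsn
FROM pg_replication_slots;
```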

Logical decoding of two-phase commits:

This will allow us to decode the transactions at prepare time and send the same to the output plugin instead of doing it at commit time. This will allow the plugins to decipher the transaction at prepare time and route it to another node if required. This has two advantages (a) allows two-phase distributed transactions across multiple nodes via logical replication, (b) reduces the apply-lag by sending and replaying the transaction on another node at prepare time.

This work has been explained in detail in the blog. This work has been accomplished by commits [1][2][3].
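To see decoding at prepare time with an output plugin, a sketch along the following lines can be tried with test_decoding. The slot, table, and transaction identifier names are hypothetical, and the example assumes max_prepared_transactions is set to a non-zero value. The fourth argument of pg_create_logical_replication_slot enables two-phase decoding for the slot.

```sql
-- Arguments: slot name, plugin, temporary = false, twophase = true.
SELECT 'init' FROM pg_create_logical_replication_slot('twophase_slot', 'test_decoding', false, true);

CREATE TABLE twophase_test(data text);

BEGIN;
INSERT INTO twophase_test VALUES ('prepared row');
PREPARE TRANSACTION 'test_prepared1';

-- The changes of the prepared transaction can be decoded here, before it is committed.
SELECT data FROM pg_logical_slot_get_changes('twophase_slot', NULL, NULL, 'skip-empty-xacts', '1');

COMMIT PREPARED 'test_prepared1';

-- Only the commit of the prepared transaction remains to be decoded.
SELECT data FROM pg_logical_slot_get_changes('twophase_slot', NULL, NULL, 'skip-empty-xacts', '1');

-- Clean up.
SELECT pg_drop_replication_slot('twophase_slot');
DROP TABLE twophase_test;
```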

Monitor logical decoding:

Replication slots are used to keep state about replication streams originating from this cluster. Their primary purpose is to prevent the premature removal of WAL. In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.

We have added a system view pg_stat_replication_slots to report replication slot activity. This can be used to monitor the amount of data streamed to the output plugin or subscriber and the amount spilled to disk. Additionally, the user can monitor the total amount of transaction data decoded for sending transactions to the decoding output plugin while decoding changes from WAL for this slot. Note that this includes data that is streamed and/or spilled. The function pg_stat_reset_replication_slot() resets slot statistics.

Example:

CREATE TABLE stats_test(data text); 
SET logical_decoding_work_mem to '64kB';
SELECT 'init' FROM pg_create_logical_replication_slot('slot_stats', 'test_decoding');
INSERT INTO stats_test SELECT 'serialize-topbig--1:'||g.i FROM generate_series(1, 5000) g(i);
SELECT count(*) FROM pg_logical_slot_peek_changes('slot_stats', NULL, NULL, 'skip-empty-xacts', '1');

SELECT slot_name, spill_txns, spill_count, spill_bytes, total_txns, total_bytes FROM pg_stat_replication_slots; 
 slot_name  | spill_txns | spill_count | spill_bytes | total_txns | total_bytes
------------+------------+-------------+-------------+------------+-------------
 slot_stats |          1 |          12 |      763893 |          1 |      763893
(1 row) 

DROP TABLE stats_test; 

SELECT pg_drop_replication_slot('slot_stats'); 
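The view also has counters for streamed transactions, and the statistics can be reset per slot or for all slots. A short sketch, assuming a slot named my_slot exists (the name is a placeholder):

```sql
-- Streaming counters and the time of the last reset.
SELECT slot_name, stream_txns, stream_count, stream_bytes, stats_reset
FROM pg_stat_replication_slots;

-- Reset the statistics of a single slot.
SELECT pg_stat_reset_replication_slot('my_slot');

-- Passing NULL resets the statistics of all slots.
SELECT pg_stat_reset_replication_slot(NULL);
```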

Allow publications to be easily added and removed:

Before PostgreSQL-14, if the user needs to add/remove publications to/from a subscription, she needs to mention all the existing publications along with it. Consider a case where a subscription is subscribed to two publications and we want to add an additional publication to it; the user then needs to mention all three (two previous and one new) while doing Alter Subscription. The same is explained with an example below:

Initial Subscription
CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1, mypub2;

Add a new Publication mypub3.
ALTER SUBSCRIPTION mysub SET PUBLICATION mypub1, mypub2, mypub3;

This could be inconvenient for users, especially if there are many existing publications to which a subscription is subscribed. We have added a new way to make this easier by supporting ADD/DROP of individual publications. See docs for the syntax. With the new way, in the above case, to add a new publication, the user only needs to run:
ALTER SUBSCRIPTION mysub ADD PUBLICATION mypub3;

This work has been accomplished by commit 82ed7748b7.
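For completeness, removing a publication works the same way. The sketch below reuses the placeholder names from the example above. The refresh option shown in the second statement controls whether the subscription's table list is refreshed as part of the command.

```sql
-- Remove a single publication without listing the remaining ones.
ALTER SUBSCRIPTION mysub DROP PUBLICATION mypub3;

-- Add a publication but skip the implicit refresh of the subscribed tables.
ALTER SUBSCRIPTION mysub ADD PUBLICATION mypub3 WITH (refresh = false);
```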

Binary transfer mode:

This feature provides an option during Create/Alter Subscription to allow data from publishers to be sent in binary format. The default value of this option is false. Even when this option is enabled, only data types that have binary send and receive functions will be transferred in binary. When doing cross-version replication, if the subscriber lacks a binary receive function for the type, the data transfer will fail, and this option can't be used. This mode is generally faster. Example to enable binary mode:

CREATE SUBSCRIPTION mysub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION mypub1 WITH (binary = true);

This work has been accomplished by commit 9de77b5453.
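Binary mode can also be toggled on an existing subscription, and the current setting can be checked in the pg_subscription catalog. A small sketch using the placeholder subscription from above:

```sql
-- Switch an existing subscription to binary transfer.
ALTER SUBSCRIPTION mysub SET (binary = true);

-- Check which subscriptions use binary transfer.
SELECT subname, subbinary FROM pg_subscription;
```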

Allow to get messages via pgoutput:

Provide a “messages” option to the pgoutput plugin. This allows logical decoding messages (i.e. generated via pg_logical_emit_message) to be sent to the slot consumer. This is useful for pgoutput plugin users that use it for Change Data Capture. An example of the same is given below:

SELECT pg_create_logical_replication_slot('pgout_slot','pgoutput');

SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message');

SELECT get_byte(data, 1), encode(substr(data, 24, 23), 'escape')  FROM pg_logical_slot_peek_binary_changes('pgout_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'pgout_slot', 'messages', 'true') OFFSET 1 LIMIT 1;

 get_byte |         encode
----------+-------------------------
        1 | a transactional message
(1 row)

SELECT pg_drop_replication_slot('pgout_slot');

While getting changes, the publication_names option is not required for logical decoding messages but is specified just so that the function doesn't give an error. You can refer to logical replication message formats in the PostgreSQL docs. This work has been accomplished by commit ac4645c015.