Saturday, 17 July 2021

Logical Replication Of In-Progress Transactions

Logical Replication was introduced in PostgreSQL-10 and since then it is being improved with each version. Logical Replication is a method to replicate the data selectively unlike physical replication where the data of the entire cluster is copied. This can be used to build a multi-master or bi-directional replication solution. One of the main differences as compared with physical replication was that it allows replicating the transaction only at commit time. This leads to apply lag for large transactions where we need to wait to transfer the data till the transaction is finished. In the upcoming PostgreSQL-14 release, we are introducing a mechanism to stream the large in-progress transactions. We have seen the replication performance improved by 2 or more times due to this for large transactions especially due to early filtering. See the performance test results reported on hackers and in another blog on the same topic. This will reduce the apply lag to a good degree.

The first thing we need for this feature was to decide when to start streaming the WAL content. One could think if we have such a technology why not stream each change of transaction separately as and when we retrieve it from WAL but that would actually lead to sending much more data across the network because we need to send some additional transaction information with each change so that the apply-side can recognize the transaction to which the change belongs. To address this, in PostgreSQL-13, we have introduced a new GUC parameter logical_decoding_work_mem which allows users to specify the maximum amount of memory to be used by logical decoding, before which some of the decoded changes are either written to local disk or stream to the subscriber. The parameter is also used to control the memory used by logical decoding as explained in the blog.

The next thing that prevents incremental decoding was the delay in finding the association of subtransaction and top-level XID. During logical decoding, we accumulate all changes along with its (sub)transaction. Now, while sending the changes to the output plugin or stream to the other node, we need to combine all the changes that happened in the transaction which requires us to find the association of each top-level transaction with its subtransactions. Before PostgreSQL-14, we build this association at XLOG_XACT_ASSIGNMENT WAL record which we normally log after 64 subtransactions or at commit time because these are the only two times when we get such an association in the WAL. To find this association as it happened, we now also write the assignment info into WAL immediately, as part of the first WAL record for each subtransaction. This is done only when wal_level=logical to minimize the overhead.

Yet, another thing that is required for incremental decoding was to process invalidations at each command end. The basic idea of invalidations is that they make the caches (like relation cache) up-to-date to allow the next command to use up-to-date schema. This was required to correctly decode WAL incrementally as while decoding we will use the relation attributes from the caches. For this, when wal_level=logical, we write invalidations at the command end into WAL so that decoding can use this information. The invalidations are decoded and accumulated in top-transaction, and then executed during replay. This obviates the need to decode the invalidations as part of a commit record.

In previous paragraphs, the enhancements required in the server infrastructure to allow incremental decoding are explained. The next step was to provide APIs (stream methods) for out-of-core logical replication to stream large in-progress transactions. We added seven methods to the output plugin API to allow this. Those are: (stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb and stream_change_cb) and two optional callbacks (stream_message_cb and stream_truncate_cb). For details about these APIs, refer to PostgreSQL docs.

When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded changes are transmitted, the transaction can be committed using the stream_commit_cb callback (or possibly aborted using the stream_abort_cb callback). One example sequence of streaming transaction may look like the following:

/* Change logical_decoding_work_mem to 64kB in the session */
postgres=# show logical_decoding_work_mem;
 logical_decoding_work_mem
---------------------------
 64kB
(1 row)
postgres=# CREATE TABLE stream_test(data text);
CREATE TABLE
postgres=# SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 ?column?
----------
 init
(1 row)
postgres=# INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 500) g(i);
INSERT 0 500
postgres=# SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '1', 'skip-empty-xacts', '1', 'stream-changes', '1');
                       data
--------------------------------------------------
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 opening a streamed block for transaction TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
 streaming change for TXN 741
...
...
 streaming change for TXN 741
 streaming change for TXN 741
 closing a streamed block for transaction TXN 741
 committing streamed transaction TXN 741
(505 rows)

The actual sequence of callback calls may be more complicated depending on the server operations. There may be blocks for multiple streamed transactions, some of the transactions may get aborted, etc.

Note that streaming is triggered when the total amount of changes decoded from the WAL (for all in-progress transactions) exceeds the limit defined by the logical_decoding_work_mem setting. At that point, the largest top-level transaction (measured by the amount of memory currently used for decoded changes) is selected and streamed. However, in some cases we still have to spill to disk even if streaming is enabled because we exceed the memory threshold but still have not decoded the complete tuple e.g., only decoded toast table insert but not the main table insert or decoded speculative insert but not the corresponding confirm record. However, as soon as we get the complete tuple we stream the transaction including the serialized changes.

While streaming in-progress transactions, the concurrent aborts may cause failures when the output plugin (or decoding of WAL records) consults catalogs (both system and user-defined). Let me explain this with an example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, the transaction 501 updates the catalog tuple and after that we will have two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is aborted and some other transaction say 502 updates the same catalog tuple then the first tuple will be changed to (xmin: 500, xmax: 502). So, the problem is that when we try to decode the tuple inserted/updated in 501 after the catalog update, we will see the catalog tuple with (xmin: 500, xmax: 502) as visible because it will consider that the tuple is deleted by xid 502 which is not visible to our snapshot. And when we will try to decode with that catalog tuple, it can lead to a wrong result or a crash.  So, it is necessary to detect concurrent aborts to allow streaming of in-progress transactions. For detecting the concurrent abort, during catalog scan we can check the status of the xid and if it is aborted we will report a specific error so that we can stop streaming current transaction and discard the already streamed changes on such an error. We might have already streamed some of the changes for the aborted (sub)transaction, but that is fine because when we decode the abort we will stream the abort message to truncate the changes in the subscriber.

To add support for streaming of in-progress transactions into the built-in logical replication, we need to primarily do four things:

(a) Extend the logical replication protocol to identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). Refer to PostgreSQL docs for the protocol details. 

(b) Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol.

(c) Modify the replication apply worker, to properly handle streamed in-progress transaction by spilling the data to disk and then replaying them on commit.

(d) Provide a new option for streaming while creating a subscription.

The below example demonstrates how to set up the streaming via built-in logical replication:

Publisher node:

Set logical_decoding_work_mem = '64kB';
# Set up publication with some initial data

CREATE TABLE test_tab (a int primary key, b varchar);
INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar');
CREATE PUBLICATION tap_pub FOR TABLE test_tab;

Subscriber node:

CREATE TABLE test_tab (a int primary key, b varchar);
CREATE SUBSCRIPTION tap_sub CONNECTION 'host=localhost port=5432 dbname=postgres' PUBLICATION tap_pub WITH (streaming = on);

Publisher Node:

# Ensure the corresponding replication slot is created on publisher node
select slot_name, plugin, slot_type from pg_replication_slots;
 slot_name |  plugin  | slot_type
-----------+----------+-----------
 tap_sub   | pgoutput | logical
(1 row)

# Confirm there is no streamed bytes yet
postgres=# SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           0 |            0 |            0
(1 row)

# Insert, update and delete enough rows to exceed the logical_decoding_work_mem (64kB) limit.
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;

# Confirm that streaming happened
SELECT slot_name, stream_txns, stream_count, stream_bytes FROM pg_stat_replication_slots;
 slot_name | stream_txns | stream_count | stream_bytes
-----------+-------------+--------------+--------------
 tap_sub   |           1 |           22 |      1444410
(1 row)

Subscriber Node:
# The streamed data is still not visible.
select * from test_tab;
 a |  b
---+-----
 1 | foo
 2 | bar
(2 rows)

Publisher Node:
# Commit the large transactions
Commit;

Subscriber Node:
# The data must be visible on the subscriber
select count(*) from test_tab;
 count
-------
  3334
(1 row)

This feature was proposed in 2017 and committed in 2020 as part of various commits 0bead9af48c55040ccd045fdc9738b7259736a6e, and 464824323e. It took a long time to complete this feature because of the various infrastructure pieces required to achieve this. I would really like to thank all the people involved in this feature especially Tomas Vondra who has initially proposed it and then Dilip Kumar who along with me had completed various remaining parts and made it a reality. Then also to other people like Neha Sharma, Mahendra Singh Thalor, Ajin Cherian, and Kuntal Ghosh who helped throughout the project to do reviews and various tests. Also, special thanks to Andres Freund and other community members who have suggested solutions to some of the key problems of this feature. Last but not least, thanks to EDB and Fujitsu's management who encouraged me and some of the other members to work on this feature.