
Postgres WAL Implementation

Last updated by Simon Späti

For how it works, see A Guide to Logical Replication and CDC in PostgreSQL with Airbyte - Neon.

A nice implementation with a Singer tap: pipelinewise-tap-postgres/tap_postgres/sync_strategies/logical_replication.py at d3180a193e30b3813ea71d3425b1fe8b2379896c · transferwise/pipelinewise-tap-postgres · GitHub.

# General workflow of WAL

When using logical replication in PostgreSQL, if the stream is closed and then restarted after a delay (like the 60 seconds in your script), you do not lose the changes that occurred during the interval when the stream was closed. Here’s why:

  1. Persistent Replication Slots: PostgreSQL uses a concept called “replication slots” for logical replication. When you create a replication slot, PostgreSQL keeps track of the changes that need to be sent to that slot. Even if your script (or the consumer of the replication slot) disconnects or stops consuming the WAL changes, PostgreSQL retains these changes.
  2. Resuming From Last Checkpoint: When you restart the stream, it resumes from the last checkpoint – the last LSN (Log Sequence Number) that was acknowledged by your script using msg.cursor.send_feedback(flush_lsn=msg.data_start). This ensures that all changes that occurred while the stream was closed are still sent to your script once it resumes listening.
  3. Disk Space Consideration: It’s important to note that if a replication slot is not actively consumed (like during your 60-second pause), the WAL segments that contain changes for that slot will be retained on disk and not recycled. This can lead to increased disk space usage on the server if the slot remains inactive for a long time.
  4. Handling Prolonged Disconnections: In cases where your script might be disconnected for an extended period, it’s crucial to manage the replication slot and ensure it does not lead to excessive disk space usage. You might need to drop and recreate the slot if it becomes too far behind, though this would mean losing some data.
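The disk-space concern in points 3 and 4 can be watched from `pg_replication_slots`. A minimal sketch, assuming a PostgreSQL 10+ server; the helper function and its 1 GB threshold are my own illustration, not part of the original setup:

```python
# Sketch: monitor how much WAL an inactive replication slot retains,
# so a stalled consumer doesn't silently fill the disk.
# Run LAG_QUERY with any client (psql, psycopg2, ...).
LAG_QUERY = """
SELECT slot_name,
       active,
       pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots;
"""

def slot_needs_attention(lag_bytes, threshold_bytes=1024**3):
    """True when a slot's retained WAL exceeds the threshold (default 1 GB,
    an arbitrary example value)."""
    return lag_bytes is not None and lag_bytes > threshold_bytes
```

Alert (or drop and recreate the slot, accepting the data loss mentioned above) when `slot_needs_attention(lag_bytes)` returns `True` for an inactive slot.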

# Activate WAL Log

Edit the local postgresql.conf (with Homebrew on Apple Silicon it lives under /opt/homebrew/var):

```
brew services start postgresql@14
cd /opt/homebrew/var/postgresql@14

# add to postgresql.conf
wal_level = logical			# minimal, replica, or logical

brew services restart postgresql@14
```
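After the restart, you can confirm the setting took effect by running `SHOW wal_level;` against the server. A small sketch (the helper function is my own; run the statement with any client):

```python
# Verify that logical decoding is enabled after the restart.
# Execute this statement via psql or psycopg2:
CHECK_SQL = "SHOW wal_level;"

def is_logical(wal_level):
    """Logical decoding only works when wal_level is exactly 'logical';
    'minimal' and 'replica' are not enough."""
    return wal_level == "logical"
```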

# Read WAL logs with Python

A simple example of how to read the stream:

```python
from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras

# Open a logical replication connection instead of a regular one
conn = psycopg2.connect(
    "host=localhost user=sspaeti port=5432 dbname=postgres",
    connection_factory=psycopg2.extras.LogicalReplicationConnection,
)
cur = conn.cursor()
replication_options = {
    "include-xids": "1",
    "include-timestamp": "1",
    "pretty-print": "1",
}
try:
    cur.start_replication(slot_name="pytest", decode=True, options=replication_options)
except psycopg2.ProgrammingError:
    # The slot does not exist yet: create it with the wal2json output plugin
    cur.create_replication_slot("pytest", output_plugin="wal2json")
    cur.start_replication(slot_name="pytest", decode=True, options=replication_options)


class DemoConsumer(object):
    def __call__(self, msg):
        print(msg.payload)
        # Acknowledge the LSN so the server can recycle older WAL segments
        msg.cursor.send_feedback(flush_lsn=msg.data_start)


democonsumer = DemoConsumer()

print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
    cur.consume_stream(democonsumer)
except KeyboardInterrupt:
    cur.close()
    conn.close()
    print(
        "The slot 'pytest' still exists. Drop it with "
        "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
        file=sys.stderr,
    )
    print(
        "WARNING: Transaction logs will accumulate in pg_wal "
        "until the slot is dropped.",
        file=sys.stderr,
    )
```

# Stream JSON in wal2json format

Setup database:

```sql
create table books (bookid bigint primary key, bookname varchar(100));

insert into books values (1,'First Book');
insert into books values (2,'Second Book');
insert into books values (3,'Third Book');
insert into books values (5,'Fifth Book');
```

After that, start the stream by running the Python script above.

# Inserts and Updates

Given the following inserts, an update, and a delete:

```sql
insert into books values (6,'Fifth Book');
insert into books values (7,'Fifth Book');

update books set bookname='sixth Book' where bookid=6;
delete from books where bookid=7;
```

The wal2json output looks like this:

```json
{
        "xid": 14395,
        "timestamp": "2024-01-18 15:37:57.990582+01",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [6, "Fifth Book"]
                }
        ]
}
{
        "xid": 14396,
        "timestamp": "2024-01-18 15:37:57.993531+01",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [7, "Fifth Book"]
                }
        ]
}
{
        "xid": 14397,
        "timestamp": "2024-01-18 15:44:45.014086+01",
        "change": [
                {
                        "kind": "update",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [6, "sixth Book"],
                        "oldkeys": {
                                "keynames": ["bookid"],
                                "keytypes": ["bigint"],
                                "keyvalues": [6]
                        }
                }
        ]
}
{
        "xid": 14398,
        "timestamp": "2024-01-18 15:44:45.100796+01",
        "change": [
                {
                        "kind": "delete",
                        "schema": "public",
                        "table": "books",
                        "oldkeys": {
                                "keynames": ["bookid"],
                                "keytypes": ["bigint"],
                                "keyvalues": [7]
                        }
                }
        ]
}
```
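These payloads are plain JSON, so turning them into row-level events takes only a few lines. A minimal sketch, assuming the wal2json format-version-1 shape shown above (the function name is my own):

```python
import json

def parse_wal2json(payload):
    """Parse one wal2json message into a list of (kind, table, row) tuples.

    Inserts and updates carry the new column values; deletes only carry
    the old key values under "oldkeys".
    """
    msg = json.loads(payload)
    events = []
    for change in msg.get("change", []):
        kind = change["kind"]
        table = f'{change["schema"]}.{change["table"]}'
        if kind in ("insert", "update"):
            row = dict(zip(change["columnnames"], change["columnvalues"]))
        else:  # delete: only the old key values are available
            row = dict(zip(change["oldkeys"]["keynames"],
                           change["oldkeys"]["keyvalues"]))
        events.append((kind, table, row))
    return events
```

In the streaming script above, this could be called from `DemoConsumer.__call__` on `msg.payload` instead of printing it.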

Origin: Subscribing to PostgreSQL logical replication using python and psycopg2 – dbaStreet – A Database & Cloud Blog

Created 2024-01-18