Postgres CDC
GlassFlow provides a managed connector for Postgres CDC
A PostgreSQL CDC (Change Data Capture) source streams real-time database changes, such as inserts, updates, and deletes, to downstream systems for integration or analysis.
GlassFlow has a managed Postgres CDC connector that streams changes from your database into a GlassFlow pipeline, where each change is available to you as a JSON event. You can enrich the event, transform it, and connect a GlassFlow sink to send the data to a destination database. For example, with the GlassFlow Postgres CDC source connector and the Snowflake sink connector, you can build a pipeline that loads data from Postgres to Snowflake in real time without running any code.
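As a sketch of the "enrich and transform" step: a GlassFlow transform is a Python function that receives each CDC event and returns the modified event. The handler signature below follows the common GlassFlow convention, but check the SDK documentation for your version; the flattening logic is an illustrative example, not a required shape.

```python
# Hypothetical transform sketch: flatten a wal2json format-version-2 event
# into a simpler record before it reaches the sink. The handler(data, log)
# signature follows GlassFlow's usual transform convention (verify against
# the SDK docs for your version).
def handler(data, log):
    # data is one CDC event, e.g.
    # {"action": "I", "schema": "public", "table": "transactions",
    #  "columns": [{"name": "id", "type": "integer", "value": 1}, ...]}
    row = {col["name"]: col["value"] for col in data.get("columns", [])}
    return {
        "op": data["action"],  # I / U / D
        "table": f'{data["schema"]}.{data["table"]}',
        "row": row,
    }
```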
Postgres CDC - Our Approach
There are several ways to do change data capture (CDC) on Postgres. At GlassFlow, we use the logical decoding approach to extract changes from Postgres. Logical decoding is the process of extracting changes (inserts, updates, deletes) from the PostgreSQL WAL (Write-Ahead Log) in a format that can be interpreted by external consumers. This enables replication, auditing, and real-time data streaming. Unlike physical replication, which copies entire database pages, logical decoding provides a structured and readable representation of changes, allowing for selective replication and transformation. To enable logical decoding, PostgreSQL requires:
- A replication slot (pg_create_logical_replication_slot)
- A logical decoding plugin (test_decoding, wal2json, pgoutput, etc.)
GlassFlow uses the wal2json plugin. wal2json is a popular logical decoding output plugin that converts PostgreSQL WAL changes into JSON format. The plugin offers two distinct output formats, known as format version 1 and format version 2, each catering to different use cases and preferences. You can find more details on the wal2json GitHub repository.
Format Version 1:
- Structure: Generates a single JSON object per transaction, encapsulating all changes (inserts, updates, deletes) within that transaction.
- Use Case: Ideal for scenarios where a comprehensive view of each transaction is required, maintaining the context of grouped operations.
Format Version 2:
- Structure: Produces individual JSON objects for each tuple (row) change, accompanied by optional JSON objects indicating the beginning and end of transactions.
- Use Case: Suitable for applications that prefer or require processing changes on a per-row basis, allowing for more granular handling of data modifications.
By default, GlassFlow streams changes in format version 2. If you want to use format version 1, you can set it via the GlassFlow Python SDK using the replication_output_plugin_args parameter.
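For illustration, the wal2json option that selects the output format is named format-version. The sketch below builds a plugin-argument list in the style that replication slot options are usually written; how replication_output_plugin_args expects these to be packed may differ by SDK version, so treat this as an assumption to verify.

```python
# Hypothetical helper: build wal2json plugin arguments. The option name
# "format-version" comes from wal2json itself; the exact packing expected
# by replication_output_plugin_args is an assumption here.
def wal2json_args(format_version=2, include_timestamp=False):
    args = [f"format-version '{format_version}'"]
    if include_timestamp:
        args.append("include-timestamp 'true'")
    return args
```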
Important Info:
GlassFlow supports CDC on Postgres only via logical replication with the help of replication slots. Starting logical replication on hot standbys (non-primary nodes) may result in replication slots getting invalidated. Please check the documentation here and be aware of this behavior if a replication slot is required on a hot standby.
GlassFlow Whitelist IPs
Many hosted Postgres providers allow access only from specific whitelisted IP addresses. In that case, please add GlassFlow's IP addresses to the allowed-IP list of your Postgres provider.
Step 1: Setting Up Postgres
Azure Postgres
If you are using a managed Postgres instance from Azure, the following configuration is needed on the database for the GlassFlow Postgres connector to work.
- Set up IP whitelisting and allow requests from these two IP addresses:
- Set the following parameters on the database:
  - wal_level to logical
  - max_worker_processes to a number larger than 16
- The Postgres user that you use for the connection should have the replication role and be the owner of the tables you want to sync.
- Create replication slot:
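The slot can be created with PostgreSQL's pg_create_logical_replication_slot function, naming wal2json as the output plugin. A minimal sketch using any DB-API cursor (for example from psycopg2), with test_slot as an example slot name:

```python
# Sketch: create a logical replication slot for wal2json.
# `cur` is any DB-API cursor opened by a user with the REPLICATION role.
def create_wal2json_slot(cur, slot_name="test_slot"):
    cur.execute(
        "SELECT pg_create_logical_replication_slot(%s, %s)",
        (slot_name, "wal2json"),
    )
```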
Gcloud (wal2json)
If you are using a managed Postgres instance from Google Cloud, the following configuration is needed on the database for the GlassFlow Postgres connector to work.
- Add the replica's IP addresses to the primary instance's authorized networks:
- Set the cloudsql.logical_decoding flag to on for the instance
- Create a user in the cloudsqlsuperuser role with replication permission:
- Set up a database, e.g. test_db, or use an existing DB
- Create a replication slot test_slot after logging in with the replication_user. The slot must be owned by the user responsible for replication.
- Note the public IP of the instance; it will be needed when setting up the GlassFlow connector
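The user-creation step above can be sketched as follows, again against any DB-API cursor connected as a superuser. The statement text follows Cloud SQL's documented pattern for replication users, but verify it against your instance; the user name and password are placeholders.

```python
# Sketch of the Cloud SQL user step: create a replication user in the
# cloudsqlsuperuser role. The user name is an example; identifiers cannot
# be bound as query parameters, so the name is interpolated directly and
# must come from a trusted source.
def create_replication_user(cur, name, password):
    cur.execute(
        f"CREATE USER {name} WITH REPLICATION IN ROLE cloudsqlsuperuser "
        "LOGIN PASSWORD %s",
        (password,),
    )
```

The replication slot itself is then created with the same pg_create_logical_replication_slot call shown earlier, while logged in as this user.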
Step 2: Setting Up the Connector in GlassFlow
You can integrate Postgres CDC with GlassFlow either through the WebApp or by using the Python SDK. Below are the instructions for both methods.
Using WebApp
- Log in to the GlassFlow WebApp and navigate to the "Pipelines" section.
- Create a new pipeline.
- Go to the Source section in the pipeline setup page and select Postgres.
- Enter the following details:
- Host: Postgres hostname or IP address.
- Port: Postgres port (default 5432).
- User Name: User on your Postgres database (example: replication_user created above)
- Password: Password for the user on Postgres
- Database Name: Database on Postgres for which you want to capture CDC
- Replication Slot: Name of the replication slot (example: test_slot created above)
- Click Continue to save your source settings and configure other parts of your pipeline.
Once created, this source will be available for use in your GlassFlow pipelines. CDC data will automatically be captured by GlassFlow and be available in the pipeline.
Using Python SDK
If you are using the GlassFlow Python SDK to deploy and manage the pipeline, the following code shows how to configure the Postgres CDC connector via the SDK.
A fully functional example that creates a pipeline with Postgres CDC as a managed source connector is available in our examples repo as a Jupyter Notebook.
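As a rough sketch of the shape of that configuration: the field names below mirror the WebApp form from Step 2 and the replication_output_plugin_args parameter mentioned earlier, but the actual SDK class and parameter names may differ, so consult the SDK reference and the linked notebook.

```python
# Hypothetical helper assembling the Postgres CDC source settings from
# Step 2. Field names mirror the WebApp form; the real SDK call that
# consumes them may differ by version.
def postgres_cdc_source_config(host, user, password, database,
                               replication_slot, port=5432,
                               replication_output_plugin_args=None):
    config = {
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "database": database,
        "replication_slot": replication_slot,
    }
    if replication_output_plugin_args is not None:
        config["replication_output_plugin_args"] = replication_output_plugin_args
    return config
```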
Using GitHub Actions
If you are using GitHub Actions to deploy and manage the pipeline, the following snippet shows the YAML configuration of the Postgres CDC connector component.
Data Format
As described above, GlassFlow provides CDC data in wal2json JSON format. The default format (wal2json format version 2) streams the following data:
Transaction Type | JSON Data Format |
---|---|
Insert (I) | { "action": "I", "schema": "public", "table": "transactions", "columns": [ { "name": "id", "type": "integer", "value": 1 }, { "name": "type", "type": "text", "value": "deposit" }, { "name": "amount", "type": "numeric", "value": 100 }, { "name": "currency", "type": "text", "value": "USD" } ] } |
Update (U) | { "action": "U", "schema": "public", "table": "transactions", "columns": [ { "name": "id", "type": "integer", "value": 1 }, { "name": "type", "type": "text", "value": "deposit" }, { "name": "amount", "type": "numeric", "value": 150 }, { "name": "currency", "type": "text", "value": "USD" } ] } |
Delete (D) | { "action": "D", "schema": "public", "table": "transactions", "columns": [ { "name": "id", "type": "integer", "value": 1 }, { "name": "type", "type": "text", "value": "deposit" }, { "name": "amount", "type": "numeric", "value": 100 }, { "name": "currency", "type": "text", "value": "USD" } ] } |
Start Transaction (B) | { "action": "B" } |
End Transaction (C) | { "action": "C"} |
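To illustrate consuming this stream, here is a small sketch that folds the per-row version-2 events back into whole transactions using the B (begin) and C (commit) markers from the table above:

```python
# Sketch: group a wal2json format-version-2 event stream into transactions.
# Row events arriving between an "action": "B" (begin) marker and an
# "action": "C" (commit) marker belong to the same transaction.
def group_transactions(events):
    transactions, current = [], None
    for ev in events:
        action = ev["action"]
        if action == "B":
            current = []
        elif action == "C":
            if current is not None:
                transactions.append(current)
            current = None
        elif current is not None:
            current.append(ev)
    return transactions
```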
Cleaning Up
Drop replication slot
Replication slots prevent the removal of required resources even when there is no connection (subscriber) using them. Dangling inactive replication slots can cause high disk usage on the primary node, since Postgres cannot remove WAL files that an inactive slot still retains. Always remove test or inactive replication slots:
Check inactive replication slots:
Drop inactive replication slot:
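Both cleanup steps can be sketched against any DB-API cursor. The queries use the standard pg_replication_slots catalog view and the pg_drop_replication_slot function; run them with sufficient privileges.

```python
# Sketch: list inactive replication slots, then drop one by name.
# `cur` is any DB-API cursor connected with sufficient privileges.
def inactive_slots(cur):
    cur.execute(
        "SELECT slot_name FROM pg_replication_slots WHERE NOT active"
    )
    return [row[0] for row in cur.fetchall()]

def drop_slot(cur, slot_name):
    cur.execute("SELECT pg_drop_replication_slot(%s)", (slot_name,))
```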