From f6a7cadf0cdf4a11e5760211466dcc45b87738dd Mon Sep 17 00:00:00 2001 From: happygame Date: Fri, 23 Jan 2026 11:21:29 +0800 Subject: [PATCH 1/3] feat: add logical replication demo examples --- .../async-await/logical-replication-demo.js | 175 ++++++++++++++++++ examples/promises/logical-replication-demo.js | 155 ++++++++++++++++ 2 files changed, 330 insertions(+) create mode 100644 examples/async-await/logical-replication-demo.js create mode 100644 examples/promises/logical-replication-demo.js diff --git a/examples/async-await/logical-replication-demo.js b/examples/async-await/logical-replication-demo.js new file mode 100644 index 000000000..39531b07b --- /dev/null +++ b/examples/async-await/logical-replication-demo.js @@ -0,0 +1,175 @@ +import { Client, LogicalReplicationService, MppdbDecodingPlugin } from 'gaussdb-node' + +const slotName = 'demo_replication_slot' +const tableName = 'demo_replication_table' + +// Connection config from environment variables or defaults +const config = { + host: process.env.GAUSSHOST, + port: parseInt(process.env.GAUSSPORT), + database: process.env.GAUSSDATABASE, + user: process.env.GAUSSUSER, + password: process.env.GAUSSPASSWORD, +} + +async function checkSlotExists(client) { + const res = await client.query( + `SELECT 1 FROM pg_replication_slots WHERE slot_name = $1`, + [slotName] + ) + return res.rowCount > 0 +} + +async function createSlot(client) { + await client.query( + `SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')` + ) + console.log(`slot "${slotName}" created`) +} + +async function dropSlot(client) { + await client.query(`SELECT pg_drop_replication_slot('${slotName}')`) + console.log(`slot "${slotName}" dropped`) +} + +async function createTable(client) { + await client.query(` + CREATE TABLE IF NOT EXISTS ${tableName} ( + id SERIAL PRIMARY KEY, + name TEXT, + value INT + ) + `) + console.log(`table "${tableName}" created`) +} + +async function dropTable(client) { + await 
client.query(`DROP TABLE IF EXISTS ${tableName}`) + console.log(`table "${tableName}" dropped`) +} + +async function performDML(client) { + console.log('performing DML operations...') + + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100]) + console.log(' INSERT alice') + + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200]) + console.log(' INSERT bob') + + await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice']) + console.log(' UPDATE alice') + + await client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob']) + console.log(' DELETE bob') + + console.log('DML operations completed') +} + +async function main() { + let clientB = null + let service = null + + try { + clientB = new Client(config) + await clientB.connect() + console.log('client B connected') + + // setup: create table and slot if needed + await createTable(clientB) + + const slotExists = await checkSlotExists(clientB) + if (!slotExists) { + await createSlot(clientB) + } else { + console.log(`slot "${slotName}" already exists`) + } + + // start replication service (client A uses config, not Client instance) + service = new LogicalReplicationService(config, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + }) + + const plugin = new MppdbDecodingPlugin({ + includeXids: false, + skipEmptyXacts: true, + }) + + const receivedMessages = [] + + service.on('start', () => { + console.log('replication started') + }) + + service.on('data', (lsn, msg) => { + console.log('[data]', lsn, msg) + receivedMessages.push(msg) + }) + + service.on('error', (err) => { + console.error('[error]', err) + }) + + // start replication in background + service.subscribe(plugin, slotName) + + // wait for replication to start + await new Promise((resolve) => service.once('start', resolve)) + + // perform DML operations (client B) + await performDML(clientB) + + // wait a bit for replication to catch up + await new 
Promise((resolve) => setTimeout(resolve, 2000)) + + // stop replication first + await service.stop() + service = null + console.log('replication stopped') + + console.log(`received ${receivedMessages.length} messages`) + + // cleanup while clientB is still connected + await dropTable(clientB) + await dropSlot(clientB) + } catch (err) { + console.error('error:', err) + } finally { + // stop service if still running + if (service) { + try { + await service.stop() + } catch (err) { + // ignore + } + } + + // cleanup with a new connection if needed + if (clientB) { + try { + await clientB.end() + } catch (err) { + // ignore + } + } + + // ensure cleanup with fresh connection + const cleanupClient = new Client(config) + try { + await cleanupClient.connect() + await cleanupClient.query(`DROP TABLE IF EXISTS ${tableName}`) + await cleanupClient.query(`SELECT pg_drop_replication_slot('${slotName}')`).catch(() => {}) + } catch (err) { + // ignore cleanup errors + } finally { + await cleanupClient.end().catch(() => {}) + } + + console.log('cleanup done') + } +} + +main().catch((err) => { + console.error(err) + process.exit(1) +}) diff --git a/examples/promises/logical-replication-demo.js b/examples/promises/logical-replication-demo.js new file mode 100644 index 000000000..6e843c770 --- /dev/null +++ b/examples/promises/logical-replication-demo.js @@ -0,0 +1,155 @@ +import { Client, LogicalReplicationService, MppdbDecodingPlugin } from 'gaussdb-node' + +const slotName = 'demo_replication_slot' +const tableName = 'demo_replication_table' + +const config = { + host: process.env.GAUSSHOST, + port: parseInt(process.env.GAUSSPORT), + database: process.env.GAUSSDATABASE, + user: process.env.GAUSSUSER, + password: process.env.GAUSSPASSWORD, +} + +function checkSlotExists(client) { + return client + .query('SELECT 1 FROM pg_replication_slots WHERE slot_name = $1', [slotName]) + .then((res) => res.rowCount > 0) +} + +function createSlot(client) { + return client + .query(`SELECT * 
FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`) + .then(() => console.log(`slot "${slotName}" created`)) +} + +function dropSlot(client) { + return client + .query(`SELECT pg_drop_replication_slot('${slotName}')`) + .then(() => console.log(`slot "${slotName}" dropped`)) +} + +function createTable(client) { + return client + .query(` + CREATE TABLE IF NOT EXISTS ${tableName} ( + id SERIAL PRIMARY KEY, + name TEXT, + value INT + ) + `) + .then(() => console.log(`table "${tableName}" created`)) +} + +function dropTable(client) { + return client + .query(`DROP TABLE IF EXISTS ${tableName}`) + .then(() => console.log(`table "${tableName}" dropped`)) +} + +function performDML(client) { + console.log('performing DML operations...') + return client + .query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100]) + .then(() => console.log(' INSERT alice')) + .then(() => client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200])) + .then(() => console.log(' INSERT bob')) + .then(() => client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice'])) + .then(() => console.log(' UPDATE alice')) + .then(() => client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob'])) + .then(() => console.log(' DELETE bob')) + .then(() => console.log('DML operations completed')) +} + +function delay(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +function main() { + let clientB = null + let service = null + const receivedMessages = [] + + clientB = new Client(config) + + clientB + .connect() + .then(() => { + console.log('client B connected') + return createTable(clientB) + }) + .then(() => checkSlotExists(clientB)) + .then((exists) => { + if (!exists) { + return createSlot(clientB) + } else { + console.log(`slot "${slotName}" already exists`) + } + }) + .then(() => { + service = new LogicalReplicationService(config, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + }) + 
+ const plugin = new MppdbDecodingPlugin({ + includeXids: false, + skipEmptyXacts: true, + }) + + service.on('start', () => { + console.log('replication started') + }) + + service.on('data', (lsn, msg) => { + console.log('[data]', lsn, msg) + receivedMessages.push(msg) + }) + + service.on('error', (err) => { + console.error('[error]', err) + }) + + service.subscribe(plugin, slotName) + + return new Promise((resolve) => service.once('start', resolve)) + }) + .then(() => performDML(clientB)) + .then(() => delay(2000)) + .then(() => { + return service.stop().then(() => { + service = null + console.log('replication stopped') + console.log(`received ${receivedMessages.length} messages`) + }) + }) + .then(() => dropTable(clientB)) + .then(() => dropSlot(clientB)) + .catch((err) => { + console.error('error:', err) + }) + .finally(() => { + const cleanup = [] + + if (service) { + cleanup.push(service.stop().catch(() => {})) + } + + if (clientB) { + cleanup.push(clientB.end().catch(() => {})) + } + + return Promise.all(cleanup) + .then(() => { + const cleanupClient = new Client(config) + return cleanupClient + .connect() + .then(() => cleanupClient.query(`DROP TABLE IF EXISTS ${tableName}`)) + .then(() => cleanupClient.query(`SELECT pg_drop_replication_slot('${slotName}')`).catch(() => {})) + .finally(() => cleanupClient.end().catch(() => {})) + }) + .catch(() => {}) + .then(() => console.log('cleanup done')) + }) +} + +main() From 1625cc586c5df0a8fa2d6d2f118b73980643d821 Mon Sep 17 00:00:00 2001 From: happygame Date: Fri, 23 Jan 2026 23:26:55 +0800 Subject: [PATCH 2/3] test: add logical replication integration and unit tests --- .../logical-replication-tests.js | 404 ++++++++++++++++++ .../logical-replication-service-tests.js | 80 ++++ .../mppdb-decoding-plugin-tests.js | 153 +++++++ 3 files changed, 637 insertions(+) create mode 100644 packages/gaussdb-node/test/integration/logical-replication/logical-replication-tests.js create mode 100644 
packages/gaussdb-node/test/unit/logical-replication/logical-replication-service-tests.js create mode 100644 packages/gaussdb-node/test/unit/logical-replication/mppdb-decoding-plugin-tests.js diff --git a/packages/gaussdb-node/test/integration/logical-replication/logical-replication-tests.js b/packages/gaussdb-node/test/integration/logical-replication/logical-replication-tests.js new file mode 100644 index 000000000..02e11c005 --- /dev/null +++ b/packages/gaussdb-node/test/integration/logical-replication/logical-replication-tests.js @@ -0,0 +1,404 @@ +'use strict' +const helper = require('../test-helper') +const gaussdb = helper.gaussdb +const assert = require('assert') + +const suite = new helper.Suite() + +const report = (label, ok) => { + const mark = ok ? '\u2714' : '\u2716' + console.log(` - ${label} ${mark}`) +} + +const check = (label, predicate, message) => { + const ok = Boolean(predicate) + report(label, ok) + assert.ok(ok, message || label) +} + +const dropReplicationSlot = async (slotName) => { + const cleanupClient = new gaussdb.Client(helper.config) + try { + await cleanupClient.connect() + await cleanupClient.query(`SELECT pg_drop_replication_slot('${slotName}')`) + } catch (err) { + console.warn('Failed to drop replication slot', err) + } finally { + await cleanupClient.end().catch(() => {}) + } +} + +if (process.env.LOGICAL_REPLICATION_TEST !== '1') { + suite.testAsync('skipping logical replication tests (missing env)', () => {}) + return +} + +suite.test('logical replication - standard JSON format (no parallel)', function (done) { + this.timeout = 20000 + const uniqueId = Date.now().toString(36) + const tableName = `logical_replication_${uniqueId}` + const slotName = `logical_replication_slot_${uniqueId}` + const client = new gaussdb.Client(helper.config) + let service = null + + const run = async () => { + try { + await client.connect() + await client.query(`DROP TABLE IF EXISTS ${tableName}`) + await client.query(`CREATE TABLE ${tableName} (id 
SERIAL PRIMARY KEY, name TEXT, value INT)`) + + await client.query(`SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`) + + const res = await client.query('SELECT pg_current_xlog_location() AS lsn') + const currentLsn = res.rows[0].lsn + service = new gaussdb.LogicalReplicationService(helper.config, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + }) + const plugin = new gaussdb.MppdbDecodingPlugin({ + includeXids: false, + skipEmptyXacts: true, + }) + + const messages = [] + const errors = [] + service.on('start', () => {}) + service.on('error', (err) => { + errors.push(err) + }) + service.on('data', (lsn, msg) => { + messages.push({ lsn, msg }) + }) + + service.subscribe(plugin, slotName, currentLsn).catch((err) => errors.push(err)) + await new Promise((resolve) => service.once('start', resolve)) + + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100]) + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200]) + await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice']) + await client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob']) + + await new Promise((resolve) => setTimeout(resolve, 3000)) + + if (errors.length > 0) { + throw new Error(`Replication errors: ${errors.map((e) => e.message).join(', ')}`) + } + + check('received replication messages', messages.length > 0, `expected messages, got ${messages.length}`) + + const msgContents = messages.map((m) => m.msg).filter((m) => typeof m === 'string') + check( + 'BEGIN marker', + msgContents.some((m) => m.includes('BEGIN')), + 'expected BEGIN marker' + ) + check( + 'COMMIT marker', + msgContents.some((m) => m.includes('COMMIT')), + 'expected COMMIT marker' + ) + + const jsonMessages = msgContents + .filter((m) => m.startsWith('{')) + .map((m) => { + try { + return JSON.parse(m) + } catch { + return null + } + }) + .filter(Boolean) + + const opTypes = new 
Set(jsonMessages.map((m) => m.op_type).filter(Boolean)) + check('INSERT payload', opTypes.has('INSERT'), 'expected INSERT payload') + check('UPDATE payload', opTypes.has('UPDATE'), 'expected UPDATE payload') + check('DELETE payload', opTypes.has('DELETE'), 'expected DELETE payload') + + const insertMsg = jsonMessages.find((m) => m.op_type === 'INSERT') + check( + 'INSERT references table', + insertMsg && insertMsg.table_name.includes(tableName), + 'INSERT should reference correct table' + ) + check( + 'INSERT has columns_name', + insertMsg && Array.isArray(insertMsg.columns_name), + 'INSERT should have columns_name array' + ) + check( + 'INSERT has columns_val', + insertMsg && Array.isArray(insertMsg.columns_val), + 'INSERT should have columns_val array' + ) + } finally { + if (service) { + await service.stop().catch(() => {}) + } + await client.query(`DROP TABLE IF EXISTS ${tableName}`).catch(() => {}) + await dropReplicationSlot(slotName) + await client.end().catch(() => {}) + } + } + + run().then(() => done(), done) +}) + +suite.test('logical replication - parallel JSON format (decodeStyle: j)', function (done) { + this.timeout = 20000 + const uniqueId = Date.now().toString(36) + const tableName = `logical_replication_${uniqueId}` + const slotName = `logical_replication_slot_${uniqueId}` + const client = new gaussdb.Client(helper.config) + let service = null + + const run = async () => { + try { + await client.connect() + await client.query(`DROP TABLE IF EXISTS ${tableName}`) + await client.query(`CREATE TABLE ${tableName} (id SERIAL PRIMARY KEY, name TEXT, value INT)`) + + await client.query(`SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`) + + const res = await client.query('SELECT pg_current_xlog_location() AS lsn') + const currentLsn = res.rows[0].lsn + service = new gaussdb.LogicalReplicationService(helper.config, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + }) + const plugin = new gaussdb.MppdbDecodingPlugin({ 
+ includeXids: false, + skipEmptyXacts: true, + parallelDecodeNum: 2, + decodeStyle: 'j', + }) + + const messages = [] + const errors = [] + service.on('start', () => {}) + service.on('error', (err) => { + errors.push(err) + }) + service.on('data', (lsn, msg) => { + messages.push({ lsn, msg }) + }) + + service.subscribe(plugin, slotName, currentLsn).catch((err) => errors.push(err)) + await new Promise((resolve) => service.once('start', resolve)) + + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100]) + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200]) + await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice']) + await client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob']) + + await new Promise((resolve) => setTimeout(resolve, 3000)) + + if (errors.length > 0) { + throw new Error(`Replication errors: ${errors.map((e) => e.message).join(', ')}`) + } + + check('received replication messages', messages.length > 0, `expected messages, got ${messages.length}`) + + const msgContents = messages.map((m) => m.msg).filter((m) => typeof m === 'string' || typeof m === 'object') + + const beginMessages = msgContents.filter((m) => { + if (typeof m === 'string') return m.includes('BEGIN CSN') + return false + }) + check('BEGIN CSN markers', beginMessages.length > 0, 'expected BEGIN CSN markers') + + const commitMessages = msgContents.filter((m) => { + if (typeof m === 'string') return m === 'commit' + return false + }) + check('commit markers', commitMessages.length > 0, 'expected commit markers') + + const dmlMessages = msgContents.filter((m) => typeof m === 'object' && m.table_name && m.op_type) + const opTypes = new Set(dmlMessages.map((m) => m.op_type)) + check('INSERT payload', opTypes.has('INSERT'), 'expected INSERT payload') + check('UPDATE payload', opTypes.has('UPDATE'), 'expected UPDATE payload') + check('DELETE payload', opTypes.has('DELETE'), 
'expected DELETE payload') + } finally { + if (service) { + await service.stop().catch(() => {}) + } + await client.query(`DROP TABLE IF EXISTS ${tableName}`).catch(() => {}) + await dropReplicationSlot(slotName) + await client.end().catch(() => {}) + } + } + + run().then(() => done(), done) +}) + +suite.test('logical replication - parallel text format (decodeStyle: t)', function (done) { + this.timeout = 20000 + const uniqueId = Date.now().toString(36) + const tableName = `logical_replication_${uniqueId}` + const slotName = `logical_replication_slot_${uniqueId}` + const client = new gaussdb.Client(helper.config) + let service = null + + const run = async () => { + try { + await client.connect() + await client.query(`DROP TABLE IF EXISTS ${tableName}`) + await client.query(`CREATE TABLE ${tableName} (id SERIAL PRIMARY KEY, name TEXT, value INT)`) + + await client.query(`SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`) + + const res = await client.query('SELECT pg_current_xlog_location() AS lsn') + const currentLsn = res.rows[0].lsn + service = new gaussdb.LogicalReplicationService(helper.config, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + }) + const plugin = new gaussdb.MppdbDecodingPlugin({ + includeXids: false, + skipEmptyXacts: true, + parallelDecodeNum: 2, + decodeStyle: 't', + }) + + const messages = [] + const errors = [] + service.on('start', () => {}) + service.on('error', (err) => { + errors.push(err) + }) + service.on('data', (lsn, msg) => { + messages.push({ lsn, msg }) + }) + + service.subscribe(plugin, slotName, currentLsn).catch((err) => errors.push(err)) + await new Promise((resolve) => service.once('start', resolve)) + + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100]) + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200]) + await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice']) + await 
client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob']) + + await new Promise((resolve) => setTimeout(resolve, 3000)) + + if (errors.length > 0) { + throw new Error(`Replication errors: ${errors.map((e) => e.message).join(', ')}`) + } + + check('received replication messages', messages.length > 0, `expected messages, got ${messages.length}`) + + const msgContents = messages.map((m) => m.msg).filter((m) => typeof m === 'string') + + const beginMessages = msgContents.filter((m) => m.includes('BEGIN CSN')) + check('BEGIN CSN markers', beginMessages.length > 0, 'expected BEGIN CSN markers') + + const commitMessages = msgContents.filter((m) => m === 'commit') + check('commit markers', commitMessages.length > 0, 'expected commit markers') + + const insertMessages = msgContents.filter((m) => m.includes('INSERT:')) + check('INSERT messages', insertMessages.length >= 2, 'expected INSERT messages') + + const updateMessages = msgContents.filter((m) => m.includes('UPDATE:')) + check('UPDATE messages', updateMessages.length >= 1, 'expected UPDATE messages') + + const deleteMessages = msgContents.filter((m) => m.includes('DELETE:')) + check('DELETE messages', deleteMessages.length >= 1, 'expected DELETE messages') + + const sampleInsert = insertMessages[0] + check( + 'INSERT shows id column type', + sampleInsert && sampleInsert.includes('id[integer]'), + 'INSERT should show column types' + ) + check( + 'INSERT shows name column type', + sampleInsert && sampleInsert.includes('name[text]'), + 'INSERT should show column types' + ) + } finally { + if (service) { + await service.stop().catch(() => {}) + } + await client.query(`DROP TABLE IF EXISTS ${tableName}`).catch(() => {}) + await dropReplicationSlot(slotName) + await client.end().catch(() => {}) + } + } + + run().then(() => done(), done) +}) + +suite.test('logical replication - binary format (parallel, no decodeStyle)', function (done) { + this.timeout = 20000 + const uniqueId = Date.now().toString(36) + const 
tableName = `logical_replication_${uniqueId}` + const slotName = `logical_replication_slot_${uniqueId}` + const client = new gaussdb.Client(helper.config) + let service = null + + const run = async () => { + try { + await client.connect() + await client.query(`DROP TABLE IF EXISTS ${tableName}`) + await client.query(`CREATE TABLE ${tableName} (id SERIAL PRIMARY KEY, name TEXT, value INT)`) + + await client.query(`SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`) + + const res = await client.query('SELECT pg_current_xlog_location() AS lsn') + const currentLsn = res.rows[0].lsn + service = new gaussdb.LogicalReplicationService(helper.config, { + acknowledge: { auto: true, timeoutSeconds: 10 }, + }) + const plugin = new gaussdb.MppdbDecodingPlugin({ + includeXids: false, + skipEmptyXacts: true, + parallelDecodeNum: 2, + }) + + const messages = [] + const errors = [] + service.on('start', () => {}) + service.on('error', (err) => { + errors.push(err) + }) + service.on('data', (lsn, msg) => { + messages.push({ lsn, msg }) + }) + + service.subscribe(plugin, slotName, currentLsn).catch((err) => errors.push(err)) + await new Promise((resolve) => service.once('start', resolve)) + + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100]) + await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200]) + await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice']) + await client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob']) + + await new Promise((resolve) => setTimeout(resolve, 3000)) + + if (errors.length > 0) { + throw new Error(`Replication errors: ${errors.map((e) => e.message).join(', ')}`) + } + + check('received replication messages', messages.length > 0, `expected messages, got ${messages.length}`) + + const msgContents = messages.map((m) => m.msg).filter((m) => typeof m === 'string') + const allMessages = msgContents.join('') + + 
check('BEGIN marker (B)', allMessages.includes('B'), 'expected BEGIN marker (B)') + check('COMMIT marker (C)', allMessages.includes('C'), 'expected COMMIT marker (C)') + check('INSERT marker (I)', allMessages.includes('I'), 'expected INSERT marker (I)') + check('UPDATE marker (U)', allMessages.includes('U'), 'expected UPDATE marker (U)') + check('DELETE marker (D)', allMessages.includes('D'), 'expected DELETE marker (D)') + + check( + 'table name in binary messages', + msgContents.some((m) => m.includes(tableName)), + 'expected table name in binary messages' + ) + } finally { + if (service) { + await service.stop().catch(() => {}) + } + await client.query(`DROP TABLE IF EXISTS ${tableName}`).catch(() => {}) + await dropReplicationSlot(slotName) + await client.end().catch(() => {}) + } + } + + run().then(() => done(), done) +}) diff --git a/packages/gaussdb-node/test/unit/logical-replication/logical-replication-service-tests.js b/packages/gaussdb-node/test/unit/logical-replication/logical-replication-service-tests.js new file mode 100644 index 000000000..c17a43328 --- /dev/null +++ b/packages/gaussdb-node/test/unit/logical-replication/logical-replication-service-tests.js @@ -0,0 +1,80 @@ +'use strict' +const helper = require('../test-helper') +const assert = require('assert') +const LogicalReplicationService = require('../../../lib/logical-replication/logical-replication-service') + +const suite = new helper.Suite() + +const u64be = (hi, lo) => { + const buf = Buffer.alloc(8) + buf.writeUInt32BE(hi >>> 0, 0) + buf.writeUInt32BE(lo >>> 0, 4) + return buf +} + +const u64le = (hi, lo) => { + const buf = Buffer.alloc(8) + buf.writeUInt32LE(lo >>> 0, 0) + buf.writeUInt32LE(hi >>> 0, 4) + return buf +} + +const buildXLogData = (start, payload) => { + const header = Buffer.concat([Buffer.from([0x77]), u64be(start.hi, start.lo), u64be(0, 0), u64be(0, 0)]) + return Buffer.concat([header, payload]) +} + +const buildKeepalive = (server, serverClockMicros, shouldRespond) => { + 
const header = Buffer.concat([Buffer.from([0x6b]), u64le(server.hi, server.lo), u64le(0, 0)]) + const clock = u64le(serverClockMicros.hi, serverClockMicros.lo) + return Buffer.concat([header, clock, Buffer.from([shouldRespond ? 1 : 0])]) +} + +suite.test('xlog data emits parsed message with lsn', () => { + const service = new LogicalReplicationService({}, { acknowledge: { auto: false } }) + const plugin = { + parse: () => 'BEGIN', + } + let received = null + service.on('data', (lsn, data) => { + received = { lsn, data } + }) + const buffer = buildXLogData({ hi: 0, lo: 0x080bc370 }, Buffer.from('BEGIN', 'utf8')) + service._handleCopyData(plugin, buffer) + assert.deepStrictEqual(received, { lsn: '00000000/080BC370', data: 'BEGIN' }) +}) + +suite.test('xlog data uses item lsn when parse returns batch', () => { + const service = new LogicalReplicationService({}, { acknowledge: { auto: false } }) + const plugin = { + parse: () => [{ lsn: '00000000/080BC4D0', payload: 'COMMIT' }], + } + let received = null + service.on('data', (lsn, data) => { + received = { lsn, data } + }) + const buffer = buildXLogData({ hi: 0, lo: 0x080bc370 }, Buffer.from('batch', 'utf8')) + service._handleCopyData(plugin, buffer) + assert.deepStrictEqual(received, { lsn: '00000000/080BC4D0', data: { lsn: '00000000/080BC4D0', payload: 'COMMIT' } }) +}) + +suite.test('keepalive emits heartbeat and requests acknowledge', () => { + const service = new LogicalReplicationService({}, { acknowledge: { auto: false } }) + let heartbeat = null + let ack = null + service.acknowledge = (lsn, ping) => { + ack = { lsn, ping } + return true + } + service.on('heartbeat', (lsn, timestamp, shouldRespond) => { + heartbeat = { lsn, timestamp, shouldRespond } + }) + const buffer = buildKeepalive({ hi: 0, lo: 0x080bc370 }, { hi: 0, lo: 1000000 }, true) + service._handleCopyData({}, buffer) + assert.deepStrictEqual(heartbeat, { + lsn: '00000000/080BC370', + timestamp: 946684801000, + shouldRespond: true, + }) + 
assert.deepStrictEqual(ack, { lsn: service._lastLsn, ping: true }) +}) diff --git a/packages/gaussdb-node/test/unit/logical-replication/mppdb-decoding-plugin-tests.js b/packages/gaussdb-node/test/unit/logical-replication/mppdb-decoding-plugin-tests.js new file mode 100644 index 000000000..b34fb01d5 --- /dev/null +++ b/packages/gaussdb-node/test/unit/logical-replication/mppdb-decoding-plugin-tests.js @@ -0,0 +1,153 @@ +'use strict' +const helper = require('../test-helper') +const assert = require('assert') +const MppdbDecodingPlugin = require('../../../lib/logical-replication/mppdb-decoding-plugin') + +const suite = new helper.Suite() + +const u16be = (value) => { + const buf = Buffer.alloc(2) + buf.writeUInt16BE(value, 0) + return buf +} + +const u32be = (value) => { + const buf = Buffer.alloc(4) + buf.writeUInt32BE(value >>> 0, 0) + return buf +} + +const u64be = (hi, lo) => { + const buf = Buffer.alloc(8) + buf.writeUInt32BE(hi >>> 0, 0) + buf.writeUInt32BE(lo >>> 0, 4) + return buf +} + +const buildTextBatch = (items) => { + const chunks = [] + for (const item of items) { + const payload = Buffer.from(item.payload, 'utf8') + chunks.push(u32be(payload.length)) + chunks.push(u64be(item.lsn.hi, item.lsn.lo)) + chunks.push(payload) + } + chunks.push(u32be(0)) + return Buffer.concat(chunks) +} + +const buildDmlRecord = (tag, schema, table, tuples) => { + const chunks = [Buffer.from(tag)] + chunks.push(u16be(schema.length), Buffer.from(schema, 'utf8')) + chunks.push(u16be(table.length), Buffer.from(table, 'utf8')) + for (const tuple of tuples) { + const kind = tuple.kind === 'old' ? 
0x4f : 0x4e + chunks.push(Buffer.from([kind])) + chunks.push(u16be(tuple.columns.length)) + for (const column of tuple.columns) { + const name = Buffer.from(column.name, 'utf8') + chunks.push(u16be(name.length), name) + chunks.push(u32be(column.typeOid)) + if (column.value === null) { + chunks.push(u32be(0xffffffff)) + } else { + const value = Buffer.from(column.value, 'utf8') + chunks.push(u32be(value.length), value) + } + } + } + return Buffer.concat(chunks) +} + +const buildBinaryBatch = (records) => { + const chunks = [] + for (const record of records) { + chunks.push(u32be(record.buffer.length)) + chunks.push(u64be(record.lsn.hi, record.lsn.lo)) + chunks.push(record.buffer) + chunks.push(Buffer.from([record.separator])) + } + chunks.push(u32be(0)) + return Buffer.concat(chunks) +} + +suite.test('parse text without batch returns raw string', () => { + const plugin = new MppdbDecodingPlugin() + const result = plugin.parse(Buffer.from('BEGIN', 'utf8')) + assert.strictEqual(result, 'BEGIN') +}) + +suite.test('parse text batch decodes JSON payloads', () => { + const plugin = new MppdbDecodingPlugin({ sendingBatch: true, decodeStyle: 'j' }) + const buffer = buildTextBatch([ + { + lsn: { hi: 0, lo: 0x080bc370 }, + payload: '{"op_type":"INSERT","table_name":"public.demo_replication_table"}', + }, + { + lsn: { hi: 0, lo: 0x080bc4d0 }, + payload: 'BEGIN CSN: 3285', + }, + ]) + const result = plugin.parse(buffer) + assert.strictEqual(result.length, 2) + assert.deepStrictEqual(result[0], { + lsn: '00000000/080BC370', + payload: { op_type: 'INSERT', table_name: 'public.demo_replication_table' }, + }) + assert.deepStrictEqual(result[1], { lsn: '00000000/080BC4D0', payload: 'BEGIN CSN: 3285' }) +}) + +suite.test('parse binary batch records and tuples', () => { + const plugin = new MppdbDecodingPlugin({ decodeStyle: 'b' }) + const begin = Buffer.concat([Buffer.from('B'), u64be(0, 3301), u64be(0, 0x080d8b30)]) + const insert = buildDmlRecord('I', 'public', 
'demo_replication_table', [ + { + kind: 'new', + columns: [ + { name: 'id', typeOid: 23, value: '6' }, + { name: 'name', typeOid: 25, value: "'alice'" }, + { name: 'value', typeOid: 23, value: '100' }, + ], + }, + ]) + const commit = Buffer.from('C') + const buffer = buildBinaryBatch([ + { lsn: { hi: 0, lo: 0x080d8b30 }, buffer: begin, separator: 0x50 }, + { lsn: { hi: 0, lo: 0x080d8b30 }, buffer: insert, separator: 0x50 }, + { lsn: { hi: 0, lo: 0x080d8c30 }, buffer: commit, separator: 0x46 }, + ]) + const result = plugin.parse(buffer) + assert.strictEqual(result.length, 3) + assert.deepStrictEqual(result[0], { + lsn: '00000000/080D8B30', + batchEnd: false, + record: { tag: 'begin', csn: 3301, firstLsn: '00000000/080D8B30' }, + }) + assert.strictEqual(result[1].record.tag, 'insert') + assert.strictEqual(result[1].record.schema, 'public') + assert.strictEqual(result[1].record.table, 'demo_replication_table') + assert.deepStrictEqual(result[1].record.tuples[0].columns, [ + { name: 'id', typeOid: 23, value: '6' }, + { name: 'name', typeOid: 25, value: "'alice'" }, + { name: 'value', typeOid: 23, value: '100' }, + ]) + assert.deepStrictEqual(result[2], { lsn: '00000000/080D8C30', batchEnd: true, record: { tag: 'commit' } }) +}) + +suite.test('parse binary rejects unknown tuple tags', () => { + const plugin = new MppdbDecodingPlugin({ decodeStyle: 'b' }) + const invalid = Buffer.concat([ + Buffer.from('I'), + u16be(6), + Buffer.from('public'), + u16be(5), + Buffer.from('table'), + Buffer.from('F'), + ]) + const buffer = buildBinaryBatch([{ lsn: { hi: 0, lo: 0x080d8468 }, buffer: invalid, separator: 0x46 }]) + assert.throws( + () => plugin.parse(buffer), + (err) => err && /unknown tuple tag: F/.test(err.message) + ) +}) From b2cc522ae10b269386844d0e0195a924dab518f8 Mon Sep 17 00:00:00 2001 From: happygame Date: Fri, 23 Jan 2026 23:40:55 +0800 Subject: [PATCH 3/3] ci: enable logical replication in CI --- .github/workflows/ci.yml | 56 
+++++++++++++++++++++++++++++++++++++++- 1 file changed, 55 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73898f4f3..b48743136 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,13 +60,20 @@ jobs: SHA256_TEST_GAUSSUSER: sha256_test SHA256_TEST_GAUSSPASSWORD: test4@scram DB_TYPE: opengauss + LOGICAL_REPLICATION_TEST: '1' steps: - name: Show OS run: | uname -a - name: Wait for GaussDB to be ready run: | - timeout 60 bash -c 'until pg_isready -h localhost -p 5432; do sleep 2; done' + # Using gs_ctl status (native OpenGauss check) instead of pg_isready + CONTAINER_ID=$(docker ps --filter "ancestor=opengauss/opengauss" --format "{{.ID}}") + timeout 60 bash -c "until docker exec $CONTAINER_ID su - omm -c 'gs_ctl status -D /var/lib/opengauss/data' > /dev/null 2>&1; do echo -n '.'; sleep 2; done" || { + echo "Timeout waiting for GaussDB to start" + docker logs $CONTAINER_ID + exit 1 + } - name: Setup SHA256 authentication run: | # Wait for database to be fully started @@ -82,6 +89,53 @@ jobs: sleep 5 PGPASSWORD=openGauss@123 psql -h localhost -U ci_user -d ci_db_test -c "CREATE ROLE sha256_test login password 'test4@scram';" + - name: Setup logical replication + if: env.LOGICAL_REPLICATION_TEST == '1' + run: | + # Get container ID + CONTAINER_ID=$(docker ps --filter "ancestor=opengauss/opengauss" --format "{{.ID}}") + + # Set wal_level to logical + docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'wal_level = logical'" + + # Set max_replication_slots + docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'max_replication_slots = 10'" + + # Set max_wal_senders + docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'max_wal_senders = 10'" + + # Add replication connection rule to pg_hba.conf + docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -h 'host replication all 0.0.0.0/0 md5'" + 
+          # Restart container to apply logical replication settings
+          docker restart $CONTAINER_ID
+
+          # Wait for database to be ready again
+          sleep 10
+          timeout 60 bash -c "until docker exec $CONTAINER_ID su - omm -c 'gs_ctl status -D /var/lib/opengauss/data' > /dev/null 2>&1; do echo -n '.'; sleep 2; done" || {
+            echo "Timeout waiting for GaussDB to restart"
+            docker logs $CONTAINER_ID
+            exit 1
+          }
+
+          sleep 5
+
+          # Verify configuration
+          echo "Verifying logical replication configuration..."
+          WAL_LEVEL=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW wal_level;\"" | tr -d ' \n')
+          MAX_REPL_SLOTS=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW max_replication_slots;\"" | tr -d ' \n')
+          MAX_WAL_SENDERS=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW max_wal_senders;\"" | tr -d ' \n')
+
+          echo "  wal_level: ${WAL_LEVEL}"
+          echo "  max_replication_slots: ${MAX_REPL_SLOTS}"
+          echo "  max_wal_senders: ${MAX_WAL_SENDERS}"
+
+          if [ "$WAL_LEVEL" != "logical" ]; then
+            echo "Error: wal_level is not set to 'logical'"
+            exit 1
+          fi
+
+          echo "Logical Replication Setup Complete!"
       - uses: actions/checkout@v4
         with:
           persist-credentials: false