Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 55 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,20 @@ jobs:
SHA256_TEST_GAUSSUSER: sha256_test
SHA256_TEST_GAUSSPASSWORD: test4@scram
DB_TYPE: opengauss
LOGICAL_REPLICATION_TEST: '1'
steps:
- name: Show OS
run: |
uname -a
- name: Wait for GaussDB to be ready
run: |
timeout 60 bash -c 'until pg_isready -h localhost -p 5432; do sleep 2; done'
# Using gs_ctl status (native OpenGauss check) instead of pg_isready
CONTAINER_ID=$(docker ps --filter "ancestor=opengauss/opengauss" --format "{{.ID}}")
timeout 60 bash -c "until docker exec $CONTAINER_ID su - omm -c 'gs_ctl status -D /var/lib/opengauss/data' > /dev/null 2>&1; do echo -n '.'; sleep 2; done" || {
echo "Timeout waiting for GaussDB to start"
docker logs $CONTAINER_ID
exit 1
}
- name: Setup SHA256 authentication
run: |
# Wait for database to be fully started
Expand All @@ -82,6 +89,53 @@ jobs:
sleep 5

PGPASSWORD=openGauss@123 psql -h localhost -U ci_user -d ci_db_test -c "CREATE ROLE sha256_test login password 'test4@scram';"
- name: Setup logical replication
if: env.LOGICAL_REPLICATION_TEST == '1'
run: |
# Get container ID
CONTAINER_ID=$(docker ps --filter "ancestor=opengauss/opengauss" --format "{{.ID}}")

# Set wal_level to logical
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'wal_level = logical'"

# Set max_replication_slots
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'max_replication_slots = 10'"

# Set max_wal_senders
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -c 'max_wal_senders = 10'"

# Add replication connection rule to pg_hba.conf
docker exec $CONTAINER_ID su - omm -c "gs_guc set -D /var/lib/opengauss/data/ -h 'host replication all 0.0.0.0/0 md5'"

# Restart container to apply logical replication settings
docker restart $CONTAINER_ID

# Wait for database to be ready again
sleep 10
timeout 60 bash -c "until docker exec $CONTAINER_ID su - omm -c 'gs_ctl status -D /var/lib/opengauss/data' > /dev/null 2>&1; do echo -n '.'; sleep 2; done" || {
echo "Timeout waiting for GaussDB to restart"
docker logs $CONTAINER_ID
exit 1
}

sleep 5

# Verify configuration
echo "Verifying logical replication configuration..."
WAL_LEVEL=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW wal_level;\"" | tr -d ' \n')
MAX_REPL_SLOTS=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW max_replication_slots;\"" | tr -d ' \n')
MAX_WAL_SENDERS=$(docker exec $CONTAINER_ID su - omm -c "gsql -d ci_db_test -t -c \"SHOW max_wal_senders;\"" | tr -d ' \n')

echo " wal_level: ${WAL_LEVEL}"
echo " max_replication_slots: ${MAX_REPL_SLOTS}"
echo " max_wal_senders: ${MAX_WAL_SENDERS}"

if [ "$WAL_LEVEL" != "logical" ]; then
echo "Warning: wal_level is not set to 'logical'"
exit 1
fi

echo "Logical Replication Setup Complete!"
- uses: actions/checkout@v4
with:
persist-credentials: false
Expand Down
175 changes: 175 additions & 0 deletions examples/async-await/logical-replication-demo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { Client, LogicalReplicationService, MppdbDecodingPlugin } from 'gaussdb-node'

const slotName = 'demo_replication_slot'
const tableName = 'demo_replication_table'

// Connection config from environment variables or defaults
const config = {
host: process.env.GAUSSHOST,
port: parseInt(process.env.GAUSSPORT),
database: process.env.GAUSSDATABASE,
user: process.env.GAUSSUSER,
password: process.env.GAUSSPASSWORD,
}

async function checkSlotExists(client) {
const res = await client.query(
`SELECT 1 FROM pg_replication_slots WHERE slot_name = $1`,
[slotName]
)
return res.rowCount > 0
}

async function createSlot(client) {
await client.query(
`SELECT * FROM pg_create_logical_replication_slot('${slotName}', 'mppdb_decoding')`
)
console.log(`slot "${slotName}" created`)
}

async function dropSlot(client) {
await client.query(`SELECT pg_drop_replication_slot('${slotName}')`)
console.log(`slot "${slotName}" dropped`)
}

async function createTable(client) {
await client.query(`
CREATE TABLE IF NOT EXISTS ${tableName} (
id SERIAL PRIMARY KEY,
name TEXT,
value INT
)
`)
console.log(`table "${tableName}" created`)
}

async function dropTable(client) {
await client.query(`DROP TABLE IF EXISTS ${tableName}`)
console.log(`table "${tableName}" dropped`)
}

async function performDML(client) {
console.log('performing DML operations...')

await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['alice', 100])
console.log(' INSERT alice')

await client.query(`INSERT INTO ${tableName} (name, value) VALUES ($1, $2)`, ['bob', 200])
console.log(' INSERT bob')

await client.query(`UPDATE ${tableName} SET value = $1 WHERE name = $2`, [150, 'alice'])
console.log(' UPDATE alice')

await client.query(`DELETE FROM ${tableName} WHERE name = $1`, ['bob'])
console.log(' DELETE bob')

console.log('DML operations completed')
}

async function main() {
let clientB = null
let service = null

try {
clientB = new Client(config)
await clientB.connect()
console.log('client B connected')

// setup: create table and slot if needed
await createTable(clientB)

const slotExists = await checkSlotExists(clientB)
if (!slotExists) {
await createSlot(clientB)
} else {
console.log(`slot "${slotName}" already exists`)
}

// start replication service (client A uses config, not Client instance)
service = new LogicalReplicationService(config, {
acknowledge: { auto: true, timeoutSeconds: 10 },
})

const plugin = new MppdbDecodingPlugin({
includeXids: false,
skipEmptyXacts: true,
})

const receivedMessages = []

service.on('start', () => {
console.log('replication started')
})

service.on('data', (lsn, msg) => {
console.log('[data]', lsn, msg)
receivedMessages.push(msg)
})

service.on('error', (err) => {
console.error('[error]', err)
})

// start replication in background
service.subscribe(plugin, slotName)

// wait for replication to start
await new Promise((resolve) => service.once('start', resolve))

// perform DML operations (client B)
await performDML(clientB)

// wait a bit for replication to catch up
await new Promise((resolve) => setTimeout(resolve, 2000))

// stop replication first
await service.stop()
service = null
console.log('replication stopped')

console.log(`received ${receivedMessages.length} messages`)

// cleanup while clientB is still connected
await dropTable(clientB)
await dropSlot(clientB)
} catch (err) {
console.error('error:', err)
} finally {
// stop service if still running
if (service) {
try {
await service.stop()
} catch (err) {
// ignore
}
}

// cleanup with a new connection if needed
if (clientB) {
try {
await clientB.end()
} catch (err) {
// ignore
}
}

// ensure cleanup with fresh connection
const cleanupClient = new Client(config)
try {
await cleanupClient.connect()
await cleanupClient.query(`DROP TABLE IF EXISTS ${tableName}`)
await cleanupClient.query(`SELECT pg_drop_replication_slot('${slotName}')`).catch(() => {})
} catch (err) {
// ignore cleanup errors
} finally {
await cleanupClient.end().catch(() => {})
}

console.log('cleanup done')
}
}

main().catch((err) => {
console.error(err)
process.exit(1)
})
Loading