cdc

安装量: 81
排名: #9716

安装

npx skills add https://github.com/tursodatabase/turso --skill cdc

CDC (Change Data Capture) - Internal Feature Map Overview CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a dedicated CDC table ( turso_cdc by default). It is per-connection, enabled via PRAGMA, and operates at the bytecode generation (translate) layer. The sync engine consumes CDC records to push local changes to the remote. Architecture Diagram User SQL (INSERT/UPDATE/DELETE/DDL) | v ┌─────────────────────────────────────────────────┐ │ Translate layer (core/translate/) │ │ ┌───────────────────────────────────────────┐ │ │ │ prepare_cdc_if_necessary() │ │ │ │ - checks CaptureDataChangesInfo │ │ │ │ - opens CDC table cursor (OpenWrite) │ │ │ │ - skips if target == CDC table itself │ │ │ └───────────────────────────────────────────┘ │ │ ┌───────────────────────────────────────────┐ │ │ │ emit_cdc_insns() │ │ │ │ - writes (change_id, change_time, │ │ │ │ change_type, table_name, id, │ │ │ │ before, after, updates) into CDC tbl │ │ │ └───────────────────────────────────────────┘ │ │ + emit_cdc_full_record() / emit_cdc_patch_record() │ └─────────────────────────────────────────────────┘ | v CDC table (turso_cdc or custom name) | v ┌─────────────────────────────────────────────────┐ │ Sync engine (sync/engine/) │ │ DatabaseTape reads CDC table → DatabaseChange │ │ → apply/revert → push to remote │ └─────────────────────────────────────────────────┘ Core Data Types CaptureDataChangesMode + CaptureDataChangesInfo — core/lib.rs CDC behavior is controlled by two types:

[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]

[repr(u8)]

enum CdcVersion { V1 = 1 , V2 = 2 , } const CDC_VERSION_CURRENT : CdcVersion = CdcVersion :: V2 ; enum CaptureDataChangesMode { Id , // capture only rowid Before , // capture before-image After , // capture after-image Full , // before + after + updates } struct CaptureDataChangesInfo { mode : CaptureDataChangesMode , table : String , // CDC table name version : Option < CdcVersion

, // schema version (V1 or V2) } The connection stores Option — None means CDC is off. Key methods on CdcVersion : has_commit_record() — self >= V2 , gates COMMIT record emission Display / FromStr — round-trips "v1" ↔ V1 , "v2" ↔ V2 Key methods on CaptureDataChangesInfo : parse(value: &str, version: Option) — parses PRAGMA argument "[,]" , returns None for "off" cdc_version() — returns CdcVersion (panics if version is None). Single accessor replacing old is_v1() / is_v2() / version() methods. has_before() / has_after() / has_updates() — mode capability checks mode_name() — returns mode as string Convenience trait CaptureDataChangesExt on Option provides: has_before() / has_after() / has_updates() — delegates to inner, returns false for None table() — returns Option<&str> , None when CDC is off CDC Table Schema v1 Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME ) CREATE TABLE turso_cdc ( change_id INTEGER PRIMARY KEY AUTOINCREMENT , change_time INTEGER , -- unixepoch() change_type INTEGER , -- 1=INSERT, 0=UPDATE, -1=DELETE table_name TEXT , id < untyped

, -- rowid of changed row before BLOB , -- binary record (before-image) after BLOB , -- binary record (after-image) updates BLOB -- binary record of per-column changes ) ; CDC Table Schema v2 (current) CREATE TABLE turso_cdc ( change_id INTEGER PRIMARY KEY AUTOINCREMENT , change_time INTEGER , -- unixepoch() change_type INTEGER , -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT table_name TEXT , id < untyped

, -- rowid of changed row before BLOB , -- binary record (before-image) after BLOB , -- binary record (after-image) updates BLOB , -- binary record of per-column changes change_txn_id INTEGER -- transaction ID (groups rows into transactions) ) ; v2 adds: change_txn_id column — groups CDC rows by transaction. Assigned via conn_txn_id(candidate) opcode which get-or-sets a per-connection transaction ID. change_type=2 (COMMIT) records — mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT . The CDC table is created at runtime by the InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS . CDC Version Table When CDC is first enabled, a version tracking table is created: CREATE TABLE turso_cdc_version ( table_name TEXT PRIMARY KEY , version TEXT NOT NULL ) ; Current version: CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs , re-exported from core/translate/pragma.rs ) Version Detection in InitCdcVersion The InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it: If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking) If CDC table doesn't exist → create with current version (v2) If version row already exists → use that version as-is DatabaseChange — sync/engine/src/types.rs:229-249 Sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods for forward/backward replay. OperationMode — core/translate/emitter.rs Used by emit_cdc_insns() to determine change_type value: INSERT → 1 UPDATE / SELECT → 0 DELETE → -1 COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns ) Entry Points 1. PRAGMA — Enable/Disable CDC Set: core/translate/pragma.rs Checks MVCC is not enabled (CDC and MVCC are mutually exclusive) Parses mode string via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT Emits a single InitCdcVersion opcode — all CDC setup (table creation, version tracking, state change) happens at execution time Get (read current mode): core/translate/pragma.rs Returns 3 columns: mode , table , version When off: returns ("off", NULL, NULL) When active: returns (mode_name, table, version) Pragma registration: core/pragma.rs — CaptureDataChangesConn (and deprecated alias UnstableCaptureDataChangesConn ) with columns ["mode", "table", "version"] 2. Connection State Field: core/connection.rs — capture_data_changes: RwLock> Getter: get_capture_data_changes_info() — returns read guard Setter: set_capture_data_changes_info(opts: Option) Default: initialized as None (CDC off) 3. ProgramBuilder Integration Field: core/vdbe/builder.rs — capture_data_changes_info: Option Accessor: capture_data_changes_info() — returns &Option Passed from: core/translate/mod.rs — read from connection when creating builder 4. PrepareContext Field: core/vdbe/mod.rs — capture_data_changes: Option Set from: PrepareContext::from_connection() — clones from connection.get_capture_data_changes_info() 5. InitCdcVersion Opcode — core/vdbe/execute.rs Always emitted by PRAGMA SET. Handles all CDC setup at execution time: For "off": stores None in state.pending_cdc_info , returns early Checks if CDC table already exists (for v1 backward compatibility) Creates CDC table ( CREATE TABLE IF NOT EXISTS ... ) — v2 schema with change_txn_id column Creates version table ( CREATE TABLE IF NOT EXISTS turso_cdc_version ... ) Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses INSERT OR IGNORE to preserve existing version rows. Reads back actual version from the table Stores computed CaptureDataChangesInfo in state.pending_cdc_info The connection's CDC state is not applied in the opcode . Instead, pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged. All table creation is done via nested conn.prepare() / run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema. Bytecode Emission (core/translate/emitter.rs) These are the core CDC code generation functions: Function Purpose prepare_cdc_if_necessary() Opens CDC table cursor if CDC is active and target != CDC table emit_cdc_full_record() Reads all columns from cursor into a MakeRecord (for before/after image) emit_cdc_patch_record() Builds record from in-flight register values (for after-image of INSERT/UPDATE) emit_cdc_insns() Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. emit_cdc_commit_insns() Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. emit_cdc_autocommit_commit() End-of-statement COMMIT emission. Checks is_autocommit() at runtime — only emits COMMIT if in autocommit mode. v2 only. COMMIT Emission Strategy (v2) Per-row call sites use emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit() which checks is_autocommit() at runtime: Autocommit mode: emits a COMMIT record after the statement completes Explicit transaction ( BEGIN...COMMIT ): skips per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns() This ensures multi-row statements like INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row. Integration Points — Where CDC Records Are Emitted INSERT — core/translate/insert.rs Per-row: emit_cdc_insns() after insert, and before delete for REPLACE/conflict End-of-statement: emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop UPDATE — core/translate/emitter.rs Per-row: captures before-image, after-image via patch record, emits emit_cdc_insns() End-of-statement: emit_cdc_autocommit_commit() after the update loop DELETE — core/translate/emitter.rs Per-row: captures before-image and emits emit_cdc_insns() End-of-statement: emit_cdc_autocommit_commit() after the delete loop UPSERT (ON CONFLICT DO UPDATE) — core/translate/upsert.rs Per-row: emit_cdc_insns() for all three cases: pure insert, update after conflict, replace No end-of-statement COMMIT — upsert shares INSERT's epilogue Schema Changes (DDL) — core/translate/schema.rs CREATE TABLE: emit_cdc_insns() (insert into sqlite_schema ) + emit_cdc_autocommit_commit() DROP TABLE: emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop CREATE INDEX: emit_cdc_insns() + emit_cdc_autocommit_commit() ( core/translate/schema.rs ) DROP INDEX: emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop ( core/translate/index.rs ) DDL in explicit transactions ( BEGIN; CREATE TABLE t(x); COMMIT ) does NOT emit per-statement COMMIT — the autocommit check prevents it. ALTER TABLE — core/translate/update.rs Sets cdc_update_alter_statement on the update plan when CDC has updates mode Views/Triggers — Explicitly excluded core/translate/view.rs — passes None for CDC cursor core/translate/trigger.rs — passes None for CDC cursor Subqueries — No CDC core/translate/subquery.rs — cdc_cursor_id: None Helper Functions (for reading CDC data) table_columns_json_array(table_name) — core/function.rs , core/vdbe/execute.rs Returns JSON array of column names for a table. Used to interpret binary records. bin_record_json_object(columns_json, blob) — core/function.rs , core/vdbe/execute.rs Decodes a binary record (from before / after / updates columns) into a JSON object using column names. Sync Engine Integration The sync engine is the primary consumer of CDC data. DatabaseTape — sync/engine/src/database_tape.rs CDC config: DEFAULT_CDC_TABLE_NAME = "turso_cdc" , DEFAULT_CDC_MODE = "full" PRAGMA name: CDC_PRAGMA_NAME = "capture_data_changes_conn" Initialization: connect() sets CDC pragma and caches cdc_version from turso_cdc_version table. Must be called before iterate_changes() . Version caching: cdc_version: RwLock> — set by connect() , read by iterate_changes() . Panics if not set. Iterator: DatabaseChangesIterator reads CDC table in batches, emits DatabaseTapeOperation . For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records. Sync Operations — sync/engine/src/database_sync_operations.rs Change counting: SELECT COUNT(*) FROM turso_cdc WHERE change_id > ? Sync Engine — sync/engine/src/database_sync_engine.rs Initialization: open_db() calls main_tape.connect(coro) to ensure CDC is set up and version is cached before any iterate_changes() calls. During apply_changes , checks if CDC table existed, re-creates it after sync Replay Generator — sync/engine/src/database_replay_generator.rs Requires updates column to be populated (full mode) Bindings CDC Surface All bindings expose cdc_operations as part of sync stats: Binding File Python bindings/python/src/turso_sync.rs JavaScript bindings/javascript/sync/src/lib.rs JS (generator) bindings/javascript/sync/src/generator.rs Go bindings/go/bindings_sync.go React Native bindings/react-native/src/types.ts SDK Kit (C header) sync/sdk-kit/turso_sync.h SDK Kit (Rust) sync/sdk-kit/src/bindings.rs Tests Integration tests: tests/integration/functions/test_cdc.rs — covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs . Sync engine tests: sync/engine/src/database_tape.rs — CDC table reads, tape iteration, replay of schema changes. JS binding tests: bindings/javascript/sync/packages/{wasm,native}/promise.test.ts Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine). User-facing Documentation CLI manual page: cli/manuals/cdc.md — accessible via .manual cdc in the REPL Database manual: docs/manual.md — CDC section linked in TOC Key Design Decisions Per-connection, not per-database. Each connection has its own CDC mode and can target different tables. Bytecode-level implementation. CDC instructions are emitted alongside the actual DML bytecode during translation — no runtime hooks or triggers. Self-exclusion. Changes to the CDC table and turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary ). Schema changes tracked. DDL operations are recorded as changes to sqlite_schema table. Binary record format. Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload). Transaction-aware. CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries. Version tracking. CDC schema version is recorded in turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution. Atomic PRAGMA. Connection CDC state is deferred via pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged. Per-statement COMMIT (v2). COMMIT records are emitted once per statement (not per row), using emit_cdc_autocommit_commit() which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record. Backward-compatible version detection. Pre-existing v1 CDC tables (without turso_cdc_version ) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table. Typed version enum. CdcVersion enum with

[repr(u8)]

and Ord / PartialOrd enables feature gating via integer comparison ( has_commit_record() = self >= V2 ). Display / FromStr handles database round-trip. CDC and MVCC mutual exclusion. Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.

返回排行榜