Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

standalone sqld lib #485

Merged
merged 1 commit into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 129 additions & 40 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"sqld",
"sqld-libsql-bindings",
"testing/end-to-end",
"libsqlx",
]

[workspace.dependencies]
Expand Down
36 changes: 36 additions & 0 deletions libsqlx/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "libsqlx"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.68"
bytesize = "1.2.0"
serde = "1.0.164"
serde_json = "1.0.99"
rusqlite = { workspace = true }
anyhow = "1.0.71"
futures = "0.3.28"
tokio = { version = "1.28.2", features = ["sync", "time"] }
sqlite3-parser = "0.9.0"
fallible-iterator = "0.3.0"
bytes = "1.4.0"
tracing = "0.1.37"
bytemuck = { version = "1.13.1", features = ["derive"] }
parking_lot = "0.12.1"
uuid = { version = "1.4.0", features = ["v4"] }
sqld-libsql-bindings = { version = "0", path = "../sqld-libsql-bindings" }
crossbeam = "0.8.2"
thiserror = "1.0.40"
nix = "0.26.2"
crc = "3.0.1"
once_cell = "1.18.0"
regex = "1.8.4"
tempfile = "3.6.0"

[dev-dependencies]
arbitrary = { version = "1.3.0", features = ["derive"] }
itertools = "0.11.0"
rand = "0.8.5"
Binary file added libsqlx/assets/test/simple_wallog
Binary file not shown.
288 changes: 288 additions & 0 deletions libsqlx/src/analysis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
use anyhow::Result;
use fallible_iterator::FallibleIterator;
use sqlite3_parser::ast::{Cmd, PragmaBody, QualifiedName, Stmt};
use sqlite3_parser::lexer::sql::{Parser, ParserError};

/// A group of statements to be executed together.
#[derive(Debug, Clone)]
pub struct Statement {
pub stmt: String,
pub kind: StmtKind,
/// Is the statement an INSERT, UPDATE or DELETE?
pub is_iud: bool,
pub is_insert: bool,
}

impl Default for Statement {
fn default() -> Self {
Self::empty()
}
}

/// Classify statement in categories of interest.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum StmtKind {
/// The begining of a transaction
TxnBegin,
/// The end of a transaction
TxnEnd,
Read,
Write,
Other,
}

fn is_temp(name: &QualifiedName) -> bool {
name.db_name.as_ref().map(|n| n.0.as_str()) == Some("TEMP")
}

fn is_reserved_tbl(name: &QualifiedName) -> bool {
let n = name.name.0.to_lowercase();
n == "_litestream_seq" || n == "_litestream_lock" || n == "libsql_wasm_func_table"
}

fn write_if_not_reserved(name: &QualifiedName) -> Option<StmtKind> {
(!is_reserved_tbl(name)).then_some(StmtKind::Write)
}

impl StmtKind {
fn kind(cmd: &Cmd) -> Option<Self> {
match cmd {
Cmd::Explain(Stmt::Pragma(name, body)) => Self::pragma_kind(name, body.as_ref()),
Cmd::Explain(_) => Some(Self::Other),
Cmd::ExplainQueryPlan(_) => Some(Self::Other),
Cmd::Stmt(Stmt::Begin { .. }) => Some(Self::TxnBegin),
Cmd::Stmt(Stmt::Commit { .. } | Stmt::Rollback { .. }) => Some(Self::TxnEnd),
Cmd::Stmt(
Stmt::CreateVirtualTable { tbl_name, .. }
| Stmt::CreateTable {
tbl_name,
temporary: false,
..
},
) if !is_temp(tbl_name) => Some(Self::Write),
Cmd::Stmt(
Stmt::Insert {
with: _,
or_conflict: _,
tbl_name,
..
}
| Stmt::Update {
with: _,
or_conflict: _,
tbl_name,
..
},
) => write_if_not_reserved(tbl_name),

Cmd::Stmt(Stmt::Delete {
with: _, tbl_name, ..
}) => write_if_not_reserved(tbl_name),
Cmd::Stmt(Stmt::DropTable {
if_exists: _,
tbl_name,
}) => write_if_not_reserved(tbl_name),
Cmd::Stmt(Stmt::AlterTable(tbl_name, _)) => write_if_not_reserved(tbl_name),
Cmd::Stmt(
Stmt::DropIndex { .. }
| Stmt::CreateTrigger {
temporary: false, ..
}
| Stmt::CreateIndex { .. },
) => Some(Self::Write),
Cmd::Stmt(Stmt::Select { .. }) => Some(Self::Read),
Cmd::Stmt(Stmt::Pragma(name, body)) => Self::pragma_kind(name, body.as_ref()),
_ => None,
}
}

fn pragma_kind(name: &QualifiedName, body: Option<&PragmaBody>) -> Option<Self> {
let name = name.name.0.as_str();
match name {
// always ok to be served by primary or replicas - pure readonly pragmas
"table_list" | "index_list" | "table_info" | "table_xinfo" | "index_xinfo"
| "pragma_list" | "compile_options" | "database_list" | "function_list"
| "module_list" => Some(Self::Read),
// special case for `encoding` - it's effectively readonly for connections
// that already created a database, which is always the case for sqld
"encoding" => Some(Self::Read),
// always ok to be served by primary
"foreign_keys" | "foreign_key_list" | "foreign_key_check" | "collation_list"
| "data_version" | "freelist_count" | "integrity_check" | "legacy_file_format"
| "page_count" | "quick_check" | "stats" => Some(Self::Write),
// ok to be served by primary without args
"analysis_limit"
| "application_id"
| "auto_vacuum"
| "automatic_index"
| "busy_timeout"
| "cache_size"
| "cache_spill"
| "cell_size_check"
| "checkpoint_fullfsync"
| "defer_foreign_keys"
| "fullfsync"
| "hard_heap_limit"
| "journal_mode"
| "journal_size_limit"
| "legacy_alter_table"
| "locking_mode"
| "max_page_count"
| "mmap_size"
| "page_size"
| "query_only"
| "read_uncommitted"
| "recursive_triggers"
| "reverse_unordered_selects"
| "schema_version"
| "secure_delete"
| "soft_heap_limit"
| "synchronous"
| "temp_store"
| "threads"
| "trusted_schema"
| "user_version"
| "wal_autocheckpoint" => {
match body {
Some(_) => None,
None => Some(Self::Write),
}
}
// changes the state of the connection, and can't be allowed rn:
"case_sensitive_like" | "ignore_check_constraints" | "incremental_vacuum"
// TODO: check if optimize can be safely performed
| "optimize"
| "parser_trace"
| "shrink_memory"
| "wal_checkpoint" => None,
_ => {
tracing::debug!("Unknown pragma: {name}");
None
},
}
}
}

/// The state of a transaction for a series of statement
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
/// The txn in an opened state
Txn,
/// The txn in a closed state
Init,
/// This is an invalid state for the state machine
Invalid,
}

impl State {
pub fn step(&mut self, kind: StmtKind) {
*self = match (*self, kind) {
(State::Txn, StmtKind::TxnBegin) | (State::Init, StmtKind::TxnEnd) => State::Invalid,
(State::Txn, StmtKind::TxnEnd) => State::Init,
(state, StmtKind::Other | StmtKind::Write | StmtKind::Read) => state,
(State::Invalid, _) => State::Invalid,
(State::Init, StmtKind::TxnBegin) => State::Txn,
};
}

pub fn reset(&mut self) {
*self = State::Init
}
}

impl Statement {
pub fn empty() -> Self {
Self {
stmt: String::new(),
// empty statement is arbitrarely made of the read kind so it is not send to a writer
kind: StmtKind::Read,
is_iud: false,
is_insert: false,
}
}

pub fn parse(s: &str) -> impl Iterator<Item = Result<Self>> + '_ {
fn parse_inner(
original: &str,
stmt_count: u64,
has_more_stmts: bool,
c: Cmd,
) -> Result<Statement> {
let kind =
StmtKind::kind(&c).ok_or_else(|| anyhow::anyhow!("unsupported statement"))?;

if stmt_count == 1 && !has_more_stmts {
// XXX: Temporary workaround for integration with Atlas
if let Cmd::Stmt(Stmt::CreateTable { .. }) = &c {
return Ok(Statement {
stmt: original.to_string(),
kind,
is_iud: false,
is_insert: false,
});
}
}

let is_iud = matches!(
c,
Cmd::Stmt(Stmt::Insert { .. } | Stmt::Update { .. } | Stmt::Delete { .. })
);
let is_insert = matches!(c, Cmd::Stmt(Stmt::Insert { .. }));

Ok(Statement {
stmt: c.to_string(),
kind,
is_iud,
is_insert,
})
}
// The parser needs to be boxed because it's large, and you don't want it on the stack.
// There's upstream work to make it smaller, but in the meantime the parser should remain
// on the heap:
// - https://github.com/gwenn/lemon-rs/issues/8
// - https://github.com/gwenn/lemon-rs/pull/19
let mut parser = Box::new(Parser::new(s.as_bytes()).peekable());
let mut stmt_count = 0;
std::iter::from_fn(move || {
stmt_count += 1;
match parser.next() {
Ok(Some(cmd)) => Some(parse_inner(
s,
stmt_count,
parser.peek().map_or(true, |o| o.is_some()),
cmd,
)),
Ok(None) => None,
Err(sqlite3_parser::lexer::sql::Error::ParserError(
ParserError::SyntaxError {
token_type: _,
found: Some(found),
},
Some((line, col)),
)) => Some(Err(anyhow::anyhow!(
"syntax error around L{line}:{col}: `{found}`"
))),
Err(e) => Some(Err(e.into())),
}
})
}

pub fn is_read_only(&self) -> bool {
matches!(
self.kind,
StmtKind::Read | StmtKind::TxnEnd | StmtKind::TxnBegin
)
}
}

/// Given a an initial state and an array of queries, attempts to predict what the final state will
/// be
pub fn predict_final_state<'a>(
mut state: State,
stmts: impl Iterator<Item = &'a Statement>,
) -> State {
for stmt in stmts {
state.step(stmt.kind);
}
state
}
33 changes: 33 additions & 0 deletions libsqlx/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use crate::program::Program;
use crate::result_builder::ResultBuilder;

#[derive(Debug, Clone)]
pub struct DescribeResponse {
pub params: Vec<DescribeParam>,
pub cols: Vec<DescribeCol>,
pub is_explain: bool,
pub is_readonly: bool,
}

#[derive(Debug, Clone)]
pub struct DescribeParam {
pub name: Option<String>,
}

#[derive(Debug, Clone)]
pub struct DescribeCol {
pub name: String,
pub decltype: Option<String>,
}

pub trait Connection {
/// Executes a query program
fn execute_program<B: ResultBuilder>(
&mut self,
pgm: Program,
result_builder: B,
) -> crate::Result<B>;

/// Parse the SQL statement and return information about it.
fn describe(&self, sql: String) -> crate::Result<DescribeResponse>;
}
Loading