Skip to content

[pull] main from tursodatabase:main #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ enum DbType {
path: String,
flags: OpenFlags,
encryption_config: Option<EncryptionConfig>,
skip_saftey_assert: bool,
skip_safety_assert: bool,
},
#[cfg(feature = "replication")]
Sync {
Expand Down Expand Up @@ -166,7 +166,7 @@ cfg_core! {
path: db_path.into(),
flags,
encryption_config: None,
skip_saftey_assert: false,
skip_safety_assert: false,
},
max_write_replication_index: Default::default(),
})
Expand Down Expand Up @@ -458,7 +458,7 @@ cfg_replication! {
DbType::Sync { db, .. } => {
let path = db.path().to_string();
Ok(Database {
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false },
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_safety_assert: false },
max_write_replication_index: Default::default(),
})
}
Expand Down Expand Up @@ -580,11 +580,11 @@ impl Database {
path,
flags,
encryption_config,
skip_saftey_assert,
skip_safety_assert,
} => {
use crate::local::impls::LibsqlConnection;

let db = if !skip_saftey_assert {
let db = if !skip_safety_assert {
crate::local::Database::open(path, *flags)?
} else {
unsafe { crate::local::Database::open_raw(path, *flags)? }
Expand Down
6 changes: 3 additions & 3 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ cfg_core! {
/// Using this setting is very UNSAFE and you are expected to use the libsql in adherence
/// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at
/// your own risk.
pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder<Local> {
pub unsafe fn skip_safety_assert(mut self, skip: bool) -> Builder<Local> {
self.inner.skip_safety_assert = skip;
self
}
Expand Down Expand Up @@ -206,7 +206,7 @@ cfg_core! {
path,
flags: self.inner.flags,
encryption_config: self.inner.encryption_config,
skip_saftey_assert: self.inner.skip_safety_assert
skip_safety_assert: self.inner.skip_safety_assert
},
max_write_replication_index: Default::default(),
}
Expand Down Expand Up @@ -321,7 +321,7 @@ cfg_replication! {
/// Using this setting is very UNSAFE and you are expected to use the libsql in adherence
/// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at
/// your own risk.
pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder<RemoteReplica> {
pub unsafe fn skip_safety_assert(mut self, skip: bool) -> Builder<RemoteReplica> {
self.inner.skip_safety_assert = skip;
self
}
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/replication/remote_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tonic::metadata::AsciiMetadataValue;
use tonic::{Response, Status};
use zerocopy::FromBytes;

async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
pub(crate) async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
let before = Instant::now();
let out = fut.await;
(out, before.elapsed())
Expand Down
100 changes: 82 additions & 18 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0;

const DEFAULT_MAX_RETRIES: usize = 5;
const DEFAULT_PUSH_BATCH_SIZE: u32 = 128;
const DEFAULT_PULL_BATCH_SIZE: u32 = 128;

#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
Expand Down Expand Up @@ -66,6 +67,8 @@ pub enum SyncError {
InvalidLocalGeneration(u32, u32),
#[error("invalid local state: {0}")]
InvalidLocalState(String),
#[error("server returned invalid length of frames: {0}")]
InvalidPullFrameBytes(usize),
}

impl SyncError {
Expand Down Expand Up @@ -98,8 +101,8 @@ pub enum PushStatus {
}

pub enum PullResult {
/// A frame was successfully pulled.
Frame(Bytes),
/// Frames were successfully pulled.
Frames(Bytes),
/// We've reached the end of the generation.
EndOfGeneration { max_generation: u32 },
}
Expand All @@ -122,6 +125,7 @@ pub struct SyncContext {
auth_token: Option<HeaderValue>,
max_retries: usize,
push_batch_size: u32,
pull_batch_size: u32,
/// The current durable generation.
durable_generation: u32,
/// Represents the max_frame_no from the server.
Expand Down Expand Up @@ -154,6 +158,7 @@ impl SyncContext {
auth_token,
max_retries: DEFAULT_MAX_RETRIES,
push_batch_size: DEFAULT_PUSH_BATCH_SIZE,
pull_batch_size: DEFAULT_PULL_BATCH_SIZE,
client,
durable_generation: 0,
durable_frame_num: 0,
Expand All @@ -175,7 +180,7 @@ impl SyncContext {
}

#[tracing::instrument(skip(self))]
pub(crate) async fn pull_one_frame(
pub(crate) async fn pull_frames(
&mut self,
generation: u32,
frame_no: u32,
Expand All @@ -185,9 +190,10 @@ impl SyncContext {
self.sync_url,
generation,
frame_no,
frame_no + 1
// the server expects the range of [start, end) frames, i.e. end is exclusive
frame_no + self.pull_batch_size
);
tracing::debug!("pulling frame");
tracing::debug!("pulling frame (uri={})", uri);
self.pull_with_retry(uri, self.max_retries).await
}

Expand Down Expand Up @@ -417,20 +423,39 @@ impl SyncContext {
.map_err(SyncError::HttpDispatch)?;

if res.status().is_success() {
let frame = hyper::body::to_bytes(res.into_body())
let frames = hyper::body::to_bytes(res.into_body())
.await
.map_err(SyncError::HttpBody)?;
return Ok(PullResult::Frame(frame));
// a success result should always return some frames
if frames.is_empty() {
tracing::error!("server returned empty frames in pull response");
return Err(SyncError::InvalidPullFrameBytes(0).into());
}
// the minimum payload size cannot be less than a single frame
if frames.len() < FRAME_SIZE {
tracing::error!(
"server returned frames with invalid length: {} < {}",
frames.len(),
FRAME_SIZE
);
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
}
return Ok(PullResult::Frames(frames));
}
// BUG ALERT: The server returns a 500 error if the remote database is empty.
// This is a bug and should be fixed.
if res.status() == StatusCode::BAD_REQUEST
|| res.status() == StatusCode::INTERNAL_SERVER_ERROR
{
let status = res.status();
let res_body = hyper::body::to_bytes(res.into_body())
.await
.map_err(SyncError::HttpBody)?;

tracing::trace!(
"server returned: {} body: {}",
status,
String::from_utf8_lossy(&res_body[..])
);
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
.map_err(SyncError::JsonDecode)?;

Expand Down Expand Up @@ -650,22 +675,34 @@ impl SyncContext {

let req = req.body(Body::empty()).expect("valid request");

let res = self
.client
.request(req)
.await
.map_err(SyncError::HttpDispatch)?;
let (res, http_duration) =
crate::replication::remote_client::time(self.client.request(req)).await;
let res = res.map_err(SyncError::HttpDispatch)?;

if !res.status().is_success() {
let status = res.status();
let body = hyper::body::to_bytes(res.into_body())
.await
.map_err(SyncError::HttpBody)?;
tracing::error!(
"failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}",
status,
String::from_utf8_lossy(&body),
uri,
http_duration
);
return Err(
SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(),
);
}

tracing::debug!(
"pulled db file from remote server, status={}, url={}, duration={:?}",
res.status(),
uri,
http_duration
);

// todo: do streaming write to the disk
let bytes = hyper::body::to_bytes(res.into_body())
.await
Expand Down Expand Up @@ -887,6 +924,11 @@ async fn try_push(
})
}

/// PAGE_SIZE used by the sync / diskless server
const PAGE_SIZE: usize = 4096;
const FRAME_HEADER_SIZE: usize = 24;
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;

pub async fn try_pull(
sync_ctx: &mut SyncContext,
conn: &Connection,
Expand All @@ -898,10 +940,32 @@ pub async fn try_pull(
loop {
let generation = sync_ctx.durable_generation();
let frame_no = sync_ctx.durable_frame_num() + 1;
match sync_ctx.pull_one_frame(generation, frame_no).await {
Ok(PullResult::Frame(frame)) => {
insert_handle.insert(&frame)?;
sync_ctx.durable_frame_num = frame_no;
match sync_ctx.pull_frames(generation, frame_no).await {
Ok(PullResult::Frames(frames)) => {
tracing::debug!(
"pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}",
generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
);
if frames.len() % FRAME_SIZE != 0 {
tracing::error!(
"frame size {} is not a multiple of the expected size {}",
frames.len(),
FRAME_SIZE,
);
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
}
for chunk in frames.chunks(FRAME_SIZE) {
let r = insert_handle.insert(&chunk);
if let Err(e) = r {
tracing::error!(
"insert error (frame= {}) : {:?}",
sync_ctx.durable_frame_num + 1,
e
);
return Err(e);
}
sync_ctx.durable_frame_num += 1;
}
}
Ok(PullResult::EndOfGeneration { max_generation }) => {
// If there are no more generations to pull, we're done.
Expand All @@ -920,7 +984,7 @@ pub async fn try_pull(
insert_handle.begin()?;
}
Err(e) => {
tracing::debug!("pull_one_frame error: {:?}", e);
tracing::debug!("pull_frames error: {:?}", e);
err.replace(e);
break;
}
Expand Down