From 965df0dd43c44f9e34c2b7c9b1103814d165aab7 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Sun, 8 Jun 2025 08:26:47 +0300 Subject: [PATCH 1/2] s/skip_saftey_assert/skip_safety_assert/ --- libsql/src/database.rs | 10 +++++----- libsql/src/database/builder.rs | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/libsql/src/database.rs b/libsql/src/database.rs index 1362ec217a..bb07bb189d 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -84,7 +84,7 @@ enum DbType { path: String, flags: OpenFlags, encryption_config: Option, - skip_saftey_assert: bool, + skip_safety_assert: bool, }, #[cfg(feature = "replication")] Sync { @@ -166,7 +166,7 @@ cfg_core! { path: db_path.into(), flags, encryption_config: None, - skip_saftey_assert: false, + skip_safety_assert: false, }, max_write_replication_index: Default::default(), }) @@ -458,7 +458,7 @@ cfg_replication! { DbType::Sync { db, .. } => { let path = db.path().to_string(); Ok(Database { - db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false }, + db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_safety_assert: false }, max_write_replication_index: Default::default(), }) } @@ -580,11 +580,11 @@ impl Database { path, flags, encryption_config, - skip_saftey_assert, + skip_safety_assert, } => { use crate::local::impls::LibsqlConnection; - let db = if !skip_saftey_assert { + let db = if !skip_safety_assert { crate::local::Database::open(path, *flags)? } else { unsafe { crate::local::Database::open_raw(path, *flags)? } diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index 1969e1f511..ef70479430 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -175,7 +175,7 @@ cfg_core! 
{ /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at /// your own risk. - pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + pub unsafe fn skip_safety_assert(mut self, skip: bool) -> Builder { self.inner.skip_safety_assert = skip; self } @@ -206,7 +206,7 @@ cfg_core! { path, flags: self.inner.flags, encryption_config: self.inner.encryption_config, - skip_saftey_assert: self.inner.skip_safety_assert + skip_safety_assert: self.inner.skip_safety_assert }, max_write_replication_index: Default::default(), } @@ -321,7 +321,7 @@ cfg_replication! { /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at /// your own risk. - pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + pub unsafe fn skip_safety_assert(mut self, skip: bool) -> Builder { self.inner.skip_safety_assert = skip; self } From b7dfab49854db9b4c40bdcef73efd45485e6751e Mon Sep 17 00:00:00 2001 From: Avinash Sajjanshetty Date: Sun, 1 Jun 2025 21:15:24 +0530 Subject: [PATCH 2/2] Optimise `sync_pull` to pull frames in batches Previously, we pulled frames one by one. This patch changes it to pull frames in batches. 
Currently, the batch size is set to 128 (the maximum supported by the server) --- libsql/src/replication/remote_client.rs | 2 +- libsql/src/sync.rs | 100 +++++++++++++++++++----- 2 files changed, 83 insertions(+), 19 deletions(-) diff --git a/libsql/src/replication/remote_client.rs b/libsql/src/replication/remote_client.rs index a401d7a1ac..d55f68ec4d 100644 --- a/libsql/src/replication/remote_client.rs +++ b/libsql/src/replication/remote_client.rs @@ -18,7 +18,7 @@ use tonic::metadata::AsciiMetadataValue; use tonic::{Response, Status}; use zerocopy::FromBytes; -async fn time(fut: impl Future) -> (O, Duration) { +pub(crate) async fn time(fut: impl Future) -> (O, Duration) { let before = Instant::now(); let out = fut.await; (out, before.elapsed()) diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index de3ab6a4b4..9a74b118b0 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0; const DEFAULT_MAX_RETRIES: usize = 5; const DEFAULT_PUSH_BATCH_SIZE: u32 = 128; +const DEFAULT_PULL_BATCH_SIZE: u32 = 128; #[derive(thiserror::Error, Debug)] #[non_exhaustive] @@ -66,6 +67,8 @@ pub enum SyncError { InvalidLocalGeneration(u32, u32), #[error("invalid local state: {0}")] InvalidLocalState(String), + #[error("server returned invalid length of frames: {0}")] + InvalidPullFrameBytes(usize), } impl SyncError { @@ -98,8 +101,8 @@ pub enum PushStatus { } pub enum PullResult { - /// A frame was successfully pulled. - Frame(Bytes), + /// Frames were successfully pulled. + Frames(Bytes), /// We've reached the end of the generation. EndOfGeneration { max_generation: u32 }, } @@ -122,6 +125,7 @@ pub struct SyncContext { auth_token: Option, max_retries: usize, push_batch_size: u32, + pull_batch_size: u32, /// The current durable generation. durable_generation: u32, /// Represents the max_frame_no from the server. 
@@ -154,6 +158,7 @@ impl SyncContext { auth_token, max_retries: DEFAULT_MAX_RETRIES, push_batch_size: DEFAULT_PUSH_BATCH_SIZE, + pull_batch_size: DEFAULT_PULL_BATCH_SIZE, client, durable_generation: 0, durable_frame_num: 0, @@ -175,7 +180,7 @@ impl SyncContext { } #[tracing::instrument(skip(self))] - pub(crate) async fn pull_one_frame( + pub(crate) async fn pull_frames( &mut self, generation: u32, frame_no: u32, @@ -185,9 +190,10 @@ impl SyncContext { self.sync_url, generation, frame_no, - frame_no + 1 + // the server expects the range of [start, end) frames, i.e. end is exclusive + frame_no + self.pull_batch_size ); - tracing::debug!("pulling frame"); + tracing::debug!("pulling frame (uri={})", uri); self.pull_with_retry(uri, self.max_retries).await } @@ -417,20 +423,39 @@ impl SyncContext { .map_err(SyncError::HttpDispatch)?; if res.status().is_success() { - let frame = hyper::body::to_bytes(res.into_body()) + let frames = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; - return Ok(PullResult::Frame(frame)); + // a success result should always return some frames + if frames.is_empty() { + tracing::error!("server returned empty frames in pull response"); + return Err(SyncError::InvalidPullFrameBytes(0).into()); + } + // the minimum payload size cannot be less than a single frame + if frames.len() < FRAME_SIZE { + tracing::error!( + "server returned frames with invalid length: {} < {}", + frames.len(), + FRAME_SIZE + ); + return Err(SyncError::InvalidPullFrameBytes(frames.len()).into()); + } + return Ok(PullResult::Frames(frames)); } // BUG ALERT: The server returns a 500 error if the remote database is empty. // This is a bug and should be fixed. 
if res.status() == StatusCode::BAD_REQUEST || res.status() == StatusCode::INTERNAL_SERVER_ERROR { + let status = res.status(); let res_body = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; - + tracing::trace!( + "server returned: {} body: {}", + status, + String::from_utf8_lossy(&res_body[..]) + ); let resp = serde_json::from_slice::(&res_body[..]) .map_err(SyncError::JsonDecode)?; @@ -650,22 +675,34 @@ impl SyncContext { let req = req.body(Body::empty()).expect("valid request"); - let res = self - .client - .request(req) - .await - .map_err(SyncError::HttpDispatch)?; + let (res, http_duration) = + crate::replication::remote_client::time(self.client.request(req)).await; + let res = res.map_err(SyncError::HttpDispatch)?; if !res.status().is_success() { let status = res.status(); let body = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; + tracing::error!( + "failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}", + status, + String::from_utf8_lossy(&body), + uri, + http_duration + ); return Err( SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(), ); } + tracing::debug!( + "pulled db file from remote server, status={}, url={}, duration={:?}", + res.status(), + uri, + http_duration + ); + // todo: do streaming write to the disk let bytes = hyper::body::to_bytes(res.into_body()) .await @@ -887,6 +924,11 @@ async fn try_push( }) } +/// PAGE_SIZE used by the sync / diskless server +const PAGE_SIZE: usize = 4096; +const FRAME_HEADER_SIZE: usize = 24; +const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE; + pub async fn try_pull( sync_ctx: &mut SyncContext, conn: &Connection, @@ -898,10 +940,32 @@ pub async fn try_pull( loop { let generation = sync_ctx.durable_generation(); let frame_no = sync_ctx.durable_frame_num() + 1; - match sync_ctx.pull_one_frame(generation, frame_no).await { - Ok(PullResult::Frame(frame)) => { - insert_handle.insert(&frame)?; 
- sync_ctx.durable_frame_num = frame_no; + match sync_ctx.pull_frames(generation, frame_no).await { + Ok(PullResult::Frames(frames)) => { + tracing::debug!( + "pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}", + generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(), + ); + if frames.len() % FRAME_SIZE != 0 { + tracing::error!( + "frame size {} is not a multiple of the expected size {}", + frames.len(), + FRAME_SIZE, + ); + return Err(SyncError::InvalidPullFrameBytes(frames.len()).into()); + } + for chunk in frames.chunks(FRAME_SIZE) { + let r = insert_handle.insert(&chunk); + if let Err(e) = r { + tracing::error!( + "insert error (frame= {}) : {:?}", + sync_ctx.durable_frame_num + 1, + e + ); + return Err(e); + } + sync_ctx.durable_frame_num += 1; + } } Ok(PullResult::EndOfGeneration { max_generation }) => { // If there are no more generations to pull, we're done. @@ -920,7 +984,7 @@ pub async fn try_pull( insert_handle.begin()?; } Err(e) => { - tracing::debug!("pull_one_frame error: {:?}", e); + tracing::debug!("pull_frames error: {:?}", e); err.replace(e); break; }