diff --git a/libsql/src/database.rs b/libsql/src/database.rs index 1362ec217a..bb07bb189d 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -84,7 +84,7 @@ enum DbType { path: String, flags: OpenFlags, encryption_config: Option, - skip_saftey_assert: bool, + skip_safety_assert: bool, }, #[cfg(feature = "replication")] Sync { @@ -166,7 +166,7 @@ cfg_core! { path: db_path.into(), flags, encryption_config: None, - skip_saftey_assert: false, + skip_safety_assert: false, }, max_write_replication_index: Default::default(), }) @@ -458,7 +458,7 @@ cfg_replication! { DbType::Sync { db, .. } => { let path = db.path().to_string(); Ok(Database { - db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false }, + db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_safety_assert: false }, max_write_replication_index: Default::default(), }) } @@ -580,11 +580,11 @@ impl Database { path, flags, encryption_config, - skip_saftey_assert, + skip_safety_assert, } => { use crate::local::impls::LibsqlConnection; - let db = if !skip_saftey_assert { + let db = if !skip_safety_assert { crate::local::Database::open(path, *flags)? } else { unsafe { crate::local::Database::open_raw(path, *flags)? } diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index 1969e1f511..ef70479430 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -175,7 +175,7 @@ cfg_core! { /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at /// your own risk. - pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + pub unsafe fn skip_safety_assert(mut self, skip: bool) -> Builder { self.inner.skip_safety_assert = skip; self } @@ -206,7 +206,7 @@ cfg_core! { path, flags: self.inner.flags, encryption_config: self.inner.encryption_config, - skip_saftey_assert: self.inner.skip_safety_assert + skip_safety_assert: self.inner.skip_safety_assert }, max_write_replication_index: Default::default(), } @@ -321,7 +321,7 @@ cfg_replication! { /// Using this setting is very UNSAFE and you are expected to use the libsql in adherence /// with the sqlite3 threadsafe rules or else you WILL create undefined behavior. Use at /// your own risk. - pub unsafe fn skip_saftey_assert(mut self, skip: bool) -> Builder { + pub unsafe fn skip_safety_assert(mut self, skip: bool) -> Builder { self.inner.skip_safety_assert = skip; self } diff --git a/libsql/src/replication/remote_client.rs b/libsql/src/replication/remote_client.rs index a401d7a1ac..d55f68ec4d 100644 --- a/libsql/src/replication/remote_client.rs +++ b/libsql/src/replication/remote_client.rs @@ -18,7 +18,7 @@ use tonic::metadata::AsciiMetadataValue; use tonic::{Response, Status}; use zerocopy::FromBytes; -async fn time(fut: impl Future) -> (O, Duration) { +pub(crate) async fn time(fut: impl Future) -> (O, Duration) { let before = Instant::now(); let out = fut.await; (out, before.elapsed()) diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index de3ab6a4b4..9a74b118b0 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0; const DEFAULT_MAX_RETRIES: usize = 5; const DEFAULT_PUSH_BATCH_SIZE: u32 = 128; +const DEFAULT_PULL_BATCH_SIZE: u32 = 128; #[derive(thiserror::Error, Debug)] #[non_exhaustive] @@ -66,6 +67,8 @@ pub enum SyncError { InvalidLocalGeneration(u32, u32), #[error("invalid local state: {0}")] InvalidLocalState(String), + #[error("server returned invalid length of frames: {0}")] + InvalidPullFrameBytes(usize), } impl SyncError { @@ -98,8 +101,8 @@ pub enum PushStatus { } pub enum PullResult { - /// A frame was successfully pulled. - Frame(Bytes), + /// Frames were successfully pulled. + Frames(Bytes), /// We've reached the end of the generation. EndOfGeneration { max_generation: u32 }, } @@ -122,6 +125,7 @@ pub struct SyncContext { auth_token: Option, max_retries: usize, push_batch_size: u32, + pull_batch_size: u32, /// The current durable generation. durable_generation: u32, /// Represents the max_frame_no from the server. @@ -154,6 +158,7 @@ impl SyncContext { auth_token, max_retries: DEFAULT_MAX_RETRIES, push_batch_size: DEFAULT_PUSH_BATCH_SIZE, + pull_batch_size: DEFAULT_PULL_BATCH_SIZE, client, durable_generation: 0, durable_frame_num: 0, @@ -175,7 +180,7 @@ impl SyncContext { } #[tracing::instrument(skip(self))] - pub(crate) async fn pull_one_frame( + pub(crate) async fn pull_frames( &mut self, generation: u32, frame_no: u32, @@ -185,9 +190,10 @@ impl SyncContext { self.sync_url, generation, frame_no, - frame_no + 1 + // the server expects the range of [start, end) frames, i.e. end is exclusive + frame_no + self.pull_batch_size ); - tracing::debug!("pulling frame"); + tracing::debug!("pulling frame (uri={})", uri); self.pull_with_retry(uri, self.max_retries).await } @@ -417,20 +423,39 @@ impl SyncContext { .map_err(SyncError::HttpDispatch)?; if res.status().is_success() { - let frame = hyper::body::to_bytes(res.into_body()) + let frames = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; - return Ok(PullResult::Frame(frame)); + // a success result should always return some frames + if frames.is_empty() { + tracing::error!("server returned empty frames in pull response"); + return Err(SyncError::InvalidPullFrameBytes(0).into()); + } + // the minimum payload size cannot be less than a single frame + if frames.len() < FRAME_SIZE { + tracing::error!( + "server returned frames with invalid length: {} < {}", + frames.len(), + FRAME_SIZE + ); + return Err(SyncError::InvalidPullFrameBytes(frames.len()).into()); + } + return Ok(PullResult::Frames(frames)); } // BUG ALERT: The server returns a 500 error if the remote database is empty. // This is a bug and should be fixed. if res.status() == StatusCode::BAD_REQUEST || res.status() == StatusCode::INTERNAL_SERVER_ERROR { + let status = res.status(); let res_body = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; - + tracing::trace!( + "server returned: {} body: {}", + status, + String::from_utf8_lossy(&res_body[..]) + ); let resp = serde_json::from_slice::(&res_body[..]) .map_err(SyncError::JsonDecode)?; @@ -650,22 +675,34 @@ impl SyncContext { let req = req.body(Body::empty()).expect("valid request"); - let res = self - .client - .request(req) - .await - .map_err(SyncError::HttpDispatch)?; + let (res, http_duration) = + crate::replication::remote_client::time(self.client.request(req)).await; + let res = res.map_err(SyncError::HttpDispatch)?; if !res.status().is_success() { let status = res.status(); let body = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; + tracing::error!( + "failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}", + status, + String::from_utf8_lossy(&body), + uri, + http_duration + ); return Err( SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(), ); } + tracing::debug!( + "pulled db file from remote server, status={}, url={}, duration={:?}", + res.status(), + uri, + http_duration + ); + // todo: do streaming write to the disk let bytes = hyper::body::to_bytes(res.into_body()) .await @@ -887,6 +924,11 @@ async fn try_push( }) } +/// PAGE_SIZE used by the sync / diskless server +const PAGE_SIZE: usize = 4096; +const FRAME_HEADER_SIZE: usize = 24; +const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE; + pub async fn try_pull( sync_ctx: &mut SyncContext, conn: &Connection, @@ -898,10 +940,32 @@ pub async fn try_pull( loop { let generation = sync_ctx.durable_generation(); let frame_no = sync_ctx.durable_frame_num() + 1; - match sync_ctx.pull_one_frame(generation, frame_no).await { - Ok(PullResult::Frame(frame)) => { - insert_handle.insert(&frame)?; - sync_ctx.durable_frame_num = frame_no; + match sync_ctx.pull_frames(generation, frame_no).await { + Ok(PullResult::Frames(frames)) => { + tracing::debug!( + "pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}", + generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(), + ); + if frames.len() % FRAME_SIZE != 0 { + tracing::error!( + "frame size {} is not a multiple of the expected size {}", + frames.len(), + FRAME_SIZE, + ); + return Err(SyncError::InvalidPullFrameBytes(frames.len()).into()); + } + for chunk in frames.chunks(FRAME_SIZE) { + let r = insert_handle.insert(&chunk); + if let Err(e) = r { + tracing::error!( + "insert error (frame= {}) : {:?}", + sync_ctx.durable_frame_num + 1, + e + ); + return Err(e); + } + sync_ctx.durable_frame_num += 1; + } } Ok(PullResult::EndOfGeneration { max_generation }) => { // If there are no more generations to pull, we're done. @@ -920,7 +984,7 @@ pub async fn try_pull( insert_handle.begin()?; } Err(e) => { - tracing::debug!("pull_one_frame error: {:?}", e); + tracing::debug!("pull_frames error: {:?}", e); err.replace(e); break; }