Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 12b889f

Browse files
committed
fix tests
1 parent 2ef424c commit 12b889f

12 files changed

+121
-519
lines changed

sqld/src/error.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,32 @@ impl From<bincode::Error> for Error {
147147
}
148148
}
149149

150+
macro_rules! internal_from {
151+
($to:ty => { $($from:ty,)* }) => {
152+
$(
153+
impl From<$from> for $to {
154+
fn from(v: $from) -> Self {
155+
<$to>::Internal(v.to_string())
156+
}
157+
}
158+
)*
159+
160+
};
161+
}
162+
163+
internal_from! {
164+
LoadDumpError => {
165+
std::io::Error,
166+
rusqlite::Error,
167+
hyper::Error,
168+
tokio::task::JoinError,
169+
}
170+
}
171+
150172
#[derive(Debug, thiserror::Error)]
151173
pub enum LoadDumpError {
152-
#[error("IO error: {0}")]
153-
Io(#[from] std::io::Error),
174+
#[error("internal error: {0}")]
175+
Internal(String),
154176
#[error("Cannot load a dump on a replica")]
155177
ReplicaLoadDump,
156178
#[error("cannot load from a dump if a database already exists")]
@@ -161,10 +183,12 @@ pub enum LoadDumpError {
161183
DumpFileDoesntExist,
162184
#[error("invalid dump url")]
163185
InvalidDumpUrl,
164-
#[error("error fetching dump: {0}")]
165-
Fetch(#[from] hyper::Error),
166186
#[error("unsupported dump url scheme `{0}`, supported schemes are: `http`, `file`")]
167187
UnsupportedUrlScheme(String),
188+
#[error("a dump should execute within a transaction.")]
189+
NoTxn,
190+
#[error("the dump should commit the transaction.")]
191+
NoCommit,
168192
}
169193

170194
impl ResponseError for LoadDumpError {}
@@ -174,12 +198,14 @@ impl IntoResponse for LoadDumpError {
174198
use LoadDumpError::*;
175199

176200
match &self {
177-
Io(_) | Fetch(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
201+
Internal(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
178202
ReplicaLoadDump
179203
| LoadDumpExistingDb
180204
| InvalidDumpUrl
181205
| DumpFileDoesntExist
182206
| UnsupportedUrlScheme(_)
207+
| NoTxn
208+
| NoCommit
183209
| DumpFilePathNotAbsolute => self.format_err(StatusCode::BAD_REQUEST),
184210
}
185211
}

sqld/src/namespace/mod.rs

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,20 @@ use std::fmt;
44
use std::path::{Path, PathBuf};
55
use std::sync::{Arc, Weak};
66

7-
use anyhow::{bail, Context as _, ensure};
7+
use anyhow::Context as _;
88
use async_lock::{RwLock, RwLockUpgradableReadGuard};
99
use bottomless::replicator::Options;
1010
use bytes::Bytes;
1111
use chrono::NaiveDateTime;
1212
use enclose::enclose;
1313
use futures_core::Stream;
1414
use hyper::Uri;
15+
use parking_lot::Mutex;
1516
use rusqlite::ErrorCode;
1617
use sqld_libsql_bindings::wal_hook::TRANSPARENT_METHODS;
1718
use tokio::io::AsyncBufReadExt;
1819
use tokio::sync::watch;
19-
use tokio::task::{block_in_place, JoinSet};
20+
use tokio::task::JoinSet;
2021
use tokio::time::Duration;
2122
use tokio_util::io::StreamReader;
2223
use tonic::transport::Channel;
@@ -76,7 +77,10 @@ impl NamespaceName {
7677
}
7778

7879
pub fn as_str(&self) -> &str {
79-
std::str::from_utf8(&self.0).unwrap()
80+
// Safety: the namespace is always valid UTF8
81+
unsafe {
82+
std::str::from_utf8_unchecked(&self.0)
83+
}
8084
}
8185

8286
pub fn from_bytes(bytes: Bytes) -> crate::Result<Self> {
@@ -476,6 +480,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
476480
)
477481
.await?;
478482

483+
479484
let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await;
480485
tracing::info!("loaded namespace: `{namespace}`");
481486
lock.insert(namespace, ns);
@@ -640,6 +645,25 @@ impl Namespace<PrimaryDatabase> {
640645
name: NamespaceName,
641646
restore_option: RestoreOption,
642647
allow_creation: bool,
648+
) -> crate::Result<Self> {
649+
// FIXME: make that truly atomic. explore the idea of using temp directories, and it's implications
650+
match Self::try_new_primary(config, name.clone(), restore_option, allow_creation).await {
651+
Ok(ns) => Ok(ns),
652+
Err(e) => {
653+
let path = config.base_path.join("dbs").join(name.as_str());
654+
if let Err(e) = tokio::fs::remove_dir_all(path).await {
655+
tracing::error!("failed to clean dirty namespace: {e}");
656+
}
657+
Err(e)
658+
}
659+
}
660+
}
661+
662+
async fn try_new_primary(
663+
config: &PrimaryNamespaceConfig,
664+
name: NamespaceName,
665+
restore_option: RestoreOption,
666+
allow_creation: bool,
643667
) -> crate::Result<Self> {
644668
// if namespaces are disabled, then we allow creation for the default namespace.
645669
let allow_creation =
@@ -842,22 +866,24 @@ async fn load_dump<S>(
842866
dump: S,
843867
mk_ctx: impl Fn() -> ReplicationLoggerHookCtx,
844868
auto_checkpoint: u32,
845-
) -> anyhow::Result<()>
869+
) -> crate::Result<(), LoadDumpError>
846870
where
847871
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
848872
{
849873
let mut retries = 0;
850874
// there is a small chance we fail to acquire the lock right away, so we perform a few retries
851875
let conn = loop {
852-
match block_in_place(|| {
876+
let ctx = mk_ctx();
877+
let db_path = db_path.to_path_buf();
878+
match tokio::task::spawn_blocking(move || {
853879
open_conn(
854-
db_path,
880+
&db_path,
855881
&REPLICATION_METHODS,
856-
mk_ctx(),
882+
ctx,
857883
None,
858884
auto_checkpoint,
859885
)
860-
}) {
886+
}).await? {
861887
Ok(conn) => {
862888
break conn;
863889
}
@@ -874,12 +900,12 @@ where
874900
retries += 1;
875901
tokio::time::sleep(Duration::from_millis(100)).await;
876902
}
877-
Err(e) => {
878-
bail!(e);
879-
}
903+
Err(e) => Err(e)?,
880904
}
881905
};
882906

907+
let conn = Arc::new(Mutex::new(conn));
908+
883909
let mut reader = tokio::io::BufReader::new(StreamReader::new(dump));
884910
let mut curr = String::new();
885911
let mut line = String::new();
@@ -911,20 +937,27 @@ where
911937

912938
if line.ends_with(';') {
913939
n_stmt += 1;
914-
if n_stmt > 2 {
915-
ensure!(!conn.is_autocommit(), "a dump should execute within a transaction.");
940+
// dump must be performd within a txn
941+
if n_stmt > 2 && conn.lock().is_autocommit() {
942+
return Err(LoadDumpError::NoTxn);
916943
}
917944

918-
block_in_place(|| conn.execute(&line, ()))?;
945+
line = tokio::task::spawn_blocking({
946+
let conn = conn.clone();
947+
move || -> crate::Result<String, LoadDumpError> {
948+
conn.lock().execute(&line, ())?;
949+
Ok(line)
950+
}
951+
}).await??;
919952
line.clear();
920953
} else {
921954
line.push(' ');
922955
}
923956
}
924957

925-
if !conn.is_autocommit() {
926-
let _ = conn.execute("rollback", ());
927-
bail!("the dump should commit the transaction.");
958+
if !conn.lock().is_autocommit() {
959+
let _ = conn.lock().execute("rollback", ());
960+
return Err(LoadDumpError::NoCommit);
928961
}
929962

930963
Ok(())

0 commit comments

Comments
 (0)