@@ -4,19 +4,20 @@ use std::fmt;
4
4
use std:: path:: { Path , PathBuf } ;
5
5
use std:: sync:: { Arc , Weak } ;
6
6
7
- use anyhow:: { bail , Context as _, ensure } ;
7
+ use anyhow:: Context as _;
8
8
use async_lock:: { RwLock , RwLockUpgradableReadGuard } ;
9
9
use bottomless:: replicator:: Options ;
10
10
use bytes:: Bytes ;
11
11
use chrono:: NaiveDateTime ;
12
12
use enclose:: enclose;
13
13
use futures_core:: Stream ;
14
14
use hyper:: Uri ;
15
+ use parking_lot:: Mutex ;
15
16
use rusqlite:: ErrorCode ;
16
17
use sqld_libsql_bindings:: wal_hook:: TRANSPARENT_METHODS ;
17
18
use tokio:: io:: AsyncBufReadExt ;
18
19
use tokio:: sync:: watch;
19
- use tokio:: task:: { block_in_place , JoinSet } ;
20
+ use tokio:: task:: JoinSet ;
20
21
use tokio:: time:: Duration ;
21
22
use tokio_util:: io:: StreamReader ;
22
23
use tonic:: transport:: Channel ;
@@ -76,7 +77,10 @@ impl NamespaceName {
76
77
}
77
78
78
79
pub fn as_str ( & self ) -> & str {
79
- std:: str:: from_utf8 ( & self . 0 ) . unwrap ( )
80
+ // Safety: the namespace is always valid UTF8
81
+ unsafe {
82
+ std:: str:: from_utf8_unchecked ( & self . 0 )
83
+ }
80
84
}
81
85
82
86
pub fn from_bytes ( bytes : Bytes ) -> crate :: Result < Self > {
@@ -476,6 +480,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
476
480
)
477
481
. await ?;
478
482
483
+
479
484
let mut lock = RwLockUpgradableReadGuard :: upgrade ( lock) . await ;
480
485
tracing:: info!( "loaded namespace: `{namespace}`" ) ;
481
486
lock. insert ( namespace, ns) ;
@@ -640,6 +645,25 @@ impl Namespace<PrimaryDatabase> {
640
645
name : NamespaceName ,
641
646
restore_option : RestoreOption ,
642
647
allow_creation : bool ,
648
+ ) -> crate :: Result < Self > {
649
+ // FIXME: make that truly atomic. explore the idea of using temp directories, and it's implications
650
+ match Self :: try_new_primary ( config, name. clone ( ) , restore_option, allow_creation) . await {
651
+ Ok ( ns) => Ok ( ns) ,
652
+ Err ( e) => {
653
+ let path = config. base_path . join ( "dbs" ) . join ( name. as_str ( ) ) ;
654
+ if let Err ( e) = tokio:: fs:: remove_dir_all ( path) . await {
655
+ tracing:: error!( "failed to clean dirty namespace: {e}" ) ;
656
+ }
657
+ Err ( e)
658
+ }
659
+ }
660
+ }
661
+
662
+ async fn try_new_primary (
663
+ config : & PrimaryNamespaceConfig ,
664
+ name : NamespaceName ,
665
+ restore_option : RestoreOption ,
666
+ allow_creation : bool ,
643
667
) -> crate :: Result < Self > {
644
668
// if namespaces are disabled, then we allow creation for the default namespace.
645
669
let allow_creation =
@@ -842,22 +866,24 @@ async fn load_dump<S>(
842
866
dump : S ,
843
867
mk_ctx : impl Fn ( ) -> ReplicationLoggerHookCtx ,
844
868
auto_checkpoint : u32 ,
845
- ) -> anyhow :: Result < ( ) >
869
+ ) -> crate :: Result < ( ) , LoadDumpError >
846
870
where
847
871
S : Stream < Item = std:: io:: Result < Bytes > > + Unpin ,
848
872
{
849
873
let mut retries = 0 ;
850
874
// there is a small chance we fail to acquire the lock right away, so we perform a few retries
851
875
let conn = loop {
852
- match block_in_place ( || {
876
+ let ctx = mk_ctx ( ) ;
877
+ let db_path = db_path. to_path_buf ( ) ;
878
+ match tokio:: task:: spawn_blocking ( move || {
853
879
open_conn (
854
- db_path,
880
+ & db_path,
855
881
& REPLICATION_METHODS ,
856
- mk_ctx ( ) ,
882
+ ctx ,
857
883
None ,
858
884
auto_checkpoint,
859
885
)
860
- } ) {
886
+ } ) . await ? {
861
887
Ok ( conn) => {
862
888
break conn;
863
889
}
@@ -874,12 +900,12 @@ where
874
900
retries += 1 ;
875
901
tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
876
902
}
877
- Err ( e) => {
878
- bail ! ( e) ;
879
- }
903
+ Err ( e) => Err ( e) ?,
880
904
}
881
905
} ;
882
906
907
+ let conn = Arc :: new ( Mutex :: new ( conn) ) ;
908
+
883
909
let mut reader = tokio:: io:: BufReader :: new ( StreamReader :: new ( dump) ) ;
884
910
let mut curr = String :: new ( ) ;
885
911
let mut line = String :: new ( ) ;
@@ -911,20 +937,27 @@ where
911
937
912
938
if line. ends_with ( ';' ) {
913
939
n_stmt += 1 ;
914
- if n_stmt > 2 {
915
- ensure ! ( !conn. is_autocommit( ) , "a dump should execute within a transaction." ) ;
940
+ // dump must be performd within a txn
941
+ if n_stmt > 2 && conn. lock ( ) . is_autocommit ( ) {
942
+ return Err ( LoadDumpError :: NoTxn ) ;
916
943
}
917
944
918
- block_in_place ( || conn. execute ( & line, ( ) ) ) ?;
945
+ line = tokio:: task:: spawn_blocking ( {
946
+ let conn = conn. clone ( ) ;
947
+ move || -> crate :: Result < String , LoadDumpError > {
948
+ conn. lock ( ) . execute ( & line, ( ) ) ?;
949
+ Ok ( line)
950
+ }
951
+ } ) . await ??;
919
952
line. clear ( ) ;
920
953
} else {
921
954
line. push ( ' ' ) ;
922
955
}
923
956
}
924
957
925
- if !conn. is_autocommit ( ) {
926
- let _ = conn. execute ( "rollback" , ( ) ) ;
927
- bail ! ( "the dump should commit the transaction." ) ;
958
+ if !conn. lock ( ) . is_autocommit ( ) {
959
+ let _ = conn. lock ( ) . execute ( "rollback" , ( ) ) ;
960
+ return Err ( LoadDumpError :: NoCommit ) ;
928
961
}
929
962
930
963
Ok ( ( ) )
0 commit comments