 //! - => S3 as the source for the PGDATA instead of local filesystem
 //!
 //! TODOs before productionization:
-//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
-//!   - => produced image layers likely too small.
 //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
-//! - asserts / unwraps need to be replaced with errors
-//! - don't trust remote objects will be small (=prevent OOMs in those cases)
-//!   - limit all in-memory buffers in size, or download to disk and read from there
-//! - limit task concurrency
-//! - generally play nice with other tenants in the system
-//!   - importbucket is different bucket than main pageserver storage, so, should be fine wrt S3 rate limits
-//!   - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
-//! - integrate with layer eviction system
-//! - audit for Tenant::cancel nor Timeline::cancel responsivity
-//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
 //!
 //! An incomplete set of TODOs from the Hackathon:
 //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
@@ -44,7 +32,7 @@ use pageserver_api::key::{
     rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
     slru_segment_size_to_key,
 };
-use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
+use pageserver_api::keyspace::{ShardedRange, singleton_range};
 use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
 use pageserver_api::reltag::{RelTag, SlruKind};
 use pageserver_api::shard::ShardIdentity;
@@ -167,6 +155,7 @@ impl Planner {
     /// This function is and must remain pure: given the same input, it will generate the same import plan.
     async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
         let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
+        anyhow::ensure!(pgdata_lsn.is_valid());

         let datadir = PgDataDir::new(&self.storage).await?;

@@ -249,14 +238,22 @@ impl Planner {
         });

         // Assigns parts of key space to later parallel jobs
+        // Note: the image layers produced here may have gaps, i.e. there is not
+        // an image for every key in the layer's key range. The read path stops
+        // traversal at the first image layer it encounters, regardless of whether
+        // a base image has been found for a key or not (there is no concept of
+        // sparse image layers). This behavior is exactly right for the base image
+        // layers we are producing here, but since no other place in the code
+        // currently produces image layers with gaps, it seems noteworthy.
         let mut last_end_key = Key::MIN;
         let mut current_chunk = Vec::new();
         let mut current_chunk_size: usize = 0;
         let mut jobs = Vec::new();
         for task in std::mem::take(&mut self.tasks).into_iter() {
-            if current_chunk_size + task.total_size()
-                > import_config.import_job_soft_size_limit.into()
-            {
+            let task_size = task.total_size(&self.shard);
+            let projected_chunk_size = current_chunk_size.saturating_add(task_size);
+            if projected_chunk_size > import_config.import_job_soft_size_limit.into() {
                 let key_range = last_end_key..task.key_range().start;
                 jobs.push(ChunkProcessingJob::new(
                     key_range.clone(),
@@ -266,7 +263,7 @@ impl Planner {
                 last_end_key = key_range.end;
                 current_chunk_size = 0;
             }
-            current_chunk_size += task.total_size();
+            current_chunk_size = current_chunk_size.saturating_add(task_size);
             current_chunk.push(task);
         }
         jobs.push(ChunkProcessingJob::new(
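
The hunk above implements a greedy packing policy: tasks are appended to the current chunk until the projected size would exceed the soft limit, at which point a job is cut at the task boundary. Below is a minimal, self-contained sketch of that policy; the `Task` and `Job` types are hypothetical stand-ins for `ImportTask` and `ChunkProcessingJob`, and the key-range bookkeeping is omitted.

struct Task {
    size: usize, // what ImportTask::total_size(&shard) would report
}

struct Job {
    tasks: Vec<Task>,
}

fn chunk_tasks(tasks: Vec<Task>, soft_limit: usize) -> Vec<Job> {
    let mut jobs = Vec::new();
    let mut current: Vec<Task> = Vec::new();
    let mut current_size: usize = 0;
    for task in tasks {
        // saturating_add means a task reporting usize::MAX (non-contiguous key
        // range) always exceeds the limit and forces a chunk boundary.
        if current_size.saturating_add(task.size) > soft_limit && !current.is_empty() {
            jobs.push(Job {
                tasks: std::mem::take(&mut current),
            });
            current_size = 0;
        }
        current_size = current_size.saturating_add(task.size);
        current.push(task);
    }
    // The trailing, partially filled chunk becomes the final job.
    if !current.is_empty() {
        jobs.push(Job { tasks: current });
    }
    jobs
}

Because the limit is soft, a single oversized task is not split; it simply ends up in a job of its own.
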
@@ -604,18 +601,18 @@ impl PgDataDirDb {
                };

                let path = datadir_path.join(rel_tag.to_segfile_name(segno));
-                assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
+                anyhow::ensure!(filesize % BLCKSZ as usize == 0);
                 let nblocks = filesize / BLCKSZ as usize;

-                PgDataDirDbFile {
+                Ok(PgDataDirDbFile {
                     path,
                     filesize,
                     rel_tag,
                     segno,
                     nblocks: Some(nblocks), // first non-cummulative sizes
-                }
+                })
             })
-            .collect();
+            .collect::<anyhow::Result<_, _>>()?;

         // Set cummulative sizes. Do all of that math here, so that later we could easier
         // parallelize over segments and know with which segments we need to write relsize
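
Turning the assert into `anyhow::ensure!` makes the `map` closure fallible, so the iterator now yields `Result`s and the trailing `collect::<anyhow::Result<_, _>>()?` propagates the first failure. A sketch of that pattern under assumed names (`Segment`, `build_segments`, and a local `BLCKSZ` of 8192 are illustrative, not the pageserver types):

use anyhow::ensure;

const BLCKSZ: usize = 8192; // assumed Postgres block size

struct Segment {
    filesize: usize,
    nblocks: usize,
}

fn build_segments(filesizes: &[usize]) -> anyhow::Result<Vec<Segment>> {
    filesizes
        .iter()
        .map(|&filesize| {
            // A short or torn segment file now surfaces as an error instead of a panic.
            ensure!(
                filesize % BLCKSZ == 0,
                "segment size {} is not a multiple of {}",
                filesize,
                BLCKSZ
            );
            Ok(Segment {
                filesize,
                nblocks: filesize / BLCKSZ,
            })
        })
        // Collecting an iterator of Result<T, E> into Result<Vec<T>, E> stops at
        // the first error, which is what the `?` after collect relies on.
        .collect()
}
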
@@ -650,12 +647,22 @@ impl PgDataDirDb {
 trait ImportTask {
     fn key_range(&self) -> Range<Key>;

-    fn total_size(&self) -> usize {
-        // TODO: revisit this
-        if is_contiguous_range(&self.key_range()) {
-            contiguous_range_len(&self.key_range()) as usize * 8192
+    fn total_size(&self, shard_identity: &ShardIdentity) -> usize {
+        let range = ShardedRange::new(self.key_range(), shard_identity);
+        let page_count = range.page_count();
+        if page_count == u32::MAX {
+            tracing::warn!(
+                "Import task has non-contiguous key range: {}..{}",
+                self.key_range().start,
+                self.key_range().end
+            );
+
+            // Tasks should operate on contiguous ranges. It is unexpected for
+            // ranges to violate this assumption. Calling code handles this by
+            // mapping any task on a non-contiguous range to its own image layer.
+            usize::MAX
         } else {
-            u32::MAX as usize
+            page_count as usize * 8192
         }
     }

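
`ShardedRange::page_count()` is what lets `total_size` account for sharding, and the code above treats its `u32::MAX` return value as the "range is not contiguous" sentinel. A small sketch of just that sentinel-to-size mapping; `estimated_task_size` and `PAGE_SIZE` are illustrative names, not pageserver API:

const PAGE_SIZE: usize = 8192;

// `page_count` is whatever ShardedRange::page_count() returned for this task's
// key range on this shard; u32::MAX is its sentinel for a non-contiguous range.
fn estimated_task_size(page_count: u32) -> usize {
    if page_count == u32::MAX {
        // Report usize::MAX so the greedy chunker, which sums sizes with
        // saturating_add, immediately closes the current chunk and gives this
        // task an image layer of its own.
        usize::MAX
    } else {
        page_count as usize * PAGE_SIZE
    }
}
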
@@ -753,6 +760,8 @@ impl ImportTask for ImportRelBlocksTask {
         layer_writer: &mut ImageLayerWriter,
         ctx: &RequestContext,
     ) -> anyhow::Result<usize> {
+        const MAX_BYTE_RANGE_SIZE: usize = 128 * 1024 * 1024;
+
         debug!("Importing relation file");

         let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
@@ -777,7 +786,7 @@ impl ImportTask for ImportRelBlocksTask {
                 assert_eq!(key.len(), 1);
                 assert!(!acc.is_empty());
                 assert!(acc_end > acc_start);
-                if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
+                if acc_end == start && end - acc_start <= MAX_BYTE_RANGE_SIZE {
                     acc.push(key.pop().unwrap());
                     Ok((acc, acc_start, end))
                 } else {
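
The fold above coalesces keys whose backing byte ranges are adjacent, so a single `get_range` call can serve many blocks, and the new `MAX_BYTE_RANGE_SIZE` condition caps each coalesced read at 128 MiB to bound per-task memory. A standalone sketch of the same coalescing rule over plain `(start, end)` byte offsets; the `coalesce_ranges` helper is illustrative, not the real fold over keys:

fn coalesce_ranges(ranges: &[(u64, u64)], max_range_size: u64) -> Vec<(u64, u64)> {
    let mut out: Vec<(u64, u64)> = Vec::new();
    for &(start, end) in ranges {
        if let Some(last) = out.last_mut() {
            // Extend the previous range only if this one starts exactly where it
            // ended and the combined read stays under the cap.
            if last.1 == start && end - last.0 <= max_range_size {
                last.1 = end;
                continue;
            }
        }
        out.push((start, end));
    }
    out
}

For example, `[(0, 8192), (8192, 16384)]` collapses to `[(0, 16384)]`, while a gap between ranges or an over-limit read starts a new entry.
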
@@ -792,8 +801,8 @@ impl ImportTask for ImportRelBlocksTask {
                 .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
                 .await?;
             let mut buf = Bytes::from(range_buf);
-            // TODO: batched writes
             for key in keys {
+                // The writer buffers writes internally
                 let image = buf.split_to(8192);
                 layer_writer.put_image(key, image, ctx).await?;
                 nimages += 1;
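
The loop above slices the downloaded range into 8 KiB page images and hands each one to the image layer writer, which buffers the actual writes internally. A sketch of just the slicing step, using the `bytes` crate; `split_into_pages` and `PAGE_SIZE` are illustrative names:

use bytes::Bytes;

const PAGE_SIZE: usize = 8192;

// `Bytes::split_to` is a cheap O(1) split of the shared buffer, so no page data
// is copied while carving the downloaded range into images.
fn split_into_pages(range_buf: Vec<u8>) -> Vec<Bytes> {
    let mut buf = Bytes::from(range_buf);
    let mut pages = Vec::with_capacity(buf.len() / PAGE_SIZE);
    while buf.len() >= PAGE_SIZE {
        pages.push(buf.split_to(PAGE_SIZE));
    }
    pages
}
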
@@ -846,6 +855,9 @@ impl ImportTask for ImportSlruBlocksTask {
         debug!("Importing SLRU segment file {}", self.path);
         let buf = self.storage.get(&self.path).await?;

+        // TODO(vlad): Does timestamp-to-LSN lookup work for imported timelines?
+        // Probably not, since we don't append the `xact_time` to it as in
+        // [`WalIngest::ingest_xact_record`].
         let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
         let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
         let mut blknum = start_blk;