@@ -16,6 +16,7 @@ use ostree::{gio, glib};
16
16
use std:: collections:: HashMap ;
17
17
use std:: iter:: FromIterator ;
18
18
use std:: sync:: { Arc , Mutex } ;
19
+ use tokio:: sync:: mpsc:: { Receiver , Sender } ;
19
20
20
21
/// Configuration for the proxy.
21
22
///
@@ -54,6 +55,19 @@ fn ref_for_image(l: &ImageReference) -> Result<String> {
54
55
refescape:: prefix_escape_for_ref ( IMAGE_PREFIX , & l. to_string ( ) )
55
56
}
56
57
58
+ /// Sent across a channel to track start and end of a container fetch.
59
+ #[ derive( Debug ) ]
60
+ pub enum ImportProgress {
61
+ /// Started fetching this layer.
62
+ OstreeChunkStarted ( Descriptor ) ,
63
+ /// Successfully completed the fetch of this layer.
64
+ OstreeChunkCompleted ( Descriptor ) ,
65
+ /// Started fetching this layer.
66
+ DerivedLayerStarted ( Descriptor ) ,
67
+ /// Successfully completed the fetch of this layer.
68
+ DerivedLayerCompleted ( Descriptor ) ,
69
+ }
70
+
57
71
/// State of an already pulled layered image.
58
72
#[ derive( Debug , PartialEq , Eq ) ]
59
73
pub struct LayeredImageState {
@@ -95,6 +109,8 @@ pub struct ImageImporter {
95
109
imgref : OstreeImageReference ,
96
110
target_imgref : Option < OstreeImageReference > ,
97
111
pub ( crate ) proxy_img : OpenedImage ,
112
+
113
+ layer_progress : Option < Sender < ImportProgress > > ,
98
114
}
99
115
100
116
/// Result of invoking [`LayeredImageImporter::prepare`].
@@ -274,6 +290,7 @@ impl ImageImporter {
274
290
proxy_img,
275
291
target_imgref : None ,
276
292
imgref : imgref. clone ( ) ,
293
+ layer_progress : None ,
277
294
} )
278
295
}
279
296
@@ -286,6 +303,14 @@ impl ImageImporter {
286
303
self . prepare_internal ( false ) . await
287
304
}
288
305
306
+ /// Create a channel receiver that will get notifications for layer fetches.
307
+ pub fn request_progress ( & mut self ) -> Receiver < ImportProgress > {
308
+ assert ! ( self . layer_progress. is_none( ) ) ;
309
+ let ( s, r) = tokio:: sync:: mpsc:: channel ( 2 ) ;
310
+ self . layer_progress = Some ( s) ;
311
+ r
312
+ }
313
+
289
314
/// Determine if there is a new manifest, and if so return its digest.
290
315
#[ context( "Fetching manifest" ) ]
291
316
pub ( crate ) async fn prepare_internal ( & mut self , verify_layers : bool ) -> Result < PrepareResult > {
@@ -397,6 +422,10 @@ impl ImageImporter {
397
422
if layer. commit . is_some ( ) {
398
423
continue ;
399
424
}
425
+ if let Some ( p) = self . layer_progress . as_ref ( ) {
426
+ p. send ( ImportProgress :: OstreeChunkStarted ( layer. layer . clone ( ) ) )
427
+ . await ?;
428
+ }
400
429
let ( blob, driver) =
401
430
fetch_layer_decompress ( & mut self . proxy , & self . proxy_img , & layer. layer ) . await ?;
402
431
let blob = super :: unencapsulate:: ProgressReader {
@@ -425,8 +454,18 @@ impl ImageImporter {
425
454
} ) ;
426
455
let commit = super :: unencapsulate:: join_fetch ( import_task, driver) . await ?;
427
456
layer. commit = commit;
457
+ if let Some ( p) = self . layer_progress . as_ref ( ) {
458
+ p. send ( ImportProgress :: OstreeChunkCompleted ( layer. layer . clone ( ) ) )
459
+ . await ?;
460
+ }
428
461
}
429
462
if import. ostree_commit_layer . commit . is_none ( ) {
463
+ if let Some ( p) = self . layer_progress . as_ref ( ) {
464
+ p. send ( ImportProgress :: OstreeChunkStarted (
465
+ import. ostree_commit_layer . layer . clone ( ) ,
466
+ ) )
467
+ . await ?;
468
+ }
430
469
let ( blob, driver) = fetch_layer_decompress (
431
470
& mut self . proxy ,
432
471
& self . proxy_img ,
@@ -457,6 +496,12 @@ impl ImageImporter {
457
496
} ) ;
458
497
let commit = super :: unencapsulate:: join_fetch ( import_task, driver) . await ?;
459
498
import. ostree_commit_layer . commit = Some ( commit) ;
499
+ if let Some ( p) = self . layer_progress . as_ref ( ) {
500
+ p. send ( ImportProgress :: OstreeChunkCompleted (
501
+ import. ostree_commit_layer . layer . clone ( ) ,
502
+ ) )
503
+ . await ?;
504
+ }
460
505
} ;
461
506
Ok ( ( ) )
462
507
}
@@ -503,6 +548,10 @@ impl ImageImporter {
503
548
tracing:: debug!( "Reusing fetched commit {}" , c) ;
504
549
layer_commits. push ( c. to_string ( ) ) ;
505
550
} else {
551
+ if let Some ( p) = self . layer_progress . as_ref ( ) {
552
+ p. send ( ImportProgress :: DerivedLayerStarted ( layer. layer . clone ( ) ) )
553
+ . await ?;
554
+ }
506
555
let ( blob, driver) = super :: unencapsulate:: fetch_layer_decompress (
507
556
& mut proxy,
508
557
& self . proxy_img ,
@@ -525,6 +574,10 @@ impl ImageImporter {
525
574
let filtered = HashMap :: from_iter ( r. filtered . into_iter ( ) ) ;
526
575
layer_filtered_content. insert ( layer. digest ( ) . to_string ( ) , filtered) ;
527
576
}
577
+ if let Some ( p) = self . layer_progress . as_ref ( ) {
578
+ p. send ( ImportProgress :: DerivedLayerCompleted ( layer. layer . clone ( ) ) )
579
+ . await ?;
580
+ }
528
581
}
529
582
}
530
583
0 commit comments