@@ -60,7 +60,6 @@ pub struct Synchronization<T> {
60
60
pool : XSQLPool ,
61
61
rocksdb : PrefixKVStore ,
62
62
adapter : Arc < T > ,
63
- task_count : Arc < ( ) > ,
64
63
65
64
sync_task_size : usize ,
66
65
max_task_number : usize ,
@@ -75,13 +74,11 @@ impl<T: SyncAdapter> Synchronization<T> {
75
74
max_task_number : usize ,
76
75
) -> Self {
77
76
let rocksdb = PrefixKVStore :: new ( rocksdb_path) ;
78
- let task_count = Arc :: new ( ( ) ) ;
79
77
80
78
Synchronization {
81
79
pool,
82
80
rocksdb,
83
81
adapter,
84
- task_count,
85
82
sync_task_size,
86
83
max_task_number,
87
84
}
@@ -106,7 +103,7 @@ impl<T: SyncAdapter> Synchronization<T> {
106
103
self . pool . fetch_count_by_wrapper :: < BlockTable > ( & w) . await ?
107
104
} ;
108
105
109
- debug_assert ! ( current_count == ( chain_tip + 1 ) ) ;
106
+ log :: info! ( "[sync] current block count {}" , current_count ) ;
110
107
111
108
let mut num = 1 ;
112
109
while let Some ( set) = self . check_synchronization ( ) . await ? {
@@ -147,24 +144,23 @@ impl<T: SyncAdapter> Synchronization<T> {
147
144
148
145
for set in sync_list. chunks ( self . sync_task_size ) {
149
146
let sync_set = set. to_vec ( ) ;
150
- let ( rdb, kvdb, adapter, arc_clone ) = (
147
+ let ( rdb, kvdb, adapter) = (
151
148
self . pool . clone ( ) ,
152
149
self . rocksdb . clone ( ) ,
153
150
Arc :: clone ( & self . adapter ) ,
154
- Arc :: clone ( & self . task_count ) ,
155
151
) ;
156
152
157
153
loop {
158
154
let task_num = current_task_count ( ) ;
159
155
if task_num < self . max_task_number {
160
156
add_one_task ( ) ;
161
157
tokio:: spawn ( async move {
162
- sync_process ( sync_set, rdb, kvdb, adapter, arc_clone ) . await ;
158
+ sync_process ( sync_set, rdb, kvdb, adapter) . await ;
163
159
} ) ;
164
160
165
161
break ;
166
162
} else {
167
- sleep ( Duration :: from_secs ( 2 ) ) . await ;
163
+ sleep ( Duration :: from_secs ( 5 ) ) . await ;
168
164
}
169
165
}
170
166
}
@@ -260,12 +256,15 @@ impl<T: SyncAdapter> Synchronization<T> {
260
256
}
261
257
262
258
async fn wait_insertion_complete ( & self ) {
263
- while Arc :: strong_count ( & self . task_count ) != 1 {
264
- log:: info!(
265
- "current thread number {}" ,
266
- Arc :: strong_count( & self . task_count)
267
- ) ;
259
+ loop {
268
260
sleep ( Duration :: from_secs ( 5 ) ) . await ;
261
+
262
+ let task_num = current_task_count ( ) ;
263
+ if task_num == 0 {
264
+ return ;
265
+ }
266
+
267
+ log:: info!( "current thread number {}" , current_task_count( ) ) ;
269
268
}
270
269
}
271
270
}
@@ -275,7 +274,6 @@ async fn sync_process<T: SyncAdapter>(
275
274
rdb : XSQLPool ,
276
275
kvdb : PrefixKVStore ,
277
276
adapter : Arc < T > ,
278
- _: Arc < ( ) > ,
279
277
) {
280
278
for subtask in task. chunks ( PULL_BLOCK_BATCH_SIZE ) {
281
279
let ( rdb_clone, kvdb_clone, adapter_clone) =
0 commit comments