@@ -105,7 +105,7 @@ def __init__(self, network: Network) -> None:
105
105
class WorkerPod :
106
106
def __init__ (
107
107
self ,
108
- id : int ,
108
+ worker_id : int ,
109
109
start_version : int ,
110
110
end_version : int ,
111
111
label : str ,
@@ -115,7 +115,7 @@ def __init__(
115
115
network : Network = Network .TESTNET ,
116
116
namespace : str = "default" ,
117
117
) -> None :
118
- self .id = id
118
+ self .worker_id = worker_id
119
119
self .client = client .CoreV1Api ()
120
120
self .name = f"{ label } -replay-verify-{ start_version } -{ end_version } "
121
121
self .start_version = start_version
@@ -167,6 +167,12 @@ def get_phase(self) -> str | None:
167
167
return self .status .status .phase
168
168
return None
169
169
170
+ def get_container_status (self ) -> list [client .V1ContainerStatus ] | None :
171
+ self .update_status ()
172
+ if self .status :
173
+ return self .status .status .container_statuses
174
+ return None
175
+
170
176
def has_txn_mismatch (self ) -> bool :
171
177
if self .status :
172
178
container_statuses = self .status .status .container_statuses
@@ -182,7 +188,7 @@ def get_target_db_dir(self) -> str:
182
188
return "/mnt/archive/db"
183
189
184
190
def get_claim_name (self ) -> str :
185
- idx = self .id % len (self .pvcs )
191
+ idx = self .worker_id % len (self .pvcs )
186
192
return self .pvcs [idx ]
187
193
188
194
def start (self ) -> None :
@@ -392,17 +398,40 @@ def create_pvc_from_snapshot(self):
392
398
assert len (pvcs ) == self .config .pvc_number , "failed to create all pvcs"
393
399
self .pvcs = pvcs
394
400
401
+ @retry (
402
+ stop = stop_after_attempt (MAX_RETRIES ),
403
+ wait = wait_fixed (RETRY_DELAY ),
404
+ retry = retry_if_exception_type (ApiException ),
405
+ before_sleep = lambda retry_state : logger .warning (
406
+ f"Retry { retry_state .attempt_number } /{ MAX_RETRIES } failed: { retry_state .outcome .exception ()} "
407
+ ),
408
+ )
409
+ def get_pvc_bound_status (self ) -> list [bool ]:
410
+ statuses = []
411
+ for pvc in self .pvcs :
412
+ pvc_status = self .client .read_namespaced_persistent_volume_claim_status (
413
+ name = pvc , namespace = self .namespace
414
+ )
415
+ if pvc_status .status .phase == "Bound" :
416
+ statuses .append (True )
417
+ else :
418
+ statuses .append (False )
419
+ return statuses
420
+
395
421
def schedule (self , from_scratch : bool = False ) -> None :
396
422
if from_scratch :
397
423
self .kill_all_pods ()
398
424
self .create_tasks ()
399
425
400
426
while len (self .tasks ) > 0 :
427
+ pvc_bound_status = self .get_pvc_bound_status ()
401
428
for i in range (len (self .current_workers )):
402
429
if (
403
430
self .current_workers [i ] is None
404
431
or self .current_workers [i ].is_completed ()
405
- ):
432
+ ) and (
433
+ pvc_bound_status [i % len (self .pvcs )] or i < len (self .pvcs )
434
+ ): # we only create a new pod to intialize the pvc before the PVC is bound
406
435
if (
407
436
self .current_workers [i ] is not None
408
437
and self .current_workers [i ].is_completed ()
@@ -428,12 +457,21 @@ def schedule(self, from_scratch: bool = False) -> None:
428
457
self .task_stats [worker_pod .name ] = TaskStats (worker_pod .name )
429
458
430
459
if self .current_workers [i ] is not None :
460
+ phase = self .current_workers [i ].get_phase ()
431
461
logger .info (
432
- f"Checking worker { i } : { self .current_workers [i ].name } : { self . current_workers [ i ]. get_phase () } "
462
+ f"Checking worker { i } : { self .current_workers [i ].name } : { phase } "
433
463
)
434
464
time .sleep (QUERY_DELAY )
435
465
logger .info ("All tasks have been scheduled" )
436
466
467
+ def reschedule_pod (self , worker_pod : WorkerPod , worker_idx : int ):
468
+ # clean up the existing pod
469
+ worker_pod .delete_pod ()
470
+ # re-enter the task to the queue
471
+ self .tasks .append ((worker_pod .start_version , worker_pod .end_version ))
472
+ self .task_stats [worker_pod .name ].increment_retry_count ()
473
+ self .current_workers [worker_idx ] = None
474
+
437
475
def process_completed_pod (self , worker_pod , worker_idx ):
438
476
if worker_pod .has_txn_mismatch ():
439
477
logger .info (f"Worker { worker_pod .name } failed with txn mismatch" )
@@ -444,16 +482,11 @@ def process_completed_pod(self, worker_pod, worker_idx):
444
482
logger .info (
445
483
f"Worker { worker_pod .name } failed with { worker_pod .get_failure_reason ()} . Rescheduling"
446
484
)
447
- # clean up the existing pod
448
- worker_pod .delete_pod ()
449
- # re-enter the task to the queue
450
- self .tasks .append ((worker_pod .start_version , worker_pod .end_version ))
451
- self .task_stats [worker_pod .name ].increment_retry_count ()
485
+ self .reschedule_pod (worker_pod , worker_idx )
452
486
else :
453
487
self .failed_workpod_logs .append (worker_pod .get_humio_log_link ())
454
-
488
+ self . current_workers [ worker_idx ] = None
455
489
self .task_stats [worker_pod .name ].set_end_time ()
456
- self .current_workers [worker_idx ] = None
457
490
458
491
def cleanup (self ):
459
492
self .kill_all_pods ()
0 commit comments