Skip to content

Commit

Permalink
[V1][Metrics] Handle preemptions (vllm-project#13169)
Browse files Browse the repository at this point in the history
  • Loading branch information
markmc authored Feb 27, 2025
1 parent 378b3ef commit cd711c4
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 19 deletions.
1 change: 1 addition & 0 deletions tests/entrypoints/openai/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
"vllm:gpu_cache_usage_perc",
"vllm:gpu_prefix_cache_queries",
"vllm:gpu_prefix_cache_hits",
"vllm:num_preemptions_total",
"vllm:prompt_tokens_total",
"vllm:generation_tokens_total",
"vllm:iteration_tokens_total",
Expand Down
10 changes: 9 additions & 1 deletion vllm/v1/core/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ def schedule(self) -> "SchedulerOutput":
self.kv_cache_manager.free(preempted_req)
preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0
self.request_preempted(preempted_req, scheduled_timestamp)

self.waiting.appendleft(preempted_req)
preempted_reqs.append(preempted_req)
Expand Down Expand Up @@ -281,9 +282,9 @@ def schedule(self) -> "SchedulerOutput":
self.waiting.popleft()
self.running.append(request)
self.scheduled_req_ids.add(request.request_id)
self.request_scheduled(request, scheduled_timestamp)
if request.status == RequestStatus.WAITING:
scheduled_new_reqs.append(request)
self.request_scheduled(request, scheduled_timestamp)
elif request.status == RequestStatus.PREEMPTED:
scheduled_resumed_reqs.append(request)
else:
Expand Down Expand Up @@ -675,6 +676,13 @@ def request_scheduled(self, request: Request, timestamp: float):
EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED,
timestamp))

def request_preempted(self, request: Request, timestamp: float):
if not self.log_stats:
return
request.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.PREEMPTED,
timestamp))

def make_stats(self) -> Optional[SchedulerStats]:
if not self.log_stats:
return None
Expand Down
1 change: 1 addition & 0 deletions vllm/v1/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class EngineCoreEventType(enum.IntEnum):
"""The type of engine core request event."""
QUEUED = 1
SCHEDULED = 2
PREEMPTED = 3


class EngineCoreEvent(msgspec.Struct):
Expand Down
24 changes: 15 additions & 9 deletions vllm/v1/metrics/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ def __init__(self, vllm_config: VllmConfig):
"GPU prefix cache hits, in terms of number of cached blocks.",
labelnames=labelnames).labels(*labelvalues)

self.counter_num_preempted_reqs = prometheus_client.Counter(
name="vllm:num_preemptions_total",
documentation="Cumulative number of preemption from the engine.",
labelnames=labelnames).labels(*labelvalues)

self.counter_prompt_tokens = prometheus_client.Counter(
name="vllm:prompt_tokens_total",
documentation="Number of prefill tokens processed.",
Expand Down Expand Up @@ -282,17 +287,27 @@ def log(self, scheduler_stats: SchedulerStats,
self.counter_gpu_prefix_cache_hits.inc(
scheduler_stats.prefix_cache_stats.hits)

self.counter_num_preempted_reqs.inc(iteration_stats.num_preempted_reqs)
self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
self.counter_generation_tokens.inc(
iteration_stats.num_generation_tokens)
self.histogram_iteration_tokens.observe(
iteration_stats.num_prompt_tokens + \
iteration_stats.num_generation_tokens)

for ttft in iteration_stats.time_to_first_tokens_iter:
self.histogram_time_to_first_token.observe(ttft)
for tpot in iteration_stats.time_per_output_tokens_iter:
self.histogram_time_per_output_token.observe(tpot)

for finished_request in iteration_stats.finished_requests:
self.counter_request_success[finished_request.finish_reason].inc()
self.histogram_e2e_time_request.observe(
finished_request.e2e_latency)
self.histogram_queue_time_request.observe(
finished_request.queued_time)
self.histogram_prefill_time_request.observe(
finished_request.prefill_time)
self.histogram_inference_time_request.observe(
finished_request.inference_time)
self.histogram_decode_time_request.observe(
Expand All @@ -302,15 +317,6 @@ def log(self, scheduler_stats: SchedulerStats,
self.histogram_num_generation_tokens_request.observe(
finished_request.num_generation_tokens)

for ttft in iteration_stats.time_to_first_tokens_iter:
self.histogram_time_to_first_token.observe(ttft)
for tpot in iteration_stats.time_per_output_tokens_iter:
self.histogram_time_per_output_token.observe(tpot)
for queue_time in iteration_stats.queue_times_iter:
self.histogram_queue_time_request.observe(queue_time)
for prefill_time in iteration_stats.prefill_times_iter:
self.histogram_prefill_time_request.observe(prefill_time)

if self.gauge_lora_info is not None:
running_lora_adapters = \
",".join(iteration_stats.running_lora_adapters.keys())
Expand Down
31 changes: 22 additions & 9 deletions vllm/v1/metrics/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class FinishedRequestStats:
e2e_latency: float = 0.0
num_prompt_tokens: int = 0
num_generation_tokens: int = 0
queued_time: float = 0.0
prefill_time: float = 0.0
inference_time: float = 0.0
decode_time: float = 0.0

Expand All @@ -78,11 +80,10 @@ def __init__(self):
self.iteration_timestamp = time.time()
self.num_generation_tokens = 0
self.num_prompt_tokens = 0
self.num_preempted_reqs = 0
self.finished_requests: List[FinishedRequestStats] = []
self.time_to_first_tokens_iter: List[float] = []
self.time_per_output_tokens_iter: List[float] = []
self.queue_times_iter: List[float] = []
self.prefill_times_iter: List[float] = []
self.waiting_lora_adapters: Dict[str, int] = {}
self.running_lora_adapters: Dict[str, int] = {}

Expand Down Expand Up @@ -122,9 +123,6 @@ def update_from_output(self, output: "EngineCoreOutput",
if is_prefilling:
# TODO: re-enable no-output-for-partial-prefills invariant as above
if num_new_generation_tokens > 0:
prefill_interval = \
engine_core_timestamp - req_stats.scheduled_ts
self.prefill_times_iter.append(prefill_interval)
req_stats.first_token_ts = engine_core_timestamp
else:
tpot = engine_core_timestamp - req_stats.last_token_ts
Expand All @@ -145,24 +143,39 @@ def update_from_events(self, req_id: str, events: List["EngineCoreEvent"],
if lora_stats is not None:
lora_stats.waiting_requests.add(req_id)
elif event.type == EngineCoreEventType.SCHEDULED:
queued_interval = event.timestamp - req_stats.queued_ts
self.queue_times_iter.append(queued_interval)
req_stats.scheduled_ts = event.timestamp
if req_stats.scheduled_ts == 0.0: # ignore preemptions
req_stats.scheduled_ts = event.timestamp
LoRARequestStates.scheduled_request(lora_stats, req_id)
elif event.type == EngineCoreEventType.PREEMPTED:
self.num_preempted_reqs += 1

def update_from_finished_request(self, finish_reason: "FinishReason",
request_output: "RequestOutput",
req_stats: RequestStateStats):
e2e_latency = self._time_since(req_stats.arrival_time)

inference_time = req_stats.last_token_ts - req_stats.scheduled_ts
# Queued interval is from first QUEUED event to first SCHEDULED
queued_time = req_stats.scheduled_ts - req_stats.queued_ts

# Prefill interval is from first SCHEDULED to first NEW_TOKEN
# Any preemptions during prefill is included in the interval
prefill_time = req_stats.first_token_ts - req_stats.scheduled_ts

# Decode interval is from first NEW_TOKEN to last NEW_TOKEN
# Any preemptions during decode are included
decode_time = req_stats.last_token_ts - req_stats.first_token_ts

# Inference interval is from first SCHEDULED to last NEW_TOKEN
# Any preemptions during prefill or decode are included
inference_time = req_stats.last_token_ts - req_stats.scheduled_ts

finished_req = \
FinishedRequestStats(finish_reason=finish_reason,
e2e_latency=e2e_latency,
num_prompt_tokens=len(request_output.prompt_token_ids),
num_generation_tokens=req_stats.num_generation_tokens,
queued_time=queued_time,
prefill_time=prefill_time,
inference_time=inference_time,
decode_time=decode_time)
self.finished_requests.append(finished_req)
Expand Down

0 comments on commit cd711c4

Please sign in to comment.