[V1][Metrics] Handle preemptions #13169

Merged 2 commits on Feb 27, 2025
Changes from all commits
1 change: 1 addition & 0 deletions tests/entrypoints/openai/test_metrics.py
@@ -227,6 +227,7 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
"vllm:gpu_cache_usage_perc",
"vllm:gpu_prefix_cache_queries",
"vllm:gpu_prefix_cache_hits",
"vllm:num_preemptions_total",
"vllm:prompt_tokens_total",
"vllm:generation_tokens_total",
"vllm:iteration_tokens_total",
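The test above only checks that the new counter name shows up among the scraped metric families. As a hedged, standalone sketch (not part of this PR), the counter can also be read back from a running server's /metrics endpoint; the URL, port, and helper name below are assumptions for illustration:

```python
# Hypothetical helper, not from this PR: read vllm:num_preemptions_total from
# a running server's Prometheus endpoint. URL/port are assumed defaults.
import requests
from prometheus_client.parser import text_string_to_metric_families

METRICS_URL = "http://localhost:8000/metrics"  # assumed server address

def read_num_preemptions(url: str = METRICS_URL) -> float:
    text = requests.get(url).text
    for family in text_string_to_metric_families(text):
        for sample in family.samples:
            if sample.name == "vllm:num_preemptions_total":
                return sample.value
    raise RuntimeError("vllm:num_preemptions_total not exposed")

if __name__ == "__main__":
    print("preemptions so far:", read_num_preemptions())
```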
10 changes: 9 additions & 1 deletion vllm/v1/core/scheduler.py
@@ -164,6 +164,7 @@ def schedule(self) -> "SchedulerOutput":
self.kv_cache_manager.free(preempted_req)
preempted_req.status = RequestStatus.PREEMPTED
preempted_req.num_computed_tokens = 0
self.request_preempted(preempted_req, scheduled_timestamp)

self.waiting.appendleft(preempted_req)
preempted_reqs.append(preempted_req)
@@ -281,9 +282,9 @@ def schedule(self) -> "SchedulerOutput":
self.waiting.popleft()
self.running.append(request)
self.scheduled_req_ids.add(request.request_id)
self.request_scheduled(request, scheduled_timestamp)
if request.status == RequestStatus.WAITING:
scheduled_new_reqs.append(request)
self.request_scheduled(request, scheduled_timestamp)
elif request.status == RequestStatus.PREEMPTED:
scheduled_resumed_reqs.append(request)
else:
@@ -675,6 +676,13 @@ def request_scheduled(self, request: Request, timestamp: float):
EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED,
timestamp))

def request_preempted(self, request: Request, timestamp: float):
if not self.log_stats:
return
request.events.append(
EngineCoreEvent.new_event(EngineCoreEventType.PREEMPTED,
timestamp))

def make_stats(self) -> Optional[SchedulerStats]:
if not self.log_stats:
return None
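Note that request_scheduled is now called before the WAITING/PREEMPTED branch, so a request that is preempted and later resumed also records a SCHEDULED event for the resume. A hedged sketch of the resulting per-request event stream (timestamps are invented; the new_event call shape follows the diff above):

```python
# Illustrative only: the event sequence the scheduler emits for a request that
# is preempted once and then resumed. Timestamps are made up.
from vllm.v1.engine import EngineCoreEvent, EngineCoreEventType

events = [
    EngineCoreEvent.new_event(EngineCoreEventType.QUEUED, 100.00),     # request arrives
    EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED, 100.05),  # first schedule
    EngineCoreEvent.new_event(EngineCoreEventType.PREEMPTED, 100.50),  # evicted back to waiting
    EngineCoreEvent.new_event(EngineCoreEventType.SCHEDULED, 100.90),  # resumed
]
# The stats code below keeps only the first SCHEDULED timestamp, so the resume
# at 100.90 does not reset the queued/prefill intervals.
```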
1 change: 1 addition & 0 deletions vllm/v1/engine/__init__.py
@@ -65,6 +65,7 @@ class EngineCoreEventType(enum.IntEnum):
"""The type of engine core request event."""
QUEUED = 1
SCHEDULED = 2
PREEMPTED = 3


class EngineCoreEvent(msgspec.Struct):
24 changes: 15 additions & 9 deletions vllm/v1/metrics/loggers.py
@@ -132,6 +132,11 @@ def __init__(self, vllm_config: VllmConfig):
"GPU prefix cache hits, in terms of number of cached blocks.",
labelnames=labelnames).labels(*labelvalues)

self.counter_num_preempted_reqs = prometheus_client.Counter(
name="vllm:num_preemptions_total",
documentation="Cumulative number of preemption from the engine.",
labelnames=labelnames).labels(*labelvalues)

self.counter_prompt_tokens = prometheus_client.Counter(
name="vllm:prompt_tokens_total",
documentation="Number of prefill tokens processed.",
@@ -282,17 +287,27 @@ def log(self, scheduler_stats: SchedulerStats,
self.counter_gpu_prefix_cache_hits.inc(
scheduler_stats.prefix_cache_stats.hits)

self.counter_num_preempted_reqs.inc(iteration_stats.num_preempted_reqs)
self.counter_prompt_tokens.inc(iteration_stats.num_prompt_tokens)
self.counter_generation_tokens.inc(
iteration_stats.num_generation_tokens)
self.histogram_iteration_tokens.observe(
iteration_stats.num_prompt_tokens + \
iteration_stats.num_generation_tokens)

for ttft in iteration_stats.time_to_first_tokens_iter:
self.histogram_time_to_first_token.observe(ttft)
for tpot in iteration_stats.time_per_output_tokens_iter:
self.histogram_time_per_output_token.observe(tpot)

for finished_request in iteration_stats.finished_requests:
self.counter_request_success[finished_request.finish_reason].inc()
self.histogram_e2e_time_request.observe(
finished_request.e2e_latency)
self.histogram_queue_time_request.observe(
finished_request.queued_time)
self.histogram_prefill_time_request.observe(
finished_request.prefill_time)
self.histogram_inference_time_request.observe(
finished_request.inference_time)
self.histogram_decode_time_request.observe(
@@ -302,15 +317,6 @@ def log(self, scheduler_stats: SchedulerStats,
self.histogram_num_generation_tokens_request.observe(
finished_request.num_generation_tokens)

for ttft in iteration_stats.time_to_first_tokens_iter:
self.histogram_time_to_first_token.observe(ttft)
for tpot in iteration_stats.time_per_output_tokens_iter:
self.histogram_time_per_output_token.observe(tpot)
for queue_time in iteration_stats.queue_times_iter:
self.histogram_queue_time_request.observe(queue_time)
for prefill_time in iteration_stats.prefill_times_iter:
self.histogram_prefill_time_request.observe(prefill_time)

if self.gauge_lora_info is not None:
running_lora_adapters = \
",".join(iteration_stats.running_lora_adapters.keys())
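For reference, a minimal self-contained sketch (not vLLM code) of how the added counter behaves when it is incremented once per logging interval; the model_name label value is a placeholder:

```python
# Sketch only: mimic the preemption counter added above with a local registry.
import prometheus_client

registry = prometheus_client.CollectorRegistry()
counter_num_preempted_reqs = prometheus_client.Counter(
    name="vllm:num_preemptions_total",
    documentation="Cumulative number of preemptions from the engine.",
    labelnames=["model_name"],
    registry=registry).labels("my-model")  # placeholder label value

# Each log() call adds that iteration's preemption count.
for num_preempted_in_iteration in (0, 2, 1):
    counter_num_preempted_reqs.inc(num_preempted_in_iteration)

print(prometheus_client.generate_latest(registry).decode())
# The exposition output contains a line like:
#   vllm:num_preemptions_total{model_name="my-model"} 3.0
```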
31 changes: 22 additions & 9 deletions vllm/v1/metrics/stats.py
@@ -67,6 +67,8 @@ class FinishedRequestStats:
e2e_latency: float = 0.0
num_prompt_tokens: int = 0
num_generation_tokens: int = 0
queued_time: float = 0.0
prefill_time: float = 0.0
inference_time: float = 0.0
decode_time: float = 0.0

@@ -78,11 +80,10 @@ def __init__(self):
self.iteration_timestamp = time.time()
self.num_generation_tokens = 0
self.num_prompt_tokens = 0
self.num_preempted_reqs = 0
self.finished_requests: List[FinishedRequestStats] = []
self.time_to_first_tokens_iter: List[float] = []
self.time_per_output_tokens_iter: List[float] = []
self.queue_times_iter: List[float] = []
self.prefill_times_iter: List[float] = []
self.waiting_lora_adapters: Dict[str, int] = {}
self.running_lora_adapters: Dict[str, int] = {}

@@ -122,9 +123,6 @@ def update_from_output(self, output: "EngineCoreOutput",
if is_prefilling:
# TODO: re-enable no-output-for-partial-prefills invariant as above
if num_new_generation_tokens > 0:
prefill_interval = \
engine_core_timestamp - req_stats.scheduled_ts
self.prefill_times_iter.append(prefill_interval)
req_stats.first_token_ts = engine_core_timestamp
else:
tpot = engine_core_timestamp - req_stats.last_token_ts
@@ -145,24 +143,39 @@ def update_from_events(self, req_id: str, events: List["EngineCoreEvent"],
if lora_stats is not None:
lora_stats.waiting_requests.add(req_id)
elif event.type == EngineCoreEventType.SCHEDULED:
queued_interval = event.timestamp - req_stats.queued_ts
self.queue_times_iter.append(queued_interval)
req_stats.scheduled_ts = event.timestamp
if req_stats.scheduled_ts == 0.0: # ignore preemptions
req_stats.scheduled_ts = event.timestamp
LoRARequestStates.scheduled_request(lora_stats, req_id)
elif event.type == EngineCoreEventType.PREEMPTED:
self.num_preempted_reqs += 1

def update_from_finished_request(self, finish_reason: "FinishReason",
request_output: "RequestOutput",
req_stats: RequestStateStats):
e2e_latency = self._time_since(req_stats.arrival_time)

inference_time = req_stats.last_token_ts - req_stats.scheduled_ts
# Queued interval is from first QUEUED event to first SCHEDULED
queued_time = req_stats.scheduled_ts - req_stats.queued_ts

# Prefill interval is from first SCHEDULED to first NEW_TOKEN
        # Any preemptions during prefill are included in the interval
prefill_time = req_stats.first_token_ts - req_stats.scheduled_ts

# Decode interval is from first NEW_TOKEN to last NEW_TOKEN
# Any preemptions during decode are included
decode_time = req_stats.last_token_ts - req_stats.first_token_ts

# Inference interval is from first SCHEDULED to last NEW_TOKEN
# Any preemptions during prefill or decode are included
inference_time = req_stats.last_token_ts - req_stats.scheduled_ts

finished_req = \
FinishedRequestStats(finish_reason=finish_reason,
e2e_latency=e2e_latency,
num_prompt_tokens=len(request_output.prompt_token_ids),
num_generation_tokens=req_stats.num_generation_tokens,
queued_time=queued_time,
prefill_time=prefill_time,
inference_time=inference_time,
decode_time=decode_time)
self.finished_requests.append(finished_req)
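A small worked example (made-up timestamps, in seconds) of the interval definitions above, showing that time spent preempted after the first SCHEDULED event is charged to prefill/decode/inference rather than to queueing:

```python
# Hypothetical request timeline, matching the comments in
# update_from_finished_request().
queued_ts      = 100.00  # QUEUED event
scheduled_ts   = 100.05  # first SCHEDULED event (later SCHEDULED events ignored)
# ... preempted at 100.50 and resumed at 100.90 while still prefilling ...
first_token_ts = 101.20  # first new token
last_token_ts  = 103.00  # last new token

queued_time    = scheduled_ts - queued_ts        # 0.05 s: waiting only
prefill_time   = first_token_ts - scheduled_ts   # 1.15 s: includes the preemption gap
decode_time    = last_token_ts - first_token_ts  # 1.80 s
inference_time = last_token_ts - scheduled_ts    # 2.95 s = prefill_time + decode_time
```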