From 5bd93d22cfc9c212dcd29127dd94275d64362680 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Thu, 16 Jan 2025 11:54:03 +0200 Subject: [PATCH 01/30] Storing previous logits and reading them in decode --- vllm/sequence.py | 26 ++++++- vllm/worker/hpu_model_runner.py | 123 +++++++++++++++++++++++++++++++- 2 files changed, 146 insertions(+), 3 deletions(-) diff --git a/vllm/sequence.py b/vllm/sequence.py index 53c8a4b73b4e3..b87faf83760e5 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -175,6 +175,9 @@ class SequenceData(msgspec.Struct, # It is used to compute mrope_position_ids. _mrope_position_delta: Optional[int] = None + _prev_logits: Optional[List[torch.Tensor]] = None + _prev_logits_idx: Optional[int] = None + @staticmethod def from_prompt_token_counts( *token_counts: Tuple[int, int]) -> "SequenceData": @@ -237,9 +240,25 @@ def cumulative_logprob(self) -> float: def prompt_token_ids(self) -> Tuple[int, ...]: return self._prompt_token_ids_tuple + @property + def prev_logits(self) -> torch.Tensor: + return self._prev_logits + + @property + def prev_logits_idx(self) -> int: + return self._prev_logits_idx + @prompt_token_ids.setter def prompt_token_ids(self, new_prompt_token_ids) -> None: raise NotImplementedError + + @prev_logits.setter + def prev_logits(self, previous_logits) -> None: + self._prev_logits = previous_logits + + @prev_logits_idx.setter + def prev_logits_idx(self, prev_logits_idx) -> None: + self._prev_logits_idx = prev_logits_idx @property def prompt_token_ids_array(self) -> array: @@ -296,6 +315,9 @@ def get_output_len(self) -> int: def get_token_ids(self) -> List[int]: return self._cached_all_token_ids + + def get_previous_logits(self) -> List[torch.Tensor]: + return self._prev_logits def get_prefix_token_ids( self, num_tokens: int @@ -380,7 +402,9 @@ def __repr__(self) -> str: f"prompt_token_ids={self._prompt_token_ids}, " f"output_token_ids={self.output_token_ids}, " f"cumulative_logprob={self.cumulative_logprob}, " - f"get_num_computed_tokens={self.get_num_computed_tokens()}") + f"get_num_computed_tokens={self.get_num_computed_tokens()}, " + f"previous_logits={self._prev_logits}", + f"previous_logits_id={self._prev_logits_id}") class Sequence: diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 0658d17edb0bc..575677a535cac 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -504,6 +504,7 @@ class ModelInputForHPU(ModelRunnerInputBase): async_callback: Optional[Callable] = None is_first_multi_step: bool = True is_last_step: bool = True + seq_group_metadata_list: Optional[List[SequenceGroupMetadata]] = None def as_broadcastable_tensor_dict(self) -> Dict[str, Any]: tensor_dict = { @@ -1341,7 +1342,8 @@ def prepare_input_tensors( "num_prefills": num_prefills, "batch_type": batch_type, "seq_lens": seq_lens, - "query_lens": query_lens + "query_lens": query_lens, + "seq_group_metadata_list": seq_group_metadata_list, } if prefill_attn_metadata is not None: metadata_dict.update(prefill_attn_metadata.asdict_zerocopy()) @@ -1362,7 +1364,8 @@ def prepare_input_tensors( multi_modal_kwargs=multi_modal_kwargs, real_batch_size=real_batch_size, batch_size_padded=batch_size_padded, - lora_ids=lora_ids), \ + lora_ids=lora_ids, + seq_group_metadata_list=seq_group_metadata_list), \ sampling_metadata def _seq_len(self, attn_metadata): @@ -2064,6 +2067,7 @@ def execute_model( previous_hidden_states: Optional[torch.Tensor] = None, seqs=None, ) -> Optional[Union[List[SamplerOutput], IntermediateTensors]]: + if not 
model_input.is_first_multi_step: if not model_input.is_last_step: # not first or last multi-step @@ -2072,13 +2076,18 @@ def execute_model( output = self._decode_sampler_outputs( model_input) if self.is_driver_worker else [] torch.hpu.synchronize() + + # PROMPTS DONT SAMPLE + # SAMPLING ONLY ON FIRST DECODE if model_input.is_first_multi_step: # first multi-step + if self.lora_config: assert model_input.lora_requests is not None assert model_input.lora_mapping is not None self.set_active_loras(model_input.lora_requests, model_input.lora_mapping) + input_tokens = model_input.input_tokens input_positions = model_input.input_positions attn_metadata = model_input.attn_metadata @@ -2096,6 +2105,24 @@ def execute_model( use_graphs = self._use_graphs(batch_size, seq_len, is_prompt) self._check_config(batch_size, seq_len, is_prompt, warmup_mode) + DEBUG_INFO = True if not warmup_mode else False + + if DEBUG_INFO: + req_ids = [seq_metadata.request_id for seq_metadata in model_input.seq_group_metadata_list] + has_prev_logits = [seq_group_metadata.seq_data[int(seq_group_metadata.request_id[0])].prev_logits + is not None for seq_group_metadata in model_input.seq_group_metadata_list] + + # DELAYED SAMPLING DEBUG INFO + msg = ( + "###", + f"Warmup_mode: {warmup_mode}", + "Prompt" if is_prompt else "Decode", + f"Batch_size_padded: {batch_size_padded}", + f"Req ids: {req_ids}", + f"Has prev logits: {has_prev_logits}", + "###") + logger.info(msg) + lora_mask: torch.Tensor = None lora_logits_mask: torch.Tensor = None if self.lora_config: @@ -2121,6 +2148,81 @@ def execute_model( {"bypass_hpu_graphs": not use_graphs}) htorch.core.mark_step() + + # READ PREVIOUS TOKENS IF DECODE + if self.is_driver_worker and not is_prompt: + # IDS of sequences + logits_ids_list = [] + # Tensor with logits for every seq + logits_tensor = None + # Logits from indvidual seqs + logits_tensor_list = [] + if model_input.seq_group_metadata_list is not None: + # For every sequence group + for seq_group_metadata in model_input.seq_group_metadata_list: + # Only one sequence + assert len(seq_group_metadata.seq_data) == 1 + # For sequence data + for seq_data in seq_group_metadata.seq_data.values(): + # Has prev_logits + if seq_data.prev_logits is not None: + # No logits tensor made + if logits_tensor is None: + logits_tensor = seq_data.prev_logits + # If logits the same + if seq_data.prev_logits is logits_tensor: + # add id + logits_ids_list.append( + seq_data.prev_logits_idx) + # Logits different + else: + # keep only the rows based on id + logits_tensor_list.append( + logits_tensor[torch.tensor( + logits_ids_list, + device=seq_data.prev_logits.device)]) + + logits_ids_list = [seq_data.prev_logits_idx] + logits_tensor = seq_data.prev_logits + else: + # warmup only, TODO add a check + logits_tensor_list.append( + torch.zeros([1, 32000], + dtype=torch.float, + device="hpu")) + if logits_tensor is not None: + logits_tensor_list.append(logits_tensor[torch.tensor( + logits_ids_list, device=seq_data.prev_logits.device)]) + prev_logits = torch.cat(logits_tensor_list, dim=0) + + if DEBUG_INFO: + # DELAYED SAMPLING DEBUG INFO + if prev_logits.shape[0] > 2: + print("N-1 logits in Nth iteration") + print(f"0. Logits {prev_logits[0].cpu()}") + print(f"1. Logits {prev_logits[1].cpu()}") + print(f"2. 
Logits {prev_logits[2].cpu()}") + + # with self.profiler.record_event( + # 'internal', f'sample_{"prompt" if is_prompt else "decode"}' + # '_bs{batch_size}_seq{seq_len}'): + # output = self.model.sample( + # logits=prev_logits, + # sampling_metadata=sampling_metadata, + # ) + + # #TODO: check why broadcast failed for float tensor use dict instead + # model_kwargs = {} + # model_kwargs["input_ids"] = output.sampled_token_ids + # broadcast_tensor_dict(model_kwargs, src=0) + # input_ids = output.sampled_token_ids + # elif self.scheduler_config.enable_delayed_sampling and not is_prompt: + # model_kwargs = broadcast_tensor_dict(src=0) + # input_ids = model_kwargs["input_ids"] + # if input_ids is not None: + # execute_model_kwargs["input_ids"] = input_ids + # htorch.core.mark_step() + if self.is_driver_worker: model_event_name = ("model_" f"{'prompt' if is_prompt else 'decode'}_" @@ -2184,6 +2286,23 @@ def try_revert_dummy_output_tokens(): sampling_metadata.selected_token_indices = None logits = self.model.compute_logits(hidden_states, sampling_metadata) + + if DEBUG_INFO: + # DELAYED SAMPLING DEBUG INFO + if logits.shape[0] > 2: + print("N-1 logits") + print(f"0. Logits {logits[0].cpu()}") + print(f"1. Logits {logits[1].cpu()}") + print(f"2. Logits {logits[2].cpu()}") + + # SAVE LOGITS TO NEXT ITERATION + if model_input.seq_group_metadata_list is not None and self.is_driver_worker: + for idx, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): + assert len(seq_group_metadata.seq_data) == 1 + for seq_data in seq_group_metadata.seq_data.values(): + seq_data.prev_logits = logits + seq_data.prev_logits_idx = idx + htorch.core.mark_step() # Only perform sampling in the driver worker. if not self.is_driver_worker: From ffcf00714cf930ae93dc7db2c067627e76b2bc59 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Fri, 17 Jan 2025 19:22:59 +0200 Subject: [PATCH 02/30] First accuracy --- vllm/config.py | 14 +- vllm/engine/arg_utils.py | 12 +- vllm/engine/output_processor/interfaces.py | 4 +- vllm/engine/output_processor/single_step.py | 7 +- vllm/model_executor/layers/sampler.py | 4 +- vllm/sequence.py | 10 +- vllm/worker/hpu_model_runner.py | 149 +++++++++++--------- 7 files changed, 125 insertions(+), 75 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index 4e5c755055f1f..21c36e144dea8 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -172,7 +172,8 @@ def __init__( hf_overrides: Optional[HfOverrides] = None, mm_processor_kwargs: Optional[Dict[str, Any]] = None, override_neuron_config: Optional[Dict[str, Any]] = None, - override_pooler_config: Optional["PoolerConfig"] = None) -> None: + override_pooler_config: Optional["PoolerConfig"] = None, + enable_delayed_sampling: bool = False) -> None: self.model = model self.tokenizer = tokenizer self.tokenizer_mode = tokenizer_mode @@ -217,6 +218,7 @@ def __init__( self.max_logprobs = max_logprobs self.disable_sliding_window = disable_sliding_window self.skip_tokenizer_init = skip_tokenizer_init + self.enable_delayed_sampling = enable_delayed_sampling hf_config = get_config(self.model, trust_remote_code, revision, code_revision, config_format) @@ -779,6 +781,16 @@ def _verify_args(self) -> None: raise ValueError( "GPU memory utilization must be less than 1.0. Got " f"{self.gpu_memory_utilization}.") + + # if self.enable_delayed_sampling and self.num_lookahead_slots != 1: + # raise ValueError( + # "num_lookahead_slots " + # f"({self.num_lookahead_slots}) must be 1 for delayed sampling." 
+ # ) + # if self.enable_delayed_sampling and not self.use_v2_block_manager: + # raise ValueError("use_v2_block_manager " + # f"({self.use_v2_block_manager}) must be True " + # "for delayed sampling.") def _verify_cache_dtype(self) -> None: if self.cache_dtype == "auto": diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 9f932c6f26eaa..325a44751ff8b 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -170,6 +170,7 @@ class EngineArgs: scheduler_delay_factor: float = 0.0 enable_chunked_prefill: Optional[bool] = None + enable_delayed_sampling: bool = False guided_decoding_backend: str = 'xgrammar' # Speculative decoding configuration. @@ -594,6 +595,14 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: 'This should be a JSON string that will be ' 'parsed into a dictionary. Ignored if ' 'tokenizer_pool_size is 0.') + + parser.add_argument( + '--enable-delayed-sampling', + action='store_true', + help='If set, the sampling will be delayed by 1 step. First ' + 'model request execution (prefill) will return an invalid token ' + 'id that will be discarded. Actual sampling of valid token ids ' + 'starts from second model execution.') # Multimodal related configs parser.add_argument( @@ -1183,7 +1192,8 @@ def create_engine_config(self, send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER and parallel_config.use_ray), policy=self.scheduling_policy, - use_padding_aware_scheduling=self.use_padding_aware_scheduling) + use_padding_aware_scheduling=self.use_padding_aware_scheduling,) + #enable_delayed_sampling=self.enable_delayed_sampling) lora_config = LoRAConfig( bias_enabled=self.enable_lora_bias, max_lora_rank=self.max_lora_rank, diff --git a/vllm/engine/output_processor/interfaces.py b/vllm/engine/output_processor/interfaces.py index 50adaf4e59188..711d873005081 100644 --- a/vllm/engine/output_processor/interfaces.py +++ b/vllm/engine/output_processor/interfaces.py @@ -36,7 +36,9 @@ def create_output_processor( This returns a single-step output processor if num_lookahead_slots is zero, else returns a multi-step output processor. """ - if scheduler_config.num_lookahead_slots == 0: + + #! ADD FLAG + if scheduler_config.num_lookahead_slots == 0 or scheduler_config.num_lookahead_slots == 1: # Importing here to avoid cycle. 
from vllm.engine.output_processor.single_step import ( SingleStepOutputProcessor) diff --git a/vllm/engine/output_processor/single_step.py b/vllm/engine/output_processor/single_step.py index da3185f33dbe9..795bdf0553282 100644 --- a/vllm/engine/output_processor/single_step.py +++ b/vllm/engine/output_processor/single_step.py @@ -115,9 +115,11 @@ def _process_sequence_group_outputs(self, seq_group: SequenceGroup, sampling_params = seq_group.sampling_params sample = outputs.samples[0] + seq = seq_group.first_seq if not is_async: - seq.append_token_id(sample.output_token, sample.logprobs) + if sample.output_token != -1: + seq.append_token_id(sample.output_token, sample.logprobs) if sampling_params.detokenize and self.detokenizer: new_char_count = self.detokenizer.decode_sequence_inplace( seq, sampling_params) @@ -132,3 +134,6 @@ def _process_sequence_group_outputs(self, seq_group: SequenceGroup, if seq.is_finished(): for scheduler in self.scheduler: scheduler.free_seq(seq) + + # DEBUG + logger.info("Seq finished") \ No newline at end of file diff --git a/vllm/model_executor/layers/sampler.py b/vllm/model_executor/layers/sampler.py index ce6ec1a89ff87..9c593f330973c 100755 --- a/vllm/model_executor/layers/sampler.py +++ b/vllm/model_executor/layers/sampler.py @@ -609,7 +609,7 @@ def _greedy_sample( same as the length of selected_seq_groups. If the corresponding seq_group has do_sample=False, tuple contains ([], []) """ - samples_lst = samples.tolist() + #! samples_lst = samples.tolist() sample_idx = 0 results: SampleResultType = [] for seq_group in selected_seq_groups: @@ -622,7 +622,7 @@ def _greedy_sample( assert num_parent_seqs == 1, ( "Greedy sampling should have only one seq.") parent_ids = list(range(num_parent_seqs)) - next_token_ids = [samples_lst[sample_idx]] + next_token_ids = [sample_idx] #! 
results.append((next_token_ids, parent_ids)) sample_idx += num_parent_seqs return results diff --git a/vllm/sequence.py b/vllm/sequence.py index b87faf83760e5..bba01c947a561 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -302,7 +302,7 @@ def append_token_id(self, token_id: int, logprob: float) -> None: self._output_token_ids.append(token_id) self._new_appended_tokens.append(token_id) self._cached_all_token_ids.append(token_id) - self._cumulative_logprob += logprob + self._cumulative_logprob += logprob if logprob is not None else 0.0 def get_len(self) -> int: return len(self._output_token_ids) + len(self._prompt_token_ids) @@ -403,8 +403,8 @@ def __repr__(self) -> str: f"output_token_ids={self.output_token_ids}, " f"cumulative_logprob={self.cumulative_logprob}, " f"get_num_computed_tokens={self.get_num_computed_tokens()}, " - f"previous_logits={self._prev_logits}", - f"previous_logits_id={self._prev_logits_id}") + f"previous_logits={self._prev_logits}, " + f"previous_logits_id={self._prev_logits_idx})") class Sequence: @@ -560,9 +560,9 @@ def reset_state_for_recompute(self): def append_token_id(self, token_id: int, logprobs: Dict[int, Logprob]) -> None: - assert token_id in logprobs self.output_logprobs.append(logprobs) - self.data.append_token_id(token_id, logprobs[token_id].logprob) + self.data.append_token_id(token_id, + logprobs[token_id].logprob if token_id in logprobs else None) def get_len(self) -> int: return self.data.get_len() diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 575677a535cac..9ce19622b5dfe 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -769,6 +769,9 @@ def load_model(self) -> None: path_to_rope = get_path_to_rope(self.model) torch.hpu.synchronize() + # if delayed sampling + self.model.sampler.include_gpu_probs_tensor = True + with HabanaMemoryProfiler() as m_wrap: self.model = self._maybe_wrap_in_hpu_graph( self.model, @@ -1071,7 +1074,9 @@ def _prepare_decode( generation_token = seq_data.get_last_token_id() input_tokens.append([generation_token]) - seq_len = seq_data.get_len() + # DS + seq_len = (seq_data.get_num_computed_tokens() + + 1) position = seq_len - 1 input_positions.append([position]) @@ -2077,7 +2082,7 @@ def execute_model( model_input) if self.is_driver_worker else [] torch.hpu.synchronize() - # PROMPTS DONT SAMPLE + # NO SAMPLING IN PREFILLS # SAMPLING ONLY ON FIRST DECODE if model_input.is_first_multi_step: # first multi-step @@ -2105,14 +2110,15 @@ def execute_model( use_graphs = self._use_graphs(batch_size, seq_len, is_prompt) self._check_config(batch_size, seq_len, is_prompt, warmup_mode) - DEBUG_INFO = True if not warmup_mode else False + # DEBUG + DELAYED_SAMPLING_DEBUG_INFO = True + DEBUG_INFO = DELAYED_SAMPLING_DEBUG_INFO if not warmup_mode else False if DEBUG_INFO: req_ids = [seq_metadata.request_id for seq_metadata in model_input.seq_group_metadata_list] has_prev_logits = [seq_group_metadata.seq_data[int(seq_group_metadata.request_id[0])].prev_logits is not None for seq_group_metadata in model_input.seq_group_metadata_list] - # DELAYED SAMPLING DEBUG INFO msg = ( "###", f"Warmup_mode: {warmup_mode}", @@ -2149,34 +2155,23 @@ def execute_model( htorch.core.mark_step() - # READ PREVIOUS TOKENS IF DECODE + input_ids = None #? 
+ # READ N-1th LOGITS IF DECODE (DRIVER) if self.is_driver_worker and not is_prompt: - # IDS of sequences logits_ids_list = [] - # Tensor with logits for every seq logits_tensor = None - # Logits from indvidual seqs logits_tensor_list = [] if model_input.seq_group_metadata_list is not None: - # For every sequence group for seq_group_metadata in model_input.seq_group_metadata_list: - # Only one sequence assert len(seq_group_metadata.seq_data) == 1 - # For sequence data for seq_data in seq_group_metadata.seq_data.values(): - # Has prev_logits if seq_data.prev_logits is not None: - # No logits tensor made if logits_tensor is None: logits_tensor = seq_data.prev_logits - # If logits the same if seq_data.prev_logits is logits_tensor: - # add id logits_ids_list.append( seq_data.prev_logits_idx) - # Logits different else: - # keep only the rows based on id logits_tensor_list.append( logits_tensor[torch.tensor( logits_ids_list, @@ -2193,35 +2188,32 @@ def execute_model( if logits_tensor is not None: logits_tensor_list.append(logits_tensor[torch.tensor( logits_ids_list, device=seq_data.prev_logits.device)]) + prev_logits = torch.cat(logits_tensor_list, dim=0) - if DEBUG_INFO: - # DELAYED SAMPLING DEBUG INFO - if prev_logits.shape[0] > 2: - print("N-1 logits in Nth iteration") - print(f"0. Logits {prev_logits[0].cpu()}") - print(f"1. Logits {prev_logits[1].cpu()}") - print(f"2. Logits {prev_logits[2].cpu()}") - - # with self.profiler.record_event( - # 'internal', f'sample_{"prompt" if is_prompt else "decode"}' - # '_bs{batch_size}_seq{seq_len}'): - # output = self.model.sample( - # logits=prev_logits, - # sampling_metadata=sampling_metadata, - # ) + # SAMPLE WITH N-1th LOGITS IF DECODE (DRIVER) + with self.profiler.record_event( + 'internal', f'sample_{"prompt" if is_prompt else "decode"}' + '_bs{batch_size}_seq{seq_len}'): + output = self.model.sample( + logits=prev_logits, + sampling_metadata=sampling_metadata, + ) - # #TODO: check why broadcast failed for float tensor use dict instead - # model_kwargs = {} - # model_kwargs["input_ids"] = output.sampled_token_ids - # broadcast_tensor_dict(model_kwargs, src=0) - # input_ids = output.sampled_token_ids - # elif self.scheduler_config.enable_delayed_sampling and not is_prompt: - # model_kwargs = broadcast_tensor_dict(src=0) - # input_ids = model_kwargs["input_ids"] - # if input_ids is not None: - # execute_model_kwargs["input_ids"] = input_ids - # htorch.core.mark_step() + #TODO: why is output.sampled_token_ids None + + #TODO: check why broadcast failed for float tensor use dict instead + model_kwargs = {} + model_kwargs["input_ids"] = output.sampled_token_ids #! + broadcast_tensor_dict(model_kwargs, src=0) + input_ids = output.sampled_token_ids + # ALL WORKERS GET input_ids + elif not is_prompt: + model_kwargs = broadcast_tensor_dict(src=0) + input_ids = model_kwargs["input_ids"] + if input_ids is not None: + execute_model_kwargs["input_ids"] = input_ids + htorch.core.mark_step() if self.is_driver_worker: model_event_name = ("model_" @@ -2264,6 +2256,7 @@ def try_revert_dummy_output_tokens(): self.trim_attn_metadata( broadcast_data["attn_metadata"]) }) + # FORWARD FOR N-th step with self.profiler.record_event('internal', model_event_name): hidden_states = self.model.forward( **execute_model_kwargs, @@ -2286,14 +2279,46 @@ def try_revert_dummy_output_tokens(): sampling_metadata.selected_token_indices = None logits = self.model.compute_logits(hidden_states, sampling_metadata) + + htorch.core.mark_step() + # Only perform sampling in the driver worker. 
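# Illustrative standalone sketch of the row gathering above (helper name and
# signature are assumptions, not vLLM APIs): the N-1th logits are batched
# row-by-row, sequences sharing one cached tensor are indexed out of it, and
# padded/warmup sequences without cached logits contribute a zero row.
import torch

def gather_prev_logits_sketch(cached, vocab_size, device="cpu"):
    # cached: list of (prev_logits, prev_logits_idx) pairs, one per sequence;
    # prev_logits may be None for padded or warmup sequences.
    rows = []
    for prev_logits, idx in cached:
        if prev_logits is None:
            rows.append(torch.zeros(1, vocab_size, device=device))
        else:
            rows.append(prev_logits[idx:idx + 1])
    return torch.cat(rows, dim=0)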
+ if not self.is_driver_worker: + continue + + if not is_prompt: + htorch.core.mark_step() + for i, seq_group_output in enumerate( + output.outputs[:real_batch_size]): + for sample in seq_group_output.samples: + sample.output_token = output.sampled_token_ids[i][0] + output = output + else: + # For prompts compose empty output + from vllm.sequence import (CompletionSequenceGroupOutput, + Logprob, SequenceOutput) + from vllm.model_executor.layers.sampler import SamplerOutput + + sampler_output = [] + for seq_group in sampling_metadata.seq_groups: + seq_ids = seq_group.seq_ids + next_token_id, parent_id = -1, 0 + seq_outputs = [] + seq_outputs.append( + SequenceOutput(seq_ids[parent_id], next_token_id, + {-1: Logprob(0.0)})) + sampler_output.append( + CompletionSequenceGroupOutput(seq_outputs, None)) + sampled_token_probs, logprobs_tensor, sampled_token_ids = ( + None, None, None) + output = SamplerOutput( + outputs=sampler_output, + sampled_token_probs=sampled_token_probs, + sampled_token_ids=sampled_token_ids, + logprobs=logprobs_tensor, + ) - if DEBUG_INFO: - # DELAYED SAMPLING DEBUG INFO - if logits.shape[0] > 2: - print("N-1 logits") - print(f"0. Logits {logits[0].cpu()}") - print(f"1. Logits {logits[1].cpu()}") - print(f"2. Logits {logits[2].cpu()}") + output.outputs = output.outputs[:real_batch_size] + htorch.core.mark_step() # SAVE LOGITS TO NEXT ITERATION if model_input.seq_group_metadata_list is not None and self.is_driver_worker: @@ -2303,28 +2328,24 @@ def try_revert_dummy_output_tokens(): seq_data.prev_logits = logits seq_data.prev_logits_idx = idx - htorch.core.mark_step() - # Only perform sampling in the driver worker. - if not self.is_driver_worker: - continue - if model_input.async_callback is not None: model_input.async_callback() # Sample the next token. 
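# Illustrative standalone sketch of the prompt branch above (helper name is an
# assumption): for prefill steps, delayed sampling emits a placeholder
# SamplerOutput whose token id is -1, which the single-step output processor
# later discards instead of appending to the sequence.
from vllm.model_executor.layers.sampler import SamplerOutput
from vllm.sequence import CompletionSequenceGroupOutput, Logprob, SequenceOutput

def make_placeholder_prompt_output_sketch(seq_ids):
    outputs = [
        CompletionSequenceGroupOutput(
            [SequenceOutput(seq_id, -1, {-1: Logprob(0.0)})], None)
        for seq_id in seq_ids
    ]
    return SamplerOutput(outputs=outputs,
                         sampled_token_probs=None,
                         sampled_token_ids=None,
                         logprobs=None)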
- with self.profiler.record_event( - 'internal', ('sample_' - f'{"prompt" if is_prompt else "decode"}_' - f'bs{batch_size}_' - f'seq{seq_len}')): - output = self.model.sample( - logits=logits, - sampling_metadata=sampling_metadata, - ) + # with self.profiler.record_event( + # 'internal', ('sample_' + # f'{"prompt" if is_prompt else "decode"}_' + # f'bs{batch_size}_' + # f'seq{seq_len}')): + # output = self.model.sample( + # logits=logits, + # sampling_metadata=sampling_metadata, + # ) if num_steps > 1: output = output.sampled_token_ids self.cached_step_outputs.append( output.detach().clone()) htorch.core.mark_step() + if i < num_steps - 1: if i == 0: if model_input.async_callback is not None: From bb0a7615b77fd2f9fc10ae402e7fc5665a63bd2a Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 15 Jan 2025 15:27:25 +0100 Subject: [PATCH 03/30] Set vllm-hpu-extension to 6ac93fb (#684) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit remove expert_max hard code (#47) vLLM-Ext: Full enabling of ALiBi (#34) Add version inference via setuptools-scm (#58) Revert "vLLM-Ext: Full enabling of ALiBi (#34)" (#59) Remove punica_hpu.py from vllm_hpu_extension (#66) Removed previous (not-pipelined) pa implementation (#72) Add flag to enable running softmax in fp32 (#71) Update calibration readme link (#73) allow lm_head quantization in calibration process (#65) Pad to bmin if value is less (#67) Update pyproject.toml (https://github.com/HabanaAI/vllm-fork/pull/75) --------- Co-authored-by: MichaƂ Kuligowski --- requirements-hpu.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-hpu.txt b/requirements-hpu.txt index f4fb89ef42834..084d2a736521a 100644 --- a/requirements-hpu.txt +++ b/requirements-hpu.txt @@ -8,4 +8,4 @@ pandas tabulate setuptools>=61 setuptools-scm>=8 -vllm-hpu-extension @ git+https://github.com/HabanaAI/vllm-hpu-extension.git@4312768 +vllm-hpu-extension @ git+https://github.com/HabanaAI/vllm-hpu-extension.git@d05c0a7 From 6c4fd0b25dcaf0b5106ffc7fe72e31b7efdc1014 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Tue, 21 Jan 2025 18:18:09 +0200 Subject: [PATCH 04/30] Flag checking and MSS --- vllm/config.py | 10 +- vllm/engine/arg_utils.py | 4 +- vllm/engine/output_processor/interfaces.py | 4 +- vllm/engine/output_processor/single_step.py | 2 +- vllm/model_executor/layers/sampler.py | 21 ++- vllm/worker/hpu_model_runner.py | 169 +++++++++----------- 6 files changed, 104 insertions(+), 106 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index 21c36e144dea8..16fef0bd88fa9 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -172,8 +172,7 @@ def __init__( hf_overrides: Optional[HfOverrides] = None, mm_processor_kwargs: Optional[Dict[str, Any]] = None, override_neuron_config: Optional[Dict[str, Any]] = None, - override_pooler_config: Optional["PoolerConfig"] = None, - enable_delayed_sampling: bool = False) -> None: + override_pooler_config: Optional["PoolerConfig"] = None) -> None: self.model = model self.tokenizer = tokenizer self.tokenizer_mode = tokenizer_mode @@ -218,7 +217,6 @@ def __init__( self.max_logprobs = max_logprobs self.disable_sliding_window = disable_sliding_window self.skip_tokenizer_init = skip_tokenizer_init - self.enable_delayed_sampling = enable_delayed_sampling hf_config = get_config(self.model, trust_remote_code, revision, code_revision, config_format) @@ -1118,6 +1116,7 @@ class SchedulerConfig: preemption_mode: Optional[str] = None num_scheduler_steps: int = 1 + enable_delayed_sampling: 
bool = False multi_step_stream_outputs: bool = False @@ -1208,6 +1207,11 @@ def _verify_args(self) -> None: "num_scheduler_steps " f"({self.num_scheduler_steps}) must be greater than or " "equal to 1.") + if self.enable_delayed_sampling and self.num_lookahead_slots != 1: + raise ValueError( + "num_lookahead_slots " + f"({self.num_lookahead_slots}) must be 1 for delayed sampling." + ) if self.max_num_prefill_seqs is not None \ and not self.use_padding_aware_scheduling: raise ValueError("max_num_prefill_seqs can be only " diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 325a44751ff8b..203c19512e97c 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -1192,8 +1192,8 @@ def create_engine_config(self, send_delta_data=(envs.VLLM_USE_RAY_SPMD_WORKER and parallel_config.use_ray), policy=self.scheduling_policy, - use_padding_aware_scheduling=self.use_padding_aware_scheduling,) - #enable_delayed_sampling=self.enable_delayed_sampling) + use_padding_aware_scheduling=self.use_padding_aware_scheduling, + enable_delayed_sampling=self.enable_delayed_sampling) lora_config = LoRAConfig( bias_enabled=self.enable_lora_bias, max_lora_rank=self.max_lora_rank, diff --git a/vllm/engine/output_processor/interfaces.py b/vllm/engine/output_processor/interfaces.py index 711d873005081..4a06a42687be4 100644 --- a/vllm/engine/output_processor/interfaces.py +++ b/vllm/engine/output_processor/interfaces.py @@ -37,8 +37,8 @@ def create_output_processor( zero, else returns a multi-step output processor. """ - #! ADD FLAG - if scheduler_config.num_lookahead_slots == 0 or scheduler_config.num_lookahead_slots == 1: + if (scheduler_config.num_lookahead_slots == 0 or + (scheduler_config.num_lookahead_slots == 1 and scheduler_config.enable_delayed_sampling)): # Importing here to avoid cycle. from vllm.engine.output_processor.single_step import ( SingleStepOutputProcessor) diff --git a/vllm/engine/output_processor/single_step.py b/vllm/engine/output_processor/single_step.py index 795bdf0553282..9e4011896075d 100644 --- a/vllm/engine/output_processor/single_step.py +++ b/vllm/engine/output_processor/single_step.py @@ -136,4 +136,4 @@ def _process_sequence_group_outputs(self, seq_group: SequenceGroup, scheduler.free_seq(seq) # DEBUG - logger.info("Seq finished") \ No newline at end of file + #logger.info("Seq finished") \ No newline at end of file diff --git a/vllm/model_executor/layers/sampler.py b/vllm/model_executor/layers/sampler.py index 9c593f330973c..204d5fe1ff590 100755 --- a/vllm/model_executor/layers/sampler.py +++ b/vllm/model_executor/layers/sampler.py @@ -186,6 +186,7 @@ def __init__(self): # containing the sampled token ids and probabilities. This is used by # speculative decoding. self.include_gpu_probs_tensor = False + self.sample_token_positions_only = False self.should_modify_greedy_probs_inplace = False def _init_sampling_tensors( @@ -302,6 +303,7 @@ def forward( sampling_tensors, include_gpu_probs_tensor=self.include_gpu_probs_tensor, modify_greedy_probs=self._should_modify_greedy_probs_inplace, + token_positions_only=self.sample_token_positions_only, ) if self.include_gpu_probs_tensor: @@ -596,6 +598,7 @@ def _apply_min_p( def _greedy_sample( selected_seq_groups: List[SequenceGroupToSample], samples: torch.Tensor, + token_positions_only: bool = False, ) -> SampleResultType: """Run greedy sampling on a given samples. @@ -609,7 +612,8 @@ def _greedy_sample( same as the length of selected_seq_groups. 
If the corresponding seq_group has do_sample=False, tuple contains ([], []) """ - #! samples_lst = samples.tolist() + if not token_positions_only: + samples_lst = samples.tolist() sample_idx = 0 results: SampleResultType = [] for seq_group in selected_seq_groups: @@ -622,7 +626,9 @@ def _greedy_sample( assert num_parent_seqs == 1, ( "Greedy sampling should have only one seq.") parent_ids = list(range(num_parent_seqs)) - next_token_ids = [sample_idx] #! + next_token_ids = [ + sample_idx if token_positions_only else samples_lst[sample_idx] + ] results.append((next_token_ids, parent_ids)) sample_idx += num_parent_seqs return results @@ -806,7 +812,8 @@ def _top_k_top_p_multinomial_with_flashinfer( def get_pythonized_sample_results( - sample_result_args: SampleResultArgsType) -> SampleResultType: + sample_result_args: SampleResultArgsType, + token_positions_only) -> SampleResultType: '''This function consumes GPU-side sampler results and computes Pythonized CPU-side sampler results (GPU -> CPU sync.) @@ -844,7 +851,8 @@ def get_pythonized_sample_results( continue (seq_group_id, seq_groups) = sample_metadata[sampling_type] if sampling_type == SamplingType.GREEDY: - sample_results = _greedy_sample(seq_groups, greedy_samples) + sample_results = _greedy_sample(seq_groups, greedy_samples, + token_positions_only) elif sampling_type in (SamplingType.RANDOM, SamplingType.RANDOM_SEED): sample_results = _random_sample(seq_groups, multinomial_samples[sampling_type]) @@ -866,6 +874,7 @@ def _sample_with_torch( sampling_tensors: SamplingTensors, include_gpu_probs_tensor: bool, modify_greedy_probs: bool, + token_positions_only: bool = False, ) -> SampleReturnType: '''Torch-oriented _sample() implementation. @@ -981,7 +990,7 @@ def _sample_with_torch( # This also converts the sampler output to a Python object. 
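# Illustrative standalone sketch of the token_positions_only greedy path above
# (function name is an assumption): instead of syncing argmax token ids to the
# host with samples.tolist(), each sequence's row position is returned, and the
# real token id is read out on the next step from the cached logits.
import torch

def greedy_sample_sketch(samples: torch.Tensor, token_positions_only: bool):
    # samples holds one argmax token id per sequence (shape: [num_seqs]).
    if not token_positions_only:
        samples_lst = samples.tolist()  # device -> host sync happens here
    results = []
    for sample_idx in range(samples.shape[0]):
        next_token_ids = [sample_idx if token_positions_only
                          else samples_lst[sample_idx]]
        results.append((next_token_ids, [0]))  # one parent seq per group
    return results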
# Return Pythonized sampler result & sampled token ids return get_pythonized_sample_results( - maybe_deferred_args), sampled_token_ids_tensor + maybe_deferred_args, token_positions_only), sampled_token_ids_tensor else: # Defer sampler result Pythonization; return deferred # Pythonization args & sampled token ids @@ -998,6 +1007,7 @@ def _sample( sampling_tensors: SamplingTensors, include_gpu_probs_tensor: bool, modify_greedy_probs: bool, + token_positions_only: bool ) -> SampleReturnType: """ Args: @@ -1018,6 +1028,7 @@ def _sample( sampling_tensors, include_gpu_probs_tensor=include_gpu_probs_tensor, modify_greedy_probs=modify_greedy_probs, + token_positions_only=token_positions_only ) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 9ce19622b5dfe..c87a4a2ff6ade 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -769,8 +769,9 @@ def load_model(self) -> None: path_to_rope = get_path_to_rope(self.model) torch.hpu.synchronize() - # if delayed sampling - self.model.sampler.include_gpu_probs_tensor = True + if self.scheduler_config.enable_delayed_sampling: + self.model.sampler.include_gpu_probs_tensor = True + self.model.sampler.sample_token_positions_only = True with HabanaMemoryProfiler() as m_wrap: self.model = self._maybe_wrap_in_hpu_graph( @@ -1075,8 +1076,9 @@ def _prepare_decode( input_tokens.append([generation_token]) # DS - seq_len = (seq_data.get_num_computed_tokens() + - 1) + seq_len = ((seq_data.get_num_computed_tokens() + + 1) if self.scheduler_config.enable_delayed_sampling + else seq_data.get_len()) position = seq_len - 1 input_positions.append([position]) @@ -2082,8 +2084,6 @@ def execute_model( model_input) if self.is_driver_worker else [] torch.hpu.synchronize() - # NO SAMPLING IN PREFILLS - # SAMPLING ONLY ON FIRST DECODE if model_input.is_first_multi_step: # first multi-step @@ -2110,25 +2110,6 @@ def execute_model( use_graphs = self._use_graphs(batch_size, seq_len, is_prompt) self._check_config(batch_size, seq_len, is_prompt, warmup_mode) - # DEBUG - DELAYED_SAMPLING_DEBUG_INFO = True - DEBUG_INFO = DELAYED_SAMPLING_DEBUG_INFO if not warmup_mode else False - - if DEBUG_INFO: - req_ids = [seq_metadata.request_id for seq_metadata in model_input.seq_group_metadata_list] - has_prev_logits = [seq_group_metadata.seq_data[int(seq_group_metadata.request_id[0])].prev_logits - is not None for seq_group_metadata in model_input.seq_group_metadata_list] - - msg = ( - "###", - f"Warmup_mode: {warmup_mode}", - "Prompt" if is_prompt else "Decode", - f"Batch_size_padded: {batch_size_padded}", - f"Req ids: {req_ids}", - f"Has prev logits: {has_prev_logits}", - "###") - logger.info(msg) - lora_mask: torch.Tensor = None lora_logits_mask: torch.Tensor = None if self.lora_config: @@ -2155,9 +2136,11 @@ def execute_model( htorch.core.mark_step() - input_ids = None #? - # READ N-1th LOGITS IF DECODE (DRIVER) - if self.is_driver_worker and not is_prompt: + input_ids = None + # Delayed sampling + # Sample the next token based on previous logits if any. 
+ if self.scheduler_config.enable_delayed_sampling \ + and self.is_driver_worker and not is_prompt: logits_ids_list = [] logits_tensor = None logits_tensor_list = [] @@ -2191,7 +2174,6 @@ def execute_model( prev_logits = torch.cat(logits_tensor_list, dim=0) - # SAMPLE WITH N-1th LOGITS IF DECODE (DRIVER) with self.profiler.record_event( 'internal', f'sample_{"prompt" if is_prompt else "decode"}' '_bs{batch_size}_seq{seq_len}'): @@ -2200,14 +2182,10 @@ def execute_model( sampling_metadata=sampling_metadata, ) - #TODO: why is output.sampled_token_ids None - - #TODO: check why broadcast failed for float tensor use dict instead model_kwargs = {} - model_kwargs["input_ids"] = output.sampled_token_ids #! + model_kwargs["input_ids"] = output.sampled_token_ids broadcast_tensor_dict(model_kwargs, src=0) input_ids = output.sampled_token_ids - # ALL WORKERS GET input_ids elif not is_prompt: model_kwargs = broadcast_tensor_dict(src=0) input_ids = model_kwargs["input_ids"] @@ -2256,7 +2234,7 @@ def try_revert_dummy_output_tokens(): self.trim_attn_metadata( broadcast_data["attn_metadata"]) }) - # FORWARD FOR N-th step + # Model forward with self.profiler.record_event('internal', model_event_name): hidden_states = self.model.forward( **execute_model_kwargs, @@ -2267,7 +2245,44 @@ def try_revert_dummy_output_tokens(): LoraMask.setLoraMask( lora_logits_mask.index_select( 0, sampling_metadata.selected_token_indices)) - + + # Read and update the previous token ids to return + if self.scheduler_config.enable_delayed_sampling \ + and self.is_driver_worker and i==0: + if not is_prompt: + htorch.core.mark_step() + for i, seq_group_output in enumerate( + output.outputs[:real_batch_size]): + for sample in seq_group_output.samples: + sample.output_token = output.sampled_token_ids[i][0] + else: + # For prompts compose empty output + from vllm.sequence import (CompletionSequenceGroupOutput, + Logprob, SequenceOutput) + from vllm.model_executor.layers.sampler import SamplerOutput + + sampler_output = [] + for seq_group in sampling_metadata.seq_groups: + seq_ids = seq_group.seq_ids + next_token_id, parent_id = -1, 0 + seq_outputs = [] + seq_outputs.append( + SequenceOutput(seq_ids[parent_id], next_token_id, + {-1: Logprob(0.0)})) + sampler_output.append( + CompletionSequenceGroupOutput(seq_outputs, None)) + sampled_token_probs, logprobs_tensor, sampled_token_ids = ( + None, None, None) + output = SamplerOutput( + outputs=sampler_output, + sampled_token_probs=sampled_token_probs, + sampled_token_ids=sampled_token_ids, + logprobs=logprobs_tensor, + ) + + output.outputs = output.outputs[:real_batch_size] + htorch.core.mark_step() + # Compute the logits. with self.profiler.record_event( 'internal', @@ -2279,72 +2294,40 @@ def try_revert_dummy_output_tokens(): sampling_metadata.selected_token_indices = None logits = self.model.compute_logits(hidden_states, sampling_metadata) - - htorch.core.mark_step() - # Only perform sampling in the driver worker. 
- if not self.is_driver_worker: - continue - if not is_prompt: - htorch.core.mark_step() - for i, seq_group_output in enumerate( - output.outputs[:real_batch_size]): - for sample in seq_group_output.samples: - sample.output_token = output.sampled_token_ids[i][0] - output = output - else: - # For prompts compose empty output - from vllm.sequence import (CompletionSequenceGroupOutput, - Logprob, SequenceOutput) - from vllm.model_executor.layers.sampler import SamplerOutput - - sampler_output = [] - for seq_group in sampling_metadata.seq_groups: - seq_ids = seq_group.seq_ids - next_token_id, parent_id = -1, 0 - seq_outputs = [] - seq_outputs.append( - SequenceOutput(seq_ids[parent_id], next_token_id, - {-1: Logprob(0.0)})) - sampler_output.append( - CompletionSequenceGroupOutput(seq_outputs, None)) - sampled_token_probs, logprobs_tensor, sampled_token_ids = ( - None, None, None) - output = SamplerOutput( - outputs=sampler_output, - sampled_token_probs=sampled_token_probs, - sampled_token_ids=sampled_token_ids, - logprobs=logprobs_tensor, - ) - - output.outputs = output.outputs[:real_batch_size] - htorch.core.mark_step() - - # SAVE LOGITS TO NEXT ITERATION - if model_input.seq_group_metadata_list is not None and self.is_driver_worker: + # Save logits and idx + if (self.scheduler_config.enable_delayed_sampling and model_input.seq_group_metadata_list is not None and self.is_driver_worker): for idx, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): assert len(seq_group_metadata.seq_data) == 1 for seq_data in seq_group_metadata.seq_data.values(): seq_data.prev_logits = logits seq_data.prev_logits_idx = idx + htorch.core.mark_step() + + # Only perform sampling in the driver worker. + if not self.is_driver_worker: + continue + if model_input.async_callback is not None: model_input.async_callback() - # Sample the next token. - # with self.profiler.record_event( - # 'internal', ('sample_' - # f'{"prompt" if is_prompt else "decode"}_' - # f'bs{batch_size}_' - # f'seq{seq_len}')): - # output = self.model.sample( - # logits=logits, - # sampling_metadata=sampling_metadata, - # ) - if num_steps > 1: - output = output.sampled_token_ids - self.cached_step_outputs.append( - output.detach().clone()) - htorch.core.mark_step() + + if not self.scheduler_config.enable_delayed_sampling or i==(num_steps-1): + # Sample the next token. 
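# Illustrative toy of the overall delayed-sampling loop implemented above
# (all names here are assumptions; the toy "model" stands in for the HPU
# forward pass plus compute_logits): each decode step samples its input ids
# from the logits cached by the previous step, then caches the new logits.
import torch

def delayed_decode_sketch(model, prev_logits, num_steps):
    # prev_logits: [batch, vocab] logits cached by the previous iteration.
    emitted = []
    for _ in range(num_steps):
        # 1) sample this step's input ids from the N-1th logits (greedy here)
        input_ids = prev_logits.argmax(dim=-1, keepdim=True)
        emitted.append(input_ids)
        # 2) run the Nth forward pass and cache its logits for the next step
        prev_logits = model(input_ids)
    return emitted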
+ with self.profiler.record_event( + 'internal', ('sample_' + f'{"prompt" if is_prompt else "decode"}_' + f'bs{batch_size}_' + f'seq{seq_len}')): + output = self.model.sample( + logits=logits, + sampling_metadata=sampling_metadata, + ) + if num_steps > 1: + output = output.sampled_token_ids + self.cached_step_outputs.append( + output.detach().clone()) + htorch.core.mark_step() if i < num_steps - 1: if i == 0: From 8b98f21e8ddbaaccf5337a62cf899a337c927189 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 22 Jan 2025 11:15:58 +0200 Subject: [PATCH 05/30] Flag bug fix --- vllm/worker/hpu_model_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index c87a4a2ff6ade..ecb839d192cd4 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2186,7 +2186,7 @@ def execute_model( model_kwargs["input_ids"] = output.sampled_token_ids broadcast_tensor_dict(model_kwargs, src=0) input_ids = output.sampled_token_ids - elif not is_prompt: + elif self.scheduler_config.enable_delayed_sampling and not is_prompt: model_kwargs = broadcast_tensor_dict(src=0) input_ids = model_kwargs["input_ids"] if input_ids is not None: From fc0d6f54c23c5532f0e45a4f5600288fba7133ac Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 22 Jan 2025 11:40:23 +0200 Subject: [PATCH 06/30] checking num_lookahead_slots --- vllm/config.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index 16fef0bd88fa9..81507b477f1d0 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -780,15 +780,11 @@ def _verify_args(self) -> None: "GPU memory utilization must be less than 1.0. Got " f"{self.gpu_memory_utilization}.") - # if self.enable_delayed_sampling and self.num_lookahead_slots != 1: - # raise ValueError( - # "num_lookahead_slots " - # f"({self.num_lookahead_slots}) must be 1 for delayed sampling." - # ) - # if self.enable_delayed_sampling and not self.use_v2_block_manager: - # raise ValueError("use_v2_block_manager " - # f"({self.use_v2_block_manager}) must be True " - # "for delayed sampling.") + if self.enable_delayed_sampling and self.num_lookahead_slots != 1: + raise ValueError( + "num_lookahead_slots " + f"({self.num_lookahead_slots}) must be 1 for delayed sampling." + ) def _verify_cache_dtype(self) -> None: if self.cache_dtype == "auto": From f16fa53a25064f55f127637bd08701b7c2df27ca Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 22 Jan 2025 12:42:51 +0200 Subject: [PATCH 07/30] Config fix --- vllm/config.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/vllm/config.py b/vllm/config.py index 81507b477f1d0..fcca829f9ea45 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -750,6 +750,7 @@ def __init__( sliding_window: Optional[int] = None, enable_prefix_caching: bool = False, cpu_offload_gb: float = 0, + enable_delayed_sampling: bool = False, ) -> None: self.block_size = block_size self.gpu_memory_utilization = gpu_memory_utilization @@ -779,12 +780,6 @@ def _verify_args(self) -> None: raise ValueError( "GPU memory utilization must be less than 1.0. Got " f"{self.gpu_memory_utilization}.") - - if self.enable_delayed_sampling and self.num_lookahead_slots != 1: - raise ValueError( - "num_lookahead_slots " - f"({self.num_lookahead_slots}) must be 1 for delayed sampling." 
- ) def _verify_cache_dtype(self) -> None: if self.cache_dtype == "auto": From 8d3602fb7b3b6809f1bf50537434665da3c12d25 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 22 Jan 2025 12:45:03 +0200 Subject: [PATCH 08/30] Config fix --- vllm/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/vllm/config.py b/vllm/config.py index fcca829f9ea45..fc805703daf67 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -750,7 +750,6 @@ def __init__( sliding_window: Optional[int] = None, enable_prefix_caching: bool = False, cpu_offload_gb: float = 0, - enable_delayed_sampling: bool = False, ) -> None: self.block_size = block_size self.gpu_memory_utilization = gpu_memory_utilization From 5ed2c119be9f206238da8ad64a11df18eb68e32e Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 22 Jan 2025 15:49:49 +0200 Subject: [PATCH 09/30] Config fix for MSS --- vllm/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/config.py b/vllm/config.py index fc805703daf67..c2fedc61d109c 100644 --- a/vllm/config.py +++ b/vllm/config.py @@ -1197,7 +1197,7 @@ def _verify_args(self) -> None: "num_scheduler_steps " f"({self.num_scheduler_steps}) must be greater than or " "equal to 1.") - if self.enable_delayed_sampling and self.num_lookahead_slots != 1: + if self.enable_delayed_sampling and self.num_lookahead_slots < 1: raise ValueError( "num_lookahead_slots " f"({self.num_lookahead_slots}) must be 1 for delayed sampling." From 85ac250a256f6aae580a4346c187bde0bb525f16 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Thu, 23 Jan 2025 13:34:49 +0200 Subject: [PATCH 10/30] Set Triton version --- requirements-hpu.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements-hpu.txt b/requirements-hpu.txt index 084d2a736521a..215a760c05366 100644 --- a/requirements-hpu.txt +++ b/requirements-hpu.txt @@ -3,7 +3,7 @@ # Dependencies for HPU code ray -triton +triton==3.1.0 pandas tabulate setuptools>=61 From fdb3b9fc0a8e9d589ba6522dff6f83a19eda269b Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Fri, 24 Jan 2025 18:39:54 +0200 Subject: [PATCH 11/30] Bug fixes --- vllm/worker/hpu_model_runner.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index c65a7755bd2f7..ab1c5e99c679e 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2164,6 +2164,7 @@ def execute_model( input_ids = None # Delayed sampling # Sample the next token based on previous logits if any. 
+ # With and without MSS if self.scheduler_config.enable_delayed_sampling \ and self.is_driver_worker and not is_prompt: logits_ids_list = [] @@ -2195,10 +2196,11 @@ def execute_model( device="hpu")) if logits_tensor is not None: logits_tensor_list.append(logits_tensor[torch.tensor( - logits_ids_list, device=seq_data.prev_logits.device)]) + logits_ids_list, device=logits_tensor.device)]) prev_logits = torch.cat(logits_tensor_list, dim=0) + # Sample next token - delayed sampling with self.profiler.record_event( 'internal', f'sample_{"prompt" if is_prompt else "decode"}' '_bs{batch_size}_seq{seq_len}'): @@ -2276,10 +2278,10 @@ def try_revert_dummy_output_tokens(): and self.is_driver_worker and i==0: if not is_prompt: htorch.core.mark_step() - for i, seq_group_output in enumerate( + for j, seq_group_output in enumerate( output.outputs[:real_batch_size]): for sample in seq_group_output.samples: - sample.output_token = output.sampled_token_ids[i][0] + sample.output_token = output.sampled_token_ids[j][0] else: # For prompts compose empty output from vllm.sequence import (CompletionSequenceGroupOutput, @@ -2320,7 +2322,20 @@ def try_revert_dummy_output_tokens(): logits = self.model.compute_logits(hidden_states, sampling_metadata) - # Save logits and idx + + # Delayed sampling + # MSS: Sample for decodes, but not for the last one + is_last_multistep = i == (num_steps - 1) and i != 0 + # No DS + if self.scheduler_config.enable_delayed_sampling: + if num_steps==1: + should_sample = False # Delayed sampling, no MSS, never sample + else: # MSS + should_sample = False if is_last_multistep else True + else: + should_sample = True + + #! Should Save logits and idx only when not sampling if (self.scheduler_config.enable_delayed_sampling and model_input.seq_group_metadata_list is not None and self.is_driver_worker): for idx, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): assert len(seq_group_metadata.seq_data) == 1 @@ -2329,15 +2344,14 @@ def try_revert_dummy_output_tokens(): seq_data.prev_logits_idx = idx htorch.core.mark_step() - # Only perform sampling in the driver worker. if not self.is_driver_worker: continue if model_input.async_callback is not None: model_input.async_callback() - - if not self.scheduler_config.enable_delayed_sampling or i==(num_steps-1): + + if should_sample: # Sample the next token. 
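# Illustrative standalone sketch of the sampling decision above (function name
# is an assumption): with delayed sampling, in-step sampling only happens for
# the intermediate steps of a multi-step scheduling (MSS) window.
def should_sample_sketch(enable_delayed_sampling, step_idx, num_steps):
    if not enable_delayed_sampling:
        return True   # regular path: sample on every step
    if num_steps == 1:
        return False  # delayed sampling without MSS: sampling is deferred
    # delayed sampling with MSS: sample on every step except the last one
    is_last_multistep = (step_idx == num_steps - 1) and step_idx != 0
    return not is_last_multistep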
with self.profiler.record_event( 'internal', ('sample_' @@ -2395,7 +2409,7 @@ def try_revert_dummy_output_tokens(): else: try_revert_dummy_output_tokens() return [] - + result = self._prepare_decode(seq_group_metadata_list, output=output) if self.lora_config: From 74c87c5ad56a70536f647741dd80decd56967573 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Fri, 24 Jan 2025 18:45:05 +0200 Subject: [PATCH 12/30] Typo --- vllm/worker/hpu_model_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index ab1c5e99c679e..bb128fb327f36 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2325,7 +2325,7 @@ def try_revert_dummy_output_tokens(): # Delayed sampling # MSS: Sample for decodes, but not for the last one - is_last_multistep = i == (num_steps - 1) and i != 0 + is_last_multistep = True if ((i == num_steps - 1) and i != 0) else False # No DS if self.scheduler_config.enable_delayed_sampling: if num_steps==1: From 346ff80dd652d53d02861be8054bff547fd27d7c Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Mon, 27 Jan 2025 11:50:55 +0200 Subject: [PATCH 13/30] No logits saving when sampling --- vllm/worker/hpu_model_runner.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index bb128fb327f36..2c23a540a1ac8 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -803,6 +803,9 @@ def _add_dummy_seq(self, seq_group_metadata_list, is_prompt): real_batch_size, is_prompt) batch_size_padding = batch_size_padded - real_batch_size + #! TODO: batch size padding breakes accuracy + batch_size_padding =0 + seq_group_metadata_list = seq_group_metadata_list.copy() if batch_size_padding > 0: @@ -2161,6 +2164,7 @@ def execute_model( htorch.core.mark_step() + #breakpoint() input_ids = None # Delayed sampling # Sample the next token based on previous logits if any. @@ -2340,8 +2344,8 @@ def try_revert_dummy_output_tokens(): for idx, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): assert len(seq_group_metadata.seq_data) == 1 for seq_data in seq_group_metadata.seq_data.values(): - seq_data.prev_logits = logits - seq_data.prev_logits_idx = idx + seq_data.prev_logits = logits if not should_sample else None + seq_data.prev_logits_idx = idx if not should_sample else None htorch.core.mark_step() # Only perform sampling in the driver worker. From c3cfca9e70e3c099c27222e5927e6bc9483048a5 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Mon, 27 Jan 2025 16:00:13 +0200 Subject: [PATCH 14/30] BS padding fix --- vllm/worker/hpu_model_runner.py | 46 ++++++++++++++++----------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 2c23a540a1ac8..a4fff280125c5 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -803,9 +803,6 @@ def _add_dummy_seq(self, seq_group_metadata_list, is_prompt): real_batch_size, is_prompt) batch_size_padding = batch_size_padded - real_batch_size - #! 
TODO: batch size padding breakes accuracy - batch_size_padding =0 - seq_group_metadata_list = seq_group_metadata_list.copy() if batch_size_padding > 0: @@ -1485,6 +1482,7 @@ def create_dummy_seq_group_metadata(self, output_token_ids = [1] * output_len prompt_token_ids_array = array('l', prompt_token_ids) # noqa: F821 seq_data = SequenceData(prompt_token_ids_array) + seq_data.output_token_ids = output_token_ids return SequenceGroupMetadata(request_id=str(group_id), is_prompt=(output_len == 0), @@ -2164,7 +2162,6 @@ def execute_model( htorch.core.mark_step() - #breakpoint() input_ids = None # Delayed sampling # Sample the next token based on previous logits if any. @@ -2175,33 +2172,34 @@ def execute_model( logits_tensor = None logits_tensor_list = [] if model_input.seq_group_metadata_list is not None: - for seq_group_metadata in model_input.seq_group_metadata_list: + for i, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): assert len(seq_group_metadata.seq_data) == 1 for seq_data in seq_group_metadata.seq_data.values(): - if seq_data.prev_logits is not None: - if logits_tensor is None: - logits_tensor = seq_data.prev_logits - if seq_data.prev_logits is logits_tensor: - logits_ids_list.append( - seq_data.prev_logits_idx) - else: - logits_tensor_list.append( - logits_tensor[torch.tensor( - logits_ids_list, - device=seq_data.prev_logits.device)]) - - logits_ids_list = [seq_data.prev_logits_idx] - logits_tensor = seq_data.prev_logits + if seq_data.prev_logits is None: + # Padded sequences and warmup + #TODO: Add some sort of check based on metadata(?) + seq_data.prev_logits = torch.zeros([1, 32000], + dtype=torch.float, + device="hpu") + seq_data.prev_logits_idx = i + if logits_tensor is None: + logits_tensor = seq_data.prev_logits + if seq_data.prev_logits is logits_tensor: + logits_ids_list.append( + seq_data.prev_logits_idx) else: - # warmup only, TODO add a check logits_tensor_list.append( - torch.zeros([1, 32000], - dtype=torch.float, - device="hpu")) + logits_tensor[torch.tensor( + logits_ids_list, + device=seq_data.prev_logits.device)]) + + logits_ids_list = [seq_data.prev_logits_idx] + logits_tensor = seq_data.prev_logits + if logits_tensor is not None: logits_tensor_list.append(logits_tensor[torch.tensor( logits_ids_list, device=logits_tensor.device)]) - + prev_logits = torch.cat(logits_tensor_list, dim=0) # Sample next token - delayed sampling From 4658ba5dc04407218d5e4607e8c3ae9f55d1f0ed Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Tue, 28 Jan 2025 14:06:54 +0200 Subject: [PATCH 15/30] Set correct vocab size --- vllm/sequence.py | 4 ++-- vllm/worker/hpu_model_runner.py | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/vllm/sequence.py b/vllm/sequence.py index 9260a43c90b2f..c657a909fdea4 100644 --- a/vllm/sequence.py +++ b/vllm/sequence.py @@ -403,8 +403,8 @@ def __repr__(self) -> str: f"output_token_ids={self.output_token_ids}, " f"cumulative_logprob={self.cumulative_logprob}, " f"get_num_computed_tokens={self.get_num_computed_tokens()}, " - f"previous_logits={self._prev_logits}, " - f"previous_logits_id={self._prev_logits_idx})") + f"prev_logits={self._prev_logits}, " + f"prev_logits_id={self._prev_logits_idx})") class Sequence: diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index a4fff280125c5..d9ba266496cf0 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2178,7 +2178,7 @@ def execute_model( if seq_data.prev_logits is None: # Padded sequences and warmup #TODO: 
Add some sort of check based on metadata(?) - seq_data.prev_logits = torch.zeros([1, 32000], + seq_data.prev_logits = torch.zeros([1, self.vocab_size], dtype=torch.float, device="hpu") seq_data.prev_logits_idx = i @@ -2199,7 +2199,7 @@ def execute_model( if logits_tensor is not None: logits_tensor_list.append(logits_tensor[torch.tensor( logits_ids_list, device=logits_tensor.device)]) - + prev_logits = torch.cat(logits_tensor_list, dim=0) # Sample next token - delayed sampling @@ -2218,6 +2218,7 @@ def execute_model( elif self.scheduler_config.enable_delayed_sampling and not is_prompt: model_kwargs = broadcast_tensor_dict(src=0) input_ids = model_kwargs["input_ids"] + if input_ids is not None: execute_model_kwargs["input_ids"] = input_ids htorch.core.mark_step() @@ -2263,6 +2264,7 @@ def try_revert_dummy_output_tokens(): self.trim_attn_metadata( broadcast_data["attn_metadata"]) }) + # Model forward with self.profiler.record_event('internal', model_event_name): hidden_states = self.model.forward( From 10e2209f5d72a2ca08bfeaad9fba889f2cabf404 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Tue, 28 Jan 2025 17:29:36 +0200 Subject: [PATCH 16/30] Warmup fix --- vllm/worker/hpu_model_runner.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index d9ba266496cf0..9d39b3c293389 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -1126,10 +1126,7 @@ def _prepare_decode( generation_token = seq_data.get_last_token_id() input_tokens.append([generation_token]) - # DS - seq_len = ((seq_data.get_num_computed_tokens() + - 1) if self.scheduler_config.enable_delayed_sampling - else seq_data.get_len()) + seq_len = seq_data.get_len() position = seq_len - 1 input_positions.append([position]) From 3c5e0d4e4492e867ed9b9fe74079eebe70686098 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Thu, 30 Jan 2025 15:33:26 +0200 Subject: [PATCH 17/30] graphs fix --- requirements-hpu.txt | 2 +- vllm/worker/hpu_model_runner.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/requirements-hpu.txt b/requirements-hpu.txt index e7711ec3f1134..38113981cd663 100644 --- a/requirements-hpu.txt +++ b/requirements-hpu.txt @@ -8,4 +8,4 @@ pandas tabulate setuptools>=61 setuptools-scm>=8 -vllm-hpu-extension @ git+https://github.com/HabanaAI/vllm-hpu-extension.git@d4f37bb +vllm-hpu-extension @ git+https://github.com/HabanaAI/vllm-hpu-extension.git@db80a48 diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 9d39b3c293389..a4861f4f13168 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2179,25 +2179,32 @@ def execute_model( dtype=torch.float, device="hpu") seq_data.prev_logits_idx = i + #! 
This causes recompilations due to changing tensor shapes if logits_tensor is None: logits_tensor = seq_data.prev_logits + # If the current sequence has the same logits if seq_data.prev_logits is logits_tensor: logits_ids_list.append( seq_data.prev_logits_idx) + # Different logits else: + # Save logits of all previous sequences + # Varying shape causes recompilations logits_tensor_list.append( - logits_tensor[torch.tensor( - logits_ids_list, - device=seq_data.prev_logits.device)]) + torch.index_select(logits_tensor, 0, torch.tensor(logits_ids_list, device=htorch.hpu.current_device()))) + # Store new id logits_ids_list = [seq_data.prev_logits_idx] + + # Save logits tensor logits_tensor = seq_data.prev_logits if logits_tensor is not None: logits_tensor_list.append(logits_tensor[torch.tensor( logits_ids_list, device=logits_tensor.device)]) - prev_logits = torch.cat(logits_tensor_list, dim=0) + #! Logits have different shapes that causes recompilations + prev_logits = torch.cat(logits_tensor_list, dim=0).to(htorch.hpu.current_device()) # Sample next token - delayed sampling with self.profiler.record_event( @@ -2322,7 +2329,6 @@ def try_revert_dummy_output_tokens(): sampling_metadata.selected_token_indices = None logits = self.model.compute_logits(hidden_states, sampling_metadata) - # Delayed sampling # MSS: Sample for decodes, but not for the last one From c445fe7eeb16b864b1365c1d525153ecb372f5e8 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Fri, 31 Jan 2025 16:20:27 +0200 Subject: [PATCH 18/30] Token id to int --- vllm/engine/llm_engine.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 88c21f9a6d31b..6b4492a55669e 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1218,14 +1218,16 @@ def _advance_to_next_step( assert len(seq_group.seqs) == 1 seq = seq_group.seqs[0] + output_token = sample.output_token.item() if type(sample.output_token) is torch.Tensor else sample.output_token + logprobs = sample.logprobs.item() if type(sample.logprobs) is torch.Tensor else sample.logprobs if self.scheduler_config.is_multi_step: is_prefill_append = seq.data.get_num_uncomputed_tokens( ) == 0 - seq.append_token_id(sample.output_token, sample.logprobs) + seq.append_token_id(output_token, logprobs) if not is_prefill_append: seq_group.update_num_computed_tokens(1) else: - seq.append_token_id(sample.output_token, sample.logprobs) + seq.append_token_id(output_token, logprobs) def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]: """Performs one decoding iteration and returns newly generated results. From 68c50064136f2ebd5b33eb82ebeadadd03b54291 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Mon, 3 Feb 2025 12:48:12 +0200 Subject: [PATCH 19/30] Revert "Token id to int" This reverts commit c445fe7eeb16b864b1365c1d525153ecb372f5e8. 
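Note: the per-sequence logits bookkeeping that the preceding hunks keep reworking boils down to a small gather routine. Each sequence remembers which tensor held the previous step's logits (prev_logits) and which row inside it is its own (prev_logits_idx); at decode time, runs of sequences that share the same source tensor are gathered in one shot and the runs are concatenated. The sketch below restates that logic outside the runner under simplifying assumptions: _SeqData and gather_prev_logits are illustrative names, not vLLM APIs, and padded/warmup sequences get a zero row at index 0 rather than the batch position used in the patch.

    import torch
    from dataclasses import dataclass
    from typing import List, Optional

    @dataclass
    class _SeqData:                      # illustrative stand-in for SequenceData
        prev_logits: Optional[torch.Tensor] = None   # [rows, vocab] from the last step
        prev_logits_idx: Optional[int] = None        # this sequence's row in that tensor

    def gather_prev_logits(seqs: List[_SeqData], vocab_size: int,
                           device: str = "cpu") -> torch.Tensor:
        """Collect one logits row per sequence, batching consecutive
        sequences that share the same cached tensor into a single gather."""
        chunks: List[torch.Tensor] = []
        rows: List[int] = []
        current: Optional[torch.Tensor] = None
        for seq in seqs:
            if seq.prev_logits is None:              # padded or warmup sequence
                seq.prev_logits = torch.zeros(1, vocab_size, device=device)
                seq.prev_logits_idx = 0
            if current is None:
                current = seq.prev_logits
            if seq.prev_logits is current:           # same source tensor: extend the run
                rows.append(seq.prev_logits_idx)
            else:                                    # new source tensor: flush the run
                chunks.append(current[torch.tensor(rows, device=current.device)])
                current, rows = seq.prev_logits, [seq.prev_logits_idx]
        if current is not None:
            chunks.append(current[torch.tensor(rows, device=current.device)])
        return torch.cat(chunks, dim=0)              # one row per sequence

    batch = [_SeqData(), _SeqData()]                 # two padded sequences
    print(gather_prev_logits(batch, vocab_size=8).shape)   # torch.Size([2, 8])

Because the number of rows gathered per run changes from step to step, the concatenated tensor's shape varies between iterations, which is exactly the recompilation problem the inline "#!" comments call out.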
--- vllm/engine/llm_engine.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py index 6b4492a55669e..88c21f9a6d31b 100644 --- a/vllm/engine/llm_engine.py +++ b/vllm/engine/llm_engine.py @@ -1218,16 +1218,14 @@ def _advance_to_next_step( assert len(seq_group.seqs) == 1 seq = seq_group.seqs[0] - output_token = sample.output_token.item() if type(sample.output_token) is torch.Tensor else sample.output_token - logprobs = sample.logprobs.item() if type(sample.logprobs) is torch.Tensor else sample.logprobs if self.scheduler_config.is_multi_step: is_prefill_append = seq.data.get_num_uncomputed_tokens( ) == 0 - seq.append_token_id(output_token, logprobs) + seq.append_token_id(sample.output_token, sample.logprobs) if not is_prefill_append: seq_group.update_num_computed_tokens(1) else: - seq.append_token_id(output_token, logprobs) + seq.append_token_id(sample.output_token, sample.logprobs) def step(self) -> List[Union[RequestOutput, PoolingRequestOutput]]: """Performs one decoding iteration and returns newly generated results. From 90b54af2d44bc434d17aba9fa0e43b04a03d57a6 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Mon, 3 Feb 2025 13:49:22 +0200 Subject: [PATCH 20/30] Sample id to int --- vllm/worker/hpu_model_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index a4861f4f13168..339156328f1c9 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2289,7 +2289,7 @@ def try_revert_dummy_output_tokens(): for j, seq_group_output in enumerate( output.outputs[:real_batch_size]): for sample in seq_group_output.samples: - sample.output_token = output.sampled_token_ids[j][0] + sample.output_token = output.sampled_token_ids[j][0].item() else: # For prompts compose empty output from vllm.sequence import (CompletionSequenceGroupOutput, From 5c995b0d8c4dea503b3087737531fa170e2d0820 Mon Sep 17 00:00:00 2001 From: Kamil Kaczor Date: Mon, 3 Feb 2025 14:33:27 +0100 Subject: [PATCH 21/30] Update hpu_model_runner.py fix high level profiling --- vllm/worker/hpu_model_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 339156328f1c9..ed18e5ba4a54b 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2209,7 +2209,7 @@ def execute_model( # Sample next token - delayed sampling with self.profiler.record_event( 'internal', f'sample_{"prompt" if is_prompt else "decode"}' - '_bs{batch_size}_seq{seq_len}'): + f'_bs{batch_size}_seq{seq_len}'): output = self.model.sample( logits=prev_logits, sampling_metadata=sampling_metadata, From 3b2337a2b44ad991d02e6d0d570fe861042d7b73 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Tue, 4 Feb 2025 16:24:01 +0200 Subject: [PATCH 22/30] MSS accuracy fix --- vllm/worker/hpu_model_runner.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index ed18e5ba4a54b..f0e9376bd81bf 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2203,6 +2203,9 @@ def execute_model( logits_tensor_list.append(logits_tensor[torch.tensor( logits_ids_list, device=logits_tensor.device)]) + # Seperate logits concat from sampler graph + htorch.core.mark_step() + #! 
Logits have different shapes that causes recompilations prev_logits = torch.cat(logits_tensor_list, dim=0).to(htorch.hpu.current_device()) @@ -2210,15 +2213,19 @@ def execute_model( with self.profiler.record_event( 'internal', f'sample_{"prompt" if is_prompt else "decode"}' f'_bs{batch_size}_seq{seq_len}'): - output = self.model.sample( + prev_output = self.model.sample( logits=prev_logits, sampling_metadata=sampling_metadata, ) model_kwargs = {} - model_kwargs["input_ids"] = output.sampled_token_ids + model_kwargs["input_ids"] = prev_output.sampled_token_ids broadcast_tensor_dict(model_kwargs, src=0) - input_ids = output.sampled_token_ids + input_ids = prev_output.sampled_token_ids + + if num_steps == 1: + output = prev_output + elif self.scheduler_config.enable_delayed_sampling and not is_prompt: model_kwargs = broadcast_tensor_dict(src=0) input_ids = model_kwargs["input_ids"] @@ -2283,7 +2290,7 @@ def try_revert_dummy_output_tokens(): # Read and update the previous token ids to return if self.scheduler_config.enable_delayed_sampling \ - and self.is_driver_worker and i==0: + and self.is_driver_worker and num_steps==1: if not is_prompt: htorch.core.mark_step() for j, seq_group_output in enumerate( @@ -2370,6 +2377,11 @@ def try_revert_dummy_output_tokens(): sampling_metadata=sampling_metadata, ) if num_steps > 1: + if i == 0: + prev_output = prev_output.sampled_token_ids + self.cached_step_outputs.append( + prev_output.detach().clone()) + output = output.sampled_token_ids self.cached_step_outputs.append( output.detach().clone()) From 9b016683250b71bd8a96601eb681878b3264f5f3 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Wed, 5 Feb 2025 10:56:28 +0200 Subject: [PATCH 23/30] Cache prev outputs only when DS is on --- vllm/worker/hpu_model_runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index f0e9376bd81bf..2dcff2b416ac4 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2377,7 +2377,8 @@ def try_revert_dummy_output_tokens(): sampling_metadata=sampling_metadata, ) if num_steps > 1: - if i == 0: + # Cache previous outputs for delayed sampling + if self.scheduler_config.enable_delayed_sampling and i == 0: prev_output = prev_output.sampled_token_ids self.cached_step_outputs.append( prev_output.detach().clone()) From a2cf81cbd7747f6da007bcb08e012cb086308aed Mon Sep 17 00:00:00 2001 From: Kamil Kaczor Date: Thu, 6 Feb 2025 16:43:42 +0100 Subject: [PATCH 24/30] Reduce cat recompilations WA for recompilations - reduces number of them from 2 each decode to 1 and total time from 11ms to 2.6sm. Also torch.tensor is preferable vs index citing Marceli: "torch.index_select returns a new tensor which copies the indexed fields into a new memory location. torch.Tensor.select or slicing returns a view of the original tensor." 
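A quick way to sanity-check the copy-versus-view distinction quoted above. Note that advanced indexing with an index tensor (the x[torch.tensor([...])] form this hunk switches back to) copies rows just like torch.index_select, so any speedup here presumably comes from the simpler op pattern rather than from aliasing; only basic indexing/slicing returns a view.

    import torch

    x = torch.arange(12, dtype=torch.float32).reshape(3, 4)

    view = x[1]                                          # basic indexing: a view
    sel  = torch.index_select(x, 0, torch.tensor([1]))   # copies the selected row
    adv  = x[torch.tensor([1])]                          # advanced indexing also copies

    x[1, 0] = 100.0
    print(view[0].item())    # 100.0 -> shares storage with x
    print(sel[0, 0].item())  # 4.0   -> unaffected copy
    print(adv[0, 0].item())  # 4.0   -> unaffected copy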
--- vllm/worker/hpu_model_runner.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 2dcff2b416ac4..71fb2c184c977 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2187,12 +2187,12 @@ def execute_model( logits_ids_list.append( seq_data.prev_logits_idx) # Different logits - else: + else: # Save logits of all previous sequences - # Varying shape causes recompilations logits_tensor_list.append( - torch.index_select(logits_tensor, 0, torch.tensor(logits_ids_list, device=htorch.hpu.current_device()))) - + logits_tensor[torch.tensor( + logits_ids_list, + device=seq_data.prev_logits.device)]) # Store new id logits_ids_list = [seq_data.prev_logits_idx] @@ -2203,11 +2203,9 @@ def execute_model( logits_tensor_list.append(logits_tensor[torch.tensor( logits_ids_list, device=logits_tensor.device)]) - # Seperate logits concat from sampler graph - htorch.core.mark_step() - #! Logits have different shapes that causes recompilations - prev_logits = torch.cat(logits_tensor_list, dim=0).to(htorch.hpu.current_device()) + prev_logits = torch.cat(logits_tensor_list, dim=0) + htorch.core.mark_step() # Sample next token - delayed sampling with self.profiler.record_event( From d9b68b6a25cf4360dc9eccf3047f4404b0e33a9c Mon Sep 17 00:00:00 2001 From: Kamil Kaczor Date: Fri, 7 Feb 2025 11:03:52 +0100 Subject: [PATCH 25/30] WA for sync after MSS Sync there causes 4-6ms gap because executor waits for hpu to finish although in delayed sampling it's not needed. --- vllm/worker/hpu_model_runner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 71fb2c184c977..90cc985c4f4e9 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2104,7 +2104,9 @@ def execute_model( # last multi-step output = self._decode_sampler_outputs( model_input) if self.is_driver_worker else [] - torch.hpu.synchronize() + # WA - this sync causes 4ms+ gap before decodes in MSS + delayed. + # Let's remove it for testing and check if TP>1 works without it + #torch.hpu.synchronize() if model_input.is_first_multi_step: # first multi-step From 58845c608b393fbc34971513f0b53809001a4fa0 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Fri, 7 Feb 2025 12:11:01 +0200 Subject: [PATCH 26/30] Syntax --- vllm/worker/hpu_model_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 90cc985c4f4e9..0ae8363887b24 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2189,7 +2189,7 @@ def execute_model( logits_ids_list.append( seq_data.prev_logits_idx) # Different logits - else: + else: # Save logits of all previous sequences logits_tensor_list.append( logits_tensor[torch.tensor( From 5a9aa84910a7c7660e184bd9fb6f1a6ecda7ee0f Mon Sep 17 00:00:00 2001 From: Kamil Kaczor Date: Fri, 7 Feb 2025 12:15:23 +0100 Subject: [PATCH 27/30] Remove WA sync in MSS Instead of removing the sync let's add an if - we don't want to change too much (in this case affecting also MSS without delayed) increasing the testing scope. 
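The shape of the change described in this message, in plain PyTorch terms (CUDA used as a stand-in for the HPU backend, flag name taken from the patch): block the host only when delayed sampling is off, since with delayed sampling the next sampler call forces the synchronization anyway.

    import torch

    def maybe_sync(enable_delayed_sampling: bool) -> None:
        # Skip the eager device sync when delayed sampling is on; the
        # subsequent sampling step synchronizes implicitly when it reads
        # the sampled tokens back to the host.
        if not enable_delayed_sampling:
            if torch.cuda.is_available():    # stand-in for torch.hpu.synchronize()
                torch.cuda.synchronize()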
--- vllm/worker/hpu_model_runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 0ae8363887b24..3242e14a0039d 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2104,9 +2104,10 @@ def execute_model( # last multi-step output = self._decode_sampler_outputs( model_input) if self.is_driver_worker else [] - # WA - this sync causes 4ms+ gap before decodes in MSS + delayed. - # Let's remove it for testing and check if TP>1 works without it - #torch.hpu.synchronize() + # It causes delayed sampling to wait for hpu although + # it's not needed here - it will sync later in a sampler. + if not self.scheduler_config.enable_delayed_sampling: + torch.hpu.synchronize() if model_input.is_first_multi_step: # first multi-step From 99452c844fd74423b4ccdf27d31617ebffbbc1cb Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Fri, 7 Feb 2025 17:40:22 +0200 Subject: [PATCH 28/30] Skip caching loop if shouldn't sample --- vllm/worker/hpu_model_runner.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index 3242e14a0039d..a32064d566412 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2351,12 +2351,14 @@ def try_revert_dummy_output_tokens(): should_sample = True #! Should Save logits and idx only when not sampling - if (self.scheduler_config.enable_delayed_sampling and model_input.seq_group_metadata_list is not None and self.is_driver_worker): + if (self.scheduler_config.enable_delayed_sampling + and model_input.seq_group_metadata_list is not None + and self.is_driver_worker and not should_sample): for idx, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): assert len(seq_group_metadata.seq_data) == 1 for seq_data in seq_group_metadata.seq_data.values(): - seq_data.prev_logits = logits if not should_sample else None - seq_data.prev_logits_idx = idx if not should_sample else None + seq_data.prev_logits = logits + seq_data.prev_logits_idx = idx htorch.core.mark_step() # Only perform sampling in the driver worker. From 99ba0bf583985b0aaf355b04bec0f29fccc7fdf4 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Tue, 11 Feb 2025 13:40:54 +0200 Subject: [PATCH 29/30] Free logits during multistep --- vllm/worker/hpu_model_runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index a32064d566412..a6565ce9000a8 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -2350,15 +2350,16 @@ def try_revert_dummy_output_tokens(): else: should_sample = True - #! Should Save logits and idx only when not sampling + # Should save logits only when not sampling + # Keeping logits during MSS causes memory issues if (self.scheduler_config.enable_delayed_sampling and model_input.seq_group_metadata_list is not None - and self.is_driver_worker and not should_sample): + and self.is_driver_worker): for idx, seq_group_metadata in enumerate(model_input.seq_group_metadata_list): assert len(seq_group_metadata.seq_data) == 1 for seq_data in seq_group_metadata.seq_data.values(): - seq_data.prev_logits = logits - seq_data.prev_logits_idx = idx + seq_data.prev_logits = logits if not should_sample else None + seq_data.prev_logits_idx = idx if not should_sample else None htorch.core.mark_step() # Only perform sampling in the driver worker. 
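Patches 28 and 29 together converge on: keep a reference to this step's logits on each sequence only when sampling is being deferred, and drop it otherwise so the batch logits tensor can be freed during multi-step scheduling. A condensed restatement, reusing the illustrative _SeqData stand-in from the earlier sketch:

    import torch
    from typing import List

    def stash_or_free_logits(seqs: List["_SeqData"], logits: torch.Tensor,
                             should_sample: bool) -> None:
        # Sampling now means the logits are consumed immediately; keeping a
        # reference on every sequence across multi-step iterations would only
        # pin the batch logits tensor in device memory.
        for idx, seq in enumerate(seqs):
            seq.prev_logits = None if should_sample else logits
            seq.prev_logits_idx = None if should_sample else idx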
From bf17f2cee9fd44fa07a6f6ce85235aa1efc29619 Mon Sep 17 00:00:00 2001 From: Marceli Fylcek Date: Tue, 11 Feb 2025 14:13:53 +0200 Subject: [PATCH 30/30] WA for forward recomp in 3rd step --- vllm/worker/hpu_model_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vllm/worker/hpu_model_runner.py b/vllm/worker/hpu_model_runner.py index a6565ce9000a8..404c5b70b6274 100755 --- a/vllm/worker/hpu_model_runner.py +++ b/vllm/worker/hpu_model_runner.py @@ -1558,7 +1558,7 @@ def warmup_scenario(self, seqs = [ self.create_dummy_seq_group_metadata( i, - b * self.block_size - 1, + b * self.block_size - 2, is_prompt, lora_request=dummy_lora_requests_per_seq[i] if dummy_lora_requests_per_seq else None, @@ -1582,7 +1582,7 @@ def warmup_scenario(self, self.execute_model(inputs, kv_caches, warmup_mode=True, - num_steps=2, + num_steps=3, seqs=seqs) inputs = dataclasses.replace(inputs, is_first_multi_step=False,