[Stop Sequences] support stop sequences #2712

Open · wants to merge 2 commits into base: develop
8 changes: 6 additions & 2 deletions docs/usage/environment_variables.md
@@ -32,6 +32,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
"FD_STOP_SEQS_MAX_LEN":
lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),

# Whether to use stop sequences (0 or 1)
"FD_USE_STOP_SEQ":
lambda: os.getenv("FD_USE_STOP_SEQ", 0),

# GPU devices to use (comma-separated string, e.g. 0,1,2)
"CUDA_VISIBLE_DEVICES":
lambda: os.getenv("CUDA_VISIBLE_DEVICES", None),
@@ -67,6 +71,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Switch from standalone PD to centralized inference (0 or 1)
"FD_PD_CHANGEABLE":
lambda: os.getenv("FD_PD_CHANGEABLE", "1"),

}
```
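For readers of the table above, a minimal sketch of turning the feature on from Python before the engine reads its configuration; only the variable names and defaults come from the diff, the rest is illustrative:

```python
import os

# FD_USE_STOP_SEQ defaults to "0" (disabled); set it in the serving
# process's environment so the engine picks it up.
os.environ["FD_USE_STOP_SEQ"] = "1"

# Maximum token length allowed per stop sequence (documented default "8").
os.environ.setdefault("FD_STOP_SEQS_MAX_LEN", "8")
```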
9 changes: 7 additions & 2 deletions docs/zh/usage/environment_variables.md
@@ -1,5 +1,6 @@
# FastDeploy 环境变量说明
FastDeploy 的环境变量保存在了代码库根目录下 fastdeploy/envs.py 文件中,以下是其对应的中文版说明:

```python
environment_variables: dict[str, Callable[[], Any]] = {
# 构建 FastDeploy 时使用的 CUDA 架构版本,这是一个字符串列表,例如[80,90]
@@ -30,6 +31,10 @@ environment_variables: dict[str, Callable[[], Any]] = {
"FD_STOP_SEQS_MAX_LEN":
lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),

# 是否使用停止序列
"FD_USE_STOP_SEQ":
lambda: os.getenv("FD_USE_STOP_SEQ", 0),

# 将要使用的GPU设备,这是一个用逗号分隔的字符串,例如 0,1,2
"CUDA_VISIBLE_DEVICES":
lambda: os.getenv("CUDA_VISIBLE_DEVICES", None),
@@ -65,6 +70,6 @@ environment_variables: dict[str, Callable[[], Any]] = {
# 是否从单机 PD 分离转换为集中式推理
"FD_PD_CHANGEABLE":
lambda: os.getenv("FD_PD_CHANGEABLE", "1"),

}
```
4 changes: 4 additions & 0 deletions fastdeploy/config.py
@@ -22,6 +22,7 @@

from paddleformers.transformers.configuration_utils import PretrainedConfig

from fastdeploy import envs
from fastdeploy.model_executor.layers.quantization.quant_base import \
QuantConfigBase
from fastdeploy.utils import get_logger
@@ -124,6 +125,9 @@ def __init__(
self.tie_word_embeddings = tie_word_embeddings
self.is_quantized = is_quantized

self.max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
self.stop_seqs_max_len = int(envs.FD_STOP_SEQS_MAX_LEN)
self.use_stop_seq = int(envs.FD_USE_STOP_SEQ)

@dataclass
class MoEConfig:
1 change: 1 addition & 0 deletions fastdeploy/engine/config.py
@@ -126,6 +126,7 @@ def read_from_env(self):
"""
self.max_stop_seqs_num = int(envs.FD_MAX_STOP_SEQS_NUM)
self.stop_seqs_max_len = int(envs.FD_STOP_SEQS_MAX_LEN)
self.use_stop_seq = int(envs.FD_USE_STOP_SEQ)

def reset_config_value(key, value):
if not hasattr(self, key.lower()):
1 change: 1 addition & 0 deletions fastdeploy/engine/sampling_params.py
@@ -85,6 +85,7 @@ class SamplingParams:
seed: Optional[int] = None
stop: Optional[Union[str, List[str]]] = None
stop_token_ids: Optional[Union[List[List[int]], List[int]]] = None
stop_seqs_len: Optional[int] = None
max_tokens: Optional[int] = None
reasoning_max_tokens: Optional[int] = None
min_tokens: int = 1
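A hedged sketch of how a caller might populate the stop-related fields; the field names (`stop`, `stop_token_ids`, `stop_seqs_len`, `max_tokens`) come from the diff, while the keyword-argument constructor is assumed:

```python
from fastdeploy.engine.sampling_params import SamplingParams

# Plain strings: the input processor tokenizes them and fills in
# stop_token_ids / stop_seqs_len.
params = SamplingParams(stop=["\nUser:", "</answer>"], max_tokens=256)

# Pre-tokenized ids: a flat list is treated as a single stop sequence;
# stop_seqs_len is normally derived by the processor, not the caller.
params_ids = SamplingParams(stop_token_ids=[101, 102], max_tokens=256)
```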
4 changes: 4 additions & 0 deletions fastdeploy/envs.py
@@ -52,6 +52,10 @@
"FD_STOP_SEQS_MAX_LEN":
lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),

# Whether to use stop sequences (0 or 1)
"FD_USE_STOP_SEQ":
lambda: os.getenv("FD_USE_STOP_SEQ", "0"),

# GPU devices that will be used. This is a string that
# is split by commas, such as 0,1,2.
"CUDA_VISIBLE_DEVICES":
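The envs entries are zero-argument lambdas, so a value is read from `os.environ` only when its lambda is invoked. A standalone mock of that pattern (how the real module exports these values, e.g. via `__getattr__`, is not shown in this diff and is assumed here):

```python
import os
from typing import Any, Callable

# Mock of the lambda table above; names and defaults match the diff.
environment_variables: dict[str, Callable[[], Any]] = {
    "FD_USE_STOP_SEQ": lambda: os.getenv("FD_USE_STOP_SEQ", "0"),
    "FD_STOP_SEQS_MAX_LEN": lambda: os.getenv("FD_STOP_SEQS_MAX_LEN", "8"),
}

def get_env(name: str) -> Any:
    """Resolve a variable at call time so later os.environ edits are seen."""
    return environment_variables[name]()

use_stop_seq = int(get_env("FD_USE_STOP_SEQ"))  # 0 = disabled, 1 = enabled
```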
16 changes: 14 additions & 2 deletions fastdeploy/input/ernie_processor.py
@@ -20,10 +20,9 @@
from paddleformers.generation import GenerationConfig

from fastdeploy import envs
from fastdeploy.utils import data_processor_logger
from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer

from fastdeploy.input.text_processor import BaseDataProcessor
from fastdeploy.utils import data_processor_logger

_SAMPLING_EPS = 1e-5

@@ -93,11 +92,19 @@ def process_request(self, request, max_model_len=None, **kwargs):
if request.get("eos_token_ids") is None or len(
request.eos_token_ids) == 0:
request.eos_token_ids = self.eos_token_ids

# For now only one of stop and stop_token_ids takes effect; the former has higher priority
stop_sequences = request.get("stop", [])
if stop_sequences is not None and len(stop_sequences) != 0:
stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
request.set("stop_token_ids", stop_seqs)
request.set("stop_seqs_len", stop_seqs_len)
else:
stop_token_ids_list = request.get("stop_token_ids", [])
if len(stop_token_ids_list) != 0:
request.set("stop_token_ids", [stop_token_ids_list])
request.set("stop_seqs_len", [len(stop_token_ids_list)])


if request.prompt_token_ids is None or len(
request.prompt_token_ids) == 0:
@@ -149,6 +156,11 @@ def process_request_dict(self, request, max_model_len=None):
stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
request['stop_token_ids'] = stop_seqs
request['stop_seqs_len'] = stop_seqs_len
else:
stop_token_ids_list = request.get("stop_token_ids", [])
if len(stop_token_ids_list) != 0:
request["stop_token_ids"] = [stop_token_ids_list]
request["stop_seqs_len"] = [len(stop_token_ids_list)]

system = request.get("system")
# Process prompt_token_ids
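The new else-branch makes `stop` and `stop_token_ids` converge on the same internal fields, with `stop` strings taking priority. A standalone sketch of that normalization for the dict path, where `tokenize` is a hypothetical stand-in for `update_stop_seq`:

```python
def normalize_stop(request: dict, tokenize=None) -> dict:
    """Mirror the precedence in process_request_dict: `stop` strings win and
    are tokenized; otherwise a flat `stop_token_ids` list is wrapped as one
    stop sequence. `tokenize` must return (list_of_id_lists, lengths)."""
    stop = request.get("stop") or []
    if stop:
        request["stop_token_ids"], request["stop_seqs_len"] = tokenize(stop)
    else:
        ids = request.get("stop_token_ids") or []
        if ids:
            request["stop_token_ids"] = [ids]
            request["stop_seqs_len"] = [len(ids)]
    return request

# A flat id list becomes a single stop sequence of length 3.
print(normalize_stop({"stop_token_ids": [5, 6, 7]}))
# {'stop_token_ids': [[5, 6, 7]], 'stop_seqs_len': [3]}
```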
12 changes: 11 additions & 1 deletion fastdeploy/input/text_processor.py
@@ -235,6 +235,11 @@ def process_request(self, request, max_model_len=None, **kwargs):
stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
request.set("stop_token_ids", stop_seqs)
request.set("stop_seqs_len", stop_seqs_len)
else:
stop_token_ids_list = request.get("stop_token_ids", [])
if len(stop_token_ids_list) != 0:
request.set("stop_token_ids", [stop_token_ids_list])
request.set("stop_seqs_len", [len(stop_token_ids_list)])

if request.prompt_token_ids is None or len(
request.prompt_token_ids) == 0:
@@ -282,6 +287,11 @@ def process_request_dict(self, request, max_model_len=None, **kwargs):
stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences)
request['stop_token_ids'] = stop_seqs
request['stop_seqs_len'] = stop_seqs_len
else:
stop_token_ids_list = request.get("stop_token_ids", [])
if len(stop_token_ids_list) != 0:
request["stop_token_ids"] = [stop_token_ids_list]
request["stop_seqs_len"] = [len(stop_token_ids_list)]

data_processor_logger.info(f"Processing request {request}")
# Process prompt_token_ids
@@ -630,6 +640,6 @@ def update_stop_seq(self, stop_sequences):
pad_id=-1,
return_seq_len=True,
return_array=False)
data_processor_logger.debug(
data_processor_logger.info(
f"processed stop_seqs: {stop_seqs}, {stop_seqs_len}")
return stop_seqs, stop_seqs_len
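`update_stop_seq` pads the tokenized sequences to a common length with `pad_id=-1` and also returns the true length of each one. A minimal sketch of that padding contract (the real `pad_batch_data` helper is not shown in the diff, so its behaviour is assumed):

```python
def pad_stop_seqs(stop_seqs, pad_id=-1):
    """Pad ragged stop sequences to equal length and return true lengths,
    mimicking pad_batch_data(..., pad_id=-1, return_seq_len=True,
    return_array=False) as used above (assumed behaviour)."""
    seq_lens = [len(seq) for seq in stop_seqs]
    max_len = max(seq_lens, default=0)
    padded = [seq + [pad_id] * (max_len - len(seq)) for seq in stop_seqs]
    return padded, seq_lens

padded, lens = pad_stop_seqs([[11, 12], [13, 14, 15]])
# padded == [[11, 12, -1], [13, 14, 15]]; lens == [2, 3]
```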
42 changes: 28 additions & 14 deletions fastdeploy/model_executor/pre_and_post_process.py
@@ -21,11 +21,12 @@
from fastdeploy.engine.config import SpeculativeConfig
from fastdeploy.model_executor.ops.gpu import (
get_padding_offset, save_output, set_stop_value_multi_ends,
speculate_clear_accept_nums, speculate_get_output_padding_offset,
speculate_get_padding_offset, speculate_get_seq_lens_output,
speculate_save_output, speculate_set_value_by_flags_and_idx,
speculate_step_paddle, speculate_step_system_cache, speculate_update_v3,
step_paddle, step_system_cache, update_inputs, step_reschedule)
set_stop_value_multi_seqs, speculate_clear_accept_nums,
speculate_get_output_padding_offset, speculate_get_padding_offset,
speculate_get_seq_lens_output, speculate_save_output,
speculate_set_value_by_flags_and_idx, speculate_step_paddle,
speculate_step_system_cache, speculate_update_v3, step_paddle,
step_reschedule, step_system_cache, update_inputs)
from fastdeploy.platforms import current_platform
from fastdeploy.worker.output import ModelOutputData

@@ -105,7 +106,8 @@ def pre_process(
def post_process_normal(sampled_token_ids: paddle.Tensor,
model_output: ModelOutputData,
save_each_rank: bool = False,
skip_save_output: bool = False) -> None:
skip_save_output: bool = False,
use_stop_seqs: bool = False) -> None:
""" Post-processing steps after completing a single token generation. """
# 1. Set stop value
paddle.assign(
Expand All @@ -122,12 +124,23 @@ def post_process_normal(sampled_token_ids: paddle.Tensor,
paddle.logical_or(model_output.stop_flags, length_cond),
model_output.stop_flags,
)
# TODO(gongshaotian): Add use_stop_seqs
set_stop_value_multi_ends(sampled_token_ids, model_output.stop_flags,
model_output.seq_lens_this_time,
model_output.eos_token_id,
model_output.next_tokens, False) # multi ends

if not use_stop_seqs:
set_stop_value_multi_ends(sampled_token_ids, model_output.stop_flags,
model_output.seq_lens_this_time,
model_output.eos_token_id,
model_output.next_tokens, False) # multi ends
else:
set_stop_value_multi_seqs(
sampled_token_ids,
model_output.pre_ids,
model_output.step_idx,
model_output.stop_flags,
model_output.seq_lens_this_time,
model_output.stop_token_ids,
model_output.stop_seqs_len,
model_output.eos_token_id,
)
# 2. Update the input buffer of the model
with paddle.framework._no_check_dy2st_diff():
update_inputs(
@@ -197,13 +210,14 @@ def post_process(sampled_token_ids: paddle.Tensor,
model_output: ModelOutputData,
save_each_rank: bool = False,
speculative_decoding: bool = False,
skip_save_output: bool = False) -> None:
skip_save_output: bool = False,
use_stop_seq: bool = False) -> None:
""" Post-processing steps after completing a single token generation. """
if speculative_decoding:
post_process_specualate(model_output, skip_save_output)
else:
post_process_normal(sampled_token_ids, model_output, save_each_rank,
skip_save_output)
skip_save_output, use_stop_seq)


def step_cuda(
@@ -217,7 +231,7 @@ def step_cuda(
TODO(gongshaotian): normalization name
"""


if speculative_config.method is not None:
if enable_prefix_caching:
speculate_step_system_cache(
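`set_stop_value_multi_seqs` is a custom GPU op and only its call site appears in this diff. As a reference for what such a check conceptually does, a plain-Python sketch for a single request, assuming the op flags a sequence as stopped once its generated tail matches any configured stop sequence (with -1 used as padding):

```python
def hits_stop_seq(pre_ids, step_idx, stop_seqs, stop_seqs_len):
    """Reference-only sketch for one request: True if the tokens generated so
    far (pre_ids[:step_idx]) end with any configured stop sequence. The real
    op runs batched on GPU and also updates stop_flags / next_tokens."""
    generated = pre_ids[:step_idx]
    for seq, seq_len in zip(stop_seqs, stop_seqs_len):
        if seq_len == 0 or seq_len > len(generated):
            continue  # empty (padded) slot, or not enough tokens yet
        if generated[-seq_len:] == seq[:seq_len]:
            return True
    return False

# The request just emitted ids 7, 8, which form a configured stop sequence.
print(hits_stop_seq([3, 4, 7, 8], step_idx=4,
                    stop_seqs=[[7, 8, -1], [-1, -1, -1]],
                    stop_seqs_len=[2, 0]))  # True
```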
22 changes: 16 additions & 6 deletions fastdeploy/worker/gpu_model_runner.py
@@ -280,9 +280,9 @@ def insert_prefill_inputs(self, req_dicts: List[Request]):
stop_seqs_num = len(request.get("stop_seqs_len"))
for i in range(stop_seqs_num,
self.model_config.max_stop_seqs_num):
request.stop_seqs_len.append(0)
request.sampling_params.stop_seqs_len.append(0)
self.share_inputs["stop_seqs_len"][:] = np.array(
request.stop_seqs_len, dtype="int32")
request.sampling_params.stop_seqs_len, dtype="int32")
self.share_inputs["stop_seqs"][:stop_seqs_num, :len(
request.get("stop_token_ids")[0])] = np.array(
request.get("stop_token_ids"), dtype="int64")
@@ -505,7 +505,7 @@ def _init_share_inputs(self, max_num_seqs: int):
self.model_config.stop_seqs_max_len
],
-1,
dtype="int32")
dtype="int64")
if self.speculative_decoding:
max_draft_token_num = self.speculative_config.num_speculative_tokens
self.share_inputs["input_ids_cpu"] = paddle.full(
@@ -832,7 +832,11 @@ def _dummy_run(self,
accept_tokens=self.share_inputs["accept_tokens"]
if self.speculative_decoding else None,
accept_num=self.share_inputs["accept_num"]
if self.speculative_decoding else None)
if self.speculative_decoding else None,
stop_token_ids=self.share_inputs["stop_seqs"]
if self.model_config.use_stop_seq else None,
stop_seqs_len=self.share_inputs["stop_seqs_len"]
if self.model_config.use_stop_seq else None)

post_process(sampled_token_ids=sampled_token_ids,
model_output=model_output_data,
@@ -1065,7 +1069,12 @@ class at the server level, which is too granular for ModelRunner.
accept_tokens=self.share_inputs["accept_tokens"]
if self.speculative_decoding else None,
accept_num=self.share_inputs["accept_num"]
if self.speculative_decoding else None)
if self.speculative_decoding else None,
stop_token_ids=self.share_inputs["stop_seqs"]
if self.model_config.use_stop_seq else None,
stop_seqs_len=self.share_inputs["stop_seqs_len"]
if self.model_config.use_stop_seq else None,
)

if self.speculative_config.method in ["mtp"] and \
self.parallel_config.splitwise_role == "prefill":
Expand All @@ -1076,7 +1085,8 @@ class at the server level, which is too granular for ModelRunner.
model_output=model_output_data,
save_each_rank=self.parallel_config.use_ep,
speculative_decoding=self.speculative_decoding,
skip_save_output=skip_save_output)
skip_save_output=skip_save_output,
use_stop_seq=self.model_config.use_stop_seq)

# 6. Speculative decode
if self.speculative_decoding:
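On the runner side the stop data lives in fixed-size `share_inputs` buffers whose shapes come from the config limits, with -1 marking unused token slots. A NumPy sketch of filling such a buffer for one request (the limits below are placeholders, not the actual defaults):

```python
import numpy as np

# Placeholder limits standing in for max_stop_seqs_num / stop_seqs_max_len.
max_stop_seqs_num, stop_seqs_max_len = 5, 8

# Pre-allocated buffers; stop_seqs now uses int64, matching the diff.
stop_seqs = np.full([max_stop_seqs_num, stop_seqs_max_len], -1, dtype="int64")
stop_seqs_len = np.zeros([max_stop_seqs_num], dtype="int32")

# One request carrying two padded stop sequences of true lengths 2 and 3.
req_stop_token_ids = [[11, 12, -1], [13, 14, 15]]
req_stop_seqs_len = [2, 3]

num = len(req_stop_seqs_len)
stop_seqs_len[:num] = np.array(req_stop_seqs_len, dtype="int32")
stop_seqs[:num, :len(req_stop_token_ids[0])] = np.array(req_stop_token_ids,
                                                        dtype="int64")
```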
9 changes: 9 additions & 0 deletions fastdeploy/worker/output.py
@@ -132,6 +132,15 @@ class ModelOutputData:
"""
accept_num: paddle.Tensor

"""
the token ids of the configured stop sequences
"""
stop_token_ids: paddle.Tensor

"""
the length of each stop sequence
"""
stop_seqs_len: paddle.Tensor

@dataclass
class ModelRunnerOutput:
6 changes: 4 additions & 2 deletions fastdeploy/worker/xpu_model_runner.py
@@ -320,9 +320,9 @@ def process_prefill_inputs(self, req_dicts: List[Request]):
stop_seqs_num = len(request.get("stop_seqs_len"))
for i in range(stop_seqs_num,
self.model_config.max_stop_seqs_num):
request.stop_seqs_len.append(0)
request.sampling_params.stop_seqs_len.append(0)
self.share_inputs["stop_seqs_len"][:] = np.array(
request.stop_seqs_len, dtype="int32")
request.sampling_params.stop_seqs_len, dtype="int32")
self.share_inputs["stop_seqs"][:stop_seqs_num, :len(
request.get("stop_token_ids")[0])] = np.array(
request.get("stop_token_ids"), dtype="int64")
@@ -719,6 +719,8 @@ class at the server level, which is too granular for ModelRunner.
actual_draft_token_num=None,
accept_tokens=None,
accept_num=None,
stop_token_ids=None,
stop_seqs_len=None,
)
xpu_post_process(sampled_token_ids=sampled_token_ids,
model_output=model_output_data)