Commit cc338b7c authored by zhaoying1's avatar zhaoying1
Browse files

llama_fastchat_pytorch

parents
"""
Apply the delta weights on top of a base model.
Usage:
python3 -m fastchat.model.apply_delta --base ~/model_weights/llama-13b --target ~/model_weights/vicuna-13b --delta lmsys/vicuna-13b-delta
"""
import argparse
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
def apply_delta(base_model_path, target_model_path, delta_path):
    """Apply delta weights on top of a base model and save the merged model.

    Args:
        base_model_path: local path or HF hub id of the base model (e.g. llama-13b).
        target_model_path: output directory for the merged (target) model.
        delta_path: local path or HF hub id of the delta weights.
    """
    print(f"Loading the base model from {base_model_path}")
    base = AutoModelForCausalLM.from_pretrained(
        base_model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True)

    print(f"Loading the delta from {delta_path}")
    delta = AutoModelForCausalLM.from_pretrained(
        delta_path, torch_dtype=torch.float16, low_cpu_mem_usage=True)
    delta_tokenizer = AutoTokenizer.from_pretrained(delta_path, use_fast=False)

    # The delta model was trained with an extra [PAD] token, so grow the base
    # vocabulary the same way.  The new embedding rows are zero-initialized so
    # the delta rows below carry their full value.
    DEFAULT_PAD_TOKEN = "[PAD]"
    base_tokenizer = AutoTokenizer.from_pretrained(base_model_path, use_fast=False)
    num_new_tokens = base_tokenizer.add_special_tokens(dict(pad_token=DEFAULT_PAD_TOKEN))

    base.resize_token_embeddings(len(base_tokenizer))
    input_embeddings = base.get_input_embeddings().weight.data
    output_embeddings = base.get_output_embeddings().weight.data
    input_embeddings[-num_new_tokens:] = 0
    output_embeddings[-num_new_tokens:] = 0

    print("Applying the delta")
    # Fix: delta.state_dict() rebuilds the whole mapping on every call; the
    # original called it twice per parameter inside the loop.  Fetch it once.
    delta_state_dict = delta.state_dict()
    for name, param in tqdm(base.state_dict().items(), desc="Applying delta"):
        assert name in delta_state_dict
        param.data += delta_state_dict[name]

    print(f"Saving the target model to {target_model_path}")
    base.save_pretrained(target_model_path, max_shard_size="2GB")
    delta_tokenizer.save_pretrained(target_model_path)
if __name__ == "__main__":
    # CLI entry point; see the module docstring for an example invocation.
    parser = argparse.ArgumentParser()
    # Paths may be local directories or Hugging Face hub model ids.
    parser.add_argument("--base-model-path", type=str, required=True)
    parser.add_argument("--target-model-path", type=str, required=True)
    parser.add_argument("--delta-path", type=str, required=True)
    args = parser.parse_args()
    apply_delta(args.base_model_path, args.target_model_path, args.delta_path)
"""
Make the delta weights by subtracting base weights.
Usage:
python3 -m fastchat.model.make_delta --base ~/model_weights/llama-13b --target ~/model_weights/vicuna-13b --delta ~/model_weights/vicuna-13b-delta --hub-repo-id lmsys/vicuna-13b-delta
"""
import argparse
import torch
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForCausalLM
def make_delta(base_model_path, target_model_path, delta_path, hub_repo_id=None):
    """Compute delta weights (target - base) and save them.

    Args:
        base_model_path: local path or HF hub id of the base model.
        target_model_path: local path or HF hub id of the fine-tuned model.
        delta_path: output directory for the delta weights.
        hub_repo_id: if given, also push the delta to this Hugging Face repo.
            (Fix: this used to be read from the module-level ``args``, which
            raised NameError whenever make_delta was imported as a library.)
    """
    print(f"Loading the base model from {base_model_path}")
    base = AutoModelForCausalLM.from_pretrained(
        base_model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True)

    print(f"Loading the target model from {target_model_path}")
    target = AutoModelForCausalLM.from_pretrained(
        target_model_path, torch_dtype=torch.float16, low_cpu_mem_usage=True)

    # Mirror the [PAD] token the target was trained with so both vocabularies
    # have the same size; the new rows are zeroed before the subtraction.
    DEFAULT_PAD_TOKEN = "[PAD]"
    base_tokenizer = AutoTokenizer.from_pretrained(base_model_path, use_fast=False)
    num_new_tokens = base_tokenizer.add_special_tokens(dict(pad_token=DEFAULT_PAD_TOKEN))

    base.resize_token_embeddings(len(base_tokenizer))
    input_embeddings = base.get_input_embeddings().weight.data
    output_embeddings = base.get_output_embeddings().weight.data
    input_embeddings[-num_new_tokens:] = 0
    output_embeddings[-num_new_tokens:] = 0

    print("Calculating the delta")
    # Fix: fetch the base state dict once instead of rebuilding it twice per
    # parameter inside the loop.
    base_state_dict = base.state_dict()
    for name, param in tqdm(target.state_dict().items(), desc="Calculating delta"):
        assert name in base_state_dict
        param.data -= base_state_dict[name]

    print(f"Saving the delta to {delta_path}")
    if hub_repo_id:
        kwargs = {"push_to_hub": True, "repo_id": hub_repo_id}
    else:
        kwargs = {}
    target.save_pretrained(delta_path, **kwargs)

    target_tokenizer = AutoTokenizer.from_pretrained(target_model_path, use_fast=False)
    target_tokenizer.save_pretrained(delta_path, **kwargs)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--base-model-path", type=str, required=True)
    parser.add_argument("--target-model-path", type=str, required=True)
    parser.add_argument("--delta-path", type=str, required=True)
    # Fix: pushing to the hub is optional, so the flag must not be required.
    parser.add_argument("--hub-repo-id", type=str)
    args = parser.parse_args()

    # Fix: pass hub_repo_id explicitly instead of letting the function read
    # the global `args`.
    make_delta(args.base_model_path, args.target_model_path, args.delta_path,
               args.hub_repo_id)
"""
Chat with a model with command line interface.
Usage:
python3 -m fastchat.serve.cli --model ~/model_weights/llama-7b
"""
import argparse
import re
from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import WordCompleter
from prompt_toolkit.history import InMemoryHistory
from rich.console import Console
from rich.markdown import Markdown
from rich.live import Live
from fastchat.serve.inference import chat_loop, ChatIO
class SimpleChatIO(ChatIO):
    """Plain stdin/stdout chat front-end."""

    def prompt_for_input(self, role) -> str:
        # Read one line from the user, prefixed with the speaker's role.
        return input(f"{role}: ")

    def prompt_for_output(self, role: str):
        # Print the role prefix without a newline; the answer streams after it.
        print(f"{role}: ", end="", flush=True)

    def stream_output(self, output_stream, skip_echo_len: int):
        """Print words incrementally as the model produces them."""
        printed = 0
        for snapshot in output_stream:
            words = snapshot[skip_echo_len:].strip().split(" ")
            complete = len(words) - 1  # the last word may still be growing
            if complete > printed:
                print(" ".join(words[printed:complete]), end=" ", flush=True)
                printed = complete
        print(" ".join(words[printed:]), flush=True)
        return words
class RichChatIO(ChatIO):
    """Console chat front-end using prompt_toolkit for input and rich for
    live Markdown-rendered output."""

    def __init__(self):
        self._prompt_session = PromptSession(history=InMemoryHistory())
        # Completer for the meta-commands; the '$' pattern keeps ordinary
        # words from triggering completion.
        self._completer = WordCompleter(words=['!exit', '!reset'], pattern=re.compile('$'))
        self._console = Console()

    def prompt_for_input(self, role) -> str:
        self._console.print(f"[bold]{role}:")
        # TODO(suquark): multiline input has some issues. fix it later.
        prompt_input = self._prompt_session.prompt(
            completer=self._completer,
            multiline=False,
            auto_suggest=AutoSuggestFromHistory(),
            key_bindings=None)
        self._console.print()
        return prompt_input

    def prompt_for_output(self, role: str):
        self._console.print(f"[bold]{role}:")

    def stream_output(self, output_stream, skip_echo_len: int):
        """Stream output from a role."""
        pre = 0
        # TODO(suquark): the console flickers when there is a code block
        # above it. We need to cut off "live" when a code block is done.
        # Create a Live context for updating the console output
        with Live(console=self._console, refresh_per_second=4) as live:
            accumulated_text = ""
            # Read lines from the stream
            for outputs in output_stream:
                outputs = outputs[skip_echo_len:].strip()
                outputs = outputs.split(" ")
                # Only words before the last one are complete; the last word
                # may still be growing, so hold it back.
                now = len(outputs) - 1
                if now > pre:
                    accumulated_text += " ".join(outputs[pre:now]) + " "
                    pre = now
                    # Render the accumulated text as Markdown
                    markdown = Markdown(accumulated_text)
                    # Update the Live console output
                    live.update(markdown)
            # Flush whatever is left (including the final word) once the
            # stream is exhausted.
            accumulated_text += " ".join(outputs[pre:])
            markdown = Markdown(accumulated_text)
            live.update(markdown)
        self._console.print()
        return outputs
def main(args):
    """Build the selected console UI and run the chat loop until Ctrl-C."""
    ui_registry = {"simple": SimpleChatIO, "rich": RichChatIO}
    if args.style not in ui_registry:
        raise ValueError(f"Invalid style for console: {args.style}")
    chatio = ui_registry[args.style]()
    try:
        chat_loop(args.model_name, args.device, args.num_gpus, args.load_8bit,
                  args.conv_template, args.temperature, args.max_new_tokens,
                  chatio, args.debug)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave the REPL; exit quietly.
        print("exit...")
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-name", type=str, default="facebook/opt-350m")
    parser.add_argument("--device", type=str, choices=["cpu", "cuda", "mps"], default="cuda")
    # Kept as a string; it is parsed downstream by chat_loop.
    parser.add_argument("--num-gpus", type=str, default="1")
    parser.add_argument("--load-8bit", action="store_true",
        help="Use 8-bit quantization.")
    parser.add_argument("--conv-template", type=str, default="v1",
        help="Conversation prompt template.")
    parser.add_argument("--temperature", type=float, default=0.7)
    parser.add_argument("--max-new-tokens", type=int, default=512)
    parser.add_argument("--style", type=str, default="simple",
        choices=["simple", "rich"], help="Display style.")
    parser.add_argument("--debug", action="store_true")
    args = parser.parse_args()
    main(args)
import dataclasses
import torch
from torch import Tensor
import torch.nn as nn
from torch.nn import functional as F
@dataclasses.dataclass
class CompressionConfig:
    """Group-wise quantization."""
    num_bits: int       # bits per quantized value (compress() asserts <= 8)
    group_size: int     # number of elements per quantization group
    group_dim: int      # tensor dimension along which groups are formed
    symmetric: bool     # True: signed scale-only; False: min/max affine
    enabled: bool = True  # False makes compress()/decompress() no-ops


# Default used by CLinear: 8-bit symmetric, groups of 256 along dim 1.
default_compression_config = CompressionConfig(
    num_bits=8, group_size=256, group_dim=1, symmetric=True, enabled=True)
class CLinear(nn.Module):
    """Compressed Linear Layer: stores a quantized weight, dequantizes per call."""

    def __init__(self, weight, bias, device):
        super().__init__()
        # Keep the weight in quantized (packed) form; the bias stays dense.
        self.weight = compress(weight.data.to(device), default_compression_config)
        self.bias = bias

    def forward(self, input: Tensor) -> Tensor:
        # Reconstruct a dense weight on the fly for every forward pass.
        dense_weight = decompress(self.weight, default_compression_config)
        return F.linear(input, dense_weight, self.bias)
def compress_module(module, target_device):
    """Recursively swap every plain nn.Linear attribute for a CLinear."""
    for attr_name in dir(module):
        attr = getattr(module, attr_name)
        # Exact type match (not isinstance): only plain nn.Linear is swapped.
        if type(attr) == torch.nn.Linear:
            replacement = CLinear(attr.weight, attr.bias, target_device)
            setattr(module, attr_name, replacement)
    for _, child in module.named_children():
        compress_module(child, target_device)
def compress(tensor, config):
    """Simulate group-wise quantization.

    Returns the input unchanged when config.enabled is False. Otherwise the
    tensor is split into groups of `group_size` along `group_dim` (zero-padded
    to a multiple), each group is quantized independently, and a packed tuple
    is returned: (data, scale, original_shape) for symmetric quantization, or
    (data, mn, scale, original_shape) for asymmetric.
    """
    if not config.enabled:
        return tensor

    gsize = config.group_size
    gdim = config.group_dim
    nbits = config.num_bits
    assert nbits <= 8

    original_shape = tensor.shape
    dim_len = original_shape[gdim]
    num_groups = (dim_len + gsize - 1) // gsize
    grouped_shape = (original_shape[:gdim] + (num_groups, gsize) +
                     original_shape[gdim + 1:])

    # Zero-pad so the grouped dimension divides evenly into groups.
    pad_len = (gsize - dim_len % gsize) % gsize
    if pad_len != 0:
        pad_shape = original_shape[:gdim] + (pad_len,) + original_shape[gdim + 1:]
        padding = torch.zeros(pad_shape, dtype=tensor.dtype, device=tensor.device)
        tensor = torch.cat([tensor, padding], dim=gdim)
    grouped = tensor.view(grouped_shape)

    reduce_dim = gdim + 1
    if config.symmetric:
        # Signed range [-B, B]; one positive scale per group.
        B = 2 ** (nbits - 1) - 1
        scale = B / torch.max(grouped.abs(), dim=reduce_dim, keepdim=True)[0]
        quantized = (grouped * scale).clamp_(-B, B).round_().to(torch.int8)
        return quantized, scale, original_shape

    # Asymmetric: shift by the per-group minimum, map [mn, mx] onto [0, B].
    B = 2 ** nbits - 1
    mn = torch.min(grouped, dim=reduce_dim, keepdim=True)[0]
    mx = torch.max(grouped, dim=reduce_dim, keepdim=True)[0]
    scale = B / (mx - mn)
    shifted = grouped - mn
    shifted.mul_(scale)
    quantized = shifted.clamp_(0, B).round_().to(torch.uint8)
    return quantized, mn, scale, original_shape
def decompress(packed_data, config):
    """Simulate group-wise dequantization (inverse of `compress`).

    Returns packed_data unchanged when config.enabled is False; otherwise
    reconstructs a dense tensor of `original_shape` from the packed tuple.
    """
    if not config.enabled:
        return packed_data

    gsize = config.group_size
    gdim = config.group_dim

    # Undo the quantization mapping.
    if config.symmetric:
        quantized, scale, original_shape = packed_data
        values = quantized / scale
    else:
        quantized, mn, scale, original_shape = packed_data
        values = quantized / scale
        values.add_(mn)

    # Strip the zero padding that compress() may have appended.
    pad_len = (gsize - original_shape[gdim] % gsize) % gsize
    if not pad_len:
        return values.view(original_shape)

    padded_shape = (original_shape[:gdim] +
                    (original_shape[gdim] + pad_len,) +
                    original_shape[gdim + 1:])
    values = values.reshape(padded_shape)
    window = tuple(slice(0, n) for n in original_shape)
    return values[window].contiguous()
"""
A controller manages distributed workers.
It sends worker addresses to clients.
"""
import argparse
import asyncio
import dataclasses
from enum import Enum, auto
import json
import logging
import time
from typing import List, Union
import threading
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import numpy as np
import requests
import uvicorn
from fastchat.constants import CONTROLLER_HEART_BEAT_EXPIRATION
from fastchat.utils import build_logger, server_error_msg
# Shared module logger for the controller service.
logger = build_logger("controller", "controller.log")
class DispatchMethod(Enum):
    """Strategy for choosing which worker serves a request."""
    LOTTERY = auto()          # weighted random choice by worker speed
    SHORTEST_QUEUE = auto()   # worker with the lowest queue_length / speed

    @classmethod
    def from_str(cls, name):
        """Parse a dispatch-method name (as passed on the CLI) into a member.

        Raises:
            ValueError: if `name` is not a known dispatch method.
        """
        if name == "lottery":
            return cls.LOTTERY
        elif name == "shortest_queue":
            return cls.SHORTEST_QUEUE
        else:
            # Fix: the original f-string had no placeholder, so the offending
            # name never appeared in the error message.
            raise ValueError(f"Invalid dispatch method: {name}")
@dataclasses.dataclass
class WorkerInfo:
    """Controller-side record of one registered model worker."""
    model_names: List[str]   # models served by this worker
    speed: int               # relative throughput weight used for dispatch
    queue_length: int        # current number of queued requests
    check_heart_beat: bool   # expire this worker when heart beats stop
    # Annotation fixed from `str`: this field stores time.time() floats
    # (see Controller.register_worker / receive_heart_beat).
    last_heart_beat: float
def heart_beat_controller(controller):
    """Background loop: periodically drop workers whose heart beats expired."""
    while True:
        time.sleep(CONTROLLER_HEART_BEAT_EXPIRATION)
        controller.remove_stable_workers_by_expiration()
class Controller:
    """Registry and dispatcher for distributed model workers."""

    def __init__(self, dispatch_method: str):
        # Dict[str -> WorkerInfo]
        self.worker_info = {}
        self.dispatch_method = DispatchMethod.from_str(dispatch_method)

        # Background thread that expires workers with stale heart beats.
        self.heart_beat_thread = threading.Thread(
            target=heart_beat_controller, args=(self,))
        self.heart_beat_thread.start()

        logger.info("Init controller")

    def register_worker(self, worker_name: str, check_heart_beat: bool,
                        worker_status: dict):
        """Register (or re-register) a worker; returns True on success."""
        if worker_name not in self.worker_info:
            logger.info(f"Register a new worker: {worker_name}")
        else:
            logger.info(f"Register an existing worker: {worker_name}")

        # If no status was supplied, query the worker directly.
        if not worker_status:
            worker_status = self.get_worker_status(worker_name)
        if not worker_status:
            return False

        self.worker_info[worker_name] = WorkerInfo(
            worker_status["model_names"], worker_status["speed"], worker_status["queue_length"],
            check_heart_beat, time.time())

        logger.info(f"Register done: {worker_name}, {worker_status}")
        return True

    def get_worker_status(self, worker_name: str):
        """POST /worker_get_status on the worker; returns None on any failure."""
        try:
            r = requests.post(worker_name + "/worker_get_status", timeout=5)
        except requests.exceptions.RequestException as e:
            logger.error(f"Get status fails: {worker_name}, {e}")
            return None

        if r.status_code != 200:
            logger.error(f"Get status fails: {worker_name}, {r}")
            return None

        return r.json()

    def remove_worker(self, worker_name: str):
        # Raises KeyError if the worker is unknown.
        del self.worker_info[worker_name]

    def refresh_all_workers(self):
        """Re-register every known worker, dropping those that fail."""
        old_info = dict(self.worker_info)
        self.worker_info = {}

        for w_name, w_info in old_info.items():
            if not self.register_worker(w_name, w_info.check_heart_beat, None):
                logger.info(f"Remove stale worker: {w_name}")

    def list_models(self):
        """Return the deduplicated list of model names across all workers."""
        model_names = set()

        for w_name, w_info in self.worker_info.items():
            model_names.update(w_info.model_names)

        return list(model_names)

    def get_worker_address(self, model_name: str):
        """Pick a worker address serving `model_name`; "" when none is available."""
        if self.dispatch_method == DispatchMethod.LOTTERY:
            # Weighted random choice, probability proportional to speed.
            worker_names = []
            worker_speeds = []
            for w_name, w_info in self.worker_info.items():
                if model_name in w_info.model_names:
                    worker_names.append(w_name)
                    worker_speeds.append(w_info.speed)
            worker_speeds = np.array(worker_speeds, dtype=np.float32)
            norm = np.sum(worker_speeds)
            if norm < 1e-4:
                return ""
            worker_speeds = worker_speeds / norm
            if True:  # Directly return address
                pt = np.random.choice(np.arange(len(worker_names)),
                                      p=worker_speeds)
                worker_name = worker_names[pt]
                return worker_name

            # Check status before returning
            # NOTE(review): unreachable while the `if True` above is kept;
            # this slower path re-draws until a live worker is found.
            while True:
                pt = np.random.choice(np.arange(len(worker_names)),
                                      p=worker_speeds)
                worker_name = worker_names[pt]

                if self.get_worker_status(worker_name):
                    break
                else:
                    self.remove_worker(worker_name)
                    worker_speeds[pt] = 0
                    norm = np.sum(worker_speeds)
                    if norm < 1e-4:
                        return ""
                    worker_speeds = worker_speeds / norm
                    continue
            return worker_name
        elif self.dispatch_method == DispatchMethod.SHORTEST_QUEUE:
            worker_names = []
            worker_qlen = []
            for w_name, w_info in self.worker_info.items():
                if model_name in w_info.model_names:
                    worker_names.append(w_name)
                    # Normalize queue length by speed so faster workers can
                    # carry proportionally more load.
                    worker_qlen.append(w_info.queue_length / w_info.speed)
            if len(worker_names) == 0:
                return ""
            min_index = np.argmin(worker_qlen)
            w_name = worker_names[min_index]
            # Optimistically count the request we are about to dispatch.
            self.worker_info[w_name].queue_length += 1
            logger.info(f"names: {worker_names}, queue_lens: {worker_qlen}, ret: {w_name}")
            return w_name
        else:
            raise ValueError(f"Invalid dispatch method: {self.dispatch_method}")

    def receive_heart_beat(self, worker_name: str, queue_length: int):
        """Update a worker's queue length and timestamp; False if unknown."""
        if worker_name not in self.worker_info:
            logger.info(f"Receive unknown heart beat. {worker_name}")
            return False

        self.worker_info[worker_name].queue_length = queue_length
        self.worker_info[worker_name].last_heart_beat = time.time()
        logger.info(f"Receive heart beat. {worker_name}")
        return True

    def remove_stable_workers_by_expiration(self):
        # NOTE(review): "stable" is presumably a typo for "stale"; the name is
        # kept because heart_beat_controller calls it by this name.
        expire = time.time() - CONTROLLER_HEART_BEAT_EXPIRATION
        to_delete = []
        for worker_name, w_info in self.worker_info.items():
            if w_info.check_heart_beat and w_info.last_heart_beat < expire:
                to_delete.append(worker_name)

        for worker_name in to_delete:
            self.remove_worker(worker_name)

    def worker_api_generate_stream(self, params):
        """Proxy a streaming generate request to a chosen worker.

        Yields b"\\0"-delimited JSON chunks; on dispatch or transport failure
        yields a single error payload instead.
        """
        worker_addr = self.get_worker_address(params["model"])
        if not worker_addr:
            logger.info(f"no worker: {params['model']}")
            ret = {
                "text": server_error_msg,
                "error_code": 2,
            }
            yield json.dumps(ret).encode() + b"\0"

        try:
            response = requests.post(worker_addr + "/worker_generate_stream",
                                     json=params, stream=True, timeout=5)
            for chunk in response.iter_lines(decode_unicode=False, delimiter=b"\0"):
                if chunk:
                    yield chunk + b"\0"
        except requests.exceptions.RequestException as e:
            logger.info(f"worker timeout: {worker_addr}")
            ret = {
                "text": server_error_msg,
                "error_code": 3,
            }
            yield json.dumps(ret).encode() + b"\0"

    # Let the controller act as a worker to achieve hierarchical
    # management. This can be used to connect isolated sub networks.
    def worker_api_get_status(self):
        """Aggregate model names, speed and queue length over all workers."""
        model_names = set()
        speed = 0
        queue_length = 0

        for w_name in self.worker_info:
            worker_status = self.get_worker_status(w_name)
            if worker_status is not None:
                model_names.update(worker_status["model_names"])
                speed += worker_status["speed"]
                queue_length += worker_status["queue_length"]

        return {
            "model_names": list(model_names),
            "speed": speed,
            "queue_length": queue_length,
        }
# HTTP API. `controller` is the module-level instance created in __main__.
app = FastAPI()


@app.post("/register_worker")
async def register_worker(request: Request):
    # A worker announces itself, optionally including its current status.
    data = await request.json()
    controller.register_worker(
        data["worker_name"], data["check_heart_beat"],
        data.get("worker_status", None))


@app.post("/refresh_all_workers")
async def refresh_all_workers():
    models = controller.refresh_all_workers()


@app.post("/list_models")
async def list_models():
    models = controller.list_models()
    return {"models": models}


@app.post("/get_worker_address")
async def get_worker_address(request: Request):
    data = await request.json()
    addr = controller.get_worker_address(data["model"])
    return {"address": addr}


@app.post("/receive_heart_beat")
async def receive_heart_beat(request: Request):
    data = await request.json()
    exist = controller.receive_heart_beat(
        data["worker_name"], data["queue_length"])
    return {"exist": exist}


@app.post("/worker_generate_stream")
async def worker_api_generate_stream(request: Request):
    # Stream worker chunks straight through to the client.
    params = await request.json()
    generator = controller.worker_api_generate_stream(params)
    return StreamingResponse(generator)


@app.post("/worker_get_status")
async def worker_api_get_status(request: Request):
    return controller.worker_api_get_status()
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, default="localhost")
    parser.add_argument("--port", type=int, default=21001)
    parser.add_argument("--dispatch-method", type=str, choices=[
        "lottery", "shortest_queue"], default="shortest_queue")
    args = parser.parse_args()
    logger.info(f"args: {args}")

    # `controller` is module-level so the FastAPI handlers above can reach it.
    controller = Controller(args.dispatch_method)
    uvicorn.run(app, host=args.host, port=args.port, log_level="info")
# fastchat Nginx Gateway
## Purpose of the Gateway
The Nginx gateway serves the following purposes:
1. Protects Gradio servers by acting as a firewall.
2. Facilitates dynamic mounting and unmounting of Gradio servers.
3. Provides load balancing for Gradio servers.
4. Offers additional security features, such as total connection limit.
5. Reduces attack surface by requiring only a single public port to be exposed for serving.
## Deployment and Updating of the Gateway
### Installing Nginx
On Debian-based distributions (e.g., Ubuntu):
```bash
sudo apt update
sudo apt install nginx
```
On Red Hat-based distributions (e.g., CentOS, Fedora):
```bash
sudo yum install epel-release
sudo yum install nginx
```
### Deployment
Copy `nginx.conf` to `/etc/nginx/nginx.conf` (need sudo permission).
Replace the port number 7860 in `server localhost:7860` with the port where you deploy the Gradio web server.
Modify `upstream websocket` to configure Gradio servers behind the gateway.
Lastly, update Nginx.
### HTTPS Deployment with a Public Domain URL
Make sure you obtain the HTTPS certificate and the private key used to generate the certificate.
Fill the addresses to your certificate and private key in the `[PATH_TO_SSL_CERT]` and `[PATH_TO_PRIVATE_KEY]` fields.
If you have your own domain URL for serving the chatbot, replace the chat.lmsys.org URL with your own domain URL.
### Updating
Whenever `/etc/nginx/nginx.conf` is modified, you need to reload the Nginx service:
```bash
sudo nginx -t # check `/etc/nginx/nginx.conf`
sudo systemctl reload nginx # reload the Nginx service to pick up the new config
sudo systemctl status nginx # check the status of the Nginx service. It should be active (running).
```
# Nginx gateway configuration for fastchat (see the README section above).
# Terminates TLS and proxies HTTP/WebSocket traffic to the Gradio server(s).
user www-data;
worker_processes auto;
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;

events {
    worker_connections 1024; # maximum number of connections that a worker process can handle concurrently
    # multi_accept on; # enabling multi_accept can help improve performance under high load, but may increase the number of simultaneous connections that a worker process can handle
}

http {
    ##
    # Basic Settings
    ##
    sendfile on; # enable sendfile for performance optimization
    tcp_nopush on; # enable TCP no-pushing
    tcp_nodelay on; # enable TCP no-delay
    keepalive_timeout 65; # sets the timeout for keep-alive connections
    types_hash_max_size 2048; # maximum size of the types hash table
    # server_tokens off; # disable server token (i.e., server signature) in response headers to improve security
    # server_names_hash_bucket_size 64;
    # server_name_in_redirect off;
    include /etc/nginx/mime.types; # include MIME types file
    default_type application/octet-stream; # default MIME type for unknown file types

    ##
    # SSL Settings
    ##
    ssl_protocols TLSv1.2; # specify SSL/TLS protocols to use
    ssl_prefer_server_ciphers on; # prefer server ciphers over client ciphers

    ##
    # Logging Settings
    ##
    access_log /var/log/nginx/access.log; # path to access log file
    error_log /var/log/nginx/error.log; # path to error log file

    ##
    # Gzip Settings
    ##
    gzip on; # enable Gzip compression

    ##
    # Virtual Host Configs
    ##
    include /etc/nginx/conf.d/*.conf; # include all configuration files in conf.d directory
    include /etc/nginx/sites-enabled/*; # include all enabled sites configuration files

    # WebSocket Proxy: https://www.nginx.com/blog/websocket-nginx/
    map $http_upgrade $connection_upgrade {
        default upgrade;
        '' close;
    }

    upstream websocket {
        ip_hash; # load balancing by IP to guarantee session persistence
        server localhost:7860; # The port should be the gradio web server port
        # server localhost:7861; # extra gradio server if more than one
    }

    limit_conn_status 429;
    limit_conn_zone $binary_remote_addr zone=perip:10m; # limit number of connections per IP
    limit_conn_zone $server_name zone=perserver:10m; # limit number of connections per server

    server {
        listen 443 ssl; # the listening port of our server
        ssl_certificate [PATH_TO_SSL_CERT];
        ssl_certificate_key [PATH_TO_PRIVATE_KEY];
        server_name chat.lmsys.org; # replace the url with your own domain url
        limit_conn perserver 1024; # connections per server
        location / {
            proxy_pass http://websocket; # proxy all requests to the defined upstream server
            limit_conn perip 5; # connections per IP
            proxy_set_header Host $host; # set the Host header for the upstream server
            proxy_set_header X-Real-IP $remote_addr; # set the client IP address as the real IP for the upstream server
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # set the client IP addresses in the X-Forwarded-For header
            proxy_http_version 1.1; # use HTTP version 1.1 for upstream communication
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade"; # set the Connection header to Upgrade to enable WebSocket communication
        }
    }

    # the following block routes all HTTP traffic to HTTPS via nginx
    server {
        listen 80;
        server_name chat.lmsys.org;
        return 301 https://chat.lmsys.org$request_uri;
    }
}
# Pygments highlight classes scoped to the #chatbot element so code blocks
# rendered inside the chat window get syntax colors.
code_highlight_css = (
"""
#chatbot .hll { background-color: #ffffcc }
#chatbot .c { color: #408080; font-style: italic }
#chatbot .err { border: 1px solid #FF0000 }
#chatbot .k { color: #008000; font-weight: bold }
#chatbot .o { color: #666666 }
#chatbot .ch { color: #408080; font-style: italic }
#chatbot .cm { color: #408080; font-style: italic }
#chatbot .cp { color: #BC7A00 }
#chatbot .cpf { color: #408080; font-style: italic }
#chatbot .c1 { color: #408080; font-style: italic }
#chatbot .cs { color: #408080; font-style: italic }
#chatbot .gd { color: #A00000 }
#chatbot .ge { font-style: italic }
#chatbot .gr { color: #FF0000 }
#chatbot .gh { color: #000080; font-weight: bold }
#chatbot .gi { color: #00A000 }
#chatbot .go { color: #888888 }
#chatbot .gp { color: #000080; font-weight: bold }
#chatbot .gs { font-weight: bold }
#chatbot .gu { color: #800080; font-weight: bold }
#chatbot .gt { color: #0044DD }
#chatbot .kc { color: #008000; font-weight: bold }
#chatbot .kd { color: #008000; font-weight: bold }
#chatbot .kn { color: #008000; font-weight: bold }
#chatbot .kp { color: #008000 }
#chatbot .kr { color: #008000; font-weight: bold }
#chatbot .kt { color: #B00040 }
#chatbot .m { color: #666666 }
#chatbot .s { color: #BA2121 }
#chatbot .na { color: #7D9029 }
#chatbot .nb { color: #008000 }
#chatbot .nc { color: #0000FF; font-weight: bold }
#chatbot .no { color: #880000 }
#chatbot .nd { color: #AA22FF }
#chatbot .ni { color: #999999; font-weight: bold }
#chatbot .ne { color: #D2413A; font-weight: bold }
#chatbot .nf { color: #0000FF }
#chatbot .nl { color: #A0A000 }
#chatbot .nn { color: #0000FF; font-weight: bold }
#chatbot .nt { color: #008000; font-weight: bold }
#chatbot .nv { color: #19177C }
#chatbot .ow { color: #AA22FF; font-weight: bold }
#chatbot .w { color: #bbbbbb }
#chatbot .mb { color: #666666 }
#chatbot .mf { color: #666666 }
#chatbot .mh { color: #666666 }
#chatbot .mi { color: #666666 }
#chatbot .mo { color: #666666 }
#chatbot .sa { color: #BA2121 }
#chatbot .sb { color: #BA2121 }
#chatbot .sc { color: #BA2121 }
#chatbot .dl { color: #BA2121 }
#chatbot .sd { color: #BA2121; font-style: italic }
#chatbot .s2 { color: #BA2121 }
#chatbot .se { color: #BB6622; font-weight: bold }
#chatbot .sh { color: #BA2121 }
#chatbot .si { color: #BB6688; font-weight: bold }
#chatbot .sx { color: #008000 }
#chatbot .sr { color: #BB6688 }
#chatbot .s1 { color: #BA2121 }
#chatbot .ss { color: #19177C }
#chatbot .bp { color: #008000 }
#chatbot .fm { color: #0000FF }
#chatbot .vc { color: #19177C }
#chatbot .vg { color: #19177C }
#chatbot .vi { color: #19177C }
#chatbot .vm { color: #19177C }
#chatbot .il { color: #666666 }
""")
#.highlight { background: #f8f8f8; }
"""
Adopted from https://github.com/gradio-app/gradio/blob/main/gradio/components.py
Fix a markdown render problem.
"""
from __future__ import annotations
from gradio.components import *
from markdown2 import Markdown
class _Keywords(Enum):
NO_VALUE = "NO_VALUE" # Used as a sentinel to determine if nothing is provided as a argument for `value` in `Component.update()`
FINISHED_ITERATING = "FINISHED_ITERATING" # Used to skip processing of a component's value (needed for generators + state)
@document("style")
class Chatbot(Changeable, Selectable, IOComponent, JSONSerializable):
"""
Displays a chatbot output showing both user submitted messages and responses. Supports a subset of Markdown including bold, italics, code, and images.
Preprocessing: this component does *not* accept input.
Postprocessing: expects function to return a {List[Tuple[str | None | Tuple, str | None | Tuple]]}, a list of tuples with user message and response messages. Messages should be strings, tuples, or Nones. If the message is a string, it can include Markdown. If it is a tuple, it should consist of (string filepath to image/video/audio, [optional string alt text]). Messages that are `None` are not displayed.
Demos: chatbot_simple, chatbot_multimodal
"""
def __init__(
self,
value: List[Tuple[str | None, str | None]] | Callable | None = None,
color_map: Dict[str, str] | None = None, # Parameter moved to Chatbot.style()
*,
label: str | None = None,
every: float | None = None,
show_label: bool = True,
visible: bool = True,
elem_id: str | None = None,
elem_classes: List[str] | str | None = None,
**kwargs,
):
"""
Parameters:
value: Default value to show in chatbot. If callable, the function will be called whenever the app loads to set the initial value of the component.
label: component name in interface.
every: If `value` is a callable, run the function 'every' number of seconds while the client connection is open. Has no effect otherwise. Queue must be enabled. The event can be accessed (e.g. to cancel it) via this component's .load_event attribute.
show_label: if True, will display label.
visible: If False, component will be hidden.
elem_id: An optional string that is assigned as the id of this component in the HTML DOM. Can be used for targeting CSS styles.
elem_classes: An optional list of strings that are assigned as the classes of this component in the HTML DOM. Can be used for targeting CSS styles.
"""
if color_map is not None:
warnings.warn(
"The 'color_map' parameter has been deprecated.",
)
#self.md = utils.get_markdown_parser()
self.md = Markdown(extras=["fenced-code-blocks", "tables", "break-on-newline"])
self.select: EventListenerMethod
"""
Event listener for when the user selects message from Chatbot.
Uses event data gradio.SelectData to carry `value` referring to text of selected message, and `index` tuple to refer to [message, participant] index.
See EventData documentation on how to use this event data.
"""
IOComponent.__init__(
self,
label=label,
every=every,
show_label=show_label,
visible=visible,
elem_id=elem_id,
elem_classes=elem_classes,
value=value,
**kwargs,
)
def get_config(self):
return {
"value": self.value,
"selectable": self.selectable,
**IOComponent.get_config(self),
}
@staticmethod
def update(
value: Any | Literal[_Keywords.NO_VALUE] | None = _Keywords.NO_VALUE,
label: str | None = None,
show_label: bool | None = None,
visible: bool | None = None,
):
updated_config = {
"label": label,
"show_label": show_label,
"visible": visible,
"value": value,
"__type__": "update",
}
return updated_config
def _process_chat_messages(
self, chat_message: str | Tuple | List | Dict | None
) -> str | Dict | None:
if chat_message is None:
return None
elif isinstance(chat_message, (tuple, list)):
mime_type = processing_utils.get_mimetype(chat_message[0])
return {
"name": chat_message[0],
"mime_type": mime_type,
"alt_text": chat_message[1] if len(chat_message) > 1 else None,
"data": None, # These last two fields are filled in by the frontend
"is_file": True,
}
elif isinstance(
chat_message, dict
): # This happens for previously processed messages
return chat_message
elif isinstance(chat_message, str):
#return self.md.render(chat_message)
return str(self.md.convert(chat_message))
else:
raise ValueError(f"Invalid message for Chatbot component: {chat_message}")
def postprocess(
self,
y: List[
Tuple[str | Tuple | List | Dict | None, str | Tuple | List | Dict | None]
],
) -> List[Tuple[str | Dict | None, str | Dict | None]]:
"""
Parameters:
y: List of tuples representing the message and response pairs. Each message and response should be a string, which may be in Markdown format. It can also be a tuple whose first element is a string filepath or URL to an image/video/audio, and second (optional) element is the alt text, in which case the media file is displayed. It can also be None, in which case that message is not displayed.
Returns:
List of tuples representing the message and response. Each message and response will be a string of HTML, or a dictionary with media information.
"""
if y is None:
return []
processed_messages = []
for message_pair in y:
assert isinstance(
message_pair, (tuple, list)
), f"Expected a list of lists or list of tuples. Received: {message_pair}"
assert (
len(message_pair) == 2
), f"Expected a list of lists of length 2 or list of tuples of length 2. Received: {message_pair}"
processed_messages.append(
(
#self._process_chat_messages(message_pair[0]),
'<pre style="font-family: var(--font)">' +
message_pair[0] + "</pre>",
self._process_chat_messages(message_pair[1]),
)
)
return processed_messages
    def style(self, height: int | None = None, **kwargs):
        """
        This method can be used to change the appearance of the Chatbot component.
        """
        # Height is recorded in the component's own style map.
        if height is not None:
            self._style["height"] = height
        # color_map is no longer supported; warn instead of applying it.
        if kwargs.get("color_map") is not None:
            warnings.warn("The 'color_map' parameter has been deprecated.")
        # Delegate any remaining styling kwargs to the base Component.
        Component.style(
            self,
            **kwargs,
        )
        return self
import argparse
from collections import defaultdict
import datetime
import json
import os
import time
import uuid
import gradio as gr
import requests
from fastchat.conversation import (default_conversation, conv_templates,
SeparatorStyle)
from fastchat.constants import LOGDIR
from fastchat.utils import (build_logger, server_error_msg,
violates_moderation, moderation_msg)
from fastchat.serve.gradio_patch import Chatbot as grChatbot
from fastchat.serve.gradio_css import code_highlight_css
# Module-level logger and shared constants for the Gradio web server.
logger = build_logger("gradio_web_server", "gradio_web_server.log")

# Sent with every request to workers so they can identify this client.
headers = {"User-Agent": "fastchat Client"}

# Reusable Gradio button-state updates shared by all handlers.
no_change_btn = gr.Button.update()
enable_btn = gr.Button.update(interactive=True)
disable_btn = gr.Button.update(interactive=False)

# Sort keys used to pin preferred models to the top of the model list.
priority = {
    "vicuna-13b": "aaaaaaa",
    "koala-13b": "aaaaaab",
}
def get_conv_log_filename():
    """Return today's conversation-log path (YYYY-MM-DD-conv.json) under LOGDIR."""
    today = datetime.datetime.now()
    return os.path.join(
        LOGDIR, f"{today.year}-{today.month:02d}-{today.day:02d}-conv.json"
    )
def get_model_list():
    """Refresh the controller's worker list and return the models, sorted by priority."""
    resp = requests.post(args.controller_url + "/refresh_all_workers")
    assert resp.status_code == 200
    resp = requests.post(args.controller_url + "/list_models")
    models = resp.json()["models"]
    # Models with an entry in `priority` sort before the rest.
    models.sort(key=lambda name: priority.get(name, name))
    logger.info(f"Models: {models}")
    return models
# Javascript run on page load: exposes the URL query parameters
# (e.g. ?model=vicuna-13b) to the `load_demo` callback.
get_window_url_params = """
function() {
const params = new URLSearchParams(window.location.search);
url_params = Object.fromEntries(params);
console.log(url_params);
return url_params;
}
"""
def load_demo(url_params, request: gr.Request):
    """Initialize a session, honoring a valid ?model=... URL parameter."""
    logger.info(f"load_demo. ip: {request.client.host}. params: {url_params}")

    dropdown_update = gr.Dropdown.update(visible=True)
    if "model" in url_params:
        requested = url_params["model"]
        if requested in models:
            dropdown_update = gr.Dropdown.update(value=requested, visible=True)

    state = default_conversation.copy()
    return (
        state,
        dropdown_update,
        gr.Chatbot.update(visible=True),
        gr.Textbox.update(visible=True),
        gr.Button.update(visible=True),
        gr.Row.update(visible=True),
        gr.Accordion.update(visible=True),
    )
def load_demo_refresh_model_list(request: gr.Request):
    """Initialize a session, refreshing the model list from the controller first."""
    logger.info(f"load_demo. ip: {request.client.host}")
    models = get_model_list()
    state = default_conversation.copy()
    default_model = models[0] if len(models) > 0 else ""
    return (
        state,
        gr.Dropdown.update(choices=models, value=default_model),
        gr.Chatbot.update(visible=True),
        gr.Textbox.update(visible=True),
        gr.Button.update(visible=True),
        gr.Row.update(visible=True),
        gr.Accordion.update(visible=True),
    )
def vote_last_response(state, vote_type, model_selector, request: gr.Request):
    """Append one vote record as a JSON line to today's conversation log."""
    record = {
        "tstamp": round(time.time(), 4),
        "type": vote_type,
        "model": model_selector,
        "state": state.dict(),
        "ip": request.client.host,
    }
    with open(get_conv_log_filename(), "a") as fout:
        fout.write(json.dumps(record) + "\n")
def upvote_last_response(state, model_selector, request: gr.Request):
    """Record an upvote, clear the textbox, and lock the three vote buttons."""
    logger.info(f"upvote. ip: {request.client.host}")
    vote_last_response(state, "upvote", model_selector, request)
    return ("", disable_btn, disable_btn, disable_btn)
def downvote_last_response(state, model_selector, request: gr.Request):
    """Record a downvote, clear the textbox, and lock the three vote buttons."""
    logger.info(f"downvote. ip: {request.client.host}")
    vote_last_response(state, "downvote", model_selector, request)
    return ("", disable_btn, disable_btn, disable_btn)
def flag_last_response(state, model_selector, request: gr.Request):
    """Record a flag, clear the textbox, and lock the three vote buttons."""
    logger.info(f"flag. ip: {request.client.host}")
    vote_last_response(state, "flag", model_selector, request)
    return ("", disable_btn, disable_btn, disable_btn)
def regenerate(state, request: gr.Request):
    """Clear the last assistant reply so the next http_bot call regenerates it."""
    logger.info(f"regenerate. ip: {request.client.host}")
    state.messages[-1][-1] = None
    state.skip_next = False
    return (state, state.to_gradio_chatbot(), "", *([disable_btn] * 5))
def clear_history(request: gr.Request):
    """Reset the conversation to a fresh default state and clear the UI."""
    logger.info(f"clear_history. ip: {request.client.host}")
    fresh = default_conversation.copy()
    return (fresh, fresh.to_gradio_chatbot(), "", *([disable_btn] * 5))
def add_text(state, text, request: gr.Request):
    """Append the user's message to the conversation state.

    Empty or moderation-flagged input sets skip_next so http_bot will no-op.
    Input is truncated to 1536 characters before being stored.
    """
    logger.info(f"add_text. ip: {request.client.host}. len: {len(text)}")

    if not text:
        state.skip_next = True
        return (state, state.to_gradio_chatbot(), "") + (no_change_btn,) * 5
    if args.moderate and violates_moderation(text):
        state.skip_next = True
        return (state, state.to_gradio_chatbot(), moderation_msg) + (no_change_btn,) * 5

    state.append_message(state.roles[0], text[:1536])  # Hard cut-off
    state.append_message(state.roles[1], None)
    state.skip_next = False
    return (state, state.to_gradio_chatbot(), "") + (disable_btn,) * 5
def post_process_code(code):
    """Undo markdown-escaping of underscores inside fenced code blocks.

    Splits on the fence marker; odd-indexed segments are code-block bodies,
    in which '\\_' is restored to '_'. Applied only when fences are balanced
    (an odd number of segments after splitting).
    """
    fence = "\n```"
    if fence not in code:
        return code
    segments = code.split(fence)
    if len(segments) % 2 == 1:
        segments = [
            seg.replace("\\_", "_") if idx % 2 == 1 else seg
            for idx, seg in enumerate(segments)
        ]
        code = fence.join(segments)
    return code
def http_bot(state, model_selector, temperature, max_new_tokens, request: gr.Request):
    """Stream a model response for the current conversation state.

    A Gradio generator callback: yields (state, chatbot_messages,
    *button_updates) tuples so the UI renders the answer progressively.
    On completion, appends a record of the exchange to the daily log.
    """
    logger.info(f"http_bot. ip: {request.client.host}")
    start_tstamp = time.time()
    model_name = model_selector

    if state.skip_next:
        # This generate call is skipped due to invalid inputs
        yield (state, state.to_gradio_chatbot()) + (no_change_btn,) * 5
        return

    if len(state.messages) == state.offset + 2:
        # First round of conversation: pick the template matching the model.
        if "koala" in model_name:  # Hardcode the condition
            template_name = "bair_v1"
        else:
            template_name = "v1"
        new_state = conv_templates[template_name].copy()
        new_state.conv_id = uuid.uuid4().hex
        new_state.append_message(new_state.roles[0], state.messages[-2][1])
        new_state.append_message(new_state.roles[1], None)
        state = new_state

    # Query worker address
    controller_url = args.controller_url
    ret = requests.post(controller_url + "/get_worker_address",
            json={"model": model_name})
    worker_addr = ret.json()["address"]
    logger.info(f"model_name: {model_name}, worker_addr: {worker_addr}")

    # No available worker
    if worker_addr == "":
        state.messages[-1][-1] = server_error_msg
        yield (state, state.to_gradio_chatbot(), disable_btn, disable_btn, disable_btn, enable_btn, enable_btn)
        return

    # Construct prompt (chatglm consumes raw message pairs, not a flat prompt)
    if "chatglm" in model_name:
        prompt = state.messages[state.offset:]
        skip_echo_len = len(state.messages[-2][1]) + 1
    else:
        prompt = state.get_prompt()
        skip_echo_len = len(prompt) + 1

    # Make requests
    pload = {
        "model": model_name,
        "prompt": prompt,
        "temperature": float(temperature),
        "max_new_tokens": int(max_new_tokens),
        "stop": state.sep if state.sep_style == SeparatorStyle.SINGLE else state.sep2,
    }
    logger.info(f"==== request ====\n{pload}")

    # "▌" acts as a streaming cursor; it is stripped when streaming finishes.
    state.messages[-1][-1] = "▌"
    yield (state, state.to_gradio_chatbot()) + (disable_btn,) * 5

    try:
        # Stream output
        response = requests.post(worker_addr + "/worker_generate_stream",
            headers=headers, json=pload, stream=True, timeout=10)
        for chunk in response.iter_lines(decode_unicode=False, delimiter=b"\0"):
            if chunk:
                data = json.loads(chunk.decode())
                if data["error_code"] == 0:
                    output = data["text"][skip_echo_len:].strip()
                    output = post_process_code(output)
                    state.messages[-1][-1] = output + "▌"
                    yield (state, state.to_gradio_chatbot()) + (disable_btn,) * 5
                else:
                    output = data["text"] + f" (error_code: {data['error_code']})"
                    state.messages[-1][-1] = output
                    yield (state, state.to_gradio_chatbot()) + (disable_btn, disable_btn, disable_btn, enable_btn, enable_btn)
                    return
                time.sleep(0.02)
    except requests.exceptions.RequestException as e:
        state.messages[-1][-1] = server_error_msg + f" (error_code: 4)"
        yield (state, state.to_gradio_chatbot()) + (disable_btn, disable_btn, disable_btn, enable_btn, enable_btn)
        return

    # Strip the trailing streaming cursor.
    state.messages[-1][-1] = state.messages[-1][-1][:-1]
    yield (state, state.to_gradio_chatbot()) + (enable_btn,) * 5

    finish_tstamp = time.time()
    logger.info(f"{output}")

    with open(get_conv_log_filename(), "a") as fout:
        data = {
            "tstamp": round(finish_tstamp, 4),
            "type": "chat",
            "model": model_name,
            "start": round(start_tstamp, 4),
            # BUG FIX: previously logged start_tstamp for "finish" as well.
            "finish": round(finish_tstamp, 4),
            "state": state.dict(),
            "ip": request.client.host,
        }
        fout.write(json.dumps(data) + "\n")
notice_markdown = ("""
# 🏔️ Chat with Open Large Language Models
- Vicuna: An Open-Source Chatbot Impressing GPT-4 with 90% ChatGPT Quality. [[Blog post]](https://vicuna.lmsys.org) [[GitHub]](https://github.com/lm-sys/FastChat)
- Koala: A Dialogue Model for Academic Research. [[Blog post]](https://bair.berkeley.edu/blog/2023/04/03/koala/) [[GitHub]](https://github.com/young-geng/EasyLM)
- This demo server. [[GitHub]](https://github.com/lm-sys/FastChat)
### Terms of use
By using this service, users are required to agree to the following terms: The service is a research preview intended for non-commercial use only. It only provides limited safety measures and may generate offensive content. It must not be used for any illegal, harmful, violent, racist, or sexual purposes. The service may collect user dialogue data for future research.
### Choose a model to chat with
- [Vicuna](https://vicuna.lmsys.org): a chat assistant fine-tuned from LLaMA on user-shared conversations. This one is expected to perform best according to our evaluation.
- [Koala](https://bair.berkeley.edu/blog/2023/04/03/koala/): a chatbot fine-tuned from LLaMA on user-shared conversations and open-source datasets. This one performs similarly to Vicuna.
- [ChatGLM](https://chatglm.cn/blog): an open bilingual dialogue language model | 开源双语对话语言模型
- [Alpaca](https://crfm.stanford.edu/2023/03/13/alpaca.html): a model fine-tuned from LLaMA on 52K instruction-following demonstrations.
- [LLaMA](https://arxiv.org/abs/2302.13971): open and efficient foundation language models
Note: If you are waiting in the queue, check out more benchmark results from GPT-4 on a static website [here](https://vicuna.lmsys.org/eval).
""")
learn_more_markdown = ("""
### License
The service is a research preview intended for non-commercial use only, subject to the model [License](https://github.com/facebookresearch/llama/blob/main/MODEL_CARD.md) of LLaMA, [Terms of Use](https://openai.com/policies/terms-of-use) of the data generated by OpenAI, and [Privacy Practices](https://chrome.google.com/webstore/detail/sharegpt-share-your-chatg/daiacboceoaocpibfodeljbdfacokfjb) of ShareGPT. Please contact us if you find any potential violation.
""")
css = code_highlight_css + """
pre {
white-space: pre-wrap; /* Since CSS 2.1 */
white-space: -moz-pre-wrap; /* Mozilla, since 1999 */
white-space: -pre-wrap; /* Opera 4-6 */
white-space: -o-pre-wrap; /* Opera 7 */
word-wrap: break-word; /* Internet Explorer 5.5+ */
}
"""
def build_demo():
    """Assemble the Gradio Blocks UI and wire all event handlers.

    Returns the constructed `gr.Blocks` demo. Most widgets start hidden
    and are revealed by the load_demo callbacks once models are known.
    """
    with gr.Blocks(title="FastChat", theme=gr.themes.Base(), css=css) as demo:
        state = gr.State()

        # Draw layout
        notice = gr.Markdown(notice_markdown)

        with gr.Row(elem_id="model_selector_row"):
            model_selector = gr.Dropdown(
                choices=models,
                value=models[0] if len(models) > 0 else "",
                interactive=True,
                show_label=False).style(container=False)

        chatbot = grChatbot(elem_id="chatbot", visible=False).style(height=550)
        with gr.Row():
            with gr.Column(scale=20):
                textbox = gr.Textbox(show_label=False,
                    placeholder="Enter text and press ENTER", visible=False).style(container=False)
            with gr.Column(scale=1, min_width=50):
                submit_btn = gr.Button(value="Send", visible=False)

        with gr.Row(visible=False) as button_row:
            upvote_btn = gr.Button(value="👍 Upvote", interactive=False)
            downvote_btn = gr.Button(value="👎 Downvote", interactive=False)
            flag_btn = gr.Button(value="⚠️ Flag", interactive=False)
            #stop_btn = gr.Button(value="⏹️ Stop Generation", interactive=False)
            regenerate_btn = gr.Button(value="🔄 Regenerate", interactive=False)
            clear_btn = gr.Button(value="🗑️ Clear history", interactive=False)

        with gr.Accordion("Parameters", open=False, visible=False) as parameter_row:
            temperature = gr.Slider(minimum=0.0, maximum=1.0, value=0.7, step=0.1, interactive=True, label="Temperature",)
            max_output_tokens = gr.Slider(minimum=0, maximum=1024, value=512, step=64, interactive=True, label="Max output tokens",)

        gr.Markdown(learn_more_markdown)
        # Hidden JSON component used to ferry URL query params into load_demo.
        url_params = gr.JSON(visible=False)

        # Register listeners
        btn_list = [upvote_btn, downvote_btn, flag_btn, regenerate_btn, clear_btn]
        upvote_btn.click(upvote_last_response,
            [state, model_selector], [textbox, upvote_btn, downvote_btn, flag_btn])
        downvote_btn.click(downvote_last_response,
            [state, model_selector], [textbox, upvote_btn, downvote_btn, flag_btn])
        flag_btn.click(flag_last_response,
            [state, model_selector], [textbox, upvote_btn, downvote_btn, flag_btn])
        regenerate_btn.click(regenerate, state,
            [state, chatbot, textbox] + btn_list).then(
            http_bot, [state, model_selector, temperature, max_output_tokens],
            [state, chatbot] + btn_list)
        clear_btn.click(clear_history, None, [state, chatbot, textbox] + btn_list)

        textbox.submit(add_text, [state, textbox], [state, chatbot, textbox] + btn_list
            ).then(http_bot, [state, model_selector, temperature, max_output_tokens],
                   [state, chatbot] + btn_list)
        submit_btn.click(add_text, [state, textbox], [state, chatbot, textbox] + btn_list
            ).then(http_bot, [state, model_selector, temperature, max_output_tokens],
                   [state, chatbot] + btn_list)

        # "once" fetches the model list at startup only; "reload" refreshes
        # it on every page load.
        if args.model_list_mode == "once":
            demo.load(load_demo, [url_params], [state, model_selector,
                chatbot, textbox, submit_btn, button_row, parameter_row],
                _js=get_window_url_params)
        elif args.model_list_mode == "reload":
            demo.load(load_demo_refresh_model_list, None, [state, model_selector,
                chatbot, textbox, submit_btn, button_row, parameter_row])
        else:
            raise ValueError(f"Unknown model list mode: {args.model_list_mode}")

    return demo
if __name__ == "__main__":
    # Entry point for the Gradio web server.
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int)
    parser.add_argument("--controller-url", type=str, default="http://localhost:21001")
    parser.add_argument("--concurrency-count", type=int, default=10)
    parser.add_argument("--model-list-mode", type=str, default="once",
        choices=["once", "reload"])
    parser.add_argument("--share", action="store_true")
    parser.add_argument("--moderate", action="store_true",
        help="Enable content moderation")
    args = parser.parse_args()
    # FIX: args was previously logged twice (logger.info(f"args: {args}")
    # followed by logger.info(args)); log it once.
    logger.info(f"args: {args}")

    models = get_model_list()

    demo = build_demo()
    demo.queue(concurrency_count=args.concurrency_count, status_update_rate=10,
               api_open=False).launch(server_name=args.host, server_port=args.port,
                                      share=args.share, max_threads=200)
"""Inference for FastChat models."""
import abc
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, LlamaTokenizer, AutoModel
from fastchat.conversation import conv_templates, SeparatorStyle
from fastchat.serve.compression import compress_module
from fastchat.serve.monkey_patch_non_inplace import replace_llama_attn_with_non_inplace_operations
from fastchat.serve.serve_chatglm import chatglm_generate_stream
def load_model(model_name, device, num_gpus, load_8bit=False, debug=False):
    """Load a model and tokenizer for the given device.

    device: one of "cpu", "cuda", "mps". num_gpus may be an int or "auto";
    multi-GPU loading uses device_map="auto" with a 13GiB per-GPU cap.
    Returns (model, tokenizer).
    """
    if device == "cpu":
        kwargs = {}
    elif device == "cuda":
        kwargs = {"torch_dtype": torch.float16}
        if num_gpus == "auto":
            kwargs["device_map"] = "auto"
        else:
            num_gpus = int(num_gpus)
            if num_gpus != 1:
                kwargs.update({
                    "device_map": "auto",
                    "max_memory": {i: "13GiB" for i in range(num_gpus)},
                })
    elif device == "mps":
        kwargs = {"torch_dtype": torch.float16}
        # Avoid bugs in mps backend by not using in-place operations.
        replace_llama_attn_with_non_inplace_operations()
    else:
        raise ValueError(f"Invalid device: {device}")
    # NOTE(review): the chatglm branch always calls .cuda() regardless of
    # `device`, so it will fail on CPU-only machines — confirm intent.
    if "chatglm" in model_name:
        tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
        model = AutoModel.from_pretrained(model_name, trust_remote_code=True).half().cuda()
    else:
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
        model = AutoModelForCausalLM.from_pretrained(model_name,
            low_cpu_mem_usage=True, **kwargs)
    # Optional weight compression for 8-bit serving.
    if load_8bit:
        compress_module(model, device)
    # device_map="auto" already placed multi-GPU models; only move otherwise.
    if (device == "cuda" and num_gpus == 1) or device == "mps":
        model.to(device)
    if debug:
        print(model)
    return model, tokenizer
@torch.inference_mode()
def generate_stream(model, tokenizer, params, device,
                    context_len=2048, stream_interval=2):
    """Token-by-token generation with KV caching, yielding partial outputs.

    params: dict with "prompt" and optional "temperature", "max_new_tokens",
    and "stop" (a stop string, or None for no stop string). Yields the
    decoded text (prompt included, truncated at the stop string) every
    `stream_interval` steps and once more on the final step.
    """
    prompt = params["prompt"]
    l_prompt = len(prompt)
    temperature = float(params.get("temperature", 1.0))
    max_new_tokens = int(params.get("max_new_tokens", 256))
    stop_str = params.get("stop", None)

    input_ids = tokenizer(prompt).input_ids
    output_ids = list(input_ids)

    # Keep room in the context window for the new tokens (plus a safety margin).
    max_src_len = context_len - max_new_tokens - 8
    input_ids = input_ids[-max_src_len:]

    for i in range(max_new_tokens):
        if i == 0:
            # Prime the KV cache with the full prompt.
            out = model(
                torch.as_tensor([input_ids], device=device), use_cache=True)
            logits = out.logits
            past_key_values = out.past_key_values
        else:
            attention_mask = torch.ones(
                1, past_key_values[0][0].shape[-2] + 1, device=device)
            out = model(input_ids=torch.as_tensor([[token]], device=device),
                        use_cache=True,
                        attention_mask=attention_mask,
                        past_key_values=past_key_values)
            logits = out.logits
            past_key_values = out.past_key_values

        last_token_logits = logits[0][-1]

        if device == "mps":
            # Switch to CPU by avoiding some bugs in mps backend.
            last_token_logits = last_token_logits.float().to("cpu")

        if temperature < 1e-4:
            # Effectively greedy decoding.
            token = int(torch.argmax(last_token_logits))
        else:
            probs = torch.softmax(last_token_logits / temperature, dim=-1)
            token = int(torch.multinomial(probs, num_samples=1))

        output_ids.append(token)

        stopped = token == tokenizer.eos_token_id

        if i % stream_interval == 0 or i == max_new_tokens - 1 or stopped:
            output = tokenizer.decode(output_ids, skip_special_tokens=True)
            if stop_str:
                # BUG FIX: guard against stop_str being None/empty —
                # str.rfind(None) raised TypeError before.
                pos = output.rfind(stop_str, l_prompt)
                if pos != -1:
                    output = output[:pos]
                    stopped = True
            yield output

        if stopped:
            break

    del past_key_values
class ChatIO(abc.ABC):
    """Abstract interface for chat front-ends (plain CLI, rich console, etc.)."""

    @abc.abstractmethod
    def prompt_for_input(self, role: str) -> str:
        """Prompt for input from a role."""

    @abc.abstractmethod
    def prompt_for_output(self, role: str):
        """Prompt for output from a role."""

    @abc.abstractmethod
    def stream_output(self, output_stream, skip_echo_len: int):
        """Stream output."""
def chat_loop(model_name: str, device: str, num_gpus: str, load_8bit: bool,
              conv_template: str, temperature: float, max_new_tokens: int,
              chatio: ChatIO, debug: bool):
    """Interactive chat REPL: read input via chatio, stream model replies.

    Exits on empty input or EOF. chatglm models use their own chat API and
    consume raw message pairs instead of a flat prompt string.
    """
    # Model
    model, tokenizer = load_model(model_name, device,
        num_gpus, load_8bit, debug)
    is_chatglm = "chatglm" in str(type(model)).lower()

    # Chat
    conv = conv_templates[conv_template].copy()
    while True:
        try:
            inp = chatio.prompt_for_input(conv.roles[0])
        except EOFError:
            inp = ""
        if not inp:
            print("exit...")
            break

        conv.append_message(conv.roles[0], inp)
        conv.append_message(conv.roles[1], None)

        if is_chatglm:
            prompt = conv.messages[conv.offset:]
            generate_stream_func = chatglm_generate_stream
            skip_echo_len = len(conv.messages[-2][1]) + 1
        else:
            generate_stream_func = generate_stream
            prompt = conv.get_prompt()
            skip_echo_len = len(prompt) + 1

        params = {
            "model": model_name,
            "prompt": prompt,
            "temperature": temperature,
            "max_new_tokens": max_new_tokens,
            "stop": conv.sep if conv.sep_style == SeparatorStyle.SINGLE else conv.sep2,
        }

        chatio.prompt_for_output(conv.roles[1])
        output_stream = generate_stream_func(model, tokenizer, params, device)
        outputs = chatio.stream_output(output_stream, skip_echo_len)
        # NOTE(review): assumes stream_output returns an iterable of words;
        # they are re-joined with single spaces here.
        conv.messages[-1][-1] = " ".join(outputs)

        if debug:
            print("\n", {"prompt": prompt, "outputs": outputs}, "\n")
"""
A model worker executes the model.
"""
import argparse
import asyncio
import dataclasses
import logging
import json
import time
from typing import List, Union
import threading
import uuid
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse
import requests
from transformers import AutoTokenizer, AutoModelForCausalLM, LlamaTokenizer
import torch
import uvicorn
from fastchat.constants import WORKER_HEART_BEAT_INTERVAL
from fastchat.serve.inference import load_model, generate_stream
from fastchat.serve.serve_chatglm import chatglm_generate_stream
from fastchat.utils import (build_logger, server_error_msg,
pretty_print_semaphore)
# One gibibyte, in bytes.
GB = 1 << 30

# Short random id distinguishing this worker instance (and its log file).
worker_id = str(uuid.uuid4())[:6]
logger = build_logger("model_worker", f"model_worker_{worker_id}.log")
# Request counter and lazily-created concurrency semaphore shared across endpoints.
global_counter = 0
model_semaphore = None
def heart_beat_worker(controller):
    """Background thread body: periodically send a heart beat to the controller."""
    while True:
        time.sleep(WORKER_HEART_BEAT_INTERVAL)
        controller.send_heart_beat()
class ModelWorker:
    """Hosts one model and serves streaming generation requests.

    Registers itself with the controller and, unless disabled, sends
    periodic heart beats from a background thread so the controller
    knows this worker is alive.
    """
    def __init__(self, controller_addr, worker_addr,
                 worker_id, no_register, model_path, model_name,
                 device, num_gpus, load_8bit=False):
        self.controller_addr = controller_addr
        self.worker_addr = worker_addr
        self.worker_id = worker_id
        if model_path.endswith("/"):
            model_path = model_path[:-1]
        # Default the public model name to the last path component.
        self.model_name = model_name or model_path.split("/")[-1]
        self.device = device

        logger.info(f"Loading the model {self.model_name} on worker {worker_id} ...")
        self.model, self.tokenizer = load_model(
            model_path, device, num_gpus, load_8bit)

        # Derive the context window from the model config, defaulting to 2048.
        if hasattr(self.model.config, "max_sequence_length"):
            self.context_len = self.model.config.max_sequence_length
        elif hasattr(self.model.config, "max_position_embeddings"):
            self.context_len = self.model.config.max_position_embeddings
        else:
            self.context_len = 2048

        # chatglm uses its own chat-style streaming API.
        is_chatglm = "chatglm" in str(type(self.model)).lower()
        if is_chatglm:
            self.generate_stream_func = chatglm_generate_stream
        else:
            self.generate_stream_func = generate_stream

        if not no_register:
            self.register_to_controller()
            self.heart_beat_thread = threading.Thread(
                target=heart_beat_worker, args=(self,))
            self.heart_beat_thread.start()

    def register_to_controller(self):
        """Announce this worker and its current status to the controller."""
        logger.info("Register to controller")

        url = self.controller_addr + "/register_worker"
        data = {
            "worker_name": self.worker_addr,
            "check_heart_beat": True,
            "worker_status": self.get_status()
        }
        r = requests.post(url, json=data)
        assert r.status_code == 200

    def send_heart_beat(self):
        """Ping the controller; retry on network errors, re-register if unknown."""
        logger.info(f"Send heart beat. Models: {[self.model_name]}. "
                    f"Semaphore: {pretty_print_semaphore(model_semaphore)}. "
                    f"global_counter: {global_counter}")

        url = self.controller_addr + "/receive_heart_beat"

        while True:
            try:
                ret = requests.post(url, json={
                    "worker_name": self.worker_addr,
                    "queue_length": self.get_queue_length()}, timeout=5)
                exist = ret.json()["exist"]
                break
            except requests.exceptions.RequestException as e:
                logger.error(f"heart beat error: {e}")
            time.sleep(5)

        if not exist:
            self.register_to_controller()

    def get_queue_length(self):
        """Requests queued or running beyond the concurrency limit (0 before first request)."""
        if model_semaphore is None:
            return 0
        else:
            # NOTE(review): relies on asyncio.Semaphore private attributes
            # (_value, _waiters) — fragile across Python versions.
            return args.limit_model_concurrency - model_semaphore._value + len(
                model_semaphore._waiters)

    def get_status(self):
        """Status payload reported to the controller."""
        return {
            "model_names": [self.model_name],
            "speed": 1,
            "queue_length": self.get_queue_length(),
        }

    def generate_stream_gate(self, params):
        """Wrap the streaming generator, encoding each chunk as NUL-delimited JSON.

        CUDA out-of-memory is mapped to an error payload (error_code 1)
        instead of propagating.
        """
        try:
            for output in self.generate_stream_func(self.model, self.tokenizer,
                    params, self.device, self.context_len, args.stream_interval):
                ret = {
                    "text": output,
                    "error_code": 0,
                }
                yield json.dumps(ret).encode() + b"\0"
        except torch.cuda.OutOfMemoryError:
            ret = {
                "text": server_error_msg,
                "error_code": 1,
            }
            yield json.dumps(ret).encode() + b"\0"
# FastAPI application exposing the worker endpoints.
app = FastAPI()


def release_model_semaphore():
    """Release one slot of the global concurrency semaphore."""
    model_semaphore.release()
@app.post("/worker_generate_stream")
async def api_generate_stream(request: Request):
global model_semaphore, global_counter
global_counter += 1
params = await request.json()
if model_semaphore is None:
model_semaphore = asyncio.Semaphore(args.limit_model_concurrency)
await model_semaphore.acquire()
generator = worker.generate_stream_gate(params)
background_tasks = BackgroundTasks()
background_tasks.add_task(release_model_semaphore)
return StreamingResponse(generator, background=background_tasks)
@app.post("/worker_get_status")
async def api_get_status(request: Request):
return worker.get_status()
if __name__ == "__main__":
    # Entry point: parse CLI args, load the model, start the HTTP server.
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, default="localhost")
    parser.add_argument("--port", type=int, default=21002)
    parser.add_argument("--worker-address", type=str,
        default="http://localhost:21002")
    parser.add_argument("--controller-address", type=str,
        default="http://localhost:21001")
    parser.add_argument("--model-path", type=str, default="facebook/opt-350m")
    parser.add_argument("--model-name", type=str)
    parser.add_argument("--device", type=str, choices=["cpu", "cuda", "mps"], default="cuda")
    parser.add_argument("--num-gpus", type=int, default=1)
    parser.add_argument("--load-8bit", action="store_true")
    # Maximum number of concurrent generation requests per worker.
    parser.add_argument("--limit-model-concurrency", type=int, default=5)
    parser.add_argument("--stream-interval", type=int, default=2)
    parser.add_argument("--no-register", action="store_true")
    args = parser.parse_args()
    logger.info(f"args: {args}")

    # Loads the model and (unless --no-register) announces this worker.
    worker = ModelWorker(args.controller_address,
                         args.worker_address,
                         worker_id,
                         args.no_register,
                         args.model_path,
                         args.model_name,
                         args.device,
                         args.num_gpus,
                         args.load_8bit)
    uvicorn.run(app, host=args.host, port=args.port, log_level="info")
"""
Monkey patch the llama implementation in the huggingface/transformers library.
Avoid bugs in mps backend by not using in-place operations.
"""
import math
from typing import List, Optional, Tuple
import torch
from torch import nn
import transformers
def rotate_half(x):
    """Rotates half the hidden dims of the input (out-of-place, mps-safe)."""
    half = x.shape[-1] // 2
    first, second = x[..., :half].clone(), x[..., half:].clone()
    return torch.cat((-second, first), dim=-1)
def apply_rotary_pos_emb(q, k, cos, sin, position_ids):
    """Apply rotary position embeddings to q and k without in-place ops."""
    # Expand position ids into gather indices of shape [bs, heads, seq_len, dim].
    idx = position_ids[:, None, :, None]  # [bs, 1, seq_len, 1]
    idx = idx.repeat(1, cos.shape[1], 1, cos.shape[3])
    bs = idx.shape[0]
    cos_sel = torch.gather(cos.repeat(bs, 1, 1, 1), 2, idx)
    sin_sel = torch.gather(sin.repeat(bs, 1, 1, 1), 2, idx)
    rotated_q = q * cos_sel + rotate_half(q) * sin_sel
    rotated_k = k * cos_sel + rotate_half(k) * sin_sel
    return rotated_q, rotated_k
def forward(
    self,
    hidden_states: torch.Tensor,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.LongTensor] = None,
    past_key_value: Optional[Tuple[torch.Tensor]] = None,
    output_attentions: bool = False,
    use_cache: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
    """Replacement for LlamaAttention.forward that avoids in-place operations.

    Used to monkey-patch transformers' llama attention so the mps backend
    does not hit in-place-op bugs. Returns (attn_output, attn_weights or
    None, past_key_value or None) like the original implementation.
    """
    bsz, q_len, _ = hidden_states.size()

    query_states = self.q_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
    key_states = self.k_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
    value_states = self.v_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)

    kv_seq_len = key_states.shape[-2]
    if past_key_value is not None:
        kv_seq_len += past_key_value[0].shape[-2]
    cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len)
    query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
    # [bsz, nh, t, hd]

    if past_key_value is not None:
        # reuse k, v, self_attention
        key_states = torch.cat([past_key_value[0], key_states], dim=2)
        value_states = torch.cat([past_key_value[1], value_states], dim=2)

    past_key_value = (key_states, value_states) if use_cache else None

    attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)

    if attn_weights.size() != (bsz, self.num_heads, q_len, kv_seq_len):
        raise ValueError(
            # BUG FIX: the message previously reported (bsz * num_heads, ...)
            # while the check above compares against (bsz, num_heads, ...).
            f"Attention weights should be of size {(bsz, self.num_heads, q_len, kv_seq_len)}, but is"
            f" {attn_weights.size()}"
        )

    if attention_mask is not None:
        if attention_mask.size() != (bsz, 1, q_len, kv_seq_len):
            raise ValueError(
                f"Attention mask should be of size {(bsz, 1, q_len, kv_seq_len)}, but is {attention_mask.size()}"
            )
        attn_weights = attn_weights + attention_mask
        # Clamp to the dtype minimum so fully-masked rows don't underflow.
        attn_weights = torch.max(attn_weights, torch.tensor(torch.finfo(attn_weights.dtype).min))

    # upcast attention to fp32
    attn_weights = nn.functional.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
    attn_output = torch.matmul(attn_weights, value_states)

    if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim):
        raise ValueError(
            f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is"
            f" {attn_output.size()}"
        )

    attn_output = attn_output.transpose(1, 2)
    attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)

    attn_output = self.o_proj(attn_output)

    if not output_attentions:
        attn_weights = None

    return attn_output, attn_weights, past_key_value
def replace_llama_attn_with_non_inplace_operations():
    """Avoid bugs in mps backend by not using in-place operations."""
    # Globally monkey-patches every LlamaAttention instance in transformers.
    transformers.models.llama.modeling_llama.LlamaAttention.forward = forward
"""
Manually register workers.
Usage:
python3 -m fastchat.serve.register_worker --controller http://localhost:21001 --worker-name http://localhost:21002
"""
import argparse
import requests
if __name__ == "__main__":
    # Manually register a worker with the controller (no status payload).
    parser = argparse.ArgumentParser()
    parser.add_argument("--controller-address", type=str)
    parser.add_argument("--worker-name", type=str)
    parser.add_argument("--check-heart-beat", action="store_true")
    args = parser.parse_args()

    url = args.controller_address + "/register_worker"
    data = {
        "worker_name": args.worker_name,
        "check_heart_beat": args.check_heart_beat,
        "worker_status": None,
    }
    r = requests.post(url, json=data)
    assert r.status_code == 200
import torch
from typing import List, Tuple
@torch.inference_mode()
def chatglm_generate_stream(model, tokenizer, params, device,
                            context_len=2048, stream_interval=2):
    """Generate text using model's chat api"""
    # params["prompt"] is a list of (role, text) message pairs here,
    # not a flat prompt string.
    messages = params["prompt"]
    max_new_tokens = int(params.get("max_new_tokens", 256))
    temperature = float(params.get("temperature", 1.0))
    top_p = float(params.get("top_p", 0.7))
    gen_kwargs = {
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "top_p": top_p,
        "temperature": temperature,
        "logits_processor": None
    }
    # NOTE(review): gen_kwargs is built but never passed to stream_chat below,
    # so temperature/top_p/max_new_tokens are currently ignored — confirm intent.
    # Fold prior (user, assistant) turns into the chat history.
    hist = []
    for i in range(0, len(messages) - 2, 2):
        hist.append((messages[i][1], messages[i+1][1]))
    query = messages[-2][1]
    for response, new_hist in model.stream_chat(tokenizer, query, hist):
        output = query + " " + response
        yield output
import argparse
import json
import requests
from fastchat.conversation import default_conversation
def main():
    """Send one test prompt to a worker and print the streamed reply."""
    if args.worker_address:
        worker_addr = args.worker_address
    else:
        # Discover a worker for the requested model via the controller.
        controller_addr = args.controller_address
        ret = requests.post(controller_addr + "/refresh_all_workers")
        ret = requests.post(controller_addr + "/list_models")
        models = ret.json()["models"]
        models.sort()
        print(f"Models: {models}")

        ret = requests.post(controller_addr + "/get_worker_address",
            json={"model": args.model_name})
        worker_addr = ret.json()["address"]
        print(f"worker_addr: {worker_addr}")

    # Empty address means no worker currently serves this model.
    if worker_addr == "":
        return

    conv = default_conversation.copy()
    conv.append_message(conv.roles[0], args.message)
    prompt = conv.get_prompt()

    headers = {"User-Agent": "fastchat Client"}
    pload = {
        "model": args.model_name,
        "prompt": prompt,
        "max_new_tokens": args.max_new_tokens,
        "temperature": 0.7,
        "stop": conv.sep,
    }
    response = requests.post(worker_addr + "/worker_generate_stream", headers=headers,
            json=pload, stream=True)

    print(prompt.replace(conv.sep, "\n"), end="")
    # The worker streams NUL-delimited JSON chunks; overwrite the line as
    # each longer partial output arrives.
    for chunk in response.iter_lines(chunk_size=8192, decode_unicode=False, delimiter=b"\0"):
        if chunk:
            data = json.loads(chunk.decode("utf-8"))
            output = data["text"].split(conv.sep)[-1]
            print(output, end="\r")
    print("")
if __name__ == "__main__":
    # CLI for sending a single test message to a worker.
    parser = argparse.ArgumentParser()
    parser.add_argument("--controller-address", type=str, default="http://localhost:21001")
    parser.add_argument("--worker-address", type=str)
    parser.add_argument("--model-name", type=str, default="facebook/opt-350m")
    parser.add_argument("--max-new-tokens", type=int, default=32)
    parser.add_argument("--message", type=str, default=
        "Tell me a story with more than 1000 words.")
    args = parser.parse_args()

    main()
from typing import List, Optional, Tuple
import torch
from torch import nn
import transformers
from transformers.models.llama.modeling_llama import apply_rotary_pos_emb
from einops import rearrange
from flash_attn.flash_attn_interface import flash_attn_unpadded_qkvpacked_func
from flash_attn.bert_padding import unpad_input, pad_input
def forward(
    self,
    hidden_states: torch.Tensor,
    attention_mask: Optional[torch.Tensor] = None,
    position_ids: Optional[torch.Tensor] = None,
    past_key_value: Optional[Tuple[torch.Tensor]] = None,
    output_attentions: bool = False,
    use_cache: bool = False,
) -> Tuple[torch.Tensor, Optional[torch.Tensor],
           Optional[Tuple[torch.Tensor]]]:
    """Flash-attention replacement for LlamaAttention.forward.

    Input shape: Batch x Time x Channel.
    attention_mask: [bsz, q_len] -- a 2-D key-padding mask, NOT the usual
    additive 4-D causal mask; _prepare_decoder_attention_mask is patched
    below to pass the 2-D mask through unchanged.
    Returns (attn_output, None, None): attention weights and the KV cache
    are not supported by this patch (asserted below).
    """
    bsz, q_len, _ = hidden_states.size()

    # Project to per-head Q/K/V: [bsz, q_len, hidden] -> [bsz, nh, q_len, hd].
    query_states = self.q_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
    key_states = self.k_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
    value_states = self.v_proj(hidden_states).view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
    # [bsz, q_len, nh, hd]
    # [bsz, nh, q_len, hd]
    kv_seq_len = key_states.shape[-2]
    assert past_key_value is None, "past_key_value is not supported"

    # Rotary position embeddings are applied to Q and K only.
    cos, sin = self.rotary_emb(value_states, seq_len=kv_seq_len)
    query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
    # [bsz, nh, t, hd]
    assert not output_attentions, "output_attentions is not supported"
    assert not use_cache, "use_cache is not supported"

    # Flash attention codes from
    # https://github.com/HazyResearch/flash-attention/blob/main/flash_attn/flash_attention.py

    # transform the data into the packed format required by flash attention
    qkv = torch.stack([query_states, key_states, value_states], dim=2)  # [bsz, nh, 3, q_len, hd]
    qkv = qkv.transpose(1, 3)  # [bsz, q_len, 3, nh, hd]

    # We have disabled _prepare_decoder_attention_mask in LlamaModel, so
    # the attention_mask here is already the 2-D key_padding_mask.
    key_padding_mask = attention_mask

    if key_padding_mask is None:
        # No padding: flatten batch*seq and use uniform cumulative lengths.
        qkv = rearrange(qkv, 'b s ... -> (b s) ...')
        max_s = q_len
        cu_q_lens = torch.arange(0, (bsz + 1) * q_len, step=q_len, dtype=torch.int32,
                                 device=qkv.device)
        output = flash_attn_unpadded_qkvpacked_func(
            qkv, cu_q_lens, max_s, 0.0,
            softmax_scale=None, causal=True
        )
        output = rearrange(output, '(b s) ... -> b s ...', b=bsz)
    else:
        # Padded case: drop pad tokens, run varlen flash attention, re-pad.
        nheads = qkv.shape[-2]
        x = rearrange(qkv, 'b s three h d -> b s (three h d)')
        x_unpad, indices, cu_q_lens, max_s = unpad_input(x, key_padding_mask)
        x_unpad = rearrange(x_unpad, 'nnz (three h d) -> nnz three h d', three=3, h=nheads)
        output_unpad = flash_attn_unpadded_qkvpacked_func(
            x_unpad, cu_q_lens, max_s, 0.0,
            softmax_scale=None, causal=True
        )
        output = rearrange(pad_input(rearrange(output_unpad, 'nnz h d -> nnz (h d)'),
                                     indices, bsz, q_len),
                           'b s (h d) -> b s h d', h=nheads)
    # Merge heads back into the hidden dimension and project out.
    return self.o_proj(rearrange(output,
                                 'b s h d -> b s (h d)')), None, None
# Disable the transformation of the attention mask in LlamaModel as the flash attention
# requires the attention mask to be the same as the key_padding_mask
def _prepare_decoder_attention_mask(self, attention_mask, input_shape,
inputs_embeds, past_key_values_length):
# [bsz, seq_len]
return attention_mask
def replace_llama_attn_with_flash_attn():
    """Monkey-patch HF LLaMA to use flash attention.

    Overwrites LlamaModel._prepare_decoder_attention_mask so the 2-D padding
    mask reaches attention unchanged, and LlamaAttention.forward with the
    flash-attention implementation above.  The patch is process-global: it
    affects every LLaMA model instantiated after this call.
    """
    transformers.models.llama.modeling_llama.LlamaModel._prepare_decoder_attention_mask = _prepare_decoder_attention_mask
    transformers.models.llama.modeling_llama.LlamaAttention.forward = forward
# Adopted from tatsu-lab@stanford_alpaca. Below is the original copyright:
# Copyright 2023 Rohan Taori, Ishaan Gulrajani, Tianyi Zhang, Yann Dubois, Xuechen Li
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from dataclasses import dataclass, field
import json
import logging
import pathlib
from typing import Dict, Optional, Sequence
import torch
import transformers
from torch.utils.data import Dataset
from transformers import Trainer
from fastchat import conversation as conversation_lib
# TODO: import and use code from ../data/dataset.py
IGNORE_INDEX = -100  # label value ignored by PyTorch CrossEntropyLoss
DEFAULT_PAD_TOKEN = "[PAD]"
DEFAULT_EOS_TOKEN = "</s>"
# Fix: BOS/UNK were wrongly set to the EOS string "</s>".  The LLaMA
# (SentencePiece) tokenizer uses "<s>" for BOS and "<unk>" for UNK; the
# same correction was applied upstream in stanford_alpaca.
DEFAULT_BOS_TOKEN = "<s>"
DEFAULT_UNK_TOKEN = "<unk>"
@dataclass
class ModelArguments:
    """Which pretrained checkpoint to fine-tune."""
    # HF hub id or local path of the base model.
    model_name_or_path: Optional[str] = "facebook/opt-125m"
@dataclass
class DataArguments:
    """Location of the training data and the preprocessing mode."""
    data_path: str = field(
        default=None,
        metadata={"help": "Path to the training data."},
    )
    # Tokenize per-item in __getitem__ instead of all at once in __init__.
    lazy_preprocess: bool = False
@dataclass
class TrainingArguments(transformers.TrainingArguments):
    # Extends the HF TrainingArguments with fine-tuning-specific fields.
    cache_dir: Optional[str] = field(default=None)  # HF download/cache directory
    optim: str = field(default="adamw_torch")  # overrides the HF default optimizer
    # Tokenizer truncation/padding length used when loading the tokenizer.
    model_max_length: int = field(
        default=512,
        metadata={
            "help":
            "Maximum sequence length. Sequences will be right padded (and possibly truncated)."
        },
    )
def safe_save_model_for_hf_trainer(trainer: transformers.Trainer,
                                   output_dir: str):
    """Collect the model state dict on CPU and write it via Trainer._save."""
    state_dict = trainer.model.state_dict()
    if trainer.args.should_save:
        # Move every tensor off the accelerator before serialization.
        cpu_state_dict = {name: tensor.cpu()
                          for name, tensor in state_dict.items()}
        del state_dict  # drop the on-device copy before writing to disk
        trainer._save(output_dir, state_dict=cpu_state_dict)  # noqa
def smart_tokenizer_and_embedding_resize(
    special_tokens_dict: Dict,
    tokenizer: transformers.PreTrainedTokenizer,
    model: transformers.PreTrainedModel,
):
    """Add special tokens, resize the embeddings, and initialize new rows.

    Newly added input/output embedding rows are set to the mean of the
    pre-existing rows.  Note: this is the unoptimized version that may make
    your embedding size not be divisible by 64.
    """
    num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict)
    model.resize_token_embeddings(len(tokenizer))
    if num_new_tokens == 0:
        return

    in_embed = model.get_input_embeddings().weight.data
    out_embed = model.get_output_embeddings().weight.data
    # Average over the pre-existing rows only, then fill the new tail rows.
    in_embed[-num_new_tokens:] = in_embed[:-num_new_tokens].mean(dim=0,
                                                                 keepdim=True)
    out_embed[-num_new_tokens:] = out_embed[:-num_new_tokens].mean(dim=0,
                                                                   keepdim=True)
def _tokenize_fn(strings: Sequence[str],
                 tokenizer: transformers.PreTrainedTokenizer) -> Dict:
    """Tokenize each string; return ids, labels, and their non-pad lengths."""
    tokenized_list = []
    for text in strings:
        tokenized_list.append(tokenizer(
            text,
            return_tensors="pt",
            padding="longest",
            max_length=tokenizer.model_max_length,
            truncation=True,
        ))
    # For causal-LM training, the input ids double as the labels.
    input_ids = labels = [t.input_ids[0] for t in tokenized_list]
    # Sequence length excluding padding tokens.
    input_ids_lens = labels_lens = [
        t.input_ids.ne(tokenizer.pad_token_id).sum().item()
        for t in tokenized_list
    ]
    return dict(
        input_ids=input_ids,
        labels=labels,
        input_ids_lens=input_ids_lens,
        labels_lens=labels_lens,
    )
def _mask_targets(target, tokenized_lens, speakers, header_len, s_ids):
    """Mask the human turns of `target` with IGNORE_INDEX, in place.

    Walks the conversation turn by turn starting right after the header;
    only sentences spoken by "human" are masked, so the loss is computed
    on the model replies.
    """
    cur_idx = header_len
    tgt_len = target.shape[0]
    for tokenized_len, speaker, s_id in zip(tokenized_lens, speakers, s_ids):
        if cur_idx >= tgt_len:
            break
        if cur_idx + tokenized_len < tgt_len:
            # Sanity check: the single-sentence tokens (past the 2-token
            # prefix difference) should line up with the conversation slice.
            if not torch.equal(target[cur_idx + 2:cur_idx + tokenized_len],
                               s_id[2:]):
                logging.warning("a sentence mismatches the corresponding piece "
                                "in the conversation")
        if speaker == "human":
            target[cur_idx:cur_idx + tokenized_len] = IGNORE_INDEX
        cur_idx += tokenized_len
def _add_speaker_and_signal(header, source, get_conversation=True):
    """Add speaker and start/end signal on each round.

    Mutates each sentence["value"] in place to '### <role>: <text>\n' and
    returns the header plus the concatenated turns (just the header when
    get_conversation is False).
    """
    BEGIN_SIGNAL = "### "
    END_SIGNAL = "\n"
    unknown_role = "unknown"  # fallback for speakers other than human/gpt
    roles = {
        "human": conversation_lib.default_conversation.roles[0],  # human role
        "gpt": conversation_lib.default_conversation.roles[1],  # gpt role
    }
    conversation = header
    for sentence in source:
        role = roles.get(sentence["from"].lower(), unknown_role)
        sentence["value"] = (BEGIN_SIGNAL + role + ": "
                             + sentence["value"] + END_SIGNAL)
        if get_conversation:
            conversation += sentence["value"]
    return conversation
def preprocess(
    sources: Sequence[str],
    tokenizer: transformers.PreTrainedTokenizer,
) -> Dict:
    """
    Given a list of sources, each is a conversation list. This transform:
    1. Add signal '### ' at the beginning each sentence, with end signal '\n';
    2. Concatenate conversations together;
    3. Tokenize the concatenated conversation;
    4. Make a deepcopy as the target. Mask human words with IGNORE_INDEX.
    """
    header = f"{conversation_lib.default_conversation.system}\n\n"
    # Add the speaker signal and flatten each conversation into one string
    # (this also rewrites sentence["value"] in place, which the per-sentence
    # tokenization below relies on).
    conversations = [_add_speaker_and_signal(header, source)
                     for source in sources]

    conversations_tokenized = _tokenize_fn(conversations, tokenizer)
    input_ids = conversations_tokenized["input_ids"]
    targets = copy.deepcopy(input_ids)
    header_len = _tokenize_fn([header], tokenizer)["input_ids_lens"][0]

    for target, source in zip(targets, sources):
        tokenized_sentence = _tokenize_fn([s["value"] for s in source],
                                          tokenizer)
        tokenized_lens = tokenized_sentence["input_ids_lens"]
        # Currently, "###" is tokenized into 2 tokens in the whole conversation,
        # and 1 token in a single sentence, so we do not need to use the line below.
        # tokenized_lens = [l-1 for l in tokenized_lens]
        speakers = [sentence["from"] for sentence in source]
        _mask_targets(target, tokenized_lens, speakers, header_len,
                      tokenized_sentence["input_ids"])

    return dict(input_ids=input_ids, labels=targets)
class SupervisedDataset(Dataset):
    """Dataset for supervised fine-tuning (eager: tokenizes everything up front)."""

    def __init__(self, data_path: str,
                 tokenizer: transformers.PreTrainedTokenizer):
        super(SupervisedDataset, self).__init__()
        logging.warning("Loading data...")
        # Fix: close the data file instead of leaking the handle from
        # json.load(open(...)).
        with open(data_path, "r") as f:
            list_data_dict = json.load(f)

        logging.warning("Formatting inputs...")
        sources = [example["conversations"] for example in list_data_dict]
        data_dict = preprocess(sources, tokenizer)

        self.input_ids = data_dict["input_ids"]  # tokenized conversations
        self.labels = data_dict["labels"]  # targets with human turns masked

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        return dict(input_ids=self.input_ids[i], labels=self.labels[i])
class LazySupervisedDataset(Dataset):
    """Dataset for supervised fine-tuning (lazy: tokenizes per item in __getitem__)."""

    def __init__(self, data_path: str,
                 tokenizer: transformers.PreTrainedTokenizer):
        super(LazySupervisedDataset, self).__init__()
        logging.warning("Loading data...")
        # Fix: close the data file instead of leaking the handle from
        # json.load(open(...)).
        with open(data_path, "r") as f:
            list_data_dict = json.load(f)

        logging.warning("Formatting inputs...Skip in lazy mode")
        self.tokenizer = tokenizer
        self.list_data_dict = list_data_dict

    def __len__(self):
        return len(self.list_data_dict)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        sources = self.list_data_dict[i]
        if isinstance(i, int):
            sources = [sources]
        # Deepcopy so preprocess's in-place sentence rewriting does not
        # corrupt the cached raw data across epochs.
        data_dict = preprocess(
            copy.deepcopy([e["conversations"] for e in sources]),
            self.tokenizer)
        if isinstance(i, int):
            data_dict = dict(input_ids=data_dict["input_ids"][0],
                             labels=data_dict["labels"][0])
        return data_dict
@dataclass
class DataCollatorForSupervisedDataset(object):
    """Collate examples for supervised fine-tuning."""

    tokenizer: transformers.PreTrainedTokenizer

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        # Right-pad input_ids with the pad token and labels with IGNORE_INDEX.
        input_ids = [instance["input_ids"] for instance in instances]
        labels = [instance["labels"] for instance in instances]
        input_ids = torch.nn.utils.rnn.pad_sequence(
            input_ids, batch_first=True,
            padding_value=self.tokenizer.pad_token_id)
        labels = torch.nn.utils.rnn.pad_sequence(
            labels, batch_first=True, padding_value=IGNORE_INDEX)
        return dict(
            input_ids=input_ids,
            labels=labels,
            # Attend to every non-pad position.
            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
        )
def make_supervised_data_module(tokenizer: transformers.PreTrainedTokenizer,
                                data_args) -> Dict:
    """Make dataset and collator for supervised fine-tuning."""
    if data_args.lazy_preprocess:
        dataset_cls = LazySupervisedDataset
    else:
        dataset_cls = SupervisedDataset
    return dict(
        train_dataset=dataset_cls(tokenizer=tokenizer,
                                  data_path=data_args.data_path),
        eval_dataset=None,
        data_collator=DataCollatorForSupervisedDataset(tokenizer=tokenizer),
    )
def train():
    """Fine-tune a LLaMA causal LM on supervised conversation data."""
    parser = transformers.HfArgumentParser(
        (ModelArguments, DataArguments, TrainingArguments))
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    model = transformers.LlamaForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
    )
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=False,
    )

    # Models without a pad token (e.g. LLaMA) get one added, with its
    # embedding row initialized to the mean of the existing rows.
    if tokenizer.pad_token is None:
        smart_tokenizer_and_embedding_resize(
            special_tokens_dict=dict(pad_token=DEFAULT_PAD_TOKEN),
            tokenizer=tokenizer,
            model=model,
        )
    if "llama" in model_args.model_name_or_path:
        tokenizer.add_special_tokens({
            "eos_token": DEFAULT_EOS_TOKEN,
            "bos_token": DEFAULT_BOS_TOKEN,
            "unk_token": DEFAULT_UNK_TOKEN,
        })

    data_module = make_supervised_data_module(tokenizer=tokenizer,
                                              data_args=data_args)
    trainer = Trainer(model=model, tokenizer=tokenizer,
                      args=training_args, **data_module)

    # Resume if a previous run left checkpoints in the output dir.
    if list(pathlib.Path(training_args.output_dir).glob("checkpoint-*")):
        trainer.train(resume_from_checkpoint=True)
    else:
        trainer.train()
    trainer.save_state()
    safe_save_model_for_hf_trainer(trainer=trainer,
                                   output_dir=training_args.output_dir)


if __name__ == "__main__":
    train()
# Usage: deepspeed train_lora.py --deepspeed <$PATH_TO_DEEPSPEED_CONFIG>
# Adopted from tatsu-lab@stanford_alpaca. Below is the original copyright:
# Copyright 2023 Rohan Taori, Ishaan Gulrajani, Tianyi Zhang, Yann Dubois, Xuechen Li
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
import pathlib
import typing
from peft import (
LoraConfig,
get_peft_model,
)
import transformers
from transformers import Trainer
from fastchat.train.train import (DataArguments, ModelArguments,
TrainingArguments,
make_supervised_data_module,
smart_tokenizer_and_embedding_resize)
IGNORE_INDEX = -100  # label value ignored by PyTorch CrossEntropyLoss
DEFAULT_PAD_TOKEN = "[PAD]"
DEFAULT_EOS_TOKEN = "</s>"
# Fix: BOS/UNK were wrongly set to the EOS string "</s>".  The LLaMA
# (SentencePiece) tokenizer uses "<s>" for BOS and "<unk>" for UNK; the
# same correction was applied upstream in stanford_alpaca.
DEFAULT_BOS_TOKEN = "<s>"
DEFAULT_UNK_TOKEN = "<unk>"
@dataclass
class LoraArguments:
    """LoRA hyperparameters.

    Fix: the original fields ended with trailing commas, so every default
    was a one-element tuple (e.g. ``lora_r == (8,)``) instead of the value
    itself.  The mutable list default now uses ``field(default_factory=...)``
    as dataclasses require, which also makes ``lora_target_modules`` a real
    list (resolving the old TODO about list support).
    """
    lora_r: int = 8
    lora_alpha: int = 16
    lora_dropout: float = 0.05
    lora_target_modules: typing.List[str] = field(
        default_factory=lambda: ["q_proj", "v_proj"])
    lora_weight_path: str = ""
def train():
    """LoRA fine-tuning entry point: wrap the base model with adapters and train."""
    parser = transformers.HfArgumentParser(
        (ModelArguments, DataArguments, TrainingArguments, LoraArguments))
    (model_args, data_args, training_args,
     lora_args) = parser.parse_args_into_dataclasses()

    model = transformers.LlamaForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
    )

    # Wrap the base model with LoRA adapters; only adapter weights train.
    lora_config = LoraConfig(
        r=lora_args.lora_r,
        lora_alpha=lora_args.lora_alpha,
        target_modules=["q_proj", "v_proj"],
        lora_dropout=lora_args.lora_dropout,
        bias="none",
        task_type="CAUSAL_LM",
    )
    model = get_peft_model(model, lora_config)
    model.print_trainable_parameters()

    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=False,
    )
    if tokenizer.pad_token is None:
        smart_tokenizer_and_embedding_resize(
            special_tokens_dict=dict(pad_token=DEFAULT_PAD_TOKEN),
            tokenizer=tokenizer,
            model=model,
        )
    if "llama" in model_args.model_name_or_path:
        tokenizer.add_special_tokens({
            "eos_token": DEFAULT_EOS_TOKEN,
            "bos_token": DEFAULT_BOS_TOKEN,
            "unk_token": DEFAULT_UNK_TOKEN,
        })

    data_module = make_supervised_data_module(tokenizer=tokenizer,
                                              data_args=data_args)
    trainer = Trainer(model=model, tokenizer=tokenizer,
                      args=training_args, **data_module)

    model.config.use_cache = False
    # Resume if a previous run left checkpoints in the output dir.
    if list(pathlib.Path(training_args.output_dir).glob("checkpoint-*")):
        trainer.train(resume_from_checkpoint=True)
    else:
        trainer.train()
    trainer.save_state()
    # Saves the PEFT-wrapped model (adapter weights) to the output dir.
    model.save_pretrained(training_args.output_dir)


if __name__ == "__main__":
    train()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment