# Copyright (c) Alibaba, Inc. and its affiliates.
import ast
import itertools
import os
import re
from copy import deepcopy
from functools import partial
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union
import json
import numpy as np
import pandas as pd
from datasets import Dataset as HfDataset
from datasets import concatenate_datasets
from datasets import load_dataset as load_hf_dataset
from numpy.random import RandomState
from pandas import DataFrame
from tqdm.auto import tqdm
from transformers.utils import strtobool
from swift.utils import get_logger, get_seed, is_dist, is_local_master, read_from_jsonl, transform_jsonl_to_df
from .preprocess import (AlpacaPreprocessor, ClsPreprocessor, ComposePreprocessor, ConversationsPreprocessor,
PreprocessFunc, RenameColumnsPreprocessor, SmartPreprocessor, TextGenerationPreprocessor,
preprocess_sharegpt)
from .template import History
from .utils import download_dataset
def _remove_useless_columns(dataset: HfDataset) -> HfDataset:
k_list = []
for k in dataset.features.keys():
if k in {'query', 'response', 'rejected_response', 'system', 'history', 'images'}:
k_list.append(k)
dataset = dataset.select_columns(k_list)
return dataset
GetDatasetFunction = Callable[[], Union[HfDataset, Tuple[HfDataset, Optional[HfDataset]]]]
SubsetSplit = Union[str, Tuple[str, str], List[str]]
DATASET_MAPPING: Dict[str, Dict[str, Any]] = {}
logger = get_logger()
class DatasetName:
# general
ms_bench = 'ms-bench' # used for mixed training
alpaca_en = 'alpaca-en'
alpaca_zh = 'alpaca-zh'
multi_alpaca = 'multi-alpaca'
instinwild = 'instinwild'
cot_en = 'cot-en'
cot_zh = 'cot-zh'
instruct_en = 'instruct-en'
firefly_zh = 'firefly-zh'
gpt4all_en = 'gpt4all-en'
sharegpt = 'sharegpt'
tulu_v2_sft_mixture = 'tulu-v2-sft-mixture'
wikipedia_zh = 'wikipedia-zh'
open_orca = 'open-orca'
sharegpt_gpt4 = 'sharegpt-gpt4'
deepctrl_sft = 'deepctrl-sft'
coig_cqia = 'coig-cqia'
ruozhiba = 'ruozhiba'
long_alpaca_12k = 'long-alpaca-12k'
# agent
ms_agent = 'ms-agent'
ms_agent_for_agentfabric = 'ms-agent-for-agentfabric'
ms_agent_multirole = 'ms-agent-multirole'
toolbench_for_alpha_umi = 'toolbench-for-alpha-umi'
damo_agent_zh = 'damo-agent-zh'
damo_agent_zh_mini = 'damo-agent-zh-mini'
agent_instruct_all_en = 'agent-instruct-all-en'
# coding
code_alpaca_en = 'code-alpaca-en'
leetcode_python_en = 'leetcode-python-en'
codefuse_python_en = 'codefuse-python-en'
codefuse_evol_instruction_zh = 'codefuse-evol-instruction-zh'
# medical
medical_en = 'medical-en'
medical_zh = 'medical-zh'
disc_med_sft_zh = 'disc-med-sft-zh'
# law
lawyer_llama_zh = 'lawyer-llama-zh'
tigerbot_law_zh = 'tigerbot-law-zh'
disc_law_sft_zh = 'disc-law-sft-zh'
# math
blossom_math_zh = 'blossom-math-zh'
school_math_zh = 'school-math-zh'
open_platypus_en = 'open-platypus-en'
# sql
text2sql_en = 'text2sql-en'
sql_create_context_en = 'sql-create-context-en'
# text-generation
advertise_gen_zh = 'advertise-gen-zh'
dureader_robust_zh = 'dureader-robust-zh'
# classification
cmnli_zh = 'cmnli-zh'
jd_sentiment_zh = 'jd-sentiment-zh'
hc3_zh = 'hc3-zh'
hc3_en = 'hc3-en'
# other
finance_en = 'finance-en'
poetry_zh = 'poetry-zh'
webnovel_zh = 'webnovel-zh'
generated_chat_zh = 'generated-chat-zh'
self_cognition = 'self-cognition'
# example dataset for specific model
cls_fudan_news_zh = 'cls-fudan-news-zh' # seqgpt-560m
ner_java_zh = 'ner-jave-zh' # seqgpt-560m
# multi-modal
coco_en = 'coco-en'
coco_en_mini = 'coco-en-mini'
# images
coco_en_2 = 'coco-en-2'
coco_en_2_mini = 'coco-en-2-mini'
capcha_images = 'capcha-images'
# for qwen-audio
aishell1_zh = 'aishell1-zh'
aishell1_zh_mini = 'aishell1-zh-mini'
# dpo/hfrl dataset
hh_rlhf = 'hh-rlhf'
hh_rlhf_cn = 'hh-rlhf-cn'
stack_exchange_paired = 'stack-exchange-paired'
shareai_llama3_dpo_zh_en_emoji = 'shareai-llama3-dpo-zh-en-emoji'
# for awq
pileval = 'pileval'
@classmethod
def get_dataset_name_list(cls) -> List[str]:
res = []
for k in cls.__dict__.keys():
if k.startswith('__') or k == 'get_dataset_name_list':
continue
res.append(cls.__dict__[k])
return res
def register_dataset(dataset_name: str,
dataset_id_or_path: Optional[str] = None,
subsets: Optional[List[str]] = None,
preprocess_func: Optional[PreprocessFunc] = None,
get_function: Optional[GetDatasetFunction] = None,
*,
split: Optional[List[str]] = None,
hf_dataset_id: Optional[str] = None,
function_kwargs: Optional[Dict[str, Any]] = None,
exist_ok: bool = False,
is_local: bool = False,
**kwargs) -> Optional[Callable[[GetDatasetFunction], GetDatasetFunction]]:
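    """Register a dataset into `DATASET_MAPPING`.

    If `get_function` is given, the dataset is registered immediately and None is returned;
    otherwise a decorator is returned that registers the decorated function as `get_function`.
    Any `function_kwargs` are bound to `get_function` via `functools.partial`.
    """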
if preprocess_func is None:
preprocess_func = SmartPreprocessor()
if not exist_ok and dataset_name in DATASET_MAPPING:
        raise ValueError(f'Dataset `{dataset_name}` has already been registered in DATASET_MAPPING.')
if subsets is None:
subsets = []
if split is None:
split = ['train']
if function_kwargs is None:
function_kwargs = {}
dataset_info = {
'dataset_id_or_path': dataset_id_or_path,
'subsets': subsets,
'preprocess_func': preprocess_func,
'split': split,
'hf_dataset_id': hf_dataset_id,
'is_local': is_local,
**kwargs
}
if get_function is not None:
if len(function_kwargs) > 0:
get_function = partial(get_function, **function_kwargs)
dataset_info['get_function'] = get_function
DATASET_MAPPING[dataset_name] = dataset_info
return
def _register_dataset(get_function: GetDatasetFunction) -> GetDatasetFunction:
_old_get_function = get_function
if len(function_kwargs) > 0:
get_function = partial(get_function, **function_kwargs)
dataset_info['get_function'] = get_function
DATASET_MAPPING[dataset_name] = dataset_info
return _old_get_function
return _register_dataset
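# Illustrative usage of `register_dataset` (the dataset names/ids below are hypothetical):
#
#   register_dataset('my-alpaca-zh', 'my-org/my-alpaca-zh', None, AlpacaPreprocessor(),
#                    get_dataset_from_repo, tags=['chat'])
#
#   @register_dataset('my-custom-ds', 'my-org/my-custom-ds', function_kwargs={'kind_list': [...]})
#   def get_my_custom_dataset(dataset_id, subsets, preprocess_func, split, *args, **kwargs):
#       ...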
def register_local_dataset(
dataset_name: str,
dataset_path: Optional[List[str]] = None,
# Convert relative path to absolute path
base_dir: Optional[str] = None,
**kwargs) -> None:
if dataset_path is None:
dataset_path = []
elif isinstance(dataset_path, str):
dataset_path = [dataset_path]
assert len(dataset_path) > 0
if base_dir is not None:
for i, path in enumerate(dataset_path):
if not os.path.isabs(path):
dataset_path[i] = os.path.join(base_dir, dataset_path[i])
register_dataset(
dataset_name, get_function=get_local_dataset, split=dataset_path, exist_ok=True, is_local=True, **kwargs)
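# Example (hypothetical paths): register local files; they are loaded by `get_local_dataset`
# through the `split` field and may be .csv, .jsonl or .json.
#
#   register_local_dataset('my-local-ds', ['train.jsonl'], base_dir='/path/to/data')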
def register_dataset_info(dataset_name: str, d_info: Dict[str, Any], **kwargs) -> None:
preprocess_func = None
if 'columns' in d_info:
preprocess_func = RenameColumnsPreprocessor(d_info['columns'])
d_info.pop('columns')
d_info['preprocess_func'] = preprocess_func
elif 'conversations' in d_info:
preprocess_func = ConversationsPreprocessor(**d_info['conversations'])
d_info.pop('conversations')
d_info['preprocess_func'] = preprocess_func
if 'dataset_path' in d_info:
base_dir = kwargs.pop('base_dir', None)
register_local_dataset(dataset_name, d_info.pop('dataset_path', None), base_dir, **d_info)
return
assert 'dataset_id' in d_info or 'hf_dataset_id' in d_info
dataset_id = d_info.pop('dataset_id', None)
subsets = d_info.pop('subsets', None)
preprocess_func = d_info.pop('preprocess_func', None)
register_dataset(dataset_name, dataset_id, subsets, preprocess_func, get_dataset_from_repo, **d_info, exist_ok=True)
def load_ms_dataset(dataset_id: str,
subset_split_list: Optional[List[SubsetSplit]],
use_hf: bool = False) -> Optional[HfDataset]:
if not use_hf:
from modelscope import MsDataset
if subset_split_list is None or len(subset_split_list) == 0:
return None
dataset_list = []
for subset_split in subset_split_list:
if isinstance(subset_split, str):
subset_split = ('default', subset_split)
assert len(subset_split) == 2
subset_name, split = subset_split
if use_hf:
dataset = load_hf_dataset(dataset_id, name=subset_name, split=split)
else:
if is_dist() and not is_local_master():
force_redownload = False
else:
force_redownload = strtobool(os.environ.get('FORCE_REDOWNLOAD', 'False'))
download_mode = 'force_redownload' if force_redownload else 'reuse_dataset_if_exists'
dataset = MsDataset.load(dataset_id, subset_name=subset_name, split=split, download_mode=download_mode)
if hasattr(dataset, 'to_hf_dataset'):
dataset = dataset.to_hf_dataset()
dataset_list.append(dataset)
return concatenate_datasets(dataset_list)
def sample_dataset(dataset: HfDataset, dataset_sample: int, random_state: Optional[RandomState] = None) -> HfDataset:
if dataset_sample in {None, -1, len(dataset)}:
return dataset
if random_state is None:
random_state = RandomState()
    # Take a random permutation first; if dataset_sample exceeds the dataset length,
    # the remainder is sampled with replacement below.
idx = random_state.permutation(len(dataset))[:dataset_sample]
dataset_sample -= len(idx)
if dataset_sample > 0:
idx2 = random_state.choice(len(dataset), dataset_sample)
idx = np.concatenate([idx, idx2], axis=0)
dataset = dataset.select(idx)
return dataset
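# e.g. len(dataset) == 100 and dataset_sample == 250: the permutation contributes 100 unique
# indices and the remaining 150 indices are drawn with replacement via `random_state.choice`.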
def _post_preprocess(
train_dataset: HfDataset,
dataset_sample: int,
random_state: Optional[RandomState] = None,
preprocess_func: Optional[PreprocessFunc] = None,
dataset_test_ratio: float = 0.,
remove_useless_columns: bool = True,
) -> Tuple[HfDataset, Optional[HfDataset]]:
assert train_dataset is not None
if dataset_sample == -1:
dataset_sample = len(train_dataset)
assert 0 <= dataset_test_ratio <= 1
if dataset_test_ratio == 1:
train_dataset, val_dataset = None, train_dataset
val_sample = dataset_sample
assert val_sample <= len(val_dataset), f'dataset_sample: {dataset_sample}, len(val_dataset): {len(val_dataset)}'
val_dataset = sample_dataset(val_dataset, val_sample, random_state)
else:
if dataset_test_ratio == 0:
train_sample = dataset_sample
val_dataset = None
else:
            # Base val_sample on the effective dataset length so an oversized dataset_sample
            # does not inflate it.
_train_len = min(len(train_dataset), dataset_sample)
val_sample = max(int(_train_len * dataset_test_ratio), 1)
train_sample = dataset_sample - val_sample
assert isinstance(val_sample, int)
train_dataset, val_dataset = train_dataset.train_test_split(
test_size=val_sample, seed=get_seed(random_state)).values()
assert train_sample > 0
train_dataset = sample_dataset(train_dataset, train_sample, random_state)
res = []
for dataset in [train_dataset, val_dataset]:
if dataset is not None and preprocess_func is not None:
dataset = preprocess_func(dataset)
if dataset is not None and len(dataset) > 0 and remove_useless_columns:
dataset = _remove_useless_columns(dataset)
res.append(dataset)
return tuple(res)
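# e.g. dataset_sample == 1000, dataset_test_ratio == 0.01 and len(train_dataset) >= 1000:
# val_sample == max(int(1000 * 0.01), 1) == 10 and train_sample == 990.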
def get_dataset_from_repo(dataset_id: str,
subsets: Optional[List[str]],
preprocess_func: PreprocessFunc,
split: List[str],
dataset_sample: int = -1,
*,
random_state: Optional[RandomState] = None,
dataset_test_ratio: float = 0.,
remove_useless_columns: bool = True,
use_hf: bool = False) -> Tuple[HfDataset, Optional[HfDataset]]:
if subsets is None:
subsets = []
assert len(split) > 0
if len(subsets) == 0:
subset_split_list = split
else:
subset_split_list = list(itertools.product(subsets, split))
dataset = load_ms_dataset(dataset_id, subset_split_list, use_hf)
return _post_preprocess(dataset, dataset_sample, random_state, preprocess_func, dataset_test_ratio,
remove_useless_columns)
def _concat_inst_inp_alpaca_zh(inst: str, inp: str) -> str:
if inp.startswith('输入:'):
inp = inp[3:]
return f'{inst}\n{inp}'
register_dataset(
DatasetName.alpaca_zh,
'AI-ModelScope/alpaca-gpt4-data-zh',
None,
AlpacaPreprocessor(concat_inst_inp=_concat_inst_inp_alpaca_zh),
get_dataset_from_repo,
tags=['chat', 'general', '🔥'],
hf_dataset_id='llm-wizard/alpaca-gpt4-data-zh')
def _preprocess_vision_dataset(dataset: HfDataset) -> HfDataset:
prompt = 'please describe the image.'
image_key = 'image'
response_key = 'caption'
dataset._info.features._column_requires_decoding['image'] = False
    query_format = f'Picture 1:<img>{{image_path}}</img>\n{prompt}'
query = []
response = []
for d in tqdm(dataset):
query.append(query_format.format(image_path=d[image_key]['path']))
if '&&' in d[response_key]:
d[response_key] = d[response_key].split('&&')[0]
response.append(d[response_key])
dataset = HfDataset.from_dict({'query': query, 'response': response})
return dataset
register_dataset(
DatasetName.coco_en,
'modelscope/coco_2014_caption', ['coco_2014_caption'],
_preprocess_vision_dataset,
get_dataset_from_repo,
split=['train', 'validation'],
tags=['chat', 'multi-modal', 'vision'],
is_main=False)
register_dataset(
DatasetName.coco_en_mini,
'modelscope/coco_2014_caption', ['coco_2014_caption'],
_preprocess_vision_dataset,
get_dataset_from_repo,
split=['validation'],
tags=['chat', 'multi-modal', 'vision', '🔥'],
is_main=False)
def _preprocess_vision_dataset2(dataset: HfDataset) -> HfDataset:
query = 'please describe the image.'
image_key = 'image'
response_key = 'caption'
dataset._info.features._column_requires_decoding['image'] = False
response = []
images = []
for d in tqdm(dataset):
images.append([d[image_key]['path']])
if '&&' in d[response_key]:
d[response_key] = d[response_key].split('&&')[0]
response.append(d[response_key])
return HfDataset.from_dict({'query': [query] * len(response), 'response': response, 'images': images})
register_dataset(
DatasetName.coco_en_2,
'modelscope/coco_2014_caption', ['coco_2014_caption'],
_preprocess_vision_dataset2,
get_dataset_from_repo,
split=['train', 'validation'],
tags=['chat', 'multi-modal', 'vision'],
is_main=False)
register_dataset(
DatasetName.coco_en_2_mini,
'modelscope/coco_2014_caption', ['coco_2014_caption'],
_preprocess_vision_dataset2,
get_dataset_from_repo,
split=['validation'],
tags=['chat', 'multi-modal', 'vision', '🔥'],
is_main=False)
def _preprocess_aishell1_dataset(dataset: HfDataset) -> HfDataset:
prompt = '语音转文本'
audio_key = 'Audio:FILE'
response_key = 'Text:LABEL'
    query_format = f'Audio 1:<audio>{{audio_path}}</audio>\n{prompt}'
query = []
response = []
for d in tqdm(dataset):
query.append(query_format.format(audio_path=d[audio_key]))
response.append(d[response_key].replace(' ', ''))
dataset = HfDataset.from_dict({'query': query, 'response': response})
return dataset
register_dataset(
DatasetName.aishell1_zh,
'speech_asr/speech_asr_aishell1_trainsets',
None,
_preprocess_aishell1_dataset,
get_dataset_from_repo,
split=['train', 'validation', 'test'],
tags=['chat', 'multi-modal', 'audio'])
register_dataset(
DatasetName.aishell1_zh_mini,
'speech_asr/speech_asr_aishell1_trainsets',
None,
_preprocess_aishell1_dataset,
get_dataset_from_repo,
split=['validation', 'test'],
tags=['chat', 'multi-modal', 'audio', '🔥'],
is_main=False)
def _repair_agent_conversations(conversations: str, use_mini: bool) -> Optional[List[Dict[str, str]]]:
if use_mini:
pattern = r'\d\. {"plugin_name": "(.+?)"'
else:
pattern = r'\d\. {"(?:plugin_)?name": "(.+?)"'
idx = conversations.find(r"'from': 'user")
if idx == -1:
return
# remove dirty data
find_list = re.findall(pattern, conversations[:idx])
if len(set(find_list)) <= 1:
return
if isinstance(conversations, str):
conversations = ast.literal_eval(conversations)
if len(conversations) == 1:
return
return conversations
def _repair_ms_bench(conversations: str) -> Optional[List[Dict[str, str]]]:
if isinstance(conversations, str):
conversations = ast.literal_eval(conversations)
default_system = 'You are a helpful assistant.'
if conversations[0]['from'] == 'system' and conversations[0]['value'] == default_system:
conversations.pop(0)
# skip MOSS
for c in conversations:
value = c['value'].lower()
if 'moss' in value or 'human:' in value or 'assistant:' in value or 'user:' in value:
return
return conversations
def long_alpaca_preprocessor(dataset: HfDataset):
    def map_row(row):
        response = row['response']
        if response and response.startswith('Answer:'):
            response = response[len('Answer:') + 1:].strip()
        # `Dataset.map` expects a mapping, so return the updated column as a dict.
        return {'response': response}
dataset = AlpacaPreprocessor()(dataset)
return dataset.map(map_row)
register_dataset(
DatasetName.long_alpaca_12k,
'AI-ModelScope/LongAlpaca-12k',
None,
long_alpaca_preprocessor,
get_dataset_from_repo,
tags=['longlora', 'QA'],
hf_dataset_id='Yukang/LongAlpaca-12k')
def _preprocess_ruozhiba(dataset: HfDataset):
def map_row(row):
title = row['title'] if row.get('title', None) is not None else row['content']
abs = row['abs'] if 'abs' in row else None
if abs and abs != title:
title = title + ',' + abs
pattern = r'\d+[\.,\s,\、](.+)'
match = re.search(pattern, title)
if match:
title = match.group(1)
return {'response': title}
return dataset.map(map_row).filter(lambda row: row['response'])
register_dataset(
DatasetName.ruozhiba,
'AI-ModelScope/ruozhiba', ['post-annual', 'title-good', 'title-norm'],
_preprocess_ruozhiba,
get_dataset_from_repo,
tags=['pretrain', '🔥'])
register_dataset(
DatasetName.ms_bench,
'iic/ms_bench',
None,
ConversationsPreprocessor(repair_conversations=_repair_ms_bench, error_strategy='delete'),
get_dataset_from_repo,
tags=['chat', 'general', 'multi-round', '🔥'])
register_dataset(
DatasetName.damo_agent_zh_mini,
'damo/MSAgent-Bench',
None,
ConversationsPreprocessor(
repair_conversations=partial(_repair_agent_conversations, use_mini=True), error_strategy='delete'),
get_dataset_from_repo,
split=['train', 'validation'],
tags=['chat', 'agent', 'multi-round'],
is_main=False)
register_dataset(
DatasetName.damo_agent_zh,
'damo/MSAgent-Bench',
None,
ConversationsPreprocessor(
repair_conversations=partial(_repair_agent_conversations, use_mini=False), error_strategy='delete'),
get_dataset_from_repo,
split=['train', 'validation'],
tags=['chat', 'agent', 'multi-round'])
advertise_gen_prompt = """Task: Generating advertisements based on keywords.
Keywords: {query}
Advertisements:"""
register_dataset(
DatasetName.advertise_gen_zh,
'lvjianjin/AdvertiseGen',
None,
TextGenerationPreprocessor(advertise_gen_prompt, 'content', 'summary'),
get_dataset_from_repo,
split=['train', 'validation'],
tags=['text-generation', '🔥'],
hf_dataset_id='shibing624/AdvertiseGen')
_firefly_kind_list = [
'ProseGeneration', 'MRC', 'JinYongGeneration', 'TextCorrection', 'ClassicalChinese', 'BELLE', 'StoryGeneration',
'Couplet', 'Cot', 'Dictionary', 'Translation', 'Program', 'SentimentAnalyze', 'OpenQA', 'AncientPoem',
'TextMatching', 'NLI', 'Summary', 'KeywordRecognition', 'ProductDesc', 'LyricGeneration', 'Composition',
'MusicComment', 'NER'
]
def _preprocess_firefly(dataset: List[Dict[str, str]], kind_list: List[str]) -> HfDataset:
kind_set = set(kind_list)
query: List[str] = []
response: List[str] = []
for d in tqdm(dataset):
if d['kind'] not in kind_set:
continue
query.append(d['input'])
response.append(d['target'])
return HfDataset.from_dict({
'query': query,
'response': response,
})
@register_dataset(
DatasetName.firefly_zh,
'wyj123456/firefly',
None,
_preprocess_firefly,
tags=['chat', 'general'],
function_kwargs={'kind_list': _firefly_kind_list})
def get_firefly_zh_dataset(dataset_id: str, _, preprocess_func: PreprocessFunc, *args, **kwargs) -> HfDataset:
kind_list = kwargs['kind_list']
file = 'firefly-train-1.1M.jsonl'
dataset_dir = download_dataset(dataset_id, [file])
fpath = os.path.join(dataset_dir, file)
with open(fpath, 'r', encoding='utf-8') as f:
text = f.read()
text = text.replace('}{', '},{')
text = f'[{text}]'
dataset = json.loads(text)
return preprocess_func(dataset, kind_list)
register_dataset(
DatasetName.cmnli_zh,
'modelscope/clue', ['cmnli'],
ClsPreprocessor(['neutral', 'entailment', 'contradiction'], 'Natural Language Inference', True),
get_dataset_from_repo,
split=['train', 'validation'],
tags=['text-generation', 'classification'],
hf_dataset_id='clue')
register_dataset(
DatasetName.jd_sentiment_zh,
'DAMO_NLP/jd',
None,
ClsPreprocessor(['negative', 'positive'], 'Sentiment Classification', False),
get_dataset_from_repo,
split=['train', 'validation'],
tags=['text-generation', 'classification', '🔥'])
def _preprocess_dureader_robust(dataset: HfDataset) -> HfDataset:
prompt = """Task: Question Generation
Context: {context}
Answer: {answer}
Question:"""
query = []
response = []
for d in dataset:
answer, context = d['text1'].split('[SEP]')
q = prompt.format(context=context, answer=answer)
query.append(q)
response.append(d['text2'])
return HfDataset.from_dict({'query': query, 'response': response})
register_dataset(
DatasetName.dureader_robust_zh,
'modelscope/DuReader_robust-QG',
None,
_preprocess_dureader_robust,
get_dataset_from_repo,
split=['train', 'validation', 'test'],
tags=['text-generation', '🔥'])
def process_hh_rlhf(dataset):
def reorganize_row(row):
import re
chosen = row['chosen'].strip()
rejected = row['rejected'].strip()
parts_chosen = [s.strip() for s in re.split('\n\nHuman:|\n\nAssistant:|\n\nHum:', chosen)]
parts_rejected = [s.strip() for s in re.split('\n\nHuman:|\n\nAssistant:|\n\nHum:', rejected)]
if parts_chosen[0].startswith('Human:'):
assert parts_rejected[0].startswith('Human:')
parts_chosen[0] = parts_chosen[0][6:].strip()
parts_rejected[0] = parts_rejected[0][6:].strip()
history = []
idx, s1, s2 = None, None, None
for idx, (s1, s2) in enumerate(zip(parts_chosen, parts_rejected)):
if s1 == s2:
if idx % 2 == 0:
history.append([s1, None])
else:
history[-1][-1] = s1
else:
break
if idx % 2 == 0:
return {
'query': None,
'response': None,
'rejected_response': None,
'history': None,
}
query = history[-1][0]
history = history[:-1]
response = s1
rejected_response = s2
return {
'query': query,
'response': response,
'rejected_response': rejected_response,
'history': history,
}
return dataset.map(reorganize_row).filter(lambda row: row['query'] is not None)
register_dataset(
DatasetName.hh_rlhf,
'AI-ModelScope/hh-rlhf', ['harmless-base', 'helpful-base', 'helpful-online', 'helpful-rejection-sampled'],
process_hh_rlhf,
get_dataset_from_repo,
split=['train', 'test'],
tags=['rlhf', 'dpo', 'pairwise'])
def process_hh_rlhf_cn(dataset):
def reorganize_row(row):
history = []
try:
if isinstance(row['context'], str):
row['context'] = ast.literal_eval(row['context'])
if isinstance(row['chosen'], str):
row['chosen'] = ast.literal_eval(row['chosen'])
if isinstance(row['rejected'], str):
row['rejected'] = ast.literal_eval(row['rejected'])
for idx, h in enumerate(row['context']):
if idx % 2 == 0 and h['role'] != 'human':
raise ValueError()
if idx % 2 != 0 and h['role'] != 'assistant':
raise ValueError()
if idx % 2 == 0:
history.append([h['text'], None])
else:
history[-1][-1] = h['text']
if history[-1][-1] is not None:
raise ValueError()
query = history[-1][0]
history = history[:-1]
response = row['chosen']['text']
rejected_response = row['rejected']['text']
except: # noqa
return {
'query': '',
'response': '',
'rejected_response': '',
'history': [],
}
return {
'query': query,
'response': response,
'rejected_response': rejected_response,
'history': history,
}
def row_can_be_parsed(row):
try:
if isinstance(row['context'], str):
row['context'] = ast.literal_eval(row['context'])
if isinstance(row['chosen'], str):
row['chosen'] = ast.literal_eval(row['chosen'])
if isinstance(row['rejected'], str):
row['rejected'] = ast.literal_eval(row['rejected'])
return True
except: # noqa
return False
return dataset.filter(row_can_be_parsed).map(reorganize_row).filter(lambda row: row['query'])
register_dataset(
DatasetName.hh_rlhf_cn,
'AI-ModelScope/hh_rlhf_cn',
['hh_rlhf', 'harmless_base_cn', 'harmless_base_en', 'helpful_base_cn', 'helpful_base_en'],
process_hh_rlhf_cn,
get_dataset_from_repo,
split=['train', 'test'],
tags=['rlhf', 'dpo', 'pairwise', '🔥'])
def process_shareai_dpo(dataset):
def reorganize_row(row):
return {
'query': row['question'],
'response': row['answer_zh'],
'rejected_response': row['answer_en'],
}
return dataset.map(reorganize_row)
register_dataset(
DatasetName.shareai_llama3_dpo_zh_en_emoji,
'hjh0119/shareAI-Llama3-DPO-zh-en-emoji', ['default'],
process_shareai_dpo,
get_dataset_from_repo,
tags=['rlhf', 'dpo', 'pairwise'])
register_dataset(
DatasetName.sharegpt,
'huangjintao/sharegpt', ['common-zh', 'computer-zh', 'unknow-zh', 'common-en', 'computer-en'],
preprocess_sharegpt,
get_dataset_from_repo,
tags=['chat', 'general', 'multi-round'])
def _preprocess_capcha_images(dataset: HfDataset) -> HfDataset:
query = 'recognize the content.'
image_key = 'image'
response_key = 'solution'
response = []
images = []
for d in tqdm(dataset):
images.append(d[image_key])
response.append(d[response_key])
dataset = HfDataset.from_dict({'query': [query] * len(response), 'response': response, 'images': images})
dataset._info.features._column_requires_decoding['images'] = True
return dataset
register_dataset(
DatasetName.capcha_images,
'AI-ModelScope/captcha-images',
None,
_preprocess_capcha_images,
get_dataset_from_repo,
split=['train', 'validation'],
tags=['chat', 'multi-modal', 'vision'])
def _repair_toolbench(conversations: List[Dict[str, str]]) -> List[Dict[str, str]]:
assert len(conversations) == 2
if (conversations[1]['from'] in {'caller', 'conclusion'}):
conversations[1]['from'] = 'assistant'
return conversations
register_dataset(
DatasetName.toolbench_for_alpha_umi,
'shenweizhou/alpha-umi-toolbench-processed-v2', ['backbone', 'caller', 'planner', 'summarizer'],
ConversationsPreprocessor('system', system_role='-', repair_conversations=_repair_toolbench),
get_dataset_from_repo,
tags=['chat', 'agent', '🔥'])
def _preprocess_blossom_math(dataset: HfDataset) -> HfDataset:
response = []
for d in tqdm(dataset):
output, answer = d['output'], d['answer']
response.append(f'{output}\n\nAnswer: {answer}')
return HfDataset.from_dict({'query': dataset['input'], 'response': response})
register_dataset(
DatasetName.blossom_math_zh,
'AI-ModelScope/blossom-math-v2',
None,
_preprocess_blossom_math,
get_dataset_from_repo,
tags=['chat', 'math', '🔥'],
hf_dataset_id='Azure99/blossom-math-v2')
register_dataset(
DatasetName.sql_create_context_en,
'AI-ModelScope/sql-create-context',
None,
ComposePreprocessor([
RenameColumnsPreprocessor({
'question': 'instruction',
'context': 'input',
'answer': 'output'
}),
AlpacaPreprocessor(),
]),
get_dataset_from_repo,
tags=['chat', 'sql', '🔥'],
hf_dataset_id='b-mc2/sql-create-context')
def _preprocess_tigerbot_law(dataset: HfDataset) -> HfDataset:
prompt = """{type}
{title}
"""
response = []
for d in tqdm(dataset):
cur_prompt = prompt.format(type=d['type'], title=d['title'])
for i in range(1, 4):
chapter = d[f'chapter{i}']
if chapter is not None:
cur_prompt += f'{chapter}'
cur_prompt += f'{d["content"]}'
response.append(cur_prompt)
return HfDataset.from_dict({
'response': response,
})
register_dataset(
DatasetName.tigerbot_law_zh,
'AI-ModelScope/tigerbot-law-plugin',
None,
_preprocess_tigerbot_law,
get_dataset_from_repo,
tags=['text-generation', 'law', 'pretrained'],
hf_dataset_id='TigerResearch/tigerbot-law-plugin')
def _preprocess_leetcode_python(dataset: HfDataset) -> HfDataset:
query = []
response = []
for d in dataset:
code_with_problem = d['code_with_problem']
idx = code_with_problem.find('```python')
idx2 = code_with_problem.rfind('```python')
assert idx == idx2
problem = code_with_problem[:idx]
if problem.startswith('# '):
problem = problem[2:]
code = code_with_problem[idx:].strip()
explanation = d['explanation_only']
query.append(problem)
response.append(f'{code}\n\n{explanation}')
return HfDataset.from_dict({'query': query, 'response': response})
register_dataset(
DatasetName.leetcode_python_en,
'AI-ModelScope/leetcode-solutions-python',
None,
_preprocess_leetcode_python,
get_dataset_from_repo,
tags=['chat', 'coding', '🔥'])
def _repair_conversations_agent_instruct(s: str) -> List[Dict[str, Any]]:
s = s.replace('}\n {', '},\n {')
if isinstance(s, str):
s = ast.literal_eval(s)
return s
register_dataset(
DatasetName.agent_instruct_all_en,
'huangjintao/AgentInstruct_copy', ['alfworld', 'db', 'kg', 'mind2web', 'os', 'webshop'],
ConversationsPreprocessor('human', 'gpt', repair_conversations=_repair_conversations_agent_instruct),
get_dataset_from_repo,
tags=['chat', 'agent', 'multi-round'])
def _preprocess_msagent_multirole_dataset(dataset: HfDataset) -> HfDataset:
res_prompt = """\n\n【注意事项】\n1. 这是聊天室,不要发送私信给任何人\n2. 仅代表你个人说话,不要扮演其他人,
只根据对话历史进行回复\n3. 长话短说,不要说太多话,不要超过50字 """
history_prompt = '\n\n【chat history】'
conv_prompt = '\n {name}:{content}'
system, query, response = [], [], []
def process_conversation(conv):
query, response = '', conv[-1]['value']
system = conv[0]['value'] if conv[0]['from'] != 'user' else ''
if conv[0]['from'] == 'user':
query = conv[0]['value']
elif 'next_speakers:' not in system:
if '【注意事项】' not in system and system:
system += res_prompt
system += history_prompt
system += ''.join([conv_prompt.format(name=c['from'], content=c['value']) for c in conv[1:-1]])
return system, query, response
for d in dataset:
sys, qry, resp = process_conversation(d['conversations'])
system.append(sys)
query.append(qry)
response.append(resp)
return HfDataset.from_dict({'system': system, 'query': query, 'response': response})
register_dataset(
DatasetName.ms_agent_multirole,
'iic/MSAgent-MultiRole',
None,
_preprocess_msagent_multirole_dataset,
get_dataset_from_repo,
tags=['chat', 'agent', 'multi-round', 'role-play', 'multi-agent'])
def _preprocess_hc3(dataset: HfDataset) -> HfDataset:
prompt = """Classification Task: Are the following responses from a human or from ChatGPT?
Question: {question}
Answer: {answer}
Category: Human, ChatGPT
Output:"""
query = []
response = []
for d in dataset:
question = d['question']
for h in d['human_answers']:
query.append(prompt.format(question=question, answer=h))
response.append('Human')
for c in d['chatgpt_answers']:
query.append(prompt.format(question=question, answer=c))
response.append('ChatGPT')
return HfDataset.from_dict({'query': query, 'response': response})
register_dataset(
DatasetName.hc3_zh,
'simpleai/HC3-Chinese', ['baike', 'open_qa', 'nlpcc_dbqa', 'finance', 'medicine', 'law', 'psychology'],
_preprocess_hc3,
get_dataset_from_repo,
tags=['text-generation', 'classification', '🔥'],
hf_dataset_id='Hello-SimpleAI/HC3-Chinese')
register_dataset(
DatasetName.hc3_en,
'simpleai/HC3', ['finance', 'medicine'],
_preprocess_hc3,
get_dataset_from_repo,
tags=['text-generation', 'classification', '🔥'],
hf_dataset_id='Hello-SimpleAI/HC3')
NoneType = type(None)
def _check_dataset(dataset: Optional[HfDataset],
                   check_dataset_strategy: Literal['none', 'discard', 'error', 'warning']) -> Optional[HfDataset]:
if check_dataset_strategy == 'none' or dataset is None:
return dataset
idx_list = []
has_query = 'query' in dataset.features
has_history = 'history' in dataset.features
has_system = 'system' in dataset.features
is_modified = False
for i, d in enumerate(tqdm(dataset)):
if not isinstance(d['response'], str):
is_modified = True
if check_dataset_strategy == 'discard':
continue
elif check_dataset_strategy == 'warning':
logger.warning(f"d['response']: {d['response']}, i: {i}")
continue
else:
raise ValueError(f"d['response']: {d['response']}, i: {i}")
if has_query and not isinstance(d['query'], (str, NoneType)):
is_modified = True
if check_dataset_strategy == 'discard':
continue
elif check_dataset_strategy == 'warning':
logger.warning(f"d['query']: {d['query']}, i: {i}")
continue
else:
raise ValueError(f"d['query']: {d['query']}, i: {i}")
if has_history and not isinstance(d['history'], (list, NoneType)):
is_modified = True
if check_dataset_strategy == 'discard':
continue
elif check_dataset_strategy == 'warning':
logger.warning(f"d['history']: {d['history']}, i: {i}")
continue
else:
raise ValueError(f"d['history']: {d['history']}, i: {i}")
if has_system and not isinstance(d['system'], (str, NoneType)):
is_modified = True
if check_dataset_strategy == 'discard':
continue
elif check_dataset_strategy == 'warning':
logger.warning(f"d['system']: {d['system']}, i: {i}")
continue
else:
raise ValueError(f"d['system']: {d['system']}, i: {i}")
idx_list.append(i)
if is_modified:
dataset = dataset.select(idx_list)
assert len(dataset) > 0
return dataset
def _safe_split(s: str, sep: str, use_0: bool, split_mode: Literal['left', 'right'] = 'left') -> Tuple[str, str]:
    # use_0: when the split yields a single part, treat it as part 0 (True) or part 1 (False).
if s is None or len(s) == 0:
return None, None
if split_mode == 'left':
part = s.split(sep, 1)
else:
part = s.rsplit(sep, 1)
if len(part) == 1:
if use_0:
part = part[0], None
else:
part = None, part[0]
else:
assert len(part) == 2
return part
def parse_dataset_name(dataset_name: str) -> Tuple[bool, str, List[str], int]:
# HF::dataset_name:subset1/subset2/subset3#dataset_sample
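    # e.g. 'HF::alpaca-zh:subset1/subset2#2000' -> (1, 'alpaca-zh', ['subset1', 'subset2'], 2000)
    #      'alpaca-zh' -> (use_hf from the USE_HF env var, 'alpaca-zh', None, -1)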
use_hf, other = _safe_split(dataset_name, '::', False)
if use_hf is None:
use_hf = strtobool(os.environ.get('USE_HF', 'False'))
elif isinstance(use_hf, str):
use_hf = {'hf': 1, 'ms': 0}[use_hf.lower()]
if os.path.isfile(other):
part1, dataset_sample = other, None
else:
part1, dataset_sample = _safe_split(other, '#', True, 'right')
if os.path.isfile(part1):
dataset_name, subsets = part1, None
else:
dataset_name, subsets = _safe_split(part1, ':', True)
if subsets is not None:
subset_list = subsets.split('/')
subset_list = [subset.strip() for subset in subset_list]
else:
subset_list = None
if dataset_sample is None:
dataset_sample = -1
else:
dataset_sample = int(dataset_sample)
return tuple(t.strip() if isinstance(t, str) else t for t in [use_hf, dataset_name, subset_list, dataset_sample])
def _dataset_name_exists(dataset_list: List[str], dataset_name: str) -> List[int]:
dataset_name = parse_dataset_name(dataset_name)[1]
cache_name_list = [parse_dataset_name(dataset)[1] for dataset in dataset_list]
res = []
for i, cache_name in enumerate(cache_name_list):
if cache_name == dataset_name:
res.append(i)
return res
def _preprocess_self_cognition_dataset(
dataset_list: Tuple[HfDataset, Optional[HfDataset]],
model_name: Tuple[str, Optional[str]],
model_author: Tuple[str, Optional[str]],
) -> Tuple[HfDataset, HfDataset]:
# model_name: Tuple[zh, en]
assert model_name[0] is not None
assert model_author[0] is not None
if len(model_name) == 1 or model_name[1] is None:
model_name = (model_name[0], model_name[0])
if len(model_author) == 1 or model_author[1] is None:
model_author = (model_author[0], model_author[0])
res_d_list = []
for dataset in dataset_list: # train_dataset, val_dataset
if dataset is None:
res_d_list.append(dataset)
continue
response = []
for d in dataset:
if d['tag'] == 'zh':
model_n, model_a = model_name[0], model_author[0]
else:
model_n, model_a = model_name[1], model_author[1]
r = d['response'].replace('{{NAME}}', model_n).replace('{{AUTHOR}}', model_a)
response.append(r)
dataset = dataset.remove_columns('response').add_column('response', response).remove_columns('tag')
res_d_list.append(dataset)
return tuple(res_d_list)
def _dataset_id_to_name(dataset_name_list: List[str]) -> List[str]:
    # Convert dataset_id (ms/hf) or dataset_path entries into registered dataset_names,
    # registering unknown ids/paths on the fly.
ms_dataset_mapping = {}
hf_dataset_mapping = {}
for k_name, container in zip(['dataset_id_or_path', 'hf_dataset_id'], [ms_dataset_mapping, hf_dataset_mapping]):
for k, v in DATASET_MAPPING.items():
if v.get(k_name) is None or not v.get('is_main', True):
continue
if v[k_name] not in container:
container[v[k_name]] = []
container[v[k_name]].append(k)
res_dataset = []
dataset_list = []
# Add dataset_id or dataset_path to dataset_list, and add dataset_name to res_dataset.
for d in dataset_name_list:
use_hf, d_name = parse_dataset_name(d)[:2]
if d_name in DATASET_MAPPING:
res_dataset.append(d)
else:
dataset_list.append((d, use_hf, d_name))
extra_dataset = []
for d, use_hf, d_id_or_path in dataset_list:
dataset_mapping = hf_dataset_mapping if use_hf else ms_dataset_mapping
if d_id_or_path in dataset_mapping:
# Add the dataset_name corresponding to the dataset_id to res_dataset.
for d_name in dataset_mapping[d_id_or_path]:
res_dataset.append(d.replace(d_id_or_path, d_name))
else:
# This dataset needs to be registered.
extra_dataset.append((d, use_hf, d_id_or_path))
for i, (d, use_hf, d_id_or_path) in enumerate(extra_dataset):
d_info = {}
d_name = f'_{i}'
if os.path.isfile(d_id_or_path):
d_info['dataset_path'] = d_id_or_path
else:
if use_hf:
d_info['hf_dataset_id'] = d_id_or_path
else:
d_info['dataset_id'] = d_id_or_path
register_dataset_info(d_name, d_info)
res_dataset.append(d.replace(d_id_or_path, d_name))
return res_dataset
def get_dataset(
dataset_name_list: Union[List[str], str],
dataset_test_ratio: float = 0.,
dataset_seed: Union[int, RandomState] = 42,
check_dataset_strategy: Literal['none', 'discard', 'error', 'warning'] = 'none',
*,
# for self-cognition
model_name: Optional[Tuple[str, str]] = None,
model_author: Optional[Tuple[str, str]] = None) -> Tuple[HfDataset, Optional[HfDataset]]:
"""Returns train_dataset and val_dataset"""
if isinstance(dataset_name_list, str):
dataset_name_list = [dataset_name_list]
train_dataset_list: List[HfDataset] = []
val_dataset_list: List[HfDataset] = []
# dataset_id_or_path -> dataset_name
dataset_name_list = _dataset_id_to_name(dataset_name_list)
for dataset_name in dataset_name_list:
use_hf, dataset_name, subsets, dataset_sample = parse_dataset_name(dataset_name)
dataset_info = DATASET_MAPPING[dataset_name]
if subsets is None:
subsets = dataset_info['subsets']
if dataset_sample == -1:
dataset_sample = dataset_info.get('dataset_sample', -1)
if isinstance(dataset_seed, int):
random_state = RandomState(dataset_seed)
else:
random_state = dataset_seed
get_function: GetDatasetFunction = dataset_info['get_function']
is_local = dataset_info.get('is_local', False)
dataset_id_or_path = dataset_info['dataset_id_or_path']
remove_useless_columns = dataset_info.get('remove_useless_columns', True)
if not is_local:
dataset_str_f = 'Downloading the dataset from {hub}, dataset_id: {dataset_id}'
if use_hf:
dataset_id_or_path = dataset_info['hf_dataset_id']
dataset_str = dataset_str_f.format(hub='HuggingFace', dataset_id=dataset_id_or_path)
else:
dataset_str = dataset_str_f.format(hub='ModelScope', dataset_id=dataset_id_or_path)
logger.info(dataset_str)
assert dataset_id_or_path is not None, (f'dataset_name: {dataset_name}, use_hf: {use_hf}, '
f'dataset_id_or_path: {dataset_id_or_path}.')
dataset = get_function(
dataset_id_or_path,
subsets,
dataset_info['preprocess_func'],
dataset_info['split'],
dataset_sample,
random_state=random_state,
dataset_test_ratio=dataset_test_ratio,
remove_useless_columns=remove_useless_columns,
use_hf=use_hf)
if dataset_name == 'self-cognition':
assert model_name is not None and model_author is not None
dataset = _preprocess_self_cognition_dataset(dataset, model_name, model_author)
train_d: HfDataset
if isinstance(dataset, (list, tuple)):
train_d, val_d = dataset
else:
train_d, val_d = dataset, None
assert train_d is not None or val_d is not None
if train_d is not None:
train_dataset_list.append(train_d)
if val_d is not None:
val_dataset_list.append(val_d)
train_dataset = None
if len(train_dataset_list) > 0:
train_dataset = concatenate_datasets(train_dataset_list)
val_dataset = None
if len(val_dataset_list) > 0:
val_dataset = concatenate_datasets(val_dataset_list)
if check_dataset_strategy != 'none':
logger.info('check dataset...')
logger.info(f"check_dataset_strategy: '{check_dataset_strategy}'")
train_dataset = _check_dataset(train_dataset, check_dataset_strategy)
val_dataset = _check_dataset(val_dataset, check_dataset_strategy)
return train_dataset, val_dataset
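# Illustrative call (the dataset name is registered above; the sample size is arbitrary):
#   train_dataset, val_dataset = get_dataset(['alpaca-zh#2000'], dataset_test_ratio=0.01)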
def load_dataset_from_local(dataset_path_list: Optional[Union[str, List[str]]],
preprocess_func: PreprocessFunc) -> Optional[HfDataset]:
if isinstance(dataset_path_list, str):
dataset_path_list = [dataset_path_list]
if dataset_path_list is None or len(dataset_path_list) == 0:
return None
assert isinstance(dataset_path_list, (list, tuple))
dataset_list = []
for dataset_path in dataset_path_list:
assert isinstance(dataset_path, str)
df: DataFrame
if dataset_path.endswith('.csv'):
df = pd.read_csv(dataset_path, na_filter=False)
elif dataset_path.endswith('.jsonl'):
df = transform_jsonl_to_df(read_from_jsonl(dataset_path))
elif dataset_path.endswith('.json'):
with open(dataset_path, 'r', encoding='utf-8') as f:
obj_list = json.load(f)
df = transform_jsonl_to_df(obj_list)
else:
raise ValueError('The custom dataset only supports CSV, JSONL or JSON format. You can refer to the link '
'`https://github.com/modelscope/swift/blob/main/docs/source/LLM/自定义与拓展.md#注册数据集的方式` '
'for more information.')
dataset = HfDataset.from_dict(df.to_dict(orient='list'))
dataset_list.append(preprocess_func(dataset))
return concatenate_datasets(dataset_list)
def get_local_dataset(_1: str,
_2: Optional[List[str]],
preprocess_func: PreprocessFunc,
split: List[str],
dataset_sample: int = -1,
random_state: Optional[RandomState] = None,
dataset_test_ratio: float = 0.,
remove_useless_columns: bool = True,
**kwargs) -> Tuple[HfDataset, Optional[HfDataset]]:
dataset = load_dataset_from_local(split, preprocess_func)
return _post_preprocess(dataset, dataset_sample, random_state, None, dataset_test_ratio, remove_useless_columns)
def register_dataset_info_file(dataset_info_path: Union[str, Dict[str, Any], None] = None) -> None:
    # dataset_info_path: a json file path, a json string, a dict, or None (use the built-in dataset_info.json).
if dataset_info_path is None:
dataset_info_path = os.path.abspath(os.path.join(__file__, '..', '..', 'data', 'dataset_info.json'))
if isinstance(dataset_info_path, str):
if os.path.isfile(dataset_info_path):
with open(dataset_info_path, 'r') as f:
dataset_info = json.load(f)
base_dir = os.path.dirname(dataset_info_path)
else:
dataset_info = json.loads(dataset_info_path)
dataset_info_path = list(dataset_info.keys())
base_dir = None
else:
assert isinstance(dataset_info_path, dict)
dataset_info = deepcopy(dataset_info_path)
dataset_info_path = list(dataset_info.keys())
base_dir = None
for dataset_name, d_info in dataset_info.items():
register_dataset_info(dataset_name, d_info, base_dir=base_dir)
logger.info(f'Successfully registered `{dataset_info_path}`')
register_dataset_info_file()