Commit b6edc328 authored by chenzk

v1.0
# torch
transformers
sentencepiece
# vllm
tqdm
nltk
pyext
bs4
pdfplumber
icon.png (53.8 KB)

# Model code
modelCode=1233
# Model name
modelName=search-o1_pytorch
# Model description
modelDescription=Dynamically retrieves and integrates external knowledge, giving open-source models CoT "slow thinking" ability without any additional training; a reasoning-oriented o1 variant.
# Application scenarios
appScenario=Reasoning, conversational Q&A, manufacturing, broadcast media, finance, energy, healthcare, smart home, education
# Framework type
frameType=pytorch
import os
import json
import requests
from requests.exceptions import Timeout
from bs4 import BeautifulSoup
from tqdm import tqdm
import time
import concurrent
from concurrent.futures import ThreadPoolExecutor
import pdfplumber
from io import BytesIO
import re
import string
from typing import Optional, Tuple
from nltk.tokenize import sent_tokenize
# ----------------------- Custom Headers -----------------------
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/58.0.3029.110 Safari/537.36',
'Referer': 'https://www.google.com/',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
# Initialize session
session = requests.Session()
session.headers.update(headers)
def remove_punctuation(text: str) -> str:
"""Remove punctuation from the text."""
return text.translate(str.maketrans("", "", string.punctuation))
def f1_score(true_set: set, pred_set: set) -> float:
"""Calculate the F1 score between two sets of words."""
intersection = len(true_set.intersection(pred_set))
if not intersection:
return 0.0
precision = intersection / float(len(pred_set))
recall = intersection / float(len(true_set))
return 2 * (precision * recall) / (precision + recall)
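# Worked example (illustrative only): with true_set = {"dimethyl", "fumarate",
# "structure"} and pred_set = {"the", "structure", "of", "dimethyl", "fumarate"},
# the intersection has 3 words, so precision = 3/5 = 0.6, recall = 3/3 = 1.0,
# and F1 = 2 * (0.6 * 1.0) / (0.6 + 1.0) = 0.75.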
def extract_snippet_with_context(full_text: str, snippet: str, context_chars: int = 2500) -> Tuple[bool, str]:
"""
Extract the sentence that best matches the snippet and its context from the full text.
Args:
full_text (str): The full text extracted from the webpage.
snippet (str): The snippet to match.
context_chars (int): Number of characters to include before and after the snippet.
Returns:
Tuple[bool, str]: The first element indicates whether extraction was successful, the second element is the extracted context.
"""
try:
full_text = full_text[:50000]
snippet = snippet.lower()
snippet = remove_punctuation(snippet)
snippet_words = set(snippet.split())
best_sentence = None
best_f1 = 0.2
# sentences = re.split(r'(?<=[.!?]) +', full_text) # Split sentences using regex, supporting ., !, ? endings
sentences = sent_tokenize(full_text) # Split sentences using nltk's sent_tokenize
for sentence in sentences:
key_sentence = sentence.lower()
key_sentence = remove_punctuation(key_sentence)
sentence_words = set(key_sentence.split())
f1 = f1_score(snippet_words, sentence_words)
if f1 > best_f1:
best_f1 = f1
best_sentence = sentence
if best_sentence:
para_start = full_text.find(best_sentence)
para_end = para_start + len(best_sentence)
start_index = max(0, para_start - context_chars)
end_index = min(len(full_text), para_end + context_chars)
context = full_text[start_index:end_index]
return True, context
else:
# If no matching sentence is found, return the first context_chars*2 characters of the full text
return False, full_text[:context_chars * 2]
except Exception as e:
return False, f"Failed to extract snippet context due to {str(e)}"
def extract_text_from_url(url, use_jina=False, jina_api_key=None, snippet: Optional[str] = None):
"""
Extract text from a URL. If a snippet is provided, extract the context related to it.
Args:
url (str): URL of a webpage or PDF.
use_jina (bool): Whether to use Jina for extraction.
snippet (Optional[str]): The snippet to search for.
Returns:
str: Extracted text or context.
"""
try:
if use_jina:
jina_headers = {
'Authorization': f'Bearer {jina_api_key}',
'X-Return-Format': 'markdown',
# 'X-With-Links-Summary': 'true'
}
response = requests.get(f'https://r.jina.ai/{url}', headers=jina_headers).text
# Remove URLs
pattern = r"\(https?:.*?\)|\[https?:.*?\]"
            text = re.sub(pattern, "", response).replace('---','-').replace('===','=').replace('   ',' ').replace('  ',' ')
else:
response = session.get(url, timeout=20) # Set timeout to 20 seconds
response.raise_for_status() # Raise HTTPError if the request failed
# Determine the content type
content_type = response.headers.get('Content-Type', '')
if 'pdf' in content_type:
# If it's a PDF file, extract PDF text
return extract_pdf_text(url)
# Try using lxml parser, fallback to html.parser if unavailable
try:
soup = BeautifulSoup(response.text, 'lxml')
except Exception:
print("lxml parser not found or failed, falling back to html.parser")
soup = BeautifulSoup(response.text, 'html.parser')
text = soup.get_text(separator=' ', strip=True)
if snippet:
success, context = extract_snippet_with_context(text, snippet)
if success:
return context
else:
return text
else:
# If no snippet is provided, return directly
return text[:8000]
except requests.exceptions.HTTPError as http_err:
return f"HTTP error occurred: {http_err}"
except requests.exceptions.ConnectionError:
return "Error: Connection error occurred"
except requests.exceptions.Timeout:
return "Error: Request timed out after 20 seconds"
except Exception as e:
return f"Unexpected error: {str(e)}"
def fetch_page_content(urls, max_workers=4, use_jina=False, snippets: Optional[dict] = None):
"""
Concurrently fetch content from multiple URLs.
Args:
urls (list): List of URLs to scrape.
max_workers (int): Maximum number of concurrent threads.
use_jina (bool): Whether to use Jina for extraction.
snippets (Optional[dict]): A dictionary mapping URLs to their respective snippets.
Returns:
dict: A dictionary mapping URLs to the extracted content or context.
"""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Use tqdm to display a progress bar
futures = {
executor.submit(extract_text_from_url, url, use_jina, snippets.get(url) if snippets else None): url
for url in urls
}
for future in tqdm(concurrent.futures.as_completed(futures), desc="Fetching URLs", total=len(urls)):
url = futures[future]
try:
data = future.result()
results[url] = data
except Exception as exc:
results[url] = f"Error fetching {url}: {exc}"
time.sleep(0.2) # Simple rate limiting
return results
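# Usage sketch (hypothetical URLs, comments only, nothing executed):
# pages = fetch_page_content(
#     ["https://example.com/a", "https://example.com/b"],
#     max_workers=2,
#     snippets={"https://example.com/a": "snippet to locate in page a"},
# )
# `pages` maps each URL to its extracted context or an error string.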
def bing_web_search(query, subscription_key, endpoint, market='en-US', language='en', timeout=20):
"""
Perform a search using the Bing Web Search API with a set timeout.
Args:
query (str): Search query.
subscription_key (str): Subscription key for the Bing Search API.
endpoint (str): Endpoint for the Bing Search API.
market (str): Market, e.g., "en-US" or "zh-CN".
language (str): Language of the results, e.g., "en".
timeout (int or float or tuple): Request timeout in seconds.
Can be a float representing the total timeout,
or a tuple (connect timeout, read timeout).
Returns:
dict: JSON response of the search results. Returns None or raises an exception if the request times out.
"""
headers = {
"Ocp-Apim-Subscription-Key": subscription_key
}
params = {
"q": query,
"mkt": market,
"setLang": language,
"textDecorations": True,
"textFormat": "HTML"
}
try:
response = requests.get(endpoint, headers=headers, params=params, timeout=timeout)
response.raise_for_status() # Raise exception if the request failed
search_results = response.json()
return search_results
except Timeout:
print(f"Bing Web Search request timed out ({timeout} seconds) for query: {query}")
return {} # Or you can choose to raise an exception
except requests.exceptions.RequestException as e:
print(f"Error occurred during Bing Web Search request: {e}")
return {}
def extract_pdf_text(url):
"""
Extract text from a PDF.
Args:
url (str): URL of the PDF file.
Returns:
str: Extracted text content or error message.
"""
try:
response = session.get(url, timeout=20) # Set timeout to 20 seconds
if response.status_code != 200:
return f"Error: Unable to retrieve the PDF (status code {response.status_code})"
# Open the PDF file using pdfplumber
with pdfplumber.open(BytesIO(response.content)) as pdf:
full_text = ""
for page in pdf.pages:
text = page.extract_text()
if text:
full_text += text
# Limit the text length
cleaned_text = ' '.join(full_text.split()[:600])
return cleaned_text
except requests.exceptions.Timeout:
return "Error: Request timed out after 20 seconds"
except Exception as e:
return f"Error: {str(e)}"
def extract_relevant_info(search_results):
"""
Extract relevant information from Bing search results.
Args:
search_results (dict): JSON response from the Bing Web Search API.
Returns:
list: A list of dictionaries containing the extracted information.
"""
useful_info = []
if 'webPages' in search_results and 'value' in search_results['webPages']:
for id, result in enumerate(search_results['webPages']['value']):
info = {
'id': id + 1, # Increment id for easier subsequent operations
'title': result.get('name', ''),
'url': result.get('url', ''),
'site_name': result.get('siteName', ''),
'date': result.get('datePublished', '').split('T')[0],
                'snippet': result.get('snippet', ''),  # Snippet may contain HTML highlight tags (textFormat="HTML")
# Add context content to the information
'context': '' # Reserved field to be filled later
}
useful_info.append(info)
return useful_info
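# The fields read above come from the Bing Web Search JSON response, whose relevant
# portion looks roughly like this (abridged, illustrative values):
# {
#   "webPages": {
#     "value": [
#       {"name": "...", "url": "...", "siteName": "...",
#        "datePublished": "2024-01-01T00:00:00.0000000", "snippet": "..."}
#     ]
#   }
# }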
# ------------------------------------------------------------
if __name__ == "__main__":
# Example usage
# Define the query to search
query = "Structure of dimethyl fumarate"
# Subscription key and endpoint for Bing Search API
    BING_SUBSCRIPTION_KEY = os.environ.get("BING_SEARCH_V7_SUBSCRIPTION_KEY", "")
    if not BING_SUBSCRIPTION_KEY:
        raise ValueError("Please set the BING_SEARCH_V7_SUBSCRIPTION_KEY environment variable.")
bing_endpoint = "https://api.bing.microsoft.com/v7.0/search"
# Perform the search
print("Performing Bing Web Search...")
search_results = bing_web_search(query, BING_SUBSCRIPTION_KEY, bing_endpoint)
print("Extracting relevant information from search results...")
extracted_info = extract_relevant_info(search_results)
print("Fetching and extracting context for each snippet...")
for info in tqdm(extracted_info, desc="Processing Snippets"):
full_text = extract_text_from_url(info['url'], use_jina=True) # Get full webpage text
if full_text and not full_text.startswith("Error"):
success, context = extract_snippet_with_context(full_text, info['snippet'])
if success:
info['context'] = context
else:
info['context'] = f"Could not extract context. Returning first 8000 chars: {full_text[:8000]}"
else:
info['context'] = f"Failed to fetch full text: {full_text}"
# print("Your Search Query:", query)
# print("Final extracted information with context:")
# print(json.dumps(extracted_info, indent=2, ensure_ascii=False))
from lcb_runner.benchmarks.code_generation import (
CodeGenerationProblem,
load_code_generation_dataset,
load_code_generation_dataset_not_fast,
)
from lcb_runner.benchmarks.test_output_prediction import (
TestOutputPredictionProblem,
load_test_prediction_dataset,
)
from lcb_runner.benchmarks.code_execution import (
CodeExecutionProblem,
load_code_execution_dataset,
)
import json
from enum import Enum
from datetime import datetime
from dataclasses import dataclass
from datasets import load_dataset
@dataclass
class CodeExecutionProblem:
question_id: str
contest_id: str
contest_date: datetime
difficulty: str
function_name: str
code: str
input: str
output: str
id: str
problem_id: str
numsteps: int
def __post_init__(self):
pass
def insert_output(self, output_list: list[str], pred_list: list[str]) -> dict:
return {
"question_id": self.question_id,
"contest_id": self.contest_id,
"contest_date": self.contest_date.isoformat(),
"difficulty": self.difficulty,
"function_name": self.function_name,
"code": self.code,
"input": self.input,
"output": self.output,
"id": self.id,
"problem_id": self.problem_id,
"numsteps": self.numsteps,
"output_list": output_list,
"pred_list": pred_list,
}
def insert_output_evaluation(
self, output_list: list[str], code_list: list[str], graded_list: list[bool]
) -> dict:
output = self.insert_output(output_list, code_list)
output["graded_list"] = graded_list
output["pass@1"] = graded_list.count(True) / len(graded_list)
return output
def get_evaluation_sample(self) -> dict:
return {
"code": self.code,
"input": self.input,
"output": self.output,
}
def load_code_execution_dataset(release_version="release_v1") -> list[CodeExecutionProblem]:
dataset = load_dataset("livecodebench/execution-v2", split="test")
dataset = [CodeExecutionProblem(**p) for p in dataset] # type: ignore
print(f"Loaded {len(dataset)} problems")
return dataset
if __name__ == "__main__":
dataset = load_code_execution_dataset()
import json
import zlib
import pickle
import base64
from enum import Enum
from datetime import datetime
from dataclasses import dataclass
from datasets import load_dataset
class Platform(Enum):
LEETCODE = "leetcode"
CODEFORCES = "codeforces"
ATCODER = "atcoder"
class Difficulty(Enum):
EASY = "easy"
MEDIUM = "medium"
HARD = "hard"
class TestType(Enum):
STDIN = "stdin"
FUNCTIONAL = "functional"
@dataclass
class Test:
input: str
output: str
testtype: TestType
def __post_init__(self):
self.testtype = TestType(self.testtype)
# if self.testtype == TestType.FUNCTIONAL:
# self.input = json.loads(self.input)
# self.output = json.loads(self.output)
@dataclass
class CodeGenerationProblem:
question_title: str
question_content: str
platform: Platform
question_id: str
contest_id: str
contest_date: datetime
starter_code: str
difficulty: Difficulty
public_test_cases: list[Test]
private_test_cases: list[Test]
metadata: dict
def __post_init__(self):
self.platform = Platform(self.platform)
self.difficulty = Difficulty(self.difficulty)
self.contest_date = datetime.fromisoformat(self.contest_date)
self.public_test_cases = json.loads(self.public_test_cases) # type: ignore
self.public_test_cases = [Test(**t) for t in self.public_test_cases]
try:
self.private_test_cases = json.loads(self.private_test_cases) # type: ignore
        except Exception:
self.private_test_cases = json.loads(
pickle.loads(
zlib.decompress(
base64.b64decode(self.private_test_cases.encode("utf-8")) # type: ignore
)
)
) # type: ignore
self.private_test_cases = [Test(**t) for t in self.private_test_cases]
self.metadata = json.loads(self.metadata) # type: ignore
def insert_output(self, output_list: list[str], code_list: list[str]) -> dict:
return {
"question_title": self.question_title,
"question_content": self.question_content,
"platform": self.platform.value,
"question_id": self.question_id,
"contest_id": self.contest_id,
"contest_date": self.contest_date.isoformat(),
"starter_code": self.starter_code,
"difficulty": self.difficulty.value,
"output_list": output_list,
"code_list": code_list,
}
def insert_output_evaluation(
self,
output_list: list[str],
code_list: list[str],
graded_list: list[bool],
**kwargs,
) -> dict:
output = self.insert_output(output_list, code_list)
output["graded_list"] = graded_list
output["pass@1"] = graded_list.count(True) / len(graded_list)
for k, v in kwargs.items():
output[k] = v
return output
def get_evaluation_sample(self):
return {
"input_output": json.dumps(
{
"inputs": [
t.input
for t in self.public_test_cases + self.private_test_cases
],
"outputs": [
t.output
for t in self.public_test_cases + self.private_test_cases
],
"fn_name": self.metadata.get("func_name", None),
}
),
}
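# Shape of the "input_output" field produced by get_evaluation_sample above
# (illustrative values): {"inputs": ["1 2\n"], "outputs": ["3\n"], "fn_name": "add"};
# "fn_name" falls back to None when the problem metadata has no "func_name" entry.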
def load_code_generation_dataset(release_version="release_v1") -> list[CodeGenerationProblem]:
dataset = load_dataset("livecodebench/code_generation_lite", split="test", version_tag=release_version, trust_remote_code=True)
dataset = [CodeGenerationProblem(**p) for p in dataset] # type: ignore
print(f"Loaded {len(dataset)} problems")
return dataset
def load_code_generation_dataset_not_fast(release_version="release_v1") -> list[CodeGenerationProblem]:
dataset = load_dataset("livecodebench/code_generation", split="test")
dataset = [CodeGenerationProblem(**p) for p in dataset] # type: ignore
print(f"Loaded {len(dataset)} problems")
return dataset
if __name__ == "__main__":
dataset = load_code_generation_dataset()
import json
from enum import Enum
from datetime import datetime
from dataclasses import dataclass
from datasets import load_dataset
@dataclass
class Test:
input: str
output: str
testtype: str
@dataclass
class TestOutputPredictionProblem:
question_title: str
question_content: str
question_id: str
contest_id: str
contest_date: datetime
difficulty: str
test: list[Test]
starter_code: str
function_name: str
test_id: int
def __post_init__(self):
self.test = [Test(**t) for t in json.loads(self.test)] # type: ignore
def insert_output(self, output_list: list[str], pred_list: list[str]) -> dict:
return {
"question_title": self.question_title,
"question_content": self.question_content,
"question_id": self.question_id,
"contest_id": self.contest_id,
"contest_date": self.contest_date.isoformat(),
"difficulty": self.difficulty,
"output_list": output_list,
"pred_list": pred_list,
"test_id": self.test_id,
"function_name": self.function_name,
"starter_code": self.starter_code,
}
def insert_output_evaluation(
self, output_list: list[str], code_list: list[str], graded_list: list[bool]
) -> dict:
output = self.insert_output(output_list, code_list)
output["graded_list"] = graded_list
output["pass@1"] = graded_list.count(True) / len(graded_list)
return output
def get_evaluation_sample(self) -> dict:
return {
"input": self.question_content,
"output": self.test[0].output,
}
def load_test_prediction_dataset(release_version="release_v1") -> list[TestOutputPredictionProblem]:
dataset = load_dataset("livecodebench/test_generation", split="test") # type: ignore
dataset = [TestOutputPredictionProblem(**d) for d in dataset]
print(f"Loaded {len(dataset)} prediction problems")
return dataset
if __name__ == "__main__":
dataset = load_test_prediction_dataset()
from lcb_runner.evaluation.compute_code_generation_metrics import codegen_metrics
from lcb_runner.evaluation.compute_code_execution_metrics import code_execution_metrics
from lcb_runner.evaluation.compute_test_output_prediction_metrics import (
test_output_metrics,
)
from lcb_runner.evaluation.pass_k_utils import extract_instance_results