import time
import random
import argparse
import urllib.parse
from concurrent.futures import ThreadPoolExecutor, as_completed

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.firefox.options import Options
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
from bs4 import BeautifulSoup
from loguru import logger


class FetchBingResults:
    """Search Bing for a query and scrape the text of the top result page(s)."""

    def __init__(self, query):
        # Plain-text search query; URL-encoded at request time.
        self.query = query

    def get_driver(self):
        """Build a headless Firefox WebDriver tuned for scraping.

        Returns:
            webdriver.Firefox: a fresh driver; the caller is responsible
            for calling ``quit()`` on it (done in ``finally`` blocks below).
        """
        options = Options()
        # 2 = block image loading to speed up page fetches.
        options.set_preference('permissions.default.image', 2)
        options.add_argument('--headless')               # no visible browser window
        options.add_argument('--no-sandbox')             # disable the sandbox
        options.add_argument('--disable-gpu')            # no GPU acceleration
        options.add_argument('--disable-dev-shm-usage')  # avoid /dev/shm exhaustion
        # Spoof a desktop Chrome user agent.
        options.add_argument('--user-agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"')
        options.set_preference("intl.accept_languages", "en-US,en")  # request English pages
        return webdriver.Firefox(options=options)

    def fetch_bing_results(self, num=1):
        """Run the Bing search and scrape up to ``num`` top results in parallel.

        Args:
            num: number of top organic results to fetch concurrently.

        Returns:
            str: first 1000 characters of the first successfully scraped
            result page, or '' if every retry fails.
        """
        retries = 3
        for attempt in range(retries):
            driver = self.get_driver()
            # BUG FIX: URL-encode the query — the default query is CJK text
            # and raw interpolation produced a malformed URL.
            driver.get(f'https://www.bing.com/search?q={urllib.parse.quote_plus(self.query)}')
            try:
                WebDriverWait(driver, 5).until(
                    EC.presence_of_element_located((By.XPATH, '//li[@class="b_algo"]'))
                )
                soup = BeautifulSoup(driver.page_source, 'html.parser')
                b_results = soup.find('ol', {'id': 'b_results'})
                if b_results is None:
                    # Result container missing (e.g. captcha / block page):
                    # treat like a timeout so the retry loop handles it.
                    raise TimeoutException('no b_results container in page')
                results = b_results.find_all('li', class_='b_algo')

                # Fan the article fetches out over a thread pool (I/O-bound).
                with ThreadPoolExecutor(max_workers=num) as executor:
                    future_to_result = {
                        executor.submit(self.fetch_article_content, r.find('a')['href']): r
                        for r in results[:num]
                    }
                    for future in as_completed(future_to_result):
                        try:
                            content, current_url = future.result()
                            # BUG FIX: fetch_article_content returns (None, link)
                            # on failure; slicing None raised TypeError before.
                            if content:
                                return content[:1000]
                        except Exception as exc:
                            logger.error(f'Generated an exception: {exc}')
            except (TimeoutException, WebDriverException) as e:
                logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
                time.sleep(random.uniform(1, 3))  # random backoff before retrying
            finally:
                driver.quit()
        logger.error("All retries failed.")
        return ''

    def fetch_article_content(self, link):
        """Load ``link``, wait for it to render, and extract its visible text.

        Args:
            link: URL of the article to scrape.

        Returns:
            tuple[str, str] of (page text, final URL after redirects), or
            (None, link) if all retries fail.
        """
        retries = 3
        for attempt in range(retries):
            driver = self.get_driver()
            driver.get(link)
            try:
                try:
                    # Some Bing results land on an interstitial page; click
                    # the manual-redirect link if one appears.
                    WebDriverWait(driver, 10).until(
                        EC.presence_of_element_located((By.LINK_TEXT, 'Please click here if the page does not redirect automatically ...'))
                    ).click()
                except TimeoutException:
                    logger.error("No redirection link found.")
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, 'body'))
                )
                # Scroll to the bottom to trigger any lazy-loaded content.
                driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
                # Wait for the document to report fully loaded.
                WebDriverWait(driver, 10).until(
                    lambda d: d.execute_script('return document.readyState') == 'complete'
                )
                article_soup = BeautifulSoup(driver.page_source, 'html.parser')
                content = article_soup.get_text(strip=True)
                current_url = driver.current_url  # final URL after any redirects
                return content, current_url
            except (TimeoutException, WebDriverException, NoSuchElementException) as e:
                logger.error(f"Attempt {attempt + 1} failed: {str(e)}")
                time.sleep(random.uniform(1, 3))  # random backoff before retrying
            finally:
                driver.quit()
        return None, link

    # NOTE: the original __del__ called self.driver.quit(), but self.driver
    # is never assigned (drivers are created and quit per call), so it always
    # raised AttributeError at GC time. Removed.


def parse_args():
    """Parse command-line arguments for the script."""
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('--query', default='介绍下曙光DCU', help='提问的问题.')
    return parser.parse_args()


def main():
    args = parse_args()
    # BUG FIX: pass the query string, not the whole argparse Namespace —
    # previously the Namespace repr was embedded in the search URL.
    fetch_bing = FetchBingResults(args.query)
    results = fetch_bing.fetch_bing_results()
    print(results)


if __name__ == "__main__":
    main()