"""Generation-accuracy smoke test for Evo2 models, with optional torch profiling.

Each prompt sequence is split at (roughly) its midpoint; the model is prompted
with the first half and the generated continuation is compared position by
position with the real second half.
"""

import argparse
import csv
import time
from importlib import resources
from pathlib import Path
from typing import Optional

import numpy as np
import torch

try:
    from evo2 import Evo2
except ImportError as _import_error:
    # Keep the pure helpers (CSV reading, scoring) importable without the model
    # stack; main() re-raises this error when a model is actually required.
    Evo2 = None
    _EVO2_IMPORT_ERROR = _import_error
else:
    _EVO2_IMPORT_ERROR = None


def _read_first_column(csv_path):
    """Return the first cell of every non-empty data row of the CSV at *csv_path*."""
    with open(csv_path, encoding="utf-8-sig", newline="") as csvfile:
        reader = csv.reader(csvfile)
        next(reader, None)  # Skip header; the default tolerates an empty file
        # csv.reader yields [] for blank lines, which would break row[0]
        return [row[0] for row in reader if row]


def read_prompts(input_file):
    """Read prompts from input file or built-in test data.

    Args:
        input_file: Either a path to a file, or the name of a test data file
            (e.g., 'prompts.csv') shipped in the ``evo2.test.data`` package.

    Returns:
        List with the first column of every non-empty row after the header.
    """
    # If it's a string that doesn't exist as a file path, assume it's a test data file
    if isinstance(input_file, str) and not Path(input_file).is_file():
        # Read while the context manager is still open: the path it yields is
        # only guaranteed to exist inside the ``with`` block.
        with resources.path("evo2.test.data", input_file) as data_path:
            return _read_first_column(data_path)
    return _read_first_column(input_file)


def mid_point_split(*, seq, num_tokens):
    """Split sequence at midpoint for prompt and target.

    The split index is ``2 * (len(seq) // 4)``, i.e. the midpoint rounded down
    to an even position.

    Args:
        seq: Full sequence.
        num_tokens: Maximum length of the returned target.

    Returns:
        Tuple ``(prompt, target)``: everything before the split index, and up to
        ``num_tokens`` items starting at it.
    """
    mid_point = 2 * (len(seq) // 4)
    prompt = seq[:mid_point]
    target = seq[mid_point : mid_point + num_tokens]
    return prompt, target


def calculate_sequence_identity(seq1: str, seq2: str) -> Optional[float]:
    """Calculate sequence identity between two sequences through direct comparison.

    Only the overlapping prefix (length of the shorter sequence) is compared;
    no alignment is performed.

    Returns:
        Percentage (0-100) of matching positions, or ``None`` if either
        sequence is empty.
    """
    if not seq1 or not seq2:
        return None

    min_length = min(len(seq1), len(seq2))
    # zip() already stops at the shorter sequence
    matches = sum(a == b for a, b in zip(seq1, seq2))
    return (matches / min_length) * 100


def _validate_batching(*, batch_size, generations_per_prompt):
    """Raise ValueError unless both batching parameters are positive."""
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if generations_per_prompt < 1:
        raise ValueError(
            f"generations_per_prompt must be >= 1, got {generations_per_prompt}"
        )


def _build_prompts_and_targets(sequences, *, n_tokens, generations_per_prompt):
    """Split every sequence and repeat each prompt/target once per generation."""
    prompts = []
    targets = []
    for seq in sequences:
        prompt, target = mid_point_split(seq=seq, num_tokens=n_tokens)
        prompts.extend([prompt] * generations_per_prompt)
        targets.extend([target] * generations_per_prompt)
    return prompts, targets


def _cuda_synchronize():
    """Synchronize CUDA if it is available; do nothing on CPU-only hosts."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()


def _generate_and_score_batch(
    *,
    model,
    prompts,
    targets,
    start,
    batch_size,
    n_tokens,
    temperature,
    top_k,
    top_p,
):
    """Generate for one batch starting at *start*, print its timing, return its scores."""
    stop = min(start + batch_size, len(prompts))
    batch_prompts = prompts[start:stop]
    batch_targets = targets[start:stop]

    with torch.inference_mode():
        # Synchronize on both sides so the timer covers the whole generate call
        _cuda_synchronize()
        step_time = -time.perf_counter()
        generated = model.generate(
            prompt_seqs=batch_prompts,
            n_tokens=n_tokens,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
        )
        _cuda_synchronize()
        step_time += time.perf_counter()

    print(f"[{start}:{stop}) E2E Time for model.generate: {step_time:.3f} s")

    decoded_seqs = generated.sequences
    if len(decoded_seqs) != len(batch_targets):
        # A mismatch would silently shift scores into the wrong prompt group
        raise ValueError(
            f"model.generate returned {len(decoded_seqs)} sequences "
            f"for {len(batch_targets)} prompts"
        )
    return [
        calculate_sequence_identity(decoded_seq, target)
        for decoded_seq, target in zip(decoded_seqs, batch_targets)
    ]


def _group_scores(scores, generations_per_prompt):
    """Reshape the flat score list into one sub-list per original sequence."""
    return [
        scores[i : i + generations_per_prompt]
        for i in range(0, len(scores), generations_per_prompt)
    ]


def generate_and_score(
    *,
    sequences,
    model,
    generations_per_prompt=5,
    n_tokens=500,
    temperature=1.0,
    top_k=1,
    top_p=1.0,
    batch_size=2,
):
    """Prompt with first half, generate and score on 2nd half.

    Args:
        sequences: Full sequences; each is split with ``mid_point_split``.
        model: Object exposing ``generate(prompt_seqs=..., n_tokens=...,
            temperature=..., top_k=..., top_p=...)`` whose result has a
            ``sequences`` attribute with one generated sequence per prompt.
        generations_per_prompt: Number of generations requested per sequence.
        n_tokens: Number of tokens to generate (also the maximum target length).
        temperature: Passed through to ``model.generate``.
        top_k: Passed through to ``model.generate``.
        top_p: Passed through to ``model.generate``.
        batch_size: Number of prompts passed to ``model.generate`` per call.

    Returns:
        One list per input sequence holding ``generations_per_prompt`` identity
        scores (percentages, or ``None`` where a comparison was impossible).

    Raises:
        ValueError: If ``batch_size`` or ``generations_per_prompt`` is < 1, or
            the model returns a different number of sequences than prompts.
    """
    _validate_batching(
        batch_size=batch_size, generations_per_prompt=generations_per_prompt
    )
    prompts, targets = _build_prompts_and_targets(
        sequences, n_tokens=n_tokens, generations_per_prompt=generations_per_prompt
    )

    scores = []
    for start in range(0, len(prompts), batch_size):
        scores.extend(
            _generate_and_score_batch(
                model=model,
                prompts=prompts,
                targets=targets,
                start=start,
                batch_size=batch_size,
                n_tokens=n_tokens,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
            )
        )

    # Reshape scores to group by original sequence
    return _group_scores(scores, generations_per_prompt)


def custom_trace_handler(
    dir_name="./log/pt-trace/",
    worker_name=None,
    sort_by="self_device_time_total",
    top_n=20,
):
    """Build a profiler ``on_trace_ready`` callback.

    The callback forwards the profile to a TensorBoard trace handler and then
    prints the key-averages table.

    Args:
        dir_name: Output directory passed to the TensorBoard trace handler.
        worker_name: Worker name passed to the TensorBoard trace handler.
        sort_by: Attribute used to sort the printed table. If the profiled
            events do not expose it, the fallback chain below is followed;
            ``None`` keeps the default order.
        top_n: Row limit of the printed table.

    Returns:
        Callable taking the profiler instance.
    """
    tb_handler = torch.profiler.tensorboard_trace_handler(
        dir_name=dir_name, worker_name=worker_name
    )

    # Each key maps to the next candidate to try when it is missing
    field_fallbacks = {
        "self_device_time_total": "self_cuda_time_total",
        "device_time_total": "cuda_time_total",
        "self_cuda_time_total": "self_cpu_time_total",
    }

    def handler(prof):
        tb_handler(prof)

        avgs = prof.key_averages()
        final_sort_key = sort_by

        if final_sort_key is not None and len(avgs) > 0:
            sample_event = avgs[0]

            # Follow the whole fallback chain, not just its first hop
            candidate = final_sort_key
            while candidate is not None and not hasattr(sample_event, candidate):
                candidate = field_fallbacks.get(candidate)

            if candidate is None:
                print(
                    f"[PROFILER] Sort key '{final_sort_key}' invalid. Using default order."
                )
            elif candidate != final_sort_key:
                print(
                    f"[PROFILER] '{final_sort_key}' not found. Falling back to '{candidate}'."
                )
            final_sort_key = candidate

        print(avgs.table(sort_by=final_sort_key, row_limit=top_n))

    return handler


def generate_and_score_prof(
    *,
    sequences,
    model,
    generations_per_prompt=5,
    n_tokens=500,
    temperature=1.0,
    top_k=1,
    top_p=1.0,
    batch_size=2,
    trace_step=1,
):
    """Prompt with first half, generate and score on 2nd half with torch profiler.

    The profiler schedule uses ``wait=0, warmup=trace_step, active=1,
    repeat=1``: the first ``trace_step`` batches are warm-up and the batch
    after them is the one recorded. Traces are written under
    ``./log/pt-trace/``.

    Args:
        sequences: Full sequences; each is split with ``mid_point_split``.
        model: Object exposing the same ``generate`` interface used by
            ``generate_and_score``.
        generations_per_prompt: Number of generations requested per sequence.
        n_tokens: Number of tokens to generate (also the maximum target length).
        temperature: Passed through to ``model.generate``.
        top_k: Passed through to ``model.generate``.
        top_p: Passed through to ``model.generate``.
        batch_size: Number of prompts passed to ``model.generate`` per call.
        trace_step: Number of warm-up batches before the recorded batch.

    Returns:
        One list per input sequence holding ``generations_per_prompt`` identity
        scores (percentages, or ``None`` where a comparison was impossible).

    Raises:
        ValueError: If ``batch_size`` or ``generations_per_prompt`` is < 1, or
            the model returns a different number of sequences than prompts.
    """
    _validate_batching(
        batch_size=batch_size, generations_per_prompt=generations_per_prompt
    )
    prompts, targets = _build_prompts_and_targets(
        sequences, n_tokens=n_tokens, generations_per_prompt=generations_per_prompt
    )

    scores = []
    print("\n[TRACE] Start profiling...")
    with torch.profiler.profile(
        schedule=torch.profiler.schedule(wait=0, warmup=trace_step, active=1, repeat=1),
        on_trace_ready=custom_trace_handler(dir_name="./log/pt-trace/"),
        record_shapes=True,
        profile_memory=False,  # enable as needed
        with_stack=False,  # enable as needed
        with_flops=True,
        with_modules=True,
    ) as prof:
        for start in range(0, len(prompts), batch_size):
            scores.extend(
                _generate_and_score_batch(
                    model=model,
                    prompts=prompts,
                    targets=targets,
                    start=start,
                    batch_size=batch_size,
                    n_tokens=n_tokens,
                    temperature=temperature,
                    top_k=top_k,
                    top_p=top_p,
                )
            )
            # Advance the profiler schedule once per batch
            prof.step()

    # Reshape scores to group by original sequence
    return _group_scores(scores, generations_per_prompt)


def _mean_valid_score(scores):
    """Average all non-None scores of the grouped list; NaN if there are none."""
    flat_scores = [score for group in scores for score in group]
    valid_scores = [score for score in flat_scores if score is not None]

    skipped = len(flat_scores) - len(valid_scores)
    if skipped:
        print(f"[WARN] Ignoring {skipped} generation(s) without a comparable score")

    if not valid_scores:
        return float("nan")
    return np.mean(valid_scores)


def main():
    """
    Test sequence generation and scoring using the evo2 models

    Expected results (direct comparison w/o alignment):
    - Evo 2 40B 1m: 91.15%
    - Evo 2 7B 1m: 89.25%
    - Evo 2 1B base: 68.0%
    """
    parser = argparse.ArgumentParser(description="Test Evo2 Model Generation")
    parser.add_argument(
        "--model_name",
        choices=["evo2_7b", "evo2_40b", "evo2_1b_base"],
        default="evo2_7b",
        help="Model to test (supports evo2_7b, evo2_40b, evo2_1b_base)",
    )
    parser.add_argument("--local_path", type=str, default=None)
    parser.add_argument(
        "--n_tokens", type=int, default=500, help="Number of tokens to generate"
    )
    parser.add_argument(
        "--batch_size", type=int, default=1, help="Batch size for generation"
    )
    parser.add_argument(
        "--trace",
        action="store_true",
        help="Enable torch profiler",
    )
    parser.add_argument(
        "--trace_step",
        type=int,
        default=1,
        help="Attach torch profiler to specific step (default: 1)",
    )
    args = parser.parse_args()

    if Evo2 is None:
        # Surface the import failure that was deferred at module load time
        raise ImportError(
            "The 'evo2' package is required to run this test"
        ) from _EVO2_IMPORT_ERROR

    # Set random seeds
    torch.manual_seed(1)
    torch.cuda.manual_seed(1)

    model = Evo2(args.model_name, local_path=args.local_path)

    # Test parameters: greedy sampling of 500 tokens
    test_params = {
        "n_tokens": args.n_tokens,
        "temperature": 1.0,
        "top_k": 1,
        "top_p": 1.0,
        "generations_per_prompt": 1,
        "batch_size": args.batch_size,
    }

    # Read and process sequences
    sequences = read_prompts("prompts.csv")

    # Debugging: replace all prompts with the longest prompt
    if args.batch_size > 1 and sequences:
        longest_prompt = max(sequences, key=len)
        sequences = [longest_prompt] * len(sequences)
        print(
            f"[DEBUG] Using longest prompt len={len(longest_prompt)} for all sequences"
        )

    if args.trace:
        print("[TRACE] Using generate_and_score_prof with torch profiler")
        scores = generate_and_score_prof(
            sequences=sequences,
            model=model,
            trace_step=args.trace_step,
            **test_params,
        )
    else:
        scores = generate_and_score(sequences=sequences, model=model, **test_params)

    # Calculate and validate results
    mean_score = _mean_valid_score(scores)

    print("\nTest Results:")
    print("% Matching Nucleotides:", mean_score)

    # Validate against expected scores
    eps = 3  # large epsilon for direct comparison, since there are numeric differences by versions
    expected_scores = {"evo2_40b": 91.15, "evo2_7b": 89.25, "evo2_1b_base": 68.0}

    expected_score = expected_scores[args.model_name]
    # A NaN mean (no valid score) compares False and is reported as a failure
    if abs(mean_score - expected_score) < eps:
        print(f"\nTest Passed! Score matches expected {expected_score}%")
    else:
        print(f"\nTest Failed: Expected {expected_score}%, got {mean_score}%")


if __name__ == "__main__":
    main()