import argparse import csv from importlib import resources from pathlib import Path from typing import Optional import numpy as np import time import torch from evo2 import Evo2 def read_prompts(input_file): """Read prompts from input file or built-in test data. Args: input_file: Either a path to a file, or the name of a test data file (e.g., 'prompts.csv') """ # If it's a string that doesn't exist as a file path, assume it's a test data file if isinstance(input_file, str) and not Path(input_file).is_file(): # This is the reliable way to get package data with resources.path("evo2.test.data", input_file) as data_path: input_file = data_path # Your existing code to read the file promptseqs = [] with open(input_file, encoding="utf-8-sig", newline="") as csvfile: reader = csv.reader(csvfile) next(reader) # Skip header for row in reader: promptseqs.append(row[0]) return promptseqs def mid_point_split(*, seq, num_tokens): """Split sequence at midpoint for prompt and target.""" mid_point = 2 * (len(seq) // 4) prompt = seq[:mid_point] target = seq[mid_point : mid_point + num_tokens] return prompt, target def calculate_sequence_identity(seq1: str, seq2: str) -> Optional[float]: """Calculate sequence identity between two sequences through direct comparison.""" if not seq1 or not seq2: return None min_length = min(len(seq1), len(seq2)) matches = sum(a == b for a, b in zip(seq1[:min_length], seq2[:min_length])) return (matches / min_length) * 100 def generate_and_score( *, sequences, model, generations_per_prompt=5, n_tokens=500, temperature=1.0, top_k=1, top_p=1.0, batch_size=2, ): """Prompt with first half, generate and score on 2nd half.""" scores = [] prompts = [] targets = [] # Prepare all prompts and targets for seq in sequences: prompt, target = mid_point_split(seq=seq, num_tokens=n_tokens) prompts.extend([prompt] * generations_per_prompt) targets.extend([target] * generations_per_prompt) for i in range(0, len(prompts), batch_size): batch_prompts = prompts[i : i + batch_size] batch_targets = targets[i : i + batch_size] with torch.inference_mode(): torch.cuda.synchronize() step_time = -time.perf_counter() generated = model.generate( prompt_seqs=batch_prompts, n_tokens=n_tokens, temperature=temperature, top_k=top_k, top_p=top_p, ) torch.cuda.synchronize() step_time += time.perf_counter() print( f"[{i}:{min(i + batch_size, len(prompts)) - 1}] E2E Time for model.generate (batch_size={batch_size}): {step_time:.3f} s" ) for j, decoded_seq in enumerate(generated.sequences): score = calculate_sequence_identity(decoded_seq, batch_targets[j]) scores.append(score) # Reshape scores to group by original sequence reshaped_scores = [ scores[i : i + generations_per_prompt] for i in range(0, len(scores), generations_per_prompt) ] return reshaped_scores def generate_and_score_prof( *, sequences, model, generations_per_prompt=5, n_tokens=500, temperature=1.0, top_k=1, top_p=1.0, batch_size=2, trace_step=1, trace_logdir="./log/pt-trace/", trace_gzip=False, trace_file_prefix=None, ): """Prompt with first half, generate and score on 2nd half with torch profiler. Profiler is enabled only for iteration i==1 to capture detailed performance data. """ scores = [] prompts = [] targets = [] # Prepare all prompts and targets for seq in sequences: prompt, target = mid_point_split(seq=seq, num_tokens=n_tokens) prompts.extend([prompt] * generations_per_prompt) targets.extend([target] * generations_per_prompt) print("\n[TRACE] Start profiling...") # 按需开启功能 with torch.profiler.profile( schedule=torch.profiler.schedule(wait=0, warmup=trace_step, active=1, repeat=1), on_trace_ready=torch.profiler.tensorboard_trace_handler( dir_name=trace_logdir, worker_name=trace_file_prefix, use_gzip=trace_gzip ), record_shapes=False, profile_memory=False, with_stack=False, with_flops=False, with_modules=False, ) as prof: for i in range(0, len(prompts), batch_size): batch_prompts = prompts[i : i + batch_size] batch_targets = targets[i : i + batch_size] with torch.inference_mode(): torch.cuda.synchronize() step_time = -time.perf_counter() generated = model.generate( prompt_seqs=batch_prompts, n_tokens=n_tokens, temperature=temperature, top_k=top_k, top_p=top_p, ) torch.cuda.synchronize() step_time += time.perf_counter() print( f"[{i}:{min(i + batch_size, len(prompts)) - 1}] E2E Time for model.generate (batch_size={batch_size}): {step_time:.3f} s" ) for j, decoded_seq in enumerate(generated.sequences): score = calculate_sequence_identity(decoded_seq, batch_targets[j]) scores.append(score) prof.step() # Reshape scores to group by original sequence reshaped_scores = [ scores[i : i + generations_per_prompt] for i in range(0, len(scores), generations_per_prompt) ] return reshaped_scores def main(): """ Test sequence generation and scoring using the evo2 models Expected results (direct comparison w/o alignment): - Evo 2 40B 1m: 91.15% - Evo 2 7B 1m: 89.25% - Evo 2 1B base: 68.0% - Evo 2 20B 1m: 93.4% """ parser = argparse.ArgumentParser(description="Test Evo2 Model Generation") parser.add_argument( "--model_name", choices=["evo2_7b", "evo2_40b", "evo2_1b_base", "evo2_20b"], default="evo2_7b", help="Model to test (supports evo2_7b, evo2_40b, evo2_1b_base, evo2_20b)", ) parser.add_argument("--local_path", type=str, default=None) parser.add_argument( "--n_tokens", type=int, default=500, help="Number of tokens to generate" ) parser.add_argument( "--batch_size", type=int, default=1, help="Batch size for generation" ) parser.add_argument( "--prompt_stretch", action="store_true", help="Stretch all prompts to the longest prompt length", ) parser.add_argument( "--n_warmups", type=int, default=0, help="Number of warmups to run", ) parser.add_argument( "--trace", action="store_true", help="Enable torch profiler", ) parser.add_argument( "--trace_step", type=int, default=1, help="Attach torch profiler to specific step (default: 1)", ) parser.add_argument( "--trace_logdir", type=str, default="./log/pt-trace/", help="Directory for torch profiler trace output (default: ./log/pt-trace/)", ) parser.add_argument( "--trace_gzip", action="store_true", help="Gzip torch profiler trace output", ) parser.add_argument( "--trace_file_prefix", type=str, default=None, help="Prefix for torch profiler trace output file", ) args = parser.parse_args() # Reduce CUDA memory fragmentation for large models (e.g. evo2_20b) torch.cuda.memory._set_allocator_settings("expandable_segments:True") # Set random seeds torch.manual_seed(1) torch.cuda.manual_seed(1) model = Evo2(args.model_name, local_path=args.local_path) # Test parameters: greedy sampling of 500 tokens test_params = { "n_tokens": args.n_tokens, "temperature": 1.0, "top_k": 1, "top_p": 1.0, "generations_per_prompt": 1, "batch_size": args.batch_size, } # Read and process sequences sequences = read_prompts("prompts.csv") print("[DEBUG] Prompt lengths:", [len(seq) for seq in sequences]) # Debugging: replace all prompts with the longest prompt if args.prompt_stretch or args.batch_size > 1: uniform_prompt = sequences[1] # length=7056 sequences = [uniform_prompt] * len(sequences) print( f"[DEBUG] Using the uniform prompt with length {len(uniform_prompt)} for all sequences" ) # Warmup if args.n_warmups > 0: warmup_sequences = sequences[:1] * args.n_warmups warmup_params = {**test_params, "n_tokens": 16} generate_and_score(sequences=warmup_sequences, model=model, **warmup_params) print(f"[DEBUG] Running {args.n_warmups} warmups with the first prompt") if args.trace: print("[TRACE] Using generate_and_score_prof with torch profiler") scores = generate_and_score_prof( sequences=sequences, model=model, trace_step=args.trace_step, trace_gzip=args.trace_gzip, trace_logdir=args.trace_logdir, trace_file_prefix=args.trace_file_prefix, **test_params, ) else: scores = generate_and_score(sequences=sequences, model=model, **test_params) # Calculate and validate results mean_score = np.mean(scores) print("\nTest Results:") print("% Matching Nucleotides:", mean_score) # Validate against expected scores eps = 3 # large epsilon for direct comparison, since there are numeric differences by versions expected_scores = { "evo2_40b": 91.15, "evo2_7b": 89.25, "evo2_1b_base": 68.0, "evo2_20b": 93.4, } expected_score = expected_scores[args.model_name] if abs(mean_score - expected_score) < eps: print(f"\nTest Passed! Score matches expected {expected_score}%") else: print(f"\nTest Failed: Expected {expected_score}%, got {mean_score}%") if __name__ == "__main__": main()