import argparse
import json
import logging
import os
from functools import partial

from lm_eval._cli import SubCommand
from lm_eval._cli.utils import (
    _int_or_none_list_arg_type,
    request_caching_arg_to_dict,
    try_parse_json,
)


class Run(SubCommand):
    """Command for running language model evaluation."""

    def __init__(self, subparsers: argparse._SubParsersAction, *args, **kwargs):
        # Create and configure the parser
        super().__init__(*args, **kwargs)
        parser = subparsers.add_parser(
            "run",
            help="Run language model evaluation",
            description="Evaluate language models on various benchmarks and tasks.",
            epilog="""
Examples:
  lm-eval run --model hf --model_args pretrained=gpt2 --tasks hellaswag
  lm-eval run --config my_config.yaml --tasks arc_easy,arc_challenge
  lm-eval run --model openai --tasks mmlu --num_fewshot 5
            """,
            formatter_class=argparse.RawDescriptionHelpFormatter,
        )
        # Add command-specific arguments
        self._add_args(parser)
        # Set the function to execute for this subcommand
        parser.set_defaults(func=self.execute)

    def _add_args(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument(
            "--config",
            "-C",
            default=None,
            type=str,
            metavar="DIR/file.yaml",
            help="Path to config with all arguments for `lm-eval`",
        )
        parser.add_argument(
            "--model",
            "-m",
            type=str,
            default="hf",
            help="Name of model. Default 'hf'",
        )
        parser.add_argument(
            "--tasks",
            "-t",
            default=None,
            type=str,
            metavar="task1,task2",
            help="Comma-separated list of task names or task groupings to evaluate on.\nTo get the full list of tasks, use one of the commands `lm-eval --tasks {{list_groups,list_subtasks,list_tags,list}}` to list all available names for task groupings, (sub)tasks only, tags, or all of the above.",
        )
        parser.add_argument(
            "--model_args",
            "-a",
            default=None,
            type=try_parse_json,
            help="""Comma-separated string or JSON-formatted arguments for the model, e.g. `pretrained=EleutherAI/pythia-160m,dtype=float32` or '{"pretrained":"EleutherAI/pythia-160m","dtype":"float32"}'.""",
        )
        parser.add_argument(
            "--num_fewshot",
            "-f",
            type=int,
            default=None,
            metavar="N",
            help="Number of examples in few-shot context",
        )
        parser.add_argument(
            "--batch_size",
            "-b",
            type=str,
            default=argparse.SUPPRESS,
            metavar="auto|auto:N|N",
            help="Acceptable values are 'auto', 'auto:N' (recompute the batch size N times over the course of evaluation), or N, where N is an integer. Default 1.",
        )
        parser.add_argument(
            "--max_batch_size",
            type=int,
            default=None,
            metavar="N",
            help="Maximal batch size to try with --batch_size auto.",
        )
        parser.add_argument(
            "--device",
            type=str,
            default=None,
            help="Device to use (e.g. cuda, cuda:0, cpu). If not set, the model's default device is used. Default None.",
        )
        parser.add_argument(
            "--output_path",
            "-o",
            default=None,
            type=str,
            metavar="DIR|DIR/file.json",
            help="Path where result metrics will be saved. Can be either a directory or a .json file. If the path is a directory and log_samples is true, the results will be saved in the directory. Otherwise the parent directory will be used.",
        )
        parser.add_argument(
            "--limit",
            "-L",
            type=float,
            default=None,
            metavar="N|0<N<1",
            help="Limit the number of examples per task. If <1, interpreted as a fraction of the total number of examples.",
        )

    def execute(self, args: argparse.Namespace) -> None:
        """Execute the evaluation command."""
        from lm_eval.config.evaluate_config import EvaluatorConfig

        # Create and validate config (most validation now happens in EvaluatorConfig)
        cfg = EvaluatorConfig.from_cli(args)

        from lm_eval import simple_evaluate, utils
        from lm_eval.loggers import EvaluationTracker, WandbLogger
        from lm_eval.utils import handle_non_serializable, make_table

        # Set up logging
        if cfg.wandb_args:
            wandb_logger = WandbLogger(cfg.wandb_args, cfg.wandb_config_args)
        utils.setup_logging(cfg.verbosity)
        eval_logger = logging.getLogger(__name__)
        os.environ["TOKENIZERS_PARALLELISM"] = "false"

        # Set up evaluation tracker
        if cfg.output_path:
            cfg.hf_hub_log_args["output_path"] = cfg.output_path
        if os.environ.get("HF_TOKEN", None):
            cfg.hf_hub_log_args["token"] = os.environ.get("HF_TOKEN")
        evaluation_tracker = EvaluationTracker(**cfg.hf_hub_log_args)

        # Create task manager (metadata already set up in config validation)
        task_manager = cfg.process_tasks()

        # Validation warnings (keep these in CLI as they're logging-specific)
        if "push_samples_to_hub" in cfg.hf_hub_log_args and not cfg.log_samples:
            eval_logger.warning(
                "Pushing samples to the Hub requires --log_samples to be set."
            )

        # Log task selection (tasks already processed in config)
        if cfg.include_path is not None:
            eval_logger.info(f"Including path: {cfg.include_path}")
        eval_logger.info(f"Selected Tasks: {cfg.tasks}")

        # Run evaluation
        results = simple_evaluate(
            model=cfg.model,
            model_args=cfg.model_args,
            tasks=cfg.tasks,
            num_fewshot=cfg.num_fewshot,
            batch_size=cfg.batch_size,
            max_batch_size=cfg.max_batch_size,
            device=cfg.device,
            use_cache=cfg.use_cache,
            cache_requests=cfg.cache_requests.get("cache_requests", False),
            rewrite_requests_cache=cfg.cache_requests.get(
                "rewrite_requests_cache", False
            ),
            delete_requests_cache=cfg.cache_requests.get(
                "delete_requests_cache", False
            ),
            limit=cfg.limit,
            samples=cfg.samples,
            check_integrity=cfg.check_integrity,
            write_out=cfg.write_out,
            log_samples=cfg.log_samples,
            evaluation_tracker=evaluation_tracker,
            system_instruction=cfg.system_instruction,
            apply_chat_template=cfg.apply_chat_template,
            fewshot_as_multiturn=cfg.fewshot_as_multiturn,
            gen_kwargs=cfg.gen_kwargs,
            task_manager=task_manager,
            verbosity=cfg.verbosity,
            predict_only=cfg.predict_only,
            random_seed=cfg.seed[0] if cfg.seed else None,
            numpy_random_seed=cfg.seed[1] if cfg.seed else None,
            torch_random_seed=cfg.seed[2] if cfg.seed else None,
            fewshot_random_seed=cfg.seed[3] if cfg.seed else None,
            confirm_run_unsafe_code=cfg.confirm_run_unsafe_code,
            metadata=cfg.metadata,
        )

        # Process results
        if results is not None:
            if cfg.log_samples:
                samples = results.pop("samples")
            dumped = json.dumps(
                results, indent=2, default=handle_non_serializable, ensure_ascii=False
            )
            if cfg.show_config:
                print(dumped)

            batch_sizes = ",".join(map(str, results["config"]["batch_sizes"]))

            # W&B logging
            if cfg.wandb_args:
                try:
                    wandb_logger.post_init(results)
                    wandb_logger.log_eval_result()
                    if cfg.log_samples:
                        wandb_logger.log_eval_samples(samples)
                except Exception as e:
                    eval_logger.info(f"Logging to W&B failed: {e}")

            # Save results
            evaluation_tracker.save_results_aggregated(
                results=results, samples=samples if cfg.log_samples else None
            )

            if cfg.log_samples:
                for task_name, _ in results["configs"].items():
                    evaluation_tracker.save_results_samples(
                        task_name=task_name, samples=samples[task_name]
                    )

            if (
                evaluation_tracker.push_results_to_hub or
                evaluation_tracker.push_samples_to_hub
            ):
                evaluation_tracker.recreate_metadata_card()

            # Print results
            print(
                f"{cfg.model} ({cfg.model_args}), gen_kwargs: ({cfg.gen_kwargs}), "
                f"limit: {cfg.limit}, num_fewshot: {cfg.num_fewshot}, "
                f"batch_size: {cfg.batch_size}{f' ({batch_sizes})' if batch_sizes else ''}"
            )
            print(make_table(results))
            if "groups" in results:
                print(make_table(results, "groups"))

            if cfg.wandb_args:
                wandb_logger.run.finish()
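

# Usage sketch (illustration only, not part of this module): the top-level CLI is
# assumed to build an argparse parser with subparsers and dispatch on the `func`
# default that `Run.__init__` sets via `parser.set_defaults(func=self.execute)`.
# The import path `lm_eval._cli.run` is an assumption about where this file lives.
#
#   import argparse
#   from lm_eval._cli.run import Run  # assumed module path
#
#   parser = argparse.ArgumentParser(prog="lm-eval")
#   subparsers = parser.add_subparsers(dest="command", required=True)
#   Run(subparsers)              # registers the "run" subparser and its arguments
#   args = parser.parse_args()
#   args.func(args)              # dispatches to Run.execute(args)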