gaoqiong / lm-evaluation-harness · Commits

Commit 68b3cddc
authored Jul 12, 2025 by Baber
nit
parent 495ea3a0

Showing 1 changed file with 63 additions and 64 deletions

lm_eval/tasks/__init__.py  (+63 / -64)
@@ -11,9 +11,7 @@ from typing import (
     TYPE_CHECKING,
     Any,
     Callable,
-    Dict,
     Generator,
-    List,
     Mapping,
     Optional,
     Union,
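Most of the commit follows from this import change: with built-in generics (PEP 585, Python 3.9+), typing.Dict and typing.List are redundant, so the two names are dropped here and every annotation below is lowercased. A minimal illustration of the two spellings (the function names are invented for the demo):

    from typing import Dict, List  # older spelling, still valid but no longer needed on 3.9+


    def old_style(names: List[str]) -> Dict[str, int]:
        return {n: len(n) for n in names}


    def new_style(names: list[str]) -> dict[str, int]:  # built-in generics, no typing import
        return {n: len(n) for n in names}


    assert old_style(["a", "bb"]) == new_style(["a", "bb"]) == {"a": 1, "bb": 2}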
@@ -30,15 +28,21 @@ from lm_eval.utils import pattern_match, setup_logging
 if TYPE_CHECKING:
     from lm_eval.api.task import ConfigurableTask, Task
 
 eval_logger = logging.getLogger(__name__)
 GROUP_ONLY_KEYS = list(GroupConfig().to_dict().keys())
 _Base = yaml.CLoader if getattr(yaml, "__with_libyaml__", False) else yaml.FullLoader
+_IGNORE_DIRS = (
+    "__pycache__",
+    ".ipynb_checkpoints",
+)
+
 
-eval_logger = logging.getLogger(__name__)
-
-_Base = yaml.CLoader if getattr(yaml, "__with_libyaml__", False) else yaml.FullLoader
+
+def ignore_constructor(loader: yaml.Loader, node: yaml.Node) -> None:
+    return None
+
 
-@functools.lru_cache(maxsize=None)  # ← reuse per (directory, simple) pair
+@functools.lru_cache(maxsize=2048)  # ← reuse per (directory, simple) pair
 def _make_loader(yaml_dir: Path, simple: bool = False) -> type[yaml.Loader]:
     """
     Return a custom YAML Loader class bound to *yaml_dir*.
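The decorator change above swaps an unbounded cache for a bounded one. The underlying pattern, one dynamically built Loader subclass cached per (directory, simple) pair, can be sketched in isolation like this (a simplified standalone version, not the harness's exact code):

    import functools
    from pathlib import Path

    import yaml

    _Base = yaml.CLoader if getattr(yaml, "__with_libyaml__", False) else yaml.FullLoader


    def ignore_constructor(loader: yaml.Loader, node: yaml.Node) -> None:
        # Stub for the !function tag: the YAML parses, but the callable is dropped.
        return None


    @functools.lru_cache(maxsize=2048)  # Path and bool are hashable, so lru_cache can key on them
    def make_loader(yaml_dir: Path, simple: bool = False) -> type:
        class Loader(_Base):
            """Loader subclass bound to one base directory."""

        if simple:
            yaml.add_constructor("!function", ignore_constructor, Loader=Loader)
        return Loader


    a = make_loader(Path("/tmp"), simple=True)
    b = make_loader(Path("/tmp"), simple=True)
    assert a is b  # second call is served from the LRU cache; no new class is built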
@@ -48,14 +52,13 @@ def _make_loader(yaml_dir: Path, simple: bool = False) -> type[yaml.Loader]:
     We capture it so that !function look-ups can resolve relative
     Python files like my_utils.some_fn ➜ yaml_dir / "my_utils.py".
     simple
-        If True we ignore !function completely (used by `mode="simple"`).
+        If True we ignore !function completely (used by `mode="simple"`),
+        used on TaskManager init to index.
     """
 
     class Loader(_Base):
         """Dynamically-generated loader that knows its base directory."""
 
-        # no extra state needed; the constructor stays the same
-
     # Register (or stub) the !function constructor **for this Loader only**
     if simple:
         yaml.add_constructor("!function", ignore_constructor, Loader=Loader)
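To make the `simple` branch concrete: with the stub constructor registered, a task config that uses the `!function` tag still parses during indexing, but the referenced Python module is never imported. A self-contained sketch (SafeLoader and the YAML contents here are stand-ins for the demo):

    import yaml


    def ignore_constructor(loader: yaml.Loader, node: yaml.Node) -> None:
        return None


    class SimpleLoader(yaml.SafeLoader):  # stand-in for the generated Loader class
        pass


    yaml.add_constructor("!function", ignore_constructor, Loader=SimpleLoader)

    doc = "task: demo\nprocess_docs: !function utils.process_docs\n"
    print(yaml.load(doc, Loader=SimpleLoader))
    # {'task': 'demo', 'process_docs': None} -- indexed without importing utils.py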
@@ -90,18 +93,14 @@ def _import_function(qualname: str, *, base_path: Path) -> Callable:
     return getattr(mod, func_name)
 
 
-def ignore_constructor(loader: yaml.Loader, node: yaml.Node) -> None:
-    return None
-
-
-@functools.lru_cache(maxsize=None)  #
+@functools.lru_cache(maxsize=4096)  #
 def _parse_yaml_file(path: Path, mode: str) -> dict:
     loader_cls = _make_loader(path.parent, simple=(mode == "simple"))
     with path.open("rb") as fh:
         return yaml.load(fh, Loader=loader_cls)
 
 
-@functools.lru_cache(maxsize=None)
+@functools.lru_cache(maxsize=4096)
 def _get_cached_config(yaml_path: Path, mode: str) -> dict:
     """Load and cache resolved YAML configs with LRU eviction."""
     # Parse the YAML file
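Both caches move from maxsize=None (unbounded, entries never evicted) to a bounded LRU, which caps memory when many distinct (path, mode) pairs are parsed in one process. A standalone sketch of the behaviour (file name and contents are invented for the demo):

    import functools
    import tempfile
    from pathlib import Path

    import yaml


    @functools.lru_cache(maxsize=4096)
    def parse_yaml_file(path: Path, mode: str) -> dict:
        # One parse per distinct (path, mode); the result is reused until evicted.
        with path.open("rb") as fh:
            return yaml.load(fh, Loader=yaml.SafeLoader)  # SafeLoader only for this demo


    with tempfile.TemporaryDirectory() as d:
        p = Path(d) / "demo_task.yaml"
        p.write_text("task: demo\noutput_type: generate_until\n")
        parse_yaml_file(p, "full")
        parse_yaml_file(p, "full")           # cache hit: the file is not re-read
        print(parse_yaml_file.cache_info())  # CacheInfo(hits=1, misses=1, maxsize=4096, currsize=1)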
@@ -212,10 +211,9 @@ def load_yaml_config(
 def iter_yaml_files(root: Path) -> Generator[Path, Any, None]:
     # '**/*.yaml' is handled internally by os.scandir.
     for p in iglob("**/*.yaml", root_dir=root, recursive=True):
-        # ignore check
-        if p.startswith(("__pycache__", ".ipynb_checkpoints")):
+        if Path(p).parts[0] in _IGNORE_DIRS:
             continue
         yield root / p
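The new check compares the first path component against the module-level _IGNORE_DIRS tuple instead of doing a string-prefix match. The difference is easiest to see on concrete relative paths (the example paths below are made up):

    from pathlib import Path

    _IGNORE_DIRS = ("__pycache__", ".ipynb_checkpoints")

    for p in (
        "__pycache__/stale.yaml",      # skipped by both checks
        "__pycache__extra/task.yaml",  # the prefix match would also skip this directory
        "mmlu/default/task.yaml",      # kept by both checks
    ):
        print(
            p,
            "startswith:", p.startswith(_IGNORE_DIRS),
            "parts[0]:", Path(p).parts[0] in _IGNORE_DIRS,
        )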
@@ -229,7 +227,7 @@ class TaskManager:
     def __init__(
         self,
         verbosity: Optional[str] = None,
-        include_path: Optional[Union[str, Path, List[Union[str, Path]]]] = None,
+        include_path: Optional[Union[str, Path, list[Union[str, Path]]]] = None,
         include_defaults: bool = True,
         metadata: Optional[dict] = None,
     ) -> None:
@@ -260,18 +258,18 @@ class TaskManager:
     def initialize_tasks(
         self,
-        include_path: Optional[Union[str, Path, List[Union[str, Path]]]] = None,
+        include_path: Optional[Union[str, Path, list[Union[str, Path]]]] = None,
         include_defaults: bool = True,
     ) -> dict[str, dict]:
         """Creates a dictionary of tasks indexes.
 
-        :param include_path: Union[str, List] = None
+        :param include_path: Union[str, list] = None
             An additional path to be searched for tasks recursively.
             Can provide more than one such path as a list.
         :param include_defaults: bool = True
             If set to false, default tasks (those in lm_eval/tasks/) are not indexed.
         return
-            Dictionary of task names as key and task metadata
+            dictionary of task names as key and task metadata
         """
         if include_defaults:
             all_paths = [Path(__file__).parent]
@@ -291,23 +289,23 @@ class TaskManager:
         return task_index
 
     @property
-    def all_tasks(self) -> List[str]:
+    def all_tasks(self) -> list[str]:
         return self._all_tasks
 
     @property
-    def all_groups(self) -> List[str]:
+    def all_groups(self) -> list[str]:
         return self._all_groups
 
     @property
-    def all_subtasks(self) -> List[str]:
+    def all_subtasks(self) -> list[str]:
         return self._all_subtasks
 
     @property
-    def all_tags(self) -> List[str]:
+    def all_tags(self) -> list[str]:
         return self._all_tags
 
     @property
-    def task_index(self) -> Dict[str, Dict[str, Union[str, int, List[str]]]]:
+    def task_index(self) -> dict[str, dict[str, Union[str, int, list[str]]]]:
         return self._task_index
 
     def list_all_tasks(
@@ -340,15 +338,16 @@ class TaskManager:
         inc_list = inc_raw if isinstance(inc_raw, list) else [inc_raw]
 
         for inc in inc_list:
-            inc_path = Path(inc)
-            if not inc_path.is_absolute():
-                # treat as relative include
-                inc_path = base.parent / inc_path
-            try:
-                inc_cfg = load_yaml_config(inc_path, mode="simple")
-            except FileNotFoundError:
-                continue
-            if "output_type" in inc_cfg:
-                return inc_cfg["output_type"]
+            if inc:
+                inc_path = Path(inc)
+                if not inc_path.is_absolute():
+                    # treat as relative include
+                    inc_path = base.parent / inc_path
+                try:
+                    inc_cfg = load_yaml_config(inc_path, mode="simple")
+                except FileNotFoundError:
+                    continue
+                if "output_type" in inc_cfg:
+                    return inc_cfg["output_type"]
 
         return ""
 
     # -------------------------------------------------------------- GROUP table
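The only functional change here is the new `if inc:` guard: a falsy entry in the parsed `include` list (an empty string, or None from an empty YAML value) is now skipped instead of being turned into a Path. A small sketch of what that avoids (the include values are invented):

    from pathlib import Path

    includes = ["_default_template.yaml", "", None]  # hypothetical parsed `include` entries

    for inc in includes:
        if inc:
            print("would resolve include:", Path(inc))
        else:
            # Without the guard, "" becomes Path(".") and None raises TypeError.
            print("skipped falsy include entry:", repr(inc))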
@@ -452,7 +451,7 @@ class TaskManager:
             raise ValueError
         return self.task_index[name]["yaml_path"]
 
-    def _get_config(self, name: str) -> Dict:
+    def _get_config(self, name: str) -> dict:
         if name not in self.task_index:
             raise ValueError
         yaml_path = self._get_yaml_path(name)
@@ -461,7 +460,7 @@ class TaskManager:
         else:
             return load_yaml_config(Path(yaml_path), mode="full")
 
-    def _get_tasklist(self, name: str) -> Union[List[str], int]:
+    def _get_tasklist(self, name: str) -> Union[list[str], int]:
         if self._name_is_task(name):
             raise ValueError
         return self.task_index[name]["task"]
@@ -471,8 +470,8 @@ class TaskManager:
         task_name: str,
         task_type: str,
         yaml_path: str,
-        tasks_and_groups: Dict[str, Dict],
-        config: Optional[Dict] = None,
+        tasks_and_groups: dict[str, dict],
+        config: Optional[dict] = None,
         populate_tags_fn: Optional[callable] = None,
     ) -> None:
         """Helper method to register a task in the tasks_and_groups dict"""
@@ -485,8 +484,8 @@ class TaskManager:
             populate_tags_fn(config, task_name, tasks_and_groups)
 
     def _merge_task_configs(
-        self, base_config: Dict, task_specific_config: Dict, task_name: str
-    ) -> Dict:
+        self, base_config: dict, task_specific_config: dict, task_name: str
+    ) -> dict:
         """Merge base config with task-specific overrides for task_list configs"""
         if task_specific_config:
             task_specific_config = task_specific_config.copy()
@@ -495,8 +494,8 @@ class TaskManager:
         return {**base_config, "task": task_name}
 
     def _process_tag_subtasks(
-        self, tag_name: str, update_config: Optional[Dict] = None
-    ) -> Dict:
+        self, tag_name: str, update_config: Optional[dict] = None
+    ) -> dict:
         """Process subtasks for a tag and return loaded tasks"""
         subtask_list = self._get_tasklist(tag_name)
         fn = partial(
@@ -505,7 +504,7 @@ class TaskManager:
         )
         return dict(collections.ChainMap(*map(fn, reversed(subtask_list))))
 
-    def _process_alias(self, config: Dict, group: Optional[str] = None) -> Dict:
+    def _process_alias(self, config: dict, group: Optional[str] = None) -> dict:
         # If the group is not the same as the original
         # group which the group alias was intended for,
         # Set the group_alias to None instead.
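The ChainMap context line above is unchanged but worth a note: each per-subtask call returns a one-entry {name: task} dict, and ChainMap over the reversed list flattens them into a single dict whose keys keep the original subtask order. A toy version (the names and "task objects" are invented):

    import collections

    # Stand-ins for the per-subtask dicts returned when each subtask is loaded.
    loaded = {
        "subtask_a": {"subtask_a": "<task object A>"},
        "subtask_b": {"subtask_b": "<task object B>"},
    }
    subtask_list = ["subtask_a", "subtask_b"]

    merged = dict(collections.ChainMap(*map(loaded.get, reversed(subtask_list))))
    print(merged)  # {'subtask_a': '<task object A>', 'subtask_b': '<task object B>'}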
@@ -524,15 +523,15 @@ class TaskManager:
     def _load_individual_task_or_group(
         self,
-        name_or_config: Optional[Union[str, Dict]] = None,
+        name_or_config: Optional[Union[str, dict]] = None,
         parent_name: Optional[str] = None,
-        update_config: Optional[Dict] = None,
+        update_config: Optional[dict] = None,
     ) -> Mapping:
         from lm_eval.api.task import ConfigurableTask, Task
 
         def _load_task(
-            config: Dict, task: str, yaml_path: Optional[str] = None
-        ) -> Dict[str, Union["ConfigurableTask", "Task"]]:
+            config: dict, task: str, yaml_path: Optional[str] = None
+        ) -> dict[str, Union["ConfigurableTask", "Task"]]:
             if "include" in config:
                 # Store the task name to preserve it after include processing
                 original_task_name = config.get("task", task)
@@ -568,8 +567,8 @@ class TaskManager:
             return {task: task_object}
 
         def _get_group_and_subtask_from_config(
-            config: Dict,
-        ) -> tuple[ConfigurableGroup, List[str]]:
+            config: dict,
+        ) -> tuple[ConfigurableGroup, list[str]]:
             if self.metadata is not None:
                 config["metadata"] = config.get("metadata", {}) | self.metadata
             group_name = ConfigurableGroup(config=config)
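Another context line that relies on Python 3.9+: `config.get("metadata", {}) | self.metadata` merges the two metadata dicts with the PEP 584 union operator, where keys from the right-hand operand win on conflict. A minimal illustration (the values are invented):

    config_metadata = {"version": 1, "source": "yaml"}
    manager_metadata = {"source": "task_manager", "run_id": "abc123"}  # hypothetical values

    merged = config_metadata | manager_metadata
    print(merged)  # {'version': 1, 'source': 'task_manager', 'run_id': 'abc123'}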
@@ -582,8 +581,8 @@ class TaskManager:
             return group_name, subtask_list
 
         def _process_group_config(
-            config: Dict, update_config: Optional[Dict] = None
-        ) -> tuple[Dict, Optional[Dict]]:
+            config: dict, update_config: Optional[dict] = None
+        ) -> tuple[dict, Optional[dict]]:
             if update_config is not None:
                 config = {**config, **update_config}
             _update_config = {
@@ -716,15 +715,15 @@ class TaskManager:
         }
 
     def load_task_or_group(
-        self, task_list: Optional[Union[str, List[str]]] = None
-    ) -> Dict:
+        self, task_list: Optional[Union[str, list[str]]] = None
+    ) -> dict:
         """Loads a dictionary of task objects from a list
 
         :param task_list: Union[str, list] = None
             Single string or list of string of task names to be loaded
 
         :return
-            Dictionary of task objects
+            dictionary of task objects
         """
         if isinstance(task_list, str):
             task_list = [task_list]
@@ -739,10 +738,10 @@ class TaskManager:
         )
         return all_loaded_tasks
 
-    def load_config(self, config: Dict) -> Mapping:
+    def load_config(self, config: dict) -> Mapping:
         return self._load_individual_task_or_group(config)
 
-    def _get_task_and_group(self, task_dir: Union[str, Path]) -> Dict[str, Dict]:
+    def _get_task_and_group(self, task_dir: Union[str, Path]) -> dict[str, dict]:
         """Creates a dictionary of tasks index with the following metadata,
         - `type`, that can be either `task`, `python_task`, `group` or `tags`.
             `task` refer to regular task configs, `python_task` are special
@@ -762,11 +761,11 @@ class TaskManager:
             A directory to check for tasks
 
         :return
-            Dictionary of task names as key and task metadata
+            dictionary of task names as key and task metadata
         """
 
         def _populate_tags_and_groups(
-            config: Dict, task: str, tasks_and_groups: Dict[str, Dict]
+            config: dict, task: str, tasks_and_groups: dict[str, dict]
         ) -> None:
             # TODO: remove group in next release
             if "tag" in config:
@@ -868,7 +867,7 @@ class TaskManager:
         return tasks_and_groups
 
 
-def get_task_name_from_config(task_config: Dict[str, str]) -> str:
+def get_task_name_from_config(task_config: dict[str, str]) -> str:
     if "task" in task_config:
         return task_config["task"]
     if "dataset_name" in task_config:
@@ -890,7 +889,7 @@ def get_task_name_from_object(task_object: Union["ConfigurableTask", "Task"]) ->
     )
 
 
-def _check_duplicates(task_dict: Dict[str, List[str]]) -> None:
+def _check_duplicates(task_dict: dict[str, list[str]]) -> None:
     """helper function solely used in validating get_task_dict output.
 
     Takes the output of lm_eval.evaluator_utils.get_subtask_list and
     returns a list of all leaf subtasks contained within, and errors if any such leaf subtasks are
@@ -918,12 +917,12 @@ def _check_duplicates(task_dict: Dict[str, List[str]]) -> None:
 def get_task_dict(
-    task_name_list: Union[str, List[Union[str, Dict, "Task"]]],
+    task_name_list: Union[str, list[Union[str, dict, "Task"]]],
     task_manager: Optional[TaskManager] = None,
-) -> Dict[str, Union["ConfigurableTask", "Task"]]:
+) -> dict[str, Union["ConfigurableTask", "Task"]]:
     """Creates a dictionary of task objects from either a name of task, config, or prepared Task object.
 
-    :param task_name_list: List[Union[str, Dict, Task]]
+    :param task_name_list: list[Union[str, dict, Task]]
         Name of model or LM object, see lm_eval.models.get_model
     :param task_manager: TaskManager = None
         A TaskManager object that stores indexed tasks. If not set,
@@ -932,7 +931,7 @@ def get_task_dict(
         via `include_path`
 
     :return
-        Dictionary of task objects
+        dictionary of task objects
     """
    from lm_eval.api.task import ConfigurableTask, Task