Unverified commit 7cced021, authored by Arthur, committed by GitHub
Browse files

TF Sharded (#17713)



* initial commit

* update modeling tf utils

* quality

* clean and update args

* update

* remove potential bug

* code quality

* update

* update max shard

* update tests for sharding from pretrained

* fix remaining test

* make style

* h5py if tf available

* update and fix test

* fix test

* style

* modified push to hub to support shard for TF

* quick fix

* update code

* merge branch main and style

* Apply suggestions from code review
Co-authored-by: Joao Gante <joaofranciscocardosogante@gmail.com>
Co-authored-by: Patrick von Platen <patrick.v.platen@gmail.com>

* update based on reviews

* update doc

* update and style

* Apply suggestions from code review
Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

* Update based on reviews

* fix typo

* style
Co-authored-by: Joao Gante <joaofranciscocardosogante@gmail.com>
Co-authored-by: Patrick von Platen <patrick.v.platen@gmail.com>
Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>
parent f47afefb
This diff is collapsed.
......@@ -148,6 +148,7 @@ from .import_utils import (
# Default file names for serialized checkpoints and configuration.
# NOTE(review): the prefixes suggest one weights file per framework (PyTorch / TF2 / TF1 / Flax) — confirm against callers.
WEIGHTS_NAME = "pytorch_model.bin"
# JSON index that accompanies a checkpoint split into several shard files.
WEIGHTS_INDEX_NAME = "pytorch_model.bin.index.json"
TF2_WEIGHTS_NAME = "tf_model.h5"
# Same role as WEIGHTS_INDEX_NAME, for sharded `tf_model*.h5` checkpoints.
TF2_WEIGHTS_INDEX_NAME = "tf_model.h5.index.json"
TF_WEIGHTS_NAME = "model.ckpt"
FLAX_WEIGHTS_NAME = "flax_model.msgpack"
CONFIG_NAME = "config.json"
......
......@@ -861,6 +861,7 @@ class PushToHubMixin:
organization: Optional[str] = None,
private: Optional[bool] = None,
use_auth_token: Optional[Union[bool, str]] = None,
max_shard_size: Optional[Union[int, str]] = "10GB",
**model_card_kwargs
) -> str:
"""
......@@ -936,8 +937,9 @@ class PushToHubMixin:
use_auth_token=use_auth_token,
)
# Save the files in the cloned repo
self.save_pretrained(repo_path_or_name)
if hasattr(self, "history") and hasattr(self, "create_model_card"):
self.save_pretrained(repo_path_or_name, max_shard_size=max_shard_size)
# This is a Keras model and we might be able to fish out its History and make a model card out of it
base_model_card_args = {
"output_dir": repo_path_or_name,
......@@ -945,6 +947,9 @@ class PushToHubMixin:
}
base_model_card_args.update(model_card_kwargs)
self.create_model_card(**base_model_card_args)
else:
# FLAX does not support sharding yet, will come in next PR
self.save_pretrained(repo_path_or_name)
# Commit and push!
url = self._push_to_hub(repo, commit_message=commit_message)
......@@ -1075,3 +1080,114 @@ def send_example_telemetry(example_name, *example_args, framework="pytorch"):
except Exception:
# We don't want to error in case of connection errors of any kind.
pass
def convert_file_size_to_int(size: Union[int, str]) -> int:
    """
    Converts a size expressed as a string with digits and a unit (like `"5MB"`) to an integer (in bytes).

    Unit matching is case-insensitive. Binary units (`GiB`, `MiB`, `KiB`) are powers of 1024 and decimal units (`GB`,
    `MB`, `KB`) are powers of 1000. For decimal units only, a trailing lowercase `b` (as in `"8Mb"`) is read as bits,
    so the result is divided by 8.

    Args:
        size (`int` or `str`): The size to convert. Will be directly returned if an `int`.

    Returns:
        `int`: The size in bytes.

    Raises:
        `ValueError`: If `size` is a string that does not end with a supported unit, or whose numeric part cannot be
        parsed as an integer.

    Example:

    ```py
    >>> convert_file_size_to_int("1MiB")
    1048576
    ```
    """
    if isinstance(size, int):
        return size

    error_message = "`size` is not in a valid format. Use an integer followed by the unit, e.g., '5GB'."

    # (upper-cased suffix, bytes per unit, whether a trailing lowercase "b" means bits)
    units = (
        ("GIB", 2**30, False),
        ("MIB", 2**20, False),
        ("KIB", 2**10, False),
        ("GB", 10**9, True),
        ("MB", 10**6, True),
        ("KB", 10**3, True),
    )

    upper_size = size.upper()
    for suffix, multiplier, bits_allowed in units:
        if not upper_size.endswith(suffix):
            continue
        try:
            int_size = int(size[: -len(suffix)]) * multiplier
        except ValueError as err:
            # Surface the documented message instead of the raw `int()` parsing error.
            raise ValueError(error_message) from err
        if bits_allowed and size.endswith("b"):
            return int_size // 8
        return int_size

    raise ValueError(error_message)
def get_checkpoint_shard_files(
    pretrained_model_name_or_path,
    index_filename,
    cache_dir=None,
    force_download=False,
    proxies=None,
    resume_download=False,
    local_files_only=False,
    use_auth_token=None,
    user_agent=None,
    revision=None,
    mirror=None,
):
    """
    For a given model:

    - download and cache all the shards of a sharded checkpoint if `pretrained_model_name_or_path` is a model ID on the
      Hub
    - returns the list of paths to all the shards, as well as some metadata.

    For the description of each arg, see [`PreTrainedModel.from_pretrained`]. `index_filename` is the full path to the
    index (downloaded and cached if `pretrained_model_name_or_path` is a model ID on the Hub).

    Returns:
        A tuple `(shard_files, sharded_metadata)`. `shard_files` lists one path per distinct shard named in the index's
        `"weight_map"`, in sorted shard-name order. `sharded_metadata` is the index's `"metadata"` dict with an extra
        `"all_checkpoint_keys"` entry holding the keys of `"weight_map"`.

    Raises:
        `ValueError`: If `index_filename` is not an existing file.
        `EnvironmentError`: If a shard listed in the index is missing from the repo, or the download fails with an
        HTTP error.
    """
    import json

    if not os.path.isfile(index_filename):
        raise ValueError(f"Can't find a checkpoint index ({index_filename}) in {pretrained_model_name_or_path}.")

    # Read the index as UTF-8 explicitly rather than relying on the platform's default encoding.
    with open(index_filename, "r", encoding="utf-8") as f:
        index = json.load(f)

    # Several weights usually live in the same shard: de-duplicate, and sort for a deterministic order.
    shard_filenames = sorted(set(index["weight_map"].values()))
    sharded_metadata = index["metadata"]
    sharded_metadata["all_checkpoint_keys"] = list(index["weight_map"].keys())

    # First, let's deal with local folder. The shard paths are built, not checked for existence.
    if os.path.isdir(pretrained_model_name_or_path):
        shard_filenames = [os.path.join(pretrained_model_name_or_path, f) for f in shard_filenames]
        return shard_filenames, sharded_metadata

    # At this stage pretrained_model_name_or_path is a model identifier on the Hub
    cached_filenames = []
    for shard_filename in shard_filenames:
        shard_url = hf_bucket_url(
            pretrained_model_name_or_path, filename=shard_filename, revision=revision, mirror=mirror
        )

        try:
            # Load from URL
            cached_filename = cached_path(
                shard_url,
                cache_dir=cache_dir,
                force_download=force_download,
                proxies=proxies,
                resume_download=resume_download,
                local_files_only=local_files_only,
                use_auth_token=use_auth_token,
                user_agent=user_agent,
            )
        # We have already dealt with RepositoryNotFoundError and RevisionNotFoundError when getting the index, so
        # we don't have to catch them here.
        except EntryNotFoundError as err:
            # Chain the original error so the underlying cause stays visible in the traceback.
            raise EnvironmentError(
                f"{pretrained_model_name_or_path} does not appear to have a file named {shard_filename} which is "
                "required according to the checkpoint index."
            ) from err
        except HTTPError as err:
            raise EnvironmentError(
                f"We couldn't connect to '{HUGGINGFACE_CO_RESOLVE_ENDPOINT}' to load {shard_filename}. You should try"
                " again after checking your internet connection."
            ) from err

        cached_filenames.append(cached_filename)

    return cached_filenames, sharded_metadata
......@@ -53,6 +53,7 @@ logger = logging.get_logger(__name__)
if is_tf_available():
import h5py
import numpy as np
import tensorflow as tf
......@@ -85,7 +86,12 @@ if is_tf_available():
TFSampleDecoderOnlyOutput,
TFSampleEncoderDecoderOutput,
)
from transformers.modeling_tf_utils import unpack_inputs
from transformers.modeling_tf_utils import (
TF2_WEIGHTS_INDEX_NAME,
TF2_WEIGHTS_NAME,
tf_shard_checkpoint,
unpack_inputs,
)
from transformers.tf_utils import stable_softmax
if _tf_gpu_memory_limit is not None:
......@@ -1867,6 +1873,129 @@ class UtilsFunctionsTest(unittest.TestCase):
out = masked_softmax(x, boolean_mask)
assert tf.experimental.numpy.allclose(xla_out, out)
def test_checkpoint_sharding_from_hub(self):
    """Check that a sharded TF checkpoint loaded from the Hub has the same weights as its unsharded reference."""
    model = TFBertModel.from_pretrained("ArthurZ/tiny-random-bert-sharded")
    # the model above is the same as the model below, just a sharded version.
    ref_model = TFBertModel.from_pretrained("hf-internal-testing/tiny-random-bert")
    # `zip` stops at the shorter sequence, so make sure no weight is silently left out of the comparison.
    self.assertEqual(len(model.weights), len(ref_model.weights))
    for p1, p2 in zip(model.weights, ref_model.weights):
        # Use unittest assertions (not bare `assert`) so the check survives `python -O`.
        self.assertTrue(np.allclose(p1.numpy(), p2.numpy()))
def test_shard_checkpoint(self):
    """Exercise `tf_shard_checkpoint` with the default max size, with "300kB" and with "100kB"."""
    # Four bias-free dense layers, total size 340,000 bytes.
    layer_stack = [
        tf.keras.layers.Dense(200, use_bias=False),  # size 80,000
        tf.keras.layers.Dense(200, use_bias=False),  # size 160,000
        tf.keras.layers.Dense(100, use_bias=False),  # size 80,000
        tf.keras.layers.Dense(50, use_bias=False),  # size 20,000
    ]
    model = tf.keras.Sequential(layer_stack)
    # Call the model once on a dummy batch before reading `model.weights`.
    model(tf.zeros((1, 100), dtype=tf.float32))

    weights = model.weights
    by_name = {}
    for weight in weights:
        by_name[weight.name] = weight

    with self.subTest("No shard when max size is bigger than model size"):
        shards, index = tf_shard_checkpoint(weights)
        self.assertIsNone(index)
        expected_shards = {TF2_WEIGHTS_NAME: weights}
        self.assertDictEqual(shards, expected_shards)

    with self.subTest("Test sharding, no weights bigger than max size"):
        shards, index = tf_shard_checkpoint(weights, max_shard_size="300kB")
        # Expected split: the first two layers together, then the last two.
        first_file = "tf_model-00001-of-00002.h5"
        second_file = "tf_model-00002-of-00002.h5"
        expected_index = {
            "metadata": {"total_size": 340000},
            "weight_map": {
                "dense/kernel:0": first_file,
                "dense_1/kernel:0": first_file,
                "dense_2/kernel:0": second_file,
                "dense_3/kernel:0": second_file,
            },
        }
        self.assertDictEqual(index, expected_index)

        expected_shards = {
            first_file: [by_name["dense/kernel:0"], by_name["dense_1/kernel:0"]],
            second_file: [by_name["dense_2/kernel:0"], by_name["dense_3/kernel:0"]],
        }
        self.assertDictEqual(shards, expected_shards)

    with self.subTest("Test sharding with weights bigger than max size"):
        shards, index = tf_shard_checkpoint(weights, max_shard_size="100kB")
        # Expected split: first layer, second layer (alone, although above the limit), then the last two.
        first_file = "tf_model-00001-of-00003.h5"
        second_file = "tf_model-00002-of-00003.h5"
        third_file = "tf_model-00003-of-00003.h5"
        expected_index = {
            "metadata": {"total_size": 340000},
            "weight_map": {
                "dense/kernel:0": first_file,
                "dense_1/kernel:0": second_file,
                "dense_2/kernel:0": third_file,
                "dense_3/kernel:0": third_file,
            },
        }
        self.assertDictEqual(index, expected_index)

        expected_shards = {
            first_file: [by_name["dense/kernel:0"]],
            second_file: [by_name["dense_1/kernel:0"]],
            third_file: [by_name["dense_2/kernel:0"], by_name["dense_3/kernel:0"]],
        }
        self.assertDictEqual(shards, expected_shards)
def test_checkpoint_sharding_local(self):
    """Save a model with several `max_shard_size` values, then check the shards, the index and reloading."""
    model = TFBertModel.from_pretrained("hf-internal-testing/tiny-random-bert")

    with tempfile.TemporaryDirectory() as tmp_dir:
        # The same folder is reused for every size to make sure a new save erases the old checkpoint.
        for max_size in ["150kB", "150kiB", "200kB", "200kiB"]:
            model.save_pretrained(tmp_dir, max_shard_size=max_size)

            # Map each shard file to its size on disk
            shard_paths = [os.path.join(tmp_dir, name) for name in os.listdir(tmp_dir) if name.endswith(".h5")]
            shard_to_size = {path: os.path.getsize(path) for path in shard_paths}

            # There must be an index but no regular (unsharded) weight file
            index_file = os.path.join(tmp_dir, TF2_WEIGHTS_INDEX_NAME)
            self.assertTrue(os.path.isfile(index_file))
            regular_weights_file = os.path.join(tmp_dir, TF2_WEIGHTS_NAME)
            self.assertFalse(os.path.isfile(regular_weights_file))

            # The limit in bytes depends only on `max_size`, so parse it once per size.
            if max_size.endswith("kiB"):
                max_size_int = int(max_size[:-3]) * 2**10
            else:
                max_size_int = int(max_size[:-2]) * 10**3

            # A file may exceed the limit only when it holds a single weight. The requested size counts
            # parameters only, so a file can end up slightly bigger on disk: allow 50,000 bytes of slack.
            for shard_file, size in shard_to_size.items():
                if size < max_size_int + 50000:
                    continue
                with h5py.File(shard_file, "r") as state_file:
                    self.assertEqual(len(state_file), 1)

            # The shards listed in the index must be exactly the shard files on disk
            with open(index_file, "r", encoding="utf-8") as index_handle:
                index = json.load(index_handle)

            all_shards = set(index["weight_map"].values())
            shards_found = {name for name in os.listdir(tmp_dir) if name.endswith(".h5")}
            self.assertSetEqual(all_shards, shards_found)

            # Finally, the model must reload with identical weights
            new_model = TFBertModel.from_pretrained(tmp_dir)

            model(model.dummy_inputs)
            new_model(model.dummy_inputs)

            for original_weight, reloaded_weight in zip(model.weights, new_model.weights):
                self.assertTrue(np.allclose(original_weight.numpy(), reloaded_weight.numpy()))
@require_tf
@is_staging_test
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment