"research/learning_unsupervised_learning/datasets/common.py" did not exist on "bb9e441861a3e08ff5c353271803bd98ee0e74f1"
Commit 6f1e3b38 authored by Brian Lee's avatar Brian Lee Committed by Hongkun Yu
Browse files

Remove unmaintained fork of Minigo code (#7605)

The reference implementation can be found at
https://github.com/tensorflow/minigo

This fork was originally created to experiment with performance upgrades
for MLPerf, but since MLPerf work is focused in the original repo,
this fork's existence only serves to confuse.
parent 497989e0
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for preprocessing."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import itertools
import tempfile
import tensorflow as tf # pylint: disable=g-bad-import-order
import coords
import features
import go
import model_params
import numpy as np
import preprocessing
import utils_test
tf.logging.set_verbosity(tf.logging.ERROR)
TEST_SGF = '''(;CA[UTF-8]SZ[9]PB[Murakawa Daisuke]PW[Iyama Yuta]KM[6.5]
HA[0]RE[W+1.5]GM[1];B[fd];W[cf])'''
class TestPreprocessing(utils_test.MiniGoUnitTest):
def create_random_data(self, num_examples):
raw_data = []
for _ in range(num_examples):
feature = np.random.random([
utils_test.BOARD_SIZE, utils_test.BOARD_SIZE,
features.NEW_FEATURES_PLANES]).astype(np.uint8)
pi = np.random.random([utils_test.BOARD_SIZE * utils_test.BOARD_SIZE
+ 1]).astype(np.float32)
value = np.random.random()
raw_data.append((feature, pi, value))
return raw_data
def extract_data(self, tf_record, filter_amount=1):
pos_tensor, label_tensors = preprocessing.get_input_tensors(
model_params.DummyMiniGoParams(), 1, [tf_record], num_repeats=1,
shuffle_records=False, shuffle_examples=False,
filter_amount=filter_amount)
recovered_data = []
with tf.Session() as sess:
while True:
try:
pos_value, label_values = sess.run([pos_tensor, label_tensors])
recovered_data.append((
pos_value,
label_values['pi_tensor'],
label_values['value_tensor']))
except tf.errors.OutOfRangeError:
break
return recovered_data
def assertEqualData(self, data1, data2):
# Assert that two data are equal, where both are of form:
# data = List<Tuple<feature_array, pi_array, value>>
self.assertEqual(len(data1), len(data2))
for datum1, datum2 in zip(data1, data2):
# feature
self.assertEqualNPArray(datum1[0], datum2[0])
# pi
self.assertEqualNPArray(datum1[1], datum2[1])
# value
self.assertEqual(datum1[2], datum2[2])
def test_serialize_round_trip(self):
np.random.seed(1)
raw_data = self.create_random_data(10)
tfexamples = list(map(preprocessing.make_tf_example, *zip(*raw_data)))
with tempfile.NamedTemporaryFile() as f:
preprocessing.write_tf_examples(f.name, tfexamples)
recovered_data = self.extract_data(f.name)
self.assertEqualData(raw_data, recovered_data)
def test_filter(self):
raw_data = self.create_random_data(100)
tfexamples = list(map(preprocessing.make_tf_example, *zip(*raw_data)))
with tempfile.NamedTemporaryFile() as f:
preprocessing.write_tf_examples(f.name, tfexamples)
recovered_data = self.extract_data(f.name, filter_amount=.05)
self.assertLess(len(recovered_data), 50)
def test_serialize_round_trip_no_parse(self):
np.random.seed(1)
raw_data = self.create_random_data(10)
tfexamples = list(map(preprocessing.make_tf_example, *zip(*raw_data)))
with tempfile.NamedTemporaryFile() as start_file, \
tempfile.NamedTemporaryFile() as rewritten_file:
preprocessing.write_tf_examples(start_file.name, tfexamples)
# We want to test that the rewritten, shuffled file contains correctly
# serialized tf.Examples.
batch_size = 4
batches = list(preprocessing.shuffle_tf_examples(
1000, batch_size, [start_file.name]))
# 2 batches of 4, 1 incomplete batch of 2.
self.assertEqual(len(batches), 3)
# concatenate list of lists into one list
all_batches = list(itertools.chain.from_iterable(batches))
for _ in batches:
preprocessing.write_tf_examples(
rewritten_file.name, all_batches, serialize=False)
original_data = self.extract_data(start_file.name)
recovered_data = self.extract_data(rewritten_file.name)
# stuff is shuffled, so sort before checking equality
def sort_key(nparray_tuple):
return nparray_tuple[2]
original_data = sorted(original_data, key=sort_key)
recovered_data = sorted(recovered_data, key=sort_key)
self.assertEqualData(original_data, recovered_data)
def test_make_dataset_from_sgf(self):
with tempfile.NamedTemporaryFile() as sgf_file, \
tempfile.NamedTemporaryFile() as record_file:
sgf_file.write(TEST_SGF.encode('utf8'))
sgf_file.seek(0)
preprocessing.make_dataset_from_sgf(
utils_test.BOARD_SIZE, sgf_file.name, record_file.name)
recovered_data = self.extract_data(record_file.name)
start_pos = go.Position(utils_test.BOARD_SIZE)
first_move = coords.from_sgf('fd')
next_pos = start_pos.play_move(first_move)
second_move = coords.from_sgf('cf')
expected_data = [
(
features.extract_features(utils_test.BOARD_SIZE, start_pos),
preprocessing._one_hot(utils_test.BOARD_SIZE, coords.to_flat(
utils_test.BOARD_SIZE, first_move)), -1
),
(
features.extract_features(utils_test.BOARD_SIZE, next_pos),
preprocessing._one_hot(utils_test.BOARD_SIZE, coords.to_flat(
utils_test.BOARD_SIZE, second_move)), -1
)
]
self.assertEqualData(expected_data, recovered_data)
if __name__ == '__main__':
tf.test.main()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Play a self-play match with a given DualNet model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import random
import sys
import time
import coords
from gtp_wrapper import MCTSPlayer
def play(board_size, network, readouts, resign_threshold, simultaneous_leaves,
verbosity=0):
"""Plays out a self-play match.
Args:
board_size: the go board size
network: the DualNet model
readouts: the number of readouts in MCTS
resign_threshold: the threshold to resign at in the match
simultaneous_leaves: the number of simultaneous leaves in MCTS
verbosity: the verbosity of the self-play match
Returns:
the final position
the n x 362 tensor of floats representing the mcts search probabilities
the n-ary tensor of floats representing the original value-net estimate
where n is the number of moves in the game.
"""
player = MCTSPlayer(board_size, network, resign_threshold=resign_threshold,
verbosity=verbosity, num_parallel=simultaneous_leaves)
# Disable resign in 5% of games
if random.random() < 0.05:
player.resign_threshold = -1.0
player.initialize_game()
# Must run this once at the start, so that noise injection actually
# affects the first move of the game.
first_node = player.root.select_leaf()
prob, val = network.run(first_node.position)
first_node.incorporate_results(prob, val, first_node)
while True:
start = time.time()
player.root.inject_noise()
current_readouts = player.root.N
# we want to do "X additional readouts", rather than "up to X readouts".
while player.root.N < current_readouts + readouts:
player.tree_search()
if verbosity >= 3:
print(player.root.position)
print(player.root.describe())
if player.should_resign():
player.set_result(-1 * player.root.position.to_play, was_resign=True)
break
move = player.pick_move()
player.play_move(move)
if player.root.is_done():
player.set_result(player.root.position.result(), was_resign=False)
break
if (verbosity >= 2) or (
verbosity >= 1 and player.root.position.n % 10 == 9):
print("Q: {:.5f}".format(player.root.Q))
dur = time.time() - start
print("%d: %d readouts, %.3f s/100. (%.2f sec)" % (
player.root.position.n, readouts, dur / readouts * 100.0, dur))
if verbosity >= 3:
print("Played >>",
coords.to_kgs(coords.from_flat(player.root.fmove)))
if verbosity >= 2:
print("%s: %.3f" % (player.result_string, player.root.Q), file=sys.stderr)
print(player.root.position,
player.root.position.score(), file=sys.stderr)
return player
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Code to extract a series of positions + their next moves from an SGF.
Most of the complexity here is dealing with two features of SGF:
- Stones can be added via "play move" or "add move", the latter being used
to configure L+D puzzles, but also for initial handicap placement.
- Plays don't necessarily alternate colors; they can be repeated B or W moves
This feature is used to handle free handicap placement.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import coords
import go
from go import Position, PositionWithContext
import numpy as np
import sgf
import utils
SGF_TEMPLATE = '''(;GM[1]FF[4]CA[UTF-8]AP[Minigo_sgfgenerator]RU[{ruleset}]
SZ[{boardsize}]KM[{komi}]PW[{white_name}]PB[{black_name}]RE[{result}]
{game_moves})'''
PROGRAM_IDENTIFIER = 'Minigo'
def translate_sgf_move_qs(player_move, q):
return '{move}C[{q:.4f}]'.format(
move=translate_sgf_move(player_move), q=q)
def translate_sgf_move(player_move, comment):
if player_move.color not in (go.BLACK, go.WHITE):
raise ValueError(
'Can\'t translate color {} to sgf'.format(player_move.color))
c = coords.to_sgf(player_move.move)
color = 'B' if player_move.color == go.BLACK else 'W'
if comment is not None:
comment = comment.replace(']', r'\]')
comment_node = 'C[{}]'.format(comment)
else:
comment_node = ''
return ';{color}[{coords}]{comment_node}'.format(
color=color, coords=c, comment_node=comment_node)
# pylint: disable=unused-argument
# pylint: disable=unused-variable
def make_sgf(board_size, move_history, result_string, ruleset='Chinese',
komi=7.5, white_name=PROGRAM_IDENTIFIER,
black_name=PROGRAM_IDENTIFIER, comments=[]):
"""Turn a game into SGF.
Doesn't handle handicap games or positions with incomplete history.
Args:
board_size: the go board size.
move_history: iterable of PlayerMoves.
result_string: "B+R", "W+0.5", etc.
ruleset: the rule set of go game
komi: komi score
white_name: the name of white player
black_name: the name of black player
comments: iterable of string/None. Will be zipped with move_history.
"""
try:
# Python 2
from itertools import izip_longest
zip_longest = izip_longest
except ImportError:
# Python 3
from itertools import zip_longest
boardsize = board_size
game_moves = ''.join(translate_sgf_move(*z) for z in zip_longest(
move_history, comments))
result = result_string
return SGF_TEMPLATE.format(**locals())
def sgf_prop(value_list):
"""Converts raw sgf library output to sensible value."""
if value_list is None:
return None
if len(value_list) == 1:
return value_list[0]
else:
return value_list
def sgf_prop_get(props, key, default):
return sgf_prop(props.get(key, default))
def handle_node(board_size, pos, node):
"""A node can either add B+W stones, play as B, or play as W."""
props = node.properties
black_stones_added = [coords.from_sgf(c) for c in props.get('AB', [])]
white_stones_added = [coords.from_sgf(c) for c in props.get('AW', [])]
if black_stones_added or white_stones_added:
return add_stones(board_size, pos, black_stones_added, white_stones_added)
# If B/W props are not present, then there is no move. But if it is present
# and equal to the empty string, then the move was a pass.
elif 'B' in props:
black_move = coords.from_sgf(props.get('B', [''])[0])
return pos.play_move(black_move, color=go.BLACK)
elif 'W' in props:
white_move = coords.from_sgf(props.get('W', [''])[0])
return pos.play_move(white_move, color=go.WHITE)
else:
return pos
def add_stones(board_size, pos, black_stones_added, white_stones_added):
working_board = np.copy(pos.board)
go.place_stones(working_board, go.BLACK, black_stones_added)
go.place_stones(working_board, go.WHITE, white_stones_added)
new_position = Position(
board_size, board=working_board, n=pos.n, komi=pos.komi,
caps=pos.caps, ko=pos.ko, recent=pos.recent, to_play=pos.to_play)
return new_position
def get_next_move(node):
props = node.next.properties
if 'W' in props:
return coords.from_sgf(props['W'][0])
else:
return coords.from_sgf(props['B'][0])
def maybe_correct_next(pos, next_node):
if (('B' in next_node.properties and pos.to_play != go.BLACK) or
('W' in next_node.properties and pos.to_play != go.WHITE)):
pos.flip_playerturn(mutate=True)
def replay_sgf(board_size, sgf_contents):
"""Wrapper for sgf files.
It does NOT return the very final position, as there is no follow up.
To get the final position, call pwc.position.play_move(pwc.next_move)
on the last PositionWithContext returned.
Example usage:
with open(filename) as f:
for position_w_context in replay_sgf(f.read()):
print(position_w_context.position)
Args:
board_size: the go board size.
sgf_contents: the content in sgf.
Yields:
The go.PositionWithContext instances.
"""
collection = sgf.parse(sgf_contents)
game = collection.children[0]
props = game.root.properties
assert int(sgf_prop(props.get('GM', ['1']))) == 1, 'Not a Go SGF!'
komi = 0
if props.get('KM') is not None:
komi = float(sgf_prop(props.get('KM')))
result = utils.parse_game_result(sgf_prop(props.get('RE')))
pos = Position(board_size, komi=komi)
current_node = game.root
while pos is not None and current_node.next is not None:
pos = handle_node(board_size, pos, current_node)
maybe_correct_next(pos, current_node.next)
next_move = get_next_move(current_node)
yield PositionWithContext(pos, next_move, result)
current_node = current_node.next
def replay_sgf_file(board_size, sgf_file):
with open(sgf_file) as f:
for pwc in replay_sgf(board_size, f.read()):
yield pwc
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for sgf_wrapper."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf # pylint: disable=g-bad-import-order
import coords
import go
from sgf_wrapper import replay_sgf, translate_sgf_move, make_sgf
import utils_test
JAPANESE_HANDICAP_SGF = '''(;GM[1]FF[4]CA[UTF-8]AP[CGoban:3]ST[2]RU[Japanese]
SZ[9]HA[2]RE[Void]KM[5.50]PW[test_white]PB[test_black]AB[gc][cg];W[ee];B[dg])'''
CHINESE_HANDICAP_SGF = '''(;GM[1]FF[4]CA[UTF-8]AP[CGoban:3]ST[2]RU[Chinese]SZ[9]
HA[2]RE[Void]KM[5.50]PW[test_white]PB[test_black]RE[B+39.50];B[gc];B[cg];W[ee];
B[gg];W[eg];B[ge];W[ce];B[ec];W[cc];B[dd];W[de];B[cd];W[bd];B[bc];W[bb];B[be];
W[ac];B[bf];W[dh];B[ch];W[ci];B[bi];W[di];B[ah];W[gh];B[hh];W[fh];B[hg];W[gi];
B[fg];W[dg];B[ei];W[cf];B[ef];W[ff];B[fe];W[bg];B[bh];W[af];B[ag];W[ae];B[ad];
W[ae];B[ed];W[db];B[df];W[eb];B[fb];W[ea];B[fa])'''
NO_HANDICAP_SGF = '''(;CA[UTF-8]SZ[9]PB[Murakawa Daisuke]PW[Iyama Yuta]KM[6.5]
HA[0]RE[W+1.5]GM[1];B[fd];W[cf];B[eg];W[dd];B[dc];W[cc];B[de];W[cd];B[ed];W[he];
B[ce];W[be];B[df];W[bf];B[hd];W[ge];B[gd];W[gg];B[db];W[cb];B[cg];W[bg];B[gh];
W[fh];B[hh];W[fg];B[eh];W[ei];B[di];W[fi];B[hg];W[dh];B[ch];W[ci];B[bh];W[ff];
B[fe];W[hf];B[id];W[bi];B[ah];W[ef];B[dg];W[ee];B[di];W[ig];B[ai];W[ih];B[fb];
W[hi];B[ag];W[ab];B[bd];W[bc];B[ae];W[ad];B[af];W[bd];B[ca];W[ba];B[da];W[ie])
'''
tf.logging.set_verbosity(tf.logging.ERROR)
class TestSgfGeneration(utils_test.MiniGoUnitTest):
def test_translate_sgf_move(self):
self.assertEqual(
';B[db]',
translate_sgf_move(go.PlayerMove(go.BLACK, (1, 3)), None))
self.assertEqual(
';W[aa]',
translate_sgf_move(go.PlayerMove(go.WHITE, (0, 0)), None))
self.assertEqual(
';W[]',
translate_sgf_move(go.PlayerMove(go.WHITE, None), None))
self.assertEqual(
';B[db]C[comment]',
translate_sgf_move(go.PlayerMove(go.BLACK, (1, 3)), 'comment'))
def test_make_sgf(self):
all_pwcs = list(replay_sgf(utils_test.BOARD_SIZE, NO_HANDICAP_SGF))
second_last_position, last_move, _ = all_pwcs[-1]
last_position = second_last_position.play_move(last_move)
back_to_sgf = make_sgf(
utils_test.BOARD_SIZE,
last_position.recent,
last_position.score(),
komi=last_position.komi,
)
reconstructed_positions = list(replay_sgf(
utils_test.BOARD_SIZE, back_to_sgf))
second_last_position2, last_move2, _ = reconstructed_positions[-1]
last_position2 = second_last_position2.play_move(last_move2)
self.assertEqualPositions(last_position, last_position2)
class TestSgfWrapper(utils_test.MiniGoUnitTest):
def test_sgf_props(self):
sgf_replayer = replay_sgf(utils_test.BOARD_SIZE, CHINESE_HANDICAP_SGF)
initial = next(sgf_replayer)
self.assertEqual(initial.result, go.BLACK)
self.assertEqual(initial.position.komi, 5.5)
def test_japanese_handicap_handling(self):
intermediate_board = utils_test.load_board('''
.........
.........
......X..
.........
....O....
.........
..X......
.........
.........
''')
intermediate_position = go.Position(
utils_test.BOARD_SIZE,
intermediate_board,
n=1,
komi=5.5,
caps=(0, 0),
recent=(go.PlayerMove(go.WHITE, coords.from_kgs(
utils_test.BOARD_SIZE, 'E5')),),
to_play=go.BLACK,
)
final_board = utils_test.load_board('''
.........
.........
......X..
.........
....O....
.........
..XX.....
.........
.........
''')
final_position = go.Position(
utils_test.BOARD_SIZE,
final_board,
n=2,
komi=5.5,
caps=(0, 0),
recent=(
go.PlayerMove(go.WHITE, coords.from_kgs(
utils_test.BOARD_SIZE, 'E5')),
go.PlayerMove(go.BLACK, coords.from_kgs(
utils_test.BOARD_SIZE, 'D3')),),
to_play=go.WHITE,
)
positions_w_context = list(replay_sgf(
utils_test.BOARD_SIZE, JAPANESE_HANDICAP_SGF))
self.assertEqualPositions(
intermediate_position, positions_w_context[1].position)
final_replayed_position = positions_w_context[-1].position.play_move(
positions_w_context[-1].next_move)
self.assertEqualPositions(final_position, final_replayed_position)
def test_chinese_handicap_handling(self):
intermediate_board = utils_test.load_board('''
.........
.........
......X..
.........
.........
.........
.........
.........
.........
''')
intermediate_position = go.Position(
utils_test.BOARD_SIZE,
intermediate_board,
n=1,
komi=5.5,
caps=(0, 0),
recent=(go.PlayerMove(go.BLACK, coords.from_kgs(
utils_test.BOARD_SIZE, 'G7')),),
to_play=go.BLACK,
)
final_board = utils_test.load_board('''
....OX...
.O.OOX...
O.O.X.X..
.OXXX....
OX...XX..
.X.XXO...
X.XOOXXX.
XXXO.OOX.
.XOOX.O..
''')
final_position = go.Position(
utils_test.BOARD_SIZE,
final_board,
n=50,
komi=5.5,
caps=(7, 2),
ko=None,
recent=(
go.PlayerMove(
go.WHITE, coords.from_kgs(utils_test.BOARD_SIZE, 'E9')),
go.PlayerMove(
go.BLACK, coords.from_kgs(utils_test.BOARD_SIZE, 'F9')),),
to_play=go.WHITE
)
positions_w_context = list(replay_sgf(
utils_test.BOARD_SIZE, CHINESE_HANDICAP_SGF))
self.assertEqualPositions(
intermediate_position, positions_w_context[1].position)
self.assertEqual(
positions_w_context[1].next_move, coords.from_kgs(
utils_test.BOARD_SIZE, 'C3'))
final_replayed_position = positions_w_context[-1].position.play_move(
positions_w_context[-1].next_move)
self.assertEqualPositions(final_position, final_replayed_position)
if __name__ == '__main__':
tf.test.main()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""The strategy to play each move with MCTS."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
import sys
import time
import coords
import go
from mcts import MCTSNode
import numpy as np
import sgf_wrapper
def time_recommendation(move_num, seconds_per_move=5, time_limit=15*60,
decay_factor=0.98):
"""Compute the time can be used."""
# Given current move number and "desired" seconds per move,
# return how much time should actually be used. To be used specifically
# for CGOS time controls, which are absolute 15 minute time.
# The strategy is to spend the maximum time possible using seconds_per_move,
# and then switch to an exponentially decaying time usage, calibrated so that
# we have enough time for an infinite number of moves.
# divide by two since you only play half the moves in a game.
player_move_num = move_num / 2
# sum of geometric series maxes out at endgame_time seconds.
endgame_time = seconds_per_move / (1 - decay_factor)
if endgame_time > time_limit:
# there is so little main time that we're already in "endgame" mode.
base_time = time_limit * (1 - decay_factor)
return base_time * decay_factor ** player_move_num
# leave over endgame_time seconds for the end, and play at seconds_per_move
# for as long as possible
core_time = time_limit - endgame_time
core_moves = core_time / seconds_per_move
if player_move_num < core_moves:
return seconds_per_move
else:
return seconds_per_move * decay_factor ** (player_move_num - core_moves)
def _get_temperature_cutoff(board_size):
# When to do deterministic move selection. ~30 moves on a 19x19, ~8 on 9x9
return int((board_size * board_size) / 12)
class MCTSPlayerMixin(object):
# If 'simulations_per_move' is nonzero, it will perform that many reads
# before playing. Otherwise, it uses 'seconds_per_move' of wall time'
def __init__(self, board_size, network, seconds_per_move=5,
simulations_per_move=0, resign_threshold=-0.90,
verbosity=0, two_player_mode=False, num_parallel=8):
self.board_size = board_size
self.network = network
self.seconds_per_move = seconds_per_move
self.simulations_per_move = simulations_per_move
self.verbosity = verbosity
self.two_player_mode = two_player_mode
if two_player_mode:
self.temp_threshold = -1
else:
self.temp_threshold = _get_temperature_cutoff(board_size)
self.num_parallel = num_parallel
self.qs = []
self.comments = []
self.searches_pi = []
self.root = None
self.result = 0
self.result_string = None
self.resign_threshold = -abs(resign_threshold)
def initialize_game(self, position=None):
if position is None:
position = go.Position(self.board_size)
self.root = MCTSNode(self.board_size, position)
self.result = 0
self.result_string = None
self.comments = []
self.searches_pi = []
self.qs = []
def suggest_move(self, position):
""" Used for playing a single game."""
# For parallel play, use initialize_move, select_leaf,
# incorporate_results, and pick_move
start = time.time()
if self.simulations_per_move == 0:
while time.time() - start < self.seconds_per_move:
self.tree_search()
else:
current_readouts = self.root.N
while self.root.N < current_readouts + self.simulations_per_move:
self.tree_search()
if self.verbosity > 0:
print('%d: Searched %d times in %s seconds\n\n' % (
position.n, self.simulations_per_move, time.time() - start),
file=sys.stderr)
# print some stats on anything with probability > 1%
if self.verbosity > 2:
print(self.root.describe(), file=sys.stderr)
print('\n\n', file=sys.stderr)
if self.verbosity > 3:
print(self.root.position, file=sys.stderr)
return self.pick_move()
def play_move(self, c):
"""Play a move."""
# Notable side effects:
# - finalizes the probability distribution according to
# this roots visit counts into the class' running tally, `searches_pi`
# - Makes the node associated with this move the root, for future
# `inject_noise` calls.
if not self.two_player_mode:
self.searches_pi.append(
self.root.children_as_pi(self.root.position.n < self.temp_threshold))
self.qs.append(self.root.Q) # Save our resulting Q.
self.comments.append(self.root.describe())
self.root = self.root.maybe_add_child(coords.to_flat(self.board_size, c))
self.position = self.root.position # for showboard
del self.root.parent.children
return True # GTP requires positive result.
def pick_move(self):
"""Picks a move to play, based on MCTS readout statistics.
Highest N is most robust indicator. In the early stage of the game, pick
a move weighted by visit count; later on, pick the absolute max.
"""
if self.root.position.n > self.temp_threshold:
fcoord = np.argmax(self.root.child_N)
else:
cdf = self.root.child_N.cumsum()
cdf /= cdf[-1]
selection = random.random()
fcoord = cdf.searchsorted(selection)
assert self.root.child_N[fcoord] != 0
return coords.from_flat(self.board_size, fcoord)
def tree_search(self, num_parallel=None):
if num_parallel is None:
num_parallel = self.num_parallel
leaves = []
failsafe = 0
while len(leaves) < num_parallel and failsafe < num_parallel * 2:
failsafe += 1
leaf = self.root.select_leaf()
if self.verbosity >= 4:
print(self.show_path_to_root(leaf))
# if game is over, override the value estimate with the true score
if leaf.is_done():
value = 1 if leaf.position.score() > 0 else -1
leaf.backup_value(value, up_to=self.root)
continue
leaf.add_virtual_loss(up_to=self.root)
leaves.append(leaf)
if leaves:
move_probs, values = self.network.run_many(
[leaf.position for leaf in leaves])
for leaf, move_prob, value in zip(leaves, move_probs, values):
leaf.revert_virtual_loss(up_to=self.root)
leaf.incorporate_results(move_prob, value, up_to=self.root)
def show_path_to_root(self, node):
max_depth = (self.board_size ** 2) * 1.4 # 505 moves for 19x19, 113 for 9x9
pos = node.position
diff = node.position.n - self.root.position.n
if pos.recent is None:
return
def fmt(move):
return '{}-{}'.format('b' if move.color == 1 else 'w',
coords.to_kgs(self.board_size, move.move))
path = ' '.join(fmt(move) for move in pos.recent[-diff:])
if node.position.n >= max_depth:
path += ' (depth cutoff reached) %0.1f' % node.position.score()
elif node.position.is_game_over():
path += ' (game over) %0.1f' % node.position.score()
return path
def should_resign(self):
"""Returns true if the player resigned.
No further moves should be played.
"""
return self.root.Q_perspective < self.resign_threshold
def set_result(self, winner, was_resign):
self.result = winner
if was_resign:
string = 'B+R' if winner == go.BLACK else 'W+R'
else:
string = self.root.position.result_string()
self.result_string = string
def to_sgf(self, use_comments=True):
assert self.result_string is not None
pos = self.root.position
if use_comments:
comments = self.comments or ['No comments.']
comments[0] = ('Resign Threshold: %0.3f\n' %
self.resign_threshold) + comments[0]
else:
comments = []
return sgf_wrapper.make_sgf(
self.board_size, pos.recent, self.result_string,
white_name=os.path.basename(self.network.save_file) or 'Unknown',
black_name=os.path.basename(self.network.save_file) or 'Unknown',
comments=comments)
def is_done(self):
return self.result != 0 or self.root.is_done()
def extract_data(self):
assert len(self.searches_pi) == self.root.position.n
assert self.result != 0
for pwc, pi in zip(go.replay_position(
self.board_size, self.root.position, self.result), self.searches_pi):
yield pwc.position, pi, pwc.result
def chat(self, msg_type, sender, text):
default_response = (
"Supported commands are 'winrate', 'nextplay', 'fortune', and 'help'.")
if self.root is None or self.root.position.n == 0:
return "I'm not playing right now. " + default_response
if 'winrate' in text.lower():
wr = (abs(self.root.Q) + 1.0) / 2.0
color = 'Black' if self.root.Q > 0 else 'White'
return '{:s} {:.2f}%'.format(color, wr * 100.0)
elif 'nextplay' in text.lower():
return "I'm thinking... " + self.root.most_visited_path()
elif 'fortune' in text.lower():
return "You're feeling lucky!"
elif 'help' in text.lower():
return "I can't help much with go -- try ladders! Otherwise: {}".format(
default_response)
else:
return default_response
class CGOSPlayerMixin(MCTSPlayerMixin):
def suggest_move(self, position):
self.seconds_per_move = time_recommendation(position.n)
return super().suggest_move(position)
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for strategies."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import tensorflow as tf # pylint: disable=g-bad-import-order
import coords
import go
import numpy as np
from strategies import MCTSPlayerMixin, time_recommendation
import utils_test
ALMOST_DONE_BOARD = utils_test.load_board('''
.XO.XO.OO
X.XXOOOO.
XXXXXOOOO
XXXXXOOOO
.XXXXOOO.
XXXXXOOOO
.XXXXOOO.
XXXXXOOOO
XXXXOOOOO
''')
# Tromp taylor means black can win if we hit the move limit.
TT_FTW_BOARD = utils_test.load_board('''
.XXOOOOOO
X.XOO...O
.XXOO...O
X.XOO...O
.XXOO..OO
X.XOOOOOO
.XXOOOOOO
X.XXXXXXX
XXXXXXXXX
''')
SEND_TWO_RETURN_ONE = go.Position(
utils_test.BOARD_SIZE,
board=ALMOST_DONE_BOARD,
n=70,
komi=2.5,
caps=(1, 4),
ko=None,
recent=(go.PlayerMove(go.BLACK, (0, 1)),
go.PlayerMove(go.WHITE, (0, 8))),
to_play=go.BLACK
)
# 505 moves for 19x19, 113 for 9x9
MAX_DEPTH = (utils_test.BOARD_SIZE ** 2) * 1.4
class DummyNet():
def __init__(self, fake_priors=None, fake_value=0):
if fake_priors is None:
fake_priors = np.ones(
(utils_test.BOARD_SIZE ** 2) + 1) / (utils_test.BOARD_SIZE ** 2 + 1)
self.fake_priors = fake_priors
self.fake_value = fake_value
def run(self, position):
return self.fake_priors, self.fake_value
def run_many(self, positions):
if not positions:
raise ValueError(
"No positions passed! (Tensorflow would have failed here.")
return [self.fake_priors] * len(positions), [
self.fake_value] * len(positions)
def initialize_basic_player():
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet())
player.initialize_game()
first_node = player.root.select_leaf()
first_node.incorporate_results(
*player.network.run(player.root.position), up_to=player.root)
return player
def initialize_almost_done_player():
probs = np.array([.001] * (utils_test.BOARD_SIZE * utils_test.BOARD_SIZE + 1))
probs[2:5] = 0.2 # some legal moves along the top.
probs[-1] = 0.2 # passing is also ok
net = DummyNet(fake_priors=probs)
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, net)
# root position is white to play with no history == white passed.
player.initialize_game(SEND_TWO_RETURN_ONE)
return player
class TestMCTSPlayerMixin(utils_test.MiniGoUnitTest):
def test_time_controls(self):
secs_per_move = 5
for time_limit in (10, 100, 1000):
# in the worst case imaginable, let's say a game goes 1000 moves long
move_numbers = range(0, 1000, 2)
total_time_spent = sum(
time_recommendation(move_num, secs_per_move,
time_limit=time_limit)
for move_num in move_numbers)
# we should not exceed available game time
self.assertLess(total_time_spent, time_limit)
# but we should have used at least 95% of our time by the end.
self.assertGreater(total_time_spent, time_limit * 0.95)
def test_inject_noise(self):
player = initialize_basic_player()
sum_priors = np.sum(player.root.child_prior)
# dummyNet should return normalized priors.
self.assertAlmostEqual(sum_priors, 1)
self.assertTrue(np.all(player.root.child_U == player.root.child_U[0]))
player.root.inject_noise()
new_sum_priors = np.sum(player.root.child_prior)
# priors should still be normalized after injecting noise
self.assertAlmostEqual(sum_priors, new_sum_priors)
# With dirichelet noise, majority of density should be in one node.
max_p = np.max(player.root.child_prior)
self.assertGreater(max_p, 3/(utils_test.BOARD_SIZE ** 2 + 1))
def test_pick_moves(self):
player = initialize_basic_player()
root = player.root
root.child_N[coords.to_flat(utils_test.BOARD_SIZE, (2, 0))] = 10
root.child_N[coords.to_flat(utils_test.BOARD_SIZE, (1, 0))] = 5
root.child_N[coords.to_flat(utils_test.BOARD_SIZE, (3, 0))] = 1
# move 81, or 361, or... Endgame.
root.position.n = utils_test.BOARD_SIZE ** 2
# Assert we're picking deterministically
self.assertTrue(root.position.n > player.temp_threshold)
move = player.pick_move()
self.assertEqual(move, (2, 0))
# But if we're in the early part of the game, pick randomly
root.position.n = 3
self.assertFalse(player.root.position.n > player.temp_threshold)
with unittest.mock.patch('random.random', lambda: .5):
move = player.pick_move()
self.assertEqual(move, (2, 0))
with unittest.mock.patch('random.random', lambda: .99):
move = player.pick_move()
self.assertEqual(move, (3, 0))
def test_dont_pass_if_losing(self):
player = initialize_almost_done_player()
# check -- white is losing.
self.assertEqual(player.root.position.score(), -0.5)
for i in range(20):
player.tree_search()
# uncomment to debug this test
# print(player.root.describe())
# Search should converge on D9 as only winning move.
flattened = coords.to_flat(utils_test.BOARD_SIZE, coords.from_kgs(
utils_test.BOARD_SIZE, 'D9'))
best_move = np.argmax(player.root.child_N)
self.assertEqual(best_move, flattened)
# D9 should have a positive value
self.assertGreater(player.root.children[flattened].Q, 0)
self.assertGreaterEqual(player.root.N, 20)
# passing should be ineffective.
self.assertLess(player.root.child_Q[-1], 0)
# no virtual losses should be pending
self.assertNoPendingVirtualLosses(player.root)
# uncomment to debug this test
# print(player.root.describe())
def test_parallel_tree_search(self):
player = initialize_almost_done_player()
# check -- white is losing.
self.assertEqual(player.root.position.score(), -0.5)
# initialize the tree so that the root node has populated children.
player.tree_search(num_parallel=1)
# virtual losses should enable multiple searches to happen simultaneously
# without throwing an error...
for i in range(5):
player.tree_search(num_parallel=4)
# uncomment to debug this test
# print(player.root.describe())
# Search should converge on D9 as only winning move.
flattened = coords.to_flat(utils_test.BOARD_SIZE, coords.from_kgs(
utils_test.BOARD_SIZE, 'D9'))
best_move = np.argmax(player.root.child_N)
self.assertEqual(best_move, flattened)
# D9 should have a positive value
self.assertGreater(player.root.children[flattened].Q, 0)
self.assertGreaterEqual(player.root.N, 20)
# passing should be ineffective.
self.assertLess(player.root.child_Q[-1], 0)
# no virtual losses should be pending
self.assertNoPendingVirtualLosses(player.root)
def test_ridiculously_parallel_tree_search(self):
player = initialize_almost_done_player()
# Test that an almost complete game
# will tree search with # parallelism > # legal moves.
for i in range(10):
player.tree_search(num_parallel=50)
self.assertNoPendingVirtualLosses(player.root)
def test_long_game_tree_search(self):
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet())
endgame = go.Position(
utils_test.BOARD_SIZE,
board=TT_FTW_BOARD,
n=MAX_DEPTH-2,
komi=2.5,
ko=None,
recent=(go.PlayerMove(go.BLACK, (0, 1)),
go.PlayerMove(go.WHITE, (0, 8))),
to_play=go.BLACK
)
player.initialize_game(endgame)
# Test that an almost complete game
for i in range(10):
player.tree_search(num_parallel=8)
self.assertNoPendingVirtualLosses(player.root)
self.assertGreater(player.root.Q, 0)
def test_cold_start_parallel_tree_search(self):
# Test that parallel tree search doesn't trip on an empty tree
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet(fake_value=0.17))
player.initialize_game()
self.assertEqual(player.root.N, 0)
self.assertFalse(player.root.is_expanded)
player.tree_search(num_parallel=4)
self.assertNoPendingVirtualLosses(player.root)
# Even though the root gets selected 4 times by tree search, its
# final visit count should just be 1.
self.assertEqual(player.root.N, 1)
# 0.085 = average(0, 0.17), since 0 is the prior on the root.
self.assertAlmostEqual(player.root.Q, 0.085)
def test_tree_search_failsafe(self):
# Test that the failsafe works correctly. It can trigger if the MCTS
# repeatedly visits a finished game state.
probs = np.array([.001] * (
utils_test.BOARD_SIZE * utils_test.BOARD_SIZE + 1))
probs[-1] = 1 # Make the dummy net always want to pass
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet(fake_priors=probs))
pass_position = go.Position(utils_test.BOARD_SIZE).pass_move()
player.initialize_game(pass_position)
player.tree_search(num_parallel=1)
self.assertNoPendingVirtualLosses(player.root)
def test_only_check_game_end_once(self):
# When presented with a situation where the last move was a pass,
# and we have to decide whether to pass, it should be the first thing
# we check, but not more than that.
white_passed_pos = go.Position(
utils_test.BOARD_SIZE,).play_move(
(3, 3) # b plays
).play_move(
(3, 4) # w plays
).play_move(
(4, 3) # b plays
).pass_move() # w passes - if B passes too, B would lose by komi.
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet())
player.initialize_game(white_passed_pos)
# initialize the root
player.tree_search()
# explore a child - should be a pass move.
player.tree_search()
pass_move = utils_test.BOARD_SIZE * utils_test.BOARD_SIZE
self.assertEqual(player.root.children[pass_move].N, 1)
self.assertEqual(player.root.child_N[pass_move], 1)
player.tree_search()
# check that we didn't visit the pass node any more times.
self.assertEqual(player.root.child_N[pass_move], 1)
def test_extract_data_normal_end(self):
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet())
player.initialize_game()
player.tree_search()
player.play_move(None)
player.tree_search()
player.play_move(None)
self.assertTrue(player.root.is_done())
player.set_result(player.root.position.result(), was_resign=False)
data = list(player.extract_data())
self.assertEqual(len(data), 2)
position, pi, result = data[0]
# White wins by komi
self.assertEqual(result, go.WHITE)
self.assertEqual(player.result_string, 'W+{}'.format(
player.root.position.komi))
def test_extract_data_resign_end(self):
player = MCTSPlayerMixin(utils_test.BOARD_SIZE, DummyNet())
player.initialize_game()
player.tree_search()
player.play_move((0, 0))
player.tree_search()
player.play_move(None)
player.tree_search()
# Black is winning on the board
self.assertEqual(player.root.position.result(), go.BLACK)
# But if Black resigns
player.set_result(go.WHITE, was_resign=True)
data = list(player.extract_data())
position, pi, result = data[0]
# Result should say White is the winner
self.assertEqual(result, go.WHITE)
self.assertEqual(player.result_string, 'W+R')
if __name__ == '__main__':
tf.test.main()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Define symmetries for feature transformation.
Allowable symmetries:
identity [12][34]
rot90 [24][13]
rot180 [43][21]
rot270 [31][42]
flip [13][24]
fliprot90 [34][12]
fliprot180 [42][31]
fliprot270 [21][43]
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
import random
import numpy as np
INVERSES = {
'identity': 'identity',
'rot90': 'rot270',
'rot180': 'rot180',
'rot270': 'rot90',
'flip': 'flip',
'fliprot90': 'fliprot90',
'fliprot180': 'fliprot180',
'fliprot270': 'fliprot270',
}
IMPLS = {
'identity': lambda x: x,
'rot90': np.rot90,
'rot180': functools.partial(np.rot90, k=2),
'rot270': functools.partial(np.rot90, k=3),
'flip': lambda x: np.rot90(np.fliplr(x)),
'fliprot90': np.flipud,
'fliprot180': lambda x: np.rot90(np.flipud(x)),
'fliprot270': np.fliplr,
}
assert set(INVERSES.keys()) == set(IMPLS.keys())
SYMMETRIES = list(INVERSES.keys())
# A symmetry is just a string describing the transformation.
def invert_symmetry(s):
return INVERSES[s]
def apply_symmetry_feat(s, features):
return IMPLS[s](features)
def apply_symmetry_pi(board_size, s, pi):
pi = np.copy(pi)
# rotate all moves except for the pass move at end
pi[:-1] = IMPLS[s](pi[:-1].reshape([board_size, board_size])).ravel()
return pi
def randomize_symmetries_feat(features):
symmetries_used = [random.choice(SYMMETRIES) for f in features]
return symmetries_used, [apply_symmetry_feat(s, f)
for s, f in zip(symmetries_used, features)]
def invert_symmetries_pi(board_size, symmetries, pis):
return [apply_symmetry_pi(board_size, invert_symmetry(s), pi)
for s, pi in zip(symmetries, pis)]
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for symmetries."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import itertools
import tensorflow as tf # pylint: disable=g-bad-import-order
import coords
import numpy as np
import symmetries
import utils_test
tf.logging.set_verbosity(tf.logging.ERROR)
class TestSymmetryOperations(utils_test.MiniGoUnitTest):
def setUp(self):
np.random.seed(1)
self.feat = np.random.random(
[utils_test.BOARD_SIZE, utils_test.BOARD_SIZE, 3])
self.pi = np.random.random([utils_test.BOARD_SIZE ** 2 + 1])
super().setUp()
def test_inversions(self):
for s in symmetries.SYMMETRIES:
with self.subTest(symmetry=s):
self.assertEqualNPArray(
self.feat, symmetries.apply_symmetry_feat(
s, symmetries.apply_symmetry_feat(
symmetries.invert_symmetry(s), self.feat)))
self.assertEqualNPArray(
self.feat, symmetries.apply_symmetry_feat(
symmetries.invert_symmetry(s), symmetries.apply_symmetry_feat(
s, self.feat)))
self.assertEqualNPArray(
self.pi, symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, s, symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, symmetries.invert_symmetry(s),
self.pi)))
self.assertEqualNPArray(
self.pi, symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, symmetries.invert_symmetry(s),
symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, s, self.pi)))
def test_compositions(self):
test_cases = [
('rot90', 'rot90', 'rot180'),
('rot90', 'rot180', 'rot270'),
('identity', 'rot90', 'rot90'),
('fliprot90', 'rot90', 'fliprot180'),
('rot90', 'rot270', 'identity'),
]
for s1, s2, composed in test_cases:
with self.subTest(s1=s1, s2=s2, composed=composed):
self.assertEqualNPArray(symmetries.apply_symmetry_feat(
composed, self.feat), symmetries.apply_symmetry_feat(
s2, symmetries.apply_symmetry_feat(s1, self.feat)))
self.assertEqualNPArray(
symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, composed, self.pi),
symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, s2,
symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, s1, self.pi)))
def test_uniqueness(self):
all_symmetries_f = [
symmetries.apply_symmetry_feat(
s, self.feat) for s in symmetries.SYMMETRIES
]
all_symmetries_pi = [
symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, s, self.pi) for s in symmetries.SYMMETRIES
]
for f1, f2 in itertools.combinations(all_symmetries_f, 2):
self.assertNotEqualNPArray(f1, f2)
for pi1, pi2 in itertools.combinations(all_symmetries_pi, 2):
self.assertNotEqualNPArray(pi1, pi2)
def test_proper_move_transform(self):
# Check that the reinterpretation of 362 = 19*19 + 1 during symmetry
# application is consistent with coords.from_flat
move_array = np.arange(utils_test.BOARD_SIZE ** 2 + 1)
coord_array = np.zeros([utils_test.BOARD_SIZE, utils_test.BOARD_SIZE])
for c in range(utils_test.BOARD_SIZE ** 2):
coord_array[coords.from_flat(utils_test.BOARD_SIZE, c)] = c
for s in symmetries.SYMMETRIES:
with self.subTest(symmetry=s):
transformed_moves = symmetries.apply_symmetry_pi(
utils_test.BOARD_SIZE, s, move_array)
transformed_board = symmetries.apply_symmetry_feat(s, coord_array)
for new_coord, old_coord in enumerate(transformed_moves[:-1]):
self.assertEqual(
old_coord,
transformed_board[
coords.from_flat(utils_test.BOARD_SIZE, new_coord)])
if __name__ == '__main__':
tf.test.main()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for MiniGo and DualNet model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from contextlib import contextmanager
import functools
import itertools
import math
import operator
import os
import random
import re
import string
import time
import tensorflow as tf # pylint: disable=g-bad-import-order
# Regular expression of model number and name.
MODEL_NUM_REGEX = r'^\d{6}' # model_num consists of six digits
# model_name consists of six digits followed by a dash and the model name
MODEL_NAME_REGEX = r'^\d{6}(-\w+)+'
def random_generator(size=6, chars=string.ascii_letters + string.digits):
return ''.join(random.choice(chars) for x in range(size))
def generate_model_name(model_num):
"""Generate a full model name for the given model number.
Args:
model_num: The number/generation of the model.
Returns:
The model's full name: model_num-model_name.
"""
if model_num == 0: # Model number for bootstrap model
new_name = 'bootstrap'
else:
new_name = random_generator()
full_name = '{:06d}-{}'.format(model_num, new_name)
return full_name
def detect_model_num(full_name):
"""Take the full name of a model and extract its model number.
Args:
full_name: The full name of a model.
Returns:
The model number. For example: '000000-bootstrap.index' => 0.
"""
match = re.match(MODEL_NUM_REGEX, full_name)
if match:
return int(match.group())
else:
return None
def detect_model_name(full_name):
"""Take the full name of a model and extract its model name.
Args:
full_name: The full name of a model.
Returns:
The model name. For example: '000000-bootstrap.index' => '000000-bootstrap'.
"""
match = re.match(MODEL_NAME_REGEX, full_name)
if match:
return match.group()
else:
return None
def get_models(models_dir):
"""Get all models.
Args:
models_dir: The directory of all models.
Returns:
A list of model number and names sorted increasingly. For example:
[(13, 000013-modelname), (17, 000017-modelname), ...etc]
"""
all_models = tf.gfile.Glob(os.path.join(models_dir, '*.meta'))
model_filenames = [os.path.basename(m) for m in all_models]
model_numbers_names = sorted([
(detect_model_num(m), detect_model_name(m))
for m in model_filenames])
return model_numbers_names
def get_latest_model(models_dir):
"""Find the latest model.
Args:
models_dir: The directory of all models.
Returns:
The model number and name of the latest model. For example:
(17, 000017-modelname)
"""
models = get_models(models_dir)
if models is None:
models = [(0, '000000-bootstrap')]
return models[-1]
def round_power_of_two(n):
"""Finds the nearest power of 2 to a number.
Thus 84 -> 64, 120 -> 128, etc.
Args:
n: The given number.
Returns:
The nearest 2-power number to n.
"""
return 2 ** int(round(math.log(n, 2)))
def parse_game_result(result):
if re.match(r'[bB]\+', result):
return 1
elif re.match(r'[wW]\+', result):
return -1
else:
return 0
def product(numbers):
return functools.reduce(operator.mul, numbers)
def take_n(n, iterable):
return list(itertools.islice(iterable, n))
def iter_chunks(chunk_size, iterator):
iterator = iter(iterator)
while True:
next_chunk = take_n(chunk_size, iterator)
# If len(iterable) % chunk_size == 0, don't return an empty chunk.
if next_chunk:
yield next_chunk
else:
break
def shuffler(iterator, pool_size=10**5, refill_threshold=0.9):
yields_between_refills = round(pool_size * (1 - refill_threshold))
# initialize pool; this step may or may not exhaust the iterator.
pool = take_n(pool_size, iterator)
while True:
random.shuffle(pool)
for _ in range(yields_between_refills):
yield pool.pop()
next_batch = take_n(yields_between_refills, iterator)
if not next_batch:
break
pool.extend(next_batch)
# finish consuming whatever's left - no need for further randomization.
# yield from pool
print(type(pool))
for p in pool:
yield p
@contextmanager
def timer(message):
tick = time.time()
yield
tock = time.time()
print('{}: {:.3} seconds'.foramt(message, (tock - tick)))
@contextmanager
def logged_timer(message):
tick = time.time()
yield
tock = time.time()
print('{}: {:.3} seconds'.format(message, (tock - tick)))
tf.logging.info('{}: {:.3} seconds'.format(message, (tock - tick)))
class MiniGoDirectory(object):
"""The class to set up directories of MiniGo."""
def __init__(self, base_dir):
self.trained_models_dir = os.path.join(base_dir, 'trained_models')
self.estimator_model_dir = os.path.join(base_dir, 'estimator_model_dir/')
self.selfplay_dir = os.path.join(base_dir, 'data/selfplay/')
self.holdout_dir = os.path.join(base_dir, 'data/holdout/')
self.training_chunk_dir = os.path.join(base_dir, 'data/training_chunks/')
self.sgf_dir = os.path.join(base_dir, 'sgf/')
self.evaluate_dir = os.path.join(base_dir, 'sgf/evaluate/')
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for utils, and base class for other unit tests."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import random
import re
import tempfile
import time
import tensorflow as tf # pylint: disable=g-bad-import-order
import go
import numpy as np
import utils
tf.logging.set_verbosity(tf.logging.ERROR)
BOARD_SIZE = 9
EMPTY_BOARD = np.zeros([BOARD_SIZE, BOARD_SIZE], dtype=np.int8)
ALL_COORDS = [(i, j) for i in range(BOARD_SIZE) for j in range(BOARD_SIZE)]
def _check_bounds(c):
return c[0] % BOARD_SIZE == c[0] and c[1] % BOARD_SIZE == c[1]
NEIGHBORS = {(x, y): list(filter(_check_bounds, [
(x+1, y), (x-1, y), (x, y+1), (x, y-1)])) for x, y in ALL_COORDS}
def load_board(string):
reverse_map = {
'X': go.BLACK,
'O': go.WHITE,
'.': go.EMPTY,
'#': go.FILL,
'*': go.KO,
'?': go.UNKNOWN
}
string = re.sub(r'[^XO\.#]+', '', string)
if len(string) != BOARD_SIZE ** 2:
raise ValueError("Board to load didn't have right dimensions")
board = np.zeros([BOARD_SIZE, BOARD_SIZE], dtype=np.int8)
for ii, char in enumerate(string):
np.ravel(board)[ii] = reverse_map[char]
return board
class TestUtils(tf.test.TestCase):
def test_bootstrap_name(self):
name = utils.generate_model_name(0)
self.assertIn('bootstrap', name)
def test_generate_model_name(self):
name = utils.generate_model_name(17)
self.assertIn('000017', name)
def test_detect_name(self):
string = '000017-model.index'
detected_name = utils.detect_model_name(string)
self.assertEqual(detected_name, '000017-model')
def test_detect_num(self):
string = '000017-model.index'
detected_name = utils.detect_model_num(string)
self.assertEqual(detected_name, 17)
def test_get_models(self):
with tempfile.TemporaryDirectory() as models_dir:
model1 = '000013-model.meta'
model2 = '000017-model.meta'
f1 = open(os.path.join(models_dir, model1), 'w')
f1.close()
f2 = open(os.path.join(models_dir, model2), 'w')
f2.close()
model_nums_names = utils.get_models(models_dir)
self.assertEqual(len(model_nums_names), 2)
self.assertEqual(model_nums_names[0], (13, '000013-model'))
self.assertEqual(model_nums_names[1], (17, '000017-model'))
def test_get_latest_model(self):
with tempfile.TemporaryDirectory() as models_dir:
model1 = '000013-model.meta'
model2 = '000017-model.meta'
f1 = open(os.path.join(models_dir, model1), 'w')
f1.close()
f2 = open(os.path.join(models_dir, model2), 'w')
f2.close()
latest_model = utils.get_latest_model(models_dir)
self.assertEqual(latest_model, (17, '000017-model'))
def test_round_power_of_two(self):
self.assertEqual(utils.round_power_of_two(84), 64)
self.assertEqual(utils.round_power_of_two(120), 128)
def test_shuffler(self):
random.seed(1)
dataset = (i for i in range(10))
shuffled = list(utils.shuffler(
dataset, pool_size=5, refill_threshold=0.8))
self.assertEqual(len(shuffled), 10)
self.assertNotEqual(shuffled, list(range(10)))
def test_parse_game_result(self):
self.assertEqual(utils.parse_game_result('B+3.5'), go.BLACK)
self.assertEqual(utils.parse_game_result('W+T'), go.WHITE)
self.assertEqual(utils.parse_game_result('Void'), 0)
class MiniGoUnitTest(tf.test.TestCase):
@classmethod
def setUpClass(cls):
cls.start_time = time.time()
@classmethod
def tearDownClass(cls):
print('\n%s.%s: %.3f seconds' %
(cls.__module__, cls.__name__, time.time() - cls.start_time))
def assertEqualNPArray(self, array1, array2):
if not np.all(array1 == array2):
raise AssertionError(
'Arrays differed in one or more locations:\n%s\n%s' % (array1, array2)
)
def assertNotEqualNPArray(self, array1, array2):
if np.all(array1 == array2):
raise AssertionError('Arrays were identical:\n%s' % array1)
def assertEqualLibTracker(self, lib_tracker1, lib_tracker2):
# A lib tracker may have differently numbered groups yet still
# represent the same set of groups.
# "Sort" the group_ids to ensure they are the same.
def find_group_mapping(lib_tracker):
current_gid = 0
mapping = {}
for group_id in lib_tracker.group_index.ravel().tolist():
if group_id == go.MISSING_GROUP_ID:
continue
if group_id not in mapping:
mapping[group_id] = current_gid
current_gid += 1
return mapping
lt1_mapping = find_group_mapping(lib_tracker1)
lt2_mapping = find_group_mapping(lib_tracker2)
remapped_group_index1 = [
lt1_mapping.get(gid, go.MISSING_GROUP_ID)
for gid in lib_tracker1.group_index.ravel().tolist()]
remapped_group_index2 = [
lt2_mapping.get(gid, go.MISSING_GROUP_ID)
for gid in lib_tracker2.group_index.ravel().tolist()]
self.assertEqual(remapped_group_index1, remapped_group_index2)
remapped_groups1 = {lt1_mapping.get(
gid): group for gid, group in lib_tracker1.groups.items()}
remapped_groups2 = {lt2_mapping.get(
gid): group for gid, group in lib_tracker2.groups.items()}
self.assertEqual(remapped_groups1, remapped_groups2)
self.assertEqualNPArray(
lib_tracker1.liberty_cache, lib_tracker2.liberty_cache)
def assertEqualPositions(self, pos1, pos2):
self.assertEqualNPArray(pos1.board, pos2.board)
self.assertEqualLibTracker(pos1.lib_tracker, pos2.lib_tracker)
self.assertEqual(pos1.n, pos2.n)
self.assertEqual(pos1.caps, pos2.caps)
self.assertEqual(pos1.ko, pos2.ko)
r_len = min(len(pos1.recent), len(pos2.recent))
if r_len > 0: # if a position has no history, then don't bother testing
self.assertEqual(pos1.recent[-r_len:], pos2.recent[-r_len:])
self.assertEqual(pos1.to_play, pos2.to_play)
def assertNoPendingVirtualLosses(self, root):
"""Raise an error if any node in this subtree has vlosses pending."""
queue = [root]
while queue:
current = queue.pop()
self.assertEqual(current.losses_applied, 0)
queue.extend(current.children.values())
if __name__ == '__main__':
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment