テクノロジー
自作将棋AIをつくってみた話
はじめに
こんにちは、ITソリューション部(デジタルテクノロジー戦略本部)のS.Dです。
今回は将棋AIを自分で作ってみたのでエンジニアブログで紹介します!目標は、棋譜データを学習させて、実際に対局できる(ルールに違反しない)ようにさせることとします。
環境は以下で構築しました。
環境 | バージョン |
---|---|
OS | Windows Pro 64bit |
CPU | Ryzen 3900X |
GPU | Geforce RTX2060Super |
Python | 3.8.5 |
CUDA | 10.1 |
cuDNN | 7.6.5 |
Chainer | 7.7.0 |
python-shogi | 1.0.10 |
この記事ではPytorch ではなく Chainerを使用します。Pytochでも、構築可能です。また、別の機会があればPytorchで実装したものもご紹介できればと思います。
§0. 環境構築
この章では、環境構築をします。
0.1 Visual Studio ビルドツール 2015のインストール
現状の最新のVisual Studioは、2019ですが、2019では正常に動作しない報告が多いので2015をインストールします。
Microsoft Build Tools 2015 Update 3 から、インストーラをダウンロードできます。
0.2 CUDAのインストール
NVIDIAからインストーラをダウンロードできます。
2020.11.24時点で、最新バージョンは11.1です。
しかし、使用するソフトウェアに対応したバージョンをインストールします。
現状TensorFlowやChainerなどが対応したバージョンは、10.0, 10.1, (10.2)です。
私は、10.1をインストールしました。
0.3 cuDNNのインストール
NVIDIAから、zipファイルをダウンロードできます。
2020.11.24時点で、最新バージョンは8.0.5です。
しかし、使用するソフトウェアに対応したバージョンをダウンロードします。
現状TensorFlowやChainerなどが対応したバージョンは、7.6以前(8.0以降もたぶん大丈夫)です。
私は、7.6.5で動かしています。
ファイルを解凍したら、NVIDIA GPU Computing Toolkitにファイルをコピーします
私の場合は、
にファイルをコピーしました。
画像にある、ファイルとフォルダをコピーして、上のフォルダに移します。
0.4 Chainerのインストール
Chainerをインストールします。
Chainerは、機械学習のPythonフレームワークです。日本企業の株式会社Preferred Networksが、研究、開発をしました。現在は、FaceBookのPythonフレームワークである、Pytorchに吸収されました。
0.4.0 Pythonのインストール
Python のインストール
今回は、Pythonは省略します。
Pythonは、3.6以降、3.8までであれば正常に動作します。
Anacondaでも大丈夫です。
0.4.1 その他モジュール
pip install -U jupyterlab jupyter jupyter-console jupytext spyder matplotlib numpy
0.4.2 Cupyのインストール
Chainerで、GPUを使用するためには、Cupyをインストールする必要があります。
確認:
エラーが出なければ大丈夫です。
エラーが出た場合、CuPyのインストールがうまくできていないです。東京大学金子研究室に詳しく書いてあるので、エラーが出て進めない場合は参考にしてください。
0.4.3 Chainerのインストール
0.4.4 Chainerの動作確認
ChainerのGithubから、サンプルを落としてきます。
python examples\mniost\train_mnist.py -g 0
オプションは、gpuの番号です。-1で、gpuを使わずに計算できます。
0.5 学習データ
東京大学内のサーバー、将棋コンピュータ対局場(Flood gate)のデータを使います。
後のニューラルネットワークを学習させるための棋譜を用意します。
今回は、『Flood gate』から最新の2020年の全棋譜データをcsa形式で落とします。
http://wdoor.c.u-tokyo.ac.jp/shogi/x/wdoor2017.7z
上は、2017年の棋譜データですが、(webサイト更新されていない),2017→2020にすれば落とせます。(書いていいのか...?)
http://wdoor.c.u-tokyo.ac.jp/shogi/x/wdoor2020.7z
7zipで解凍してください。
全棋譜: 13万件くらい(現状なので、今も増えています)
そのあと、学習データとして適切でないデータもたくさんあるので、手数が50手以上で、レーティングが3000以上のものに限定して、学習データとします。ここら辺は、どんな方法でもいいです。
§1. ニューラルネットワーク
ここからは、思考部分を実装します。
具体的に、局面から指し手を予測するようにします。
コード構成
| setup.py
| train_policy.py
| kifulist_train.txt
| kifulist_test.txt
| kifulist_train_1000.txt
| kifulist_test_100.txt
|- <model>
| | model_policy
|
|- <pydlshogi>
| | common.py
| | features.py
| | read_kifu_.py
| |
| |- <network>
| | policy.py
|
|- <utils>
| | fileter_csa.py
| | make_kifu_list.py
| | plot_log.py
1.1 モジュールインストール
import setuptools
setuptools.setup(
name = 'python-dlshogi',
version = '0.0.1',
author = 'SudaDaisuke', # 名前
packages = ['pydlshogi'],
scripts = [],
)
スクリプトを別のスクリプトから、importできるように登録します。
プロジェクトのルートディレクトリで下のコマンドを打ちます。
1.2 Policy Network
将棋の指し手を予測するための、ニューラルネットワークを構成します。
Alpha Goでは、打ち手を探索する「Policy Network」と局面を評価する「Value Network」という2つの深層ニューラルネットワークで構成されています。
将棋の指し手を予測するために、Alpha Goで採用された方法を改良して、13層の畳み込みニューラルネットワークを構成します。
ニューラルネットワークの仕様
項目 | 値 |
---|---|
フィルターサイズ | 3x3 |
中間層のフィルターサイズ | 192 |
ストライド | 1 |
パディング層 | 1 |
ブーリング層 | 1 |
活性化関数 | ReLU |
1.3 Policy Network 実装
上の設計をもとに実装していきます。
pydlshogi\network\policy.py
from chainer import Chain
import chainer.functions as F
import chainer.links as L
from pydlshogi.common import *
ch = 192
class PolicyNetwork(Chain):
def __init__(self):
super(PolicyNetwork, self).__init__()
with self.init_scope():
self.l1=L.Convolution2D(in_channels = 104, out_channels = ch, ksize = 3, pad = 1)
self.l2=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l3=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l4=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l5=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l6=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l7=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l8=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l9=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l10=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l11=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l12=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
self.l13=L.Convolution2D(in_channels = ch, out_channels = MOVE_DIRECTION_LABEL_NUM, ksize = 1, nobias = True)
self.l13_bias=L.Bias(shape=(9*9*MOVE_DIRECTION_LABEL_NUM))
def __call__(self, x):
h1 = F.relu(self.l1(x))
h2 = F.relu(self.l2(h1))
h3 = F.relu(self.l3(h2))
h4 = F.relu(self.l4(h3))
h5 = F.relu(self.l5(h4))
h6 = F.relu(self.l6(h5))
h7 = F.relu(self.l7(h6))
h8 = F.relu(self.l8(h7))
h9 = F.relu(self.l9(h8))
h10 = F.relu(self.l10(h9))
h11 = F.relu(self.l11(h10))
h12 = F.relu(self.l12(h11))
h13 = self.l13(h12)
return self.l13_bias(F.reshape(h13, (-1, 9*9*MOVE_DIRECTION_LABEL_NUM)))
1.2 学習処理
1.2.1 実装
train_policy.py
import numpy as np
import chainer
from chainer import cuda, Variable
from chainer import optimizers, serializers
import chainer.functions as F
from pydlshogi.common import *
from pydlshogi.network.policy import PolicyNetwork
from pydlshogi.features import *
from pydlshogi.read_kifu import *
import argparse
import random
import pickle
import os
import re
import logging
parser = argparse.ArgumentParser()
parser.add_argument('kifulist_train', type=str, help='train kifu list')
parser.add_argument('kifulist_test', type=str, help='test kifu list')
parser.add_argument('--batchsize', '-b', type=int, default=32, help='Number of positions in each mini-batch')
parser.add_argument('--test_batchsize', type=int, default=512, help='Number of positions in each test mini-batch')
parser.add_argument('--epoch', '-e', type=int, default=1, help='Number of epoch times')
parser.add_argument('--model', type=str, default='model/model_policy', help='model file name')
parser.add_argument('--state', type=str, default='model/state_policy', help='state file name')
parser.add_argument('--initmodel', '-m', default='', help='Initialize the model from given file')
parser.add_argument('--resume', '-r', default='', help='Resume the optimization from snapshot')
parser.add_argument('--log', default=None, help='log file path')
parser.add_argument('--lr', type=float, default=0.01, help='learning rate')
parser.add_argument('--eval_interval', '-i', type=int, default=1000, help='eval interval')
args = parser.parse_args()
logging.basicConfig(format='%(asctime)s\t%(levelname)s\t%(message)s', datefmt='%Y/%m/%d %H:%M:%S', filename=args.log, level=logging.DEBUG)
model = PolicyNetwork()
model.to_gpu()
optimizer = optimizers.SGD(lr=args.lr)
optimizer.setup(model)
# Init/Resume
if args.initmodel:
logging.info('Load model from {}'.format(args.initmodel))
serializers.load_npz(args.initmodel, model)
if args.resume:
logging.info('Load optimizer state from {}'.format(args.resume))
serializers.load_npz(args.resume, optimizer)
logging.info('read kifu start')
# 保存済みのpickleファイルがある場合、pickleファイルを読み込む
# train date
train_pickle_filename = re.sub(r'\..*?$', '', args.kifulist_train) + '.pickle'
if os.path.exists(train_pickle_filename):
with open(train_pickle_filename, 'rb') as f:
positions_train = pickle.load(f)
logging.info('load train pickle')
else:
positions_train = read_kifu(args.kifulist_train)
# test data
test_pickle_filename = re.sub(r'\..*?$', '', args.kifulist_test) + '.pickle'
if os.path.exists(test_pickle_filename):
with open(test_pickle_filename, 'rb') as f:
positions_test = pickle.load(f)
logging.info('load test pickle')
else:
positions_test = read_kifu(args.kifulist_test)
# 保存済みのpickleがない場合、pickleファイルを保存する
if not os.path.exists(train_pickle_filename):
with open(train_pickle_filename, 'wb') as f:
pickle.dump(positions_train, f, pickle.HIGHEST_PROTOCOL)
logging.info('save train pickle')
if not os.path.exists(test_pickle_filename):
with open(test_pickle_filename, 'wb') as f:
pickle.dump(positions_test, f, pickle.HIGHEST_PROTOCOL)
logging.info('save test pickle')
logging.info('read kifu end')
logging.info('train position num = {}'.format(len(positions_train)))
logging.info('test position num = {}'.format(len(positions_test)))
# mini batch
def mini_batch(positions, i, batchsize):
mini_batch_data = []
mini_batch_move = []
for b in range(batchsize):
features, move, win = make_features(positions[i + b])
mini_batch_data.append(features)
mini_batch_move.append(move)
return (Variable(cuda.to_gpu(np.array(mini_batch_data, dtype=np.float32))),
Variable(cuda.to_gpu(np.array(mini_batch_move, dtype=np.int32))))
def mini_batch_for_test(positions, batchsize):
mini_batch_data = []
mini_batch_move = []
for b in range(batchsize):
features, move, win = make_features(random.choice(positions))
mini_batch_data.append(features)
mini_batch_move.append(move)
return (Variable(cuda.to_gpu(np.array(mini_batch_data, dtype=np.float32))),
Variable(cuda.to_gpu(np.array(mini_batch_move, dtype=np.int32))))
# train
logging.info('start training')
itr = 0
sum_loss = 0
for e in range(args.epoch):
positions_train_shuffled = random.sample(positions_train, len(positions_train))
itr_epoch = 0
sum_loss_epoch = 0
for i in range(0, len(positions_train_shuffled) - args.batchsize, args.batchsize):
x, t = mini_batch(positions_train_shuffled, i, args.batchsize)
y = model(x)
model.cleargrads()
loss = F.softmax_cross_entropy(y, t)
loss.backward()
optimizer.update()
itr += 1
sum_loss += loss.data
itr_epoch += 1
sum_loss_epoch += loss.data
# print train loss and test accuracy
if optimizer.t % args.eval_interval == 0:
x, t = mini_batch_for_test(positions_test, args.test_batchsize)
y = model(x)
logging.info('epoch = {}, iteration = {}, loss = {}, accuracy = {}'.format(optimizer.epoch + 1, optimizer.t, sum_loss / itr, F.accuracy(y, t).data))
itr = 0
sum_loss = 0
# validate test data
logging.info('validate test data')
itr_test = 0
sum_test_accuracy = 0
for i in range(0, len(positions_test) - args.batchsize, args.batchsize):
x, t = mini_batch(positions_test, i, args.batchsize)
y = model(x)
itr_test += 1
sum_test_accuracy += F.accuracy(y, t).data
logging.info('epoch = {}, iteration = {}, train loss avr = {}, test accuracy = {}'.format(optimizer.epoch + 1, optimizer.t, sum_loss_epoch / itr_epoch, sum_test_accuracy / itr_test))
optimizer.new_epoch()
logging.info('save the model')
serializers.save_npz(args.model, model)
logging.info('save the optimizer')
serializers.save_npz(args.state, optimizer)
上のコードは、学習部分を実装しています。
具体的に、
こんな感じに実装されています。
1.2.2 学習実行
実際に学習を実行すると、こんな感じで、損失計算と、学習データから得られた結果との精度です。
§2. 将棋AI実装
ニューラルネットワークで、学習を終えたモデルを使って対局できるように、USIエンジンにします。
USI(Universal Shogi Interface)プロトコルとは、将棋GUIソフトと思考エンジンが通信をするために、Tord Romstad氏によって考案された通信プロトコルです。
http://shogidokoro.starfree.jp/usi.html
上のように、USIプロトコルをもとに通信することで、将棋AIをGUI上で動かします。
フォルダ構成
|- <bat>
| | Docbase.bat
|
|- <pydlshogi>
| |- <player>
| | base_player.py
| | Docbase_player.py
| |
| |- <usi>
| | usi.py
| | usi_Docbase_player.py
import numpy as np
import chainer
from chainer import serializers
from chainer import cuda, Variable
import chainer.functions as F
import shogi
from pydlshogi.common import *
from pydlshogi.features import *
from pydlshogi.network.policy import *
from pydlshogi.player.base_player import *
def greedy(logits):
return logits.index(max(logits))
def boltzmann(logits, temperature):
logits /= temperature
logits -= logits.max()
probabilities = np.exp(logits)
probabilities /= probabilities.sum()
return np.random.choice(len(logits), p=probabilities)
class PolicyPlayer(BasePlayer):
def __init__(self):
super().__init__()
self.modelfile = r'学習したモデルのパス'
self.model = None
def usi(self):
print('id name DocBase ShogiAI')
print('option name modelfile type string default ' + self.modelfile)
print('usiok')
def setoption(self, option):
if option[1] == 'modelfile':
self.modelfile = option[3]
def isready(self):
if self.model is None:
self.model = PolicyNetwork()
self.model.to_gpu()
serializers.load_npz(self.modelfile, self.model)
print('readyok')
def go(self):
if self.board.is_game_over():
print('bestmove resign')
return
features = make_input_features_from_board(self.board)
x = Variable(cuda.to_gpu(np.array([features], dtype=np.float32)))
with chainer.no_backprop_mode():
y = self.model(x)
logits = cuda.to_cpu(y.data)[0]
probabilities = cuda.to_cpu(F.softmax(y).data)[0]
# 全ての合法手について
legal_moves = []
legal_logits = []
for move in self.board.legal_moves:
# ラベルに変換
label = make_output_label(move, self.board.turn)
# 合法手とその指し手の確率(logits)を格納
legal_moves.append(move)
legal_logits.append(logits[label])
# 確率を表示
print('info string {:5} : {:.5f}'.format(move.usi(), probabilities[label]))
# 確率が最大の手を選ぶ(グリーディー戦略)
selected_index = greedy(legal_logits)
# 確率に応じて手を選ぶ(ソフトマックス戦略)
selected_index = boltzmann(np.array(legal_logits, dtype=np.float32), 0.5)
bestmove = legal_moves[selected_index]
print('bestmove', bestmove.usi())
上のように、AIの情報や局面ごとの最善手を計算していきます。
§3. GUIソフトで動かす
上で作ったモデルと実際に対局します。
GUIソフトはいくつかありますが、将棋所を使用します。
上のように、登録することができます。
§4. 参考
参考 | URL |
---|---|
将棋AIで学ぶディープラーニング | https://book.mynavi.jp/ec/products/detail/id=88752 |
Alpha Goの論文 | https://www.nature.com/articles/nature16961 |
§5. 終わりに
いかがでしょうか?
私は、ボードゲームが趣味で将棋、囲碁、チェス等ももちろん好きです。
チェスは、1997年に、当時のチャンピオン Гaрри Каспaров さんが、IBM社のディープブルーに敗れました。私がまだ生まれていない時代から、AI vs 人間の戦いが始まっていたことを知ったときは、驚愕しました。そして、あと10年は人間に勝てないといわれていた、囲碁も2016年に李 世乭 さんが、Google傘下のDemis Hassabis さんが率いるDeepMind社のAlpha Goに敗れました。
今回の将棋AIも、Alpha Goを参考にしています。
そして、将棋も2010年代からずっと、ponanzaといわれる将棋ソフトの強さは注目されていました。ponanzaには、モンテカルロ法が採用されていました。当時から、将棋プロでさえ、ponanzaを一目おいていて、当時の竜王である渡辺明さんとponanzaの公開対局が催されたりしました。(渡辺明竜王(当時)の逆転勝ち)
2016年、Alpha Goの論文が公開されると、すぐにAlpha Zeroと呼ばれる将棋AIが誕生しました。現在の、将棋界は藤井聡太2冠をはじめとして、若年世代を筆頭に、世代を問わず将棋AIを用いた研究が盛んになっています。
コンピュータ将棋大会も、開かれていて、将棋AIの強さを競う大会もあります。ぜひ、今回の記事に興味をもった方はBERTで、将棋AIを作ってみてください!
https://www.youtube.com/watch?v=2Vl6Ao4GaSQ&t=853s
※本記事は2021年09月時点の内容です。