2021/09/02

テクノロジー

自作将棋AIをつくってみた話

この記事の目次

    はじめに

    こんにちは、ITソリューション部(デジタルテクノロジー戦略本部)のS.Dです。

    今回は将棋AIを自分で作ってみたのでエンジニアブログで紹介します!目標は、棋譜データを学習させて、実際に対局できる(ルールに違反しない)ようにさせることとします。

    環境は以下で構築しました。

    環境バージョン
    OSWindows Pro 64bit
    CPURyzen 3900X
    GPUGeforce RTX2060Super
    Python3.8.5
    CUDA10.1
    cuDNN7.6.5
    Chainer7.7.0
    python-shogi1.0.10

    この記事ではPytorch ではなく Chainerを使用します。Pytochでも、構築可能です。また、別の機会があればPytorchで実装したものもご紹介できればと思います。

    §0. 環境構築

    この章では、環境構築をします。

    0.1 Visual Studio ビルドツール 2015のインストール

    現状の最新のVisual Studioは、2019ですが、2019では正常に動作しない報告が多いので2015をインストールします。

    Microsoft Build Tools 2015 Update 3 から、インストーラをダウンロードできます。

    0.2 CUDAのインストール

    NVIDIAからインストーラをダウンロードできます。
    2020.11.24時点で、最新バージョンは11.1です。
    しかし、使用するソフトウェアに対応したバージョンをインストールします。
    現状TensorFlowやChainerなどが対応したバージョンは、10.0, 10.1, (10.2)です。
    私は、10.1をインストールしました。

    0.3 cuDNNのインストール

    NVIDIAから、zipファイルをダウンロードできます。
    2020.11.24時点で、最新バージョンは8.0.5です。
    しかし、使用するソフトウェアに対応したバージョンをダウンロードします。
    現状TensorFlowやChainerなどが対応したバージョンは、7.6以前(8.0以降もたぶん大丈夫)です。
    私は、7.6.5で動かしています。

    ファイルを解凍したら、NVIDIA GPU Computing Toolkitにファイルをコピーします
    私の場合は、

    C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v10.1

    にファイルをコピーしました。

    画像にある、ファイルとフォルダをコピーして、上のフォルダに移します。

    0.4 Chainerのインストール

    Chainerをインストールします。
    Chainerは、機械学習のPythonフレームワークです。日本企業の株式会社Preferred Networksが、研究、開発をしました。現在は、FaceBookのPythonフレームワークである、Pytorchに吸収されました。

    0.4.0 Pythonのインストール

    Python のインストール
    今回は、Pythonは省略します。
    Pythonは、3.6以降、3.8までであれば正常に動作します。
    Anacondaでも大丈夫です。

    0.4.1 その他モジュール

    pip install -U pip setuptools pip install -U jupyterlab jupyter jupyter-console jupytext spyder matplotlib numpy

    0.4.2 Cupyのインストール

    Chainerで、GPUを使用するためには、Cupyをインストールする必要があります。

    pip install cupy-cuda101

    確認:

    エラーが出なければ大丈夫です。
    エラーが出た場合、CuPyのインストールがうまくできていないです。東京大学金子研究室に詳しく書いてあるので、エラーが出て進めない場合は参考にしてください。

    0.4.3 Chainerのインストール

    pip install chainer==7.7

    0.4.4 Chainerの動作確認

    ChainerのGithubから、サンプルを落としてきます。

    cd chainer-7.7.0 python examples\mniost\train_mnist.py -g 0

    オプションは、gpuの番号です。-1で、gpuを使わずに計算できます。

    0.5 学習データ

    東京大学内のサーバー、将棋コンピュータ対局場(Flood gate)のデータを使います。
    後のニューラルネットワークを学習させるための棋譜を用意します。
    今回は、『Flood gate』から最新の2020年の全棋譜データをcsa形式で落とします。
    http://wdoor.c.u-tokyo.ac.jp/shogi/x/wdoor2017.7z
    上は、2017年の棋譜データですが、(webサイト更新されていない),2017→2020にすれば落とせます。(書いていいのか…?)
    http://wdoor.c.u-tokyo.ac.jp/shogi/x/wdoor2020.7z
    7zipで解凍してください。

    全棋譜: 13万件くらい(現状なので、今も増えています)

    そのあと、学習データとして適切でないデータもたくさんあるので、手数が50手以上で、レーティングが3000以上のものに限定して、学習データとします。ここら辺は、どんな方法でもいいです。

    §1. ニューラルネットワーク

    ここからは、思考部分を実装します。
    具体的に、局面から指し手を予測するようにします。

    コード構成

    \<policynetwork>(root dir) | setup.py | train_policy.py | kifulist_train.txt | kifulist_test.txt | kifulist_train_1000.txt | kifulist_test_100.txt |- <model> | | model_policy | |- <pydlshogi> | | common.py | | features.py | | read_kifu_.py | | | |- <network> | | policy.py | |- <utils> | | fileter_csa.py | | make_kifu_list.py | | plot_log.py

    1.1 モジュールインストール

    import setuptools
    
    setuptools.setup(
        name = 'python-dlshogi',
        version = '0.0.1',
        author = 'SudaDaisuke', # 名前
        packages = ['pydlshogi'],
        scripts = [],
    )

    スクリプトを別のスクリプトから、importできるように登録します。
    プロジェクトのルートディレクトリで下のコマンドを打ちます。

    pip install --no-cache-dir -e .

    1.2 Policy Network

    将棋の指し手を予測するための、ニューラルネットワークを構成します。
    Alpha Goでは、打ち手を探索する「Policy Network」と局面を評価する「Value Network」という2つの深層ニューラルネットワークで構成されています。
    将棋の指し手を予測するために、Alpha Goで採用された方法を改良して、13層の畳み込みニューラルネットワークを構成します。

    ニューラルネットワークの仕様

    項目
    フィルターサイズ3x3
    中間層のフィルターサイズ192
    ストライド1
    パディング層1
    ブーリング層1
    活性化関数ReLU

    1.3 Policy Network 実装

    上の設計をもとに実装していきます。

    pydlshogi\network\policy.py
    
    from chainer import Chain
    import chainer.functions as F
    import chainer.links as L
    
    from pydlshogi.common import *
    
    ch = 192
    class PolicyNetwork(Chain):
        def __init__(self):
            super(PolicyNetwork, self).__init__()
            with self.init_scope():
                self.l1=L.Convolution2D(in_channels = 104, out_channels = ch, ksize = 3, pad = 1)
                self.l2=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l3=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l4=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l5=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l6=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l7=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l8=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l9=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l10=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l11=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l12=L.Convolution2D(in_channels = ch, out_channels = ch, ksize = 3, pad = 1)
                self.l13=L.Convolution2D(in_channels = ch, out_channels = MOVE_DIRECTION_LABEL_NUM, ksize = 1, nobias = True)
                self.l13_bias=L.Bias(shape=(9*9*MOVE_DIRECTION_LABEL_NUM))
    
        def __call__(self, x):
            h1 = F.relu(self.l1(x))
            h2 = F.relu(self.l2(h1))
            h3 = F.relu(self.l3(h2))
            h4 = F.relu(self.l4(h3))
            h5 = F.relu(self.l5(h4))
            h6 = F.relu(self.l6(h5))
            h7 = F.relu(self.l7(h6))
            h8 = F.relu(self.l8(h7))
            h9 = F.relu(self.l9(h8))
            h10 = F.relu(self.l10(h9))
            h11 = F.relu(self.l11(h10))
            h12 = F.relu(self.l12(h11))
            h13 = self.l13(h12)
            return self.l13_bias(F.reshape(h13, (-1, 9*9*MOVE_DIRECTION_LABEL_NUM)))

    1.2 学習処理

    1.2.1 実装

    train_policy.py
    
    import numpy as np
    import chainer
    from chainer import cuda, Variable
    from chainer import optimizers, serializers
    import chainer.functions as F
    
    from pydlshogi.common import *
    from pydlshogi.network.policy import PolicyNetwork
    from pydlshogi.features import *
    from pydlshogi.read_kifu import *
    
    import argparse
    import random
    import pickle
    import os
    import re
    
    import logging
    
    parser = argparse.ArgumentParser()
    parser.add_argument('kifulist_train', type=str, help='train kifu list')
    parser.add_argument('kifulist_test', type=str, help='test kifu list')
    parser.add_argument('--batchsize', '-b', type=int, default=32, help='Number of positions in each mini-batch')
    parser.add_argument('--test_batchsize', type=int, default=512, help='Number of positions in each test mini-batch')
    parser.add_argument('--epoch', '-e', type=int, default=1, help='Number of epoch times')
    parser.add_argument('--model', type=str, default='model/model_policy', help='model file name')
    parser.add_argument('--state', type=str, default='model/state_policy', help='state file name')
    parser.add_argument('--initmodel', '-m', default='', help='Initialize the model from given file')
    parser.add_argument('--resume', '-r', default='', help='Resume the optimization from snapshot')
    parser.add_argument('--log', default=None, help='log file path')
    parser.add_argument('--lr', type=float, default=0.01, help='learning rate')
    parser.add_argument('--eval_interval', '-i', type=int, default=1000, help='eval interval')
    args = parser.parse_args()
    
    logging.basicConfig(format='%(asctime)s\t%(levelname)s\t%(message)s', datefmt='%Y/%m/%d %H:%M:%S', filename=args.log, level=logging.DEBUG)
    
    model = PolicyNetwork()
    model.to_gpu()
    
    optimizer = optimizers.SGD(lr=args.lr)
    optimizer.setup(model)
    
    # Init/Resume
    if args.initmodel:
        logging.info('Load model from {}'.format(args.initmodel))
        serializers.load_npz(args.initmodel, model)
    if args.resume:
        logging.info('Load optimizer state from {}'.format(args.resume))
        serializers.load_npz(args.resume, optimizer)
    
    logging.info('read kifu start')
    # 保存済みのpickleファイルがある場合、pickleファイルを読み込む
    # train date
    train_pickle_filename = re.sub(r'\..*?$', '', args.kifulist_train) + '.pickle'
    if os.path.exists(train_pickle_filename):
        with open(train_pickle_filename, 'rb') as f:
            positions_train = pickle.load(f)
        logging.info('load train pickle')
    else:
        positions_train = read_kifu(args.kifulist_train)
    
    # test data
    test_pickle_filename = re.sub(r'\..*?$', '', args.kifulist_test) + '.pickle'
    if os.path.exists(test_pickle_filename):
        with open(test_pickle_filename, 'rb') as f:
            positions_test = pickle.load(f)
        logging.info('load test pickle')
    else:
        positions_test = read_kifu(args.kifulist_test)
    
    # 保存済みのpickleがない場合、pickleファイルを保存する
    if not os.path.exists(train_pickle_filename):
        with open(train_pickle_filename, 'wb') as f:
            pickle.dump(positions_train, f, pickle.HIGHEST_PROTOCOL)
        logging.info('save train pickle')
    if not os.path.exists(test_pickle_filename):
        with open(test_pickle_filename, 'wb') as f:
            pickle.dump(positions_test, f, pickle.HIGHEST_PROTOCOL)
        logging.info('save test pickle')
    logging.info('read kifu end')
    
    logging.info('train position num = {}'.format(len(positions_train)))
    logging.info('test position num = {}'.format(len(positions_test)))
    
    # mini batch
    def mini_batch(positions, i, batchsize):
        mini_batch_data = []
        mini_batch_move = []
        for b in range(batchsize):
            features, move, win = make_features(positions[i + b])
            mini_batch_data.append(features)
            mini_batch_move.append(move)
    
        return (Variable(cuda.to_gpu(np.array(mini_batch_data, dtype=np.float32))),
                Variable(cuda.to_gpu(np.array(mini_batch_move, dtype=np.int32))))
    
    def mini_batch_for_test(positions, batchsize):
        mini_batch_data = []
        mini_batch_move = []
        for b in range(batchsize):
            features, move, win = make_features(random.choice(positions))
            mini_batch_data.append(features)
            mini_batch_move.append(move)
    
        return (Variable(cuda.to_gpu(np.array(mini_batch_data, dtype=np.float32))),
                Variable(cuda.to_gpu(np.array(mini_batch_move, dtype=np.int32))))
    
    # train
    logging.info('start training')
    itr = 0
    sum_loss = 0
    for e in range(args.epoch):
        positions_train_shuffled = random.sample(positions_train, len(positions_train))
    
        itr_epoch = 0
        sum_loss_epoch = 0
        for i in range(0, len(positions_train_shuffled) - args.batchsize, args.batchsize):
            x, t = mini_batch(positions_train_shuffled, i, args.batchsize)
            y = model(x)
    
            model.cleargrads()
            loss = F.softmax_cross_entropy(y, t)
            loss.backward()
            optimizer.update()
    
            itr += 1
            sum_loss += loss.data
            itr_epoch += 1
            sum_loss_epoch += loss.data
    
            # print train loss and test accuracy
            if optimizer.t % args.eval_interval == 0:
                x, t = mini_batch_for_test(positions_test, args.test_batchsize)
                y = model(x)
                logging.info('epoch = {}, iteration = {}, loss = {}, accuracy = {}'.format(optimizer.epoch + 1, optimizer.t, sum_loss / itr, F.accuracy(y, t).data))
                itr = 0
                sum_loss = 0
    
        # validate test data
        logging.info('validate test data')
        itr_test = 0
        sum_test_accuracy = 0
        for i in range(0, len(positions_test) - args.batchsize, args.batchsize):
            x, t = mini_batch(positions_test, i, args.batchsize)
            y = model(x)
            itr_test += 1
            sum_test_accuracy += F.accuracy(y, t).data
        logging.info('epoch = {}, iteration = {}, train loss avr = {}, test accuracy = {}'.format(optimizer.epoch + 1, optimizer.t, sum_loss_epoch / itr_epoch, sum_test_accuracy / itr_test))
    
        optimizer.new_epoch()
    
    logging.info('save the model')
    serializers.save_npz(args.model, model)
    logging.info('save the optimizer')
    serializers.save_npz(args.state, optimizer)

    上のコードは、学習部分を実装しています。
    具体的に、

    こんな感じに実装されています。

    1.2.2 学習実行

    実際に学習を実行すると、こんな感じで、損失計算と、学習データから得られた結果との精度です。

    §2. 将棋AI実装

    ニューラルネットワークで、学習を終えたモデルを使って対局できるように、USIエンジンにします。

    USI(Universal Shogi Interface)プロトコルとは、将棋GUIソフトと思考エンジンが通信をするために、Tord Romstad氏によって考案された通信プロトコルです。
    http://shogidokoro.starfree.jp/usi.html

    上のように、USIプロトコルをもとに通信することで、将棋AIをGUI上で動かします。

    フォルダ構成

    \<policynetwork>(root dir) |- <bat> | | Docbase.bat | |- <pydlshogi> | |- <player> | | base_player.py | | Docbase_player.py | | | |- <usi> | | usi.py | | usi_Docbase_player.py

    import numpy as np
    import chainer
    from chainer import serializers
    from chainer import cuda, Variable
    import chainer.functions as F
    
    import shogi
    
    from pydlshogi.common import *
    from pydlshogi.features import *
    from pydlshogi.network.policy import *
    from pydlshogi.player.base_player import *
    
    def greedy(logits):
        return logits.index(max(logits))
    
    def boltzmann(logits, temperature):
        logits /= temperature
        logits -= logits.max()
        probabilities = np.exp(logits)
        probabilities /= probabilities.sum()
        return np.random.choice(len(logits), p=probabilities)
    
    class PolicyPlayer(BasePlayer):
        def __init__(self):
            super().__init__()
            self.modelfile = r'学習したモデルのパス'
            self.model = None
    
        def usi(self):
            print('id name DocBase ShogiAI')
            print('option name modelfile type string default ' + self.modelfile)
            print('usiok')
    
        def setoption(self, option):
            if option[1] == 'modelfile':
                self.modelfile = option[3]
    
        def isready(self):
            if self.model is None:
                self.model = PolicyNetwork()
                self.model.to_gpu()
            serializers.load_npz(self.modelfile, self.model)
            print('readyok')
    
        def go(self):
            if self.board.is_game_over():
                print('bestmove resign')
                return
    
            features = make_input_features_from_board(self.board)
            x = Variable(cuda.to_gpu(np.array([features], dtype=np.float32)))
    
            with chainer.no_backprop_mode():
                y = self.model(x)
    
                logits = cuda.to_cpu(y.data)[0]
                probabilities = cuda.to_cpu(F.softmax(y).data)[0]
    
            # 全ての合法手について
            legal_moves = []
            legal_logits = []
            for move in self.board.legal_moves:
                # ラベルに変換
                label = make_output_label(move, self.board.turn)
                # 合法手とその指し手の確率(logits)を格納
                legal_moves.append(move)
                legal_logits.append(logits[label])
                # 確率を表示
                print('info string {:5} : {:.5f}'.format(move.usi(), probabilities[label]))
    
            # 確率が最大の手を選ぶ(グリーディー戦略)
            selected_index = greedy(legal_logits)
            # 確率に応じて手を選ぶ(ソフトマックス戦略)
            selected_index = boltzmann(np.array(legal_logits, dtype=np.float32), 0.5)
            bestmove = legal_moves[selected_index]
    
            print('bestmove', bestmove.usi())

    上のように、AIの情報や局面ごとの最善手を計算していきます。

    §3. GUIソフトで動かす

    上で作ったモデルと実際に対局します。
    GUIソフトはいくつかありますが、将棋所を使用します。

    上のように、登録することができます。

    §4. 参考

    参考URL
    将棋AIで学ぶディープラーニングhttps://book.mynavi.jp/ec/products/detail/id=88752
    Alpha Goの論文https://www.nature.com/articles/nature16961

    §5. 終わりに

    いかがでしょうか?
    私は、ボードゲームが趣味で将棋、囲碁、チェス等ももちろん好きです。

    チェスは、1997年に、当時のチャンピオン Гaрри Каспaров さんが、IBM社のディープブルーに敗れました。私がまだ生まれていない時代から、AI vs 人間の戦いが始まっていたことを知ったときは、驚愕しました。そして、あと10年は人間に勝てないといわれていた、囲碁も2016年に李 世乭 さんが、Google傘下のDemis Hassabis さんが率いるDeepMind社のAlpha Goに敗れました。
    今回の将棋AIも、Alpha Goを参考にしています。

    そして、将棋も2010年代からずっと、ponanzaといわれる将棋ソフトの強さは注目されていました。ponanzaには、モンテカルロ法が採用されていました。当時から、将棋プロでさえ、ponanzaを一目おいていて、当時の竜王である渡辺明さんとponanzaの公開対局が催されたりしました。(渡辺明竜王(当時)の逆転勝ち)

    2016年、Alpha Goの論文が公開されると、すぐにAlpha Zeroと呼ばれる将棋AIが誕生しました。現在の、将棋界は藤井聡太2冠をはじめとして、若年世代を筆頭に、世代を問わず将棋AIを用いた研究が盛んになっています。

    コンピュータ将棋大会も、開かれていて、将棋AIの強さを競う大会もあります。ぜひ、今回の記事に興味をもった方はBERTで、将棋AIを作ってみてください!
    https://www.youtube.com/watch?v=2Vl6Ao4GaSQ&t=853s

    ※本記事は2021年09月時点の情報です。

    著者:マイナビエンジニアブログ編集部