2025/12/11

テクノロジー

PyTorchのマルチプロセス学習でハマったところ

この記事の目次

    本記事は【Advent Calendar】の9日目の記事です。


    AI戦略室のM・Wです。この記事を書いているのは12月1日です。
    11月30日にジャパンカップでカランダガンの単勝と三連単を取りました。現地で観戦をしていたのですが、大変興奮したレースでした。ゴール間際の2頭の叩き合いに混ざる空馬。

    ぜひYoutubeで動画を見てください。以上、12月11日の記事でした!

    沼の淵

    突然ですが、forkやspawnの違いについて知っていますでしょうか。

    私は知らなかったです。いまも正直わかりません。
    その結果、沼にハマってしまいました。抜け出せないまま今日を過ごしています。
    これは自分用のメモ+同じようなことを実装する人がいた際の一助になればいいなと思って執筆しています。

    まずとあるAgentを考えます。このAgentは「環境」を知覚し「行動」を決定するAgentです。
    Agentは「行動」を行うと「環境」から「報酬」を手にすることができ、「環境」は次の「状態」に遷移します。

    図にするとこんな感じです。これは基本的な強化学習を説明する際に用いられる簡略図です。

    今回、ボン◯ーマンのようなPvPをするゲームについて、強化学習を使ってエージェントを作る機会がありました。つまり上の画像に照らし合わせると下記の表となります。

    用語対応
    環境ゲーム自体
    エージェント操作キャラクター
    状態その時時の盤面
    行動移動+ボム配置+キック
    報酬勝利/敗北など

    レギュレーションとして自陣営は2つのBotを用意する2vs2の対決と提示されたため、上記のエージェントが2つ必要なマルチエージェント学習を行う必要がありました。

    (ちなみに特に実装は指定されなかったのでロジックで記述するでもOKです。AI部署なので強化学習で挑みました)

    沼にダイブ

    ここでいよいよ掲題に出てきたマルチプロセス学習の話ができてます。
    最新の強化学習アルゴリズムに疎かったので、一旦Actor-CriticベースのA3Cアルゴリズムを選択しました。実装したモデル構造は下記となります。

    盤面のエンコードやモデル設計については以下のモデルを参考としています。Cursor先生ありがとう!
    Multi-Agent Training for Pommerman: Curriculum Learning and Population-based Self-Play Approach:
    https://arxiv.org/abs/2407.00662

    マルチプロセス学習の全体感としては下記の図です。

    ゲーム盤面を提供するホスト側とエージェント間はWebSocketによる疎通を行う必要があるため、各BotごとにWebSocketサーバを子プロセスとして作成します。
    つまり子プロセス上では、ゲーム開始から終了まで実行されますが、そこで収集したデータについては何かしらの方法を使って親プロセス上に移動させないと、子プロセスがterminateされた際に破棄されてしまう問題があります。

    そのため共有メモリを使って誤差勾配を親プロセス側と共有を行い、最適化器を使って共有モデルのパラメータを更新、次の学習ループで子プロセスへ配分を繰り返し学習を行っていきます。

    沼の底

    学習モデルを共有メモリに配置するためにはmodel.share_memory()を実施しますが、今回の実装のケースにおいてたびたび子プロセスのmodel.backward()が失敗する事象がありました。
    いくつか問題はありましたが、

    ・下記のログが出力され、backward処理が失敗するケース

    objc[8623]: +[MPSGraphObject initialize] may have been in progress in another thread when fork() was called.
    objc[8623]: +[MPSGraphObject initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.

    ・backward()が呼び出された際に子プロセスがクラッシュして音もなく消える

    使用してる端末がMacOSであるため、前者のケースでは$export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YESを実行することでエラー文が出力されなくなります。しかしながらこれはワークアラウンドとなります。
    MacOSではfork()が実行された際に、親プロセスで既に初期化されつつあったObjective-Cのデータ構造が子プロセスに引き継がれると、子プロセス側でその初期化が安全に完了できない状況が発生することがあります。これにより、子プロセスがクラッシュ(通常はEXC_BAD_ACCESS)したり、デッドロックしたりします。

    Pythonのmultiprocessingは、デフォルトでは「fork」方式を使って新しいプロセスを生成するため、この問題に直面しやすいです。そのため2点目の音もなく子プロセスが消えた原因も含めて、backward()処理がうまく動作しないのはfork()による子プロセスの生成に問題があると結論付けました。

    ちなみにPyTorchからマルチプロセス学習する際のベストプラクティスでも「Use an alternative process start methods, such as spawn or forkserver, which ensures a clean initialization of each process.」と記載があります。
    https://docs.pytorch.org/docs/stable/notes/multiprocessing.html#poison-fork-in-multiprocessing

    spawnforkとは違い、Pythonインタープリタごとプロセスを生成します。起動が遅いというデメリットはありますが、必要な情報のみを親プロセスから引き継ぐ点や上記の初期化に起因するようなエラーを回避できるメリットがあります。

    実験

    簡単なNNを構築してspawnを使用したbackward()がうまくいくかどうかを実験してみます。

    import torch
    import torch.nn as nn
    import torch.multiprocessing as mp
    import torch.nn.functional as F
    import torch.optim as optim
    
    
    class ExampleNN(nn.Module):
        def __init__(self):
            super(ExampleNN, self).__init__()
    
            self.fc = nn.Linear(10, 1)
    
        def forward(self, x):
            out = self.fc(x)
            return F.sigmoid(out)
    
    
    def train(xs, target):
        model = ExampleNN()
        optimizer = optim.SGD(model.parameters())
        out = model(xs)
        loss = F.binary_cross_entropy(out, target)
    
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print('trainメソッドが終了したよ')

    上記のtrainメソッドをforkで動かしてみます。

    if __name__ == '__main__':
    
        x = torch.rand(1, 5, 10)
        t = torch.Tensor([0, 1, 1, 0, 1]).reshape(1, 5, 1)
    
        train(x, t)
        print('-'*54)
    
        ctx = mp.get_context('fork')
        processes = []
        for i in range(5):
            p = ctx.Process(target=train, args=(x, t))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
    
    >>> trainメソッドが終了したよ
    ------------------------------------------------------
    RuntimeError: Unable to handle autograd's threading in combination with fork-based multiprocessing. See https://github.com/pytorch/pytorch/wiki/Autograd-and-Fork

    無事エラーが出ました。
    ちなみに途中のtrain(x, t)をコメントアウトすると正常に動作します。これは上記のGithubにも記載の通り、backward()で呼び出されているAutogradがすでに使用されている状態でfork()されたことに起因するエラーとなります。

    Autograd engine relies on threads pool, which makes it vulnerable to fork. We detect such situations and warn users to use spawn method of multiprocessing.

    指示通りspawnでやってみましょう。

    if __name__ == '__main__':
    
        x = torch.rand(1, 5, 10)
        t = torch.Tensor([0, 1, 1, 0, 1]).reshape(1, 5, 1)
    
        train(x, t)
        print('-'*54)
    
        ctx = mp.get_context('spawn')
        processes = []
        for i in range(5):
            p = ctx.Process(target=train, args=(x, t))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
    
     >>> trainメソッドが終了したよ
    ------------------------------------------------------
    trainメソッドが終了したよ
    trainメソッドが終了したよ
    trainメソッドが終了したよ
    trainメソッドが終了したよ
    trainメソッドが終了したよ

    無事エラーなく実行することができました。
    この結果からspawnを明示的に指定することでbackward()処理のエラーを回避できることがわかります。
    実際の実装は下記のような形となっています。

            ctx = mp.get_context('spawn')
            # mp.set_start_method('spawn', force=True)
    
            # エピソードループ
            for episode in tqdm(range(num_episodes)):
                clients: List[BombermanClient] = self.create_clients()
                process_pool = []
                for c in clients:
                    p = ctx.Process(target=c.run)
                    p.start()
                    process_pool.append(p)
    
                # エピソードの学習
                loop = asyncio.get_event_loop()
                loop.run_until_complete(self.train_episode(max_steps))
                print("ロールアウト終了")
    
                # 子プロセスの終了を待つ
                for p in process_pool:
                    if p.is_alive():
                        p.terminate()
                        p.join(timeout=3)  # 最大3秒待つ
                        if p.is_alive():
                            # それでも終了しない場合は強制終了
                            print(f"Warning: Process {p.pid} did not terminate gracefully, forcing...")
                            p.kill()
                            p.join()

    そもそもMacOSだとデフォルトでspawnだそうです。つまりわざわざ明示的にforkを使ったところから全ては始まったのです。Cursorくん、君は頑なにforkを推していたよね・・・。

    これでに臨めます!

    終わりに

    元々のスタート地点として、マイナビグループの中の一つであるMynavi Techtus Vietnamさんから挑戦状を叩きつけられたところからこの沼は始まりました。

    Techtusさん「ボン◯ーマンAIを作成してPvPをやろう!それでどこのチームが強いか勝負だ!」

    結果は・・・

    全敗でした。

    そもそもこの問題を解決したのは戦いが終わった3週間後ぐらいなんです!
    つまり、戦には裸同然で挑んだことになる・・・?すでに次の戦は始まっているという決意を胸に日々沼の中で生きたいと思います!

    イベント告知

    12月23日にイベントを開催します!申し込みはこちらから▼

    https://mynaviit.connpass.com/event/376769

    ※本記事は2025年12月時点の情報です。

    著者:マイナビエンジニアブログ編集部