python으로 간단한 모바일 푸시 서버를 만들어 보았다.

예약 발송 및 batch 처리등 처리 하였다. 


# push_server.py
# APNs, FCM, 예약 및 batch 푸시 발송

import json
import jwt
import time
from datetime import datetime
from hyper import HTTPConnection
from http.client import HTTPSConnection
from apscheduler.schedulers.blocking import BlockingScheduler
import mysql.connector
import configparser
from apscheduler.events import EVENT_JOB_EXECUTED

# Read configuration from config.ini
config = configparser.ConfigParser()

# SSL/TLS (Secure Sockets Layer / Transport Layer Security)
CA_FILE = config.get('SSL', 'ca_file', fallback=None)
CERT_FILE = config.get('SSL', 'cert_file', fallback=None)
KEY_FILE = config.get('SSL', 'key_file', fallback=None)

# Database
DB_HOST = config.get('Database', 'host')
DB_USER = config.get('Database', 'user')
DB_PASSWORD = config.get('Database', 'password')
DB_NAME = config.get('Database', 'database')
DB_PORT = config.get('Database', 'port')

# Protocol
PROTOCOL = config.get('Server', 'protocol', fallback='HTTP')

# APNs status codes
status_codes = {
    200: "Success",
    400: "Bad request",
    403: "Error with certificate or provider authentication token",
    405: "Invalid request method. Only POST requests are supported.",
    410: "Device token is no longer active for the topic.",
    413: "Notification payload was too large.",
    429: "Too many requests for the same device token.",
    500: "Internal server error",
    503: "Server is shutting down and unavailable.",

conn_class = HTTPConnection if PROTOCOL.upper() == 'HTTP' else HTTPSConnection

scheduler = BlockingScheduler()

# Create the request headers with expiration time
def create_request_headers(section_name, remaining_seconds):
    push_type = config.get(section_name, 'push_type')
    if push_type == "APNs": 
        return create_apns_request_headers(section_name, remaining_seconds)
    elif push_type == "FCM":
        return create_fcm_request_headers(section_name)
        return None

# Create APNs request headers
def create_apns_request_headers(section_name, remaining_seconds):
    auth_key_path = config.get(section_name, 'auth_key_path')
    team_id = config.get(section_name, 'team_id')
    key_id = config.get(section_name, 'key_id')
    bundle_id = config.get(section_name, 'bundle_id')
    with open(auth_key_path) as f:
        secret = f.read()
    token = jwt.encode(
            'iss': team_id,
            'iat': time.time(),
            'alg': 'ES256',
            'kid': key_id,
    token_bytes = token.encode('ascii')
    headers = {
        'apns-priority': '10',
        'apns-topic': bundle_id,
        'authorization': 'bearer {0}'.format(token_bytes.decode('ascii'))
    if remaining_seconds is not None:
        headers['apns-expiration'] = str(remaining_seconds)

    return headers

# Create FCM request headers
def create_fcm_request_headers(section_name):
    api_key = config.get(section_name, 'api_key') 
    headers = {
        'Authorization': 'key=' + api_key,
        'Content-Type': 'application/json'
    return headers

# Function to send notification with scheduled time
def send_notification(conn, section_name, push_token, message, badge_count=0, remaining_seconds=0):
    request_headers = create_request_headers(section_name, remaining_seconds)

    push_type = config.get(section_name, 'push_type')

    payload_data = create_payload_data(push_type, message, badge_count)
    payload = json.dumps(payload_data).encode('utf-8')

    if push_type == "APNs": 
        conn.request('POST', f'/3/device/{push_token}', payload, headers=request_headers)
    elif push_type == "FCM":
        conn.request('POST', '/fcm/send', payload, headers=request_headers)
    resp = conn.get_response()

    status_code = resp.status
    description = status_codes.get(status_code, "Unknown status code")
    print(f"Status code: {status_code} - Description: {description}")

# Create payload data based on push type
def create_payload_data(push_type, message, badge_count):
    if push_type == "APNs":
        return {
            'aps': {
                'alert': message,
                'badge': badge_count,
    elif push_type == "FCM":
        return {
            'to': push_token,
            'notification': {
                'title': '',
                'body': message,
            'data': {
                'badge': badge_count,

# Function to send push with scheduled time
def send_push(conn, section_name, push_token, message, badge_count, scheduled_time, remaining_seconds=0):
    scheduler.add_listener(lambda event: scheduler.shutdown(wait=False), EVENT_JOB_EXECUTED)
        args=(conn, section_name, push_token, message, badge_count, remaining_seconds),

# Function to send bulk push
def send_bulk_push(conn, section_name, users, message, badge_count, scheduled_time, remaining_seconds=0):
    scheduler.add_listener(lambda event: scheduler.shutdown(wait=False), EVENT_JOB_EXECUTED)
    for user in users: 
        push_token = user[6]
            args=(conn, section_name, push_token, message, badge_count, remaining_seconds),

# Function to send push to users in batches
def send_push_to_users_in_batches(section_name, users, batch_size, message, badge_count, scheduled_time, remaining_seconds=0): 
    push_type = config.get(section_name, 'push_type')
    environment = config.get(section_name, 'environment', fallback='development')

    if push_type == "APNs": 
        push_url = 'api.development.push.apple.com' if environment == 'development' else 'api.push.apple.com'
    elif push_type == "FCM":
        push_url = 'fcm.googleapis.com'

    conn = conn_class(push_url + ':443', ca_certs=CA_FILE, cert_file=CERT_FILE, key_file=KEY_FILE)

    total_users = len(users)
    num_batches = (total_users + batch_size - 1) // batch_size
    for batch_number in range(num_batches):
        start_index = batch_number * batch_size
        end_index = (batch_number + 1) * batch_size
        batch_users = users[start_index:end_index]
        send_bulk_push(conn, section_name, batch_users, message, badge_count, scheduled_time, remaining_seconds)


# MySQL connection and get user info
def get_users_from_database():
    connection_params = {
        'host': DB_HOST,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'database': DB_NAME,
        'port': DB_PORT,
    if PROTOCOL == 'HTTPS':
        connection_params['ssl'] = {
            'ca': CA_FILE,
            'cert': CERT_FILE,
            'key': KEY_FILE
    connection = mysql.connector.connect(**connection_params)
        cursor = connection.cursor()
        query = 'SELECT * FROM users'
        users = cursor.fetchall()
        return users

# Calculate the scheduled time (in seconds from now)
def get_push_schedule_time(year=None, month=None, day=None, hour=None, minute=None):
    current_time = datetime.now()

    if all(x is None for x in [year, month, day, hour, minute]):
        return int(current_time.timestamp())

    scheduled_time = datetime(
        year if year is not None else current_time.year,
        month if month is not None else current_time.month,
        day if day is not None else current_time.day,
        hour if hour is not None else current_time.hour,
        minute if minute is not None else current_time.minute

    if scheduled_time < current_time:
        return int(current_time.timestamp())

    return int(scheduled_time.timestamp())

# Calculate the remaining expiration time (in seconds from now)
def get_remaining_expiry_seconds(year=None, month=None, day=None, hour=None, minute=None):
    current_time = datetime.now()
    expiration_time = datetime(
        year if year is not None else current_time.year,
        month if month is not None else current_time.month,
        day if day is not None else current_time.day,
        hour if hour is not None else current_time.hour,
        minute if minute is not None else current_time.minute
    time_difference = expiration_time - current_time
    remaining_seconds = max(int(time_difference.total_seconds()), 0)
    return remaining_seconds

# Push scheduling time
scheduled_time = get_push_schedule_time(2023, 12, 19, 10, 37)
# Remaining seconds until push expiration time
remaining_seconds = get_remaining_expiry_seconds(2023, 12, 19, 10, 38)
print(f"Scheduled Time: {scheduled_time}")
print(f"Remaining Seconds: {remaining_seconds}")

# Section name (company name)
section_name = 'MyApp'
# Get user information
users_data = get_users_from_database()
# Batch size
batch_size = 1000
# Push message
message = "test message."
# Badge count
badge_count = 1

# Send push to users based on batch size
send_push_to_users_in_batches(section_name, users_data, batch_size, message, badge_count, scheduled_time, remaining_seconds)



# config.ini

protocol = HTTP

ca_file = cert/ca.pem
cert_file = cert/cert.pem
key_file = cert/key.pem

host =
user = netcanis
password = 12345678
database = My_DB_Name
port = 3306

push_type = APNs
# development 또는 production
environment = development
bundle_id = com.mycompany.test
auth_key_path = cert/mycompany/AuthKey.p8
key_id = XX8UHXF9XX
team_id = XX6KEXF2XX

push_type = FCM
# development 또는 production
environment = development
bundle_id = com.mycompany.test
api_key = XXXX.....XXXX


A star

Q-Learning 알고리즘으로 구현하였던 길찾기를 A star 알고리즘으로 구현해 보았다.

그리드 구성은 이전과 동일하다.

# A* 

import numpy as np
import matplotlib.pyplot as plt
import heapq

# 그리드 월드 환경 설정
# 0: 빈공간, 1: 벽, 2: 목적지, (0,0): 시작위치
grid = np.array([[0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 1, 0, 0, 1, 0, 0, 0],
                 [0, 0, 0, 0, 1, 0, 0, 1, 0, 0],
                 [0, 1, 1, 0, 1, 0, 0, 0, 1, 0],
                 [0, 0, 0, 0, 0, 0, 0, 0, 1, 2]])

# 그리드 월드 환경 출력 함수
def plot_grid_world(start, goal, path=[]):
    plt.imshow(grid, cmap="gray", interpolation="none", origin="upper")
    if path:
        path = np.array(path)
        plt.plot(path[:, 1], path[:, 0], marker='o', markersize=4, linestyle='-', color='green')
        for i, pos in enumerate(path):
            plt.text(pos[1], pos[0], str(i + 1), ha="center", va="center", color="orange", fontsize=14)
    plt.text(start[1], start[0], "A", ha="center", va="center", color="red", fontsize=16)
    plt.text(goal[1], goal[0], "B", ha="center", va="center", color="green", fontsize=16)

# 휴리스틱 계산 함수
# A* 알고리즘을 이용하여 최적 경로 찾기
def heuristic(state, goal):
    return abs(state[0] - goal[0]) + abs(state[1] - goal[1])

# A* 알고리즘
def astar(grid, start, goal):
    frontier = [(0, start)]
    came_from = {}
    visited = set()
    # 'frontier'를 최소 힙(min heap)으로 변환
    while frontier:
        current_cost, current_state = heapq.heappop(frontier)
        if current_state == goal:
            # 시작 지점부터 목표 지점까지의 경로를 재구성
            path = []
            while current_state in came_from:
                current_state = came_from[current_state]
            return path[::-1] # 리스트 역순으로 변환 
        # 현재 상태가 방문한 곳이라면 스킵.
        if current_state in visited:
        # 현재 상태 방문 리스트에 추가 
        for next_state in get_neighbors(grid, current_state):
            if next_state not in visited:
                priority = current_cost + 1 + heuristic(next_state, goal)   # 우선순위 계산 
                heapq.heappush(frontier, (priority, next_state)) # 최소 우선순위를 가진 상태가 먼저 나오도록 정렬 
                came_from[next_state] = current_state # 현재 이동상태 기록 
    return None

# 현재 위치에서 이동 가능한 이웃 반환
def get_neighbors(grid, state):
    neighbors = []
    directions = [(-1, 0), (1, 0), (0, -1), (0, 1)] 
    for dx, dy in directions:
        new_position = (state[0] + dx, state[1] + dy)
        if 0 <= new_position[0] < grid.shape[0] and 0 <= new_position[1] < grid.shape[1] and grid[new_position] != 1:
    return neighbors

# 시작 위치 및 타겟 위치 설정
start_position = (0, 0)
target_position = tuple(np.argwhere(grid == 2)[0]) # grid값이 2인 곳이 목적지

# A* 알고리즘 실행
optimal_path = astar(grid, start_position, target_position)

# 결과 출력
print("최적 경로:", optimal_path)

# 최적 경로 및 순서 출력
plt.ion() # nteractive mode
plt.clf()   # clear
plt.title(f"Optimal Path - Distance : {len(optimal_path)}")
plot_grid_world(start_position, target_position, optimal_path)


결과는 아래 그림과 같다.



일반적으로 문자열 비교는 '==' 연산자를 사용하거나 문자열 비교함수 'compare(_ :options:)'매서드를 사용한다.

그러나 위와 같이 결과 값이 True, False로 반환된 결과 값으로 처리할 경우 해당 결과 값을 변조하여 해당 조건문을 통과할 수 있다. 이러한 문제를 우회하기 위하여 아래와 같이 XOR 연산을 이용한 문자열 비교처리가 필요할 수 있다.

let str1 = "ABC123"
let str2 = "ABC123"

guard let utf8Bytes1 = str1.data(using: .utf8),
      let utf8Bytes2 = str2.data(using: .utf8) else {

// 각각의 바이트 별로 xor한 결과값을 어레이로 반환
let xorResult = zip(utf8Bytes1, utf8Bytes2).map { $0.0 ^ $0.1 } // xor 비교

// xorResult의 모든 값이 0인지 체크 - 모두 0이어야 동일한 문자열 이다.
if (xorResult.allSatisfy { $0 == 0 }) {
    // 좀더 확실하게 막기 원한다면 다시한번 체크.
	for i in 0..<utf8Bytes1.count {
        if utf8Bytes1[i] ^ utf8Bytes2[i] != 0 {
    // TODO : 작업은 여기에....



3x3 체크를 위해 아래와 같이 함수를 만들고 플레이어와 AI 둘다 막아보았다.
문제되는 부분이 있다면 댓글로 알려주길 바란다.

# 3x3 체크 
def check3x3(self, row, col):
    def check_direction_with_blank(dx, dy):
        scount = 1 # 같은 색상 갯수 
        bcount = 1 # 빈공간 포함 갯수
        r, c = row + dx, col + dy
        while 0 <= r < len(self.board) and 0 <= c < len(self.board[0]) and (self.board[r][c] == self.board[row][col] or self.board[r][c] == 0):
            bcount += 1
            if self.board[r][c] == self.board[row][col]:
                scount += 1
            r += dx
            c += dy
        return scount, bcount

    self.board[row][col] = self.turn_player
    count3x3 = 0
    directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
    for dx, dy in directions:
        sc1, bc1 = check_direction_with_blank( dx,  dy)
        sc2, bc2 = check_direction_with_blank(-dx, -dy)
        scount = sc1 + sc2 - 1
        bcount = bc1 + bc2 - 1
        if scound == 3 and bcound >= 5:
            count3x3 += 1
    self.board[row][col] = 0
    if check3x3 >= 2:
        return True
    return False
def find_empty_cells(self):
    empty_cells = []
    for row in range(BOARD_SIZE):
        for col in range(BOARD_SIZE):
            if self.board[row][col] == 0:
                if self.check3x3(row, col) == False: # 3x3 체크
                    empty_cells.append((row, col))
    return empty_cells
def handle_events(self):
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            self.game_over = True
        elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
            x, y = pygame.mouse.get_pos()
            row = int(round((y - MARGIN) / CELL_SIZE))
            col = int(round((x - MARGIN) / CELL_SIZE))
            if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                if self.check3x3(row, col) == True: # 3x3 체크

                self.make_move(row, col, self.turn_player)
    			(이하 생략)

이제 머신러닝으로 게임을 구현해 보자.

학습 데이터가 5만 이상 되어야 하는데 2만개 정도만 학습 시켰다.

최적화 이동 처리 함수와 믹스해서 구현해 보았다.

3x3일 경우 막지 않았다.

나중에 그 부분은 업데이트 하도록 하겠다.

# Created by netcanis on 2023/10/29.
# minimax
# alpha beta pruning
# 속도 개선 
# 훈련데이터 생성하여 머신러닝.

import pygame
import random
import numpy as np
import sys
import os
import tensorflow as tf
from keras.models import load_model

BOARD_SIZE = 9  # (7,9,13,15,19)

BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

AI = -1



H5_FILE_NAME = "fiar_model.h5"

class FIAR:
    def __init__(self):
    def init_game(self):
        self.episode = 0
        w = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        h = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((w, h))

        self.font = pygame.font.Font(None, CELL_SIZE-4)
    def reset_game(self):
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequences = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequence = 0
        self.game_over = False
        self.turn_player = PLAYER#random.choice([PLAYER, AI])

        self.model = load_model(H5_FILE_NAME)
    def find_empty_cells(self):
        empty_cells = []
        for row in range(BOARD_SIZE):
            for col in range(BOARD_SIZE):
                if self.board[row][col] == 0:
                    empty_cells.append((row, col))
        return empty_cells
    def find_adjacent_empty_cells(self):
        empty_cells = []
        empty_cells2 = []
        for row, col in self.find_empty_cells():
            for dr in [-1, 0, 1]:
                for dc in [-1, 0, 1]:
                    new_row, new_col = row + dr, col + dc
                    if 0 <= new_row < BOARD_SIZE and 0 <= new_col < BOARD_SIZE and self.board[new_row][new_col] != 0:
                        # 수를 놓았을 경우 연결된 수가 3개 이상이고 5개를 놓을 수 있는 자리일 경우만 배열에 추가.
                        self.board[row][col] = self.turn_player
                        c1, c2, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = -self.turn_player
                        c3, c4, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = 0
                        if (c1 >= 3 and c2 >= 5) or (c3 >= 3 and c4 >= 5):
                            empty_cells2.append((row, col))

                        empty_cells.append((row, col))
        if len(empty_cells2) > 0:
            return empty_cells2
        return empty_cells
    def evaluate_position(self, board, row, col):
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count
        def check_direction_with_blank(dx, dy):
            count = 1 # 빈자리 포함 총자리  수   
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and (board[r][c] == board[row][col] or board[r][c] == 0):
                count += 1
                r += dx
                c += dy
            return count
        total_count1 = 0
        total_count2 = 0
        total_score = 0
        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction( dx,  dy)
            count2 = check_direction(-dx, -dy)
            dir_count1 = count1 + count2 - 1
            count3 = check_direction_with_blank( dx,  dy)
            count4 = check_direction_with_blank(-dx, -dy)
            dir_count2 = count3 + count4 - 1
            total_count1 = max(total_count1, dir_count1)
            total_count2 = max(total_count2, dir_count2)
            # 각 방향별로 연결된 돌의 수가 3자리 이상이고 5개이상 놓을 수 있는 자리라면 점수를 추가한다.
            # 즉, 연결되는 돌이 교차하는 위치라면 점수가 높게 나온다.
            if dir_count1 == 4 and dir_count2 >= 5:
                total_score += dir_count1 * 4
            elif dir_count1 >= 3 and dir_count2 >= 5:
                total_score += dir_count1 * 2
            elif dir_count1 >= 2 and dir_count2 >= 5:
                total_score += dir_count1
        return total_count1, total_count2, total_score
    def check_winner(self, board, row, col):
        if board[row][col] == 0:
            return False
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count

        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction(dx, dy)
            count2 = check_direction(-dx, -dy)
            total_count = count1 + count2 - 1
            if total_count == 5:
                return True

        return False
    def is_board_full(self, board):
        return all(cell != 0 for row in board for cell in row)

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        best_move = None
        best_score = 0
        best_move = self.make_optimal_move(player)
        if best_move == None:
            if player == AI:
                empty_cells = self.find_adjacent_empty_cells()
                for row, col in empty_cells: 
                    self.board[row][col] = player
                    s1 = self.evaluate_position(self.board, row, col)[2]
                    if s1 > best_score:
                        best_score = s1
                        best_move = (row, col)
                    self.board[row][col] = -player
                    s2 = self.evaluate_position(self.board, row, col)[2]
                    if s2 > best_score:
                        best_score = s2
                        best_move = (row, col)
                    self.board[row][col] = 0
                if best_score == 0:
                    best_move = random.choice(self.find_empty_cells())
                best_move = random.choice(self.find_empty_cells())
        self.make_move(best_move[0], best_move[1], player)
        return best_move

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            #print(f"[{self.sequence}] <{player}> {row},{col}")
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)
        if depth >= DIFFICULTY_LEVEL_AI:
            return 0
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
            return best_score
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
            return best_score
    def make_optimal_move(self, player):
        # 0. The first one is random.
        if self.sequence == 0:
            row = BOARD_SIZE // 2 + random.randint(-2, 2)
            col = BOARD_SIZE // 2 + random.randint(-2, 2)
            return (row, col)
        empty_cells = self.find_adjacent_empty_cells()

        # 1. ai turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        # 2. player turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = -player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        # 3. The position that becomes 4 when placed
        for row, col in empty_cells: 
            self.board[row][col] = player
            c1, c2, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = -player
            c3, c4, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = 0
            if (c1 == 4 and c2 >= 5) or (c3 == 4 and c4 >= 5):
                return (row, col)
        # 4. The position that becomes 3 when placed
        for row, col in empty_cells: 
            self.board[row][col] = player
            c1, c2, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = -player
            c3, c4, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = 0
            if (c1 == 3 and c2 >= 5) or (c3 == 3 and c4 >= 5):
                return (row, col)
        return None
    def find_best_move(self, player):
        if AI_MODE == "ML":
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                print(f"optimal move! ({optimal_move[0]}, {optimal_move[1]})")
                return optimal_move
            move_index = self.predicts(self.board)
            row = move_index // BOARD_SIZE
            col = move_index % BOARD_SIZE
            best_move = (row, col)
            print(f"ML move! ({row}, {col})")
        elif AI_MODE == "RANDOM":
            best_move = self.random_move(player)
        else: # MINIMAX
            print(f"[{self.sequence+1}] <{player}> ...")
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                return optimal_move
            alpha = -float('inf')
            beta = float('inf')

            best_move = None
            best_score = -float('inf') if player == AI else float('inf')

            empty_cells = self.find_adjacent_empty_cells()
            for index, (row, col) in enumerate(empty_cells):
                self.board[row][col] = player
                is_maximizing = False if player == AI else True
                score = self.minimax(0, is_maximizing, alpha, beta, row, col)
                self.board[row][col] = 0
                if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                    best_score = score
                    best_move = (row, col)
                percentage = (index / len(empty_cells)) * 100
                print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")
            print(f"    {best_move[0]},{best_move[1]} ({best_score})")
        return best_move
    def show_message(self, message, is_exit=False):
        popup = True
        while popup:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:

                if event.type == pygame.KEYDOWN:
                    if is_exit:
                        if event.key == pygame.K_y:
                            popup = False
                        elif event.key == pygame.K_n:
                        popup = False
                        message = ""
            text_lines = message.split('\n')
            for i, line in enumerate(text_lines):
                text = self.font.render(line, True, (128, 0, 128))
                text_rect = text.get_rect(topleft=(20, 20 + i * 20))
                self.screen.blit(text, text_rect)
    def draw_board(self, screen):
        for row in range(BOARD_SIZE):# draw horizontal lines
            pygame.draw.line(screen, BLACK, 
                            (0 * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN), 
                            ((BOARD_SIZE-1) * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN),
            for col in range(BOARD_SIZE):# draw vertical lines
                if row == 0:
                    pygame.draw.line(screen, BLACK, 
                                    (col * CELL_SIZE + MARGIN, 0 * CELL_SIZE + MARGIN), 
                                    (col * CELL_SIZE + MARGIN, (BOARD_SIZE-1) * CELL_SIZE + MARGIN),

                x = col * CELL_SIZE + MARGIN
                y = row * CELL_SIZE + MARGIN

                if self.board[row][col] == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif self.board[row][col] == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)
                seq_no = self.sequences[row][col] 
                if seq_no != 0:
                    if seq_no == self.sequence: 
                        pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)
                    color = WHITE if self.board[row][col] == PLAYER else BLACK
                    text = self.font.render(f"{seq_no}", True, color)
                    text_rect = text.get_rect(center=(x, y))
                    screen.blit(text, text_rect)

    def handle_events(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
            elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
                x, y = pygame.mouse.get_pos()
                row = int(round((y - MARGIN) / CELL_SIZE))
                col = int(round((x - MARGIN) / CELL_SIZE))
                if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                    self.make_move(row, col, self.turn_player)
                    if self.turn_player == AI:
                        best_move = self.find_best_move(self.turn_player)
                        if best_move is not None:
                            row, col = best_move
                            self.make_move(row, col, self.turn_player)
                    if self.game_over == True:
                        if self.turn_player == 0:
                            self.show_message("Game draw!")
                            self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

    def init_ML(self):
        hidden_layer_count = 3 * NUM_ITEMS
        self.model = tf.keras.Sequential([
            tf.keras.layers.Dense(hidden_layer_count, activation='relu', input_shape=(NUM_ITEMS,)),
            tf.keras.layers.Dense(NUM_ITEMS, activation='softmax')
        self.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    def predicts(self, input_data):
        if isinstance(input_data, list):
            input_data = np.array(input_data)
        prediction = self.model.predict(input_data.reshape(1, -1))
        sorted_indices = np.argsort(prediction, axis=-1)[:, ::-1]
        index = 0
        for i in sorted_indices[0]:
            if input_data.shape == (NUM_ITEMS,):
                if input_data[i] == 0:
                    index = i
            elif input_data.shape == (BOARD_SIZE, BOARD_SIZE):
                row = i // BOARD_SIZE
                col = i % BOARD_SIZE
                if input_data[row][col] == 0:
                    index = i
        return index

if __name__ == "__main__":
    game = FIAR()
    while True:
        while not game.game_over:
        game.show_message("Play Again? (y/n)", is_exit=True)



이제 머신러닝을 위한 대량의 훈련 데이터를 만들어보자.

플레이어와 AI는 모두 랜덤 모드로 수를 연산하도록 설정했다. 수를 놓을 때 3자리 이하인 부분에서는,

AI는 점수를 계산하여 높은 점수의 자리에 돌을 놓도록 했다. 플레이어는 3자리 이하인 부분에서 인접한 빈자리 중 랜덤한 위치에 돌을 놓도록 난이도에 약간의 차이를 두었다. 즉, 대부분 비기거나 AI가 좀더 잘 두도록 설정하였다.

# Created by netcanis on 2023/10/29.
# minimax
# alpha beta pruning
# 속도 개선 
# 훈련데이터 생성하여 머신러닝.

import pygame
import random
import numpy as np
import sys
import os
import tensorflow as tf
from keras.models import Sequential, load_model
import pandas as pd

BOARD_SIZE = 9  # (7,9,13,15,19)

BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

AI = -1



if AUTO_PLAY == True:
CSV_FILE_NAME = "fiar_training_data.csv"
H5_FILE_NAME = "fiar_model.h5"

class FIAR:
    def __init__(self):
    def init_game(self):
        self.episode = 0
        w = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        h = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((w, h))

        self.font = pygame.font.Font(None, CELL_SIZE-4)
    def reset_game(self):
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequences = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequence = 0
        self.game_over = False
        self.turn_player = random.choice([PLAYER, AI]) if AUTO_PLAY == True else PLAYER
    def find_empty_cells(self):
        empty_cells = []
        for row in range(BOARD_SIZE):
            for col in range(BOARD_SIZE):
                if self.board[row][col] == 0:
                    empty_cells.append((row, col))
        return empty_cells
    def find_adjacent_empty_cells(self):
        empty_cells = []
        empty_cells2 = []
        for row, col in self.find_empty_cells():
            for dr in [-1, 0, 1]:
                for dc in [-1, 0, 1]:
                    new_row, new_col = row + dr, col + dc
                    if 0 <= new_row < BOARD_SIZE and 0 <= new_col < BOARD_SIZE and self.board[new_row][new_col] != 0:
                        # 수를 놓았을 경우 연결된 수가 3개 이상이고 5개를 놓을 수 있는 자리일 경우만 배열에 추가.
                        self.board[row][col] = self.turn_player
                        c1, c2, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = -self.turn_player
                        c3, c4, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = 0
                        if (c1 >= 3 and c2 >= 5) or (c3 >= 3 and c4 >= 5):
                            empty_cells2.append((row, col))

                        empty_cells.append((row, col))
        if len(empty_cells2) > 0:
            return empty_cells2
        return empty_cells
    def evaluate_position(self, board, row, col):
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count
        def check_direction_with_blank(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and (board[r][c] == board[row][col] or board[r][c] == 0):
                count += 1
                r += dx
                c += dy
            return count
        total_count1 = 0
        total_count2 = 0
        total_score = 0
        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction( dx,  dy)
            count2 = check_direction(-dx, -dy)
            dir_count1 = count1 + count2 - 1
            count3 = check_direction_with_blank( dx,  dy)
            count4 = check_direction_with_blank(-dx, -dy)
            dir_count2 = count3 + count4 - 1
            total_count1 = max(total_count1, dir_count1)
            total_count2 = max(total_count2, dir_count2)
            # 각 방향별로 연결된 돌의 수가 3자리 이상이고 5개이상 놓을 수 있는 자리라면 점수를 추가한다.
            # 즉, 연결되는 돌이 교차하는 위치라면 점수가 높게 나온다.
            if dir_count1 == 4 and dir_count2 >= 5:
                total_score += dir_count1 * 4
            elif dir_count1 >= 3 and dir_count2 >= 5:
                total_score += dir_count1 * 2
            elif dir_count1 >= 2 and dir_count2 >= 5:
                total_score += dir_count1
        return total_count1, total_count2, total_score
    def check_winner(self, board, row, col):
        if board[row][col] == 0:
            return False
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count

        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction(dx, dy)
            count2 = check_direction(-dx, -dy)
            total_count = count1 + count2 - 1
            if total_count == 5:
                return True

        return False
    def is_board_full(self, board):
        return all(cell != 0 for row in board for cell in row)

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        best_move = None
        best_score = 0
        best_move = self.make_optimal_move(player)
        if best_move == None:
            if player == AI:
                empty_cells = self.find_adjacent_empty_cells()
                for row, col in empty_cells: 
                    self.board[row][col] = player
                    s1 = self.evaluate_position(self.board, row, col)[2]
                    if s1 > best_score:
                        best_score = s1
                        best_move = (row, col)
                    self.board[row][col] = -player
                    s2 = self.evaluate_position(self.board, row, col)[2]
                    if s2 > best_score:
                        best_score = s2
                        best_move = (row, col)
                    self.board[row][col] = 0
                if best_score == 0:
                    best_move = random.choice(self.find_empty_cells())
                best_move = random.choice(self.find_empty_cells())
        self.make_move(best_move[0], best_move[1], player)
        return best_move

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            #print(f"[{self.sequence}] <{player}> {row},{col}")
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)
        if (self.turn_player == AI and depth >= DIFFICULTY_LEVEL_AI) or \
            (self.turn_player == PLAYER and depth >= DIFFICULTY_LEVEL_PLAYER):
            return 0
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
            return best_score
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
            return best_score
    def make_optimal_move(self, player):
        # 0. The first one is random.
        if self.sequence == 0:
            row = BOARD_SIZE // 2 + random.randint(-2, 2)
            col = BOARD_SIZE // 2 + random.randint(-2, 2)
            return (row, col)
        empty_cells = self.find_adjacent_empty_cells()

        # 1. ai turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        # 2. player turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = -player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        if (player == PLAYER and PLAYER_MODE == "RANDOM") or (player == AI and AI_MODE == "RANDOM"):
            # 3. The position that becomes 4 when placed
            for row, col in empty_cells: 
                self.board[row][col] = player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = -player
                c3, c4, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if (c1 == 4 and c2 >= 5) or (c3 == 4 and c4 >= 5):
                    return (row, col)
            # 4. The position that becomes 3 when placed
            for row, col in empty_cells: 
                self.board[row][col] = player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = -player
                c3, c4, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if (c1 == 3 and c2 >= 5) or (c3 == 3 and c4 >= 5):
                    return (row, col)
        return None
    def find_best_move(self, player):
        if AI_MODE == "RANDOM":
            best_move = self.random_move(player)
        else: # MINIMAX
            print(f"[{self.sequence+1}] <{player}> ...")
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                return optimal_move
            alpha = -float('inf')
            beta = float('inf')

            best_move = None
            best_score = -float('inf') if player == AI else float('inf')

            empty_cells = self.find_adjacent_empty_cells()
            for index, (row, col) in enumerate(empty_cells):
                self.board[row][col] = player
                is_maximizing = False if player == AI else True
                score = self.minimax(0, is_maximizing, alpha, beta, row, col)
                self.board[row][col] = 0
                if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                    best_score = score
                    best_move = (row, col)
                percentage = (index / len(empty_cells)) * 100
                print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")
            print(f"    {best_move[0]},{best_move[1]} ({best_score})")
        return best_move
    def show_message(self, message, is_exit=False):
        popup = True
        while popup:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:

                if event.type == pygame.KEYDOWN:
                    if is_exit:
                        if event.key == pygame.K_y:
                            popup = False
                        elif event.key == pygame.K_n:
                        popup = False
                        message = ""
            text_lines = message.split('\n')
            for i, line in enumerate(text_lines):
                text = self.font.render(line, True, (128, 0, 128))
                text_rect = text.get_rect(topleft=(20, 20 + i * 20))
                self.screen.blit(text, text_rect)
    def draw_board(self, screen):
        for row in range(BOARD_SIZE):# draw horizontal lines
            pygame.draw.line(screen, BLACK, 
                            (0 * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN), 
                            ((BOARD_SIZE-1) * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN),
            for col in range(BOARD_SIZE):# draw vertical lines
                if row == 0:
                    pygame.draw.line(screen, BLACK, 
                                    (col * CELL_SIZE + MARGIN, 0 * CELL_SIZE + MARGIN), 
                                    (col * CELL_SIZE + MARGIN, (BOARD_SIZE-1) * CELL_SIZE + MARGIN),

                x = col * CELL_SIZE + MARGIN
                y = row * CELL_SIZE + MARGIN

                if self.board[row][col] == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif self.board[row][col] == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)
                seq_no = self.sequences[row][col] 
                if seq_no != 0:
                    if seq_no == self.sequence: 
                        pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)
                    color = WHITE if self.board[row][col] == PLAYER else BLACK
                    text = self.font.render(f"{seq_no}", True, color)
                    text_rect = text.get_rect(center=(x, y))
                    screen.blit(text, text_rect)

    def handle_events(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
            elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
                x, y = pygame.mouse.get_pos()
                row = int(round((y - MARGIN) / CELL_SIZE))
                col = int(round((x - MARGIN) / CELL_SIZE))
                if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                    self.make_move(row, col, self.turn_player)
                    if self.turn_player == AI:
                        best_move = self.find_best_move(self.turn_player)
                        if best_move is not None:
                            row, col = best_move
                            self.make_move(row, col, self.turn_player)
                    if self.game_over == True:
                        if self.turn_player == 0:
                            self.show_message("Game draw!")
                            self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

    def init_ML(self):
        hidden_layer_count = 3 * NUM_ITEMS
        self.model = tf.keras.Sequential([
            tf.keras.layers.Dense(hidden_layer_count, activation='relu', input_shape=(NUM_ITEMS,)),
            tf.keras.layers.Dense(NUM_ITEMS, activation='softmax')
        self.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    def learning(self):
        if os.path.exists(CSV_FILE_NAME) == False:
            x_data, y_data = self.generate_training_data(NUM_EPISODES)

            self.save_data_to_csv(x_data, y_data, CSV_FILE_NAME)
            print(f"{CSV_FILE_NAME} 저장 완료")
            x_data, y_data = self.load_data_from_csv(CSV_FILE_NAME)
        self.model.fit(x_data, y_data, epochs=100, verbose=1)
        test_results = self.model.evaluate(x_data, y_data)
        print(f"손실(Loss): {test_results[0]}")
        print(f"정확도(Accuracy): {test_results[1]}")
    def generate_training_data(self, num_games):
        x_data = []
        y_data = []
        while self.episode < num_games:
            x = []
            y = []
            while True:
                row, col = self.get_next_move(self.turn_player)
                y.append(np.eye(NUM_ITEMS)[row * BOARD_SIZE + col])
                if self.check_winner(self.board, row, col):
                if self.is_board_full(self.board):
            if self.turn_player != PLAYER:
                del x[-1]
                del y[0]
                self.episode += 1
                print(f"{self.episode}: {self.turn_player} win.")

        return np.array(x_data), np.array(y_data)

    def get_next_move(self, player):
        if (player == AI and AI_MODE == "MINIMAX") or (player == PLAYER and PLAYER_MODE == "MINIMAX"):
            return self.minimax_move(player)
            return self.random_move(player)
    def save_data_to_csv(self, x_data, y_data, file_name):
        x_data_flat = [x.flatten() for x in x_data]
        y_data_flat = [y.flatten() for y in y_data]
        data = {'x_data': x_data_flat, 'y_data': y_data_flat}
        df = pd.DataFrame(data)
        df.to_csv(file_name, index=False)

    def load_data_from_csv(self, file_name):
        df = pd.read_csv(file_name)
        x_data_flat = df['x_data'].apply(lambda x: np.fromstring(x[1:-1], sep=' '))
        y_data_flat = df['y_data'].apply(lambda x: np.fromstring(x[1:-1], sep=' '))
        x_data = np.array(x_data_flat.to_list())
        y_data = np.array(y_data_flat.to_list())
        return x_data, y_data
if __name__ == "__main__":
    game = FIAR()


minimax를 이용한 방식으로는 대량의 훈련데이터를 생성하기에 너무 느리므로
각 빈자리에 대한 점수를 계산하여 높은 점수 자리에 돌을 놓는 랜덤모드 추가해 보았다.
다소 기계적인 소심한 수를 두지만 속도만큼은 확실히 빠르다!
<빈 자리 반환 함수 개선사항>
- 수를 놓았을 때 연결된 수가 3개 이상이고 5개를 놓을 수 있는 자리일 경우만 반환.
- 수를 놓았을 때 연결된 수가 4개가 되는 경우 즉시 수를 놓도록 처리.
<랜덤 모드의 점수 계산 방법>
아래의 계산을 각 방향(좌->우, 상->하, 좌상->우하, 우상->좌하)으로 4번 누적된 값을 반환.
- 연결된 돌의 수가 4개이고 5개 이상 돌을 놓을 수 있는 자리: 16점
- 연결된 돌의 수가 3개이고 5개 이상 돌을 놓을 수 있는 자리:  6점
- 연결된 돌의 수가 2개이고 5개 이상 돌을 놓을 수 있는 자리:  2점
랜덤 모드는 수가 놓여진 주변 셀에 대해 위의 랜덤모드 점수를 구해 가장 높은 점수 위치에 수를 놓도록 하였다.

# Created by netcanis on 2023/10/28.
# minimax
# alpha beta pruning
# 속도 개선 2차 

import pygame
import random
import numpy as np
import sys

BOARD_SIZE = 9  # (7,9,13,15,19)

BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

AI = -1



class FIAR:
    def __init__(self):
    def init_game(self):
        w = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        h = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((w, h))

        self.font = pygame.font.Font(None, CELL_SIZE-4)

    def reset_game(self):
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequences = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequence = 0
        self.game_over = False
        self.turn_player = PLAYER
    def find_empty_cells(self):
        empty_cells = []
        for row in range(BOARD_SIZE):
            for col in range(BOARD_SIZE):
                if self.board[row][col] == 0:
                    empty_cells.append((row, col))
        return empty_cells
    def find_adjacent_empty_cells(self):
        empty_cells = []
        empty_cells2 = []
        for row, col in self.find_empty_cells():
            for dr in [-1, 0, 1]:
                for dc in [-1, 0, 1]:
                    new_row, new_col = row + dr, col + dc
                    if 0 <= new_row < BOARD_SIZE and 0 <= new_col < BOARD_SIZE and self.board[new_row][new_col] != 0:
                        # 수를 놓았을 경우 연결된 수가 3개 이상이고 5개를 놓을 수 있는 자리일 경우만 배열에 추가.
                        self.board[row][col] = self.turn_player
                        c1, c2, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = -self.turn_player
                        c3, c4, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = 0
                        if (c1 >= 3 and c2 >= 5) or (c3 >= 3 and c4 >= 5):
                            empty_cells2.append((row, col))

                        empty_cells.append((row, col))
        if len(empty_cells2) > 0:
            return empty_cells2
        return empty_cells
    def evaluate_position(self, board, row, col):
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count
        def check_direction_with_blank(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and (board[r][c] == board[row][col] or board[r][c] == 0):
                count += 1
                r += dx
                c += dy
            return count
        total_count1 = 0
        total_count2 = 0
        total_score = 0
        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction( dx,  dy)
            count2 = check_direction(-dx, -dy)
            dir_count1 = count1 + count2 - 1
            count3 = check_direction_with_blank( dx,  dy)
            count4 = check_direction_with_blank(-dx, -dy)
            dir_count2 = count3 + count4 - 1
            total_count1 = max(total_count1, dir_count1)
            total_count2 = max(total_count2, dir_count2)
            # 각 방향별로 연결된 돌의 수가 3개 이상이고 5개 이상 놓을 수 있는 자리라면 점수를 추가한다.
            # 즉, 연결되는 돌이 교차하는 위치라면 점수가 높게 나온다.
            if dir_count1 == 4 and dir_count2 >= 5:
                total_score += dir_count1 * 4
            elif dir_count1 >= 3 and dir_count2 >= 5:
                total_score += dir_count1 * 2
            elif dir_count1 >= 2 and dir_count2 >= 5:
                total_score += dir_count1
        return total_count1, total_count2, total_score
    def check_winner(self, board, row, col):
        if board[row][col] == 0:
            return False
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count

        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction(dx, dy)
            count2 = check_direction(-dx, -dy)
            total_count = count1 + count2 - 1
            if total_count == 5:
                return True

        return False
    def is_board_full(self, board):
        return all(cell != 0 for row in board for cell in row)

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        optimal_move = self.make_optimal_move(player)
        if optimal_move != None:
            return optimal_move
        empty_cells = self.find_adjacent_empty_cells()
        best_move = (0,0)
        best_score = 0
        for row, col in empty_cells: 
            self.board[row][col] = player
            s1 = self.evaluate_position(self.board, row, col)[2]
            if s1 > best_score:
                best_score = s1
                best_move = (row, col)
            self.board[row][col] = -player
            s2 = self.evaluate_position(self.board, row, col)[2]
            if s2 > best_score:
                best_score = s2
                best_move = (row, col)
            self.board[row][col] = 0
        if best_score == 0:
            best_move = random.choice(self.find_empty_cells())
        return best_move

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            print(f"[{self.sequence}] <{player}> {row},{col}")
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)

        if depth >= DIFFICULTY_LEVEL:
            return 0
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
            return best_score
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
            return best_score
    def make_optimal_move(self, player):
        # 0. The first one is random.
        if self.sequence == 0:
            row = BOARD_SIZE // 2 + random.randint(-2, 2)
            col = BOARD_SIZE // 2 + random.randint(-2, 2)
            return (row, col)
        empty_cells = self.find_adjacent_empty_cells()

        # 1. ai turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        # 2. player turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = -player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        if AI_MODE == "RANDOM":
            # 3. ai turn: 4
            for row, col in empty_cells: 
                self.board[row][col] = player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if c1 == 4 and c2 >= 5:
                    return (row, col)
            # 4. player turn: 4
            for row, col in empty_cells: 
                self.board[row][col] = -player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if c1 == 4 and c2 >= 5:
                    return (row, col)
        return None
    def find_best_move(self, player):
        if AI_MODE == "RANDOM":
            best_move = self.random_move(player)
        else: # MINIMAX
            print(f"[{self.sequence+1}] <{player}> ...")
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                return optimal_move
            alpha = -float('inf')
            beta = float('inf')

            best_move = None
            best_score = -float('inf') if player == AI else float('inf')

            empty_cells = self.find_adjacent_empty_cells()
            for index, (row, col) in enumerate(empty_cells):
                self.board[row][col] = player
                is_maximizing = False if player == AI else True
                score = self.minimax(0, is_maximizing, alpha, beta, row, col)
                self.board[row][col] = 0
                if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                    best_score = score
                    best_move = (row, col)
                percentage = (index / len(empty_cells)) * 100
                print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")
            print(f"    {best_move[0]},{best_move[1]} ({best_score})")
        return best_move
    def show_message(self, message, is_exit=False):
        popup = True
        while popup:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:

                if event.type == pygame.KEYDOWN:
                    if is_exit:
                        if event.key == pygame.K_y:
                            popup = False
                        elif event.key == pygame.K_n:
                        popup = False
                        message = ""
            text_lines = message.split('\n')
            for i, line in enumerate(text_lines):
                text = self.font.render(line, True, (128, 0, 128))
                text_rect = text.get_rect(topleft=(20, 20 + i * 20))
                self.screen.blit(text, text_rect)
    def draw_board(self, screen):
        for row in range(BOARD_SIZE):# draw horizontal lines
            pygame.draw.line(screen, BLACK, 
                            (0 * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN), 
                            ((BOARD_SIZE-1) * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN),
            for col in range(BOARD_SIZE):# draw vertical lines
                if row == 0:
                    pygame.draw.line(screen, BLACK, 
                                    (col * CELL_SIZE + MARGIN, 0 * CELL_SIZE + MARGIN), 
                                    (col * CELL_SIZE + MARGIN, (BOARD_SIZE-1) * CELL_SIZE + MARGIN),

                x = col * CELL_SIZE + MARGIN
                y = row * CELL_SIZE + MARGIN

                if self.board[row][col] == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif self.board[row][col] == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)
                seq_no = self.sequences[row][col] 
                if seq_no != 0:
                    if seq_no == self.sequence: 
                        pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)
                    color = WHITE if self.board[row][col] == PLAYER else BLACK
                    text = self.font.render(f"{seq_no}", True, color)
                    text_rect = text.get_rect(center=(x, y))
                    screen.blit(text, text_rect)

    def handle_events(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
            elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
                x, y = pygame.mouse.get_pos()
                row = int(round((y - MARGIN) / CELL_SIZE))
                col = int(round((x - MARGIN) / CELL_SIZE))
                if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                    self.make_move(row, col, self.turn_player)
                    if self.turn_player == AI:
                        best_move = self.find_best_move(self.turn_player)
                        self.make_move(best_move[0], best_move[1], self.turn_player)
                    if self.game_over == True:
                        if self.turn_player == 0:
                            self.show_message("Game draw!")
                            self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

if __name__ == "__main__":
    game = FIAR()
    while True:
        while not game.game_over:
        game.show_message("Play Again? (y/n)", is_exit=True)

속도는 비교할 수 없을 정도로 빨라졌다.
기계적으로 막기에 급급해 보이는 소심한 수를 놓는건 단점이다.
하지만 대량으로 학습 데이터를 만들기에는 충분한 듯하다.


minimax 속도를 개선하기 위해 다음과 같은 변경사항을 적용하였다.

1. 빈 자리를 검색할 때, 이미 돌이 놓여진 위치 주변에 있는 빈 자리만 반환.

2. AI가 돌을 놓았을 때, 승리 조건이 만족되는 위치라면 즉시 돌을 놓는다.

3. 플레이어가 돌을 놓았을 때, 승리 조건이 만족되는 위치라면 AI가 그 위치에 돌을 즉시 놓는다.


1번 빈 자리 검색 시 아래 그림과 같이 돌이 놓여진 주변 빈자리만 검색하여 반환되도록 한다.

반환된 빈 자리는 불필요한 트리 탐색을 줄여 속도가 개선된다.

빈 자리 검색시 반환되는 위치


# Created by netcanis on 2023/10/07.
# minimax
# alpha beta pruning
# 속도 개선 

import pygame
import random
import numpy as np
import sys

BOARD_SIZE = 9  # (7,9,13,15,19)

BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

AI = -1


class FIAR:
    def __init__(self):
    def init_game(self):
        w = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        h = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((w, h))

        self.font = pygame.font.Font(None, CELL_SIZE-4)

    def reset_game(self):
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequences = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequence = 0
        self.game_over = False
        self.turn_player = PLAYER
    def find_empty_cells(self):
        empty_cells = []
        for row in range(BOARD_SIZE):
            for col in range(BOARD_SIZE):
                if self.board[row][col] == 0:
                    empty_cells.append((row, col))
        return empty_cells
    # 수가 놓여진 자리의 주변 빈 자리만 반환
    def find_adjacent_empty_cells(self):
        empty_cells = []
        for row, col in self.find_empty_cells():
            for dr in [-1, 0, 1]:
                for dc in [-1, 0, 1]:
                    new_row, new_col = row + dr, col + dc
                    if 0 <= new_row < BOARD_SIZE and 0 <= new_col < BOARD_SIZE and self.board[new_row][new_col] != 0:
                        empty_cells.append((row, col))
        return empty_cells
    def check_winner(self, board, row, col):
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count

        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction(dx, dy)
            count2 = check_direction(-dx, -dy)
            total_count = count1 + count2 - 1
            if total_count == 5:
                return True

        return False
    def is_board_full(self, board):
        return all(cell != 0 for row in board for cell in row)

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = random.choice(self.find_adjacent_empty_cells())
        self.make_move(row, col, player)
        return row, col

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            print(f"[{self.sequence}] <{player}> {row},{col}")
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)

        if depth >= DIFFICULTY_LEVEL:
            return 0
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
            return best_score
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
            return best_score
    # 아래 우선순위에 따라 즉시 처리할 자리는 minimax 알고리즘 없이 즉시 수를 놓아 속도 개선
    # 1.만약 AI가 놓으면 게임을 이기는 자리라면 즉시 수를 놓는다. 
    # 2.만약 Player가 놓으면 게임을 이기는 자리라면 AI는 그 곳에 수는 놓는다.
    def make_optimal_move(self, player):
        # 0. The first one is random.
        if self.sequence == 0:
            row = BOARD_SIZE // 2 + random.randint(-2, 2)
            col = BOARD_SIZE // 2 + random.randint(-2, 2)
            return (row, col)
        empty_cells = self.find_adjacent_empty_cells()

        # 1. ai turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        # 2. player turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = -player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        return None
    def find_best_move(self, player):
        print(f"[{self.sequence+1}] <{player}> Calculating...")
        optimal_move = self.make_optimal_move(player)
        if optimal_move != None:
            return optimal_move
        alpha = -float('inf')
        beta = float('inf')

        best_move = None
        best_score = -float('inf') if player == AI else float('inf')

        empty_cells = self.find_adjacent_empty_cells()
        for index, (row, col) in enumerate(empty_cells):
            self.board[row][col] = player
            is_maximizing = False if player == AI else True
            score = self.minimax(0, is_maximizing, alpha, beta, row, col)
            self.board[row][col] = 0
            if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                best_score = score
                best_move = (row, col)
            percentage = (index / len(empty_cells)) * 100
            print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")
        return best_move
    def show_message(self, message, is_exit=False):
        popup = True
        while popup:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:

                if event.type == pygame.KEYDOWN:
                    if is_exit:
                        if event.key == pygame.K_y:
                            popup = False
                        elif event.key == pygame.K_n:
                        popup = False
            text_lines = message.split('\n')
            for i, line in enumerate(text_lines):
                text = self.font.render(line, True, (128, 0, 128))
                text_rect = text.get_rect(topleft=(20, 20 + i * 20))
                self.screen.blit(text, text_rect)
    def draw_board(self, screen):
        for row in range(BOARD_SIZE):# draw horizontal lines
            pygame.draw.line(screen, BLACK, 
                            (0 * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN), 
                            ((BOARD_SIZE-1) * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN),
            for col in range(BOARD_SIZE):# draw vertical lines
                if row == 0:
                    pygame.draw.line(screen, BLACK, 
                                    (col * CELL_SIZE + MARGIN, 0 * CELL_SIZE + MARGIN), 
                                    (col * CELL_SIZE + MARGIN, (BOARD_SIZE-1) * CELL_SIZE + MARGIN),

                x = col * CELL_SIZE + MARGIN
                y = row * CELL_SIZE + MARGIN

                if self.board[row][col] == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif self.board[row][col] == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)
                seq_no = self.sequences[row][col] 
                if seq_no != 0:
                    if seq_no == self.sequence: 
                        pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)
                    color = WHITE if self.board[row][col] == PLAYER else BLACK
                    text = self.font.render(f"{seq_no}", True, color)
                    text_rect = text.get_rect(center=(x, y))
                    screen.blit(text, text_rect)

    def handle_events(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
            elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
                x, y = pygame.mouse.get_pos()
                row = int(round((y - MARGIN) / CELL_SIZE))
                col = int(round((x - MARGIN) / CELL_SIZE))
                if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                    self.make_move(row, col, self.turn_player)
                    if self.turn_player == AI:
                        best_move = self.find_best_move(self.turn_player)
                        if best_move is not None:
                            row, col = best_move
                            self.make_move(row, col, self.turn_player)
                    if self.game_over == True:
                        if self.turn_player == 0:
                            self.show_message("Game draw!")
                            self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

if __name__ == "__main__":
    game = FIAR()
    while True:
        while not game.game_over:
        game.show_message("Play Again? (y/n)", is_exit=True)


속도는 다소 빨라지긴 했다.

3x3을 만들면 이미 진 것을 예감한 것인지 AI는 그 때부터 엉뚱한 수를 놓는다.  

마치 포기한 것처럼...


