Push Notification

개발/Note 2023. 12. 27. 16:42
반응형

python으로 간단한 모바일 푸시 서버를 만들어 보았다.

예약 발송 및 batch 처리 등을 구현하였다.

 

#
# push_server.py
# APNs, FCM, 예약 및 batch 푸시 발송
#

import configparser
import json
import threading
import time
from datetime import datetime
from http.client import HTTPSConnection

import jwt
import mysql.connector
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MISSED
from apscheduler.schedulers.blocking import BlockingScheduler
from hyper import HTTPConnection

# Read configuration from config.ini
# NOTE: ConfigParser.read() silently ignores a missing file; the required
# config.get() calls below then raise configparser.NoSectionError.
config = configparser.ConfigParser()
config.read('config.ini')

# SSL/TLS (Secure Sockets Layer / Transport Layer Security)
# Optional certificate paths; each is None when the option is absent.
CA_FILE = config.get('SSL', 'ca_file', fallback=None)
CERT_FILE = config.get('SSL', 'cert_file', fallback=None)
KEY_FILE = config.get('SSL', 'key_file', fallback=None)

# Database
# Required options (no fallback). config.get() always returns str, so
# DB_PORT is a string here, not an int.
DB_HOST = config.get('Database', 'host')
DB_USER = config.get('Database', 'user')
DB_PASSWORD = config.get('Database', 'password')
DB_NAME = config.get('Database', 'database')
DB_PORT = config.get('Database', 'port')

# Protocol
# Defaults to 'HTTP' when [Server] protocol is not configured.
PROTOCOL = config.get('Server', 'protocol', fallback='HTTP')

# APNs status codes
# Maps an HTTP status code to a human-readable description for logging.
status_codes = {
    200: "Success",
    400: "Bad request",
    403: "Error with certificate or provider authentication token",
    405: "Invalid request method. Only POST requests are supported.",
    410: "Device token is no longer active for the topic.",
    413: "Notification payload was too large.",
    429: "Too many requests for the same device token.",
    500: "Internal server error",
    503: "Server is shutting down and unavailable.",
}

# 'HTTP' (case-insensitive) selects hyper's HTTPConnection; any other value
# selects http.client.HTTPSConnection.
# NOTE(review): the two classes expose different APIs (hyper has
# get_response(), http.client has getresponse()) and accept different
# constructor keywords; callers must cope with both - confirm the non-HTTP path.
conn_class = HTTPConnection if PROTOCOL.upper() == 'HTTP' else HTTPSConnection

# Module-level blocking scheduler instance.
scheduler = BlockingScheduler()


# Create the request headers with expiration time
def create_request_headers(section_name, remaining_seconds):
    """Return the request headers for the push service of a config section.

    Args:
        section_name: config.ini section holding the app's push settings.
        remaining_seconds: expiration value forwarded to the APNs header
            builder (unused for FCM).

    Returns:
        A dict of headers, or None when ``push_type`` is neither "APNs"
        nor "FCM".
    """
    service = config.get(section_name, 'push_type')

    if service == "FCM":
        return create_fcm_request_headers(section_name)
    if service == "APNs":
        return create_apns_request_headers(section_name, remaining_seconds)
    return None

# Create APNs request headers
def create_apns_request_headers(section_name, remaining_seconds):
    """Build the request headers for an APNs token-authenticated push.

    Reads ``auth_key_path``, ``team_id``, ``key_id`` and ``bundle_id`` from
    the given config section and signs a provider JWT (ES256) with the key
    file.

    Args:
        section_name: config.ini section holding the APNs settings.
        remaining_seconds: how long (in seconds from now) APNs may keep
            retrying delivery. ``0`` (or a negative value) asks APNs to
            attempt delivery once without storing the notification.
            ``None`` omits the ``apns-expiration`` header entirely.

    Returns:
        dict of HTTP headers for the APNs request.

    Raises:
        OSError: if the auth key file cannot be read.
        configparser.Error: if a required option is missing.
    """
    auth_key_path = config.get(section_name, 'auth_key_path')
    team_id = config.get(section_name, 'team_id')
    key_id = config.get(section_name, 'key_id')
    bundle_id = config.get(section_name, 'bundle_id')

    with open(auth_key_path) as f:
        secret = f.read()

    # APNs expects "iat" as whole seconds since the epoch.
    issued_at = int(time.time())
    token = jwt.encode(
        {
            'iss': team_id,
            'iat': issued_at,
        },
        secret,
        algorithm='ES256',
        headers={
            'alg': 'ES256',
            'kid': key_id,
        },
    )
    # PyJWT < 2.0 returns bytes, PyJWT >= 2.0 returns str; accept both.
    if isinstance(token, bytes):
        token = token.decode('ascii')

    headers = {
        'apns-priority': '10',
        'apns-topic': bundle_id,
        'authorization': 'bearer {0}'.format(token),
    }

    if remaining_seconds is not None:
        # apns-expiration is an absolute UNIX timestamp, not a duration:
        # convert the relative value; keep 0 as "deliver once, do not store".
        if remaining_seconds > 0:
            expiration = issued_at + int(remaining_seconds)
        else:
            expiration = 0
        headers['apns-expiration'] = str(expiration)

    return headers

# Create FCM request headers
def create_fcm_request_headers(section_name):
    """Return the HTTP headers for an FCM request.

    The server key is read from the ``api_key`` option of ``section_name``
    and sent in the ``Authorization`` header; the body is declared as JSON.
    """
    server_key = config.get(section_name, 'api_key')
    return {
        'Authorization': f'key={server_key}',
        'Content-Type': 'application/json',
    }

# Function to send notification with scheduled time
def send_notification(conn, section_name, push_token, message, badge_count=0, remaining_seconds=0):
    """Send one push notification over an already-open connection.

    Args:
        conn: open connection to the push gateway (hyper ``HTTPConnection``
            or ``http.client.HTTPSConnection``).
        section_name: config.ini section describing the target app.
        push_token: device token (APNs) or registration token (FCM).
        message: notification text.
        badge_count: badge number to deliver with the notification.
        remaining_seconds: expiration value forwarded to the header builder.

    Raises:
        ValueError: if the section's ``push_type`` is not "APNs" or "FCM".
    """
    push_type = config.get(section_name, 'push_type')

    request_headers = create_request_headers(section_name, remaining_seconds)
    if request_headers is None:
        # Fail early instead of waiting for a response to a request that
        # was never sent.
        raise ValueError(f"Unsupported push_type: {push_type!r}")

    payload_data = create_payload_data(push_type, message, badge_count, push_token)
    payload = json.dumps(payload_data).encode('utf-8')

    # APNs addresses the device in the URL path; FCM takes it in the body.
    path = f'/3/device/{push_token}' if push_type == "APNs" else '/fcm/send'
    conn.request('POST', path, payload, headers=request_headers)

    # hyper exposes get_response(); http.client exposes getresponse().
    read_response = getattr(conn, 'get_response', None) or conn.getresponse
    resp = read_response()

    status_code = resp.status
    description = status_codes.get(status_code, "Unknown status code")
    print(f"Status code: {status_code} - Description: {description}")
    print(resp.read())

# Create payload data based on push type
def create_payload_data(push_type, message, badge_count, push_token=None):
    """Build the JSON-serializable payload for a push request.

    Args:
        push_type: "APNs" or "FCM".
        message: notification text.
        badge_count: badge number.
        push_token: FCM registration token placed in the ``to`` field;
            ignored for APNs, where the token is part of the request path.

    Returns:
        The payload dict, or None for an unrecognized ``push_type``.
    """
    if push_type == "APNs":
        return {
            'aps': {
                'alert': message,
                'badge': badge_count,
            }
        }
    elif push_type == "FCM":
        return {
            'to': push_token,
            'notification': {
                'title': '',
                'body': message,
            },
            'data': {
                'badge': badge_count,
            },
        }
    return None

# Function to send push with scheduled time
def _run_scheduled_jobs(job_args, scheduled_time):
    """Run one send_notification job per argument tuple and block until all end.

    Args:
        job_args: list of positional-argument tuples for send_notification.
        scheduled_time: UNIX timestamp at which the jobs should fire.

    A dedicated scheduler is created for each run so that listeners do not
    pile up on a shared instance and no already-shut-down scheduler has to
    be restarted. The scheduler is stopped only after EVERY job has reached
    a final state (executed, failed or missed), not after the first one.
    """
    if not job_args:
        # With no jobs no completion event would ever fire and
        # BlockingScheduler.start() would block forever.
        return

    job_scheduler = BlockingScheduler()
    run_date = datetime.fromtimestamp(scheduled_time)

    pending = set()
    for args in job_args:
        job = job_scheduler.add_job(
            send_notification,
            args=args,
            trigger='date',
            run_date=run_date,
            # A run date that is already slightly in the past must still be
            # sent instead of being dropped as a misfire.
            misfire_grace_time=None,
        )
        pending.add(job.id)

    lock = threading.Lock()

    def on_job_finished(event):
        if event.code == EVENT_JOB_ERROR:
            print(f"Push job {event.job_id} failed: {event.exception}")
        elif event.code == EVENT_JOB_MISSED:
            print(f"Push job {event.job_id} was missed")

        # Events arrive from worker threads; guard the bookkeeping.
        with lock:
            if event.job_id not in pending:
                return
            pending.discard(event.job_id)
            all_done = not pending

        if all_done:
            job_scheduler.shutdown(wait=False)

    job_scheduler.add_listener(
        on_job_finished,
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
    )
    job_scheduler.start()


def send_push(conn, section_name, push_token, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send a single push at ``scheduled_time`` and block until it is done.

    Args:
        conn: open connection to the push gateway.
        section_name: config.ini section describing the target app.
        push_token: device/registration token of the recipient.
        message: notification text.
        badge_count: badge number.
        scheduled_time: UNIX timestamp at which to send.
        remaining_seconds: expiration value forwarded to send_notification.
    """
    _run_scheduled_jobs(
        [(conn, section_name, push_token, message, badge_count, remaining_seconds)],
        scheduled_time,
    )

# Function to send bulk push
def send_bulk_push(conn, section_name, users, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send the same push to every user at ``scheduled_time`` and block until done.

    Args:
        conn: open connection to the push gateway.
        section_name: config.ini section describing the target app.
        users: sequence of user rows; the push token is read from index 6
            of each row.
        message: notification text.
        badge_count: badge number.
        scheduled_time: UNIX timestamp at which to send.
        remaining_seconds: expiration value forwarded to send_notification.
    """
    job_args = [
        (conn, section_name, user[6], message, badge_count, remaining_seconds)
        for user in users
    ]
    _run_scheduled_jobs(job_args, scheduled_time)

# Function to send push to users in batches
def send_push_to_users_in_batches(section_name, users, batch_size, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send a push to all users, ``batch_size`` users at a time.

    Opens one connection to the gateway selected by the section's
    ``push_type`` (and ``environment`` for APNs), hands each slice of
    ``users`` to send_bulk_push, and always closes the connection.

    Args:
        section_name: config.ini section describing the target app.
        users: sequence of user rows as consumed by send_bulk_push.
        batch_size: maximum number of users per batch; must be positive.
        message: notification text.
        badge_count: badge number.
        scheduled_time: UNIX timestamp at which to send.
        remaining_seconds: expiration value forwarded to send_bulk_push.

    Raises:
        ValueError: if ``batch_size`` is not positive or ``push_type`` is
            neither "APNs" nor "FCM".
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    push_type = config.get(section_name, 'push_type')
    environment = config.get(section_name, 'environment', fallback='development')

    if push_type == "APNs":
        push_url = 'api.development.push.apple.com' if environment == 'development' else 'api.push.apple.com'
    elif push_type == "FCM":
        push_url = 'fcm.googleapis.com'
    else:
        # Previously push_url stayed unbound and failed with UnboundLocalError.
        raise ValueError(f"Unsupported push_type: {push_type!r}")

    conn = conn_class(push_url + ':443', ca_certs=CA_FILE, cert_file=CERT_FILE, key_file=KEY_FILE)
    try:
        for start_index in range(0, len(users), batch_size):
            batch_users = users[start_index:start_index + batch_size]
            send_bulk_push(conn, section_name, batch_users, message, badge_count, scheduled_time, remaining_seconds)
    finally:
        # Release the socket even when a batch raises.
        conn.close()

# MySQL connection and get user info
def get_users_from_database():
    """Fetch every row of the ``users`` table.

    Connects with the module-level Database settings; when the configured
    protocol is HTTPS (case-insensitive) the SSL certificate paths are
    passed to the connector as well.

    Returns:
        list of row tuples as returned by ``cursor.fetchall()``.
    """
    connection_params = {
        'host': DB_HOST,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'database': DB_NAME,
        # configparser yields a string; the connector expects a number.
        'port': int(DB_PORT),
    }

    if PROTOCOL.upper() == 'HTTPS':
        # mysql.connector takes flat ssl_* arguments, not an "ssl" dict.
        connection_params['ssl_ca'] = CA_FILE
        connection_params['ssl_cert'] = CERT_FILE
        connection_params['ssl_key'] = KEY_FILE

    connection = mysql.connector.connect(**connection_params)

    try:
        cursor = connection.cursor()
        try:
            cursor.execute('SELECT * FROM users')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Closed even if creating the cursor itself failed.
        connection.close()

# Calculate the scheduled time (in seconds from now)
def get_push_schedule_time(year=None, month=None, day=None, hour=None, minute=None):
    """Return the UNIX timestamp (int seconds) at which a push should fire.

    Every omitted component defaults to the current local value. When no
    component is given, or the resulting moment lies in the past, the
    current time is returned instead.
    """
    now = datetime.now()
    requested = (year, month, day, hour, minute)

    if any(part is not None for part in requested):
        defaults = (now.year, now.month, now.day, now.hour, now.minute)
        target = datetime(*(
            default if part is None else part
            for part, default in zip(requested, defaults)
        ))
        if target >= now:
            return int(target.timestamp())

    return int(now.timestamp())

# Calculate the remaining expiration time (in seconds from now)
def get_remaining_expiry_seconds(year=None, month=None, day=None, hour=None, minute=None):
    """Return the whole seconds left until the given expiration moment.

    Omitted components default to the current local value. A moment that
    has already passed yields 0.
    """
    now = datetime.now()
    overrides = {
        'year': year,
        'month': month,
        'day': day,
        'hour': hour,
        'minute': minute,
    }
    expires_at = datetime(**{
        name: getattr(now, name) if value is None else value
        for name, value in overrides.items()
    })

    seconds_left = int((expires_at - now).total_seconds())
    return seconds_left if seconds_left > 0 else 0




# Push scheduling time (falls back to "now" when the moment is in the past)
scheduled_time = get_push_schedule_time(2023, 12, 19, 10, 37)
# Remaining seconds until push expiration time
remaining_seconds = get_remaining_expiry_seconds(2023, 12, 19, 10, 38)
print(f"Scheduled Time: {scheduled_time}")
print(f"Remaining Seconds: {remaining_seconds}")

# Section name (company name) - must match a section of config.ini.
# The sample config.ini defines [HarexApp] (APNs) and [HarexApp2] (FCM);
# a name that is not present makes config.get() raise NoSectionError.
section_name = 'HarexApp'
# Get user information
users_data = get_users_from_database()
# Batch size
batch_size = 1000
# Push message
message = "test message."
# Badge count
badge_count = 1

# Send push to users based on batch size
send_push_to_users_in_batches(section_name, users_data, batch_size, message, badge_count, scheduled_time, remaining_seconds)

print("Complete.")

 

#
# config.ini
#

[Server]
# HTTP 또는 HTTPS
protocol = HTTP

[SSL]
ca_file = cert/ca.pem
cert_file = cert/cert.pem
key_file = cert/key.pem

[Database]
host = 127.0.0.1
user = netcanis
password = 12345678
database = My_DB_Name
port = 3306

[HarexApp]
# APNs, FCM
push_type = APNs
# development 또는 production
environment = development
bundle_id = com.mycompany.test
auth_key_path = cert/mycompany/AuthKey.p8
key_id = XX8UHXF9XX
team_id = XX6KEXF2XX

[HarexApp2]
# APNs, FCM
push_type = FCM
# development 또는 production
environment = development
bundle_id = com.mycompany.test
api_key = XXXX.....XXXX

 

2024.01.11 - [Note] - BLE, Beacon, iBeacon

2024.01.11 - [Note] - BLE Advertising Payload format 샘플 분석

2024.01.09 - [Note] - Packet Format for the LE Uncoded PHYs

2024.01.04 - [iOS] - Floating, Dragging Button

2023.12.27 - [생각의 조각들] - 말이 통하지 않는 사람과 싸우지 말라.

2023.12.27 - [Server] - Push Notification

2023.12.27 - [AI,ML,ALGORITHM] - A star

2023.12.07 - [iOS] - XOR 연산을 이용한 문자열 비교

2023.11.03 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

2023.10.29 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

 

반응형

'개발 > Note' 카테고리의 다른 글

BLE Advertising Payload format 샘플 분석  (1) 2024.01.11
Packet Format for the LE Uncoded PHYs  (0) 2024.01.09
[Mac OS X] Apache Virtual Hosts 설정 설정  (0) 2023.02.21
Unwind Segue 사용방법  (0) 2023.01.11
랜덤 seed 초기화  (0) 2022.11.18
블로그 이미지

SKY STORY

,

A star

개발/AI,ML,ALGORITHM 2023. 12. 27. 09:27
반응형

 

Q-Learning 알고리즘으로 구현하였던 길찾기를 A star 알고리즘으로 구현해 보았다.

그리드 구성은 이전과 동일하다.

#
# A* 
#

import numpy as np
import matplotlib.pyplot as plt
import heapq

# Grid-world environment
# 0: empty cell, 1: wall, 2: goal; (0, 0) is the start position
grid = np.array([[0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
                 [0, 0, 0, 1, 0, 0, 1, 0, 0, 0],
                 [0, 0, 0, 0, 1, 0, 0, 1, 0, 0],
                 [0, 1, 1, 0, 1, 0, 0, 0, 1, 0],
                 [0, 0, 0, 0, 0, 0, 0, 0, 1, 2]])

# 그리드 월드 환경 출력 함수
def plot_grid_world(start, goal, path=None):
    """Draw the module-level grid with the start, the goal and an optional path.

    Args:
        start: (row, col) of the start cell, labelled "A".
        goal: (row, col) of the goal cell, labelled "B".
        path: optional sequence of (row, col) cells; drawn as a green line
            with 1-based step numbers. None or an empty sequence draws no
            path.
    """
    plt.imshow(grid, cmap="gray", interpolation="none", origin="upper")
    plt.xticks([])
    plt.yticks([])

    # Explicit None/length test: works for lists and NumPy arrays alike and
    # avoids a mutable default argument.
    if path is not None and len(path) > 0:
        path = np.array(path)
        # Columns are the x axis, rows the y axis.
        plt.plot(path[:, 1], path[:, 0], marker='o', markersize=4, linestyle='-', color='green')

        for i, pos in enumerate(path):
            plt.text(pos[1], pos[0], str(i + 1), ha="center", va="center", color="orange", fontsize=14)

    plt.text(start[1], start[0], "A", ha="center", va="center", color="red", fontsize=16)
    plt.text(goal[1], goal[0], "B", ha="center", va="center", color="green", fontsize=16)

# 휴리스틱 계산 함수
# A* 알고리즘을 이용하여 최적 경로 찾기
def heuristic(state, goal):
    """Return the Manhattan distance between two (row, col) cells."""
    return abs(state[0] - goal[0]) + abs(state[1] - goal[1])

# A* search
def astar(grid, start, goal):
    """Find a shortest 4-connected path from ``start`` to ``goal``.

    Args:
        grid: 2-D NumPy array in which the value 1 marks a wall.
        start: (row, col) of the start cell.
        goal: (row, col) of the goal cell.

    Returns:
        List of (row, col) cells from ``start`` to ``goal`` inclusive, or
        None when the goal cannot be reached.
    """
    # Heap entries are (f, g, state): f = g + h orders the frontier, while
    # g (steps walked so far) is carried separately so the heuristic is
    # never folded into the accumulated path cost.
    frontier = [(heuristic(start, goal), 0, start)]
    came_from = {}
    best_cost = {start: 0}
    visited = set()

    while frontier:
        _, current_cost, current_state = heapq.heappop(frontier)

        if current_state == goal:
            # Walk the parent links back to the start, then reverse.
            path = [current_state]
            while current_state in came_from:
                current_state = came_from[current_state]
                path.append(current_state)
            return path[::-1]

        # A state may sit in the heap several times; expand it only once.
        if current_state in visited:
            continue
        visited.add(current_state)

        for next_state in get_neighbors(grid, current_state):
            if next_state in visited:
                continue
            new_cost = current_cost + 1
            # Record the parent only when this route is strictly better, so
            # a worse route found later cannot overwrite it.
            if new_cost < best_cost.get(next_state, float('inf')):
                best_cost[next_state] = new_cost
                came_from[next_state] = current_state
                priority = new_cost + heuristic(next_state, goal)
                heapq.heappush(frontier, (priority, new_cost, next_state))

    return None

# Return the walkable neighbours of a cell
def get_neighbors(grid, state):
    """Return the in-bounds, non-wall cells directly above/below/left/right of ``state``."""
    rows, cols = grid.shape[0], grid.shape[1]
    neighbors = []
    for d_row, d_col in ((-1, 0), (1, 0), (0, -1), (0, 1)):
        candidate = (state[0] + d_row, state[1] + d_col)
        in_bounds = 0 <= candidate[0] < rows and 0 <= candidate[1] < cols
        if in_bounds and grid[candidate] != 1:
            neighbors.append(candidate)
    return neighbors

# Start and goal positions
start_position = (0, 0)
target_position = tuple(np.argwhere(grid == 2)[0])  # the cell whose grid value is 2 is the goal

# Run A*
optimal_path = astar(grid, start_position, target_position)

# Print the result
print("최적 경로:", optimal_path)


# Plot the path and the visiting order
plt.ion()  # interactive mode
plt.figure(3)
plt.clf()   # clear
# astar() returns None when the goal is unreachable; report 0 instead of
# crashing on len(None).
path_length = len(optimal_path) if optimal_path is not None else 0
plt.title(f"Optimal Path - Distance : {path_length}")
plot_grid_world(start_position, target_position, optimal_path)
plt.show(block=True)

 

결과는 아래 그림과 같다.

 

 

2023.12.07 - [iOS] - XOR 연산을 이용한 문자열 비교

2023.11.03 - [AI,ML] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

2023.10.29 - [AI,ML] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

2023.10.29 - [AI,ML] - Gomoku(Five in a Row, Omok) (4/5) - 훈련 데이터 생성 및 학습

2023.10.28 - [AI,ML] - Gomoku(Five in a Row, Omok) (3/5) - 속도 최적화 2차 (RANDOM모드 추가)

2023.10.27 - [AI,ML] - Gomoku(Five in a Row, Omok) (2/5) - 속도 최적화 1차 (minimax 속도 개선)

2023.10.27 - [AI,ML] - Gomoku(Five in a Row, Omok) (1/5) - 기본 구현 (minimax, alpha-beta pruning)

2023.08.28 - [AI,ML] - Q-learning

 

반응형
블로그 이미지

SKY STORY

,
반응형

일반적으로 문자열 비교는 '==' 연산자를 사용하거나 문자열 비교 함수 'compare(_:options:)' 메서드를 사용한다.

그러나 위와 같이 결과 값이 True, False로 반환된 결과 값으로 처리할 경우 해당 결과 값을 변조하여 해당 조건문을 통과할 수 있다. 이러한 문제를 우회하기 위하여 아래와 같이 XOR 연산을 이용한 문자열 비교처리가 필요할 수 있다.

let str1 = "ABC123"
let str2 = "ABC123"

guard let utf8Bytes1 = str1.data(using: .utf8),
      let utf8Bytes2 = str2.data(using: .utf8) else {
    return
}

// zip(_:_:) stops at the shorter sequence, so without this check a string
// would compare equal to any longer string it is a prefix of, and the index
// loop below could read past the end of the shorter buffer.
guard (utf8Bytes1.count ^ utf8Bytes2.count) == 0 else {
    return
}

// XOR the two buffers byte by byte and collect the results in an array.
let xorResult = zip(utf8Bytes1, utf8Bytes2).map { $0.0 ^ $0.1 } // XOR comparison

// The strings are identical only if every XOR result is 0.
if (xorResult.allSatisfy { $0 == 0 }) {
    // Check once more for extra protection against a tampered result.
    for i in 0..<utf8Bytes1.count {
        if utf8Bytes1[i] ^ utf8Bytes2[i] != 0 {
            return
        }
    }

    // TODO: do the work here....

}

 

반응형

'개발 > iOS' 카테고리의 다른 글

UITextField 숫자 입력 시 콤마 추가 및 사용  (0) 2024.05.14
Floating, Dragging Button  (1) 2024.01.04
Rosetta2  (0) 2023.04.26
NFC tag read/write Manager Class (2/2)  (0) 2023.04.26
NFC tag read/write Manager Class (1/2)  (0) 2023.04.26
블로그 이미지

SKY STORY

,
반응형

3x3 체크를 위해 아래와 같이 함수를 만들고 플레이어와 AI 둘다 막아보았다.
문제되는 부분이 있다면 댓글로 알려주길 바란다.

# 3x3 체크 
def check3x3(self, row, col):
    """Return True when placing the current player's stone at (row, col) makes a 3x3.

    The stone is placed temporarily, each of the four line directions is
    scanned on both sides across same-coloured stones and blanks, and the
    cell is restored to empty before returning. A direction counts as an
    open three when it holds exactly 3 same-coloured stones within a span
    of at least 5 cells; two or more such directions make a 3x3.
    """
    def check_direction_with_blank(dx, dy):
        scount = 1  # stones of the same colour (including the origin)
        bcount = 1  # span length including blanks (including the origin)
        r, c = row + dx, col + dy
        while 0 <= r < len(self.board) and 0 <= c < len(self.board[0]) and (self.board[r][c] == self.board[row][col] or self.board[r][c] == 0):
            bcount += 1
            if self.board[r][c] == self.board[row][col]:
                scount += 1
            r += dx
            c += dy
        return scount, bcount

    self.board[row][col] = self.turn_player

    count3x3 = 0
    directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
    for dx, dy in directions:
        sc1, bc1 = check_direction_with_blank( dx,  dy)
        sc2, bc2 = check_direction_with_blank(-dx, -dy)
        # The origin cell is counted by both scans; subtract it once.
        scount = sc1 + sc2 - 1
        bcount = bc1 + bc2 - 1
        if scount == 3 and bcount >= 5:
            count3x3 += 1
    self.board[row][col] = 0
    return count3x3 >= 2
    
    
def find_empty_cells(self):
    """Return the empty cells that pass the 3x3 check, in random order."""
    playable = [
        (r, c)
        for r in range(BOARD_SIZE)
        for c in range(BOARD_SIZE)
        # Skip cells where the move would create a 3x3.
        if self.board[r][c] == 0 and not self.check3x3(r, c)
    ]
    random.shuffle(playable)
    return playable
    
def handle_events(self):
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            self.game_over = True
        elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
            x, y = pygame.mouse.get_pos()
            row = int(round((y - MARGIN) / CELL_SIZE))
            col = int(round((x - MARGIN) / CELL_SIZE))
            if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                if self.check3x3(row, col) == True: # 3x3 체크
                    return

                self.make_move(row, col, self.turn_player)
                
    			(이하 생략)

 
 
 
 
 
 
 
2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (1/5) - 기본 구현 (minimax, alpha-beta pruning)
2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (2/5) - 속도 최적화 1차 (minimax 속도 개선)
2023.10.28 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (3/5) - 속도 최적화 2차 (RANDOM모드 추가)
2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (4/5) - 훈련 데이터 생성 및 학습
2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현
2023.11.03 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가
 
 
 

반응형
블로그 이미지

SKY STORY

,
반응형

이제 머신러닝으로 게임을 구현해 보자.

학습 데이터가 5만 이상 되어야 하는데 2만개 정도만 학습 시켰다.

최적화 이동 처리 함수와 믹스해서 구현해 보았다.

3x3일 경우 막지 않았다.

나중에 그 부분은 업데이트 하도록 하겠다.

#
# FIAR
# Created by netcanis on 2023/10/29.
#
# minimax
# alpha beta pruning
# 속도 개선 
# 훈련데이터 생성하여 머신러닝.

import pygame
import random
import numpy as np
import sys
import os
import tensorflow as tf
from keras.models import load_model


# Number of grid lines per side (the author lists 7, 9, 13, 15, 19 as options).
BOARD_SIZE = 9  # (7,9,13,15,19)
# Pixel distance between neighbouring grid lines.
CELL_SIZE = 40
# Pixel offset of the grid from the window edge.
MARGIN = 20
# Half a cell; used as the radius of drawn stones.
CLICK_RADIUS = CELL_SIZE // 2

# RGB colours
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

# Total number of board cells; used to scale minimax win scores.
NUM_ITEMS = (BOARD_SIZE * BOARD_SIZE)
# Board markers for the two sides (0 means an empty cell).
PLAYER = 1
AI = -1

# Depth at which minimax stops searching.
DIFFICULTY_LEVEL_AI = 4

# NOTE(review): PLAYER_MODE is not read in the visible part of the file.
PLAYER_MODE = "MANUAL"
# Move selection used by find_best_move: "ML", "RANDOM", anything else = minimax.
AI_MODE = "ML"

# Model file loaded by reset_game().
H5_FILE_NAME = "fiar_model.h5"


class FIAR:
    def __init__(self):
        """Create the game object and run the pygame setup in init_game()."""
        self.init_game()
    
    def init_game(self):
        """Initialise pygame, open the square game window and create the font."""
        self.episode = 0

        pygame.init()
        pygame.display.set_caption("FIAR")

        # The window is square: the grid spans BOARD_SIZE-1 cells plus a
        # margin on either side.
        side = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((side, side))

        self.font = pygame.font.Font(None, CELL_SIZE-4)
        
    def reset_game(self):
        """Start a fresh game: clear the board state, redraw, and ensure the model is loaded.

        ``board`` holds the stones, ``sequences`` the move number placed on
        each cell, and ``sequence`` the number of moves made so far. The
        human player always moves first.
        """
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequences = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequence = 0
        self.game_over = False
        self.turn_player = PLAYER#random.choice([PLAYER, AI])

        self.draw_board(self.screen)
        pygame.display.flip()

        # Load the model from disk only once; later resets reuse it instead
        # of re-reading the file for every new game.
        if getattr(self, 'model', None) is None:
            self.model = load_model(H5_FILE_NAME)
        
    def find_empty_cells(self):
        """Return every empty (row, col) cell of the board in random order."""
        vacant = [
            (r, c)
            for r in range(BOARD_SIZE)
            for c in range(BOARD_SIZE)
            if self.board[r][c] == 0
        ]
        random.shuffle(vacant)
        return vacant
    
    def find_adjacent_empty_cells(self):
        """Return the empty cells that touch at least one stone.

        Cells where either side would get a run of 3 or more stones in a
        line that still has room for 5 are preferred: when any exist, only
        those are returned. Each cell appears at most once in the result.
        """
        neighbor_offsets = [(dr, dc) for dr in (-1, 0, 1) for dc in (-1, 0, 1)]

        adjacent_cells = []
        priority_cells = []
        for row, col in self.find_empty_cells():
            # One hit is enough; the cell itself is empty so (0, 0) never matches.
            touches_stone = any(
                0 <= row + dr < BOARD_SIZE
                and 0 <= col + dc < BOARD_SIZE
                and self.board[row + dr][col + dc] != 0
                for dr, dc in neighbor_offsets
            )
            if not touches_stone:
                continue

            # Try the cell for both sides, then restore it. Evaluated once
            # per cell (the result does not depend on which neighbour matched).
            self.board[row][col] = self.turn_player
            c1, c2, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = -self.turn_player
            c3, c4, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = 0

            if (c1 >= 3 and c2 >= 5) or (c3 >= 3 and c4 >= 5):
                priority_cells.append((row, col))
            adjacent_cells.append((row, col))

        if len(priority_cells) > 0:
            return priority_cells

        return adjacent_cells
    
    def evaluate_position(self, board, row, col):
        """Measure the lines passing through the stone at (row, col).

        Returns:
            (longest_run, widest_span, score) where ``longest_run`` is the
            longest unbroken line of same-coloured stones through the cell,
            ``widest_span`` the longest line made of same-coloured stones
            and blanks, and ``score`` a sum over the four directions that
            rewards runs of 2+ stones whose span has room for five.
        """
        stone = board[row][col]
        n_rows, n_cols = len(board), len(board[0])

        def reach(dx, dy, blanks_ok):
            # Cells walked from the origin (inclusive) in one direction while
            # they hold the same stone (or, optionally, are empty).
            length = 1
            r, c = row + dx, col + dy
            while 0 <= r < n_rows and 0 <= c < n_cols:
                cell = board[r][c]
                if not (cell == stone or (blanks_ok and cell == 0)):
                    break
                length += 1
                r += dx
                c += dy
            return length

        longest_run = 0
        widest_span = 0
        score = 0
        for dx, dy in ((0, 1), (1, 0), (1, 1), (-1, 1)):
            # The origin is counted by both half-lines; subtract it once.
            run = reach(dx, dy, False) + reach(-dx, -dy, False) - 1
            span = reach(dx, dy, True) + reach(-dx, -dy, True) - 1
            longest_run = max(longest_run, run)
            widest_span = max(widest_span, span)

            # Only lines that can still grow to five contribute; crossing
            # lines therefore add up to a higher score.
            if span >= 5:
                if run == 4:
                    score += run * 4
                elif run >= 3:
                    score += run * 2
                elif run >= 2:
                    score += run

        return longest_run, widest_span, score
    
    def check_winner(self, board, row, col):
        """Return True when the stone at (row, col) is part of a line of exactly five.

        An empty cell never wins; lines longer than five do not count.
        """
        stone = board[row][col]
        if stone == 0:
            return False

        n_rows, n_cols = len(board), len(board[0])

        def run_from(dx, dy):
            # Same-coloured stones from the origin (inclusive) in one direction.
            length = 1
            r, c = row + dx, col + dy
            while 0 <= r < n_rows and 0 <= c < n_cols and board[r][c] == stone:
                length += 1
                r += dx
                c += dy
            return length

        for dx, dy in ((0, 1), (1, 0), (1, 1), (-1, 1)):
            # Both half-lines include the origin; subtract it once.
            if run_from(dx, dy) + run_from(-dx, -dy) - 1 == 5:
                return True
        return False
    
    def is_board_full(self, board):
        """Return True when no cell of ``board`` is empty (0)."""
        for line in board:
            for cell in line:
                if cell == 0:
                    return False
        return True

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        
        best_move = None
        best_score = 0
        
        best_move = self.make_optimal_move(player)
        if best_move == None:
            if player == AI:
                empty_cells = self.find_adjacent_empty_cells()
                for row, col in empty_cells: 
                    self.board[row][col] = player
                    s1 = self.evaluate_position(self.board, row, col)[2]
                    if s1 > best_score:
                        best_score = s1
                        best_move = (row, col)
                    self.board[row][col] = -player
                    s2 = self.evaluate_position(self.board, row, col)[2]
                    if s2 > best_score:
                        best_score = s2
                        best_move = (row, col)
                    self.board[row][col] = 0
                
                if best_score == 0:
                    best_move = random.choice(self.find_empty_cells())
            else:
                best_move = random.choice(self.find_empty_cells())
        
        self.make_move(best_move[0], best_move[1], player)
        
        return best_move

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            
            #print(f"[{self.sequence}] <{player}> {row},{col}")
            
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
            else:
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
        else:
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)
        
        if depth >= DIFFICULTY_LEVEL_AI:
            return 0
        
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
                    break
            return best_score
        else:
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
                    break
            return best_score
    
    def make_optimal_move(self, player):
        """Return an obvious move for ``player``, or None when there is none.

        Checked in order: a random cell near the centre for the very first
        move, a cell that wins for ``player``, a cell that would win for the
        opponent, a cell that makes a run of 4 (with room for 5) for either
        side, then a cell that makes such a run of 3. The board is left
        unchanged.
        """
        # 0. The first one is random.
        if self.sequence == 0:
            center = BOARD_SIZE // 2
            row = center + random.randint(-2, 2)
            col = center + random.randint(-2, 2)
            return (row, col)

        candidates = self.find_adjacent_empty_cells()

        # 1. own five, then 2. the opponent's five
        for stone in (player, -player):
            for row, col in candidates:
                self.board[row][col] = stone
                completes_five = self.check_winner(self.board, row, col)
                self.board[row][col] = 0
                if completes_five:
                    return (row, col)

        # 3. The position that becomes 4 when placed, then
        # 4. the position that becomes 3 when placed
        for target in (4, 3):
            for row, col in candidates:
                self.board[row][col] = player
                own_run, own_span, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = -player
                opp_run, opp_span, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if (own_run == target and own_span >= 5) or (opp_run == target and opp_span >= 5):
                    return (row, col)

        return None
        
    def find_best_move(self, player):
        """Choose the next move for ``player`` according to AI_MODE.

        "ML": use make_optimal_move when it finds a move, otherwise convert
        the index returned by ``self.predicts`` into (row, col).
        "RANDOM": delegate to random_move (which also plays the move).
        Anything else: use make_optimal_move, then fall back to a minimax
        search over the adjacent empty cells.

        Returns:
            The chosen (row, col).
        """
        if AI_MODE == "ML":
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                print(f"optimal move! ({optimal_move[0]}, {optimal_move[1]})")
                return optimal_move
            
            # presumably predicts() returns a flat cell index (row * BOARD_SIZE + col);
            # verify against its definition - TODO confirm
            move_index = self.predicts(self.board)
            row = move_index // BOARD_SIZE
            col = move_index % BOARD_SIZE
            best_move = (row, col)
            print(f"ML move! ({row}, {col})")
        elif AI_MODE == "RANDOM":
            best_move = self.random_move(player)
        else: # MINIMAX
            print(f"[{self.sequence+1}] <{player}> ...")
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                return optimal_move
        
            alpha = -float('inf')
            beta = float('inf')

            best_move = None
            # The AI looks for the highest score, the player for the lowest.
            best_score = -float('inf') if player == AI else float('inf')

            empty_cells = self.find_adjacent_empty_cells()
            for index, (row, col) in enumerate(empty_cells):
                # Try the move, score the resulting position, then undo it.
                self.board[row][col] = player
                is_maximizing = False if player == AI else True
                score = self.minimax(0, is_maximizing, alpha, beta, row, col)
                self.board[row][col] = 0
                
                if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                    best_score = score
                    best_move = (row, col)
                
                # Progress output; index is 0-based, so the first cell prints 0%.
                percentage = (index / len(empty_cells)) * 100
                print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")
            
            # NOTE(review): best_move is still None here when empty_cells is
            # empty, which makes this subscript raise TypeError.
            print(f"    {best_move[0]},{best_move[1]} ({best_score})")
            
        return best_move
    
    def show_message(self, message, is_exit=False):
        """Draw ``message`` on the screen and wait for a key press.

        Args:
            message: text to show; ``\\n`` separates lines.
            is_exit: when True only the Y and N keys are handled: Y resets
                the game and returns, N quits the program. When False any
                key closes the message and redraws the board.

        Closing the window quits the program in either mode.
        """
        popup = True
        while popup:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()

                if event.type == pygame.KEYDOWN:
                    if is_exit:
                        if event.key == pygame.K_y:
                            self.reset_game()
                            popup = False
                            return
                        elif event.key == pygame.K_n:
                            pygame.quit() 
                            sys.exit()
                    else:
                        # Clearing the message makes the render pass below
                        # draw only an empty line over the redrawn board.
                        popup = False
                        message = ""
                        self.draw_board(self.screen)
                        break
            
            # Render each line 20 px below the previous one, starting at (20, 20).
            text_lines = message.split('\n')
            for i, line in enumerate(text_lines):
                text = self.font.render(line, True, (128, 0, 128))
                text_rect = text.get_rect(topleft=(20, 20 + i * 20))
                self.screen.blit(text, text_rect)
            
            pygame.display.flip()
    
    def draw_board(self, screen):
        """Paint the grid, all stones and their move numbers onto ``screen``.

        PLAYER stones are drawn black and AI stones white. Every stone is
        labelled with its move number, and the most recent move gets a red
        outline. The caller is responsible for flipping the display.
        """
        near_edge = 0 * CELL_SIZE + MARGIN
        far_edge = (BOARD_SIZE-1) * CELL_SIZE + MARGIN

        screen.fill(GRAY)
        for row in range(BOARD_SIZE):
            y = row * CELL_SIZE + MARGIN
            # Horizontal grid line for this row.
            pygame.draw.line(screen, BLACK, (near_edge, y), (far_edge, y), 1)

            for col in range(BOARD_SIZE):
                x = col * CELL_SIZE + MARGIN
                if row == 0:
                    # Vertical grid lines are drawn once, while visiting the first row.
                    pygame.draw.line(screen, BLACK, (x, near_edge), (x, far_edge), 1)

                stone = self.board[row][col]
                if stone == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif stone == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)

                seq_no = self.sequences[row][col]
                if seq_no == 0:
                    continue

                if seq_no == self.sequence:
                    # Highlight the latest move.
                    pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)

                # Label colour contrasts with the stone colour.
                label_color = WHITE if stone == PLAYER else BLACK
                label = self.font.render(f"{seq_no}", True, label_color)
                screen.blit(label, label.get_rect(center=(x, y)))

    def handle_events(self):
        """Process pending pygame events: window close and the human's clicks.

        A click on a free intersection places the human stone. If that hands
        the turn to the AI, the AI's reply is computed and played within the
        same call. When the game has ended a result message is shown.
        Closing the window only sets ``game_over``.
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
                continue
            if event.type != pygame.MOUSEBUTTONDOWN or self.game_over or self.turn_player != PLAYER:
                continue

            # Use the coordinates recorded in the event rather than the live
            # cursor position: the event may be handled some time after the
            # click (e.g. while the AI was thinking) and the mouse may have moved.
            x, y = event.pos
            row = int(round((y - MARGIN) / CELL_SIZE))
            col = int(round((x - MARGIN) / CELL_SIZE))
            on_board = 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE
            if not on_board or self.board[row][col] != 0:
                continue

            self.make_move(row, col, self.turn_player)
            self.draw_board(self.screen)
            pygame.display.flip()

            if self.turn_player == AI:
                best_move = self.find_best_move(self.turn_player)
                if best_move is not None:
                    row, col = best_move
                    self.make_move(row, col, self.turn_player)

                self.draw_board(self.screen)
                pygame.display.flip()

            if self.game_over:
                # turn_player is set to 0 on a draw; otherwise it still holds the winner.
                if self.turn_player == 0:
                    self.show_message("Game draw!")
                else:
                    self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

    def init_ML(self):
        """Build and compile the network stored in ``self.model``.

        The model is a two-layer dense network: NUM_ITEMS inputs, a ReLU hidden
        layer of 3 * NUM_ITEMS units, and a softmax output with NUM_ITEMS units.
        """
        hidden_units = 3 * NUM_ITEMS
        hidden_layer = tf.keras.layers.Dense(hidden_units, activation='relu', input_shape=(NUM_ITEMS,))
        output_layer = tf.keras.layers.Dense(NUM_ITEMS, activation='softmax')

        self.model = tf.keras.Sequential([hidden_layer, output_layer])
        self.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    def predicts(self, input_data):
        """Return the flat index of the model's highest-ranked empty cell.

        Args:
            input_data: Board as a list or array, either flat with shape
                (NUM_ITEMS,) or square with shape (BOARD_SIZE, BOARD_SIZE).

        Returns:
            The flat cell index (row * BOARD_SIZE + col) with the highest model
            output among cells whose value is 0. Returns 0 when no cell is
            empty or when the input has any other shape.
        """
        board = np.array(input_data) if isinstance(input_data, list) else input_data

        scores = self.model.predict(board.reshape(1, -1))
        # Cell indices ordered from most to least preferred.
        ranked = np.argsort(scores, axis=-1)[:, ::-1][0]

        if board.shape == (NUM_ITEMS,):
            cells = board
        elif board.shape == (BOARD_SIZE, BOARD_SIZE):
            # Row-major flattening matches index = row * BOARD_SIZE + col.
            cells = board.reshape(-1)
        else:
            return 0

        for candidate in ranked:
            if cells[candidate] == 0:
                return candidate
        return 0


    
if __name__ == "__main__":
    # Create the game (opens the window), build the network and start a game.
    game = FIAR()
    game.init_ML()
    game.reset_game()
    
    # One outer iteration per game: pump events until the game ends, then ask
    # whether to play again (show_message resets the game on 'y', exits on 'n').
    while True:
        while not game.game_over:
            game.handle_events()
            
        game.show_message("Play Again? (y/n)", is_exit=True)

 

 

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (1/5) - 기본 구현 (minimax, alpha-beta pruning)

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (2/5) - 속도 최적화 1차 (minimax 속도 개선)

2023.10.28 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (3/5) - 속도 최적화 2차 (RANDOM모드 추가)

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (4/5) - 훈련 데이터 생성 및 학습

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

2023.11.03 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

 

 

반응형
블로그 이미지

SKY STORY

,
반응형

이제 머신러닝을 위한 대량의 훈련 데이터를 만들어보자.

플레이어와 AI 모두 랜덤 모드로 수를 계산하도록 설정했다. 다만 돌을 놓았을 때 연결되는 돌이 3개 이상이 되는 자리가 없는 경우에는 둘의 동작을 다르게 했다.

AI는 각 자리의 점수를 계산하여 가장 높은 점수의 자리에 돌을 놓도록 했다. 플레이어는 같은 상황에서 빈자리 중 랜덤한 위치에 돌을 놓도록 하여 난이도에 약간의 차이를 두었다. 즉, 대부분 비기거나 AI가 좀 더 잘 두도록 설정하였다.

#
# FIAR
# Created by netcanis on 2023/10/29.
#
# minimax
# alpha beta pruning
# 속도 개선 
# 훈련데이터 생성하여 머신러닝.

import pygame
import random
import numpy as np
import sys
import os
import tensorflow as tf
from keras.models import Sequential, load_model
import pandas as pd


# Board geometry. Stones are drawn on the grid-line intersections.
BOARD_SIZE = 9  # (7,9,13,15,19)
CELL_SIZE = 40  # pixel distance between adjacent grid lines
MARGIN = 20  # pixel gap between the window edge and the outermost grid line
CLICK_RADIUS = CELL_SIZE // 2  # radius used when drawing a stone

# RGB colours.
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

# Number of cells on the board; also the input/output width of the network.
NUM_ITEMS = (BOARD_SIZE * BOARD_SIZE)
# Board cell values for the two sides (0 means empty). Being negatives of each
# other lets the turn be switched by multiplying by -1.
PLAYER = 1
AI = -1

# Depth at which minimax stops searching, chosen by whose turn it currently is.
DIFFICULTY_LEVEL_AI = 4
DIFFICULTY_LEVEL_PLAYER = 2

# When True the first mover is chosen at random and both sides use the modes below.
AUTO_PLAY = True

if AUTO_PLAY == True:
    PLAYER_MODE = "RANDOM"#"MINIMAX"
    AI_MODE = "RANDOM"#"MINIMAX"
else:
    PLAYER_MODE = "MANUAL"
    AI_MODE = "RANDOM"#"ML"#"MINIMAX"
    
# Training settings: number of recorded games and the output file names.
NUM_EPISODES = 20000
CSV_FILE_NAME = "fiar_training_data.csv"
H5_FILE_NAME = "fiar_model.h5"


class FIAR:
    def __init__(self):
        """Create the game object; all setup is delegated to init_game()."""
        self.init_game()
    
    def init_game(self):
        """One-time setup: reset the episode counter and open the pygame window.

        The window is square, sized to hold BOARD_SIZE grid lines spaced
        CELL_SIZE apart with MARGIN pixels on every side.
        """
        self.episode = 0

        pygame.init()
        pygame.display.set_caption("FIAR")
        side = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((side, side))

        # Default pygame font, slightly smaller than a cell.
        self.font = pygame.font.Font(None, CELL_SIZE-4)
        
    def reset_game(self):
        """Start a fresh game: clear the board and counters, pick who moves first, redraw.

        With AUTO_PLAY the first mover is chosen at random; otherwise PLAYER
        always starts.
        """
        grid_shape = (BOARD_SIZE, BOARD_SIZE)
        self.board = np.zeros(grid_shape, dtype=int)
        # Move number stored per cell (0 = no stone), used for the labels.
        self.sequences = np.zeros(grid_shape, dtype=int)
        self.sequence = 0
        self.game_over = False
        if AUTO_PLAY == True:
            self.turn_player = random.choice([PLAYER, AI])
        else:
            self.turn_player = PLAYER

        self.draw_board(self.screen)
        pygame.display.flip()
        
    def find_empty_cells(self):
        """Return every empty cell as a (row, col) tuple, in random order."""
        vacant = [
            (row, col)
            for row in range(BOARD_SIZE)
            for col in range(BOARD_SIZE)
            if self.board[row][col] == 0
        ]
        random.shuffle(vacant)
        return vacant
    
    def find_adjacent_empty_cells(self):
        """Return the candidate cells: empty cells that touch at least one stone.

        Each candidate is additionally probed for both sides. A cell is
        "promising" when placing a stone there gives, for either side, a
        connected run of at least 3 together with room for 5. If any promising
        cells exist only those are returned; otherwise all candidates are.

        Returns:
            A list of unique (row, col) tuples in the random order produced by
            find_empty_cells(). Earlier versions could list the same cell up
            to three times, which made callers evaluate it repeatedly.
        """
        neighbour_offsets = [
            (dr, dc) for dr in (-1, 0, 1) for dc in (-1, 0, 1) if (dr, dc) != (0, 0)
        ]
        candidates = []
        promising = []
        for row, col in self.find_empty_cells():
            touches_stone = any(
                0 <= row + dr < BOARD_SIZE
                and 0 <= col + dc < BOARD_SIZE
                and self.board[row + dr][col + dc] != 0
                for dr, dc in neighbour_offsets
            )
            if not touches_stone:
                continue

            # Temporarily place a stone for each side to measure the result,
            # then restore the cell.
            self.board[row][col] = self.turn_player
            c1, c2, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = -self.turn_player
            c3, c4, _ = self.evaluate_position(self.board, row, col)
            self.board[row][col] = 0

            if (c1 >= 3 and c2 >= 5) or (c3 >= 3 and c4 >= 5):
                promising.append((row, col))
            candidates.append((row, col))

        return promising if promising else candidates
    
    def evaluate_position(self, board, row, col):
        """Measure the lines passing through the stone at (row, col).

        For each of the four line directions two lengths are computed:
        the connected run of cells equal to board[row][col], and the "room",
        which also counts empty (0) cells.

        Returns:
            (longest_run, largest_room, score) where the first two are maxima
            over the four directions and score accumulates, for every
            direction with room >= 5: run * 4 when the run is exactly 4,
            run * 2 when it is 3 or more, and run when it is 2.
        """
        def span(step_r, step_c, allow_blank):
            # Count cells from (row, col) outward while they match the stone
            # (or, with allow_blank, are empty). The origin cell counts as 1.
            length = 1
            r, c = row + step_r, col + step_c
            while 0 <= r < len(board) and 0 <= c < len(board[0]):
                cell = board[r][c]
                if not (cell == board[row][col] or (allow_blank and cell == 0)):
                    break
                length += 1
                r += step_r
                c += step_c
            return length

        longest_run = 0
        largest_room = 0
        score = 0
        for step_r, step_c in ((0, 1), (1, 0), (1, 1), (-1, 1)):
            # Both half-lines include the origin, hence the -1.
            run = span(step_r, step_c, False) + span(-step_r, -step_c, False) - 1
            room = span(step_r, step_c, True) + span(-step_r, -step_c, True) - 1
            longest_run = max(longest_run, run)
            largest_room = max(largest_room, room)

            # Only lines that can still grow to five are worth points, so a
            # cell where several such lines cross scores higher.
            if room < 5:
                continue
            if run == 4:
                score += run * 4
            elif run >= 3:
                score += run * 2
            elif run >= 2:
                score += run

        return longest_run, largest_room, score
    
    def check_winner(self, board, row, col):
        """Return True when the stone at (row, col) is part of a line of exactly five.

        An empty cell never wins. Runs longer than five do not count.
        """
        stone = board[row][col]
        if stone == 0:
            return False

        def reach(step_r, step_c):
            # Number of consecutive matching stones starting at (row, col).
            length = 1
            r, c = row + step_r, col + step_c
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                length += 1
                r += step_r
                c += step_c
            return length

        lines = ((0, 1), (1, 0), (1, 1), (-1, 1))
        # The origin is counted by both half-lines, hence the -1.
        return any(
            reach(step_r, step_c) + reach(-step_r, -step_c) - 1 == 5
            for step_r, step_c in lines
        )
    
    def is_board_full(self, board):
        """Return True when no cell of ``board`` is empty (equal to 0)."""
        for line in board:
            for cell in line:
                if cell == 0:
                    return False
        return True

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        
        best_move = None
        best_score = 0
        
        best_move = self.make_optimal_move(player)
        if best_move == None:
            if player == AI:
                empty_cells = self.find_adjacent_empty_cells()
                for row, col in empty_cells: 
                    self.board[row][col] = player
                    s1 = self.evaluate_position(self.board, row, col)[2]
                    if s1 > best_score:
                        best_score = s1
                        best_move = (row, col)
                    self.board[row][col] = -player
                    s2 = self.evaluate_position(self.board, row, col)[2]
                    if s2 > best_score:
                        best_score = s2
                        best_move = (row, col)
                    self.board[row][col] = 0
                
                if best_score == 0:
                    best_move = random.choice(self.find_empty_cells())
            else:
                best_move = random.choice(self.find_empty_cells())
        
        self.make_move(best_move[0], best_move[1], player)
        
        return best_move

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            
            #print(f"[{self.sequence}] <{player}> {row},{col}")
            
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
            else:
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        """Score the current board with depth-limited minimax and alpha-beta pruning.

        Args:
            depth: Current search depth (0 for the children of the root).
            is_maximizing: True when the AI is the side to move next.
            alpha: Best score the maximizer is already assured of.
            beta: Best score the minimizer is already assured of.
            row2, col2: The cell that was just played on ``self.board``.

        Returns:
            A score from the AI's point of view: positive when the AI has won,
            negative when PLAYER has won, 0 for a draw or when the depth limit
            is reached. Wins found at a smaller depth have a larger magnitude.
        """
        # The win test must come before the draw test: a winning stone that
        # also fills the last empty cell is a win, not a draw (make_move
        # applies the same order).
        if self.check_winner(self.board, row2, col2):
            win_value = NUM_ITEMS - depth + 1
            # The stone at (row2, col2) belongs to the side that is NOT about
            # to move, so a maximizing node here means PLAYER has just won.
            return -win_value if is_maximizing else win_value

        if self.is_board_full(self.board):
            return 0

        # The depth limit depends on whose real turn it is, i.e. which side
        # started this search.
        if (self.turn_player == AI and depth >= DIFFICULTY_LEVEL_AI) or \
            (self.turn_player == PLAYER and depth >= DIFFICULTY_LEVEL_PLAYER):
            return 0

        if is_maximizing:  # AI to move
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
                    break
            return best_score

        # PLAYER to move
        best_score = float('inf')
        for row, col in self.find_adjacent_empty_cells():
            self.board[row][col] = PLAYER
            score = self.minimax(depth + 1, True, alpha, beta, row, col)
            self.board[row][col] = 0
            best_score = min(best_score, score)
            beta = min(beta, best_score)
            if beta <= alpha:
                break
        return best_score
    
    def make_optimal_move(self, player):
        """Return a forced or obviously good move for ``player``, or None.

        Checked in order, returning the first hit:
          0. On an empty game: a random cell within 2 of the board centre.
          1. A cell that wins for ``player``.
          2. A cell that would win for the opponent (to block it).
          3. Only when ``player`` is in RANDOM mode: a cell that gives either
             side a run of exactly 4 with room for 5.
          4. Likewise for a run of exactly 3.

        The board is left unchanged; the move is not played.
        """
        if self.sequence == 0:
            centre = BOARD_SIZE // 2
            row = centre + random.randint(-2, 2)
            col = centre + random.randint(-2, 2)
            return (row, col)

        candidates = self.find_adjacent_empty_cells()

        def winning_cell(stone):
            # First candidate where placing ``stone`` completes five.
            for row, col in candidates:
                self.board[row][col] = stone
                wins = self.check_winner(self.board, row, col)
                self.board[row][col] = 0
                if wins:
                    return (row, col)
            return None

        def cell_reaching(length):
            # First candidate where a stone of either side makes a run of
            # exactly ``length`` with room for five.
            for row, col in candidates:
                self.board[row][col] = player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = -player
                c3, c4, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if (c1 == length and c2 >= 5) or (c3 == length and c4 >= 5):
                    return (row, col)
            return None

        # Win immediately if possible, otherwise block the opponent's win.
        for stone in (player, -player):
            move = winning_cell(stone)
            if move is not None:
                return move

        if (player == PLAYER and PLAYER_MODE == "RANDOM") or (player == AI and AI_MODE == "RANDOM"):
            for length in (4, 3):
                move = cell_reaching(length)
                if move is not None:
                    return move

        return None
        
    def find_best_move(self, player):
        """Pick a move for ``player`` according to AI_MODE.

        In RANDOM mode the move comes from random_move(), which also plays it.
        Otherwise make_optimal_move() is tried first and, failing that, every
        candidate cell is scored with minimax; in that case the move is only
        returned, not played.

        Returns:
            The chosen (row, col), or None when the minimax search has no
            candidate cell to consider.
        """
        if AI_MODE == "RANDOM":
            return self.random_move(player)

        # MINIMAX
        print(f"[{self.sequence+1}] <{player}> ...")
        optimal_move = self.make_optimal_move(player)
        if optimal_move is not None:
            return optimal_move

        alpha = -float('inf')
        beta = float('inf')

        best_move = None
        best_score = -float('inf') if player == AI else float('inf')
        # After ``player`` moves, the opponent is next: the AI maximizes.
        is_maximizing = player != AI

        empty_cells = self.find_adjacent_empty_cells()
        for index, (row, col) in enumerate(empty_cells):
            self.board[row][col] = player
            score = self.minimax(0, is_maximizing, alpha, beta, row, col)
            self.board[row][col] = 0

            if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                best_score = score
                best_move = (row, col)

            percentage = (index / len(empty_cells)) * 100
            print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")

        # With no candidates best_move stays None; skip the summary instead of
        # failing on best_move[0] so the caller can handle the None result.
        if best_move is not None:
            print(f"    {best_move[0]},{best_move[1]} ({best_score})")

        return best_move
    
    def show_message(self, message, is_exit=False):
        """Draw ``message`` over the window and block until the user responds.

        Args:
            message: Text to display; ``\\n`` separates lines.
            is_exit: When true the prompt is a yes/no question: ``y`` resets
                the game and returns, ``n`` quits the program, other keys are
                ignored. When false any key press dismisses the message.

        Closing the window quits the program in either mode.
        """
        text_color = (128, 0, 128)
        waiting = True
        while waiting:
            for evt in pygame.event.get():
                if evt.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()

                if evt.type != pygame.KEYDOWN:
                    continue

                if not is_exit:
                    # Any key dismisses: repaint the board and blank the text
                    # so the final frame drawn below shows a clean board.
                    waiting = False
                    message = ""
                    self.draw_board(self.screen)
                    break

                if evt.key == pygame.K_y:
                    self.reset_game()
                    return
                if evt.key == pygame.K_n:
                    pygame.quit()
                    sys.exit()

            for line_no, line in enumerate(message.split('\n')):
                rendered = self.font.render(line, True, text_color)
                self.screen.blit(rendered, rendered.get_rect(topleft=(20, 20 + line_no * 20)))

            pygame.display.flip()
    
    def draw_board(self, screen):
        """Paint the grid, all stones and their move numbers onto ``screen``.

        PLAYER stones are drawn black and AI stones white. Every stone is
        labelled with its move number, and the most recent move gets a red
        outline. The caller is responsible for flipping the display.
        """
        near_edge = 0 * CELL_SIZE + MARGIN
        far_edge = (BOARD_SIZE-1) * CELL_SIZE + MARGIN

        screen.fill(GRAY)
        for row in range(BOARD_SIZE):
            y = row * CELL_SIZE + MARGIN
            # Horizontal grid line for this row.
            pygame.draw.line(screen, BLACK, (near_edge, y), (far_edge, y), 1)

            for col in range(BOARD_SIZE):
                x = col * CELL_SIZE + MARGIN
                if row == 0:
                    # Vertical grid lines are drawn once, while visiting the first row.
                    pygame.draw.line(screen, BLACK, (x, near_edge), (x, far_edge), 1)

                stone = self.board[row][col]
                if stone == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif stone == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)

                seq_no = self.sequences[row][col]
                if seq_no == 0:
                    continue

                if seq_no == self.sequence:
                    # Highlight the latest move.
                    pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)

                # Label colour contrasts with the stone colour.
                label_color = WHITE if stone == PLAYER else BLACK
                label = self.font.render(f"{seq_no}", True, label_color)
                screen.blit(label, label.get_rect(center=(x, y)))

    def handle_events(self):
        """Process pending pygame events: window close and the human's clicks.

        A click on a free intersection places the human stone. If that hands
        the turn to the AI, the AI's reply is computed and played within the
        same call. When the game has ended a result message is shown.
        Closing the window only sets ``game_over``.
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
                continue
            if event.type != pygame.MOUSEBUTTONDOWN or self.game_over or self.turn_player != PLAYER:
                continue

            # Use the coordinates recorded in the event rather than the live
            # cursor position: the event may be handled some time after the
            # click (e.g. while the AI was thinking) and the mouse may have moved.
            x, y = event.pos
            row = int(round((y - MARGIN) / CELL_SIZE))
            col = int(round((x - MARGIN) / CELL_SIZE))
            on_board = 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE
            if not on_board or self.board[row][col] != 0:
                continue

            self.make_move(row, col, self.turn_player)
            self.draw_board(self.screen)
            pygame.display.flip()

            if self.turn_player == AI:
                best_move = self.find_best_move(self.turn_player)
                if best_move is not None:
                    # In RANDOM mode find_best_move has already played the
                    # move; make_move then ignores the occupied cell.
                    row, col = best_move
                    self.make_move(row, col, self.turn_player)

                self.draw_board(self.screen)
                pygame.display.flip()

            if self.game_over:
                # turn_player is set to 0 on a draw; otherwise it still holds the winner.
                if self.turn_player == 0:
                    self.show_message("Game draw!")
                else:
                    self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

    def init_ML(self):
        """Build and compile the network stored in ``self.model``.

        The model is a two-layer dense network: NUM_ITEMS inputs, a ReLU hidden
        layer of 3 * NUM_ITEMS units, and a softmax output with NUM_ITEMS units.
        """
        hidden_units = 3 * NUM_ITEMS
        hidden_layer = tf.keras.layers.Dense(hidden_units, activation='relu', input_shape=(NUM_ITEMS,))
        output_layer = tf.keras.layers.Dense(NUM_ITEMS, activation='softmax')

        self.model = tf.keras.Sequential([hidden_layer, output_layer])
        self.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
        
    def learning(self):
        """Train ``self.model`` and save it to H5_FILE_NAME.

        Training data is loaded from CSV_FILE_NAME when that file exists;
        otherwise NUM_EPISODES games are generated and written there first.
        After fitting, loss and accuracy on the same data are printed.
        """
        if os.path.exists(CSV_FILE_NAME):
            x_data, y_data = self.load_data_from_csv(CSV_FILE_NAME)
        else:
            x_data, y_data = self.generate_training_data(NUM_EPISODES)

            self.save_data_to_csv(x_data, y_data, CSV_FILE_NAME)
            print(f"{CSV_FILE_NAME} 저장 완료")

        self.model.fit(x_data, y_data, epochs=100, verbose=1)
        self.model.save(H5_FILE_NAME)

        # Evaluated on the training set itself; there is no held-out split.
        metrics = self.model.evaluate(x_data, y_data)
        print(f"손실(Loss): {metrics[0]}")
        print(f"정확도(Accuracy): {metrics[1]}")
 
    def generate_training_data(self, num_games):
        """Self-play until ``num_games`` games have been recorded.

        Only games that do not end with PLAYER as the winner (AI wins and
        draws) are recorded and counted. Each sample pairs a flattened board
        with the one-hot encoding of the move that was played next on it.

        Returns:
            (x_data, y_data) as numpy arrays with one row per sample.
        """
        all_states = []
        all_moves = []

        while self.episode < num_games:
            self.reset_game()

            states = []
            moves = []
            finished = False
            while not finished:
                row, col = self.get_next_move(self.turn_player)
                # The board is captured AFTER the move has been played.
                states.append(np.array(self.board).flatten())
                moves.append(np.eye(NUM_ITEMS)[row * BOARD_SIZE + col])

                self.draw_board(self.screen)
                pygame.display.flip()

                finished = self.check_winner(self.board, row, col) or self.is_board_full(self.board)

            if self.turn_player == PLAYER:
                # PLAYER won: discard the game and play another one.
                continue

            # Shift by one so that the board after move i is paired with move
            # i + 1: drop the final board and the very first move.
            all_states.extend(states[:-1])
            all_moves.extend(moves[1:])
            self.episode += 1
            print(f"{self.episode}: {self.turn_player} win.")

        return np.array(all_states), np.array(all_moves)

    def get_next_move(self, player):
        """Play and return the next move for ``player`` using that side's mode.

        A side whose mode is "MINIMAX" uses minimax_move(); everything else
        falls back to random_move().
        """
        if player == AI:
            mode = AI_MODE
        elif player == PLAYER:
            mode = PLAYER_MODE
        else:
            mode = None

        if mode == "MINIMAX":
            return self.minimax_move(player)
        return self.random_move(player)
        
    def save_data_to_csv(self, x_data, y_data, file_name):
        """Write the samples to ``file_name`` as a two-column CSV.

        Each row holds one sample; the ``x_data`` and ``y_data`` cells contain
        the text form of the flattened arrays.
        """
        def flat_rows(samples):
            return [sample.flatten() for sample in samples]

        frame = pd.DataFrame({'x_data': flat_rows(x_data), 'y_data': flat_rows(y_data)})
        frame.to_csv(file_name, index=False)

    def load_data_from_csv(self, file_name):
        """Read samples written by save_data_to_csv() back into numpy arrays.

        Returns:
            (x_data, y_data), each a float array with one row per CSV row.
        """
        frame = pd.read_csv(file_name)

        def parse(cell):
            # Strip the surrounding brackets, then read the space-separated numbers.
            return np.fromstring(cell[1:-1], sep=' ')

        x_data = np.array([parse(cell) for cell in frame['x_data']])
        y_data = np.array([parse(cell) for cell in frame['y_data']])
        return x_data, y_data
    
    
if __name__ == "__main__":
    # Open the window, build the network, then generate/load data and train.
    game = FIAR()
    game.init_ML()
    game.learning()

 

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (1/5) - 기본 구현 (minimax, alpha-beta pruning)

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (2/5) - 속도 최적화 1차 (minimax 속도 개선)

2023.10.28 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (3/5) - 속도 최적화 2차 (RANDOM모드 추가)

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (4/5) - 훈련 데이터 생성 및 학습

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

2023.11.03 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

 

반응형
블로그 이미지

SKY STORY

,
반응형

minimax를 이용한 방식으로는 대량의 훈련데이터를 생성하기에 너무 느리므로
각 빈자리에 대한 점수를 계산하여 높은 점수 자리에 돌을 놓는 랜덤 모드를 추가해 보았다.
다소 기계적인 소심한 수를 두지만 속도만큼은 확실히 빠르다!
 
<빈 자리 반환 함수 개선사항>
- 수를 놓았을 때 연결된 수가 3개 이상이고 5개를 놓을 수 있는 자리일 경우만 반환.
- 수를 놓았을 때 연결된 수가 4개가 되는 경우 즉시 수를 놓도록 처리.
 
<랜덤 모드의 점수 계산 방법>
아래의 계산을 각 방향(좌->우, 상->하, 좌상->우하, 우상->좌하)으로 4번 누적된 값을 반환.
- 연결된 돌의 수가 4개이고 5개 이상 돌을 놓을 수 있는 자리: 16점
- 연결된 돌의 수가 3개이고 5개 이상 돌을 놓을 수 있는 자리:  6점
- 연결된 돌의 수가 2개이고 5개 이상 돌을 놓을 수 있는 자리:  2점
 
랜덤 모드는 수가 놓여진 주변 셀에 대해 위의 랜덤모드 점수를 구해 가장 높은 점수 위치에 수를 놓도록 하였다.

#
# FIAR
# Created by netcanis on 2023/10/28.
#
# minimax
# alpha beta pruning
# 속도 개선 2차 

import pygame
import random
import numpy as np
import sys


# Board geometry.
BOARD_SIZE = 9  # (7,9,13,15,19)
CELL_SIZE = 40  # pixel distance between adjacent grid lines
MARGIN = 20  # pixel gap between the window edge and the outermost grid line
CLICK_RADIUS = CELL_SIZE // 2

# RGB colours.
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

# Number of cells on the board.
NUM_ITEMS = (BOARD_SIZE * BOARD_SIZE)
# Board cell values for the two sides (0 means empty).
PLAYER = 1
AI = -1

# How the AI picks its moves: "RANDOM" (score-based heuristic) or "MINIMAX".
AI_MODE = "RANDOM"#"MINIMAX"


# NOTE(review): presumably the minimax search depth limit - confirm against minimax().
DIFFICULTY_LEVEL = 3


class FIAR:
    def __init__(self):
        """Create the game object and open the pygame window via init_game().

        No board state is created here; reset_game() must be called before
        the first move.
        """
        self.init_game()
    
    def init_game(self):
        pygame.init()
        pygame.display.set_caption("FIAR")
        w = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        h = (BOARD_SIZE-1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((w, h))

        self.font = pygame.font.Font(None, CELL_SIZE-4)

    def reset_game(self):
        self.board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequences = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=int)
        self.sequence = 0
        self.game_over = False
        self.turn_player = PLAYER
        
        self.draw_board(self.screen)
        pygame.display.flip()
         
    def find_empty_cells(self):
        empty_cells = []
        for row in range(BOARD_SIZE):
            for col in range(BOARD_SIZE):
                if self.board[row][col] == 0:
                    empty_cells.append((row, col))
        random.shuffle(empty_cells)
        return empty_cells
    
    def find_adjacent_empty_cells(self):
        empty_cells = []
        empty_cells2 = []
        for row, col in self.find_empty_cells():
            for dr in [-1, 0, 1]:
                for dc in [-1, 0, 1]:
                    new_row, new_col = row + dr, col + dc
                    if 0 <= new_row < BOARD_SIZE and 0 <= new_col < BOARD_SIZE and self.board[new_row][new_col] != 0:
                        # 수를 놓았을 경우 연결된 수가 3개 이상이고 5개를 놓을 수 있는 자리일 경우만 배열에 추가.
                        self.board[row][col] = self.turn_player
                        c1, c2, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = -self.turn_player
                        c3, c4, _ = self.evaluate_position(self.board, row, col)
                        self.board[row][col] = 0
                        if (c1 >= 3 and c2 >= 5) or (c3 >= 3 and c4 >= 5):
                            empty_cells2.append((row, col))

                        empty_cells.append((row, col))
                        break
                    
        if len(empty_cells2) > 0:
            return empty_cells2
        
        return empty_cells
    
    def evaluate_position(self, board, row, col):
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count
        
        def check_direction_with_blank(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and (board[r][c] == board[row][col] or board[r][c] == 0):
                count += 1
                r += dx
                c += dy
            return count
        
        total_count1 = 0
        total_count2 = 0
        total_score = 0
        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction( dx,  dy)
            count2 = check_direction(-dx, -dy)
            dir_count1 = count1 + count2 - 1
            count3 = check_direction_with_blank( dx,  dy)
            count4 = check_direction_with_blank(-dx, -dy)
            dir_count2 = count3 + count4 - 1
            total_count1 = max(total_count1, dir_count1)
            total_count2 = max(total_count2, dir_count2)
            # 각 방향별로 연결된 돌의 수가 3개 이상이고 5개 이상 놓을 수 있는 자리라면 점수를 추가한다.
            # 즉, 연결되는 돌이 교차하는 위치라면 점수가 높게 나온다.
            if dir_count1 == 4 and dir_count2 >= 5:
                total_score += dir_count1 * 4
            elif dir_count1 >= 3 and dir_count2 >= 5:
                total_score += dir_count1 * 2
            elif dir_count1 >= 2 and dir_count2 >= 5:
                total_score += dir_count1
                
        return total_count1, total_count2, total_score
    
    def check_winner(self, board, row, col):
        if board[row][col] == 0:
            return False
        
        def check_direction(dx, dy):
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == board[row][col]:
                count += 1
                r += dx
                c += dy
            return count

        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            count1 = check_direction(dx, dy)
            count2 = check_direction(-dx, -dy)
            total_count = count1 + count2 - 1
            if total_count == 5:
                return True

        return False
    
    def is_board_full(self, board):
        return all(cell != 0 for row in board for cell in row)

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        
        optimal_move = self.make_optimal_move(player)
        if optimal_move != None:
            return optimal_move
        
        
        empty_cells = self.find_adjacent_empty_cells()
        
        best_move = (0,0)
        best_score = 0
        
        for row, col in empty_cells: 
            self.board[row][col] = player
            s1 = self.evaluate_position(self.board, row, col)[2]
            if s1 > best_score:
                best_score = s1
                best_move = (row, col)
            self.board[row][col] = -player
            s2 = self.evaluate_position(self.board, row, col)[2]
            if s2 > best_score:
                best_score = s2
                best_move = (row, col)
            self.board[row][col] = 0
        
        if best_score == 0:
            best_move = random.choice(self.find_empty_cells())
            
        return best_move

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            
            print(f"[{self.sequence}] <{player}> {row},{col}")
            
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
            else:
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
        else:
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)

        if depth >= DIFFICULTY_LEVEL:
            return 0
        
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
                    break
            return best_score
        else:
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
                    break
            return best_score
    
    def make_optimal_move(self, player):
        # 0. The first one is random.
        if self.sequence == 0:
            row = BOARD_SIZE // 2 + random.randint(-2, 2)
            col = BOARD_SIZE // 2 + random.randint(-2, 2)
            return (row, col)
        
        empty_cells = self.find_adjacent_empty_cells()

        # 1. ai turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
            
        # 2. player turn: 5
        for row, col in empty_cells: 
            self.board[row][col] = -player
            if self.check_winner(self.board, row, col):
                self.board[row][col] = 0
                return (row, col)
            self.board[row][col] = 0
        
        if AI_MODE == "RANDOM":
            # 3. ai turn: 4
            for row, col in empty_cells: 
                self.board[row][col] = player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if c1 == 4 and c2 >= 5:
                    return (row, col)
                
            # 4. player turn: 4
            for row, col in empty_cells: 
                self.board[row][col] = -player
                c1, c2, _ = self.evaluate_position(self.board, row, col)
                self.board[row][col] = 0
                if c1 == 4 and c2 >= 5:
                    return (row, col)
            
        return None
        
    def find_best_move(self, player):
        if AI_MODE == "RANDOM":
            best_move = self.random_move(player)
        else: # MINIMAX
            print(f"[{self.sequence+1}] <{player}> ...")
            optimal_move = self.make_optimal_move(player)
            if optimal_move != None:
                return optimal_move
        
            alpha = -float('inf')
            beta = float('inf')

            best_move = None
            best_score = -float('inf') if player == AI else float('inf')

            empty_cells = self.find_adjacent_empty_cells()
            for index, (row, col) in enumerate(empty_cells):
                self.board[row][col] = player
                is_maximizing = False if player == AI else True
                score = self.minimax(0, is_maximizing, alpha, beta, row, col)
                self.board[row][col] = 0
                
                if (player == AI and score > best_score) or (player == PLAYER and score < best_score):
                    best_score = score
                    best_move = (row, col)
                
                percentage = (index / len(empty_cells)) * 100
                print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")
            
            print(f"    {best_move[0]},{best_move[1]} ({best_score})")
            
        return best_move
    
    def show_message(self, message, is_exit=False):
        popup = True
        while popup:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()

                if event.type == pygame.KEYDOWN:
                    if is_exit:
                        if event.key == pygame.K_y:
                            self.reset_game()
                            popup = False
                            return
                        elif event.key == pygame.K_n:
                            pygame.quit() 
                            sys.exit()
                    else:
                        popup = False
                        message = ""
                        self.draw_board(self.screen)
                        break
            
            text_lines = message.split('\n')
            for i, line in enumerate(text_lines):
                text = self.font.render(line, True, (128, 0, 128))
                text_rect = text.get_rect(topleft=(20, 20 + i * 20))
                self.screen.blit(text, text_rect)
            
            pygame.display.flip()
    
    def draw_board(self, screen):
        screen.fill(GRAY)
        for row in range(BOARD_SIZE):# draw horizontal lines
            pygame.draw.line(screen, BLACK, 
                            (0 * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN), 
                            ((BOARD_SIZE-1) * CELL_SIZE + MARGIN, row * CELL_SIZE + MARGIN),
                            1)
            for col in range(BOARD_SIZE):# draw vertical lines
                if row == 0:
                    pygame.draw.line(screen, BLACK, 
                                    (col * CELL_SIZE + MARGIN, 0 * CELL_SIZE + MARGIN), 
                                    (col * CELL_SIZE + MARGIN, (BOARD_SIZE-1) * CELL_SIZE + MARGIN),
                                    1)

                x = col * CELL_SIZE + MARGIN
                y = row * CELL_SIZE + MARGIN

                if self.board[row][col] == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif self.board[row][col] == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)
                
                seq_no = self.sequences[row][col] 
                if seq_no != 0:
                    if seq_no == self.sequence: 
                        pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS+1, 1)
                    
                    color = WHITE if self.board[row][col] == PLAYER else BLACK
                    text = self.font.render(f"{seq_no}", True, color)
                    text_rect = text.get_rect(center=(x, y))
                    screen.blit(text, text_rect)

    def handle_events(self):
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
            elif event.type == pygame.MOUSEBUTTONDOWN and not self.game_over and self.turn_player == PLAYER:
                x, y = pygame.mouse.get_pos()
                row = int(round((y - MARGIN) / CELL_SIZE))
                col = int(round((x - MARGIN) / CELL_SIZE))
                if 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE and self.board[row][col] == 0:
                    self.make_move(row, col, self.turn_player)
                    self.draw_board(self.screen)
                    pygame.display.flip()
                    
                    if self.turn_player == AI:
                        best_move = self.find_best_move(self.turn_player)
                        self.make_move(best_move[0], best_move[1], self.turn_player)
                        
                        self.draw_board(self.screen)
                        pygame.display.flip()
                            
                    if self.game_over == True:
                        if self.turn_player == 0:
                            self.show_message("Game draw!")
                        else:
                            self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")


if __name__ == "__main__":
    # Entry point: open the window, start the first game, then alternate
    # forever between playing a game and asking whether to play again.
    # show_message() either resets the game (y) or exits the process (n).
    game = FIAR()
    game.reset_game()

    while True:
        if game.game_over:
            game.show_message("Play Again? (y/n)", is_exit=True)
        else:
            game.handle_events()

 
속도는 비교할 수 없을 정도로 빨라졌다.
기계적으로 막기에 급급해 보이는 소심한 수를 놓는 건 단점이다.
하지만 대량으로 학습 데이터를 만들기에는 충분한 듯하다.

 
 

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (1/5) - 기본 구현 (minimax, alpha-beta pruning)

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (2/5) - 속도 최적화 1차 (minimax 속도 개선)

2023.10.28 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (3/5) - 속도 최적화 2차 (RANDOM모드 추가)

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (4/5) - 훈련 데이터 생성 및 학습

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

2023.11.03 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

 

반응형
블로그 이미지

SKY STORY

,
반응형

minimax 속도를 개선하기 위해 다음과 같은 변경사항을 적용하였다.

1. 빈 자리를 검색할 때, 이미 돌이 놓여진 위치 주변에 있는 빈 자리만 반환.

2. AI가 돌을 놓았을 때, 승리 조건이 만족되는 위치라면 즉시 돌을 놓는다.

3. 플레이어가 돌을 놓았을 때, 승리 조건이 만족되는 위치라면 AI가 그 위치에 돌을 즉시 놓는다.

 

1번 빈 자리 검색 시 아래 그림과 같이 돌이 놓여진 주변 빈자리만 검색하여 반환되도록 한다.

이렇게 탐색 대상을 줄이면 불필요한 트리 탐색이 줄어들어 속도가 개선된다.

빈 자리 검색시 반환되는 위치

 

#
# FIAR
# Created by netcanis on 2023/10/07.
#
# minimax
# alpha beta pruning
# 속도 개선 

import pygame
import random
import numpy as np
import sys


# --- Board geometry ---------------------------------------------------------
# Number of grid lines (and therefore playable intersections) per side.
BOARD_SIZE = 9  # (7,9,13,15,19)
# Distance in pixels between neighbouring grid lines.
CELL_SIZE = 40
# Pixels between the window edge and the outermost grid line.
MARGIN = 20
# Radius in pixels used when drawing a stone (half a cell).
CLICK_RADIUS = CELL_SIZE // 2

# --- RGB colours ------------------------------------------------------------
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
RED = (255, 0, 0)
GRAY = (100, 100, 100)

# Total number of cells; minimax uses it as the base magnitude of a win score.
NUM_ITEMS = (BOARD_SIZE * BOARD_SIZE)
# Board cell values: PLAYER is the human (drawn black), AI is the computer
# (drawn white), 0 means empty. The two values are negatives of each other so
# that "-player" is the opponent.
PLAYER = 1
AI = -1

# Minimax stops descending once the search depth reaches this value.
DIFFICULTY_LEVEL = 3


class FIAR:
    def __init__(self):
        """Create the game object and open the pygame window via init_game().

        No board state is created here; reset_game() must be called before
        the first move.
        """
        self.init_game()
    
    def init_game(self):
        """Initialise pygame, open the square game window and load the font.

        Sets ``self.screen`` (the display surface) and ``self.font`` (the
        default pygame font, slightly smaller than one cell).
        """
        pygame.init()
        pygame.display.set_caption("FIAR")

        # Width and height are equal: (BOARD_SIZE - 1) gaps between grid
        # lines plus one margin on each side.
        side = (BOARD_SIZE - 1) * CELL_SIZE + 2 * MARGIN
        self.screen = pygame.display.set_mode((side, side))
        self.font = pygame.font.Font(None, CELL_SIZE - 4)

    def reset_game(self):
        """Start a fresh game: clear all state and redraw the empty board.

        ``self.board`` holds the stone in each cell (0 = empty),
        ``self.sequences`` holds the move number played in each cell, and
        ``self.sequence`` counts the moves played so far. The human player
        moves first.
        """
        grid_shape = (BOARD_SIZE, BOARD_SIZE)
        self.board = np.zeros(grid_shape, dtype=int)
        self.sequences = np.zeros(grid_shape, dtype=int)
        self.sequence = 0
        self.turn_player = PLAYER
        self.game_over = False

        self.draw_board(self.screen)
        pygame.display.flip()
         
    def find_empty_cells(self):
        """Return every empty cell as a ``(row, col)`` tuple, in random order."""
        cells = [
            (r, c)
            for r in range(BOARD_SIZE)
            for c in range(BOARD_SIZE)
            if self.board[r][c] == 0
        ]
        # Shuffle so that callers scanning the list do not favour one corner.
        random.shuffle(cells)
        return cells
    
    # Return only the empty cells that are next to a cell already holding a stone.
    def find_adjacent_empty_cells(self):
        empty_cells = []
        for row, col in self.find_empty_cells():
            for dr in [-1, 0, 1]:
                for dc in [-1, 0, 1]:
                    new_row, new_col = row + dr, col + dc
                    if 0 <= new_row < BOARD_SIZE and 0 <= new_col < BOARD_SIZE and self.board[new_row][new_col] != 0:
                        empty_cells.append((row, col))
                        break
        return empty_cells
    
    def check_winner(self, board, row, col):
        """Tell whether the stone at ``(row, col)`` completes a line of five.

        Args:
            board: 2-D board indexed as ``board[row][col]``; 0 means empty.
            row, col: Cell to test (normally the move just played).

        Returns:
            True when some line through the cell contains exactly five
            consecutive stones of that cell's colour; longer runs do not
            count. False otherwise, and always False for an empty cell.
        """
        stone = board[row][col]
        # An empty cell cannot win: without this guard five empty cells in a
        # line would be counted as a run of five.
        if stone == 0:
            return False

        def check_direction(dx, dy):
            # Length of the run starting at (row, col) and walking one way.
            count = 1
            r, c = row + dx, col + dy
            while 0 <= r < len(board) and 0 <= c < len(board[0]) and board[r][c] == stone:
                count += 1
                r += dx
                c += dy
            return count

        directions = [(0, 1), (1, 0), (1, 1), (-1, 1)]
        for dx, dy in directions:
            # Both walks include the centre cell, hence the "- 1".
            total_count = check_direction(dx, dy) + check_direction(-dx, -dy) - 1
            if total_count == 5:
                return True

        return False
    
    def is_board_full(self, board):
        """Return True when no cell of ``board`` is empty (equal to 0)."""
        for line in board:
            for cell in line:
                if cell == 0:
                    return False
        return True

    def random_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = random.choice(self.find_adjacent_empty_cells())
        self.make_move(row, col, player)
        return row, col

    def minimax_move(self, player):
        if self.game_over:
            return -1, -1
        row, col = self.find_best_move(player)
        self.make_move(row, col, player)
        return row, col

    def make_move(self, row, col, player):
        if self.board[row][col] == 0:
            self.board[row][col] = player
            
            self.sequence += 1
            self.sequences[row][col] = self.sequence
            
            print(f"[{self.sequence}] <{player}> {row},{col}")
            
            if self.check_winner(self.board, row, col):
                self.game_over = True
            elif self.is_board_full(self.board):
                self.game_over = True
                self.turn_player = 0
            else:
                self.turn_player *= -1

    def minimax(self, depth, is_maximizing, alpha, beta, row2, col2):
        if self.is_board_full(self.board):
            return 0
        
        if is_maximizing:
            if self.check_winner(self.board, row2, col2):
                return -(NUM_ITEMS - depth + 1)
        else:
            if self.check_winner(self.board, row2, col2):
                return (NUM_ITEMS - depth + 1)

        if depth >= DIFFICULTY_LEVEL:
            return 0
        
        if is_maximizing:# AI
            best_score = -float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = AI
                score = self.minimax(depth + 1, False, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = max(best_score, score)
                alpha = max(alpha, best_score)
                if beta <= alpha:
                    break
            return best_score
        else:
            best_score = float('inf')
            for row, col in self.find_adjacent_empty_cells():
                self.board[row][col] = PLAYER
                score = self.minimax(depth + 1, True, alpha, beta, row, col)
                self.board[row][col] = 0
                best_score = min(best_score, score)
                beta = min(beta, best_score)
                if beta <= alpha:
                    break
            return best_score
    
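    # Moves that match the priorities below are played at once, without running
    # minimax, to save time:
    # 1. If placing a stone wins the game for the AI, play that cell immediately.
    # 2. If placing a stone there would win the game for the player, the AI plays that cell to block it.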
    def make_optimal_move(self, player):
        """Return a move that needs no search, or None when there is none.

        Checks, in priority order:

        0. first move of the game: a random cell within two cells of the
           centre;
        1. a cell where ``player`` completes five;
        2. a cell where the opponent would complete five (block it).

        Returns:
            ``(row, col)`` of the forced move, or None.
        """
        if self.sequence == 0:
            centre = BOARD_SIZE // 2
            row = centre + random.randint(-2, 2)
            col = centre + random.randint(-2, 2)
            return (row, col)

        candidates = self.find_adjacent_empty_cells()

        # Our own winning cell outranks blocking the opponent's.
        for stone in (player, -player):
            for row, col in candidates:
                # Try the stone, test for a win, and always take it back.
                self.board[row][col] = stone
                wins = self.check_winner(self.board, row, col)
                self.board[row][col] = 0
                if wins:
                    return (row, col)

        return None
        
    def find_best_move(self, player):
        """Choose the next move for ``player`` with minimax.

        A forced move from make_optimal_move() is used when one exists.
        Otherwise every candidate cell is scored with minimax(): the AI keeps
        the highest score, the player the lowest. Progress is printed for
        each candidate.

        Returns:
            ``(row, col)`` of the chosen cell, or None when there is no
            candidate cell.
        """
        print(f"[{self.sequence+1}] <{player}> Calculating...")

        forced = self.make_optimal_move(player)
        if forced is not None:
            return forced

        ai_to_move = player == AI
        top_move = None
        top_score = -float('inf') if ai_to_move else float('inf')

        candidates = self.find_adjacent_empty_cells()
        total = len(candidates)
        for index, (row, col) in enumerate(candidates):
            # Try the move, score the reply position, then undo it. After the
            # AI's stone the player (minimizer) is to move, and vice versa.
            self.board[row][col] = player
            score = self.minimax(0, not ai_to_move, -float('inf'), float('inf'), row, col)
            self.board[row][col] = 0

            if ai_to_move:
                improved = score > top_score
            else:
                improved = player == PLAYER and score < top_score
            if improved:
                top_score = score
                top_move = (row, col)

            percentage = (index / total) * 100
            print(f"    [{percentage:.1f}%] <{player}> {row},{col} -> {score}")

        return top_move
    
    def show_message(self, message, is_exit=False):
        """Overlay ``message`` on the window and wait for a key press.

        Args:
            message: Text to display; ``\\n`` separates lines.
            is_exit: When True this is the "play again?" prompt: ``y`` resets
                the game and returns, ``n`` quits the program, other keys are
                ignored. When False any key dismisses the message.

        Closing the window quits the program in either mode.
        """
        waiting = True
        while waiting:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    pygame.quit()
                    sys.exit()

                if event.type != pygame.KEYDOWN:
                    continue

                if not is_exit:
                    # Any key dismisses a plain message; the text is still
                    # drawn one last time below before the loop ends.
                    waiting = False
                    break

                if event.key == pygame.K_y:
                    self.reset_game()
                    return
                if event.key == pygame.K_n:
                    pygame.quit()
                    sys.exit()

            # Render one text line every 20 pixels, starting at the top-left.
            for index, line in enumerate(message.split('\n')):
                rendered = self.font.render(line, True, (128, 0, 128))
                position = rendered.get_rect(topleft=(20, 20 + index * 20))
                self.screen.blit(rendered, position)

            pygame.display.flip()
    
    def draw_board(self, screen):
        """Paint the grid, every stone and its move number onto ``screen``.

        Player stones are black, AI stones are white. Each stone carries its
        move number in the contrasting colour, and the most recent move is
        outlined in red. The caller is responsible for flipping the display.
        """
        def to_pixel(index):
            # Grid index -> pixel coordinate of that grid line.
            return index * CELL_SIZE + MARGIN

        near_edge = to_pixel(0)
        far_edge = to_pixel(BOARD_SIZE - 1)

        screen.fill(GRAY)
        for row in range(BOARD_SIZE):
            y = to_pixel(row)
            pygame.draw.line(screen, BLACK, (near_edge, y), (far_edge, y), 1)

            for col in range(BOARD_SIZE):
                x = to_pixel(col)
                if row == 0:
                    # A vertical line spans the whole board, so each one is
                    # drawn only once, while handling the first row.
                    pygame.draw.line(screen, BLACK, (x, near_edge), (x, far_edge), 1)

                stone = self.board[row][col]
                if stone == PLAYER:
                    pygame.draw.circle(screen, BLACK, (x, y), CLICK_RADIUS)
                elif stone == AI:
                    pygame.draw.circle(screen, WHITE, (x, y), CLICK_RADIUS)

                move_number = self.sequences[row][col]
                if move_number == 0:
                    continue

                if move_number == self.sequence:
                    # Red ring around the latest move.
                    pygame.draw.circle(screen, RED, (x, y), CLICK_RADIUS + 1, 1)

                label_color = WHITE if stone == PLAYER else BLACK
                label = self.font.render(f"{move_number}", True, label_color)
                screen.blit(label, label.get_rect(center=(x, y)))

    def handle_events(self):
        """Process pending pygame events: the player's click and the AI's reply.

        Closing the window marks the game as over. A mouse click during the
        player's turn is snapped to the nearest grid intersection; if that
        cell is free the player's stone is placed, the AI answers when the
        turn passed to it, and the result message is shown once the game has
        ended.
        """
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.game_over = True
                continue

            # Clicks only count while the game runs and it is the player's turn.
            if event.type != pygame.MOUSEBUTTONDOWN or self.game_over or self.turn_player != PLAYER:
                continue

            mouse_x, mouse_y = pygame.mouse.get_pos()
            # Round to the closest grid line in each direction.
            row = int(round((mouse_y - MARGIN) / CELL_SIZE))
            col = int(round((mouse_x - MARGIN) / CELL_SIZE))
            on_board = 0 <= row < BOARD_SIZE and 0 <= col < BOARD_SIZE
            if not on_board or self.board[row][col] != 0:
                continue

            self.make_move(row, col, self.turn_player)
            self.draw_board(self.screen)
            pygame.display.flip()

            # make_move() hands the turn to the AI only when the player's
            # move did not end the game.
            if self.turn_player == AI:
                reply = self.find_best_move(self.turn_player)
                if reply is not None:
                    reply_row, reply_col = reply
                    self.make_move(reply_row, reply_col, self.turn_player)

                self.draw_board(self.screen)
                pygame.display.flip()

            if self.game_over == True:
                # turn_player is 0 after a draw; otherwise it still names the
                # side that made the winning move.
                if self.turn_player == 0:
                    self.show_message("Game draw!")
                else:
                    self.show_message(f"Game Over!\n{'Player' if self.turn_player == PLAYER else 'AI'} wins!")

    
if __name__ == "__main__":
    # Entry point: open the window, start the first game, then alternate
    # forever between playing a game and asking whether to play again.
    # show_message() either resets the game (y) or exits the process (n).
    game = FIAR()
    game.reset_game()

    while True:
        if game.game_over:
            game.show_message("Play Again? (y/n)", is_exit=True)
        else:
            game.handle_events()

 

속도는 다소 빨라지긴 했다.

3x3을 만들면 이미 진 것을 예감한 것인지 AI는 그 때부터 엉뚱한 수를 놓는다.  

마치 포기한 것처럼...

 

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (1/5) - 기본 구현 (minimax, alpha-beta pruning)

2023.10.27 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (2/5) - 속도 최적화 1차 (minimax 속도 개선)

2023.10.28 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (3/5) - 속도 최적화 2차 (RANDOM모드 추가)

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (4/5) - 훈련 데이터 생성 및 학습

2023.10.29 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

2023.11.03 - [AI,ML, Algorithm] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

 

 

 

반응형
블로그 이미지

SKY STORY

,