'개발/Server'에 해당되는 글 1건

Push Notification

개발/Server 2023. 12. 27. 16:42
반응형

Python으로 간단한 모바일 푸시 서버를 만들어 보았다.

예약 발송 및 batch 처리 등을 구현하였다.

 

#
# push_server.py
# APNs, FCM, 예약 및 batch 푸시 발송
#

import json
import jwt
import time
from datetime import datetime
from hyper import HTTPConnection
from http.client import HTTPSConnection
from apscheduler.schedulers.blocking import BlockingScheduler
import mysql.connector
import configparser
from apscheduler.events import EVENT_JOB_EXECUTED

# Read configuration from config.ini (path is relative to the current working
# directory). ConfigParser.read() silently skips files it cannot open, so a
# missing file only surfaces later as NoSectionError on the required options.
config = configparser.ConfigParser()
config.read('config.ini')

# SSL/TLS (Secure Sockets Layer / Transport Layer Security)
# Optional certificate paths; each one is None when the option is absent.
CA_FILE = config.get('SSL', 'ca_file', fallback=None)
CERT_FILE = config.get('SSL', 'cert_file', fallback=None)
KEY_FILE = config.get('SSL', 'key_file', fallback=None)

# Database
# Required options (no fallback). config.get() always returns str, so
# DB_PORT is a string such as '3306', not an int.
DB_HOST = config.get('Database', 'host')
DB_USER = config.get('Database', 'user')
DB_PASSWORD = config.get('Database', 'password')
DB_NAME = config.get('Database', 'database')
DB_PORT = config.get('Database', 'port')

# Protocol
# Defaults to 'HTTP' when the [Server] section or option is missing.
PROTOCOL = config.get('Server', 'protocol', fallback='HTTP')

# APNs status codes
# Maps an HTTP status code to a human-readable description for log output.
status_codes = {
    200: "Success",
    400: "Bad request",
    403: "Error with certificate or provider authentication token",
    405: "Invalid request method. Only POST requests are supported.",
    410: "Device token is no longer active for the topic.",
    413: "Notification payload was too large.",
    429: "Too many requests for the same device token.",
    500: "Internal server error",
    503: "Server is shutting down and unavailable.",
}

# Connection class used for the push service: hyper's HTTPConnection when
# PROTOCOL is 'HTTP' (case-insensitive), http.client.HTTPSConnection otherwise.
# NOTE(review): the two classes expose different APIs (hyper: get_response();
# http.client: getresponse()) and accept different constructor arguments —
# confirm callers work with both.
conn_class = HTTPConnection if PROTOCOL.upper() == 'HTTP' else HTTPSConnection

# Module-level blocking scheduler instance.
scheduler = BlockingScheduler()


# Build the request headers for whichever push service a section is configured for
def create_request_headers(section_name, remaining_seconds):
    """Return the HTTP request headers for the push type of *section_name*.

    The ``push_type`` option of the given config.ini section selects the
    builder: ``"APNs"`` or ``"FCM"``.

    Args:
        section_name: Name of the config.ini section describing the app.
        remaining_seconds: Expiration value forwarded to the APNs header
            builder; not used for FCM.

    Returns:
        A dict of request headers, or ``None`` when ``push_type`` is neither
        ``"APNs"`` nor ``"FCM"``.
    """
    configured_type = config.get(section_name, 'push_type')

    if configured_type == "FCM":
        return create_fcm_request_headers(section_name)
    if configured_type == "APNs":
        return create_apns_request_headers(section_name, remaining_seconds)
    return None

# Create APNs request headers
def create_apns_request_headers(section_name, remaining_seconds):
    """Build the request headers for a token-authenticated APNs request.

    Reads ``auth_key_path``, ``team_id``, ``key_id`` and ``bundle_id`` from the
    *section_name* section of the module-level ``config`` and signs a new
    ES256 provider token with the key file on every call.

    Args:
        section_name: Name of the config.ini section describing the app.
        remaining_seconds: How many seconds from now the notification stays
            valid. ``0`` (or a negative value) sends ``apns-expiration: 0``;
            ``None`` omits the header.

    Returns:
        A dict with ``apns-priority``, ``apns-topic``, ``authorization`` and,
        unless *remaining_seconds* is ``None``, ``apns-expiration``.

    Raises:
        OSError: If the auth key file cannot be read.
        configparser.Error: If the section or a required option is missing.
    """
    auth_key_path = config.get(section_name, 'auth_key_path')
    team_id = config.get(section_name, 'team_id')
    key_id = config.get(section_name, 'key_id')
    bundle_id = config.get(section_name, 'bundle_id')

    with open(auth_key_path) as f:
        secret = f.read()

    now = int(time.time())

    token = jwt.encode(
        {
            'iss': team_id,
            # Whole seconds since the epoch rather than a float.
            'iat': now,
        },
        secret,
        algorithm='ES256',
        headers={
            'alg': 'ES256',
            'kid': key_id,
        }
    )
    # PyJWT 1.x returns bytes while 2.x returns str; normalise to str.
    if isinstance(token, bytes):
        token = token.decode('ascii')

    headers = {
        'apns-priority': '10',
        'apns-topic': bundle_id,
        'authorization': 'bearer {0}'.format(token)
    }

    if remaining_seconds is not None:
        # apns-expiration is an absolute UNIX epoch (seconds, UTC), not a
        # duration, so convert "seconds from now" into a timestamp. Zero is
        # kept as-is: it means "do not store, attempt delivery once".
        remaining = int(remaining_seconds)
        expiration = now + remaining if remaining > 0 else 0
        headers['apns-expiration'] = str(expiration)

    return headers

# Build the FCM request headers from the section's server key
def create_fcm_request_headers(section_name):
    """Return the headers for an FCM request authenticated with ``api_key``.

    Args:
        section_name: Name of the config.ini section that holds ``api_key``.

    Returns:
        A dict with the ``Authorization`` (``key=<api_key>``) and JSON
        ``Content-Type`` headers.
    """
    return {
        'Authorization': 'key={0}'.format(config.get(section_name, 'api_key')),
        'Content-Type': 'application/json',
    }

# Function to send a single notification and print the service's reply
def send_notification(conn, section_name, push_token, message, badge_count=0, remaining_seconds=0):
    """Send one push notification over *conn* and print the response.

    Args:
        conn: An open connection to the push service. Both hyper connections
            (``get_response``) and ``http.client`` connections
            (``getresponse``) are supported.
        section_name: Name of the config.ini section describing the app.
        push_token: Device token (APNs) or registration token (FCM).
        message: Text of the notification.
        badge_count: Badge number to attach to the notification.
        remaining_seconds: Expiration value forwarded to the header builder.

    Raises:
        ValueError: If the section's ``push_type`` is neither ``"APNs"`` nor
            ``"FCM"`` (previously no request was sent and the function then
            waited for a response that could never arrive).
    """
    push_type = config.get(section_name, 'push_type')

    if push_type == "APNs":
        path = f'/3/device/{push_token}'
    elif push_type == "FCM":
        # NOTE(review): '/fcm/send' with a 'key=' header is FCM's legacy HTTP
        # API — confirm it is still available for this project.
        path = '/fcm/send'
    else:
        raise ValueError(
            f"Unsupported push_type {push_type!r} in section {section_name!r}"
        )

    request_headers = create_request_headers(section_name, remaining_seconds)

    payload_data = create_payload_data(push_type, message, badge_count, push_token)
    payload = json.dumps(payload_data).encode('utf-8')

    conn.request('POST', path, payload, headers=request_headers)

    # hyper exposes get_response(); http.client exposes getresponse().
    read_response = getattr(conn, 'get_response', None) or conn.getresponse
    resp = read_response()

    status_code = resp.status
    # The description table is APNs-oriented; unknown codes get a generic text.
    description = status_codes.get(status_code, "Unknown status code")
    print(f"Status code: {status_code} - Description: {description}")
    print(resp.read())

# Create payload data based on push type
def create_payload_data(push_type, message, badge_count, push_token=None):
    """Build the JSON-serialisable request body for one notification.

    Args:
        push_type: ``"APNs"`` or ``"FCM"``.
        message: Text of the notification.
        badge_count: Badge number to attach to the notification.
        push_token: Recipient registration token; used only for FCM, where it
            becomes the ``to`` field. Previously this was read from an
            undefined name, so every FCM payload raised ``NameError``.

    Returns:
        The payload dict, or ``None`` for an unrecognised *push_type*.
    """
    if push_type == "APNs":
        return {
            'aps': {
                'alert': message,
                'badge': badge_count,
            }
        }
    if push_type == "FCM":
        return {
            'to': push_token,
            'notification': {
                'title': '',
                'body': message,
            },
            'data': {
                'badge': badge_count,
            },
        }
    return None

# Shared scheduling helper used by send_push and send_bulk_push
def _run_scheduled_sends(send_args_list, scheduled_time):
    """Schedule one send_notification call per argument tuple and block until all finish.

    Args:
        send_args_list: List of positional-argument tuples for
            ``send_notification``.
        scheduled_time: Unix timestamp at which the sends should run.
    """
    import threading
    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED

    if not send_args_list:
        # With no jobs no completion event would ever fire, so start() would
        # block forever.
        return

    # A dedicated scheduler per call, so repeated calls neither stack up
    # listeners nor try to restart a scheduler that was already shut down.
    batch_scheduler = BlockingScheduler()
    state_lock = threading.Lock()
    send_lock = threading.Lock()
    pending_job_ids = set()

    def guarded_send(*send_args):
        # Jobs run on a thread pool but share one connection; keep each
        # request/response pair together.
        with send_lock:
            send_notification(*send_args)

    def on_job_finished(event):
        if event.code == EVENT_JOB_ERROR:
            print(f"Push job {event.job_id} failed: {event.exception!r}")
        with state_lock:
            if event.job_id not in pending_job_ids:
                return
            pending_job_ids.discard(event.job_id)
            all_done = not pending_job_ids
        if all_done:
            # Called from a worker thread, so do not wait on the pool.
            batch_scheduler.shutdown(wait=False)

    run_date = datetime.fromtimestamp(scheduled_time)
    for send_args in send_args_list:
        job = batch_scheduler.add_job(
            guarded_send,
            args=send_args,
            trigger='date',
            run_date=run_date,
            # A run date that is already slightly in the past means
            # "send now"; do not let the job be skipped as a misfire.
            misfire_grace_time=None,
        )
        pending_job_ids.add(job.id)

    # Stop only after every job has finished, whether it succeeded, failed
    # or was skipped.
    batch_scheduler.add_listener(
        on_job_finished,
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
    )
    batch_scheduler.start()

# Function to send push with scheduled time
def send_push(conn, section_name, push_token, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send one notification at *scheduled_time* and block until it has been attempted.

    Args:
        conn: Open connection to the push service.
        section_name: Name of the config.ini section describing the app.
        push_token: Recipient device/registration token.
        message: Text of the notification.
        badge_count: Badge number to attach to the notification.
        scheduled_time: Unix timestamp at which to send.
        remaining_seconds: Expiration value forwarded to send_notification.
    """
    _run_scheduled_sends(
        [(conn, section_name, push_token, message, badge_count, remaining_seconds)],
        scheduled_time,
    )

# Function to send bulk push
def send_bulk_push(conn, section_name, users, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send the same notification to every user at *scheduled_time*.

    Blocks until every send has been attempted. Previously the scheduler was
    shut down as soon as the first job completed.

    Args:
        conn: Open connection to the push service, shared by all sends.
        section_name: Name of the config.ini section describing the app.
        users: Sequence of user rows; the push token is read from index 6.
        message: Text of the notification.
        badge_count: Badge number to attach to the notification.
        scheduled_time: Unix timestamp at which to send.
        remaining_seconds: Expiration value forwarded to send_notification.
    """
    send_args_list = [
        (conn, section_name, user[6], message, badge_count, remaining_seconds)
        for user in users
    ]
    _run_scheduled_sends(send_args_list, scheduled_time)

# Function to send push to users in batches
def send_push_to_users_in_batches(section_name, users, batch_size, message, badge_count, scheduled_time, remaining_seconds=0):
    """Open one connection to the push service and send to *users* in batches.

    Args:
        section_name: Name of the config.ini section describing the app.
        users: Sequence of user rows, sliced into consecutive batches.
        batch_size: Maximum number of users per batch; must be positive.
        message: Text of the notification.
        badge_count: Badge number to attach to the notification.
        scheduled_time: Unix timestamp at which to send.
        remaining_seconds: Expiration value forwarded to each send.

    Raises:
        ValueError: If *batch_size* is not positive, or the section's
            ``push_type`` is neither ``"APNs"`` nor ``"FCM"``.
    """
    # Checked first: 0 used to raise ZeroDivisionError and a negative size
    # silently sent nothing.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")

    push_type = config.get(section_name, 'push_type')
    environment = config.get(section_name, 'environment', fallback='development')

    if push_type == "APNs":
        push_url = 'api.development.push.apple.com' if environment == 'development' else 'api.push.apple.com'
    elif push_type == "FCM":
        push_url = 'fcm.googleapis.com'
    else:
        # Previously push_url stayed unbound and the next line raised
        # UnboundLocalError.
        raise ValueError(
            f"Unsupported push_type {push_type!r} in section {section_name!r}"
        )

    conn = conn_class(push_url + ':443', ca_certs=CA_FILE, cert_file=CERT_FILE, key_file=KEY_FILE)

    try:
        for start_index in range(0, len(users), batch_size):
            batch_users = users[start_index:start_index + batch_size]
            send_bulk_push(conn, section_name, batch_users, message, badge_count, scheduled_time, remaining_seconds)
    finally:
        # Release the connection even when a batch fails.
        conn.close()

# MySQL connection and get user info
def get_users_from_database():
    """Fetch every row of the ``users`` table from the configured MySQL database.

    Connection settings come from the module-level ``DB_*`` constants. When
    ``PROTOCOL`` is ``HTTPS`` (case-insensitive), the configured certificate
    files are passed as TLS options.

    Returns:
        A list of row tuples as returned by ``cursor.fetchall()``.
    """
    connection_params = {
        'host': DB_HOST,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'database': DB_NAME,
        # The configured value is a string; the port is numeric.
        'port': int(DB_PORT),
    }

    # Compared case-insensitively, like the connection-class selection.
    if PROTOCOL.upper() == 'HTTPS':
        # mysql.connector takes flat ssl_ca / ssl_cert / ssl_key arguments
        # rather than a nested 'ssl' dict; unset paths are left out.
        tls_options = {
            'ssl_ca': CA_FILE,
            'ssl_cert': CERT_FILE,
            'ssl_key': KEY_FILE,
        }
        connection_params.update(
            {name: path for name, path in tls_options.items() if path is not None}
        )

    connection = mysql.connector.connect(**connection_params)

    try:
        cursor = connection.cursor()
        # Nested so the cursor is only closed if it was actually created.
        try:
            cursor.execute('SELECT * FROM users')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()

# Resolve the Unix timestamp at which the push should be sent
def get_push_schedule_time(year=None, month=None, day=None, hour=None, minute=None):
    """Return the send time as an integer Unix timestamp.

    Any component left as ``None`` is taken from the current local time;
    seconds are always zero for an explicitly scheduled time.

    Returns:
        The current time when no component is given or when the requested
        time is already in the past; otherwise the requested time.
    """
    now = datetime.now()
    requested = (year, month, day, hour, minute)

    if not any(part is not None for part in requested):
        return int(now.timestamp())

    fallbacks = (now.year, now.month, now.day, now.hour, now.minute)
    target = datetime(*(
        given if given is not None else fallback
        for given, fallback in zip(requested, fallbacks)
    ))

    # A time in the past collapses to "now".
    return int(max(target, now).timestamp())

# Work out how many whole seconds remain until the expiration moment
def get_remaining_expiry_seconds(year=None, month=None, day=None, hour=None, minute=None):
    """Return the number of whole seconds from now until the expiration time.

    Any component left as ``None`` is taken from the current local time; the
    expiration moment always has zero seconds.

    Returns:
        A non-negative int; ``0`` when the expiration time is not in the
        future.
    """
    now = datetime.now()
    candidates = (
        ('year', year),
        ('month', month),
        ('day', day),
        ('hour', hour),
        ('minute', minute),
    )
    overrides = {field: value for field, value in candidates if value is not None}

    deadline = now.replace(second=0, microsecond=0, **overrides)
    seconds_left = int((deadline - now).total_seconds())
    return seconds_left if seconds_left > 0 else 0




# Push scheduling time (a time already in the past resolves to "now")
scheduled_time = get_push_schedule_time(2023, 12, 19, 10, 37)
# Remaining seconds until push expiration time (0 when already past)
remaining_seconds = get_remaining_expiry_seconds(2023, 12, 19, 10, 38)
print(f"Scheduled Time: {scheduled_time}")
print(f"Remaining Seconds: {remaining_seconds}")

# Section name (company name). It must match a section in config.ini; the
# sample configuration defines [HarexApp] (APNs) and [HarexApp2] (FCM), so the
# previous value 'MyApp' raised configparser.NoSectionError.
section_name = 'HarexApp'
# Get user information
users_data = get_users_from_database()
# Batch size
batch_size = 1000
# Push message
message = "test message."
# Badge count
badge_count = 1

# Send push to users based on batch size
send_push_to_users_in_batches(section_name, users_data, batch_size, message, badge_count, scheduled_time, remaining_seconds)

print("Complete.")

 

#
# config.ini
#

[Server]
# HTTP 또는 HTTPS
protocol = HTTP

[SSL]
ca_file = cert/ca.pem
cert_file = cert/cert.pem
key_file = cert/key.pem

[Database]
host = 127.0.0.1
user = netcanis
password = 12345678
database = My_DB_Name
port = 3306

[HarexApp]
# APNs, FCM
push_type = APNs
# development 또는 production
environment = development
bundle_id = com.mycompany.test
auth_key_path = cert/mycompany/AuthKey.p8
key_id = XX8UHXF9XX
team_id = XX6KEXF2XX

[HarexApp2]
# APNs, FCM
push_type = FCM
# development 또는 production
environment = development
bundle_id = com.mycompany.test
api_key = XXXX.....XXXX

 

2024.01.11 - [Note] - BLE, Beacon, iBeacon

2024.01.11 - [Note] - BLE Advertising Payload format 샘플 분석

2024.01.09 - [Note] - Packet Format for the LE Uncoded PHYs

2024.01.04 - [iOS] - Floating, Dragging Button

2023.12.27 - [Bark Bark] - 말이 통하지 않는 사람과 싸우지 말라.

2023.12.27 - [Server] - Push Notification

2023.12.27 - [AI,ML,ALGORITHM] - A star

2023.12.07 - [iOS] - XOR 연산을 이용한 문자열 비교

2023.11.03 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가

2023.10.29 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현

 

반응형
블로그 이미지

SKY STORY

,