Push Notification

개발/Note 2023. 12. 27. 16:42

python으로 간단한 모바일 푸시 서버를 만들어 보았다.

예약 발송과 batch 처리 등을 구현하였다.


# push_server.py
# APNs, FCM, 예약 및 batch 푸시 발송

import json
import jwt
import time
from datetime import datetime
from hyper import HTTPConnection
from http.client import HTTPSConnection
from apscheduler.schedulers.blocking import BlockingScheduler
import mysql.connector
import configparser
from apscheduler.events import EVENT_JOB_EXECUTED

# Read configuration from config.ini
config = configparser.ConfigParser()
# A freshly created parser is empty; the file has to be loaded before any of
# the required options below can be looked up.
config.read('config.ini', encoding='utf-8')

# SSL/TLS (Secure Sockets Layer / Transport Layer Security) file paths.
# Each one is optional and falls back to None when missing.
CA_FILE = config.get('SSL', 'ca_file', fallback=None)
CERT_FILE = config.get('SSL', 'cert_file', fallback=None)
KEY_FILE = config.get('SSL', 'key_file', fallback=None)

# Database connection settings (all required, read as strings).
DB_HOST = config.get('Database', 'host')
DB_USER = config.get('Database', 'user')
DB_PASSWORD = config.get('Database', 'password')
DB_NAME = config.get('Database', 'database')
DB_PORT = config.get('Database', 'port')

# Protocol selector; defaults to 'HTTP' when [Server] protocol is absent.
PROTOCOL = config.get('Server', 'protocol', fallback='HTTP')

# APNs status codes mapped to a human-readable description.
status_codes = {
    200: "Success",
    400: "Bad request",
    403: "Error with certificate or provider authentication token",
    405: "Invalid request method. Only POST requests are supported.",
    410: "Device token is no longer active for the topic.",
    413: "Notification payload was too large.",
    429: "Too many requests for the same device token.",
    500: "Internal server error",
    503: "Server is shutting down and unavailable.",
}

# 'HTTP' (any letter case) selects hyper's HTTPConnection; every other value
# selects http.client.HTTPSConnection.
# NOTE(review): the rest of this file calls conn.get_response() and passes
# ca_certs/cert_file/key_file keywords, which looks hyper-specific - confirm
# the HTTPSConnection branch is actually usable.
conn_class = HTTPConnection if PROTOCOL.upper() == 'HTTP' else HTTPSConnection

scheduler = BlockingScheduler()

# Create the request headers for the push service configured in a section
def create_request_headers(section_name, remaining_seconds):
    """Return the HTTP headers for the push type configured in *section_name*.

    Args:
        section_name: config.ini section whose ``push_type`` option selects
            the push service.
        remaining_seconds: Expiration value forwarded to the APNs header
            builder; not used for FCM.

    Returns:
        A dict of headers for "APNs" or "FCM", or None for any other
        ``push_type``.
    """
    push_type = config.get(section_name, 'push_type')
    if push_type == "APNs":
        return create_apns_request_headers(section_name, remaining_seconds)
    if push_type == "FCM":
        return create_fcm_request_headers(section_name)
    # Unsupported push type: there are no headers to build.
    return None

# Create APNs request headers
def create_apns_request_headers(section_name, remaining_seconds):
    """Build token-authenticated APNs request headers.

    Reads ``auth_key_path``, ``team_id``, ``key_id`` and ``bundle_id`` from
    *section_name*, signs a provider JWT (ES256) with the key file and returns
    the headers for a notification request.

    Args:
        section_name: config.ini section holding the APNs options.
        remaining_seconds: Seconds from now until the notification expires.
            ``0`` sends ``apns-expiration: 0``; ``None`` omits the header.

    Returns:
        dict with ``apns-priority``, ``apns-topic``, ``authorization`` and,
        unless *remaining_seconds* is None, ``apns-expiration``.
    """
    auth_key_path = config.get(section_name, 'auth_key_path')
    team_id = config.get(section_name, 'team_id')
    key_id = config.get(section_name, 'key_id')
    bundle_id = config.get(section_name, 'bundle_id')

    with open(auth_key_path) as f:
        secret = f.read()

    token = jwt.encode(
        {
            'iss': team_id,
            'iat': time.time(),
        },
        secret,
        algorithm='ES256',
        headers={
            'alg': 'ES256',
            'kid': key_id,
        },
    )
    # PyJWT 1.x returns bytes, 2.x returns str; normalise to str.
    if isinstance(token, bytes):
        token = token.decode('ascii')

    headers = {
        'apns-priority': '10',
        'apns-topic': bundle_id,
        'authorization': 'bearer {0}'.format(token),
    }
    if remaining_seconds is not None:
        # apns-expiration is an absolute UNIX epoch in seconds, not a
        # duration, so a positive offset is added to the current time.
        # NOTE(review): the offset is applied when the headers are built; if
        # the caller computed remaining_seconds earlier, the absolute expiry
        # shifts later by that delay - confirm this is acceptable.
        if remaining_seconds > 0:
            expiration = int(time.time()) + int(remaining_seconds)
        else:
            expiration = 0
        headers['apns-expiration'] = str(expiration)

    return headers

# Create FCM request headers
def create_fcm_request_headers(section_name):
    """Build the request headers for an FCM send request.

    Args:
        section_name: config.ini section whose ``api_key`` option holds the
            server key.

    Returns:
        dict with the ``Authorization`` (``key=<api_key>``) and
        ``Content-Type`` headers.
    """
    # NOTE(review): 'key=' authorization belongs to the legacy FCM HTTP API -
    # confirm that endpoint is still available for this project.
    api_key = config.get(section_name, 'api_key')
    headers = {
        'Authorization': 'key=' + api_key,
        'Content-Type': 'application/json',
    }
    return headers

# Function to send one notification over an open connection
def send_notification(conn, section_name, push_token, message, badge_count=0, remaining_seconds=0):
    """Send a single push notification and print the response status.

    Args:
        conn: Open connection object; must provide ``request`` and
            ``get_response``.
        section_name: config.ini section describing the push service.
        push_token: Device token (APNs) or registration token (FCM).
        message: Notification text.
        badge_count: Badge number to send with the notification.
        remaining_seconds: Expiration value forwarded to the header builder.

    Raises:
        ValueError: If the section's ``push_type`` is neither "APNs" nor "FCM".
    """
    push_type = config.get(section_name, 'push_type')

    if push_type == "APNs":
        path = f'/3/device/{push_token}'
    elif push_type == "FCM":
        path = '/fcm/send'
    else:
        # Without this guard no request would be issued and the failure would
        # only surface later when reading the response.
        raise ValueError(f"Unsupported push_type: {push_type!r}")

    request_headers = create_request_headers(section_name, remaining_seconds)
    payload_data = create_payload_data(push_type, message, badge_count, push_token)
    payload = json.dumps(payload_data).encode('utf-8')

    conn.request('POST', path, payload, headers=request_headers)
    resp = conn.get_response()

    status_code = resp.status
    # status_codes holds APNs descriptions; other codes fall back to a
    # generic label.
    description = status_codes.get(status_code, "Unknown status code")
    print(f"Status code: {status_code} - Description: {description}")


# Create payload data based on push type
def create_payload_data(push_type, message, badge_count, push_token=None):
    """Return the JSON-serialisable payload for *push_type*.

    Args:
        push_type: "APNs" or "FCM".
        message: Notification text.
        badge_count: Badge number.
        push_token: Recipient token; only used for the FCM ``to`` field
            (APNs carries the token in the request path instead).

    Returns:
        The payload dict, or None for an unsupported *push_type*.
    """
    if push_type == "APNs":
        return {
            'aps': {
                'alert': message,
                'badge': badge_count,
            },
        }
    if push_type == "FCM":
        return {
            'to': push_token,
            'notification': {
                'title': '',
                'body': message,
            },
            'data': {
                'badge': badge_count,
            },
        }
    return None

# Function to send push with scheduled time
def send_push(conn, section_name, push_token, message, badge_count, scheduled_time, remaining_seconds=0):
    """Schedule one notification and block until the scheduler stops.

    Args:
        conn: Open connection handed to ``send_notification``.
        section_name: config.ini section describing the push service.
        push_token: Recipient token.
        message: Notification text.
        badge_count: Badge number.
        scheduled_time: Delivery time as a UNIX timestamp in seconds.
        remaining_seconds: Expiration value forwarded to ``send_notification``.
    """
    # The scheduler is a BlockingScheduler, so start() only returns after this
    # listener shuts it down once a job has executed.
    scheduler.add_listener(lambda event: scheduler.shutdown(wait=False), EVENT_JOB_EXECUTED)
    scheduler.add_job(
        send_notification,
        'date',
        run_date=datetime.fromtimestamp(scheduled_time),
        args=(conn, section_name, push_token, message, badge_count, remaining_seconds),
    )
    scheduler.start()

# Function to send bulk push
def send_bulk_push(conn, section_name, users, message, badge_count, scheduled_time, remaining_seconds=0):
    """Schedule one notification per user and block until the scheduler stops.

    Args:
        conn: Open connection handed to ``send_notification``.
        section_name: config.ini section describing the push service.
        users: Sequence of user rows; index 6 of each row is used as the push
            token (presumably the token column of the users table - verify
            against the schema).
        message: Notification text.
        badge_count: Badge number.
        scheduled_time: Delivery time as a UNIX timestamp in seconds.
        remaining_seconds: Expiration value forwarded to ``send_notification``.
    """
    if not users:
        # With no jobs nothing would ever fire the shutdown listener, and the
        # blocking scheduler would never return.
        return

    # NOTE(review): this stops the scheduler as soon as ANY job has executed;
    # confirm that every job of the batch is dispatched before the shutdown
    # takes effect.
    scheduler.add_listener(lambda event: scheduler.shutdown(wait=False), EVENT_JOB_EXECUTED)
    run_date = datetime.fromtimestamp(scheduled_time)
    for user in users:
        push_token = user[6]
        scheduler.add_job(
            send_notification,
            'date',
            run_date=run_date,
            args=(conn, section_name, push_token, message, badge_count, remaining_seconds),
        )
    scheduler.start()

# Function to send push to users in batches
def send_push_to_users_in_batches(section_name, users, batch_size, message, badge_count, scheduled_time, remaining_seconds=0):
    """Open one connection to the push host and send to *users* in batches.

    Args:
        section_name: config.ini section; ``push_type`` selects the host and
            ``environment`` (default 'development') selects the APNs endpoint.
        users: Sequence of user rows passed on to ``send_bulk_push``.
        batch_size: Maximum number of users per ``send_bulk_push`` call.
        message: Notification text.
        badge_count: Badge number.
        scheduled_time: Delivery time as a UNIX timestamp in seconds.
        remaining_seconds: Expiration value forwarded with every notification.

    Raises:
        ValueError: If *batch_size* is smaller than 1 or ``push_type`` is
            neither "APNs" nor "FCM".
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    push_type = config.get(section_name, 'push_type')
    environment = config.get(section_name, 'environment', fallback='development')

    if push_type == "APNs":
        push_url = 'api.development.push.apple.com' if environment == 'development' else 'api.push.apple.com'
    elif push_type == "FCM":
        push_url = 'fcm.googleapis.com'
    else:
        # Without a known push type there is no host to connect to.
        raise ValueError(f"Unsupported push_type: {push_type!r}")

    conn = conn_class(push_url + ':443', ca_certs=CA_FILE, cert_file=CERT_FILE, key_file=KEY_FILE)

    # Walk the list in batch_size steps; the final slice may be shorter.
    for start_index in range(0, len(users), batch_size):
        batch_users = users[start_index:start_index + batch_size]
        send_bulk_push(conn, section_name, batch_users, message, badge_count, scheduled_time, remaining_seconds)


# MySQL connection and get user info
def get_users_from_database():
    """Fetch every row of the ``users`` table.

    Connects with the module-level DB_* settings. When PROTOCOL is 'HTTPS'
    (any letter case) the configured certificate paths are passed as TLS
    options.

    Returns:
        The list of rows returned by ``cursor.fetchall()`` for
        ``SELECT * FROM users``.
    """
    connection_params = {
        'host': DB_HOST,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'database': DB_NAME,
        'port': DB_PORT,
    }
    if PROTOCOL.upper() == 'HTTPS':
        # Connector/Python takes the certificate paths as individual ssl_*
        # keywords; unset (None) paths are left out.
        ssl_options = {
            'ssl_ca': CA_FILE,
            'ssl_cert': CERT_FILE,
            'ssl_key': KEY_FILE,
        }
        connection_params.update(
            {option: path for option, path in ssl_options.items() if path is not None}
        )

    connection = mysql.connector.connect(**connection_params)
    try:
        cursor = connection.cursor()
        try:
            query = 'SELECT * FROM users'
            cursor.execute(query)
            users = cursor.fetchall()
            return users
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        connection.close()

# Calculate the scheduled time (as a UNIX timestamp in seconds)
def get_push_schedule_time(year=None, month=None, day=None, hour=None, minute=None):
    """Return the delivery time as an integer UNIX timestamp.

    Any component left as None is taken from the current local time; seconds
    are always 0 for an explicitly built time.

    Args:
        year, month, day, hour, minute: Components of the desired local
            delivery time.

    Returns:
        The timestamp of the requested time, or of the current time when no
        component is given or the requested time is already in the past.
    """
    current_time = datetime.now()
    requested = (year, month, day, hour, minute)

    if all(part is None for part in requested):
        return int(current_time.timestamp())

    fallback = (
        current_time.year,
        current_time.month,
        current_time.day,
        current_time.hour,
        current_time.minute,
    )
    scheduled_time = datetime(
        *(now_part if part is None else part for part, now_part in zip(requested, fallback))
    )

    # A time in the past is clamped to "now".
    if scheduled_time < current_time:
        return int(current_time.timestamp())

    return int(scheduled_time.timestamp())

# Calculate the remaining expiration time (in seconds from now)
def get_remaining_expiry_seconds(year=None, month=None, day=None, hour=None, minute=None):
    """Return the whole seconds left until the given local expiration time.

    Any component left as None is taken from the current local time; the
    seconds of the expiration time are always 0.

    Args:
        year, month, day, hour, minute: Components of the expiration time.

    Returns:
        Non-negative int; 0 when the expiration time is not in the future.
    """
    current_time = datetime.now()
    expiration_time = datetime(
        current_time.year if year is None else year,
        current_time.month if month is None else month,
        current_time.day if day is None else day,
        current_time.hour if hour is None else hour,
        current_time.minute if minute is None else minute,
    )
    time_difference = expiration_time - current_time
    # Never report a negative remaining time.
    return max(int(time_difference.total_seconds()), 0)

# Delivery time of the push as a UNIX timestamp
scheduled_time = get_push_schedule_time(year=2023, month=12, day=19, hour=10, minute=37)
# Seconds left from now until the push should expire
remaining_seconds = get_remaining_expiry_seconds(year=2023, month=12, day=19, hour=10, minute=38)
print(f"Scheduled Time: {scheduled_time}")
print(f"Remaining Seconds: {remaining_seconds}")

# config.ini section to use (company name)
section_name = 'MyApp'
# Recipient rows loaded from the database
users_data = get_users_from_database()
# Number of users handled per batch
batch_size = 1000
# Notification text
message = "test message."
# Badge number sent with the notification
badge_count = 1

# Deliver the push to every user, one batch at a time
send_push_to_users_in_batches(
    section_name,
    users_data,
    batch_size,
    message,
    badge_count,
    scheduled_time,
    remaining_seconds,
)



# config.ini

[Server]
protocol = HTTP

[SSL]
ca_file = cert/ca.pem
cert_file = cert/cert.pem
key_file = cert/key.pem

[Database]
host =
user = netcanis
password = 12345678
database = My_DB_Name
port = 3306

[MyApp]
push_type = APNs
# development 또는 production
environment = development
bundle_id = com.mycompany.test
auth_key_path = cert/mycompany/AuthKey.p8
key_id = XX8UHXF9XX
team_id = XX6KEXF2XX

# Example section name for FCM; pass this name as section_name to use it.
[MyApp_FCM]
push_type = FCM
# development 또는 production
environment = development
bundle_id = com.mycompany.test
api_key = XXXX.....XXXX


