Python으로 간단한 모바일 푸시 서버를 만들어 보았다.
예약 발송과 배치(batch) 발송 기능을 구현하였다.
#
# push_server.py
# APNs, FCM, 예약 및 batch 푸시 발송
#
import json
import jwt
import time
from datetime import datetime
from hyper import HTTPConnection
from http.client import HTTPSConnection
from apscheduler.schedulers.blocking import BlockingScheduler
import mysql.connector
import configparser
from apscheduler.events import EVENT_JOB_EXECUTED
# Load settings from config.ini. The relative path is resolved against the
# current working directory, and ConfigParser.read() silently skips a file
# it cannot find.
config = configparser.ConfigParser()
config.read('config.ini')

# TLS material (CA bundle, client certificate, private key). Each value is
# None when the option is not present in config.ini.
CA_FILE = config.get('SSL', 'ca_file', fallback=None)
CERT_FILE = config.get('SSL', 'cert_file', fallback=None)
KEY_FILE = config.get('SSL', 'key_file', fallback=None)

# MySQL connection settings. No fallback is given, so a missing section or
# option raises at import time. Values are strings as returned by configparser.
DB_HOST = config.get('Database', 'host')
DB_USER = config.get('Database', 'user')
DB_PASSWORD = config.get('Database', 'password')
DB_NAME = config.get('Database', 'database')
DB_PORT = config.get('Database', 'port')

# Transport selector; defaults to 'HTTP' when [Server] protocol is absent.
PROTOCOL = config.get('Server', 'protocol', fallback='HTTP')

# Descriptions printed next to the HTTP status code of an APNs response.
status_codes = {
    200: "Success",
    400: "Bad request",
    403: "Error with certificate or provider authentication token",
    405: "Invalid request method. Only POST requests are supported.",
    410: "Device token is no longer active for the topic.",
    413: "Notification payload was too large.",
    429: "Too many requests for the same device token.",
    500: "Internal server error",
    503: "Server is shutting down and unavailable.",
}

# 'HTTP' (compared case-insensitively) selects hyper's HTTPConnection; any
# other value selects http.client's HTTPSConnection.
if PROTOCOL.upper() == 'HTTP':
    conn_class = HTTPConnection
else:
    conn_class = HTTPSConnection

# Module-level blocking scheduler instance.
scheduler = BlockingScheduler()
def create_request_headers(section_name, remaining_seconds):
    """Build the request headers for the push service configured in a section.

    Args:
        section_name: config.ini section whose ``push_type`` option selects
            the push service.
        remaining_seconds: Expiration value forwarded to the APNs header
            builder; not used for FCM.

    Returns:
        The header dict produced by the APNs or FCM builder, or ``None`` when
        ``push_type`` is neither "APNs" nor "FCM".
    """
    service = config.get(section_name, 'push_type')
    if service == "APNs":
        return create_apns_request_headers(section_name, remaining_seconds)
    if service == "FCM":
        return create_fcm_request_headers(section_name)
    return None
def create_apns_request_headers(section_name, remaining_seconds):
    """Build the APNs request headers, including a freshly signed provider token.

    The section must provide ``auth_key_path`` (path to the .p8 signing key),
    ``team_id``, ``key_id`` and ``bundle_id``.

    Args:
        section_name: config.ini section holding the APNs credentials.
        remaining_seconds: How long, in seconds from now, APNs may keep trying
            to deliver the notification. ``0`` (or a negative value) asks APNs
            to attempt delivery once without storing it. ``None`` omits the
            ``apns-expiration`` header entirely.

    Returns:
        dict with ``apns-priority``, ``apns-topic``, ``authorization`` and,
        unless ``remaining_seconds`` is ``None``, ``apns-expiration``.

    Raises:
        OSError: If the signing key file cannot be read.
        configparser.Error: If a required option is missing.
    """
    auth_key_path = config.get(section_name, 'auth_key_path')
    team_id = config.get(section_name, 'team_id')
    key_id = config.get(section_name, 'key_id')
    bundle_id = config.get(section_name, 'bundle_id')

    with open(auth_key_path) as key_file:
        secret = key_file.read()

    # APNs expects "iat" as whole seconds since the epoch.
    issued_at = int(time.time())
    token = jwt.encode(
        {
            'iss': team_id,
            'iat': issued_at,
        },
        secret,
        algorithm='ES256',
        headers={
            'alg': 'ES256',
            'kid': key_id,
        },
    )
    # PyJWT 1.x returns bytes while 2.x returns str; normalise to str so the
    # header can be built with either version.
    if isinstance(token, bytes):
        token = token.decode('ascii')

    headers = {
        'apns-priority': '10',
        'apns-topic': bundle_id,
        'authorization': 'bearer {0}'.format(token),
    }
    if remaining_seconds is not None:
        # apns-expiration is an absolute UNIX timestamp, not a duration, so
        # the remaining lifetime is converted here. 0 keeps its special
        # meaning of "try once, do not store".
        lifetime = int(remaining_seconds)
        expires_at = issued_at + lifetime if lifetime > 0 else 0
        headers['apns-expiration'] = str(expires_at)
    return headers
def create_fcm_request_headers(section_name):
    """Build the FCM request headers from the section's ``api_key`` option.

    Args:
        section_name: config.ini section holding the FCM ``api_key``.

    Returns:
        dict with the ``Authorization`` ("key=<api_key>") and
        ``Content-Type`` ("application/json") headers.
    """
    server_key = config.get(section_name, 'api_key')
    return {
        'Authorization': f'key={server_key}',
        'Content-Type': 'application/json',
    }
def send_notification(conn, section_name, push_token, message, badge_count=0, remaining_seconds=0):
    """Send one push notification over an open connection and print the outcome.

    Args:
        conn: Open connection to the push service host.
        section_name: config.ini section describing the push service.
        push_token: Device token (APNs) or registration token (FCM).
        message: Text shown to the user.
        badge_count: Badge number sent with the notification.
        remaining_seconds: Expiration value forwarded to the header builder.

    Raises:
        ValueError: If the section's ``push_type`` is neither "APNs" nor "FCM".
    """
    push_type = config.get(section_name, 'push_type')
    request_headers = create_request_headers(section_name, remaining_seconds)
    if request_headers is None:
        # Without this guard no request would be sent, yet a response would
        # still be awaited below.
        raise ValueError(
            f"Unsupported push_type {push_type!r} in section {section_name!r}"
        )

    payload_data = create_payload_data(push_type, message, badge_count, push_token)
    payload = json.dumps(payload_data).encode('utf-8')

    if push_type == "APNs":
        path = f'/3/device/{push_token}'
    elif push_type == "FCM":
        path = '/fcm/send'
    stream_id = conn.request('POST', path, payload, headers=request_headers)

    # hyper connections expose get_response() (and return a stream id from
    # request() on HTTP/2); http.client connections expose getresponse().
    # Passing the stream id back ties the response to this request.
    get_response = getattr(conn, 'get_response', None)
    if get_response is None:
        resp = conn.getresponse()
    elif stream_id is None:
        resp = get_response()
    else:
        resp = get_response(stream_id)

    status_code = resp.status
    description = status_codes.get(status_code, "Unknown status code")
    print(f"Status code: {status_code} - Description: {description}")
    print(resp.read())


def create_payload_data(push_type, message, badge_count, push_token=None):
    """Build the JSON-serialisable request body for a push service.

    Args:
        push_type: "APNs" or "FCM".
        message: Notification text.
        badge_count: Badge number.
        push_token: Recipient registration token; required for "FCM" (it
            becomes the ``to`` field) and ignored for "APNs", where the token
            travels in the request path instead.

    Returns:
        The payload dict, or ``None`` for an unrecognised ``push_type``.

    Raises:
        ValueError: If ``push_type`` is "FCM" and no ``push_token`` is given.
    """
    if push_type == "APNs":
        return {
            'aps': {
                'alert': message,
                'badge': badge_count,
            }
        }
    if push_type == "FCM":
        if push_token is None:
            raise ValueError("push_token is required for an FCM payload")
        return {
            'to': push_token,
            'notification': {
                'title': '',
                'body': message,
            },
            'data': {
                'badge': badge_count,
            },
        }
    return None
def _run_notification_jobs(job_args, scheduled_time):
    """Run one send_notification job per argument tuple and block until all end.

    A private scheduler is created for each call so that listeners do not
    pile up across calls and no previously shut-down scheduler is reused.
    The scheduler is stopped only after every job has been executed, has
    failed, or has been reported as missed.

    Args:
        job_args: Iterable of positional-argument tuples for send_notification.
        scheduled_time: UNIX timestamp (seconds) at which the jobs should run.
    """
    import threading

    from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_MISSED

    job_args = list(job_args)
    if not job_args:
        # With no job there is no completion event, so starting the blocking
        # scheduler would never return.
        return

    run_scheduler = BlockingScheduler()
    run_date = datetime.fromtimestamp(scheduled_time)
    pending_ids = set()
    lock = threading.Lock()

    def on_job_finished(event):
        # Listeners may be invoked from worker threads, hence the lock.
        with lock:
            if event.job_id not in pending_ids:
                return
            pending_ids.remove(event.job_id)
            if pending_ids:
                return
        # Last outstanding job is done; wait=False because this callback may
        # itself be running on one of the executor's threads.
        run_scheduler.shutdown(wait=False)

    run_scheduler.add_listener(
        on_job_finished,
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
    )
    for args in job_args:
        job = run_scheduler.add_job(
            send_notification,
            args=args,
            trigger='date',
            run_date=run_date,
            # A run date that is already slightly in the past must still
            # fire instead of being discarded as a misfire.
            misfire_grace_time=None,
        )
        pending_ids.add(job.id)

    # NOTE(review): jobs sharing a run date may execute concurrently on the
    # scheduler's worker threads while sharing `conn` - confirm the
    # connection object tolerates that.
    run_scheduler.start()


def send_push(conn, section_name, push_token, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send a single notification at ``scheduled_time`` and block until it is done.

    Args:
        conn: Open connection to the push service host.
        section_name: config.ini section describing the push service.
        push_token: Recipient token.
        message: Notification text.
        badge_count: Badge number.
        scheduled_time: UNIX timestamp (seconds) at which to send.
        remaining_seconds: Expiration value forwarded to send_notification.
    """
    _run_notification_jobs(
        [(conn, section_name, push_token, message, badge_count, remaining_seconds)],
        scheduled_time,
    )


def send_bulk_push(conn, section_name, users, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send the same notification to every user at ``scheduled_time``.

    Blocks until every notification job has finished (successfully or not).
    An empty ``users`` sequence returns immediately.

    Args:
        conn: Open connection to the push service host.
        section_name: config.ini section describing the push service.
        users: Sequence of rows; element 6 of each row is used as the push
            token (presumably the token column of the users table - verify
            against the schema).
        message: Notification text.
        badge_count: Badge number.
        scheduled_time: UNIX timestamp (seconds) at which to send.
        remaining_seconds: Expiration value forwarded to send_notification.
    """
    _run_notification_jobs(
        [
            (conn, section_name, user[6], message, badge_count, remaining_seconds)
            for user in users
        ],
        scheduled_time,
    )
def send_push_to_users_in_batches(section_name, users, batch_size, message, badge_count, scheduled_time, remaining_seconds=0):
    """Open one connection to the push service and send to users batch by batch.

    Args:
        section_name: config.ini section describing the push service
            (``push_type`` and, for APNs, ``environment``).
        users: Sequence of user rows passed on to send_bulk_push.
        batch_size: Maximum number of users handed to send_bulk_push at once;
            must be positive.
        message: Notification text.
        badge_count: Badge number.
        scheduled_time: UNIX timestamp (seconds) at which to send.
        remaining_seconds: Expiration value forwarded to send_bulk_push.

    Raises:
        ValueError: If ``batch_size`` is not positive or ``push_type`` is
            neither "APNs" nor "FCM".
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size!r}")

    push_type = config.get(section_name, 'push_type')
    environment = config.get(section_name, 'environment', fallback='development')
    if push_type == "APNs":
        push_url = 'api.development.push.apple.com' if environment == 'development' else 'api.push.apple.com'
    elif push_type == "FCM":
        push_url = 'fcm.googleapis.com'
    else:
        # Fail with a clear message instead of an unbound push_url below.
        raise ValueError(
            f"Unsupported push_type {push_type!r} in section {section_name!r}"
        )

    conn = conn_class(push_url + ':443', ca_certs=CA_FILE, cert_file=CERT_FILE, key_file=KEY_FILE)
    try:
        for start_index in range(0, len(users), batch_size):
            batch_users = users[start_index:start_index + batch_size]
            send_bulk_push(conn, section_name, batch_users, message, badge_count, scheduled_time, remaining_seconds)
    finally:
        # Release the connection even when a batch fails.
        conn.close()
def get_users_from_database():
    """Fetch every row of the ``users`` table from the configured MySQL database.

    When the configured protocol is HTTPS (case-insensitive), the TLS files
    from the [SSL] section are passed to the connector as well.

    Returns:
        list of row tuples as returned by ``cursor.fetchall()``.

    Raises:
        mysql.connector.Error: If connecting or querying fails.
        ValueError: If the configured port is not an integer.
    """
    connection_params = {
        'host': DB_HOST,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'database': DB_NAME,
        # configparser yields strings; the port is numeric.
        'port': int(DB_PORT),
    }
    # Compare case-insensitively, matching how the connection class is chosen.
    if PROTOCOL.upper() == 'HTTPS':
        # mysql.connector takes flat ssl_* arguments rather than an `ssl` dict;
        # options left unset in config.ini are skipped.
        tls_options = {
            'ssl_ca': CA_FILE,
            'ssl_cert': CERT_FILE,
            'ssl_key': KEY_FILE,
        }
        connection_params.update(
            {name: path for name, path in tls_options.items() if path is not None}
        )

    connection = mysql.connector.connect(**connection_params)
    try:
        cursor = connection.cursor()
        try:
            cursor.execute('SELECT * FROM users')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()
def get_push_schedule_time(year=None, month=None, day=None, hour=None, minute=None):
    """Return the UNIX timestamp (whole seconds) at which a push should be sent.

    Each omitted field is taken from the current local time; seconds are
    always zero for an explicitly built time. When no field is given, or the
    resulting time lies in the past, the current time is returned instead.

    Args:
        year, month, day, hour, minute: Optional parts of the local send time.

    Returns:
        int: Seconds since the epoch.
    """
    now = datetime.now()
    requested = (year, month, day, hour, minute)
    if all(part is None for part in requested):
        return int(now.timestamp())

    fallbacks = (now.year, now.month, now.day, now.hour, now.minute)
    target = datetime(
        *(fallback if part is None else part
          for part, fallback in zip(requested, fallbacks))
    )
    # A time that has already passed is clamped to "now".
    return int(max(target, now).timestamp())
def get_remaining_expiry_seconds(year=None, month=None, day=None, hour=None, minute=None):
    """Return how many whole seconds remain until the given local expiry time.

    Each omitted field is taken from the current local time; the expiry time
    always has zero seconds. An expiry time in the past yields 0.

    Args:
        year, month, day, hour, minute: Optional parts of the local expiry time.

    Returns:
        int: Non-negative number of seconds from now until expiry.
    """
    now = datetime.now()
    requested = (year, month, day, hour, minute)
    fallbacks = (now.year, now.month, now.day, now.hour, now.minute)
    expires = datetime(
        *(fallback if part is None else part
          for part, fallback in zip(requested, fallbacks))
    )
    seconds_left = int((expires - now).total_seconds())
    if seconds_left < 0:
        return 0
    return seconds_left
# Push scheduling time (local time). A moment that has already passed is
# resolved to "now" by get_push_schedule_time.
scheduled_time = get_push_schedule_time(2023, 12, 19, 10, 37)
# Remaining seconds until the push expires; 0 once the expiry time has passed.
remaining_seconds = get_remaining_expiry_seconds(2023, 12, 19, 10, 38)
print(f"Scheduled Time: {scheduled_time}")
print(f"Remaining Seconds: {remaining_seconds}")
# Section name (company name). It must name a section of config.ini; the
# sample configuration defines [HarexApp] (APNs) and [HarexApp2] (FCM), so a
# name missing from the file would fail with configparser.NoSectionError.
section_name = 'HarexApp'
# Get user information
users_data = get_users_from_database()
# Batch size
batch_size = 1000
# Push message
message = "test message."
# Badge count
badge_count = 1
# Send push to users based on batch size
send_push_to_users_in_batches(section_name, users_data, batch_size, message, badge_count, scheduled_time, remaining_seconds)
print("Complete.")
#
# config.ini
#
[Server]
# HTTP 또는 HTTPS
protocol = HTTP
[SSL]
ca_file = cert/ca.pem
cert_file = cert/cert.pem
key_file = cert/key.pem
[Database]
host = 127.0.0.1
user = netcanis
password = 12345678
database = My_DB_Name
port = 3306
[HarexApp]
# APNs, FCM
push_type = APNs
# development 또는 production
environment = development
bundle_id = com.mycompany.test
auth_key_path = cert/mycompany/AuthKey.p8
key_id = XX8UHXF9XX
team_id = XX6KEXF2XX
[HarexApp2]
# APNs, FCM
push_type = FCM
# development 또는 production
environment = development
bundle_id = com.mycompany.test
api_key = XXXX.....XXXX
2024.01.11 - [Note] - BLE, Beacon, iBeacon
2024.01.11 - [Note] - BLE Advertising Payload format 샘플 분석
2024.01.09 - [Note] - Packet Format for the LE Uncoded PHYs
2024.01.04 - [iOS] - Floating, Dragging Button
2023.12.27 - [생각의 조각들] - 말이 통하지 않는 사람과 싸우지 말라.
2023.12.27 - [Server] - Push Notification
2023.12.27 - [AI,ML,ALGORITHM] - A star
2023.12.07 - [iOS] - XOR 연산을 이용한 문자열 비교
2023.11.03 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가
2023.10.29 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현