Python으로 간단한 모바일 푸시 서버를 만들어 보았다.
예약 발송 및 배치(batch) 처리 등을 구현하였다.
import json
import jwt
import time
from datetime import datetime
from hyper import HTTPConnection
from http.client import HTTPSConnection
from apscheduler.schedulers.blocking import BlockingScheduler
import mysql.connector
import configparser
from apscheduler.events import EVENT_JOB_EXECUTED
# Load runtime settings from 'config.ini' (path is relative to the current
# working directory).
config = configparser.ConfigParser()
config.read('config.ini')
# TLS file paths are optional: each one is None when the option is absent.
CA_FILE = config.get('SSL', 'ca_file', fallback=None)
CERT_FILE = config.get('SSL', 'cert_file', fallback=None)
KEY_FILE = config.get('SSL', 'key_file', fallback=None)
# Database settings are read without a fallback, so they are mandatory.
DB_HOST = config.get('Database', 'host')
DB_USER = config.get('Database', 'user')
DB_PASSWORD = config.get('Database', 'password')
DB_NAME = config.get('Database', 'database')
# ConfigParser.get returns a string, so the port is kept as text here.
DB_PORT = config.get('Database', 'port')
# Defaults to 'HTTP' when the [Server] protocol option is not set.
PROTOCOL = config.get('Server', 'protocol', fallback='HTTP')
# Human-readable descriptions for the response status codes that
# send_notification prints after each request.
status_codes = {
    200: "Success",
    400: "Bad request",
    403: "Error with certificate or provider authentication token",
    405: "Invalid request method. Only POST requests are supported.",
    410: "Device token is no longer active for the topic.",
    413: "Notification payload was too large.",
    429: "Too many requests for the same device token.",
    500: "Internal server error",
    503: "Server is shutting down and unavailable.",
}
# 'HTTP' (compared case-insensitively) selects hyper's HTTPConnection; any
# other value selects http.client's HTTPSConnection.
# NOTE(review): the two classes expose different response APIs
# (get_response vs getresponse) - confirm both are really supported.
conn_class = HTTPConnection if PROTOCOL.upper() == 'HTTP' else HTTPSConnection
# Module-level scheduler instance used by the send helpers.
scheduler = BlockingScheduler()
def create_request_headers(section_name, remaining_seconds):
    """Return the request headers for the push provider of a config section.

    The provider is read from the ``push_type`` option of ``section_name``.

    Args:
        section_name: Name of the config.ini section describing the app.
        remaining_seconds: Forwarded to the APNs header builder; not used
            for FCM.

    Returns:
        The header dict built by the provider-specific helper, or ``None``
        when ``push_type`` is neither ``"APNs"`` nor ``"FCM"``.
    """
    provider = config.get(section_name, 'push_type')
    if provider == "FCM":
        return create_fcm_request_headers(section_name)
    if provider == "APNs":
        return create_apns_request_headers(section_name, remaining_seconds)
    return None
def create_apns_request_headers(section_name, remaining_seconds):
    """Build the HTTP headers for an APNs request using token authentication.

    Reads ``auth_key_path``, ``team_id``, ``key_id`` and ``bundle_id`` from
    ``section_name`` in the loaded config, signs an ES256 provider token with
    the key file, and returns the headers to send with the notification.

    Args:
        section_name: Name of the config.ini section describing the app.
        remaining_seconds: How long, in seconds from now, APNs may keep
            retrying delivery. ``0`` (or a negative value) asks for a single
            delivery attempt; ``None`` omits the ``apns-expiration`` header.

    Returns:
        A dict with ``apns-priority``, ``apns-topic``, ``authorization`` and,
        unless ``remaining_seconds`` is ``None``, ``apns-expiration``.

    Raises:
        OSError: If the auth key file cannot be read.
        configparser.Error: If the section or a required option is missing.
    """
    auth_key_path = config.get(section_name, 'auth_key_path')
    team_id = config.get(section_name, 'team_id')
    key_id = config.get(section_name, 'key_id')
    bundle_id = config.get(section_name, 'bundle_id')
    with open(auth_key_path) as f:
        secret = f.read()

    # Whole seconds since the epoch; also reused as the base for expiration.
    issued_at = int(time.time())
    token = jwt.encode(
        {
            'iss': team_id,
            'iat': issued_at,
        },
        secret,
        algorithm='ES256',
        headers={
            'alg': 'ES256',
            'kid': key_id,
        }
    )
    # PyJWT 1.x returns bytes while 2.x returns str; accept either.
    if isinstance(token, bytes):
        token = token.decode('ascii')

    headers = {
        'apns-priority': '10',
        'apns-topic': bundle_id,
        'authorization': 'bearer {0}'.format(token)
    }
    if remaining_seconds is not None:
        # apns-expiration is an absolute UNIX epoch (seconds), not a
        # duration, so convert the remaining lifetime into a timestamp.
        # 0 keeps its special "attempt delivery once" meaning.
        lifetime = int(remaining_seconds)
        expiration = issued_at + lifetime if lifetime > 0 else 0
        headers['apns-expiration'] = str(expiration)
    return headers
def create_fcm_request_headers(section_name):
    """Return the HTTP headers for an FCM request.

    Args:
        section_name: Name of the config.ini section whose ``api_key``
            option holds the server key.

    Returns:
        A dict with an ``Authorization`` header of the form ``key=<api_key>``
        and a JSON ``Content-Type`` header.
    """
    return {
        'Authorization': 'key=' + config.get(section_name, 'api_key'),
        'Content-Type': 'application/json',
    }
def send_notification(conn, section_name, push_token, message, badge_count=0, remaining_seconds=0):
    """Send one push notification over ``conn`` and print the result.

    The provider (``"APNs"`` or ``"FCM"``) is read from the ``push_type``
    option of ``section_name``. The response status, its description from
    ``status_codes`` and the raw response body are printed.

    Args:
        conn: An open connection object offering ``request`` plus either
            ``get_response`` (hyper) or ``getresponse`` (http.client).
        section_name: Name of the config.ini section describing the app.
        push_token: Device token (APNs) or registration token (FCM).
        message: Text of the notification.
        badge_count: Badge number to include in the payload.
        remaining_seconds: Forwarded to the header builder for APNs.

    Raises:
        ValueError: If ``push_type`` is not a supported provider.
    """
    push_type = config.get(section_name, 'push_type')
    if push_type not in ("APNs", "FCM"):
        # Fail clearly instead of asking for a response to a request that
        # was never sent.
        raise ValueError(
            f"Unsupported push_type {push_type!r} in section {section_name!r}"
        )

    request_headers = create_request_headers(section_name, remaining_seconds)
    payload_data = create_payload_data(push_type, message, badge_count, push_token)
    payload = json.dumps(payload_data).encode('utf-8')

    if push_type == "APNs":
        conn.request('POST', f'/3/device/{push_token}', payload, headers=request_headers)
    else:
        conn.request('POST', '/fcm/send', payload, headers=request_headers)

    # hyper names this get_response(); http.client names it getresponse().
    fetch_response = getattr(conn, 'get_response', None) or conn.getresponse
    resp = fetch_response()
    status_code = resp.status
    description = status_codes.get(status_code, "Unknown status code")
    print(f"Status code: {status_code} - Description: {description}")
    print(resp.read())


def create_payload_data(push_type, message, badge_count, push_token=None):
    """Build the JSON-serialisable payload for a push provider.

    Args:
        push_type: ``"APNs"`` or ``"FCM"``.
        message: Text of the notification.
        badge_count: Badge number to include.
        push_token: Recipient registration token; required for ``"FCM"``
            (it becomes the ``to`` field) and ignored for ``"APNs"``.

    Returns:
        The payload dict, or ``None`` for an unrecognised ``push_type``.

    Raises:
        ValueError: If ``push_type`` is ``"FCM"`` and no token is given.
    """
    if push_type == "APNs":
        return {
            'aps': {
                'alert': message,
                'badge': badge_count,
            }
        }
    if push_type == "FCM":
        if not push_token:
            raise ValueError("push_token is required for an FCM payload")
        return {
            'to': push_token,
            'notification': {
                'title': '',
                'body': message,
            },
            'data': {
                'badge': badge_count,
            },
        }
    return None
def _run_scheduled_sends(job_args_list, scheduled_time):
    """Run one send_notification job per args tuple at ``scheduled_time``.

    Blocks until every job has finished (executed, failed or been missed).
    A fresh scheduler is used per call so that listeners never accumulate
    and a scheduler that was already shut down is never started again.
    """
    if not job_args_list:
        # Nothing to wait for; starting a blocking scheduler would hang.
        return

    import threading
    from apscheduler.events import (
        EVENT_JOB_ERROR,
        EVENT_JOB_EXECUTED,
        EVENT_JOB_MISSED,
    )

    batch_scheduler = BlockingScheduler()
    lock = threading.Lock()
    remaining = len(job_args_list)

    def _on_job_finished(event):
        # Job events can arrive from worker threads, so guard the counter.
        nonlocal remaining
        with lock:
            remaining -= 1
            all_done = remaining == 0
        if all_done:
            # wait=False: this may run inside a worker thread, which must
            # not wait for itself.
            batch_scheduler.shutdown(wait=False)

    batch_scheduler.add_listener(
        _on_job_finished,
        EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED,
    )
    run_date = datetime.fromtimestamp(scheduled_time)
    for job_args in job_args_list:
        batch_scheduler.add_job(
            send_notification,
            args=job_args,
            trigger='date',
            run_date=run_date,
            # Run even if the scheduler gets to the job late; otherwise a
            # skipped job would never be sent.
            misfire_grace_time=None,
        )
    # NOTE(review): jobs may run concurrently on the scheduler's executor
    # while sharing one connection - confirm the connection class allows it.
    batch_scheduler.start()


def send_push(conn, section_name, push_token, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send a single notification at ``scheduled_time`` and wait for it.

    Args:
        conn: Open connection passed through to send_notification.
        section_name: Name of the config.ini section describing the app.
        push_token: Recipient token.
        message: Text of the notification.
        badge_count: Badge number to include.
        scheduled_time: UNIX timestamp (seconds) at which to send.
        remaining_seconds: Passed through to send_notification.
    """
    _run_scheduled_sends(
        [(conn, section_name, push_token, message, badge_count, remaining_seconds)],
        scheduled_time,
    )


def send_bulk_push(conn, section_name, users, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send the same notification to every user at ``scheduled_time``.

    Blocks until all notifications for ``users`` have been attempted, and
    returns immediately when ``users`` is empty.

    Args:
        conn: Open connection passed through to send_notification.
        section_name: Name of the config.ini section describing the app.
        users: Sequence of rows; the push token is read from index 6.
        message: Text of the notification.
        badge_count: Badge number to include.
        scheduled_time: UNIX timestamp (seconds) at which to send.
        remaining_seconds: Passed through to send_notification.
    """
    if not users:
        return
    job_args_list = [
        (conn, section_name, user[6], message, badge_count, remaining_seconds)
        for user in users
    ]
    _run_scheduled_sends(job_args_list, scheduled_time)
def send_push_to_users_in_batches(section_name, users, batch_size, message, badge_count, scheduled_time, remaining_seconds=0):
    """Send a notification to ``users`` in consecutive batches.

    One connection to the provider host is opened, each slice of at most
    ``batch_size`` users is handed to send_bulk_push, and the connection is
    closed afterwards even if a batch raises.

    Args:
        section_name: Name of the config.ini section describing the app.
        users: Sequence of user rows as expected by send_bulk_push.
        batch_size: Maximum number of users per batch; must be positive.
        message: Text of the notification.
        badge_count: Badge number to include.
        scheduled_time: UNIX timestamp (seconds) at which to send.
        remaining_seconds: Passed through to send_bulk_push.

    Raises:
        ValueError: If ``batch_size`` is not positive or ``push_type`` is
            not ``"APNs"`` or ``"FCM"``.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size!r}")
    if not users:
        # No recipients: avoid opening a connection for nothing.
        return

    push_type = config.get(section_name, 'push_type')
    environment = config.get(section_name, 'environment', fallback='development')
    if push_type == "APNs":
        push_url = 'api.development.push.apple.com' if environment == 'development' else 'api.push.apple.com'
    elif push_type == "FCM":
        push_url = 'fcm.googleapis.com'
    else:
        raise ValueError(
            f"Unsupported push_type {push_type!r} in section {section_name!r}"
        )

    # NOTE(review): these keyword arguments are passed unchanged to whichever
    # connection class was selected - confirm that class accepts them.
    conn = conn_class(push_url + ':443', ca_certs=CA_FILE, cert_file=CERT_FILE, key_file=KEY_FILE)
    try:
        for start_index in range(0, len(users), batch_size):
            batch_users = users[start_index:start_index + batch_size]
            send_bulk_push(conn, section_name, batch_users, message, badge_count, scheduled_time, remaining_seconds)
    finally:
        conn.close()
def get_users_from_database():
    """Fetch every row of the ``users`` table.

    Connects with the module-level database settings. When ``PROTOCOL`` is
    ``HTTPS`` (case-insensitive) the configured TLS files are passed to the
    connector as well.

    Returns:
        The list of rows returned by ``cursor.fetchall()``.
    """
    connection_params = {
        'host': DB_HOST,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'database': DB_NAME,
        'port': DB_PORT,
    }
    # Compare case-insensitively, matching how PROTOCOL is checked when the
    # connection class is chosen.
    if PROTOCOL.upper() == 'HTTPS':
        # mysql.connector takes the TLS files as flat ssl_* arguments rather
        # than a nested 'ssl' dict; options that are not configured are
        # left out.
        ssl_options = {
            'ssl_ca': CA_FILE,
            'ssl_cert': CERT_FILE,
            'ssl_key': KEY_FILE,
        }
        connection_params.update(
            {name: path for name, path in ssl_options.items() if path is not None}
        )

    connection = mysql.connector.connect(**connection_params)
    try:
        cursor = connection.cursor()
        # Nested try so a failure in cursor() cannot reach cursor.close()
        # with the name still unbound.
        try:
            cursor.execute('SELECT * FROM users')
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()
def get_push_schedule_time(year=None, month=None, day=None, hour=None, minute=None):
    """Return the UNIX timestamp (whole seconds) at which to send a push.

    Any component left as ``None`` is taken from the current local time;
    seconds are always zero for a constructed time. When no component is
    given, or the constructed time is earlier than now, the current time is
    returned instead.

    Args:
        year, month, day, hour, minute: Optional components of the send time.

    Returns:
        The chosen time as an ``int`` timestamp.
    """
    now = datetime.now()
    requested = {
        'year': year,
        'month': month,
        'day': day,
        'hour': hour,
        'minute': minute,
    }
    chosen = now
    if any(value is not None for value in requested.values()):
        # Fill the gaps from the current time, then keep the result only if
        # it is not already in the past.
        fields = {
            name: getattr(now, name) if value is None else value
            for name, value in requested.items()
        }
        candidate = datetime(**fields)
        if candidate >= now:
            chosen = candidate
    return int(chosen.timestamp())
def get_remaining_expiry_seconds(year=None, month=None, day=None, hour=None, minute=None):
    """Return how many whole seconds remain until an expiration time.

    Any component left as ``None`` is taken from the current local time;
    seconds of the expiration time are always zero.

    Args:
        year, month, day, hour, minute: Optional components of the
            expiration time.

    Returns:
        The number of seconds from now until the expiration time, or ``0``
        when that time is not in the future.
    """
    now = datetime.now()
    requested = (year, month, day, hour, minute)
    fallback = (now.year, now.month, now.day, now.hour, now.minute)
    expires_at = datetime(
        *(current if given is None else given
          for given, current in zip(requested, fallback))
    )
    seconds_left = int((expires_at - now).total_seconds())
    return seconds_left if seconds_left > 0 else 0
# Example run: schedule a push for a fixed date and let it expire one minute
# later. If the date is already in the past, get_push_schedule_time returns
# the current time and get_remaining_expiry_seconds returns 0.
scheduled_time = get_push_schedule_time(2023, 12, 19, 10, 37)
remaining_seconds = get_remaining_expiry_seconds(2023, 12, 19, 10, 38)
print(f"Scheduled Time: {scheduled_time}")
print(f"Remaining Seconds: {remaining_seconds}")
# Config section that supplies push_type and the provider credentials.
# NOTE(review): the sample configuration accompanying this script defines
# [HarexApp] and [HarexApp2], not [MyApp] - confirm the section name.
section_name = 'MyApp'
# Load all rows of the users table; send_bulk_push reads the token at index 6.
users_data = get_users_from_database()
# Maximum number of users handed to the scheduler per batch.
batch_size = 1000
message = "test message."
badge_count = 1
send_push_to_users_in_batches(section_name, users_data, batch_size, message, badge_count, scheduled_time, remaining_seconds)
print("Complete.")
[Server]
protocol = HTTP
[SSL]
ca_file = cert/ca.pem
cert_file = cert/cert.pem
key_file = cert/key.pem
[Database]
host = 127.0.0.1
user = netcanis
password = 12345678
database = My_DB_Name
port = 3306
[HarexApp]
push_type = APNs
environment = development
bundle_id = com.mycompany.test
auth_key_path = cert/mycompany/AuthKey.p8
key_id = XX8UHXF9XX
team_id = XX6KEXF2XX
[HarexApp2]
push_type = FCM
environment = development
bundle_id = com.mycompany.test
api_key = XXXX.....XXXX
2024.01.11 - [Note] - BLE, Beacon, iBeacon
2024.01.11 - [Note] - BLE Advertising Payload format 샘플 분석
2024.01.09 - [Note] - Packet Format for the LE Uncoded PHYs
2024.01.04 - [iOS] - Floating, Dragging Button
2023.12.27 - [생각의 조각들] - 말이 통하지 않는 사람과 싸우지 말라.
2023.12.27 - [Server] - Push Notification
2023.12.27 - [AI,ML,ALGORITHM] - A star
2023.12.07 - [iOS] - XOR 연산을 이용한 문자열 비교
2023.11.03 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 3x3 체크 추가
2023.10.29 - [AI,ML,ALGORITHM] - Gomoku(Five in a Row, Omok) (5/5) - 머신러닝으로 게임 구현