Server-Sent Events (SSE) 알람 : 활용편
SSE 를 구현할때 고려해야 하는 것들
다국어 알람 메세지 구현
실제 업무에서 Server-Sent Event(SSE) 를 이용하여 알람을 구현했던 기능은 아래와 같습니다.
- 주기적으로 DB 를 조회한 이전 조회 시간 이후, 새로운 데이터가 있는 경우 알람을 보내는 기능
- 추가적으로 클라이언트가 선택한 언어에 따라, 알람 메세지도 각 언어로 데이터를 전송
전체적인 흐름과 구성
클라이언트 화면에서 언제든 언어를 변경할 수 있습니다. 선택한 언어에 따라서 SSE 연결 요청하고, 그때마다 요청 HEADER 에 Accept-Language
을 사용하여 선택한 언어를 전달하게 구현돼 있습니다.
> 언어값을 HEADER - Accept-Language 로 전달 받자
DB 에는 primary key 로 id
와 lang
을 사용하고 같은 id
에 각각 ko
, en-US
, jp
등의 키 값으로 알람 메세지 데이터가 각각 들어 있는 상태입니다.
id* | lang* | message | isn_dt |
---|---|---|---|
5b48ff24 | ko | 예약에 성공했습니다! | 2023-12-01 11:10:04 |
5b48ff24 | US-en | I have successfully made a reservation. | 2023-12-01 11:10:04 |
5b48ff24 | ja | 予約に成功しました | 2023-12-01 11:10:04 |
서버 구현
클라이언트가 SSE 요청 하면서 보내주는 lang
값을 이용해서 DB 데이터를 조회하면 됩니다. Flask 에서는 request
객체에 전달되는 accept_languages
중에서 best
속성값을 이용해 손쉽게 정보를 가져올 수 있습니다.
SSE 연결하면서 전달 받은 언어 정보를 제너레이터 함수의 인자값으로 넣어주면 해당 값을 이용해서 DB 를 조회하면 사용자가 선택한 언어에 알맞은 알람이 전달 됩니다.
from flask import request
from sse import get_user_alarm_by_lang
@app.get("/connection/<user_id>")
def connection(user_id: str):
lang = request.accept_languages.best
return Response(
get_user_alarm_by_lang(user_id, lang), content_type="text/event-stream"
)
from time import sleep
import json
def select_alarm(user_id, lang):
# DB 조회 로직
return { "message": "예약에 성공했습니다!" }
def get_user_alarm_by_lang(user_id, lang):
PENDING_TIME = 5
while True:
data = select_alarm(user_id, lang)
yield f"""event: alarm\ndata: {json.dumps(data)}\n\n"""
sleep(PENDING_TIME)
사용자가 언어를 변경한다면?
사용자가 이미 SSE 요청을 한 이후, 화면에서 언어를 변경한다고 가정해 봅시다. 이때 이전 언어로 호출한 제너레이터는 더 이상 필요하지 않기 때문에 종료가 필요합니다. 그리고 변경된 언어의 lang
파라미터로 다시 새로운 제너레이터를 호출 합니다.
- 클라이언트가 선택한 언어로 다시 SSE 연결 요청
- 이전의 연결 존재하는 경우는 해당 연결을 종료
- 새로운 연결만 존재하고 이를 통해 알람 전달
SSE 문제점
하지만 문제는 SSE 는 서버에서 클라이언트로만 통신이 가능한 단방향 통신이라는 점입니다. 서버에서 클라이언트로 데이터를 보내는 것만 가능하며 클라이언트로부터의 응답이나 상태 확인은 기본적으로 제공되지 않습니다. 물론 기본적인 연결 오류 감지 기능을 통해 연결을 종료 할 수 있습니다. 하지만 올바른 방식으로 여러 번 SSE 연결을 요청하는 경우 중복된 연결을 확인 할 수 없기에 추가적인 보완이 필요합니다.
connection 관리 구현
connection 관리는 간단하게 global 변수를 이용해서 구현 가능합니다. 우선은 connection 을 관리 할 global 변수를 dict
형태로 생성해 둡니다. 가장 최근의 요청의 임의값으로 global 변수를 최신화 해두고 이를 비교하는 로직을 작성하면 됩니다.
- 각 SSE 요청마다 임의값 생성
- connection 관리하는 global 변수에 임의값을 덮어쓰기
- 제너레이터 호출 하면서 각 요청마다 생성한 임의값을 전달
- 제너레이터 내부에서 전달받은 임의값과 connection 관리하는 global 변수를 비교
- 최신 연결이 아닌 경우,
break
를 통해서 제너레이터를 종료하고 연결을 종료
from flask import request
from sse import get_user_alarm_by_lang, client_connections
from uuid import uuid4
@app.get("/connection/<user_id>")
def connection(user_id: str):
current_conn_id = uuid4().hex
client_connections[user_id] = current_conn_id # 가장 최신값으로 덮어씀
lang = request.accept_languages.best
return Response(
get_user_alarm_by_lang(user_id, lang, current_conn_id),
content_type="text/event-stream"
)
from time import sleep
import json
client_connections = {}
def get_user_alarm_by_lang(user_id, lang, current_conn_id):
PENDING_TIME = 5
while True:
# global 변수의 최신값과 호출시 전달 받은 값을 비교
if client_connections.get(user_id) != current_conn_id:
print(f"BYE BYE conn_id:{current_conn_id}")
break
data = select_alarm_by_lang(user_id, lang)
yield f"""event: alarm\ndata: {json.dumps(data)}\n\n"""
sleep(PENDING_TIME)
클라이언트 구현
SSE 는 기본적으로 클라이언트에서 연결이 끊어진 경우, 자동 재연결을 하기 때문에 클라이언트에서도 서버가 연결을 끊음에 따라 알맞은 구현이 필요합니다.
const eventSource = new EventSource('http://localhost:5000/connection/tiaz');
eventSource.onerror = function() {
eventSource.close()
// 필요에 따라 여기에서 재연결 로직을 구현할 수 있습니다.
};
동시성 문제
여기서 하나 더 생각해야 할것이 있습니다. Flask 자체는 멀티스레딩을 지원하는 웹 프레임워크입니다. 때문에 경쟁 상태(race conditions), 데드락(deadlocks)과 같은 동시성 문제를 야기할 수 있습니다. 이를 방지하기 위해 적절한 동기화 메커니즘을 사용해야 합니다. 여기서는 스레드락을 사용하겠습니다.
from threading import Lock
connections_lock = Lock()
@app.get("/connection/<user_id>")
def connection(user_id: str):
current_conn_id = uuid4().hex
with connections_lock:
client_connections[user_id] = current_conn_id
lang = request.accept_languages.best
return Response(
get_user_alarm_by_lang(user_id, lang, current_conn_id),
content_type="text/event-stream",
)
에필로그 : 더 생각해야 하는 것
추가적으로 Flask 는 멀티 프로레싱 또한 지원합니다. 따라서 flask 애플리케이션을 구동하는 환경이 멀티프로세스 인 경우, 각 프로세스는 자체 메모리 공간을 가집니다. 따라서 위에서 구현한 전역 변수는 프로세스 간에 공유되지 않습니다.
또한 전역 변수를 사용하면 서버의 상태가 유지되므로, 서버의 무상태(stateless) 특성이 손상될 수 있습니다. 이는 확장성과 유지보수성에 영향을 줄 수 있습니다.
따라서 이러한 환경에서는 Redis와 같은 외부 데이터 저장소를 사용하여 데이터를 공유하게 개선이 필요합니다. 😊