Server-Sent Events (SSE) 알람 : 활용편

다국어 알람 메세지 구현


실제 업무에서 Server-Sent Event(SSE) 를 이용하여 알람을 구현했던 기능은 아래와 같습니다.

  1. 주기적으로 DB 를 조회한 이전 조회 시간 이후, 새로운 데이터가 있는 경우 알람을 보내는 기능
  2. 추가적으로 클라이언트가 선택한 언어에 따라, 알람 메세지도 각 언어로 데이터를 전송

전체적인 흐름과 구성


클라이언트 화면에서 언제든 언어를 변경할 수 있습니다. 선택한 언어에 따라서 SSE 연결 요청하고, 그때마다 요청 HEADER 에 Accept-Language 을 사용하여 선택한 언어를 전달하게 구현돼 있습니다.

> 언어값을 HEADER - Accept-Language 로 전달 받자

DB 에는 primary key 로 idlang 을 사용하고 같은 id 에 각각 ko , en-US, jp 등의 키 값으로 알람 메세지 데이터가 각각 들어 있는 상태입니다.

id* lang* message isn_dt
5b48ff24 ko 예약에 성공했습니다! 2023-12-01 11:10:04
5b48ff24 US-en I have successfully made a reservation. 2023-12-01 11:10:04
5b48ff24 ja 予約に成功しました 2023-12-01 11:10:04

서버 구현


클라이언트가 SSE 요청 하면서 보내주는 lang 값을 이용해서 DB 데이터를 조회하면 됩니다. Flask 에서는 request 객체에 전달되는 accept_languages 중에서 best 속성값을 이용해 손쉽게 정보를 가져올 수 있습니다.

SSE 연결하면서 전달 받은 언어 정보를 제너레이터 함수의 인자값으로 넣어주면 해당 값을 이용해서 DB 를 조회하면 사용자가 선택한 언어에 알맞은 알람이 전달 됩니다.

app.py
from flask import request
from sse import get_user_alarm_by_lang

@app.get("/connection/<user_id>")
def connection(user_id: str):
    lang = request.accept_languages.best

    return Response(
        get_user_alarm_by_lang(user_id, lang), content_type="text/event-stream"
    )
sse.py
from time import sleep
import json

def select_alarm(user_id, lang):
    # DB 조회 로직
    return { "message": "예약에 성공했습니다!" }

def get_user_alarm_by_lang(user_id, lang):
    PENDING_TIME = 5

    while True:
        data = select_alarm(user_id, lang)
        yield f"""event: alarm\ndata: {json.dumps(data)}\n\n"""
        sleep(PENDING_TIME)

사용자가 언어를 변경한다면?


사용자가 이미 SSE 요청을 한 이후, 화면에서 언어를 변경한다고 가정해 봅시다. 이때 이전 언어로 호출한 제너레이터는 더 이상 필요하지 않기 때문에 종료가 필요합니다. 그리고 변경된 언어의 lang 파라미터로 다시 새로운 제너레이터를 호출 합니다.

  1. 클라이언트가 선택한 언어로 다시 SSE 연결 요청
  2. 이전의 연결 존재하는 경우는 해당 연결을 종료
  3. 새로운 연결만 존재하고 이를 통해 알람 전달

SSE 문제점


하지만 문제는 SSE 는 서버에서 클라이언트로만 통신이 가능한 단방향 통신이라는 점입니다. 서버에서 클라이언트로 데이터를 보내는 것만 가능하며 클라이언트로부터의 응답이나 상태 확인은 기본적으로 제공되지 않습니다. 물론 기본적인 연결 오류 감지 기능을 통해 연결을 종료 할 수 있습니다. 하지만 올바른 방식으로 여러 번 SSE 연결을 요청하는 경우 중복된 연결을 확인 할 수 없기에 추가적인 보완이 필요합니다.

connection 관리 구현


connection 관리는 간단하게 global 변수를 이용해서 구현 가능합니다. 우선은 connection 을 관리 할 global 변수를 dict 형태로 생성해 둡니다. 가장 최근의 요청의 임의값으로 global 변수를 최신화 해두고 이를 비교하는 로직을 작성하면 됩니다.

  1. 각 SSE 요청마다 임의값 생성
  2. connection 관리하는 global 변수에 임의값을 덮어쓰기
  3. 제너레이터 호출 하면서 각 요청마다 생성한 임의값을 전달
  4. 제너레이터 내부에서 전달받은 임의값과 connection 관리하는 global 변수를 비교
  5. 최신 연결이 아닌 경우, break 를 통해서 제너레이터를 종료하고 연결을 종료
app.py
from flask import request
from sse import get_user_alarm_by_lang, client_connections
from uuid import uuid4

@app.get("/connection/<user_id>")
def connection(user_id: str):
    current_conn_id = uuid4().hex
    client_connections[user_id] = current_conn_id # 가장 최신값으로 덮어씀

    lang = request.accept_languages.best

    return Response(
        get_user_alarm_by_lang(user_id, lang, current_conn_id), 
        content_type="text/event-stream"
    )
sse.py
from time import sleep
import json

client_connections = {}

def get_user_alarm_by_lang(user_id, lang, current_conn_id):
    PENDING_TIME = 5

    while True:
        # global 변수의 최신값과 호출시 전달 받은 값을 비교
        if client_connections.get(user_id) != current_conn_id:
            print(f"BYE BYE conn_id:{current_conn_id}")
            break

        data = select_alarm_by_lang(user_id, lang)
        yield f"""event: alarm\ndata: {json.dumps(data)}\n\n"""

        sleep(PENDING_TIME)

클라이언트 구현


SSE 는 기본적으로 클라이언트에서 연결이 끊어진 경우, 자동 재연결을 하기 때문에 클라이언트에서도 서버가 연결을 끊음에 따라 알맞은 구현이 필요합니다.

js
const eventSource = new EventSource('http://localhost:5000/connection/tiaz');

eventSource.onerror = function() {
    eventSource.close()
    // 필요에 따라 여기에서 재연결 로직을 구현할 수 있습니다.
};

동시성 문제


여기서 하나 더 생각해야 할것이 있습니다. Flask 자체는 멀티스레딩을 지원하는 웹 프레임워크입니다. 때문에 경쟁 상태(race conditions), 데드락(deadlocks)과 같은 동시성 문제를 야기할 수 있습니다. 이를 방지하기 위해 적절한 동기화 메커니즘을 사용해야 합니다. 여기서는 스레드락을 사용하겠습니다.

app.py
from threading import Lock

connections_lock = Lock()

@app.get("/connection/<user_id>")
def connection(user_id: str):
    current_conn_id = uuid4().hex

    with connections_lock:
        client_connections[user_id] = current_conn_id

    lang = request.accept_languages.best

    return Response(
        get_user_alarm_by_lang(user_id, lang, current_conn_id),
        content_type="text/event-stream",
    )

에필로그 : 더 생각해야 하는 것


추가적으로 Flask 는 멀티 프로레싱 또한 지원합니다. 따라서 flask 애플리케이션을 구동하는 환경이 멀티프로세스 인 경우, 각 프로세스는 자체 메모리 공간을 가집니다. 따라서 위에서 구현한 전역 변수는 프로세스 간에 공유되지 않습니다.

또한 전역 변수를 사용하면 서버의 상태가 유지되므로, 서버의 무상태(stateless) 특성이 손상될 수 있습니다. 이는 확장성과 유지보수성에 영향을 줄 수 있습니다.

따라서 이러한 환경에서는 Redis와 같은 외부 데이터 저장소를 사용하여 데이터를 공유하게 개선이 필요합니다. 😊

flask sse