본문 바로가기
기타

[We-Co] Kafka로 간단한 예제를 풀어봅시다~! - 결제

by 위기의코딩맨 2025. 12. 24.
반응형

안녕하세요. 위기의 코딩맨입니다.

Kafka를 포스팅하고, 이제 예제를 풀어보도록 하겠습니다.

Kafka에 대해서 알아보고 싶으시면 아래 링크를 확인해주세요~!

 

 

[We-Co] Kafka에 대해서 간단하게 알아보자.

안녕하세요. 위기의 코딩맨입니다.요즘 너무 많은 일이 있어서 오랜만에 포스팅을 작성해봅니다. ㅎㅎㅎ오늘은 kafka에 대해서 간단하게 알아보도록 하겠습니다. [ Apache Kafka란? ]Apache Kafka는 대용

we-co.tistory.com

 

[ 예제 ]

예제로 풀어볼 흐름을 설명드리도록 하겠습니다.

 

[Order Service]  --(order_created)-->  Kafka  --consume-->  [Payment Service]
[Payment Service] --(payment_completed / payment_failed)--> Kafka --consume--> [Notification Service]

  • Order Service: 주문 생성 이벤트 발행(Producer)
  • Payment Service: 주문 이벤트 소비(Consumer) → 결제 처리 → 결과 이벤트 발행(Producer)
  • Notification Service: 결제 결과 이벤트 소비(Consumer) → 알림 출력(또는 메일/SMS)

흐름으로 진행을 해보도록 하겠습니다.

먼저 카프카의 토픽을 설정하도록 하겠습니다. 

토픽은 위에 링크에서 설치를 진행하고, 진행하셔야 합니다.

 

 

[ Topic ]

먼저 오더, 페이, 결과 토픽을 생성해주도록 합니다.

더보기
cd ~/opt/kafka

bin/kafka-topics.sh --create --topic order_created \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

bin/kafka-topics.sh --create --topic payment_completed \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

bin/kafka-topics.sh --create --topic payment_failed \
  --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

 

 

[ Python ]

파이썬 코드로 진행했으며, 먼저 라이브러리를 설치합니다.

pip install kafka-python

 

그리고 각각의 생성, 완료, 결과에 대한 소스코드를 작성합니다.

 

1. order_service.py 

더보기
import json, time, uuid
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def create_order(user_id: str, amount: int, currency="KRW"):
    order_id = f"o-{uuid.uuid4().hex[:8]}"
    event = {
        "order_id": order_id,
        "user_id": user_id,
        "amount": amount,
        "currency": currency,
        "created_at": int(time.time()),
    } 
    producer.send("order_created", key=user_id, value=event)
    producer.flush()
    print("[ORDER] created:", event)

if __name__ == "__main__": 
    for i in range(5):
        uid = f"u-{(i % 2) + 1}"       
        create_order(uid, amount=10000 + i * 1000)
        time.sleep(1)

 

2. payment_service.py

더보기
import json, time, uuid, random
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "order_created",
    bootstrap_servers="localhost:9092",
    group_id="payment-service",
    auto_offset_reset="earliest",
    enable_auto_commit=False,   
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def pay(order_event: dict) -> dict: 
    ok = random.random() < 0.8
    now = int(time.time())
    base = {
        "order_id": order_event["order_id"],
        "user_id": order_event["user_id"],
        "amount": order_event["amount"],
    }
    if ok:
        return {
            **base,
            "status": "PAID",
            "paid_at": now,
            "payment_id": f"p-{uuid.uuid4().hex[:6]}",
        }
    else:
        return {
            **base,
            "status": "FAILED",
            "failed_at": now,
            "reason": "INSUFFICIENT_FUNDS",
        }

if __name__ == "__main__":
    print("[PAYMENT] start consuming order_created ...")
    for msg in consumer:
        order = msg.value
        user_id = msg.key or order["user_id"] 
        try:
            result = pay(order) 
            if result["status"] == "PAID":
                producer.send("payment_completed", key=user_id, value=result)
                print("[PAYMENT] completed:", result)
            else:
                producer.send("payment_failed", key=user_id, value=result)
                print("[PAYMENT] failed:", result)

            producer.flush() 
            consumer.commit()

        except Exception as e: 
            print("[PAYMENT] error:", e)
            time.sleep(1)

3. notification_service.py

더보기
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "payment_completed",
    "payment_failed",
    bootstrap_servers="localhost:9092",
    group_id="notification-service",
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    key_deserializer=lambda k: k.decode("utf-8") if k else None,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

if __name__ == "__main__":
    print("[NOTI] start consuming payment results ...")
    for msg in consumer:
        event = msg.value
        topic = msg.topic
        if topic == "payment_completed":
            print(f"[NOTI] 결제완료 order={event['order_id']} user={event['user_id']} amount={event['amount']}")
        else:
            print(f"[NOTI] 결제실패 order={event['order_id']} user={event['user_id']} reason={event.get('reason')}")

 

이렇게 작성해서 진행해보도록 하겠습니다.

이제 터미널을 각각(3개)를 열어서 실행하도록 합니다~!

먼저 payment_service를 실행하면, 주문이 생성된것을 확인할 수 있습니다.

다음으로 notification_service를 실행하면,

결제가 진행되고 있는 것을 확인하고

 

 

 

이렇게 로그를 작성하고 실패한게 무엇인지 등 여러 데이터를 남길수 있습니다.

오늘은 간단한 예제를 확인해보았습니다.

아직은 정확한 흐름과 실무에서 어떻게 적용되는지 천천히 공부하고 있어서 미흡한점이 많습니다.. 흐흐

반응형