안녕하세요. 위기의 코딩맨입니다.
Kafka를 포스팅하고, 이제 예제를 풀어보도록 하겠습니다.
Kafka에 대해서 알아보고 싶으시면 아래 링크를 확인해주세요~!
[We-Co] Kafka에 대해서 간단하게 알아보자.
안녕하세요. 위기의 코딩맨입니다.요즘 너무 많은 일이 있어서 오랜만에 포스팅을 작성해봅니다. ㅎㅎㅎ오늘은 kafka에 대해서 간단하게 알아보도록 하겠습니다. [ Apache Kafka란? ]Apache Kafka는 대용
we-co.tistory.com
[ 예제 ]
예제로 풀어볼 흐름을 설명드리도록 하겠습니다.
[Order Service] --(order_created)--> Kafka --consume--> [Payment Service]
[Payment Service] --(payment_completed / payment_failed)--> Kafka --consume--> [Notification Service]
- Order Service: 주문 생성 이벤트 발행(Producer)
- Payment Service: 주문 이벤트 소비(Consumer) → 결제 처리 → 결과 이벤트 발행(Producer)
- Notification Service: 결제 결과 이벤트 소비(Consumer) → 알림 출력(또는 메일/SMS)
흐름으로 진행을 해보도록 하겠습니다.
먼저 카프카의 토픽을 설정하도록 하겠습니다.
토픽은 위에 링크에서 설치를 진행하고, 진행하셔야 합니다.
[ Topic ]
먼저 오더, 페이, 결과 토픽을 생성해주도록 합니다.
cd ~/opt/kafka
bin/kafka-topics.sh --create --topic order_created \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic payment_completed \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic payment_failed \
--bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
[ Python ]
파이썬 코드로 진행했으며, 먼저 라이브러리를 설치합니다.
pip install kafka-python
그리고 각각의 생성, 완료, 결과에 대한 소스코드를 작성합니다.
1. order_service.py
import json, time, uuid
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
key_serializer=lambda k: k.encode("utf-8"),
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
def create_order(user_id: str, amount: int, currency="KRW"):
order_id = f"o-{uuid.uuid4().hex[:8]}"
event = {
"order_id": order_id,
"user_id": user_id,
"amount": amount,
"currency": currency,
"created_at": int(time.time()),
}
producer.send("order_created", key=user_id, value=event)
producer.flush()
print("[ORDER] created:", event)
if __name__ == "__main__":
for i in range(5):
uid = f"u-{(i % 2) + 1}"
create_order(uid, amount=10000 + i * 1000)
time.sleep(1)
2. payment_service.py
import json, time, uuid, random
from kafka import KafkaConsumer, KafkaProducer
consumer = KafkaConsumer(
"order_created",
bootstrap_servers="localhost:9092",
group_id="payment-service",
auto_offset_reset="earliest",
enable_auto_commit=False,
key_deserializer=lambda k: k.decode("utf-8") if k else None,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
key_serializer=lambda k: k.encode("utf-8"),
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
def pay(order_event: dict) -> dict:
ok = random.random() < 0.8
now = int(time.time())
base = {
"order_id": order_event["order_id"],
"user_id": order_event["user_id"],
"amount": order_event["amount"],
}
if ok:
return {
**base,
"status": "PAID",
"paid_at": now,
"payment_id": f"p-{uuid.uuid4().hex[:6]}",
}
else:
return {
**base,
"status": "FAILED",
"failed_at": now,
"reason": "INSUFFICIENT_FUNDS",
}
if __name__ == "__main__":
print("[PAYMENT] start consuming order_created ...")
for msg in consumer:
order = msg.value
user_id = msg.key or order["user_id"]
try:
result = pay(order)
if result["status"] == "PAID":
producer.send("payment_completed", key=user_id, value=result)
print("[PAYMENT] completed:", result)
else:
producer.send("payment_failed", key=user_id, value=result)
print("[PAYMENT] failed:", result)
producer.flush()
consumer.commit()
except Exception as e:
print("[PAYMENT] error:", e)
time.sleep(1)
3. notification_service.py
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"payment_completed",
"payment_failed",
bootstrap_servers="localhost:9092",
group_id="notification-service",
auto_offset_reset="earliest",
enable_auto_commit=True,
key_deserializer=lambda k: k.decode("utf-8") if k else None,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
if __name__ == "__main__":
print("[NOTI] start consuming payment results ...")
for msg in consumer:
event = msg.value
topic = msg.topic
if topic == "payment_completed":
print(f"[NOTI] 결제완료 order={event['order_id']} user={event['user_id']} amount={event['amount']}")
else:
print(f"[NOTI] 결제실패 order={event['order_id']} user={event['user_id']} reason={event.get('reason')}")
이렇게 작성해서 진행해보도록 하겠습니다.
이제 터미널을 각각(3개)를 열어서 실행하도록 합니다~!
먼저 payment_service를 실행하면, 주문이 생성된것을 확인할 수 있습니다.

다음으로 notification_service를 실행하면,
결제가 진행되고 있는 것을 확인하고




이렇게 로그를 작성하고 실패한게 무엇인지 등 여러 데이터를 남길수 있습니다.
오늘은 간단한 예제를 확인해보았습니다.
아직은 정확한 흐름과 실무에서 어떻게 적용되는지 천천히 공부하고 있어서 미흡한점이 많습니다.. 흐흐

'기타' 카테고리의 다른 글
| [We-Co] Kafka에 대해서 간단하게 알아보자. (1) | 2025.12.23 |
|---|---|
| [We-Co] ChatGPT-5 - 달라진 GPT-5를 알아보자! (6) | 2025.08.08 |
| [We-Co] RAG - LangChain , return_source_documents (0) | 2025.07.16 |
| [We-Co] RAG 검색과 응답 최적화 - Chunking (0) | 2025.07.08 |
| [We-Co] langchain - FewShotPromptTemplate란 무엇일까! (0) | 2025.07.02 |