| id | title | category | status | source_trust_level | verification_status | created_at | updated_at | tags | tech_stack | applied_in | aliases |
|---|---|---|---|---|---|---|---|---|---|---|---|
| data-eng-schema-registry | Schema Registry: Avro / Protobuf / Compatibility | Coding | draft | B | conceptual | 2026-05-09 | 2026-05-09 | | | | |
Schema Registry
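Manages schema evolution for streaming and messaging systems. Producers register schemas and consumers fetch them. Implementations include Confluent Schema Registry and Apicurio; supported formats are Avro, Protobuf, and JSON Schema.
📖 Core concepts
- Schema: the message format.
- Subject: the name under which a schema's versions are registered (for example orders-value).
- Version: one step in a subject's evolution.
- Compatibility: the rule that decides whether old and new versions can read each other's data.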
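The relationship between subject, version, and schema id can be sketched with a small in-memory model. This is an illustration only: the class and field names (InMemoryRegistry, RegisteredSchema) are hypothetical, and a real registry also persists its state and runs compatibility checks before accepting a version.

```typescript
// Illustrative in-memory model; names are hypothetical, not a real client API.
interface RegisteredSchema {
  id: number;      // unique schema id handed back to producers
  version: number; // 1-based position within the subject
  schema: string;  // schema definition as text
}

class InMemoryRegistry {
  private nextId = 1;
  private subjects = new Map<string, RegisteredSchema[]>();

  // Register a schema under a subject; re-registering identical text returns the existing entry.
  register(subject: string, schema: string): RegisteredSchema {
    const versions = this.subjects.get(subject) ?? [];
    const existing = versions.find((v) => v.schema === schema);
    if (existing) return existing;
    const entry: RegisteredSchema = { id: this.nextId++, version: versions.length + 1, schema };
    versions.push(entry);
    this.subjects.set(subject, versions);
    return entry;
  }

  // Latest version of a subject, or undefined if the subject is unknown.
  latest(subject: string): RegisteredSchema | undefined {
    const versions = this.subjects.get(subject);
    return versions ? versions[versions.length - 1] : undefined;
  }
}

const demo = new InMemoryRegistry();
const v1 = demo.register("orders-value", '{"type":"string"}');
const v2 = demo.register("orders-value", '{"type":"long"}');
console.log(v1.version, v2.version, demo.latest("orders-value")?.id);
```

Each subject carries its own version sequence, while the id identifies one concrete schema that a message can point to.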
💻 Code patterns
Avro schema
{
  "type": "record",
  "name": "Order",
  "namespace": "com.acme.events",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "user_id", "type": "string" },
    { "name": "amount", "type": { "type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2 } },
    { "name": "created_at", "type": { "type": "long", "logicalType": "timestamp-millis" } }
  ]
}
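The amount field above uses Avro's decimal logical type, which stores the unscaled integer as big-endian two's-complement bytes. A minimal sketch of that conversion follows; decimalToBytes and bytesToDecimal are hypothetical helper names, and the sketch deliberately limits itself to values that fit in 6 bytes so that plain numbers stay exact.

```typescript
// Encode a decimal string as its unscaled integer in big-endian two's complement.
function decimalToBytes(value: string, scale: number): Uint8Array {
  const match = /^(-?)(\d+)(?:\.(\d+))?$/.exec(value);
  if (!match) throw new Error(`not a decimal: ${value}`);
  const sign = match[1] || "";
  const whole = match[2] || "0";
  const frac = match[3] || "";
  if (frac.length > scale) throw new Error(`more than ${scale} fractional digits`);
  const magnitude = Number(whole + frac.padEnd(scale, "0"));
  if (magnitude >= Math.pow(2, 47)) throw new Error("too large for this sketch");
  const unscaled = sign === "-" ? -magnitude : magnitude;

  // Smallest byte length whose signed range contains the value.
  let len = 1;
  while (unscaled < -Math.pow(2, 8 * len - 1) || unscaled >= Math.pow(2, 8 * len - 1)) len++;

  // Negative values wrap around 2^(8*len), which is two's complement.
  let v = unscaled < 0 ? unscaled + Math.pow(2, 8 * len) : unscaled;
  const bytes = new Uint8Array(len);
  for (let i = len - 1; i >= 0; i--) {
    bytes[i] = v % 256;
    v = Math.floor(v / 256);
  }
  return bytes;
}

// Decode big-endian two's-complement bytes back into a decimal string.
function bytesToDecimal(bytes: Uint8Array, scale: number): string {
  let v = 0;
  for (let i = 0; i < bytes.length; i++) v = v * 256 + bytes[i];
  if (bytes.length > 0 && bytes[0] >= 0x80) v -= Math.pow(2, 8 * bytes.length);
  const digits = String(Math.abs(v)).padStart(scale + 1, "0");
  const whole = digits.slice(0, digits.length - scale);
  const frac = digits.slice(digits.length - scale);
  return (v < 0 ? "-" : "") + whole + (scale > 0 ? "." + frac : "");
}

const encoded = decimalToBytes("99.50", 2); // unscaled 9950 = 0x26DE
console.log(Array.from(encoded), bytesToDecimal(encoded, 2));
```

Whether the application sees raw bytes or a decimal object depends on how the Avro library in use is configured for logical types.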
Registration
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "..."}' \
  http://schema-registry:8081/subjects/orders-value/versions
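Producer (KafkaJS + Avro)
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';
import { Kafka } from 'kafkajs';

const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' });
const schema = `
{
  "type": "record",
  "name": "Order",
  "fields": [...]
}`;
// The inline schema has no namespace, so the subject is passed explicitly instead of being derived.
const { id: schemaId } = await registry.register(
  { type: SchemaType.AVRO, schema },
  { subject: 'orders-value' },
);

const kafka = new Kafka({ brokers: ['kafka:9092'] });
const producer = kafka.producer();
await producer.connect();

// amount is Avro bytes/decimal: unscaled 9950 (99.50 at scale 2) in big-endian bytes.
// Assumes no custom decimal logical type is registered with the Avro library.
const order = { id: '...', user_id: '...', amount: Buffer.from([0x26, 0xde]), created_at: Date.now() };
const value = await registry.encode(schemaId, order);
// Key by the order id, not by the schema id.
await producer.send({ topic: 'orders', messages: [{ key: order.id, value }] });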
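Consumer
const consumer = kafka.consumer({ groupId: 'orders-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders' });
await consumer.run({
  eachMessage: async ({ message }) => {
    if (!message.value) return; // skip tombstones (null values)
    // decode() reads the schema id embedded in the message and fetches that schema.
    const decoded = await registry.decode(message.value);
    console.log(decoded); // plain object shaped by the schema; not statically typed
  },
});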
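Protobuf
syntax = "proto3";
package com.acme.events;

import "google/protobuf/timestamp.proto";

message Order {
  string id = 1;
  string user_id = 2;
  double amount = 3; // floating point; the Avro schema above uses an exact decimal instead
  google.protobuf.Timestamp created_at = 4;
}
# Code generation
buf generate
# or protoc
// Register the .proto text (protoSchema holds the file contents)
const { id } = await registry.register({ type: SchemaType.PROTOBUF, schema: protoSchema });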
JSON Schema
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Order",
  "type": "object",
  "required": ["id", "user_id", "amount", "created_at"],
  "properties": {
    "id": { "type": "string", "format": "uuid" },
    "user_id": { "type": "string" },
    "amount": { "type": "string", "pattern": "^\\d+\\.\\d{2}$" },
    "created_at": { "type": "integer" }
  }
}
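Compatibility policies
BACKWARD: the new schema can read data written with the previous schema (upgrade consumers first)
FORWARD: the previous schema can read data written with the new schema (upgrade producers first)
FULL: BACKWARD + FORWARD
NONE: no checks
*_TRANSITIVE (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE): checked against every earlier version, not only the latest one
→ BACKWARD is usually the safe default (it is also the default in Confluent Schema Registry).
curl -X PUT -H "Content-Type: application/json" \
  --data '{"compatibility": "BACKWARD"}' \
  http://schema-registry:8081/config/orders-value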
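The policies above differ only in which reader/writer pairs must be verified when a new version arrives. The following sketch makes that explicit; requiredChecks and the Check shape are hypothetical names, and versions are represented by their numbers only.

```typescript
type Mode =
  | "NONE" | "BACKWARD" | "BACKWARD_TRANSITIVE"
  | "FORWARD" | "FORWARD_TRANSITIVE"
  | "FULL" | "FULL_TRANSITIVE";

// One check means: the `reader` version must be able to decode data written by `writer`.
interface Check { reader: number; writer: number }

// `existing` lists registered version numbers, oldest first; `candidate` is the new version.
function requiredChecks(mode: Mode, existing: number[], candidate: number): Check[] {
  if (mode === "NONE" || existing.length === 0) return [];
  const transitive = mode.endsWith("_TRANSITIVE");
  const against = transitive ? existing : existing.slice(-1);
  const base = mode.replace("_TRANSITIVE", "");
  const checks: Check[] = [];
  for (const old of against) {
    // BACKWARD half: the new schema reads data written with the old one.
    if (base !== "FORWARD") checks.push({ reader: candidate, writer: old });
    // FORWARD half: the old schema reads data written with the new one.
    if (base !== "BACKWARD") checks.push({ reader: old, writer: candidate });
  }
  return checks;
}

console.log(requiredChecks("BACKWARD", [1, 2, 3], 4));
console.log(requiredChecks("FULL_TRANSITIVE", [1, 2, 3], 4).length);
```

Reading the pairs this way also explains the upgrade order: under BACKWARD the new schema is the reader, so consumers move first; under FORWARD the new schema is the writer, so producers move first.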
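Schema changes: what is backward-compatible
✅ Adding an optional field (with a default value)
✅ Adding an enum value (old readers additionally need an enum default, which matters for FORWARD)
✅ Renaming a field through an alias (Avro)
✅ Widening a type (int → long)
❌ Adding a required field (no default)
⚠️ Removing a field: the new reader simply ignores it, so Avro accepts this under BACKWARD, but old readers break (FORWARD) unless the field had a default
❌ Changing a type (int → string)
❌ Removing an enum value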
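The rules in this list follow from how an Avro reader resolves fields against the writer's schema. Below is a deliberately simplified sketch for flat records: readProblems, Field, and hasDefault are hypothetical names, and aliases, enums, unions, and nested types are left out.

```typescript
interface Field { name: string; type: string; hasDefault?: boolean }
type RecordSchema = Field[];

// Avro type promotions: a reader of the key type accepts data written as the listed types.
const PROMOTABLE_FROM: Record<string, string[]> = {
  long: ["int"],
  float: ["int", "long"],
  double: ["int", "long", "float"],
  string: ["bytes"],
  bytes: ["string"],
};

// Reasons why `reader` cannot decode data written with `writer` (empty array = compatible).
function readProblems(reader: RecordSchema, writer: RecordSchema): string[] {
  const problems: string[] = [];
  for (const field of reader) {
    const written = writer.find((w) => w.name === field.name);
    if (!written) {
      if (!field.hasDefault) problems.push(`${field.name}: missing in writer and no default`);
    } else if (
      written.type !== field.type &&
      (PROMOTABLE_FROM[field.type] || []).indexOf(written.type) === -1
    ) {
      problems.push(`${field.name}: cannot read ${written.type} as ${field.type}`);
    }
  }
  // Fields that exist only in the writer are skipped by the reader, so they never fail.
  return problems;
}

const base: RecordSchema = [{ name: "id", type: "string" }, { name: "qty", type: "int" }];
const addOptional: RecordSchema = base.concat([{ name: "note", type: "string", hasDefault: true }]);
const addRequired: RecordSchema = base.concat([{ name: "note", type: "string" }]);
const dropQty: RecordSchema = [{ name: "id", type: "string" }];

// BACKWARD = new schema as reader, old schema as writer.
console.log(readProblems(addOptional, base)); // no problems
console.log(readProblems(addRequired, base)); // one problem: note has no default
console.log(readProblems(dropQty, base));     // no problems under BACKWARD
console.log(readProblems(base, dropQty));     // FORWARD direction: qty is missing
```

In this model a removed field passes the BACKWARD direction and only fails when the old schema acts as the reader, which is the FORWARD direction.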
Buf (modern Protobuf tooling)
# buf.yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
buf lint
buf breaking --against '.git#branch=main'
buf generate
# Push to the Buf Schema Registry (BSR)
buf push --tag v1.0.0
→ Schemas live in a monorepo and follow the same git workflow as code.
Code generation
# Avro → TS (verify this subcommand against the generator you have installed)
npx avsc avro2ts schemas/order.avsc -o src/types/order.ts
# Protobuf → TS (ts-proto)
protoc --plugin=protoc-gen-ts_proto=./node_modules/.bin/protoc-gen-ts_proto \
  --ts_proto_out=. order.proto
# Buf
buf generate
→ Consumers get compile-time types.
Kafka serialization
Wire format = magic byte (1 byte, value 0) + schema id (4 bytes, big-endian) + payload
→ The consumer reads the schema id, fetches that schema from the registry, and decodes the payload.
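The framing described above can be sketched in a few lines. frame and unframe are hypothetical helper names, and the sketch covers the Avro-style layout only; Protobuf messages in the Confluent format carry additional message-index bytes before the payload.

```typescript
const MAGIC_BYTE = 0;

// Prefix an already-serialized payload with the magic byte and the 4-byte schema id.
function frame(schemaId: number, payload: Uint8Array): Uint8Array {
  const out = new Uint8Array(5 + payload.length);
  const view = new DataView(out.buffer);
  view.setUint8(0, MAGIC_BYTE);
  view.setUint32(1, schemaId, false); // false = big-endian
  out.set(payload, 5);
  return out;
}

// Split a framed message into the schema id and the remaining payload bytes.
function unframe(message: Uint8Array): { schemaId: number; payload: Uint8Array } {
  if (message.length < 5 || message[0] !== MAGIC_BYTE) {
    throw new Error("not in schema-registry wire format");
  }
  const view = new DataView(message.buffer, message.byteOffset, message.byteLength);
  return { schemaId: view.getUint32(1, false), payload: message.subarray(5) };
}

const framed = frame(42, new Uint8Array([1, 2, 3]));
console.log(Array.from(framed));      // header bytes followed by the payload
console.log(unframe(framed).schemaId);
```

Because only the id travels with each message, the schema itself is fetched once per id and can then be cached by the consumer.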
Multiple subjects per topic
Separate schemas for key and value:
- orders-key: a plain string id
- orders-value: the Order schema above
Or a multi-event topic:
- orders.user-orders-value
- orders.fraud-detected-value
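How a subject name is derived is controlled by the serializer's subject name strategy. The sketch below reproduces the three strategies that Confluent's serializers offer; the function names are hypothetical, and the FraudDetected record name is invented for illustration. The multi-event names listed above follow this note's own convention rather than any of these strategies.

```typescript
type Part = "key" | "value";

// TopicNameStrategy (the default): one schema lineage per topic and part.
function topicNameSubject(topic: string, part: Part): string {
  return `${topic}-${part}`;
}

// RecordNameStrategy: the subject is the fully qualified record name, independent of the topic.
function recordNameSubject(recordFullName: string): string {
  return recordFullName;
}

// TopicRecordNameStrategy: several record types can share one topic.
function topicRecordNameSubject(topic: string, recordFullName: string): string {
  return `${topic}-${recordFullName}`;
}

console.log(topicNameSubject("orders", "key"));
console.log(topicNameSubject("orders", "value"));
console.log(recordNameSubject("com.acme.events.Order"));
console.log(topicRecordNameSubject("orders", "com.acme.events.FraudDetected"));
```

With the default strategy every message on a topic must evolve within one subject, so a topic that mixes event types needs one of the record-based strategies.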
Schema deprecation
# Soft delete
curl -X DELETE http://schema-registry:8081/subjects/orders-value/versions/1
# Hard delete (admin only; the version must already be soft-deleted)
curl -X DELETE 'http://schema-registry:8081/subjects/orders-value/versions/1?permanent=true'
→ Only after confirming that no consumer still uses the old version.
CI checks
- name: Schema breaking check
  run: |
    buf breaking --against 'git://example.com/repo.git#branch=main'
- name: Lint schemas
  run: |
    buf lint
→ A PR with a breaking schema change is blocked.
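Apicurio (open-source alternative)
For when the license or pricing of Confluent Schema Registry is a concern.
Apicurio Registry is open source under the Apache 2.0 license and covers Kafka and multi-protocol use (Avro / Protobuf / JSON Schema).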
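REST API client (outside streaming)
// Use the DTO generated from the schema
import express from 'express';
import type { Order } from './generated/order';

const app = express();
app.use(express.json()); // parse JSON bodies so req.body is populated

app.post('/orders', async (req, res) => {
  // Compile-time typing only: req.body is not validated at runtime here.
  const order: Order = req.body;
  res.status(202).json({ id: order.id });
});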
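A generated type only helps at compile time, so a request handler usually pairs it with a runtime check. The sketch below is one way to do that with a type guard; the Order shape is an assumption that mirrors the JSON Schema earlier in this note (the generated type may differ), and isOrder is a hypothetical helper.

```typescript
// Assumed shape, mirroring the JSON Schema above; the generated type may differ.
interface Order {
  id: string;
  user_id: string;
  amount: string;     // decimal string with exactly two fractional digits
  created_at: number; // integer timestamp
}

// Runtime type guard: narrows unknown input to Order only if every field checks out.
function isOrder(x: unknown): x is Order {
  if (typeof x !== "object" || x === null) return false;
  const o = x as Record<string, unknown>;
  const amount = o["amount"];
  const createdAt = o["created_at"];
  return (
    typeof o["id"] === "string" &&
    typeof o["user_id"] === "string" &&
    typeof amount === "string" && /^\d+\.\d{2}$/.test(amount) &&
    typeof createdAt === "number" && Number.isInteger(createdAt)
  );
}

const good: unknown = { id: "a1", user_id: "u1", amount: "99.50", created_at: 1700000000000 };
const bad: unknown = { id: "a1", user_id: "u1", amount: "99.5", created_at: 1700000000000 };
console.log(isOrder(good), isOrder(bad));
```

In a handler, the guard would run on the parsed body before it is treated as an Order, and a failed check would map to a 400 response.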
Data contract (microservices)
"My Kafka topic guarantees this schema": other teams and services depend on it.
schema = data contract.
Changes require communication plus a breaking-change check.
🤔 Decision criteria
| Situation | Recommendation |
|---|---|
| Kafka + high throughput | Avro / Protobuf |
| Strong type safety | Protobuf + buf |
| Polyglot (multiple languages) | Avro / Protobuf |
| Single language + simple | JSON Schema or Zod |
| Confluent Cloud | Built-in Schema Registry |
| Self-hosted | Apicurio |
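❌ Anti-patterns
- JSON without a schema: drift, no validation.
- Changing a schema without registering it: consumers break.
- NONE compatibility: every change is accepted, which leads to chaos.
- Adding a required field: breaks BACKWARD.
- Removing a field that has no default: breaks FORWARD (old readers still expect it).
- Schemas scattered across locations: keep them in one place (the registry).
- No code generation: type drift.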
🤖 LLM usage hints
- Avro / Protobuf + Schema Registry.
- BACKWARD as the default.
- Regenerate code on every schema change.
- Buf / Apicurio = modern options.