Kvalita dát je základom dôveryhodnej analytiky. Keď sú vaše event dáta nespoľahlivé, každý dashboard, report a rozhodnutie postavené na týchto dátach je podozrivé. Napriek tomu sú problémy s kvalitou dát prekvapivo bežné - podľa výskumu Gartner nízka kvalita dát stojí organizácie priemerne $12,9 milióna ročne, zatiaľ čo IBM odhaduje, že len v USA strácajú firmy $3,1 bilióna ročne kvôli zlej kvalite dát. Okrem finančného dopadu zamestnanci trávia až 27% svojho času opravovaním zlých dát, čo spomaľuje rozhodovanie a zvyšuje operačné náklady.
Tento sprievodca pokrýva dimenzie kvality dát, bežné problémy v event streamoch, ako budovať monitorovacie a observability systémy, testovacie stratégie a postupy incident response.
Dimenzie kvality dát
Kvalita dát nie je jediná metrika - je viacdimenzionálna. Široko akceptovaný framework zahŕňa šesť základných dimenzií. Pochopenie týchto dimenzií vám pomáha systematicky merať a zlepšovať kvalitu:
Presnosť
Reprezentujú dáta správne realitu?
- Definícia: Udalosti odrážajú, čo sa skutočne stalo v reálnom svete
- Príklady problémov: Nesprávne hodnoty, chybné výpočty, nesprávne priradené udalosti, preklepy pri zadávaní dát
- Meranie: Porovnanie so známymi zdrojmi pravdy, použitie primárneho výskumu alebo validácia referenčných dát tretích strán
-- Príklad kontroly presnosti
SELECT
source_system,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM events
WHERE event_date = CURRENT_DATE
GROUP BY source_system;
-- Porovnajte s vlastnými počtami zdrojového systému
-- Cross-validujte s externými referenčnými dátami kde je to možné
Kompletnosť
Sú prítomné všetky očakávané dáta?
- Definícia: Žiadne chýbajúce udalosti, properties, záznamy alebo časové obdobia; všetky povinné polia sú vyplnené
- Príklady problémov: Chýbajúce povinné properties, stratené udalosti, medzery v časových radoch, null hodnoty v povinných poliach
- Meranie: Kontrola null hodnôt, chýbajúcich období, očakávaných objemov a mier vyplnenosti povinných polí
-- Príklad kontroly kompletnosti
SELECT
event_name,
COUNT(*) as total_events,
COUNT(user_id) as events_with_user_id,
COUNT(session_id) as events_with_session_id,
ROUND(100.0 * COUNT(user_id) / COUNT(*), 2) as user_id_completeness,
ROUND(100.0 * COUNT(session_id) / COUNT(*), 2) as session_id_completeness
FROM events
GROUP BY event_name
ORDER BY user_id_completeness ASC;
Včasnosť
Sú dáta dostupné keď sú potrebné?
- Definícia: Udalosti prichádzajú v očakávaných časových oknách a odrážajú najaktuálnejšiu situáciu
- Príklady problémov: Oneskorene prichádzajúce dáta, neaktuálne dashboardy, oneskorenia spracovania, dáta nepripravené pre plánované reporty
- Meranie: Sledovanie latencie event_time vs. received_time, monitoring dodržiavania SLA
-- Príklad kontroly včasnosti
SELECT
DATE_TRUNC('hour', received_at) as hour,
AVG(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as avg_latency_seconds,
MAX(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as max_latency_seconds,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
) as p95_latency_seconds,
PERCENTILE_CONT(0.99) WITHIN GROUP (
ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
) as p99_latency_seconds
FROM events
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY DATE_TRUNC('hour', received_at)
ORDER BY hour;
Konzistencia
Sú dáta koherentné naprieč systémom?
- Definícia: Rovnaká entita má rovnaké hodnoty všade; dáta sú jednotné naprieč všetkými inštanciami a systémami
- Príklady problémov: Používateľ v jednej tabuľke s iným ID v druhej; nekonzistentné pomenovacie konvencie; konfliktné hodnoty medzi CRM a ERP systémami
- Meranie: Cross-reference validácia, vynucovanie schémy, kontroly konzistencie formátov
-- Príklad kontroly konzistencie
-- Kontrola nekonzistencií user_id medzi events a users tabuľkou
SELECT
e.user_id as event_user_id,
u.user_id as users_table_user_id,
CASE WHEN u.user_id IS NULL THEN 'Chýba v users' ELSE 'OK' END as status
FROM (SELECT DISTINCT user_id FROM events WHERE user_id IS NOT NULL) e
LEFT JOIN users u ON e.user_id = u.user_id
WHERE u.user_id IS NULL
LIMIT 100;
-- Kontrola konzistencie formátov
SELECT
event_name,
properties->>'date_format' as date_value,
CASE
WHEN properties->>'date_format' ~ '^\d{4}-\d{2}-\d{2}$' THEN 'ISO formát'
WHEN properties->>'date_format' ~ '^\d{2}/\d{2}/\d{4}$' THEN 'US formát'
ELSE 'Iný formát'
END as format_type
FROM events
WHERE event_name = 'Form Submitted';
Validita
Zodpovedajú dáta očakávaným formátom a obmedzeniam?
- Definícia: Hodnoty sú v očakávaných rozsahoch, formátoch a zodpovedajú business pravidlám
- Príklady problémov: Záporné ceny, budúce timestamps, neplatné enumy, nesprávne formátované email adresy, PSČ s nesprávnym počtom znakov
- Meranie: Validácia obmedzení, kontroly rozsahov, regex pattern matching, validácia business pravidiel
-- Príklad kontroly validity
SELECT
event_name,
COUNT(*) as invalid_events,
STRING_AGG(DISTINCT
CASE
WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'future_timestamp'
WHEN event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0 THEN 'negative_price'
WHEN properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown') THEN 'invalid_platform'
WHEN properties->>'email' !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 'invalid_email'
END, ', '
) as violation_types
FROM events
WHERE
event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
OR (event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0)
OR properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown')
GROUP BY event_name;
Unikátnosť
Sú dáta bez nechcených duplicít?
- Definícia: Každý záznam reprezentuje jedinú, unikátnu entitu bez neúmyselných duplicít
- Príklady problémov: Duplicitné záznamy zákazníkov (napr. "Fred Smith" a "Freddy Smith" ako samostatné záznamy), rovnaká udalosť zaznamenaná viackrát, záznamy zduplikované po migrácii
- Meranie: Validácia primárnych kľúčov, dotazy na detekciu duplicít, fuzzy matching pre takmer-duplicity
-- Príklad kontroly unikátnosti
-- Nájdite presné duplicity podľa event_id
SELECT
event_id,
COUNT(*) as occurrence_count,
MIN(received_at) as first_received,
MAX(received_at) as last_received
FROM events
GROUP BY event_id
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;
-- Nájdite potenciálne duplicitných používateľov (fuzzy matching)
SELECT
a.user_id as user_id_1,
b.user_id as user_id_2,
a.email,
a.name as name_1,
b.name as name_2
FROM users a
JOIN users b ON a.email = b.email AND a.user_id < b.user_id
LIMIT 100;
Bežné problémy v event streamoch
Pochopenie typických problémov vám pomáha budovať lepšiu prevenciu a detekciu:
Duplicitné udalosti
Rovnaká udalosť zaznamenaná viackrát.
-- Detekčný dotaz
SELECT
event_id,
user_id,
event_name,
COUNT(*) as occurrence_count,
MIN(event_timestamp) as first_occurrence,
MAX(event_timestamp) as last_occurrence
FROM events
GROUP BY event_id, user_id, event_name
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;
Bežné príčiny:
- Retry logika bez idempotency kľúčov
- Viacnásobná inicializácia SDK pri načítaní stránky
- Async spracovanie bez deduplikácie
- Network timeouty spôsobujúce opätovné odoslanie
- Message queue redelivery bez správneho spracovania
Chýbajúce udalosti
Očakávané udalosti neprichádzajú.
-- Detekcia: Porovnanie hodinových objemov so štatistickou detekciou anomálií
WITH hourly_volumes AS (
SELECT
DATE_TRUNC('hour', event_timestamp) as hour,
event_name,
COUNT(*) as event_count
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '7 days'
GROUP BY 1, 2
),
averages AS (
SELECT
event_name,
EXTRACT(HOUR FROM hour) as hour_of_day,
EXTRACT(DOW FROM hour) as day_of_week,
AVG(event_count) as avg_count,
STDDEV(event_count) as stddev_count
FROM hourly_volumes
GROUP BY 1, 2, 3
)
SELECT
hv.hour,
hv.event_name,
hv.event_count,
ROUND(a.avg_count, 2) as expected_count,
ROUND((hv.event_count - a.avg_count) / NULLIF(a.stddev_count, 0), 2) as z_score,
CASE
WHEN hv.event_count < a.avg_count - 2 * a.stddev_count THEN 'LOW_ANOMALY'
WHEN hv.event_count > a.avg_count + 2 * a.stddev_count THEN 'HIGH_ANOMALY'
ELSE 'OK'
END as status
FROM hourly_volumes hv
JOIN averages a ON hv.event_name = a.event_name
AND EXTRACT(HOUR FROM hv.hour) = a.hour_of_day
AND EXTRACT(DOW FROM hv.hour) = a.day_of_week
WHERE hv.hour > CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY hv.hour DESC;
Schema drift
Štruktúra udalosti sa neočakávane mení - polia pridané, odstránené, premenované alebo zmenený typ.
-- Detekcia: Sledovanie prítomnosti properties v čase
WITH daily_schema AS (
SELECT
event_name,
DATE_TRUNC('day', event_timestamp) as day,
jsonb_object_keys(properties) as property_name
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '14 days'
GROUP BY 1, 2, 3
),
property_timeline AS (
SELECT
event_name,
property_name,
MIN(day) as first_seen,
MAX(day) as last_seen,
COUNT(DISTINCT day) as days_present
FROM daily_schema
GROUP BY 1, 2
)
SELECT
event_name,
property_name,
first_seen,
last_seen,
days_present,
CASE
WHEN first_seen > CURRENT_DATE - INTERVAL '3 days' THEN 'NEW_PROPERTY'
WHEN last_seen < CURRENT_DATE - INTERVAL '3 days' THEN 'DISAPPEARED'
ELSE 'STABLE'
END as status
FROM property_timeline
WHERE first_seen > CURRENT_DATE - INTERVAL '3 days'
OR last_seen < CURRENT_DATE - INTERVAL '3 days'
ORDER BY event_name, first_seen DESC;
Nezhody dátových typov
Properties odosielané s nesprávnymi typmi.
-- Detekcia: Nájdite typové nekonzistencie
SELECT
event_name,
'price' as property_name,
properties->>'price' as value,
CASE
WHEN properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$' THEN 'numeric'
WHEN properties->>'price' ~ '^\$' THEN 'string_with_currency_symbol'
WHEN properties->>'price' ~ '^[0-9,]+\.?[0-9]*$' THEN 'numeric_with_commas'
WHEN properties->>'price' IS NULL THEN 'null'
ELSE 'non_numeric_string'
END as detected_type
FROM events
WHERE event_name = 'Purchase Completed'
AND properties->>'price' IS NOT NULL
AND NOT (properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$')
LIMIT 100;
Nesprávne timestamps
Udalosti s nesprávnymi časovými hodnotami.
-- Detekcia: Nájdite podozrivé timestamps
SELECT
event_name,
event_id,
event_timestamp,
received_at,
EXTRACT(EPOCH FROM (received_at - event_timestamp)) as latency_seconds,
CASE
WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'FUTURE_EVENT'
WHEN event_timestamp < '2020-01-01' THEN 'SUSPICIOUSLY_OLD'
WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) > 86400 THEN 'EXCESSIVE_LATENCY'
WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) < 0 THEN 'RECEIVED_BEFORE_EVENT'
ELSE 'OK'
END as issue_type
FROM events
WHERE
event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
OR event_timestamp < '2020-01-01'
OR ABS(EXTRACT(EPOCH FROM (received_at - event_timestamp))) > 86400
LIMIT 100;
Únik PII
Osobne identifikovateľné informácie objavujúce sa na neočakávaných miestach.
-- Detekcia: Sken potenciálnych PII v properties
SELECT
event_name,
event_id,
CASE
WHEN properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}' THEN 'POTENTIAL_SSN'
WHEN properties::text ~ '[0-9]{16}' THEN 'POTENTIAL_CREDIT_CARD'
WHEN properties::text ~* '[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}'
AND event_name NOT IN ('User Signup', 'Login') THEN 'UNEXPECTED_EMAIL'
ELSE 'OK'
END as pii_risk
FROM events
WHERE
properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}'
OR properties::text ~ '[0-9]{16}'
LIMIT 100;
Monitorovacie a observability systémy
Proaktívny monitoring zachytí problémy predtým, ako ovplyvnia rozhodnutia. Moderná data observability rozširuje tradičný monitoring aplikovaním DevOps best practices na dátové pipelines.
Kľúčové metriky na sledovanie
Implementujte monitoring pre tieto esenciálne metriky:
- MTTD (Mean Time to Detect): Priemerný čas od vzniku problému po jeho detekciu
- MTTR (Mean Time to Resolve): Priemerný čas od detekcie po vyriešenie
- Čerstvosť dát: Čas od poslednej aktualizácie dát
- Objemové anomálie: Odchýlky od očakávaných počtov udalostí
- Zmeny schémy: Nové, modifikované alebo odstránené polia
- Drift distribúcie: Štatistické posuny v hodnotách dát
Monitoring objemu
# Príklad konfigurácie alertu (pseudo-code)
monitor:
name: event_volume_anomaly
query: |
SELECT
event_name,
COUNT(*) as current_hour_count,
AVG(historical_count) as expected_count,
STDDEV(historical_count) as stddev_count
FROM current_events
JOIN historical_averages USING (event_name, hour_of_day, day_of_week)
GROUP BY event_name
alert_condition: |
current_hour_count < expected_count - 2 * stddev_count
OR current_hour_count > expected_count + 2 * stddev_count
severity: warning
channels: [slack, pagerduty]
runbook_url: https://wiki.company.com/runbooks/volume-anomaly
Monitoring čerstvosti
-- Kontrola čerstvosti dát naprieč všetkými kritickými tabuľkami
WITH freshness_check AS (
SELECT
'events' as table_name,
MAX(event_timestamp) as latest_record,
30 as max_age_minutes -- SLA threshold
FROM events
UNION ALL
SELECT
'page_views' as table_name,
MAX(timestamp) as latest_record,
15 as max_age_minutes
FROM page_views
UNION ALL
SELECT
'transactions' as table_name,
MAX(created_at) as latest_record,
5 as max_age_minutes
FROM transactions
)
SELECT
table_name,
latest_record,
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 as minutes_since_update,
max_age_minutes as sla_threshold_minutes,
CASE
WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes
THEN 'SLA_BREACH'
WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes * 0.8
THEN 'WARNING'
ELSE 'OK'
END as status
FROM freshness_check
ORDER BY status DESC, minutes_since_update DESC;
Monitoring schémy
-- Sledovanie kompletnosti povinných properties
WITH property_checks AS (
SELECT
event_name,
required_property,
COUNT(*) as total_events,
COUNT(CASE WHEN properties->>required_property IS NOT NULL THEN 1 END) as events_with_property,
COUNT(CASE WHEN properties->>required_property = '' THEN 1 END) as events_with_empty_value
FROM events
CROSS JOIN (
SELECT unnest(ARRAY['user_id', 'session_id', 'timestamp', 'platform']) as required_property
) required
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY event_name, required_property
)
SELECT
event_name,
required_property,
total_events,
events_with_property,
events_with_empty_value,
ROUND(100.0 * events_with_property / NULLIF(total_events, 0), 2) as completeness_pct,
CASE
WHEN events_with_property < total_events * 0.95 THEN 'CRITICAL'
WHEN events_with_property < total_events * 0.99 THEN 'WARNING'
ELSE 'OK'
END as status
FROM property_checks
WHERE events_with_property < total_events * 0.99
ORDER BY completeness_pct ASC;
Monitoring driftu distribúcie
-- Detekcia štatistického driftu v numerických poliach
WITH baseline AS (
SELECT
event_name,
AVG(CAST(properties->>'amount' AS DECIMAL)) as baseline_mean,
STDDEV(CAST(properties->>'amount' AS DECIMAL)) as baseline_stddev,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as baseline_median
FROM events
WHERE event_timestamp BETWEEN CURRENT_TIMESTAMP - INTERVAL '30 days'
AND CURRENT_TIMESTAMP - INTERVAL '1 day'
AND event_name = 'Purchase Completed'
),
current_window AS (
SELECT
event_name,
AVG(CAST(properties->>'amount' AS DECIMAL)) as current_mean,
STDDEV(CAST(properties->>'amount' AS DECIMAL)) as current_stddev,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as current_median
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 day'
AND event_name = 'Purchase Completed'
)
SELECT
b.event_name,
ROUND(b.baseline_mean, 2) as baseline_mean,
ROUND(c.current_mean, 2) as current_mean,
ROUND(ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0), 2) as z_score_shift,
CASE
WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 3 THEN 'SIGNIFICANT_DRIFT'
WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 2 THEN 'MODERATE_DRIFT'
ELSE 'OK'
END as drift_status
FROM baseline b
CROSS JOIN current_window c;
Data Quality Dashboard
Vybudujte komplexný data quality dashboard s týmito panelmi:
- Trendy objemu: Udalosti za hodinu so zvýraznením anomálií a day-over-day porovnaniami
- Metriky latencie: p50, p95, p99 ingestion latencia s SLA threshold čiarami
- Skóre kompletnosti: Miera vyplnenia povinných properties podľa typu udalosti
- Chybovosť: Zlyhané validácie podľa typu udalosti a kategórie chyby
- Zmeny schémy: Detekované nové properties s first-seen timestamps
- MTTD/MTTR trendy: Časy detekcie a riešenia incidentov v čase
- Čerstvosť dát: Čas od poslednej aktualizácie na kritickú tabuľku so SLA statusom
- Miera duplicít: Percentuálny podiel duplicitných udalostí podľa zdroja
Testovacie stratégie
Zachyťte problémy predtým, ako sa dostanú do produkcie s komplexným testovacím prístupom:
Unit testy pre tracking kód
// JavaScript príklad: Testovanie event trackingu
describe('Purchase Event', () => {
it('should include all required properties', () => {
const event = trackPurchase({
orderId: '12345',
total: 99.99,
currency: 'USD',
items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }]
});
expect(event.event_name).toBe('Purchase Completed');
expect(event.properties.order_id).toBeDefined();
expect(event.properties.total).toBeGreaterThan(0);
expect(event.properties.currency).toMatch(/^[A-Z]{3}$/);
expect(event.properties.item_count).toBeGreaterThan(0);
expect(event.timestamp).toBeDefined();
expect(new Date(event.timestamp)).toBeInstanceOf(Date);
});
it('should not include PII', () => {
const event = trackPurchase({
orderId: '12345',
total: 99.99,
currency: 'USD',
items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }],
customerEmail: 'test@example.com', // Toto by malo byť odfiltrované
creditCard: '4111111111111111' // Toto by malo byť odfiltrované
});
expect(event.properties.email).toBeUndefined();
expect(event.properties.customerEmail).toBeUndefined();
expect(event.properties.credit_card).toBeUndefined();
expect(event.properties.creditCard).toBeUndefined();
expect(JSON.stringify(event)).not.toContain('4111111111111111');
});
it('should handle edge cases gracefully', () => {
const event = trackPurchase({
orderId: '12345',
total: 0, // Free order
currency: 'USD',
items: []
});
expect(event.properties.total).toBe(0);
expect(event.properties.item_count).toBe(0);
});
});
Integračné testy
# Python príklad: End-to-end tracking test
import uuid
import time
from datetime import datetime
def test_event_pipeline_end_to_end():
"""Test, že eventy prúdia celým pipeline správne."""
# Vygenerujte testovaciu udalosť s unikátnym identifikátorom
test_event_id = f"test_{uuid.uuid4()}"
test_timestamp = datetime.utcnow().isoformat()
# Odošlite testovaciu udalosť
send_event({
"event_name": "Test Event",
"event_id": test_event_id,
"user_id": "test_user_integration",
"timestamp": test_timestamp,
"properties": {
"test_property": "test_value",
"numeric_property": 42
}
})
# Počkajte na spracovanie (upravte podľa latencie vášho pipeline SLA)
max_wait_seconds = 60
poll_interval = 5
elapsed = 0
result = None
while elapsed < max_wait_seconds:
time.sleep(poll_interval)
elapsed += poll_interval
result = query_warehouse(f"""
SELECT * FROM events
WHERE event_id = '{test_event_id}'
""")
if len(result) > 0:
break
# Overte, že udalosť dorazila
assert len(result) == 1, f"Udalosť by mala doraziť presne raz, dostal {len(result)}"
# Overte integritu dát
event = result[0]
assert event['user_id'] == 'test_user_integration'
assert event['event_name'] == 'Test Event'
assert event['properties']['test_property'] == 'test_value'
assert event['properties']['numeric_property'] == 42
# Overte, že latencia je v rámci SLA
received_time = event['received_at']
event_time = datetime.fromisoformat(test_timestamp)
latency_seconds = (received_time - event_time).total_seconds()
assert latency_seconds < 60, f"Latencia {latency_seconds}s prekračuje 60s SLA"
def test_duplicate_handling():
"""Test, že duplicitné eventy sú správne spracované."""
test_event_id = f"test_dup_{uuid.uuid4()}"
# Odošlite rovnakú udalosť dvakrát
for _ in range(2):
send_event({
"event_name": "Test Duplicate",
"event_id": test_event_id,
"user_id": "test_user",
"timestamp": datetime.utcnow().isoformat()
})
time.sleep(30)
result = query_warehouse(f"""
SELECT COUNT(*) as count FROM events
WHERE event_id = '{test_event_id}'
""")
# Podľa vašej stratégie deduplikácie:
# - Ak je deduped pri ingescii: count by mal byť 1
# - Ak je deduped pri dotaze: overte, že dedup logika funguje
assert result[0]['count'] == 1, "Duplicitné eventy by mali byť deduplikované"
Contract testy s JSON Schema
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Purchase Completed Event",
"type": "object",
"required": ["event_name", "event_id", "user_id", "timestamp", "properties"],
"properties": {
"event_name": {
"const": "Purchase Completed"
},
"event_id": {
"type": "string",
"minLength": 1,
"pattern": "^[a-zA-Z0-9_-]+$"
},
"user_id": {
"type": "string",
"minLength": 1
},
"timestamp": {
"type": "string",
"format": "date-time"
},
"properties": {
"type": "object",
"required": ["order_id", "total", "currency", "item_count"],
"properties": {
"order_id": {
"type": "string",
"minLength": 1
},
"total": {
"type": "number",
"minimum": 0
},
"currency": {
"type": "string",
"pattern": "^[A-Z]{3}$",
"enum": ["USD", "EUR", "GBP", "CAD", "AUD", "JPY"]
},
"item_count": {
"type": "integer",
"minimum": 0
},
"discount_code": {
"type": ["string", "null"]
}
},
"additionalProperties": true
}
},
"additionalProperties": false
}
Data Quality testy s dbt
# schema.yml - dbt data testy
version: 2
models:
- name: fct_events
description: "Faktová tabuľka obsahujúca všetky trackované eventy"
columns:
- name: event_id
description: "Unikátny identifikátor udalosti"
tests:
- unique
- not_null
- name: user_id
description: "Používateľ, ktorý spustil udalosť"
tests:
- not_null
- relationships:
to: ref('dim_users')
field: user_id
- name: event_timestamp
description: "Kedy sa udalosť stala"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "event_timestamp <= current_timestamp"
config:
error_if: ">100"
warn_if: ">10"
- name: platform
description: "Platforma, kde udalosť vznikla"
tests:
- accepted_values:
values: ['web', 'ios', 'android', 'unknown']
tests:
- dbt_utils.recency:
datepart: hour
field: event_timestamp
interval: 1
config:
severity: error
Great Expectations validácia
# Great Expectations validačná suite
import great_expectations as gx
# Vytvorte expectation suite pre events tabuľku
context = gx.get_context()
suite = context.add_expectation_suite("events_quality_suite")
# Pridajte expectations
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="event_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="event_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeBetween(
column="price",
min_value=0,
max_value=100000 # Rozumná horná hranica
)
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToMatchRegex(
column="email",
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
)
suite.add_expectation(
gx.expectations.ExpectTableRowCountToBeBetween(
min_value=1000, # Alert ak príliš málo riadkov
max_value=10000000 # Alert ak podozrivo veľa
)
)
Validácia v staging prostredí
- Spúšťajte všetok tracking cez staging pred produkčným nasadením
- Porovnávajte staging metriky s produkčnými baseline pre detekciu anomálií
- Validujte, že zmeny schémy nerozbijú downstream konzumentov
- Používajte data diff nástroje na porovnanie staging a produkčných výstupov
- Spúšťajte contract testy proti staging dátam pred mergnutím kódu
Incident Response
Keď nastanú problémy s kvalitou dát, reagujte systematicky s jasnými procesmi:
Úrovne závažnosti incidentov
| Úroveň | Popis | Čas odpovede | Príklad | Eskalácia |
|---|---|---|---|---|
| Kritická (P1) | Kompletná strata alebo poškodenie dát ovplyvňujúce business-kritické systémy | Okamžite (< 15 min) | Všetky eventy vypadávajú, produkčný pipeline dole | Page on-call, notifikácia vedenia |
| Vysoká (P2) | Významná degradácia kvality ovplyvňujúca kľúčové metriky | < 1 hodina | 50% strata eventov, chýba kritická property | Page on-call, notifikácia stakeholderov |
| Stredná (P3) | Čiastočné problémy s kvalitou s limitovaným business dopadom | < 4 hodiny | Chýbajúce nekritické properties, menší schema drift | Slack alert, najbližší dostupný inžinier |
| Nízka (P4) | Menšie problémy s kvalitou s minimálnym dopadom | Nasledujúci pracovný deň | Kozmetické zmeny schémy, nízko-objemové anomálie | Vytvorenie ticketu, prioritizácia backlogu |
Proces Incident Response
- Detekcia: Automatizovaný monitorovací alert alebo report používateľa spustí incident
- Potvrdenie: On-call inžinier potvrdí v rámci SLA, založí incident channel
- Triáž: Posúdenie závažnosti, rozsahu dopadu a dotknutých downstream systémov
- Komunikácia: Notifikácia dotknutých stakeholderov s počiatočným posúdením
- Mitigácia: Zastavenie krvácania - pozastavenie pipelines, zapnutie circuit breakerov, alebo rollback ak je potrebný
- Investigácia: Root cause analýza pomocou logov, lineage a metrík
- Oprava: Implementácia a testovanie riešenia v staging
- Nasadenie: Rollout opravy do produkcie s monitoringom
- Obnova: Backfill alebo korekcia dotknutých dát podľa potreby
- Review: Vykonanie post-incident review do 48 hodín
Šablóna Runbooku
# Data Quality Incident Runbook
## Informácie o incidente
- **ID incidentu:** [auto-generated]
- **Zdroj alertu:** [monitoring systém/report používateľa]
- **Čas detekcie:** [timestamp]
- **Závažnosť:** [P1/P2/P3/P4]
- **On-Call inžinier:** [meno]
## Počiatočné symptómy
- Popis: [čo bolo pozorované]
- Dotknuté eventy/tabuľky: [zoznam]
- Chybové správy: [relevantné logy]
## Posúdenie dopadu
- **Dotknutý časový rozsah:** [začiatok] do [koniec]
- **Objem dotknutých dát:** [odhadovaný počet záznamov]
- **Dotknuté downstream systémy:** [dashboardy, reporty, ML modely]
- **Business dopad:** [revenue, rozhodnutia, compliance]
- **Notifikovaní používatelia:** [zoznam]
## Diagnostické kroky
1. [ ] Skontrolujte status pipeline v orchestračnom nástroji
2. [ ] Preskúmajte error logy pre dotknuté časové okno
3. [ ] Query na dátové anomálie (objem, schéma, hodnoty)
4. [ ] Skontrolujte upstream dátové zdroje pre problémy
5. [ ] Preskúmajte nedávne deploymenty alebo zmeny konfigurácie
6. [ ] Trasujte dátový lineage pre identifikáciu bodu zlyhania
## Kroky riešenia
1. [ ] Implementujte okamžitú mitigáciu (pause, rollback, circuit breaker)
2. [ ] Identifikujte a zdokumentujte root cause
3. [ ] Vyviňte opravu a testujte v staging prostredí
4. [ ] Nasaďte opravu do produkcie
5. [ ] Overte, že oprava rieši problém
6. [ ] Backfillujte alebo opravte dotknuté dáta
7. [ ] Re-enableujte pozastavené pipelines
8. [ ] Overte, že downstream systémy sú zdravé
## Log komunikácie
| Čas | Update | Publikum |
|------|--------|----------|
| [timestamp] | Incident potvrdený | #data-incidents |
| [timestamp] | Počiatočné posúdenie | Stakeholderi |
| [timestamp] | Riešenie nasadené | Všetky dotknuté strany |
## Metriky
- **MTTD (Čas do detekcie):** [trvanie]
- **MTTR (Čas do vyriešenia):** [trvanie]
- **Strata/poškodenie dát:** [objem]
## Post-Incident
- **Root cause:** [detailný popis]
- **Prispievajúce faktory:** [zoznam]
- **Preventívne opatrenia:** [akčné položky]
- **Follow-up úlohy:** [vytvorené tickety]
- **Post-mortem naplánované:** [dátum/čas]
Bežné runbook scenáre
Zlyhanie pipeline
# Rýchle diagnostické príkazy
# Skontrolujte nedávne behy pipeline
$ airflow dags list-runs -d data_quality_pipeline --limit 10
# Zobrazte logy úlohy
$ airflow tasks logs data_quality_pipeline validate_events 2024-01-15
# Skontrolujte zaseknuté úlohy
$ airflow tasks list data_quality_pipeline --tree
Volume Drop Alert
-- Diagnostika poklesu objemu
-- Krok 1: Identifikujte kedy pokles začal
SELECT
DATE_TRUNC('hour', event_timestamp) as hour,
COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '48 hours'
GROUP BY 1
ORDER BY 1;
-- Krok 2: Skontrolujte podľa zdroja/platformy
SELECT
properties->>'source' as source,
DATE_TRUNC('hour', event_timestamp) as hour,
COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1, 2;
-- Krok 3: Skontrolujte chyby v raw eventoch
SELECT
error_type,
COUNT(*) as error_count
FROM raw_events_errors
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1
ORDER BY 2 DESC;
Budovanie kultúry kvality dát
Technické riešenia samotné nestačia - organizačný záväzok je nevyhnutný:
Vlastníctvo a zodpovednosť
- Priraďte jasných vlastníkov dát pre každý typ udalosti, tabuľku a pipeline
- Zahrňte metriky kvality dát do tímových OKR a performance reviews
- Vytvorte data quality SLO (Service Level Objectives) s definovanými thresholdmi
- Urobte quality dashboardy viditeľnými a prístupnými všetkým stakeholderom
- Ustanovte RACI maticu pre zodpovednosti kvality dát
Integrácia do procesov
- Vyžadujte kontroly kvality dát v code review checklistoch
- Zahrňte tracking validáciu do QA test plánov pre feature releasy
- Pridajte data contract review do definition of done
- Integrujte data testy do CI/CD pipelines s blokujúcimi zlyhaniami
- Plánujte pravidelné data quality review meetingy
Vzdelávanie a tréning
- Trénujte všetkých inžinierov v tracking best practices a bežných úskaliach
- Zdieľajte incident post-mortems široko pre šírenie poznatkov
- Vytvorte internú dokumentáciu a style guides pre dátové štandardy
- Oslavujte zlepšenia kvality a uznávajte dobré praktiky
- Organizujte data quality workshopy a brown bag sessions
Kontinuálne zlepšovanie
- Sledujte trendy metrík kvality v čase
- Vykonávajte štvrťročné data quality audity
- Preskúmajte a aktualizujte data contracts ako sa business potreby vyvíjajú
- Benchmarkujte voči priemyselným štandardom
- Investujte do zlepšení toolingu na základe incident patternov
Ďalšie kroky
- Audit aktuálneho stavu: Zmerajte kvalitu naprieč všetkými šiestimi dimenziami pre vaše kritické dátové assety
- Identifikujte medzery: Prioritizujte najväčšie problémy s kvalitou podľa business dopadu
- Vybudujte monitoring: Začnite s alertmi na objem, čerstvosť a schému pre high-priority tabuľky
- Implementujte testovanie: Pridajte validáciu do vášho CI/CD pipeline s contract testami
- Dokumentujte procesy: Vytvorte runbooky pre top 5 najbežnejších typov incidentov
- Priraďte vlastníctvo: Určte vlastníkov dát a ustanovte SLO
- Iterujte: Kontinuálne zlepšujte na základe incidentov a feedback stakeholderov
Kvalita dát nie je jednorazový projekt - je to prebiehajúca prax, ktorá vyžaduje trvalú investíciu. Návratnosť je dôveryhodná analytika, sebavedomé rozhodnutia, znížený čas strávený ladením dátových problémov a v konečnom dôsledku lepšie business výsledky. Organizácie, ktoré zaobchádzajú s kvalitou dát ako s prvotriednym záujmom, konzistentne prekonávajú tie, ktoré ju berú ako dodatočnú úvahu.