Kvalita dát je základom dôveryhodnej analytiky. Keď sú vaše event dáta nespoľahlivé, každý dashboard, report a rozhodnutie postavené na týchto dátach je podozrivé. Napriek tomu sú problémy s kvalitou dát prekvapivo bežné - podľa výskumu Gartner nízka kvalita dát stojí organizácie priemerne $12,9 milióna ročne, zatiaľ čo IBM odhaduje, že len v USA strácajú firmy $3,1 bilióna ročne kvôli zlej kvalite dát. Okrem finančného dopadu zamestnanci trávia až 27% svojho času opravovaním zlých dát, čo spomaľuje rozhodovanie a zvyšuje operačné náklady.

Tento sprievodca pokrýva dimenzie kvality dát, bežné problémy v event streamoch, ako budovať monitorovacie a observability systémy, testovacie stratégie a postupy incident response.

Dimenzie kvality dát

Kvalita dát nie je jediná metrika - je viacdimenzionálna. Široko akceptovaný framework zahŕňa šesť základných dimenzií. Pochopenie týchto dimenzií vám pomáha systematicky merať a zlepšovať kvalitu:

Presnosť

Reprezentujú dáta správne realitu?

  • Definícia: Udalosti odrážajú, čo sa skutočne stalo v reálnom svete
  • Príklady problémov: Nesprávne hodnoty, chybné výpočty, nesprávne priradené udalosti, preklepy pri zadávaní dát
  • Meranie: Porovnanie so známymi zdrojmi pravdy, použitie primárneho výskumu alebo validácia referenčných dát tretích strán
-- Príklad kontroly presnosti
SELECT
    source_system,
    COUNT(*) as event_count,
    COUNT(DISTINCT user_id) as unique_users
FROM events
WHERE event_date = CURRENT_DATE
GROUP BY source_system;

-- Porovnajte s vlastnými počtami zdrojového systému
-- Cross-validujte s externými referenčnými dátami kde je to možné

Kompletnosť

Sú prítomné všetky očakávané dáta?

  • Definícia: Žiadne chýbajúce udalosti, properties, záznamy alebo časové obdobia; všetky povinné polia sú vyplnené
  • Príklady problémov: Chýbajúce povinné properties, stratené udalosti, medzery v časových radoch, null hodnoty v povinných poliach
  • Meranie: Kontrola null hodnôt, chýbajúcich období, očakávaných objemov a mier vyplnenosti povinných polí
-- Príklad kontroly kompletnosti
SELECT
    event_name,
    COUNT(*) as total_events,
    COUNT(user_id) as events_with_user_id,
    COUNT(session_id) as events_with_session_id,
    ROUND(100.0 * COUNT(user_id) / COUNT(*), 2) as user_id_completeness,
    ROUND(100.0 * COUNT(session_id) / COUNT(*), 2) as session_id_completeness
FROM events
GROUP BY event_name
ORDER BY user_id_completeness ASC;

Včasnosť

Sú dáta dostupné keď sú potrebné?

  • Definícia: Udalosti prichádzajú v očakávaných časových oknách a odrážajú najaktuálnejšiu situáciu
  • Príklady problémov: Oneskorene prichádzajúce dáta, neaktuálne dashboardy, oneskorenia spracovania, dáta nepripravené pre plánované reporty
  • Meranie: Sledovanie latencie event_time vs. received_time, monitoring dodržiavania SLA
-- Príklad kontroly včasnosti
SELECT
    DATE_TRUNC('hour', received_at) as hour,
    AVG(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as avg_latency_seconds,
    MAX(EXTRACT(EPOCH FROM (received_at - event_timestamp))) as max_latency_seconds,
    PERCENTILE_CONT(0.95) WITHIN GROUP (
        ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
    ) as p95_latency_seconds,
    PERCENTILE_CONT(0.99) WITHIN GROUP (
        ORDER BY EXTRACT(EPOCH FROM (received_at - event_timestamp))
    ) as p99_latency_seconds
FROM events
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY DATE_TRUNC('hour', received_at)
ORDER BY hour;

Konzistencia

Sú dáta koherentné naprieč systémom?

  • Definícia: Rovnaká entita má rovnaké hodnoty všade; dáta sú jednotné naprieč všetkými inštanciami a systémami
  • Príklady problémov: Používateľ v jednej tabuľke s iným ID v druhej; nekonzistentné pomenovacie konvencie; konfliktné hodnoty medzi CRM a ERP systémami
  • Meranie: Cross-reference validácia, vynucovanie schémy, kontroly konzistencie formátov
-- Príklad kontroly konzistencie
-- Kontrola nekonzistencií user_id medzi events a users tabuľkou
SELECT
    e.user_id as event_user_id,
    u.user_id as users_table_user_id,
    CASE WHEN u.user_id IS NULL THEN 'Chýba v users' ELSE 'OK' END as status
FROM (SELECT DISTINCT user_id FROM events WHERE user_id IS NOT NULL) e
LEFT JOIN users u ON e.user_id = u.user_id
WHERE u.user_id IS NULL
LIMIT 100;

-- Kontrola konzistencie formátov
SELECT
    event_name,
    properties->>'date_format' as date_value,
    CASE
        WHEN properties->>'date_format' ~ '^\d{4}-\d{2}-\d{2}$' THEN 'ISO formát'
        WHEN properties->>'date_format' ~ '^\d{2}/\d{2}/\d{4}$' THEN 'US formát'
        ELSE 'Iný formát'
    END as format_type
FROM events
WHERE event_name = 'Form Submitted';

Validita

Zodpovedajú dáta očakávaným formátom a obmedzeniam?

  • Definícia: Hodnoty sú v očakávaných rozsahoch, formátoch a zodpovedajú business pravidlám
  • Príklady problémov: Záporné ceny, budúce timestamps, neplatné enumy, nesprávne formátované email adresy, PSČ s nesprávnym počtom znakov
  • Meranie: Validácia obmedzení, kontroly rozsahov, regex pattern matching, validácia business pravidiel
-- Príklad kontroly validity
SELECT
    event_name,
    COUNT(*) as invalid_events,
    STRING_AGG(DISTINCT
        CASE
            WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'future_timestamp'
            WHEN event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0 THEN 'negative_price'
            WHEN properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown') THEN 'invalid_platform'
            WHEN properties->>'email' !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 'invalid_email'
        END, ', '
    ) as violation_types
FROM events
WHERE
    event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
    OR (event_name = 'Purchase Completed' AND CAST(properties->>'price' AS DECIMAL) < 0)
    OR properties->>'platform' NOT IN ('web', 'ios', 'android', 'unknown')
GROUP BY event_name;

Unikátnosť

Sú dáta bez nechcených duplicít?

  • Definícia: Každý záznam reprezentuje jedinú, unikátnu entitu bez neúmyselných duplicít
  • Príklady problémov: Duplicitné záznamy zákazníkov (napr. "Fred Smith" a "Freddy Smith" ako samostatné záznamy), rovnaká udalosť zaznamenaná viackrát, záznamy zduplikované po migrácii
  • Meranie: Validácia primárnych kľúčov, dotazy na detekciu duplicít, fuzzy matching pre takmer-duplicity
-- Príklad kontroly unikátnosti
-- Nájdite presné duplicity podľa event_id
SELECT
    event_id,
    COUNT(*) as occurrence_count,
    MIN(received_at) as first_received,
    MAX(received_at) as last_received
FROM events
GROUP BY event_id
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;

-- Nájdite potenciálne duplicitných používateľov (fuzzy matching)
SELECT
    a.user_id as user_id_1,
    b.user_id as user_id_2,
    a.email,
    a.name as name_1,
    b.name as name_2
FROM users a
JOIN users b ON a.email = b.email AND a.user_id < b.user_id
LIMIT 100;

Bežné problémy v event streamoch

Pochopenie typických problémov vám pomáha budovať lepšiu prevenciu a detekciu:

Duplicitné udalosti

Rovnaká udalosť zaznamenaná viackrát.

-- Detekčný dotaz
SELECT
    event_id,
    user_id,
    event_name,
    COUNT(*) as occurrence_count,
    MIN(event_timestamp) as first_occurrence,
    MAX(event_timestamp) as last_occurrence
FROM events
GROUP BY event_id, user_id, event_name
HAVING COUNT(*) > 1
ORDER BY occurrence_count DESC
LIMIT 100;

Bežné príčiny:

  • Retry logika bez idempotency kľúčov
  • Viacnásobná inicializácia SDK pri načítaní stránky
  • Async spracovanie bez deduplikácie
  • Network timeouty spôsobujúce opätovné odoslanie
  • Message queue redelivery bez správneho spracovania

Chýbajúce udalosti

Očakávané udalosti neprichádzajú.

-- Detekcia: Porovnanie hodinových objemov so štatistickou detekciou anomálií
WITH hourly_volumes AS (
    SELECT
        DATE_TRUNC('hour', event_timestamp) as hour,
        event_name,
        COUNT(*) as event_count
    FROM events
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '7 days'
    GROUP BY 1, 2
),
averages AS (
    SELECT
        event_name,
        EXTRACT(HOUR FROM hour) as hour_of_day,
        EXTRACT(DOW FROM hour) as day_of_week,
        AVG(event_count) as avg_count,
        STDDEV(event_count) as stddev_count
    FROM hourly_volumes
    GROUP BY 1, 2, 3
)
SELECT
    hv.hour,
    hv.event_name,
    hv.event_count,
    ROUND(a.avg_count, 2) as expected_count,
    ROUND((hv.event_count - a.avg_count) / NULLIF(a.stddev_count, 0), 2) as z_score,
    CASE
        WHEN hv.event_count < a.avg_count - 2 * a.stddev_count THEN 'LOW_ANOMALY'
        WHEN hv.event_count > a.avg_count + 2 * a.stddev_count THEN 'HIGH_ANOMALY'
        ELSE 'OK'
    END as status
FROM hourly_volumes hv
JOIN averages a ON hv.event_name = a.event_name
    AND EXTRACT(HOUR FROM hv.hour) = a.hour_of_day
    AND EXTRACT(DOW FROM hv.hour) = a.day_of_week
WHERE hv.hour > CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY hv.hour DESC;

Schema drift

Štruktúra udalosti sa neočakávane mení - polia pridané, odstránené, premenované alebo zmenený typ.

-- Detekcia: Sledovanie prítomnosti properties v čase
WITH daily_schema AS (
    SELECT
        event_name,
        DATE_TRUNC('day', event_timestamp) as day,
        jsonb_object_keys(properties) as property_name
    FROM events
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '14 days'
    GROUP BY 1, 2, 3
),
property_timeline AS (
    SELECT
        event_name,
        property_name,
        MIN(day) as first_seen,
        MAX(day) as last_seen,
        COUNT(DISTINCT day) as days_present
    FROM daily_schema
    GROUP BY 1, 2
)
SELECT
    event_name,
    property_name,
    first_seen,
    last_seen,
    days_present,
    CASE
        WHEN first_seen > CURRENT_DATE - INTERVAL '3 days' THEN 'NEW_PROPERTY'
        WHEN last_seen < CURRENT_DATE - INTERVAL '3 days' THEN 'DISAPPEARED'
        ELSE 'STABLE'
    END as status
FROM property_timeline
WHERE first_seen > CURRENT_DATE - INTERVAL '3 days'
   OR last_seen < CURRENT_DATE - INTERVAL '3 days'
ORDER BY event_name, first_seen DESC;

Nezhody dátových typov

Properties odosielané s nesprávnymi typmi.

-- Detekcia: Nájdite typové nekonzistencie
SELECT
    event_name,
    'price' as property_name,
    properties->>'price' as value,
    CASE
        WHEN properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$' THEN 'numeric'
        WHEN properties->>'price' ~ '^\$' THEN 'string_with_currency_symbol'
        WHEN properties->>'price' ~ '^[0-9,]+\.?[0-9]*$' THEN 'numeric_with_commas'
        WHEN properties->>'price' IS NULL THEN 'null'
        ELSE 'non_numeric_string'
    END as detected_type
FROM events
WHERE event_name = 'Purchase Completed'
    AND properties->>'price' IS NOT NULL
    AND NOT (properties->>'price' ~ '^-?[0-9]+\.?[0-9]*$')
LIMIT 100;

Nesprávne timestamps

Udalosti s nesprávnymi časovými hodnotami.

-- Detekcia: Nájdite podozrivé timestamps
SELECT
    event_name,
    event_id,
    event_timestamp,
    received_at,
    EXTRACT(EPOCH FROM (received_at - event_timestamp)) as latency_seconds,
    CASE
        WHEN event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour' THEN 'FUTURE_EVENT'
        WHEN event_timestamp < '2020-01-01' THEN 'SUSPICIOUSLY_OLD'
        WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) > 86400 THEN 'EXCESSIVE_LATENCY'
        WHEN EXTRACT(EPOCH FROM (received_at - event_timestamp)) < 0 THEN 'RECEIVED_BEFORE_EVENT'
        ELSE 'OK'
    END as issue_type
FROM events
WHERE
    event_timestamp > CURRENT_TIMESTAMP + INTERVAL '1 hour'
    OR event_timestamp < '2020-01-01'
    OR ABS(EXTRACT(EPOCH FROM (received_at - event_timestamp))) > 86400
LIMIT 100;

Únik PII

Osobne identifikovateľné informácie objavujúce sa na neočakávaných miestach.

-- Detekcia: Sken potenciálnych PII v properties
SELECT
    event_name,
    event_id,
    CASE
        WHEN properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}' THEN 'POTENTIAL_SSN'
        WHEN properties::text ~ '[0-9]{16}' THEN 'POTENTIAL_CREDIT_CARD'
        WHEN properties::text ~* '[a-z0-9._%+-]+@[a-z0-9.-]+\.[a-z]{2,}'
             AND event_name NOT IN ('User Signup', 'Login') THEN 'UNEXPECTED_EMAIL'
        ELSE 'OK'
    END as pii_risk
FROM events
WHERE
    properties::text ~ '[0-9]{3}-[0-9]{2}-[0-9]{4}'
    OR properties::text ~ '[0-9]{16}'
LIMIT 100;

Monitorovacie a observability systémy

Proaktívny monitoring zachytí problémy predtým, ako ovplyvnia rozhodnutia. Moderná data observability rozširuje tradičný monitoring aplikovaním DevOps best practices na dátové pipelines.

Kľúčové metriky na sledovanie

Implementujte monitoring pre tieto esenciálne metriky:

  • MTTD (Mean Time to Detect): Priemerný čas od vzniku problému po jeho detekciu
  • MTTR (Mean Time to Resolve): Priemerný čas od detekcie po vyriešenie
  • Čerstvosť dát: Čas od poslednej aktualizácie dát
  • Objemové anomálie: Odchýlky od očakávaných počtov udalostí
  • Zmeny schémy: Nové, modifikované alebo odstránené polia
  • Drift distribúcie: Štatistické posuny v hodnotách dát

Monitoring objemu

# Príklad konfigurácie alertu (pseudo-code)
monitor:
  name: event_volume_anomaly
  query: |
    SELECT
      event_name,
      COUNT(*) as current_hour_count,
      AVG(historical_count) as expected_count,
      STDDEV(historical_count) as stddev_count
    FROM current_events
    JOIN historical_averages USING (event_name, hour_of_day, day_of_week)
    GROUP BY event_name
  alert_condition: |
    current_hour_count < expected_count - 2 * stddev_count
    OR current_hour_count > expected_count + 2 * stddev_count
  severity: warning
  channels: [slack, pagerduty]
  runbook_url: https://wiki.company.com/runbooks/volume-anomaly

Monitoring čerstvosti

-- Kontrola čerstvosti dát naprieč všetkými kritickými tabuľkami
WITH freshness_check AS (
    SELECT
        'events' as table_name,
        MAX(event_timestamp) as latest_record,
        30 as max_age_minutes  -- SLA threshold
    FROM events
    UNION ALL
    SELECT
        'page_views' as table_name,
        MAX(timestamp) as latest_record,
        15 as max_age_minutes
    FROM page_views
    UNION ALL
    SELECT
        'transactions' as table_name,
        MAX(created_at) as latest_record,
        5 as max_age_minutes
    FROM transactions
)
SELECT
    table_name,
    latest_record,
    EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 as minutes_since_update,
    max_age_minutes as sla_threshold_minutes,
    CASE
        WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes
        THEN 'SLA_BREACH'
        WHEN EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - latest_record)) / 60 > max_age_minutes * 0.8
        THEN 'WARNING'
        ELSE 'OK'
    END as status
FROM freshness_check
ORDER BY status DESC, minutes_since_update DESC;

Monitoring schémy

-- Sledovanie kompletnosti povinných properties
WITH property_checks AS (
    SELECT
        event_name,
        required_property,
        COUNT(*) as total_events,
        COUNT(CASE WHEN properties->>required_property IS NOT NULL THEN 1 END) as events_with_property,
        COUNT(CASE WHEN properties->>required_property = '' THEN 1 END) as events_with_empty_value
    FROM events
    CROSS JOIN (
        SELECT unnest(ARRAY['user_id', 'session_id', 'timestamp', 'platform']) as required_property
    ) required
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 hour'
    GROUP BY event_name, required_property
)
SELECT
    event_name,
    required_property,
    total_events,
    events_with_property,
    events_with_empty_value,
    ROUND(100.0 * events_with_property / NULLIF(total_events, 0), 2) as completeness_pct,
    CASE
        WHEN events_with_property < total_events * 0.95 THEN 'CRITICAL'
        WHEN events_with_property < total_events * 0.99 THEN 'WARNING'
        ELSE 'OK'
    END as status
FROM property_checks
WHERE events_with_property < total_events * 0.99
ORDER BY completeness_pct ASC;

Monitoring driftu distribúcie

-- Detekcia štatistického driftu v numerických poliach
WITH baseline AS (
    SELECT
        event_name,
        AVG(CAST(properties->>'amount' AS DECIMAL)) as baseline_mean,
        STDDEV(CAST(properties->>'amount' AS DECIMAL)) as baseline_stddev,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as baseline_median
    FROM events
    WHERE event_timestamp BETWEEN CURRENT_TIMESTAMP - INTERVAL '30 days'
                              AND CURRENT_TIMESTAMP - INTERVAL '1 day'
      AND event_name = 'Purchase Completed'
),
current_window AS (
    SELECT
        event_name,
        AVG(CAST(properties->>'amount' AS DECIMAL)) as current_mean,
        STDDEV(CAST(properties->>'amount' AS DECIMAL)) as current_stddev,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY CAST(properties->>'amount' AS DECIMAL)) as current_median
    FROM events
    WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '1 day'
      AND event_name = 'Purchase Completed'
)
SELECT
    b.event_name,
    ROUND(b.baseline_mean, 2) as baseline_mean,
    ROUND(c.current_mean, 2) as current_mean,
    ROUND(ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0), 2) as z_score_shift,
    CASE
        WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 3 THEN 'SIGNIFICANT_DRIFT'
        WHEN ABS(c.current_mean - b.baseline_mean) / NULLIF(b.baseline_stddev, 0) > 2 THEN 'MODERATE_DRIFT'
        ELSE 'OK'
    END as drift_status
FROM baseline b
CROSS JOIN current_window c;

Data Quality Dashboard

Vybudujte komplexný data quality dashboard s týmito panelmi:

  1. Trendy objemu: Udalosti za hodinu so zvýraznením anomálií a day-over-day porovnaniami
  2. Metriky latencie: p50, p95, p99 ingestion latencia s SLA threshold čiarami
  3. Skóre kompletnosti: Miera vyplnenia povinných properties podľa typu udalosti
  4. Chybovosť: Zlyhané validácie podľa typu udalosti a kategórie chyby
  5. Zmeny schémy: Detekované nové properties s first-seen timestamps
  6. MTTD/MTTR trendy: Časy detekcie a riešenia incidentov v čase
  7. Čerstvosť dát: Čas od poslednej aktualizácie na kritickú tabuľku so SLA statusom
  8. Miera duplicít: Percentuálny podiel duplicitných udalostí podľa zdroja

Testovacie stratégie

Zachyťte problémy predtým, ako sa dostanú do produkcie s komplexným testovacím prístupom:

Unit testy pre tracking kód

// JavaScript príklad: Testovanie event trackingu
describe('Purchase Event', () => {
    it('should include all required properties', () => {
        const event = trackPurchase({
            orderId: '12345',
            total: 99.99,
            currency: 'USD',
            items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }]
        });

        expect(event.event_name).toBe('Purchase Completed');
        expect(event.properties.order_id).toBeDefined();
        expect(event.properties.total).toBeGreaterThan(0);
        expect(event.properties.currency).toMatch(/^[A-Z]{3}$/);
        expect(event.properties.item_count).toBeGreaterThan(0);
        expect(event.timestamp).toBeDefined();
        expect(new Date(event.timestamp)).toBeInstanceOf(Date);
    });

    it('should not include PII', () => {
        const event = trackPurchase({
            orderId: '12345',
            total: 99.99,
            currency: 'USD',
            items: [{ sku: 'PROD-1', quantity: 2, price: 49.99 }],
            customerEmail: 'test@example.com',  // Toto by malo byť odfiltrované
            creditCard: '4111111111111111'       // Toto by malo byť odfiltrované
        });

        expect(event.properties.email).toBeUndefined();
        expect(event.properties.customerEmail).toBeUndefined();
        expect(event.properties.credit_card).toBeUndefined();
        expect(event.properties.creditCard).toBeUndefined();
        expect(JSON.stringify(event)).not.toContain('4111111111111111');
    });

    it('should handle edge cases gracefully', () => {
        const event = trackPurchase({
            orderId: '12345',
            total: 0,  // Free order
            currency: 'USD',
            items: []
        });

        expect(event.properties.total).toBe(0);
        expect(event.properties.item_count).toBe(0);
    });
});

Integračné testy

# Python príklad: End-to-end tracking test
import uuid
import time
from datetime import datetime

def test_event_pipeline_end_to_end():
    """Test, že eventy prúdia celým pipeline správne."""

    # Vygenerujte testovaciu udalosť s unikátnym identifikátorom
    test_event_id = f"test_{uuid.uuid4()}"
    test_timestamp = datetime.utcnow().isoformat()

    # Odošlite testovaciu udalosť
    send_event({
        "event_name": "Test Event",
        "event_id": test_event_id,
        "user_id": "test_user_integration",
        "timestamp": test_timestamp,
        "properties": {
            "test_property": "test_value",
            "numeric_property": 42
        }
    })

    # Počkajte na spracovanie (upravte podľa latencie vášho pipeline SLA)
    max_wait_seconds = 60
    poll_interval = 5
    elapsed = 0
    result = None

    while elapsed < max_wait_seconds:
        time.sleep(poll_interval)
        elapsed += poll_interval

        result = query_warehouse(f"""
            SELECT * FROM events
            WHERE event_id = '{test_event_id}'
        """)

        if len(result) > 0:
            break

    # Overte, že udalosť dorazila
    assert len(result) == 1, f"Udalosť by mala doraziť presne raz, dostal {len(result)}"

    # Overte integritu dát
    event = result[0]
    assert event['user_id'] == 'test_user_integration'
    assert event['event_name'] == 'Test Event'
    assert event['properties']['test_property'] == 'test_value'
    assert event['properties']['numeric_property'] == 42

    # Overte, že latencia je v rámci SLA
    received_time = event['received_at']
    event_time = datetime.fromisoformat(test_timestamp)
    latency_seconds = (received_time - event_time).total_seconds()
    assert latency_seconds < 60, f"Latencia {latency_seconds}s prekračuje 60s SLA"


def test_duplicate_handling():
    """Test, že duplicitné eventy sú správne spracované."""

    test_event_id = f"test_dup_{uuid.uuid4()}"

    # Odošlite rovnakú udalosť dvakrát
    for _ in range(2):
        send_event({
            "event_name": "Test Duplicate",
            "event_id": test_event_id,
            "user_id": "test_user",
            "timestamp": datetime.utcnow().isoformat()
        })

    time.sleep(30)

    result = query_warehouse(f"""
        SELECT COUNT(*) as count FROM events
        WHERE event_id = '{test_event_id}'
    """)

    # Podľa vašej stratégie deduplikácie:
    # - Ak je deduped pri ingescii: count by mal byť 1
    # - Ak je deduped pri dotaze: overte, že dedup logika funguje
    assert result[0]['count'] == 1, "Duplicitné eventy by mali byť deduplikované"

Contract testy s JSON Schema

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Purchase Completed Event",
    "type": "object",
    "required": ["event_name", "event_id", "user_id", "timestamp", "properties"],
    "properties": {
        "event_name": {
            "const": "Purchase Completed"
        },
        "event_id": {
            "type": "string",
            "minLength": 1,
            "pattern": "^[a-zA-Z0-9_-]+$"
        },
        "user_id": {
            "type": "string",
            "minLength": 1
        },
        "timestamp": {
            "type": "string",
            "format": "date-time"
        },
        "properties": {
            "type": "object",
            "required": ["order_id", "total", "currency", "item_count"],
            "properties": {
                "order_id": {
                    "type": "string",
                    "minLength": 1
                },
                "total": {
                    "type": "number",
                    "minimum": 0
                },
                "currency": {
                    "type": "string",
                    "pattern": "^[A-Z]{3}$",
                    "enum": ["USD", "EUR", "GBP", "CAD", "AUD", "JPY"]
                },
                "item_count": {
                    "type": "integer",
                    "minimum": 0
                },
                "discount_code": {
                    "type": ["string", "null"]
                }
            },
            "additionalProperties": true
        }
    },
    "additionalProperties": false
}

Data Quality testy s dbt

# schema.yml - dbt data testy
version: 2

models:
  - name: fct_events
    description: "Faktová tabuľka obsahujúca všetky trackované eventy"
    columns:
      - name: event_id
        description: "Unikátny identifikátor udalosti"
        tests:
          - unique
          - not_null

      - name: user_id
        description: "Používateľ, ktorý spustil udalosť"
        tests:
          - not_null
          - relationships:
              to: ref('dim_users')
              field: user_id

      - name: event_timestamp
        description: "Kedy sa udalosť stala"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "event_timestamp <= current_timestamp"
              config:
                error_if: ">100"
                warn_if: ">10"

      - name: platform
        description: "Platforma, kde udalosť vznikla"
        tests:
          - accepted_values:
              values: ['web', 'ios', 'android', 'unknown']

    tests:
      - dbt_utils.recency:
          datepart: hour
          field: event_timestamp
          interval: 1
          config:
            severity: error

Great Expectations validácia

# Great Expectations validačná suite
import great_expectations as gx

# Vytvorte expectation suite pre events tabuľku
context = gx.get_context()

suite = context.add_expectation_suite("events_quality_suite")

# Pridajte expectations
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="event_id")
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="event_id")
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="price",
        min_value=0,
        max_value=100000  # Rozumná horná hranica
    )
)

suite.add_expectation(
    gx.expectations.ExpectColumnValuesToMatchRegex(
        column="email",
        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    )
)

suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(
        min_value=1000,  # Alert ak príliš málo riadkov
        max_value=10000000  # Alert ak podozrivo veľa
    )
)

Validácia v staging prostredí

  • Spúšťajte všetok tracking cez staging pred produkčným nasadením
  • Porovnávajte staging metriky s produkčnými baseline pre detekciu anomálií
  • Validujte, že zmeny schémy nerozbijú downstream konzumentov
  • Používajte data diff nástroje na porovnanie staging a produkčných výstupov
  • Spúšťajte contract testy proti staging dátam pred mergnutím kódu

Incident Response

Keď nastanú problémy s kvalitou dát, reagujte systematicky s jasnými procesmi:

Úrovne závažnosti incidentov

Úroveň Popis Čas odpovede Príklad Eskalácia
Kritická (P1) Kompletná strata alebo poškodenie dát ovplyvňujúce business-kritické systémy Okamžite (< 15 min) Všetky eventy vypadávajú, produkčný pipeline dole Page on-call, notifikácia vedenia
Vysoká (P2) Významná degradácia kvality ovplyvňujúca kľúčové metriky < 1 hodina 50% strata eventov, chýba kritická property Page on-call, notifikácia stakeholderov
Stredná (P3) Čiastočné problémy s kvalitou s limitovaným business dopadom < 4 hodiny Chýbajúce nekritické properties, menší schema drift Slack alert, najbližší dostupný inžinier
Nízka (P4) Menšie problémy s kvalitou s minimálnym dopadom Nasledujúci pracovný deň Kozmetické zmeny schémy, nízko-objemové anomálie Vytvorenie ticketu, prioritizácia backlogu

Proces Incident Response

  1. Detekcia: Automatizovaný monitorovací alert alebo report používateľa spustí incident
  2. Potvrdenie: On-call inžinier potvrdí v rámci SLA, založí incident channel
  3. Triáž: Posúdenie závažnosti, rozsahu dopadu a dotknutých downstream systémov
  4. Komunikácia: Notifikácia dotknutých stakeholderov s počiatočným posúdením
  5. Mitigácia: Zastavenie krvácania - pozastavenie pipelines, zapnutie circuit breakerov, alebo rollback ak je potrebný
  6. Investigácia: Root cause analýza pomocou logov, lineage a metrík
  7. Oprava: Implementácia a testovanie riešenia v staging
  8. Nasadenie: Rollout opravy do produkcie s monitoringom
  9. Obnova: Backfill alebo korekcia dotknutých dát podľa potreby
  10. Review: Vykonanie post-incident review do 48 hodín

Šablóna Runbooku

# Data Quality Incident Runbook

## Informácie o incidente
- **ID incidentu:** [auto-generated]
- **Zdroj alertu:** [monitoring systém/report používateľa]
- **Čas detekcie:** [timestamp]
- **Závažnosť:** [P1/P2/P3/P4]
- **On-Call inžinier:** [meno]

## Počiatočné symptómy
- Popis: [čo bolo pozorované]
- Dotknuté eventy/tabuľky: [zoznam]
- Chybové správy: [relevantné logy]

## Posúdenie dopadu
- **Dotknutý časový rozsah:** [začiatok] do [koniec]
- **Objem dotknutých dát:** [odhadovaný počet záznamov]
- **Dotknuté downstream systémy:** [dashboardy, reporty, ML modely]
- **Business dopad:** [revenue, rozhodnutia, compliance]
- **Notifikovaní používatelia:** [zoznam]

## Diagnostické kroky
1. [ ] Skontrolujte status pipeline v orchestračnom nástroji
2. [ ] Preskúmajte error logy pre dotknuté časové okno
3. [ ] Query na dátové anomálie (objem, schéma, hodnoty)
4. [ ] Skontrolujte upstream dátové zdroje pre problémy
5. [ ] Preskúmajte nedávne deploymenty alebo zmeny konfigurácie
6. [ ] Trasujte dátový lineage pre identifikáciu bodu zlyhania

## Kroky riešenia
1. [ ] Implementujte okamžitú mitigáciu (pause, rollback, circuit breaker)
2. [ ] Identifikujte a zdokumentujte root cause
3. [ ] Vyviňte opravu a testujte v staging prostredí
4. [ ] Nasaďte opravu do produkcie
5. [ ] Overte, že oprava rieši problém
6. [ ] Backfillujte alebo opravte dotknuté dáta
7. [ ] Re-enableujte pozastavené pipelines
8. [ ] Overte, že downstream systémy sú zdravé

## Log komunikácie
| Čas | Update | Publikum |
|------|--------|----------|
| [timestamp] | Incident potvrdený | #data-incidents |
| [timestamp] | Počiatočné posúdenie | Stakeholderi |
| [timestamp] | Riešenie nasadené | Všetky dotknuté strany |

## Metriky
- **MTTD (Čas do detekcie):** [trvanie]
- **MTTR (Čas do vyriešenia):** [trvanie]
- **Strata/poškodenie dát:** [objem]

## Post-Incident
- **Root cause:** [detailný popis]
- **Prispievajúce faktory:** [zoznam]
- **Preventívne opatrenia:** [akčné položky]
- **Follow-up úlohy:** [vytvorené tickety]
- **Post-mortem naplánované:** [dátum/čas]

Bežné runbook scenáre

Zlyhanie pipeline

# Rýchle diagnostické príkazy

# Skontrolujte nedávne behy pipeline
$ airflow dags list-runs -d data_quality_pipeline --limit 10

# Zobrazte logy úlohy
$ airflow tasks logs data_quality_pipeline validate_events 2024-01-15

# Skontrolujte zaseknuté úlohy
$ airflow tasks list data_quality_pipeline --tree

Volume Drop Alert

-- Diagnostika poklesu objemu
-- Krok 1: Identifikujte kedy pokles začal
SELECT
    DATE_TRUNC('hour', event_timestamp) as hour,
    COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '48 hours'
GROUP BY 1
ORDER BY 1;

-- Krok 2: Skontrolujte podľa zdroja/platformy
SELECT
    properties->>'source' as source,
    DATE_TRUNC('hour', event_timestamp) as hour,
    COUNT(*) as events
FROM events
WHERE event_timestamp > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1, 2
ORDER BY 1, 2;

-- Krok 3: Skontrolujte chyby v raw eventoch
SELECT
    error_type,
    COUNT(*) as error_count
FROM raw_events_errors
WHERE received_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY 1
ORDER BY 2 DESC;

Budovanie kultúry kvality dát

Technické riešenia samotné nestačia - organizačný záväzok je nevyhnutný:

Vlastníctvo a zodpovednosť

  • Priraďte jasných vlastníkov dát pre každý typ udalosti, tabuľku a pipeline
  • Zahrňte metriky kvality dát do tímových OKR a performance reviews
  • Vytvorte data quality SLO (Service Level Objectives) s definovanými thresholdmi
  • Urobte quality dashboardy viditeľnými a prístupnými všetkým stakeholderom
  • Ustanovte RACI maticu pre zodpovednosti kvality dát

Integrácia do procesov

  • Vyžadujte kontroly kvality dát v code review checklistoch
  • Zahrňte tracking validáciu do QA test plánov pre feature releasy
  • Pridajte data contract review do definition of done
  • Integrujte data testy do CI/CD pipelines s blokujúcimi zlyhaniami
  • Plánujte pravidelné data quality review meetingy

Vzdelávanie a tréning

  • Trénujte všetkých inžinierov v tracking best practices a bežných úskaliach
  • Zdieľajte incident post-mortems široko pre šírenie poznatkov
  • Vytvorte internú dokumentáciu a style guides pre dátové štandardy
  • Oslavujte zlepšenia kvality a uznávajte dobré praktiky
  • Organizujte data quality workshopy a brown bag sessions

Kontinuálne zlepšovanie

  • Sledujte trendy metrík kvality v čase
  • Vykonávajte štvrťročné data quality audity
  • Preskúmajte a aktualizujte data contracts ako sa business potreby vyvíjajú
  • Benchmarkujte voči priemyselným štandardom
  • Investujte do zlepšení toolingu na základe incident patternov

Ďalšie kroky

  1. Audit aktuálneho stavu: Zmerajte kvalitu naprieč všetkými šiestimi dimenziami pre vaše kritické dátové assety
  2. Identifikujte medzery: Prioritizujte najväčšie problémy s kvalitou podľa business dopadu
  3. Vybudujte monitoring: Začnite s alertmi na objem, čerstvosť a schému pre high-priority tabuľky
  4. Implementujte testovanie: Pridajte validáciu do vášho CI/CD pipeline s contract testami
  5. Dokumentujte procesy: Vytvorte runbooky pre top 5 najbežnejších typov incidentov
  6. Priraďte vlastníctvo: Určte vlastníkov dát a ustanovte SLO
  7. Iterujte: Kontinuálne zlepšujte na základe incidentov a feedback stakeholderov

Kvalita dát nie je jednorazový projekt - je to prebiehajúca prax, ktorá vyžaduje trvalú investíciu. Návratnosť je dôveryhodná analytika, sebavedomé rozhodnutia, znížený čas strávený ladením dátových problémov a v konečnom dôsledku lepšie business výsledky. Organizácie, ktoré zaobchádzajú s kvalitou dát ako s prvotriednym záujmom, konzistentne prekonávajú tie, ktoré ju berú ako dodatočnú úvahu.