Skip to content

Lösungen - LangGraph & Advanced Workflows

🎯 Übersicht

Diese Musterlösungen zeigen vollständige Implementierungen für alle LangGraph-Übungen. Nutzen Sie diese zur Überprüfung Ihrer eigenen Lösungen oder als Referenz für erweiterte Features.


Lösung 1: Einfacher Risk-Classifier

Vollständige Implementierung

from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
import re

# Zustandstyp definieren
class RiskState(TypedDict):
    """State dictionary shared by the nodes of the simple risk-classifier graph."""
    question: str  # input text; lower-cased and keyword-scanned by classify_risk
    risk_level: Optional[str]  # "high", "medium" or "low"; written by classify_risk
    response: Optional[str]  # reply text; written by handle_risk

def classify_risk(state):
    """Classify a question as high, medium or low risk by keyword matching.

    The question is lower-cased and searched for each keyword as a plain
    substring. Tiers are checked from most to least severe, so a question
    containing both a high-risk and a medium-risk keyword is rated "high".

    Args:
        state: Mapping with a "question" string.

    Returns:
        dict with a single "risk_level" key: "high", "medium" or "low".
    """
    text = state["question"].lower()

    # Ordered from most to least severe; the first tier with a hit wins.
    keyword_tiers = (
        ("high", (
            'verdächtig', 'betrug', 'fraud', 'geldwäsche', 'kredit',
            'default', 'ausfall', 'insolvenz', 'cyber', 'security',
        )),
        ("medium", (
            'markt', 'volatilität', 'compliance', 'regulation',
            'audit', 'prüfung', 'bewertung',
        )),
    )

    for level, keywords in keyword_tiers:
        for keyword in keywords:
            if keyword in text:
                return {"risk_level": level}

    # No keyword matched in any tier.
    return {"risk_level": "low"}

def handle_risk(state):
    """Pick the canned reply that matches the classified risk level.

    Args:
        state: Mapping with a "risk_level" value.

    Returns:
        dict with a single "response" key. Levels other than "high",
        "medium" and "low" yield the fallback text "Unbekanntes Risikolevel".
    """
    level = state["risk_level"]

    replies_by_level = {
        "high": "🚨 HOCH-RISIKO: Sofortige detaillierte Analyse und Eskalation erforderlich. Compliance-Team informieren.",
        "medium": "⚠️ MITTEL-RISIKO: Erweiterte Prüfung erforderlich. Risikomanagement einbeziehen.",
        "low": "✅ NIEDRIG-RISIKO: Standard-Überwachung ausreichend. Routine-Verarbeitung möglich."
    }

    try:
        reply = replies_by_level[level]
    except KeyError:
        # Unknown level: fall back to a generic message instead of failing.
        reply = "Unbekanntes Risikolevel"

    return {"response": reply}

# Graph aufbauen
def create_risk_classifier():
    """Build and compile the linear graph classify -> handle -> END.

    Returns:
        The compiled graph object returned by StateGraph.compile().
    """
    builder = StateGraph(RiskState)

    # Register both processing steps before wiring them together.
    for node_name, node_fn in (("classify", classify_risk), ("handle", handle_risk)):
        builder.add_node(node_name, node_fn)

    # Linear pipeline: classification feeds the response handler, then stop.
    builder.add_edge("classify", "handle")
    builder.add_edge("handle", END)
    builder.set_entry_point("classify")

    compiled = builder.compile()
    return compiled

# Test der Implementierung
def test_risk_classifier():
    """Run the compiled classifier over sample questions and print each result."""
    classifier = create_risk_classifier()

    sample_questions = (
        "Kreditrisiko bei Immobilienfinanzierung",
        "Routine Kontobewegung prüfen",
        "Verdächtiger Geldtransfer erkannt",
        "Marktvolatilität Analyse",
        "Cyber Security Vorfall",
    )

    print("=== Risk Classifier Test Results ===\n")
    for number, text in enumerate(sample_questions, start=1):
        payload = {"question": text}
        outcome = classifier.invoke(payload)
        print(f"Test {number}:")
        print(f"Input: {payload['question']}")
        print(f"Risk Level: {outcome['risk_level']}")
        print(f"Response: {outcome['response']}\n")

# Run the classifier demo only when this snippet is executed as a script.
if __name__ == "__main__":
    test_risk_classifier()

Erweiterungen

def classify_risk_advanced(state):
    """Classify risk by summing the weights of all keywords found in the question.

    Every keyword that occurs as a substring of the lower-cased question adds
    its weight once. A total of 8 or more is "high", 4 to 7 is "medium", and
    anything below 4 is "low".

    Args:
        state: Mapping with a "question" string.

    Returns:
        dict with "risk_level" and the numeric "risk_score".

    NOTE(review): "risk_score" is not a declared key of RiskState; confirm the
    state schema accepts it before using this function as a graph node.
    """
    text = state["question"].lower()

    # Keyword -> weight; a keyword counts once no matter how often it occurs.
    keyword_weights = {
        'fraud': 10, 'betrug': 10, 'verdächtig': 8,
        'geldwäsche': 10, 'kredit': 7, 'ausfall': 9,
        'cyber': 8, 'security': 6, 'compliance': 5,
        'markt': 4, 'volatilität': 6, 'regulation': 4
    }

    score = 0
    for keyword, weight in keyword_weights.items():
        if keyword in text:
            score += weight

    if score < 4:
        level = "low"
    elif score < 8:
        level = "medium"
    else:
        level = "high"

    return {"risk_level": level, "risk_score": score}

Lösung 2: Routing-Workflow für Finanzrisiken

Erweiterte StateGraph-Implementierung

from typing import TypedDict, Optional, Literal
from langgraph.graph import StateGraph, END

class AdvancedRiskState(TypedDict):
    """State dictionary shared by the nodes of the expert-routing workflow."""
    question: str  # input text; read by the classifier and every expert node
    risk_type: Optional[str]  # category chosen by classify_risk_type ("general" if nothing matched)
    risk_level: Optional[str]  # declared, but no node visible in this section writes it
    expert_analysis: Optional[str]  # report text written by whichever expert node ran
    final_recommendation: Optional[str]  # written by generate_final_recommendation
    confidence_score: Optional[float]  # matched-keyword ratio written by classify_risk_type

def classify_risk_type(state):
    """Pick the risk category whose keyword list best matches the question.

    For each category the confidence is the fraction of its keywords found as
    substrings of the lower-cased question. The category with the highest
    fraction wins; on a tie the category listed first in the table is kept.
    When no keyword matches at all the result is "general" with a fixed
    confidence of 0.1.

    Args:
        state: Mapping with a "question" string.

    Returns:
        dict with "risk_type" and "confidence_score".
    """
    text = state["question"].lower()

    keywords_by_type = {
        'market': ['markt', 'kurs', 'preis', 'volatilität', 'handel'],
        'credit': ['kredit', 'darlehen', 'ausfall', 'bonität', 'rating'],
        'operational': ['betrieb', 'system', 'prozess', 'fehler', 'ausfall'],
        'compliance': ['regulation', 'gesetz', 'vorschrift', 'audit', 'prüfung'],
        'cyber': ['cyber', 'security', 'hack', 'malware', 'phishing']
    }

    best_type = None
    best_ratio = None
    for candidate, keywords in keywords_by_type.items():
        hits = 0
        for keyword in keywords:
            if keyword in text:
                hits += 1
        if hits == 0:
            continue
        ratio = hits / len(keywords)
        # Strict comparison keeps the earliest category when ratios tie.
        if best_ratio is None or ratio > best_ratio:
            best_type = candidate
            best_ratio = ratio

    if best_type is None:
        return {"risk_type": "general", "confidence_score": 0.1}

    return {"risk_type": best_type, "confidence_score": best_ratio}

def financial_expert(state):
    """Produce the market-risk or credit-risk checklist for the question.

    Args:
        state: Mapping with "question" and "risk_type".

    Returns:
        dict with "expert_analysis". Risk types other than "market" and
        "credit" get a one-line generic finance analysis.
    """
    question = state["question"]
    risk_type = state["risk_type"]

    market_report = (
        f"📊 MARKTRISIKO-ANALYSE für: {question}\n"
        "• Marktvolatilität prüfen\n"
        "• Hedging-Strategien evaluieren\n"
        "• Portfolio-Impact bewerten\n"
        "• Risk-Adjusted Returns berechnen"
    )
    credit_report = (
        f"💳 KREDITRISIKO-ANALYSE für: {question}\n"
        "• Bonitätsprüfung durchführen\n"
        "• Ausfallwahrscheinlichkeit berechnen\n"
        "• Sicherheiten bewerten\n"
        "• Credit Scoring aktualisieren"
    )
    reports = {'market': market_report, 'credit': credit_report}

    try:
        report = reports[risk_type]
    except KeyError:
        # Not a type this expert specialises in: use the generic wording.
        report = f"Allgemeine Finanzanalyse für {question}"

    return {"expert_analysis": report}

def compliance_expert(state):
    """Produce the fixed compliance checklist for the question.

    Args:
        state: Mapping with a "question" value.

    Returns:
        dict with "expert_analysis": a header naming the question followed by
        four checklist bullets.
    """
    subject = state["question"]

    report = (
        f"⚖️ COMPLIANCE-ANALYSE für: {subject}\n"
        "• Regulatorische Anforderungen prüfen\n"
        "• Dokumentationspflichten sicherstellen\n"
        "• Meldepflichten evaluieren\n"
        "• Audit-Trail erstellen"
    )
    return {"expert_analysis": report}

def operational_expert(state):
    """Produce the fixed operational-risk checklist for the question.

    Args:
        state: Mapping with a "question" value.

    Returns:
        dict with "expert_analysis": a header naming the question followed by
        four checklist bullets.
    """
    subject = state["question"]

    report = (
        f"⚙️ OPERATIONELLES RISIKO für: {subject}\n"
        "• Prozessanalyse durchführen\n"
        "• Systemverfügbarkeit prüfen\n"
        "• Backup-Strategien evaluieren\n"
        "• Kontinuitätsplanung aktualisieren"
    )
    return {"expert_analysis": report}

def cyber_expert(state):
    """Produce the fixed cyber-security checklist for the question.

    Args:
        state: Mapping with a "question" value.

    Returns:
        dict with "expert_analysis": a header naming the question followed by
        four checklist bullets.
    """
    subject = state["question"]

    report = (
        f"🔒 CYBER-SECURITY ANALYSE für: {subject}\n"
        "• Bedrohungsanalyse durchführen\n"
        "• Sicherheitslücken identifizieren\n"
        "• Incident Response aktivieren\n"
        "• Forensik-Untersuchung einleiten"
    )
    return {"expert_analysis": report}

def general_handler(state):
    """Produce the fixed general-purpose risk checklist for the question.

    Args:
        state: Mapping with a "question" value.

    Returns:
        dict with "expert_analysis": a header naming the question followed by
        four checklist bullets.
    """
    subject = state["question"]

    report = (
        f"📋 ALLGEMEINE RISIKOANALYSE für: {subject}\n"
        "• Risiko kategorisieren\n"
        "• Standard-Bewertung durchführen\n"
        "• Dokumentation erstellen\n"
        "• Follow-up planen"
    )
    return {"expert_analysis": report}

def generate_final_recommendation(state):
    """Wrap the expert analysis in the final recommendation text.

    Args:
        state: Mapping with "expert_analysis" and "risk_type". An optional
            "confidence_score" is shown with two decimals; 0.5 is used when
            the key is absent.

    Returns:
        dict with "final_recommendation": the upper-cased risk type, the
        confidence, the expert analysis and a fixed list of next steps.
    """
    analysis_text = state["expert_analysis"]
    category = state["risk_type"]
    score = state.get("confidence_score", 0.5)

    text = (
        f"📋 FINALE EMPFEHLUNG\n\n"
        f"Risikotyp: {category.upper()} (Confidence: {score:.2f})\n\n"
        f"{analysis_text}\n\n"
        f"Nächste Schritte:\n"
        f"• Sofortige Umsetzung der empfohlenen Maßnahmen\n"
        f"• Regelmäßige Überwachung etablieren\n"
        f"• Reporting an Management"
    )
    return {"final_recommendation": text}

def route_to_expert(state):
    """Return the name of the expert node that handles the classified risk type.

    "market" and "credit" share the financial expert; "compliance",
    "operational" and "cyber" each have a dedicated expert; every other value
    goes to "general_handler".

    Args:
        state: Mapping with a "risk_type" value.

    Returns:
        The target node name as a string.
    """
    kind = state["risk_type"]

    node_for_type = {
        "market": "financial_expert",
        "credit": "financial_expert",
        "compliance": "compliance_expert",
        "operational": "operational_expert",
        "cyber": "cyber_expert",
        "general": "general_handler",
    }

    try:
        return node_for_type[kind]
    except KeyError:
        # Unrecognised types are treated like "general".
        return "general_handler"

def create_advanced_risk_workflow():
    """Build and compile the expert-routing workflow.

    Flow: classify_type -> (one expert node chosen by route_to_expert)
    -> final_recommendation -> END.

    Returns:
        The compiled graph object returned by StateGraph.compile().
    """
    builder = StateGraph(AdvancedRiskState)

    # Expert nodes keyed by node name; the same names serve as routing targets.
    expert_nodes = {
        "financial_expert": financial_expert,
        "compliance_expert": compliance_expert,
        "operational_expert": operational_expert,
        "cyber_expert": cyber_expert,
        "general_handler": general_handler,
    }

    builder.add_node("classify_type", classify_risk_type)
    for node_name, node_fn in expert_nodes.items():
        builder.add_node(node_name, node_fn)
    builder.add_node("final_recommendation", generate_final_recommendation)

    # route_to_expert returns a node name, so each label maps to itself.
    builder.add_conditional_edges(
        "classify_type",
        route_to_expert,
        {node_name: node_name for node_name in expert_nodes},
    )

    # Every expert hands its analysis to the final recommendation step.
    for node_name in expert_nodes:
        builder.add_edge(node_name, "final_recommendation")

    builder.add_edge("final_recommendation", END)
    builder.set_entry_point("classify_type")

    return builder.compile()

# Test der erweiterten Implementierung
def test_advanced_workflow():
    """Run the routing workflow over sample questions and print each result."""
    workflow = create_advanced_risk_workflow()

    sample_questions = (
        "Marktvolatilität bei Aktienportfolio",
        "Kreditausfall bei Immobilienfinanzierung",
        "Cyber-Attack auf Banking-System",
        "Compliance-Verstoß bei Meldepflichten",
        "Operationeller Systemausfall",
    )
    divider = "=" * 60 + "\n"

    print("=== Advanced Risk Workflow Test Results ===\n")
    for number, text in enumerate(sample_questions, start=1):
        payload = {"question": text}
        outcome = workflow.invoke(payload)
        print(f"Test {number}:")
        print(f"Input: {payload['question']}")
        print(f"Risk Type: {outcome['risk_type']}")
        print(f"Confidence: {outcome.get('confidence_score', 'N/A')}")
        print(f"\n{outcome['final_recommendation']}")
        print(divider)

# Run the routing-workflow demo only when this snippet is executed as a script.
if __name__ == "__main__":
    test_advanced_workflow()

Lösung 3: Compliance-Workflow mit mehrstufiger Genehmigung

Multi-Level Approval System

from typing import TypedDict, Optional, List
from langgraph.graph import StateGraph, END
from datetime import datetime, timedelta
import json

class ComplianceState(TypedDict):
    """State dictionary shared by the nodes of the compliance approval workflow."""
    document_id: str  # identifier quoted in the review and escalation texts
    document_type: str  # category label supplied by the caller (e.g. "client_onboarding" in the demo)
    content: str  # document text; scanned for risk-indicator phrases by analyze_document
    risk_assessment: Optional[str]  # JSON string written by analyze_document
    approval_level: Optional[str]  # "auto_approve", "management" or "senior_management"
    approvers: Optional[List[str]]  # reviewer names written by the review nodes
    approval_status: Optional[str]  # decision/status string written by the approval nodes
    audit_trail: Optional[List[dict]]  # one dict per executed step, in execution order
    escalation_required: Optional[bool]  # written by senior_management_review, read by check_escalation
    final_decision: Optional[str]  # human-readable summary written by the approval nodes

def analyze_document(state):
    """Score a document's content against risk indicators and choose an approval level.

    Each high-risk phrase found in the lower-cased content adds 3 points and
    each medium-risk phrase adds 1 (plain substring matching). A score of 6 or
    more is HIGH (senior_management), 3 to 5 is MEDIUM (management), and
    anything lower is LOW (auto_approve).

    Args:
        state: Mapping with a "content" string. No other key is read.

    Returns:
        dict with:
          - "risk_assessment": JSON string holding risk_level, risk_score,
            indicators and analysis_timestamp;
          - "approval_level": routing key for the next step;
          - "audit_trail": a new one-element list describing this step.
    """
    content = state["content"].lower()

    high_risk_indicators = [
        'high value', 'foreign entity', 'cash transaction',
        'unusual pattern', 'politically exposed'
    ]
    medium_risk_indicators = [
        'new client', 'complex structure', 'cross-border',
        'regulatory change'
    ]

    risk_score = 0
    detected_indicators = []
    # High-risk phrases are scanned first so they lead the indicator list.
    for label, points, indicators in (
        ("HIGH", 3, high_risk_indicators),
        ("MEDIUM", 1, medium_risk_indicators),
    ):
        for indicator in indicators:
            if indicator in content:
                risk_score += points
                detected_indicators.append(f"{label}: {indicator}")

    if risk_score >= 6:
        risk_level = "HIGH"
        approval_level = "senior_management"
    elif risk_score >= 3:
        risk_level = "MEDIUM"
        approval_level = "management"
    else:
        risk_level = "LOW"
        approval_level = "auto_approve"

    # Take the time once so the assessment and its audit entry carry the
    # same timestamp instead of two values a few microseconds apart.
    timestamp = datetime.now().isoformat()

    risk_assessment = {
        "risk_level": risk_level,
        "risk_score": risk_score,
        "indicators": detected_indicators,
        "analysis_timestamp": timestamp
    }

    audit_entry = {
        "step": "document_analysis",
        "timestamp": timestamp,
        "result": risk_assessment,
        "system": "auto_analyzer_v2.1"
    }

    return {
        "risk_assessment": json.dumps(risk_assessment),
        "approval_level": approval_level,
        "audit_trail": [audit_entry]
    }

def auto_approve(state):
    """Approve a low-risk document automatically and record the step.

    Args:
        state: Mapping that may carry an "audit_trail" list. A missing key or
            a None value is treated as an empty trail.

    Returns:
        dict with "approval_status" ("APPROVED"), "final_decision" and a new
        "audit_trail" list: the previous entries followed by this step.
    """
    audit_entry = {
        "step": "auto_approval",
        "timestamp": datetime.now().isoformat(),
        "decision": "APPROVED",
        "reason": "Low risk - automatic approval",
        "system": "auto_approver_v1.0"
    }

    # Build a new list rather than appending in place: the incoming state
    # must not be mutated, and "audit_trail" may legitimately be None.
    audit_trail = [*(state.get("audit_trail") or []), audit_entry]

    return {
        "approval_status": "APPROVED",
        "final_decision": "Document automatically approved - Low risk profile",
        "audit_trail": audit_trail
    }

def management_review(state):
    """Record a (simulated) management review for a medium-risk document.

    The decision is hard-coded to "APPROVED"; a real implementation would
    take it from an actual reviewer.

    Args:
        state: Mapping with "document_id" and "risk_assessment" (the JSON
            string produced by the analysis step, containing "risk_level" and
            "indicators"). An optional "audit_trail" list is extended; a
            missing key or None is treated as an empty trail.

    Returns:
        dict with "approval_status", "approvers", "final_decision" (the
        review notes) and a new "audit_trail" list.

    Raises:
        KeyError: if "document_id", "risk_assessment" or an expected
            assessment field is missing.
    """
    doc_id = state["document_id"]
    risk_data = json.loads(state["risk_assessment"])

    # Simulated management decision.
    decision = "APPROVED"  # In real implementation: actual manager input
    manager = "Jane Smith (Risk Manager)"

    review_notes = (
        f"Management Review completed for {doc_id}\n"
        f"Risk Level: {risk_data['risk_level']}\n"
        f"Key Indicators: {', '.join(risk_data['indicators'])}\n"
        f"Decision: {decision}\n"
        f"Reviewer: {manager}"
    )

    audit_entry = {
        "step": "management_review",
        "timestamp": datetime.now().isoformat(),
        "decision": decision,
        "reviewer": manager,
        "notes": review_notes,
        "escalation_required": False
    }

    # Build a new list rather than appending in place: the incoming state
    # must not be mutated, and "audit_trail" may legitimately be None.
    audit_trail = [*(state.get("audit_trail") or []), audit_entry]

    return {
        "approval_status": decision,
        "approvers": [manager],
        "final_decision": review_notes,
        "audit_trail": audit_trail
    }

def senior_management_review(state):
    """Record a (simulated) senior-management review for a high-risk document.

    A risk score of 9 or more is rejected and flagged for escalation; lower
    scores are approved with a fixed set of conditions.

    Args:
        state: Mapping with "document_id" and "risk_assessment" (the JSON
            string produced by the analysis step, containing "risk_score").
            An optional "audit_trail" list is extended; a missing key or None
            is treated as an empty trail.

    Returns:
        dict with "approval_status", "approvers", "escalation_required",
        "final_decision" (the review notes) and a new "audit_trail" list.

    Raises:
        KeyError: if "document_id", "risk_assessment" or the "risk_score"
            field is missing.
    """
    doc_id = state["document_id"]
    risk_data = json.loads(state["risk_assessment"])

    senior_approvers = [
        "John Doe (Chief Risk Officer)",
        "Alice Johnson (Chief Compliance Officer)"
    ]

    # Simulated senior-management decision.
    if risk_data["risk_score"] >= 9:
        decision = "REJECTED"
        escalation_required = True
        conditions = []
    else:
        decision = "APPROVED_WITH_CONDITIONS"
        escalation_required = False
        conditions = [
            "Enhanced monitoring required",
            "Quarterly review mandatory",
            "Additional documentation needed"
        ]

    # The score is reported without a "/10" suffix: analyze_document's scale
    # is not capped at 10 (five high-risk hits alone give 15), so a fixed
    # denominator produced misleading output such as "15/10".
    review_notes = (
        f"Senior Management Review for {doc_id}\n"
        f"Risk Score: {risk_data['risk_score']}\n"
        f"Decision: {decision}\n"
        f"Conditions: {', '.join(conditions) if conditions else 'None'}\n"
        f"Reviewers: {', '.join(senior_approvers)}"
    )

    audit_entry = {
        "step": "senior_management_review",
        "timestamp": datetime.now().isoformat(),
        "decision": decision,
        "reviewers": senior_approvers,
        "conditions": conditions,
        "escalation_required": escalation_required,
        "notes": review_notes
    }

    # Build a new list rather than appending in place: the incoming state
    # must not be mutated, and "audit_trail" may legitimately be None.
    audit_trail = [*(state.get("audit_trail") or []), audit_entry]

    return {
        "approval_status": decision,
        "approvers": senior_approvers,
        "escalation_required": escalation_required,
        "final_decision": review_notes,
        "audit_trail": audit_trail
    }

def legal_escalation(state):
    """Mark the document as under legal review and record the escalation.

    Args:
        state: Mapping with "document_id". An optional "audit_trail" list is
            extended; a missing key or None is treated as an empty trail.

    Returns:
        dict with "approval_status" ("UNDER_LEGAL_REVIEW"), "final_decision"
        (the escalation notice) and a new "audit_trail" list.

    Raises:
        KeyError: if "document_id" is missing.
    """
    doc_id = state["document_id"]

    legal_review = (
        f"LEGAL ESCALATION for {doc_id}\n"
        "This case requires legal department review due to:\n"
        "• High risk score\n"
        "• Regulatory complexity\n"
        "• Potential compliance violations\n\n"
        "Status: Under Legal Review\n"
        "Expected Resolution: 5-10 business days"
    )

    audit_entry = {
        "step": "legal_escalation",
        "timestamp": datetime.now().isoformat(),
        "status": "UNDER_LEGAL_REVIEW",
        "department": "Legal Compliance",
        "expected_resolution": "5-10 business days"
    }

    # Build a new list rather than appending in place: the incoming state
    # must not be mutated, and "audit_trail" may legitimately be None.
    audit_trail = [*(state.get("audit_trail") or []), audit_entry]

    return {
        "approval_status": "UNDER_LEGAL_REVIEW",
        "final_decision": legal_review,
        "audit_trail": audit_trail
    }

def route_approval(state):
    """Return the name of the approval node for the assessed approval level.

    Args:
        state: Mapping with an "approval_level" value.

    Returns:
        "auto_approve", "management_review" or "senior_management_review".
        Unrecognised levels fall back to "management_review".
    """
    level = state["approval_level"]

    node_for_level = {
        "auto_approve": "auto_approve",
        "management": "management_review",
        "senior_management": "senior_management_review",
    }

    try:
        return node_for_level[level]
    except KeyError:
        # Unknown level: send it to a human reviewer rather than auto-approving.
        return "management_review"

def check_escalation(state):
    """Decide whether the workflow continues to legal escalation or stops.

    Args:
        state: Mapping that may carry an "escalation_required" flag; a
            missing key counts as False.

    Returns:
        "legal_escalation" when the flag is truthy, otherwise END.
    """
    if not state.get("escalation_required", False):
        return END
    return "legal_escalation"

def create_compliance_workflow():
    """Build and compile the multi-level compliance approval workflow.

    Flow: analyze_document -> (auto_approve | management_review |
    senior_management_review). Only senior_management_review can continue to
    legal_escalation; every other path ends the run.

    Returns:
        The compiled graph object returned by StateGraph.compile().
    """
    builder = StateGraph(ComplianceState)

    # Register all processing steps.
    steps = (
        ("analyze_document", analyze_document),
        ("auto_approve", auto_approve),
        ("management_review", management_review),
        ("senior_management_review", senior_management_review),
        ("legal_escalation", legal_escalation),
    )
    for step_name, step_fn in steps:
        builder.add_node(step_name, step_fn)

    # The analysis result selects the approval tier.
    approval_targets = ("auto_approve", "management_review", "senior_management_review")
    builder.add_conditional_edges(
        "analyze_document",
        route_approval,
        {target: target for target in approval_targets},
    )

    # Automatic approval finishes the run.
    builder.add_edge("auto_approve", END)

    # Management review also finishes the run; no escalation edge is wired here.
    builder.add_edge("management_review", END)

    # Senior management either escalates to legal or finishes the run.
    builder.add_conditional_edges(
        "senior_management_review",
        check_escalation,
        {"legal_escalation": "legal_escalation", END: END},
    )

    builder.add_edge("legal_escalation", END)
    builder.set_entry_point("analyze_document")

    return builder.compile()

# Test des Compliance-Workflows
def test_compliance_workflow():
    """Run the compliance workflow over three sample documents and print the results."""
    workflow = create_compliance_workflow()

    # (document_id, document_type, content) for a low-, medium- and high-risk case.
    samples = (
        ("DOC-001", "client_onboarding",
         "Standard client onboarding for local business"),
        ("DOC-002", "transaction_review",
         "New client with complex structure cross-border transaction"),
        ("DOC-003", "high_value_transaction",
         "High value cash transaction foreign entity politically exposed person unusual pattern"),
    )
    documents = [
        {"document_id": doc_id, "document_type": doc_type, "content": text}
        for doc_id, doc_type, text in samples
    ]
    divider = "=" * 80 + "\n"

    print("=== Compliance Workflow Test Results ===\n")
    for number, document in enumerate(documents, start=1):
        outcome = workflow.invoke(document)
        print(f"Test {number} - {document['document_id']}:")
        print(f"Content: {document['content']}")
        print(f"Approval Status: {outcome['approval_status']}")
        print(f"Decision: {outcome['final_decision']}")

        # Print one line per audit entry: step, decision (or status), timestamp.
        print("\n📋 Audit Trail:")
        for entry in outcome.get('audit_trail', []):
            step = entry['step']
            verdict = entry.get('decision', entry.get('status', 'N/A'))
            print(f"  • {step}: {verdict} ({entry['timestamp'][:19]})")

        print(divider)

# Run the compliance-workflow demo only when this snippet is executed as a script.
if __name__ == "__main__":
    test_compliance_workflow()

Lösung 4: Workflow-Visualisierung

Mermaid-Diagramm Generator

def create_comprehensive_workflow_diagram():
    """Return the Mermaid source for a flowchart of the approval workflow.

    Returns:
        A multi-line string containing a Mermaid `graph TD` definition with
        node styling. Nothing is rendered or written to disk.
    """
    return """
    graph TD
        A[📄 Document Input] --> B[🔍 Analyze Document]
        B --> C{Risk Assessment}

        C -->|Low Risk| D[✅ Auto Approve]
        C -->|Medium Risk| E[👤 Management Review]
        C -->|High Risk| F[👥 Senior Management Review]

        D --> G[📋 Complete - Approved]
        E --> H{Management Decision}
        F --> I{Senior Decision}

        H -->|Approve| J[📋 Complete - Approved]
        H -->|Reject| K[📋 Complete - Rejected]

        I -->|Approve with Conditions| L[📋 Complete - Conditional]
        I -->|Reject| M[📋 Complete - Rejected]
        I -->|Escalate| N[⚖️ Legal Review]

        N --> O[📋 Complete - Under Review]

        style A fill:#e1f5fe
        style G fill:#c8e6c9
        style J fill:#c8e6c9
        style L fill:#fff3e0
        style K fill:#ffcdd2
        style M fill:#ffcdd2
        style O fill:#f3e5f5
    """

def save_workflow_visualization():
    """Print the Mermaid diagram source with rendering hints and return it.

    Despite the name, nothing is written to disk: the diagram is only printed
    to stdout between separator lines.

    Returns:
        The Mermaid source string produced by
        create_comprehensive_workflow_diagram().
    """
    mermaid_source = create_comprehensive_workflow_diagram()
    separator = "=" * 50

    # A real implementation would call a Mermaid rendering API here.
    print("Mermaid Diagram Code:")
    print(separator)
    print(mermaid_source)
    print(separator)
    print("\nTo render this diagram, copy the code above to:")
    for hint in (
        "• https://mermaid.live/",
        "• https://mermaid-js.github.io/mermaid-live-editor/",
    ):
        print(hint)

    return mermaid_source

# Print the diagram source only when this snippet is executed as a script.
if __name__ == "__main__":
    save_workflow_visualization()

🎯 Bewertungskriterien & Erfolgsmessung

Aufgabe 1 - Risk Classifier (25 Punkte)

  • Funktionalität (15 Punkte): Korrekte Implementierung von classify_risk() und handle_risk()
  • Robustheit (5 Punkte): Fehlerbehandlung und Edge Cases
  • Code-Qualität (5 Punkte): Sauberer, dokumentierter Code

Aufgabe 2 - Routing Workflow (30 Punkte)

  • Routing-Logik (15 Punkte): Korrekte conditional edges und routing functions
  • Spezialisierung (10 Punkte): Unterschiedliche Expertenfunktionen implementiert
  • State-Management (5 Punkte): Korrekter Umgang mit komplexem State

Aufgabe 3 - Compliance Workflow (30 Punkte)

  • Multi-Level Approval (15 Punkte): Verschiedene Genehmigungsebenen implementiert
  • Eskalationslogik (10 Punkte): Automatische Eskalation bei Bedarf
  • Audit Trail (5 Punkte): Vollständige Dokumentation aller Schritte

Aufgabe 4 - Visualisierung (15 Punkte)

  • Diagramm-Qualität (10 Punkte): Klare, verständliche Visualisierung
  • Technische Umsetzung (5 Punkte): Korrekte Mermaid-Syntax

💡 Weiterführende Verbesserungen

Performance-Optimierung

# Asynchrone Verarbeitung für bessere Performance
import asyncio
from langgraph.graph import StateGraph

async def async_risk_analysis(state):
    """Run the market, credit and operational analyses concurrently.

    NOTE(review): analyze_market_risk, analyze_credit_risk and
    analyze_operational_risk are not defined in the visible source; they are
    presumably coroutine functions taking the state. Confirm before use.

    Args:
        state: The state object passed unchanged to each analysis.

    Returns:
        dict with "comprehensive_analysis": the three results as a list in
        the order market, credit, operational.
    """
    market, credit, operational = await asyncio.gather(
        analyze_market_risk(state),
        analyze_credit_risk(state),
        analyze_operational_risk(state),
    )
    return {"comprehensive_analysis": [market, credit, operational]}

Integration mit externen APIs

# Integration mit externen Risikodatenbanken
def integrate_external_risk_data(state):
    """Placeholder for enriching the state from external risk databases.

    Intended for API calls to sources such as compliance databases or
    sanctions lists. Not implemented yet: the state is ignored and None is
    returned.
    """
    return None

Machine Learning Integration

# ML-basierte Risikoklassifizierung
def ml_risk_classifier(state):
    """Placeholder for a machine-learning based risk classifier.

    Intended as the integration point for ML models (for example
    scikit-learn or TensorFlow). Not implemented yet: the state is ignored
    and None is returned.
    """
    return None

📚 Zusätzliche Ressourcen


💡 Nächste Schritte: Experimentieren Sie mit eigenen Erweiterungen und integrieren Sie diese Patterns in Ihre eigenen Projekte!