Lösungen - LangGraph & Advanced Workflows
🎯 Übersicht
Diese Musterlösungen zeigen vollständige Implementierungen für alle LangGraph-Übungen. Nutzen Sie diese zur Überprüfung Ihrer eigenen Lösungen oder als Referenz für erweiterte Features.
Lösung 1: Einfacher Risk-Classifier
Vollständige Implementierung
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
import re
# Zustandstyp definieren
class RiskState(TypedDict):
    """State shared by the nodes of the simple risk-classifier graph."""

    # Free-text question to be classified (supplied by the caller).
    question: str
    # Risk level written by classify_risk: "high", "medium" or "low".
    risk_level: Optional[str]
    # Human-readable answer written by handle_risk.
    response: Optional[str]
def classify_risk(state):
    """Classify a question as high, medium or low risk by keyword lookup.

    Matching is a case-insensitive substring test, so compounds such as
    "Kreditrisiko" hit the keyword "kredit". High-risk keywords take
    precedence over medium-risk ones.

    Args:
        state: Mapping holding the free-text ``"question"``. A missing or
            ``None`` question is treated as empty text.

    Returns:
        Partial state update ``{"risk_level": "high" | "medium" | "low"}``.
    """
    # Tolerate an absent/None question instead of raising KeyError or
    # AttributeError; empty text simply matches no keyword.
    question = (state.get("question") or "").lower()

    high_risk_keywords = (
        'verdächtig', 'betrug', 'fraud', 'geldwäsche', 'kredit',
        'default', 'ausfall', 'insolvenz', 'cyber', 'security',
    )
    medium_risk_keywords = (
        'markt', 'volatilität', 'compliance', 'regulation',
        'audit', 'prüfung', 'bewertung',
    )

    def mentions(keywords):
        """Return True if any keyword occurs in the question."""
        return any(keyword in question for keyword in keywords)

    if mentions(high_risk_keywords):
        risk_level = "high"
    elif mentions(medium_risk_keywords):
        risk_level = "medium"
    else:
        risk_level = "low"
    return {"risk_level": risk_level}
def handle_risk(state):
    """Return the canned response text for the classified risk level.

    Args:
        state: Mapping that may hold ``"risk_level"``.

    Returns:
        Partial state update ``{"response": str}``. A missing or unknown
        risk level yields the fallback text instead of raising.
    """
    responses = {
        "high": "🚨 HOCH-RISIKO: Sofortige detaillierte Analyse und Eskalation erforderlich. Compliance-Team informieren.",
        "medium": "⚠️ MITTEL-RISIKO: Erweiterte Prüfung erforderlich. Risikomanagement einbeziehen.",
        "low": "✅ NIEDRIG-RISIKO: Standard-Überwachung ausreichend. Routine-Verarbeitung möglich.",
    }
    # state.get() sends a state without a risk level down the same fallback
    # path as an unrecognised level, rather than failing with KeyError.
    risk_level = state.get("risk_level")
    return {"response": responses.get(risk_level, "Unbekanntes Risikolevel")}
# Graph aufbauen
def create_risk_classifier():
    """Build and compile the two-step classifier graph.

    Flow: ``classify`` -> ``handle`` -> ``END``.

    Returns:
        The object returned by ``StateGraph.compile()``.
    """
    workflow = StateGraph(RiskState)

    # Register all nodes first so the edges below refer to known names.
    node_table = (
        ("classify", classify_risk),
        ("handle", handle_risk),
    )
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)

    workflow.add_edge("classify", "handle")
    workflow.add_edge("handle", END)
    workflow.set_entry_point("classify")
    return workflow.compile()
# Test der Implementierung
def test_risk_classifier():
    """Run five sample questions through the classifier and print the results."""
    classifier = create_risk_classifier()
    sample_questions = (
        "Kreditrisiko bei Immobilienfinanzierung",
        "Routine Kontobewegung prüfen",
        "Verdächtiger Geldtransfer erkannt",
        "Marktvolatilität Analyse",
        "Cyber Security Vorfall",
    )

    print("=== Risk Classifier Test Results ===\n")
    for number, text in enumerate(sample_questions, start=1):
        payload = {"question": text}
        outcome = classifier.invoke(payload)
        print(f"Test {number}:")
        print(f"Input: {payload['question']}")
        print(f"Risk Level: {outcome['risk_level']}")
        print(f"Response: {outcome['response']}\n")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_risk_classifier()
Erweiterungen
def classify_risk_advanced(state):
    """Classify risk from a weighted keyword score.

    Every keyword found in the lower-cased question (substring match)
    contributes its weight once. The summed score maps to a level:
    ``>= 8`` is high, ``>= 4`` is medium, anything lower is low.

    Args:
        state: Mapping holding the free-text ``"question"``. A missing or
            ``None`` question scores 0.

    Returns:
        ``{"risk_level": str, "risk_score": int}``.
    """
    # NOTE(review): "risk_score" is not a declared key of RiskState; confirm
    # the graph's state schema is extended before relying on it downstream.

    # Tolerate an absent/None question instead of raising.
    question = (state.get("question") or "").lower()

    risk_scores = {
        'fraud': 10, 'betrug': 10, 'verdächtig': 8,
        'geldwäsche': 10, 'kredit': 7, 'ausfall': 9,
        'cyber': 8, 'security': 6, 'compliance': 5,
        'markt': 4, 'volatilität': 6, 'regulation': 4,
    }
    total_score = sum(
        score for keyword, score in risk_scores.items() if keyword in question
    )

    if total_score >= 8:
        risk_level = "high"
    elif total_score >= 4:
        risk_level = "medium"
    else:
        risk_level = "low"
    return {"risk_level": risk_level, "risk_score": total_score}
Lösung 2: Routing-Workflow für Finanzrisiken
Erweiterte StateGraph-Implementierung
from typing import TypedDict, Optional, Literal
from langgraph.graph import StateGraph, END
class AdvancedRiskState(TypedDict):
    """State shared by the nodes of the expert-routing risk workflow."""

    # Free-text question to analyse (supplied by the caller).
    question: str
    # Category written by classify_risk_type, e.g. "market" or "general".
    risk_type: Optional[str]
    # Declared in the schema; no node visible in this file writes it.
    risk_level: Optional[str]
    # Analysis text written by whichever expert node was routed to.
    expert_analysis: Optional[str]
    # Final text written by generate_final_recommendation.
    final_recommendation: Optional[str]
    # Keyword-hit ratio written by classify_risk_type.
    confidence_score: Optional[float]
def classify_risk_type(state):
    """Determine the risk category used to route to a specialised expert.

    Each category scores ``hits / number_of_keywords`` using substring
    matches against the lower-cased question. The best-scoring category
    wins; on a tie the category listed first in the mapping is kept.

    Args:
        state: Mapping holding the free-text ``"question"``. A missing or
            ``None`` question is treated as empty text.

    Returns:
        ``{"risk_type": str, "confidence_score": float}``. When no keyword
        matches, the result is ``"general"`` with confidence ``0.1``.
    """
    question = (state.get("question") or "").lower()

    risk_type_mapping = {
        'market': ['markt', 'kurs', 'preis', 'volatilität', 'handel'],
        'credit': ['kredit', 'darlehen', 'ausfall', 'bonität', 'rating'],
        'operational': ['betrieb', 'system', 'prozess', 'fehler', 'ausfall'],
        # 'compliance' and 'meldepflicht' were missing, so a question such as
        # "Compliance-Verstoß bei Meldepflichten" matched nothing here and
        # was routed to the general handler.
        'compliance': ['compliance', 'regulation', 'gesetz', 'vorschrift',
                       'audit', 'prüfung', 'meldepflicht'],
        # 'attack'/'angriff' let "Cyber-Attack auf Banking-System" outscore
        # the single 'system' hit of the operational category; before, the
        # two tied and mapping order picked "operational".
        'cyber': ['cyber', 'security', 'hack', 'malware', 'phishing',
                  'attack', 'angriff'],
    }

    confidence_scores = {}
    for risk_type, keywords in risk_type_mapping.items():
        hits = sum(keyword in question for keyword in keywords)
        if hits:
            confidence_scores[risk_type] = hits / len(keywords)

    if not confidence_scores:
        return {"risk_type": "general", "confidence_score": 0.1}

    best_type = max(confidence_scores, key=confidence_scores.get)
    return {
        "risk_type": best_type,
        "confidence_score": confidence_scores[best_type],
    }
def financial_expert(state):
    """Produce the market- or credit-risk analysis text for a question.

    Args:
        state: Mapping with ``"question"`` and ``"risk_type"``.

    Returns:
        ``{"expert_analysis": str}``. Any risk type other than ``"market"``
        or ``"credit"`` yields a one-line generic finance analysis.
    """
    question = state["question"]
    risk_type = state["risk_type"]

    if risk_type == 'market':
        report_lines = [
            f"📊 MARKTRISIKO-ANALYSE für: {question}",
            "• Marktvolatilität prüfen",
            "• Hedging-Strategien evaluieren",
            "• Portfolio-Impact bewerten",
            "• Risk-Adjusted Returns berechnen",
        ]
    elif risk_type == 'credit':
        report_lines = [
            f"💳 KREDITRISIKO-ANALYSE für: {question}",
            "• Bonitätsprüfung durchführen",
            "• Ausfallwahrscheinlichkeit berechnen",
            "• Sicherheiten bewerten",
            "• Credit Scoring aktualisieren",
        ]
    else:
        report_lines = [f"Allgemeine Finanzanalyse für {question}"]

    return {"expert_analysis": "\n".join(report_lines)}
def compliance_expert(state):
    """Produce the compliance analysis text for a question.

    Args:
        state: Mapping with ``"question"``.

    Returns:
        ``{"expert_analysis": str}``: a header line plus four check items.
    """
    report_lines = (
        f"⚖️ COMPLIANCE-ANALYSE für: {state['question']}",
        "• Regulatorische Anforderungen prüfen",
        "• Dokumentationspflichten sicherstellen",
        "• Meldepflichten evaluieren",
        "• Audit-Trail erstellen",
    )
    return {"expert_analysis": "\n".join(report_lines)}
def operational_expert(state):
    """Produce the operational-risk analysis text for a question.

    Args:
        state: Mapping with ``"question"``.

    Returns:
        ``{"expert_analysis": str}``: a header line plus four check items.
    """
    report_lines = (
        f"⚙️ OPERATIONELLES RISIKO für: {state['question']}",
        "• Prozessanalyse durchführen",
        "• Systemverfügbarkeit prüfen",
        "• Backup-Strategien evaluieren",
        "• Kontinuitätsplanung aktualisieren",
    )
    return {"expert_analysis": "\n".join(report_lines)}
def cyber_expert(state):
    """Produce the cyber-security analysis text for a question.

    Args:
        state: Mapping with ``"question"``.

    Returns:
        ``{"expert_analysis": str}``: a header line plus four check items.
    """
    report_lines = (
        f"🔒 CYBER-SECURITY ANALYSE für: {state['question']}",
        "• Bedrohungsanalyse durchführen",
        "• Sicherheitslücken identifizieren",
        "• Incident Response aktivieren",
        "• Forensik-Untersuchung einleiten",
    )
    return {"expert_analysis": "\n".join(report_lines)}
def general_handler(state):
    """Produce the generic risk analysis text used when no expert applies.

    Args:
        state: Mapping with ``"question"``.

    Returns:
        ``{"expert_analysis": str}``: a header line plus four check items.
    """
    report_lines = (
        f"📋 ALLGEMEINE RISIKOANALYSE für: {state['question']}",
        "• Risiko kategorisieren",
        "• Standard-Bewertung durchführen",
        "• Dokumentation erstellen",
        "• Follow-up planen",
    )
    return {"expert_analysis": "\n".join(report_lines)}
def generate_final_recommendation(state):
    """Combine the expert analysis into the final recommendation text.

    Args:
        state: Mapping with ``"expert_analysis"`` and, optionally,
            ``"risk_type"`` and ``"confidence_score"``.

    Returns:
        ``{"final_recommendation": str}``.
    """
    expert_analysis = state["expert_analysis"]

    # Both fields are Optional in the state schema, so they may be present
    # but None. Fall back explicitly: None.upper() and format(None, ".2f")
    # would otherwise raise.
    risk_type = state.get("risk_type") or "general"
    confidence = state.get("confidence_score")
    if confidence is None:
        confidence = 0.5

    recommendation = (
        "📋 FINALE EMPFEHLUNG\n\n"
        f"Risikotyp: {risk_type.upper()} (Confidence: {confidence:.2f})\n\n"
        f"{expert_analysis}\n\n"
        "Nächste Schritte:\n"
        "• Sofortige Umsetzung der empfohlenen Maßnahmen\n"
        "• Regelmäßige Überwachung etablieren\n"
        "• Reporting an Management"
    )
    return {"final_recommendation": recommendation}
def route_to_expert(state):
    """Pick the expert node name for the classified risk type.

    Args:
        state: Mapping that may hold ``"risk_type"``.

    Returns:
        Name of the node to run next. A missing or unknown risk type routes
        to ``"general_handler"``.
    """
    routing_map = {
        "market": "financial_expert",
        "credit": "financial_expert",
        "compliance": "compliance_expert",
        "operational": "operational_expert",
        "cyber": "cyber_expert",
        "general": "general_handler",
    }
    # state.get() lets a state without a risk type use the same fallback as
    # an unrecognised one, rather than failing with KeyError.
    return routing_map.get(state.get("risk_type"), "general_handler")
def create_advanced_risk_workflow():
    """Build and compile the expert-routing risk workflow.

    Flow: ``classify_type`` -> one expert node chosen by ``route_to_expert``
    -> ``final_recommendation`` -> ``END``.

    Returns:
        The object returned by ``StateGraph.compile()``.
    """
    workflow = StateGraph(AdvancedRiskState)

    # Expert nodes, in the order they are registered and wired.
    expert_nodes = {
        "financial_expert": financial_expert,
        "compliance_expert": compliance_expert,
        "operational_expert": operational_expert,
        "cyber_expert": cyber_expert,
        "general_handler": general_handler,
    }

    workflow.add_node("classify_type", classify_risk_type)
    for expert_name, expert_fn in expert_nodes.items():
        workflow.add_node(expert_name, expert_fn)
    workflow.add_node("final_recommendation", generate_final_recommendation)

    # The router returns node names, so the path map is an identity map.
    workflow.add_conditional_edges(
        "classify_type",
        route_to_expert,
        {expert_name: expert_name for expert_name in expert_nodes},
    )

    # Every expert feeds the final recommendation.
    for expert_name in expert_nodes:
        workflow.add_edge(expert_name, "final_recommendation")
    workflow.add_edge("final_recommendation", END)

    workflow.set_entry_point("classify_type")
    return workflow.compile()
# Test der erweiterten Implementierung
def test_advanced_workflow():
    """Run five sample questions through the routing workflow and print results."""
    workflow = create_advanced_risk_workflow()
    sample_questions = (
        "Marktvolatilität bei Aktienportfolio",
        "Kreditausfall bei Immobilienfinanzierung",
        "Cyber-Attack auf Banking-System",
        "Compliance-Verstoß bei Meldepflichten",
        "Operationeller Systemausfall",
    )
    divider = "=" * 60 + "\n"

    print("=== Advanced Risk Workflow Test Results ===\n")
    for number, text in enumerate(sample_questions, start=1):
        payload = {"question": text}
        outcome = workflow.invoke(payload)
        print(f"Test {number}:")
        print(f"Input: {payload['question']}")
        print(f"Risk Type: {outcome['risk_type']}")
        print(f"Confidence: {outcome.get('confidence_score', 'N/A')}")
        print(f"\n{outcome['final_recommendation']}")
        print(divider)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_advanced_workflow()
Lösung 3: Compliance-Workflow mit mehrstufiger Genehmigung
Multi-Level Approval System
from typing import TypedDict, Optional, List
from langgraph.graph import StateGraph, END
from datetime import datetime, timedelta
import json
class ComplianceState(TypedDict):
    """State shared by the nodes of the multi-level approval workflow."""

    # Caller-supplied document identifier, echoed in the review notes.
    document_id: str
    # Caller-supplied document category.
    document_type: str
    # Document text scanned for risk indicators by analyze_document.
    content: str
    # JSON string (not a dict) written by analyze_document.
    risk_assessment: Optional[str]
    # "auto_approve", "management" or "senior_management".
    approval_level: Optional[str]
    # Reviewer names written by the review nodes.
    approvers: Optional[List[str]]
    # Decision/status string written by the approval nodes.
    approval_status: Optional[str]
    # One dict per workflow step, in execution order.
    audit_trail: Optional[List[dict]]
    # Set by senior_management_review; read by check_escalation.
    escalation_required: Optional[bool]
    # Human-readable summary of the last decision taken.
    final_decision: Optional[str]
def analyze_document(state):
    """Score a document's content against fixed risk-indicator phrases.

    Each high-risk phrase found in the lower-cased content adds 3 points,
    each medium-risk phrase adds 1. A score ``>= 6`` is HIGH
    (senior_management), ``>= 3`` is MEDIUM (management), and anything
    lower is LOW (auto_approve).

    Args:
        state: Mapping with ``"content"``. ``"document_type"`` is not used
            by the analysis and is therefore no longer read here.

    Returns:
        Partial state update with ``"risk_assessment"`` (a JSON string),
        ``"approval_level"`` and a fresh one-entry ``"audit_trail"``.
    """
    content = state["content"].lower()

    # (label, points per hit, phrases), evaluated in this order so the
    # indicator list keeps HIGH entries ahead of MEDIUM ones.
    weighted_indicators = (
        ("HIGH", 3, (
            'high value', 'foreign entity', 'cash transaction',
            'unusual pattern', 'politically exposed',
        )),
        ("MEDIUM", 1, (
            'new client', 'complex structure', 'cross-border',
            'regulatory change',
        )),
    )

    risk_score = 0
    detected_indicators = []
    for label, points, phrases in weighted_indicators:
        for phrase in phrases:
            if phrase in content:
                risk_score += points
                detected_indicators.append(f"{label}: {phrase}")

    if risk_score >= 6:
        risk_level, approval_level = "HIGH", "senior_management"
    elif risk_score >= 3:
        risk_level, approval_level = "MEDIUM", "management"
    else:
        risk_level, approval_level = "LOW", "auto_approve"

    # One timestamp for the whole step, so the assessment and its audit
    # entry cannot disagree.
    timestamp = datetime.now().isoformat()
    risk_assessment = {
        "risk_level": risk_level,
        "risk_score": risk_score,
        "indicators": detected_indicators,
        "analysis_timestamp": timestamp,
    }
    audit_entry = {
        "step": "document_analysis",
        "timestamp": timestamp,
        "result": risk_assessment,
        "system": "auto_analyzer_v2.1",
    }
    return {
        "risk_assessment": json.dumps(risk_assessment),
        "approval_level": approval_level,
        "audit_trail": [audit_entry],
    }
def auto_approve(state):
    """Approve a low-risk document without human review.

    Args:
        state: Mapping that may hold the ``"audit_trail"`` so far.

    Returns:
        Partial state update with ``"approval_status"``,
        ``"final_decision"`` and the extended ``"audit_trail"``.
    """
    audit_entry = {
        "step": "auto_approval",
        "timestamp": datetime.now().isoformat(),
        "decision": "APPROVED",
        "reason": "Low risk - automatic approval",
        "system": "auto_approver_v1.0",
    }
    # Build a new list instead of appending to the one held by the incoming
    # state, and accept a trail that is missing or None.
    previous_entries = state.get("audit_trail") or []
    return {
        "approval_status": "APPROVED",
        "final_decision": "Document automatically approved - Low risk profile",
        "audit_trail": [*previous_entries, audit_entry],
    }
def management_review(state):
    """Record a management-level review for a medium-risk document.

    The decision is simulated: it is always "APPROVED" by a fixed reviewer.

    Args:
        state: Mapping with ``"document_id"``, ``"risk_assessment"`` (the
            JSON string produced by the analysis step) and, optionally,
            ``"audit_trail"``.

    Returns:
        Partial state update with ``"approval_status"``, ``"approvers"``,
        ``"final_decision"`` and the extended ``"audit_trail"``.
    """
    doc_id = state["document_id"]
    risk_data = json.loads(state["risk_assessment"])

    # Simulated decision; a real implementation would take manager input.
    decision = "APPROVED"
    manager = "Jane Smith (Risk Manager)"

    review_notes = (
        f"Management Review completed for {doc_id}\n"
        f"Risk Level: {risk_data['risk_level']}\n"
        f"Key Indicators: {', '.join(risk_data['indicators'])}\n"
        f"Decision: {decision}\n"
        f"Reviewer: {manager}"
    )
    audit_entry = {
        "step": "management_review",
        "timestamp": datetime.now().isoformat(),
        "decision": decision,
        "reviewer": manager,
        "notes": review_notes,
        "escalation_required": False,
    }
    # Build a new list instead of appending to the one held by the incoming
    # state, and accept a trail that is missing or None.
    previous_entries = state.get("audit_trail") or []
    return {
        "approval_status": decision,
        "approvers": [manager],
        "final_decision": review_notes,
        "audit_trail": [*previous_entries, audit_entry],
    }
def senior_management_review(state):
    """Record a senior-management review for a high-risk document.

    The decision is simulated from the risk score: ``>= 9`` is rejected and
    flagged for escalation, anything lower is approved with conditions.

    Args:
        state: Mapping with ``"document_id"``, ``"risk_assessment"`` (the
            JSON string produced by the analysis step) and, optionally,
            ``"audit_trail"``.

    Returns:
        Partial state update with ``"approval_status"``, ``"approvers"``,
        ``"escalation_required"``, ``"final_decision"`` and the extended
        ``"audit_trail"``.
    """
    doc_id = state["document_id"]
    risk_data = json.loads(state["risk_assessment"])

    senior_approvers = [
        "John Doe (Chief Risk Officer)",
        "Alice Johnson (Chief Compliance Officer)",
    ]

    escalation_required = risk_data["risk_score"] >= 9
    if escalation_required:
        decision = "REJECTED"
        conditions = []
    else:
        decision = "APPROVED_WITH_CONDITIONS"
        conditions = [
            "Enhanced monitoring required",
            "Quarterly review mandatory",
            "Additional documentation needed",
        ]

    # The score is an uncapped sum of indicator weights, so it is shown
    # without the former "/10" suffix (which produced output like "15/10").
    review_notes = (
        f"Senior Management Review for {doc_id}\n"
        f"Risk Score: {risk_data['risk_score']}\n"
        f"Decision: {decision}\n"
        f"Conditions: {', '.join(conditions) if conditions else 'None'}\n"
        f"Reviewers: {', '.join(senior_approvers)}"
    )
    audit_entry = {
        "step": "senior_management_review",
        "timestamp": datetime.now().isoformat(),
        "decision": decision,
        "reviewers": senior_approvers,
        "conditions": conditions,
        "escalation_required": escalation_required,
        "notes": review_notes,
    }
    # Build a new list instead of appending to the one held by the incoming
    # state, and accept a trail that is missing or None.
    previous_entries = state.get("audit_trail") or []
    return {
        "approval_status": decision,
        "approvers": senior_approvers,
        "escalation_required": escalation_required,
        "final_decision": review_notes,
        "audit_trail": [*previous_entries, audit_entry],
    }
def legal_escalation(state):
    """Hand a critical case over to the legal department.

    Args:
        state: Mapping with ``"document_id"`` and, optionally,
            ``"audit_trail"``.

    Returns:
        Partial state update with ``"approval_status"`` set to
        ``"UNDER_LEGAL_REVIEW"``, the notice as ``"final_decision"`` and
        the extended ``"audit_trail"``.
    """
    doc_id = state["document_id"]
    legal_review = (
        f"LEGAL ESCALATION for {doc_id}\n"
        "This case requires legal department review due to:\n"
        "• High risk score\n"
        "• Regulatory complexity\n"
        "• Potential compliance violations\n\n"
        "Status: Under Legal Review\n"
        "Expected Resolution: 5-10 business days"
    )
    audit_entry = {
        "step": "legal_escalation",
        "timestamp": datetime.now().isoformat(),
        "status": "UNDER_LEGAL_REVIEW",
        "department": "Legal Compliance",
        "expected_resolution": "5-10 business days",
    }
    # Build a new list instead of appending to the one held by the incoming
    # state, and accept a trail that is missing or None.
    previous_entries = state.get("audit_trail") or []
    return {
        "approval_status": "UNDER_LEGAL_REVIEW",
        "final_decision": legal_review,
        "audit_trail": [*previous_entries, audit_entry],
    }
def route_approval(state):
    """Pick the approval node for the assessed approval level.

    Args:
        state: Mapping that may hold ``"approval_level"``.

    Returns:
        Name of the node to run next. A missing or unknown level routes to
        ``"management_review"``.
    """
    routing_map = {
        "auto_approve": "auto_approve",
        "management": "management_review",
        "senior_management": "senior_management_review",
    }
    # state.get() lets a state without an approval level use the same
    # fallback as an unrecognised one, rather than failing with KeyError.
    return routing_map.get(state.get("approval_level"), "management_review")
def check_escalation(state):
    """Decide whether the workflow continues to legal escalation.

    Args:
        state: Mapping that may hold ``"escalation_required"``.

    Returns:
        ``"legal_escalation"`` when the flag is truthy, otherwise ``END``.
    """
    needs_legal_review = state.get("escalation_required", False)
    return "legal_escalation" if needs_legal_review else END
def create_compliance_workflow():
    """Build and compile the multi-level approval workflow.

    Flow: ``analyze_document`` -> one approval node chosen by
    ``route_approval``. Only ``senior_management_review`` can continue to
    ``legal_escalation``; every other path ends the run.

    Returns:
        The object returned by ``StateGraph.compile()``.
    """
    workflow = StateGraph(ComplianceState)

    node_table = (
        ("analyze_document", analyze_document),
        ("auto_approve", auto_approve),
        ("management_review", management_review),
        ("senior_management_review", senior_management_review),
        ("legal_escalation", legal_escalation),
    )
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)

    # The router returns node names, so the path map is an identity map.
    approval_targets = (
        "auto_approve",
        "management_review",
        "senior_management_review",
    )
    workflow.add_conditional_edges(
        "analyze_document",
        route_approval,
        {target: target for target in approval_targets},
    )

    # Automatic approval and management review both end the run directly.
    workflow.add_edge("auto_approve", END)
    workflow.add_edge("management_review", END)

    # Senior management either escalates to legal or ends the run.
    workflow.add_conditional_edges(
        "senior_management_review",
        check_escalation,
        {"legal_escalation": "legal_escalation", END: END},
    )
    workflow.add_edge("legal_escalation", END)

    workflow.set_entry_point("analyze_document")
    return workflow.compile()
# Test des Compliance-Workflows
def test_compliance_workflow():
    """Run three sample documents through the workflow and print the results."""
    workflow = create_compliance_workflow()
    sample_documents = [
        {
            "document_id": "DOC-001",
            "document_type": "client_onboarding",
            "content": "Standard client onboarding for local business",
        },
        {
            "document_id": "DOC-002",
            "document_type": "transaction_review",
            "content": "New client with complex structure cross-border transaction",
        },
        {
            "document_id": "DOC-003",
            "document_type": "high_value_transaction",
            "content": "High value cash transaction foreign entity politically exposed person unusual pattern",
        },
    ]
    divider = "=" * 80 + "\n"

    print("=== Compliance Workflow Test Results ===\n")
    for number, document in enumerate(sample_documents, start=1):
        outcome = workflow.invoke(document)
        print(f"Test {number} - {document['document_id']}:")
        print(f"Content: {document['content']}")
        print(f"Approval Status: {outcome['approval_status']}")
        print(f"Decision: {outcome['final_decision']}")

        # Print one line per audit entry; entries carry either a
        # "decision" or a "status".
        print("\n📋 Audit Trail:")
        for entry in outcome.get('audit_trail', []):
            step = entry['step']
            verdict = entry.get('decision', entry.get('status', 'N/A'))
            stamp = entry['timestamp'][:19]  # drop the sub-second part
            print(f" • {step}: {verdict} ({stamp})")
        print(divider)
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test_compliance_workflow()
Lösung 4: Workflow-Visualisierung
Mermaid-Diagramm Generator
def create_comprehensive_workflow_diagram():
    """Return the Mermaid source of the approval-workflow flowchart.

    Returns:
        The diagram text, beginning and ending with a newline.
    """
    diagram_lines = (
        "",  # the text starts with a newline
        "graph TD",
        "A[📄 Document Input] --> B[🔍 Analyze Document]",
        "B --> C{Risk Assessment}",
        "C -->|Low Risk| D[✅ Auto Approve]",
        "C -->|Medium Risk| E[👤 Management Review]",
        "C -->|High Risk| F[👥 Senior Management Review]",
        "D --> G[📋 Complete - Approved]",
        "E --> H{Management Decision}",
        "F --> I{Senior Decision}",
        "H -->|Approve| J[📋 Complete - Approved]",
        "H -->|Reject| K[📋 Complete - Rejected]",
        "I -->|Approve with Conditions| L[📋 Complete - Conditional]",
        "I -->|Reject| M[📋 Complete - Rejected]",
        "I -->|Escalate| N[⚖️ Legal Review]",
        "N --> O[📋 Complete - Under Review]",
        "style A fill:#e1f5fe",
        "style G fill:#c8e6c9",
        "style J fill:#c8e6c9",
        "style L fill:#fff3e0",
        "style K fill:#ffcdd2",
        "style M fill:#ffcdd2",
        "style O fill:#f3e5f5",
        "",  # ... and ends with one
    )
    return "\n".join(diagram_lines)
def save_workflow_visualization():
    """Print the Mermaid diagram source with rendering hints and return it.

    Despite the name, nothing is written to disk here: the diagram is only
    printed to standard output.

    Returns:
        The Mermaid diagram source text.
    """
    diagram = create_comprehensive_workflow_diagram()
    separator = "=" * 50

    # A real implementation would call a Mermaid rendering API here.
    report_lines = [
        "Mermaid Diagram Code:",
        separator,
        diagram,
        separator,
        "\nTo render this diagram, copy the code above to:",
        "• https://mermaid.live/",
        "• https://mermaid-js.github.io/mermaid-live-editor/",
    ]
    print("\n".join(report_lines))
    return diagram
# Print the diagram only when this file is executed as a script.
if __name__ == "__main__":
    save_workflow_visualization()
🎯 Bewertungskriterien & Erfolgsmessung
Aufgabe 1 - Risk Classifier (25 Punkte)
- Funktionalität (15 Punkte): Korrekte Implementierung von classify_risk() und handle_risk()
- Robustheit (5 Punkte): Fehlerbehandlung und Edge Cases
- Code-Qualität (5 Punkte): Sauberer, dokumentierter Code
Aufgabe 2 - Routing Workflow (30 Punkte)
- Routing-Logik (15 Punkte): Korrekte conditional edges und routing functions
- Spezialisierung (10 Punkte): Unterschiedliche Expertenfunktionen implementiert
- State-Management (5 Punkte): Korrekter Umgang mit komplexem State
Aufgabe 3 - Compliance Workflow (30 Punkte)
- Multi-Level Approval (15 Punkte): Verschiedene Genehmigungsebenen implementiert
- Eskalationslogik (10 Punkte): Automatische Eskalation bei Bedarf
- Audit Trail (5 Punkte): Vollständige Dokumentation aller Schritte
Aufgabe 4 - Visualisierung (15 Punkte)
- Diagramm-Qualität (10 Punkte): Klare, verständliche Visualisierung
- Technische Umsetzung (5 Punkte): Korrekte Mermaid-Syntax
💡 Weiterführende Verbesserungen
Performance-Optimierung
# Asynchrone Verarbeitung für bessere Performance
import asyncio
from langgraph.graph import StateGraph
async def async_risk_analysis(state):
    """Run the market, credit and operational analyses concurrently.

    NOTE(review): ``analyze_market_risk``, ``analyze_credit_risk`` and
    ``analyze_operational_risk`` are not defined in the visible part of this
    file; they are presumably coroutine functions taking the state - confirm.

    Args:
        state: Workflow state passed unchanged to each analysis.

    Returns:
        ``{"comprehensive_analysis": list}`` with the three results in
        market, credit, operational order.
    """
    analyses = await asyncio.gather(
        analyze_market_risk(state),
        analyze_credit_risk(state),
        analyze_operational_risk(state),
    )
    return {"comprehensive_analysis": analyses}
Integration mit externen APIs
# Integration mit externen Risikodatenbanken
def integrate_external_risk_data(state):
    """Placeholder for enriching the state from external risk databases.

    Intended for API calls to sources such as compliance databases or
    sanctions lists. Not implemented yet: it does nothing and returns None.
    """
    return None
Machine Learning Integration
# ML-basierte Risikoklassifizierung
def ml_risk_classifier(state):
    """Placeholder for an ML-based risk classifier.

    Intended as the integration point for a trained model (for example
    scikit-learn or TensorFlow). Not implemented yet: it does nothing and
    returns None.
    """
    return None
📚 Zusätzliche Ressourcen
- Vollständige Code-Beispiele auf GitHub
- LangGraph Production Best Practices
- StateGraph Advanced Patterns
💡 Nächste Schritte: Experimentieren Sie mit eigenen Erweiterungen und integrieren Sie diese Patterns in Ihre eigenen Projekte!