agent-governance

安装量: 7.2K
排名: #397

安装

npx skills add https://github.com/github/awesome-copilot --skill agent-governance
Agent Governance Patterns
Patterns for adding safety, trust, and policy enforcement to AI agent systems.
Overview
Governance patterns ensure AI agents operate within defined boundaries — controlling which tools they can call, what content they can process, how much they can do, and maintaining accountability through audit trails.
User Request → Intent Classification → Policy Check → Tool Execution → Audit Log
↓ ↓ ↓
Threat Detection Allow/Deny Trust Update
When to Use
Agents with tool access
Any agent that calls external tools (APIs, databases, shell commands)
Multi-agent systems
Agents delegating to other agents need trust boundaries
Production deployments
Compliance, audit, and safety requirements
Sensitive operations
Financial transactions, data access, infrastructure management Pattern 1: Governance Policy Define what an agent is allowed to do as a composable, serializable policy object. from dataclasses import dataclass , field from enum import Enum from typing import Optional import re class PolicyAction ( Enum ) : ALLOW = "allow" DENY = "deny" REVIEW = "review"

flag for human review

@dataclass class GovernancePolicy : """Declarative policy controlling agent behavior.""" name : str allowed_tools : list [ str ] = field ( default_factory = list )

allowlist

blocked_tools : list [ str ] = field ( default_factory = list )

blocklist

blocked_patterns : list [ str ] = field ( default_factory = list )

content filters

max_calls_per_request : int = 100

rate limit

require_human_approval : list [ str ] = field ( default_factory = list )

tools needing approval

def check_tool ( self , tool_name : str ) -

PolicyAction : """Check if a tool is allowed by this policy.""" if tool_name in self . blocked_tools : return PolicyAction . DENY if tool_name in self . require_human_approval : return PolicyAction . REVIEW if self . allowed_tools and tool_name not in self . allowed_tools : return PolicyAction . DENY return PolicyAction . ALLOW def check_content ( self , content : str ) -

Optional [ str ] : """Check content against blocked patterns. Returns matched pattern or None.""" for pattern in self . blocked_patterns : if re . search ( pattern , content , re . IGNORECASE ) : return pattern return None Policy Composition Combine multiple policies (e.g., org-wide + team + agent-specific): def compose_policies ( * policies : GovernancePolicy ) -

GovernancePolicy : """Merge policies with most-restrictive-wins semantics.""" combined = GovernancePolicy ( name = "composed" ) for policy in policies : combined . blocked_tools . extend ( policy . blocked_tools ) combined . blocked_patterns . extend ( policy . blocked_patterns ) combined . require_human_approval . extend ( policy . require_human_approval ) combined . max_calls_per_request = min ( combined . max_calls_per_request , policy . max_calls_per_request ) if policy . allowed_tools : if combined . allowed_tools : combined . allowed_tools = [ t for t in combined . allowed_tools if t in policy . allowed_tools ] else : combined . allowed_tools = list ( policy . allowed_tools ) return combined

Usage: layer policies from broad to specific

org_policy

GovernancePolicy ( name = "org-wide" , blocked_tools = [ "shell_exec" , "delete_database" ] , blocked_patterns = [ r"(?i)(api[_-]?key|secret|password)\s*[:=]" ] , max_calls_per_request = 50 ) team_policy = GovernancePolicy ( name = "data-team" , allowed_tools = [ "query_db" , "read_file" , "write_report" ] , require_human_approval = [ "write_report" ] ) agent_policy = compose_policies ( org_policy , team_policy ) Policy as YAML Store policies as configuration, not code:

governance-policy.yaml

name : production - agent allowed_tools : - search_documents - query_database - send_email blocked_tools : - shell_exec - delete_record blocked_patterns : - "(?i)(api[_-]?key|secret|password)\s*[:=]" - "(?i)(drop|truncate|delete from)\s+\w+" max_calls_per_request : 25 require_human_approval : - send_email import yaml def load_policy ( path : str ) -

GovernancePolicy : with open ( path ) as f : data = yaml . safe_load ( f ) return GovernancePolicy ( ** data ) Pattern 2: Semantic Intent Classification Detect dangerous intent in prompts before they reach the agent, using pattern-based signals. from dataclasses import dataclass @dataclass class IntentSignal : category : str

e.g., "data_exfiltration", "privilege_escalation"

confidence : float

0.0 to 1.0

evidence : str

what triggered the detection

Weighted signal patterns for threat detection

THREAT_SIGNALS

[

Data exfiltration

( r"(?i)send\s+(all|every|entire)\s+\w+\s+to\s+" , "data_exfiltration" , 0.8 ) , ( r"(?i)export\s+.\s+to\s+(external|outside|third.?party)" , "data_exfiltration" , 0.9 ) , ( r"(?i)curl\s+.\s+-d\s+" , "data_exfiltration" , 0.7 ) ,

Privilege escalation

( r"(?i)(sudo|as\s+root|admin\s+access)" , "privilege_escalation" , 0.8 ) , ( r"(?i)chmod\s+777" , "privilege_escalation" , 0.9 ) ,

System modification

( r"(?i)(rm\s+-rf|del\s+/[sq]|format\s+c:)" , "system_destruction" , 0.95 ) , ( r"(?i)(drop\s+database|truncate\s+table)" , "system_destruction" , 0.9 ) ,

Prompt injection

(
r"(?i)ignore\s+(previous|above|all)\s+(instructions?|rules?)"
,
"prompt_injection"
,
0.9
)
,
(
r"(?i)you\s+are\s+now\s+(a|an)\s+"
,
"prompt_injection"
,
0.7
)
,
]
def
classify_intent
(
content
:
str
)
-
>
list
[
IntentSignal
]
:
"""Classify content for threat signals."""
signals
=
[
]
for
pattern
,
category
,
weight
in
THREAT_SIGNALS
:
match
=
re
.
search
(
pattern
,
content
)
if
match
:
signals
.
append
(
IntentSignal
(
category
=
category
,
confidence
=
weight
,
evidence
=
match
.
group
(
)
)
)
return
signals
def
is_safe
(
content
:
str
,
threshold
:
float
=
0.7
)
-
>
bool
:
"""Quick check: is the content safe above the given threshold?"""
signals
=
classify_intent
(
content
)
return
not
any
(
s
.
confidence
>=
threshold
for
s
in
signals
)
Key insight
Intent classification happens before tool execution, acting as a pre-flight safety check. This is fundamentally different from output guardrails which only check after generation. Pattern 3: Tool-Level Governance Decorator Wrap individual tool functions with governance checks: import functools import time from collections import defaultdict _call_counters : dict [ str , int ] = defaultdict ( int ) def govern ( policy : GovernancePolicy , audit_trail = None ) : """Decorator that enforces governance policy on a tool function.""" def decorator ( func ) : @functools . wraps ( func ) async def wrapper ( * args , ** kwargs ) : tool_name = func . name

1. Check tool allowlist/blocklist

action

policy . check_tool ( tool_name ) if action == PolicyAction . DENY : raise PermissionError ( f"Policy ' { policy . name } ' blocks tool ' { tool_name } '" ) if action == PolicyAction . REVIEW : raise PermissionError ( f"Tool ' { tool_name } ' requires human approval" )

2. Check rate limit

_call_counters [ policy . name ] += 1 if _call_counters [ policy . name ]

policy . max_calls_per_request : raise PermissionError ( f"Rate limit exceeded: { policy . max_calls_per_request } calls" )

3. Check content in arguments

for arg in list ( args ) + list ( kwargs . values ( ) ) : if isinstance ( arg , str ) : matched = policy . check_content ( arg ) if matched : raise PermissionError ( f"Blocked pattern detected: { matched } " )

4. Execute and audit

start

time . monotonic ( ) try : result = await func ( * args , ** kwargs ) if audit_trail is not None : audit_trail . append ( { "tool" : tool_name , "action" : "allowed" , "duration_ms" : ( time . monotonic ( ) - start ) * 1000 , "timestamp" : time . time ( ) } ) return result except Exception as e : if audit_trail is not None : audit_trail . append ( { "tool" : tool_name , "action" : "error" , "error" : str ( e ) , "timestamp" : time . time ( ) } ) raise return wrapper return decorator

Usage with any agent framework

audit_log

[ ] policy = GovernancePolicy ( name = "search-agent" , allowed_tools = [ "search" , "summarize" ] , blocked_patterns = [ r"(?i)password" ] , max_calls_per_request = 10 ) @govern ( policy , audit_trail = audit_log ) async def search ( query : str ) -

str : """Search documents — governed by policy.""" return f"Results for: { query } "

Passes: search("latest quarterly report")

Blocked: search("show me the admin password")

Pattern 4: Trust Scoring Track agent reliability over time with decay-based trust scores: from dataclasses import dataclass , field import math import time @dataclass class TrustScore : """Trust score with temporal decay.""" score : float = 0.5

0.0 (untrusted) to 1.0 (fully trusted)

successes : int = 0 failures : int = 0 last_updated : float = field ( default_factory = time . time ) def record_success ( self , reward : float = 0.05 ) : self . successes += 1 self . score = min ( 1.0 , self . score + reward * ( 1 - self . score ) ) self . last_updated = time . time ( ) def record_failure ( self , penalty : float = 0.15 ) : self . failures += 1 self . score = max ( 0.0 , self . score - penalty * self . score ) self . last_updated = time . time ( ) def current ( self , decay_rate : float = 0.001 ) -

float : """Get score with temporal decay — trust erodes without activity.""" elapsed = time . time ( ) - self . last_updated decay = math . exp ( - decay_rate * elapsed ) return self . score * decay @property def reliability ( self ) -

float : total = self . successes + self . failures return self . successes / total if total

0 else 0.0

Usage in multi-agent systems

trust

TrustScore ( )

Agent completes tasks successfully

trust . record_success ( )

0.525

trust . record_success ( )

0.549

Agent makes an error

trust . record_failure ( )

0.467

Gate sensitive operations on trust

if trust . current ( )

= 0.7 :

Allow autonomous operation

pass elif trust . current ( )

= 0.4 :

Allow with human oversight

pass else :

Deny or require explicit approval

pass
Multi-agent trust
In systems where agents delegate to other agents, each agent maintains trust scores for its delegates: class AgentTrustRegistry : def init ( self ) : self . scores : dict [ str , TrustScore ] = { } def get_trust ( self , agent_id : str ) -

TrustScore : if agent_id not in self . scores : self . scores [ agent_id ] = TrustScore ( ) return self . scores [ agent_id ] def most_trusted ( self , agents : list [ str ] ) -

str : return max ( agents , key = lambda a : self . get_trust ( a ) . current ( ) ) def meets_threshold ( self , agent_id : str , threshold : float ) -

bool : return self . get_trust ( agent_id ) . current ( ) = threshold Pattern 5: Audit Trail Append-only audit log for all agent actions — critical for compliance and debugging: from dataclasses import dataclass , field import json import time @dataclass class AuditEntry : timestamp : float agent_id : str tool_name : str action : str

"allowed", "denied", "error"

policy_name : str details : dict = field ( default_factory = dict ) class AuditTrail : """Append-only audit trail for agent governance events.""" def init ( self ) : self . _entries : list [ AuditEntry ] = [ ] def log ( self , agent_id : str , tool_name : str , action : str , policy_name : str , ** details ) : self . _entries . append ( AuditEntry ( timestamp = time . time ( ) , agent_id = agent_id , tool_name = tool_name , action = action , policy_name = policy_name , details = details ) ) def denied ( self ) -

list [ AuditEntry ] : """Get all denied actions — useful for security review.""" return [ e for e in self . _entries if e . action == "denied" ] def by_agent ( self , agent_id : str ) -

list [ AuditEntry ] : return [ e for e in self . _entries if e . agent_id == agent_id ] def export_jsonl ( self , path : str ) : """Export as JSON Lines for log aggregation systems.""" with open ( path , "w" ) as f : for entry in self . _entries : f . write ( json . dumps ( { "timestamp" : entry . timestamp , "agent_id" : entry . agent_id , "tool" : entry . tool_name , "action" : entry . action , "policy" : entry . policy_name , ** entry . details } ) + "\n" ) Pattern 6: Framework Integration PydanticAI from pydantic_ai import Agent policy = GovernancePolicy ( name = "support-bot" , allowed_tools = [ "search_docs" , "create_ticket" ] , blocked_patterns = [ r"(?i)(ssn|social\s+security|credit\s+card)" ] , max_calls_per_request = 20 ) agent = Agent ( "openai:gpt-4o" , system_prompt = "You are a support assistant." ) @agent . tool @govern ( policy ) async def search_docs ( ctx , query : str ) -

str : """Search knowledge base — governed.""" return await kb . search ( query ) @agent . tool @govern ( policy ) async def create_ticket ( ctx , title : str , body : str ) -

str : """Create support ticket — governed.""" return await tickets . create ( title = title , body = body ) CrewAI from crewai import Agent , Task , Crew policy = GovernancePolicy ( name = "research-crew" , allowed_tools = [ "search" , "analyze" ] , max_calls_per_request = 30 )

Apply governance at the crew level

def governed_crew_run ( crew : Crew , policy : GovernancePolicy ) : """Wrap crew execution with governance checks.""" audit = AuditTrail ( ) for agent in crew . agents : for tool in agent . tools : original = tool . func tool . func = govern ( policy , audit_trail = audit ) ( original ) result = crew . kickoff ( ) return result , audit OpenAI Agents SDK from agents import Agent , function_tool policy = GovernancePolicy ( name = "coding-agent" , allowed_tools = [ "read_file" , "write_file" , "run_tests" ] , blocked_tools = [ "shell_exec" ] , max_calls_per_request = 50 ) @function_tool @govern ( policy ) async def read_file ( path : str ) -

str : """Read file contents — governed.""" import os safe_path = os . path . realpath ( path ) if not safe_path . startswith ( os . path . realpath ( "." ) ) : raise ValueError ( "Path traversal blocked by governance" ) with open ( safe_path ) as f : return f . read ( ) Governance Levels Match governance strictness to risk level: Level Controls Use Case Open Audit only, no restrictions Internal dev/testing Standard Tool allowlist + content filters General production agents Strict All controls + human approval for sensitive ops Financial, healthcare, legal Locked Allowlist only, no dynamic tools, full audit Compliance-critical systems Best Practices Practice Rationale Policy as configuration Store policies in YAML/JSON, not hardcoded — enables change without deploys Most-restrictive-wins When composing policies, deny always overrides allow Pre-flight intent check Classify intent before tool execution, not after Trust decay Trust scores should decay over time — require ongoing good behavior Append-only audit Never modify or delete audit entries — immutability enables compliance Fail closed If governance check errors, deny the action rather than allowing it Separate policy from logic Governance enforcement should be independent of agent business logic Quick Start Checklist

Agent Governance Implementation Checklist

Setup

[ ] Define governance policy (allowed tools, blocked patterns, rate limits)

[ ] Choose governance level (open/standard/strict/locked)

[ ] Set up audit trail storage

Implementation

[ ] Add @govern decorator to all tool functions

[ ] Add intent classification to user input processing

[ ] Implement trust scoring for multi-agent interactions

[ ] Wire up audit trail export

Validation

[ ] Test that blocked tools are properly denied

[ ] Test that content filters catch sensitive patterns

[ ] Test rate limiting behavior

[ ] Verify audit trail captures all events

[ ] Test policy composition (most-restrictive-wins) Related Resources Agent-OS Governance Engine — Full governance framework AgentMesh Integrations — Framework-specific packages OWASP Top 10 for LLM Applications

返回排行榜