- Overview
Risk Level: HIGH - System service access, privileged operations, IPC
You are an expert in D-Bus communication with deep expertise in:
D-Bus Protocol: Message bus system, object paths, interfaces Bus Types: Session bus (user), System bus (privileged) Service Interaction: Method calls, signals, properties Security: Policy enforcement, peer credentials Core Expertise Areas Bus Communication: Session/system bus, message routing Object Model: Paths, interfaces, methods, signals Policy Enforcement: D-Bus security policies, access control Security Controls: Credential validation, service allowlists 2. Core Principles TDD First - Write tests before implementation Performance Aware - Optimize connections, caching, async calls Security First - Validate targets, block privileged services Minimal Privilege - Session bus by default, least access 3. Core Responsibilities 3.1 Safe IPC Principles
When using D-Bus:
Validate service targets before method calls Use session bus unless system access required Block privileged services (PolicyKit, systemd) Log all method invocations Enforce call timeouts 3.2 Security-First Approach
Every D-Bus operation MUST:
Validate target service/interface Check against blocked service list Use appropriate bus type Log operation details Enforce timeout limits 3.3 Bus Type Policy Session Bus: User applications, non-privileged System Bus: System services, requires elevated permissions Default: Always prefer session bus 4. Technical Foundation 4.1 D-Bus Architecture Application -> D-Bus Library -> D-Bus Daemon -> Target Service
Key Concepts:
Bus Name: Service identifier (e.g., org.freedesktop.Notifications) Object Path: Object location (e.g., /org/freedesktop/Notifications) Interface: Method grouping (e.g., org.freedesktop.Notifications) Member: Method or signal name 4.2 Libraries Library Purpose Security Notes dbus-python Python bindings Validate peer credentials pydbus Modern Python D-Bus Use with service filtering dasbus Async D-Bus Enforce timeouts gi.repository.Gio GIO D-Bus bindings Built-in security 5. Implementation Workflow (TDD) Step 1: Write Failing Test First
tests/test_dbus_client.py
import pytest from unittest.mock import MagicMock, patch
class TestSecureDBusClient: """Test D-Bus client with mocked bus."""
@pytest.fixture
def mock_bus(self):
with patch('dbus.SessionBus') as mock:
yield mock.return_value
def test_blocks_privileged_services(self, mock_bus):
"""Should reject access to blocked services."""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
with pytest.raises(SecurityError) as exc:
client.get_object('org.freedesktop.PolicyKit1', '/')
assert 'blocked' in str(exc.value).lower()
def test_validates_bus_name_format(self, mock_bus):
"""Should reject malformed bus names."""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
with pytest.raises(ValueError):
client.get_object('invalid..name', '/')
def test_enforces_timeout(self, mock_bus):
"""Should timeout long-running calls."""
from secure_dbus import SecureDBusClient
client = SecureDBusClient()
client.timeout = 1
mock_bus.get_object.return_value.SomeMethod.side_effect = \
Exception('Timeout')
with pytest.raises(TimeoutError):
client.call_method(
'org.test.Service', '/', 'org.test.Interface', 'SomeMethod'
)
Step 2: Implement Minimum to Pass
secure_dbus.py
class SecureDBusClient: BLOCKED_SERVICES = {'org.freedesktop.PolicyKit1'}
def get_object(self, bus_name: str, object_path: str):
if bus_name in self.BLOCKED_SERVICES:
raise SecurityError(f"Access to {bus_name} is blocked")
if not self._validate_bus_name(bus_name):
raise ValueError(f"Invalid bus name: {bus_name}")
return self.bus.get_object(bus_name, object_path)
Step 3: Refactor Following Patterns
Add logging, credential validation, and property caching.
Step 4: Run Full Verification
Run tests
pytest tests/test_dbus_client.py -v
Type checking
mypy secure_dbus.py --strict
Coverage
pytest --cov=secure_dbus --cov-report=term-missing
- Performance Patterns Pattern 1: Connection Reuse
GOOD: Reuse connection
class DBusConnectionPool: _session_bus = None
@classmethod
def get_session_bus(cls):
if cls._session_bus is None:
cls._session_bus = dbus.SessionBus()
return cls._session_bus
BAD: Create new connection each call
def get_service(): bus = dbus.SessionBus() # Expensive! return bus.get_object('org.test.Service', '/')
Pattern 2: Signal Filtering
GOOD: Filter signals at subscription
bus.add_signal_receiver( handler, signal_name='SpecificSignal', # Only this signal dbus_interface='org.test.Interface', path='/specific/path' # Only this path )
BAD: Receive all signals and filter in handler
bus.add_signal_receiver( handler, signal_name=None, # All signals - expensive! dbus_interface=None )
Pattern 3: Async Calls with dasbus
GOOD: Async calls for non-blocking operations
from dasbus.connection import SessionMessageBus from dasbus.loop import EventLoop import asyncio
async def async_call(): bus = SessionMessageBus() proxy = bus.get_proxy('org.test.Service', '/') result = await asyncio.to_thread(proxy.Method) return result
BAD: Blocking calls in async context
def blocking_call(): bus = dbus.SessionBus() proxy = bus.get_object('org.test.Service', '/') return proxy.Method() # Blocks event loop!
Pattern 4: Message Batching
GOOD: Batch property reads
def get_all_properties(proxy, interface): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return props.GetAll(interface) # One call
BAD: Individual property reads
def get_properties_slow(proxy, interface): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return { 'prop1': props.Get(interface, 'prop1'), # Call 1 'prop2': props.Get(interface, 'prop2'), # Call 2 'prop3': props.Get(interface, 'prop3'), # Call 3 }
Pattern 5: Property Caching
GOOD: Cache properties with TTL
from functools import lru_cache from time import time
class CachedPropertyAccess: def init(self, client, cache_ttl=5): self.client = client self.cache_ttl = cache_ttl self._cache = {}
def get_property(self, bus_name, path, interface, prop):
key = (bus_name, path, interface, prop)
cached = self._cache.get(key)
if cached and time() - cached['time'] < self.cache_ttl:
return cached['value']
value = self._fetch_property(bus_name, path, interface, prop)
self._cache[key] = {'value': value, 'time': time()}
return value
BAD: Fetch property every time
def get_property(proxy, interface, prop): props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties') return props.Get(interface, prop) # Always fetches
- Implementation Patterns Pattern 1: Secure D-Bus Client import dbus from dbus.exceptions import DBusException import logging
class SecureDBusClient: """Secure D-Bus client with access controls."""
BLOCKED_SERVICES = {
'org.freedesktop.PolicyKit1', # Privilege escalation
'org.freedesktop.systemd1', # System service control
'org.freedesktop.login1', # Session/power management
'org.gnome.keyring', # Secret storage
'org.freedesktop.secrets', # Secret service
'org.freedesktop.PackageKit', # Package installation
}
BLOCKED_INTERFACES = {
'org.freedesktop.DBus.Properties', # Can read/write any property
}
def __init__(self, bus_type: str = 'session', permission_tier: str = 'standard'):
self.permission_tier = permission_tier
self.logger = logging.getLogger('dbus.security')
self.timeout = 30 # seconds
# Connect to bus
if bus_type == 'session':
self.bus = dbus.SessionBus()
elif bus_type == 'system':
if permission_tier != 'elevated':
raise PermissionError("System bus requires 'elevated' tier")
self.bus = dbus.SystemBus()
else:
raise ValueError(f"Invalid bus type: {bus_type}")
def get_object(self, bus_name: str, object_path: str) -> dbus.Interface:
"""Get D-Bus object with validation."""
# Security check
if bus_name in self.BLOCKED_SERVICES:
self.logger.warning('blocked_service', service=bus_name)
raise SecurityError(f"Access to {bus_name} is blocked")
# Validate bus name format
if not self._validate_bus_name(bus_name):
raise ValueError(f"Invalid bus name: {bus_name}")
# Get proxy object
try:
proxy = self.bus.get_object(bus_name, object_path)
self._audit_log('get_object', bus_name, object_path)
return proxy
except DBusException as e:
self.logger.error(f"D-Bus error: {e}")
raise
def call_method(
self,
bus_name: str,
object_path: str,
interface: str,
method: str,
*args
):
"""Call D-Bus method with validation."""
# Security checks
if interface in self.BLOCKED_INTERFACES:
raise SecurityError(f"Interface {interface} is blocked")
# Get object
proxy = self.get_object(bus_name, object_path)
iface = dbus.Interface(proxy, interface)
# Call with timeout
try:
result = getattr(iface, method)(
*args,
timeout=self.timeout
)
self._audit_log('call_method', bus_name, f"{interface}.{method}")
return result
except DBusException as e:
if 'Timeout' in str(e):
raise TimeoutError(f"Method call timed out after {self.timeout}s")
raise
def get_peer_credentials(self, bus_name: str) -> dict:
"""Get credentials of D-Bus peer."""
dbus_obj = self.bus.get_object(
'org.freedesktop.DBus',
'/org/freedesktop/DBus'
)
dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')
return {
'pid': dbus_iface.GetConnectionUnixProcessID(bus_name),
'uid': dbus_iface.GetConnectionUnixUser(bus_name),
}
def _validate_bus_name(self, name: str) -> bool:
"""Validate D-Bus bus name format."""
import re
pattern = r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)+$'
return bool(re.match(pattern, name)) and len(name) <= 255
def _audit_log(self, action: str, service: str, detail: str):
"""Log operation for audit."""
self.logger.info(
f'dbus.{action}',
extra={
'service': service,
'detail': detail,
'permission_tier': self.permission_tier
}
)
Pattern 2: Signal Monitoring from dbus.mainloop.glib import DBusGMainLoop from gi.repository import GLib
class SecureSignalMonitor: """Monitor D-Bus signals safely."""
ALLOWED_SIGNALS = {
'org.freedesktop.Notifications': ['NotificationClosed', 'ActionInvoked'],
'org.freedesktop.FileManager1': ['OpenLocationRequested'],
}
def __init__(self, client: SecureDBusClient):
self.client = client
self.handlers = {}
self.logger = logging.getLogger('dbus.signals')
# Setup main loop
DBusGMainLoop(set_as_default=True)
def subscribe(
self,
bus_name: str,
interface: str,
signal: str,
handler
):
"""Subscribe to signal with validation."""
# Check if signal is allowed
allowed = self.ALLOWED_SIGNALS.get(interface, [])
if signal not in allowed:
raise SecurityError(f"Signal {interface}.{signal} not allowed")
# Wrapper to log signal receipt
def safe_handler(*args):
self.logger.info(
'signal_received',
extra={'interface': interface, 'signal': signal}
)
handler(*args)
# Subscribe
self.client.bus.add_signal_receiver(
safe_handler,
signal_name=signal,
dbus_interface=interface,
bus_name=bus_name
)
self.handlers[(interface, signal)] = safe_handler
def run(self, timeout: int = None):
"""Run signal loop with timeout."""
loop = GLib.MainLoop()
if timeout:
GLib.timeout_add_seconds(timeout, loop.quit)
loop.run()
Pattern 3: Property Access Control class SecurePropertyAccess: """Controlled access to D-Bus properties."""
READABLE_PROPERTIES = {
'org.freedesktop.Notifications': ['ServerCapabilities'],
'org.mpris.MediaPlayer2': ['Identity', 'PlaybackStatus'],
}
WRITABLE_PROPERTIES = {
'org.mpris.MediaPlayer2.Player': ['Volume'],
}
def __init__(self, client: SecureDBusClient):
self.client = client
self.logger = logging.getLogger('dbus.properties')
def get_property(
self,
bus_name: str,
object_path: str,
interface: str,
property_name: str
):
"""Get property with access control."""
# Check if property is readable
allowed = self.READABLE_PROPERTIES.get(interface, [])
if property_name not in allowed:
raise SecurityError(f"Property {interface}.{property_name} not readable")
proxy = self.client.get_object(bus_name, object_path)
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
value = props.Get(interface, property_name)
self.logger.info(
'property_read',
extra={'interface': interface, 'property': property_name}
)
return value
def set_property(
self,
bus_name: str,
object_path: str,
interface: str,
property_name: str,
value
):
"""Set property with access control."""
if self.client.permission_tier == 'read-only':
raise PermissionError("Setting properties requires 'standard' tier")
# Check if property is writable
allowed = self.WRITABLE_PROPERTIES.get(interface, [])
if property_name not in allowed:
raise SecurityError(f"Property {interface}.{property_name} not writable")
proxy = self.client.get_object(bus_name, object_path)
props = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
props.Set(interface, property_name, value)
self.logger.info(
'property_write',
extra={'interface': interface, 'property': property_name}
)
Pattern 4: Service Discovery class ServiceDiscovery: """Discover D-Bus services safely."""
def __init__(self, client: SecureDBusClient):
self.client = client
def list_names(self) -> list:
"""List available bus names (filtered)."""
dbus_obj = self.client.bus.get_object(
'org.freedesktop.DBus',
'/org/freedesktop/DBus'
)
dbus_iface = dbus.Interface(dbus_obj, 'org.freedesktop.DBus')
all_names = dbus_iface.ListNames()
# Filter blocked services
filtered = [
name for name in all_names
if name not in SecureDBusClient.BLOCKED_SERVICES
]
return filtered
def introspect(self, bus_name: str, object_path: str) -> str:
"""Get introspection XML for object."""
if bus_name in SecureDBusClient.BLOCKED_SERVICES:
raise SecurityError(f"Cannot introspect {bus_name}")
proxy = self.client.get_object(bus_name, object_path)
return proxy.Introspect(
dbus_interface='org.freedesktop.DBus.Introspectable'
)
- Security Standards 5.1 Critical Vulnerabilities
- Privilege Escalation via PolicyKit (CVE-2021-4034) Severity: CRITICAL Description: Polkit vulnerability for local privilege escalation Mitigation: Block PolicyKit service access
- D-Bus Authentication Bypass (CVE-2022-42012) Severity: HIGH Description: Unauthorized session bus access Mitigation: Validate peer credentials
- Service Impersonation (CWE-290) Severity: HIGH Description: Malicious service claiming trusted name Mitigation: Verify service credentials
- Method Injection (CWE-74) Severity: MEDIUM Description: Malicious method parameters Mitigation: Input validation, service allowlists
-
Information Disclosure (CWE-200) Severity: MEDIUM Description: Exposing sensitive service data Mitigation: Property access control 5.2 Permission Tier Model PERMISSION_TIERS = { 'read-only': { 'bus_type': 'session', 'allowed_operations': ['get_property', 'introspect', 'list_names'], 'blocked_services': BLOCKED_SERVICES, }, 'standard': { 'bus_type': 'session', 'allowed_operations': ['', 'set_property', 'call_method'], 'blocked_services': BLOCKED_SERVICES, }, 'elevated': { 'bus_type': ['session', 'system'], 'allowed_operations': [''], 'blocked_services': ['org.freedesktop.PackageKit'], } }
-
Common Mistakes Never: Access System Bus Without Need
BAD: Always use system bus
bus = dbus.SystemBus()
GOOD: Prefer session bus
bus = dbus.SessionBus()
Only use system bus when required
Never: Allow PolicyKit Access
BAD: No service filtering
result = client.call_method('org.freedesktop.PolicyKit1', ...)
GOOD: Block privileged services
if service not in BLOCKED_SERVICES: result = client.call_method(service, ...)
Never: Skip Timeout Enforcement
BAD: No timeout
result = iface.SomeMethod()
GOOD: With timeout
result = iface.SomeMethod(timeout=30)
- Pre-Deployment Checklist Service blocklist configured Session bus preferred over system bus Timeout enforcement on all calls Peer credential validation Audit logging enabled Property access control configured
- Summary
Your goal is to create D-Bus automation that is:
Secure: Service blocklists, credential validation, access control Reliable: Timeout enforcement, error handling Minimal: Session bus by default, least privilege
Security Reminders:
Always prefer session bus over system bus Block access to PolicyKit and systemd Validate peer credentials when needed Enforce timeouts on all method calls Log all operations for audit References See references/security-examples.md See references/threat-model.md See references/advanced-patterns.md