zero-trust-architecture

安装量: 125
排名: #6875

安装

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill zero-trust-architecture

Zero Trust Architecture Overview

Implement comprehensive Zero Trust security architecture based on "never trust, always verify" principle with identity-centric security, microsegmentation, and continuous verification.

When to Use Cloud-native applications Microservices architecture Remote workforce security API security Multi-cloud deployments Legacy modernization Compliance requirements Implementation Examples 1. Zero Trust Gateway // zero-trust-gateway.js const jwt = require('jsonwebtoken'); const axios = require('axios');

class ZeroTrustGateway { constructor() { this.identityProvider = process.env.IDENTITY_PROVIDER_URL; this.deviceRegistry = new Map(); this.sessionContext = new Map(); }

/* * Verify identity - Who are you? / async verifyIdentity(token) { try { // Verify JWT token const decoded = jwt.verify(token, process.env.JWT_PUBLIC_KEY, { algorithms: ['RS256'] });

  // Check token hasn't been revoked
  const revoked = await this.checkTokenRevocation(decoded.jti);
  if (revoked) {
    throw new Error('Token has been revoked');
  }

  return {
    valid: true,
    userId: decoded.sub,
    roles: decoded.roles,
    permissions: decoded.permissions
  };
} catch (error) {
  return { valid: false, error: error.message };
}

}

/* * Verify device - What device are you using? / async verifyDevice(deviceId, deviceFingerprint) { const registered = this.deviceRegistry.get(deviceId);

if (!registered) {
  return {
    trusted: false,
    reason: 'Device not registered'
  };
}

// Check device fingerprint matches
if (registered.fingerprint !== deviceFingerprint) {
  return {
    trusted: false,
    reason: 'Device fingerprint mismatch'
  };
}

// Check device compliance
const compliance = await this.checkDeviceCompliance(deviceId);

return {
  trusted: compliance.compliant,
  reason: compliance.reason,
  riskScore: compliance.riskScore
};

}

/* * Verify location - Where are you? / async verifyLocation(ip, expectedCountry) { try { // Get geolocation data const geoData = await this.getGeoLocation(ip);

  // Check for impossible travel
  const lastLocation = this.getLastKnownLocation(ip);
  if (lastLocation) {
    const impossibleTravel = this.detectImpossibleTravel(
      lastLocation,
      geoData,
      Date.now() - lastLocation.timestamp
    );

    if (impossibleTravel) {
      return {
        valid: false,
        reason: 'Impossible travel detected',
        riskScore: 9
      };
    }
  }

  // Check against allowed locations
  if (expectedCountry && geoData.country !== expectedCountry) {
    return {
      valid: false,
      reason: 'Unexpected location',
      riskScore: 7
    };
  }

  return {
    valid: true,
    location: geoData,
    riskScore: 1
  };
} catch (error) {
  return {
    valid: false,
    reason: 'Location verification failed',
    riskScore: 5
  };
}

}

/* * Verify authorization - What can you access? / async verifyAuthorization(userId, resource, action, context) { // Get user permissions const user = await this.getUserPermissions(userId);

// Check direct permissions
if (this.hasPermission(user, resource, action)) {
  return { authorized: true, reason: 'Direct permission' };
}

// Check role-based permissions
for (const role of user.roles) {
  if (this.hasRolePermission(role, resource, action)) {
    return { authorized: true, reason: `Role: ${role}` };
  }
}

// Check attribute-based policies
const abacResult = await this.evaluateABAC(user, resource, action, context);
if (abacResult.allowed) {
  return { authorized: true, reason: 'ABAC policy' };
}

return {
  authorized: false,
  reason: 'Insufficient permissions'
};

}

/* * Calculate risk score / calculateRiskScore(factors) { let score = 0;

// Identity factors
if (!factors.mfaUsed) score += 3;
if (factors.newDevice) score += 2;

// Location factors
if (factors.unusualLocation) score += 3;
if (factors.vpnDetected) score += 1;

// Behavior factors
if (factors.unusualTime) score += 2;
if (factors.rapidRequests) score += 2;

// Device factors
if (!factors.deviceCompliant) score += 4;
if (factors.jailbroken) score += 5;

return Math.min(score, 10);

}

/* * Continuous verification middleware / middleware() { return async (req, res, next) => { const startTime = Date.now();

  try {
    // Extract authentication token
    const token = req.headers.authorization?.replace('Bearer ', '');
    if (!token) {
      return res.status(401).json({
        error: 'unauthorized',
        message: 'No authentication token provided'
      });
    }

    // Step 1: Verify identity
    const identity = await this.verifyIdentity(token);
    if (!identity.valid) {
      return res.status(401).json({
        error: 'unauthorized',
        message: 'Invalid identity'
      });
    }

    // Step 2: Verify device
    const deviceId = req.headers['x-device-id'];
    const deviceFingerprint = req.headers['x-device-fingerprint'];

    if (deviceId && deviceFingerprint) {
      const device = await this.verifyDevice(deviceId, deviceFingerprint);
      if (!device.trusted) {
        return res.status(403).json({
          error: 'forbidden',
          message: device.reason
        });
      }
    }

    // Step 3: Verify location
    const location = await this.verifyLocation(req.ip);
    if (!location.valid) {
      // Require step-up authentication
      return res.status(403).json({
        error: 'forbidden',
        message: 'Additional authentication required',
        requiresStepUp: true
      });
    }

    // Step 4: Calculate risk score
    const riskScore = this.calculateRiskScore({
      mfaUsed: identity.mfaUsed,
      newDevice: !deviceId,
      unusualLocation: location.riskScore > 5,
      deviceCompliant: true
    });

    // Step 5: Verify authorization
    const authorization = await this.verifyAuthorization(
      identity.userId,
      req.path,
      req.method,
      {
        ip: req.ip,
        riskScore,
        time: new Date()
      }
    );

    if (!authorization.authorized) {
      return res.status(403).json({
        error: 'forbidden',
        message: authorization.reason
      });
    }

    // Add context to request
    req.zeroTrust = {
      userId: identity.userId,
      roles: identity.roles,
      riskScore,
      verificationTime: Date.now() - startTime
    };

    // Log access
    this.logAccess(req, identity, riskScore);

    next();
  } catch (error) {
    console.error('Zero Trust verification failed:', error);
    return res.status(500).json({
      error: 'internal_error',
      message: 'Security verification failed'
    });
  }
};

}

async checkTokenRevocation(jti) { // Check against revocation list return false; }

async checkDeviceCompliance(deviceId) { // Check device meets security requirements return { compliant: true, reason: 'Device meets requirements', riskScore: 1 }; }

async getGeoLocation(ip) { // Get geolocation from IP return { country: 'US', city: 'San Francisco', lat: 37.7749, lon: -122.4194 }; }

getLastKnownLocation(ip) { return null; }

detectImpossibleTravel(lastLocation, currentLocation, timeDiff) { // Calculate if travel is physically possible return false; }

async getUserPermissions(userId) { // Fetch user permissions return { roles: ['user'], permissions: [] }; }

hasPermission(user, resource, action) { return false; }

hasRolePermission(role, resource, action) { return false; }

async evaluateABAC(user, resource, action, context) { return { allowed: false }; }

logAccess(req, identity, riskScore) { console.log({ timestamp: new Date().toISOString(), userId: identity.userId, resource: req.path, method: req.method, riskScore, ip: req.ip }); } }

// Express setup const express = require('express'); const app = express();

const ztGateway = new ZeroTrustGateway();

// Apply Zero Trust middleware app.use(ztGateway.middleware());

// Protected endpoint app.get('/api/sensitive-data', (req, res) => { res.json({ message: 'Access granted', riskScore: req.zeroTrust.riskScore }); });

module.exports = ZeroTrustGateway;

  1. Service Mesh - Microsegmentation

istio-zero-trust.yaml

Istio configuration for Zero Trust microsegmentation

apiVersion: security.istio.io/v1beta1 kind: PeerAuthentication metadata: name: default namespace: production spec: mtls: mode: STRICT # Require mutual TLS


apiVersion: security.istio.io/v1beta1 kind: AuthorizationPolicy metadata: name: deny-all namespace: production spec: {} # Deny all by default


apiVersion: security.istio.io/v1beta1 kind: AuthorizationPolicy metadata: name: allow-frontend-to-backend namespace: production spec: selector: matchLabels: app: backend action: ALLOW rules: - from: - source: principals: ["cluster.local/ns/production/sa/frontend"] to: - operation: methods: ["GET", "POST"] paths: ["/api/*"]


apiVersion: security.istio.io/v1beta1 kind: AuthorizationPolicy metadata: name: allow-backend-to-database namespace: production spec: selector: matchLabels: app: database action: ALLOW rules: - from: - source: principals: ["cluster.local/ns/production/sa/backend"] to: - operation: methods: ["*"]


JWT authentication

apiVersion: security.istio.io/v1beta1 kind: RequestAuthentication metadata: name: jwt-auth namespace: production spec: selector: matchLabels: app: backend jwtRules: - issuer: "https://auth.example.com" jwksUri: "https://auth.example.com/.well-known/jwks.json" audiences: - "api.example.com"


Network policy - additional defense layer

apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: backend-network-policy namespace: production spec: podSelector: matchLabels: app: backend policyTypes: - Ingress - Egress ingress: - from: - podSelector: matchLabels: app: frontend ports: - protocol: TCP port: 8080 egress: - to: - podSelector: matchLabels: app: database ports: - protocol: TCP port: 5432

  1. Python Zero Trust Policy Engine

zero_trust_policy.py

from dataclasses import dataclass from typing import List, Dict, Any from datetime import datetime import jwt

@dataclass class ZeroTrustContext: user_id: str device_id: str location: Dict[str, Any] risk_score: int timestamp: datetime

class ZeroTrustPolicy: def init(self): self.policies = self.load_policies()

def load_policies(self) -> List[Dict]:
    """Load Zero Trust policies"""
    return [
        {
            'name': 'high_risk_block',
            'condition': lambda ctx: ctx.risk_score >= 8,
            'action': 'deny',
            'reason': 'High risk score'
        },
        {
            'name': 'require_mfa',
            'condition': lambda ctx: ctx.risk_score >= 5,
            'action': 'step_up_auth',
            'reason': 'Elevated risk requires MFA'
        },
        {
            'name': 'untrusted_device',
            'condition': lambda ctx: not self.is_device_trusted(ctx.device_id),
            'action': 'deny',
            'reason': 'Untrusted device'
        },
        {
            'name': 'unusual_location',
            'condition': lambda ctx: self.is_unusual_location(ctx),
            'action': 'step_up_auth',
            'reason': 'Unusual location detected'
        }
    ]

def evaluate(self, context: ZeroTrustContext, resource: str, action: str) -> Dict:
    """Evaluate Zero Trust policies"""
    for policy in self.policies:
        if policy['condition'](context):
            return {
                'allowed': policy['action'] != 'deny',
                'action': policy['action'],
                'reason': policy['reason'],
                'policy': policy['name']
            }

    # Check resource-specific permissions
    if self.has_permission(context.user_id, resource, action):
        return {
            'allowed': True,
            'action': 'allow',
            'reason': 'User has required permissions'
        }

    return {
        'allowed': False,
        'action': 'deny',
        'reason': 'No matching policy allows access'
    }

def is_device_trusted(self, device_id: str) -> bool:
    # Check device trust status
    return True

def is_unusual_location(self, context: ZeroTrustContext) -> bool:
    # Check if location is unusual for user
    return False

def has_permission(self, user_id: str, resource: str, action: str) -> bool:
    # Check user permissions
    return False

Flask integration

from flask import Flask, request, jsonify, g from functools import wraps

app = Flask(name) zt_policy = ZeroTrustPolicy()

def zero_trust_required(f): @wraps(f) def decorated_function(args, *kwargs): # Build Zero Trust context context = ZeroTrustContext( user_id=g.user_id, device_id=request.headers.get('X-Device-ID', 'unknown'), location={ 'ip': request.remote_addr, 'country': 'US' # From GeoIP }, risk_score=calculate_risk_score(request), timestamp=datetime.utcnow() )

    # Evaluate policies
    result = zt_policy.evaluate(
        context,
        request.path,
        request.method
    )

    if not result['allowed']:
        return jsonify({
            'error': 'forbidden',
            'reason': result['reason'],
            'action_required': result['action']
        }), 403

    return f(*args, **kwargs)

return decorated_function

def calculate_risk_score(request) -> int: """Calculate risk score based on request context""" score = 0

# Check for suspicious patterns
if not request.headers.get('X-Device-ID'):
    score += 2

# Add more risk factors
return min(score, 10)

@app.route('/api/sensitive', methods=['GET']) @zero_trust_required def get_sensitive_data(): return jsonify({'data': 'sensitive information'})

Zero Trust Principles Verify Explicitly: Always authenticate and authorize Least Privilege: Minimal access required Assume Breach: Limit blast radius Microsegmentation: Network segmentation Continuous Monitoring: Real-time security Device Trust: Verify device compliance Data Protection: Encrypt everything Implementation Layers Identity Layer: Strong authentication (MFA, SSO) Device Layer: Device compliance, certificates Network Layer: Microsegmentation, mTLS Application Layer: API gateway, authorization Data Layer: Encryption, DLP Zero Trust Maturity Model Traditional: Perimeter-based security Advanced: Some segmentation Optimal: Full Zero Trust Progressive: Continuous improvement Best Practices ✅ DO Verify every request Implement MFA everywhere Use microsegmentation Monitor continuously Encrypt all communications Implement least privilege Log all access Regular audits ❌ DON'T Trust network location Use implicit trust Skip device verification Allow lateral movement Use static credentials Resources NIST Zero Trust Architecture Google BeyondCorp Zero Trust Implementation Guide

返回排行榜