secrets-rotation

安装量: 117
排名: #7325

安装

npx skills add https://github.com/aj-geddes/useful-ai-prompts --skill secrets-rotation

Secrets Rotation Overview

Implement automated secrets rotation strategy for credentials, API keys, certificates, and encryption keys with zero-downtime deployment and comprehensive audit logging.

When to Use API key management Database credentials TLS/SSL certificates Encryption key rotation Compliance requirements Security incident response Service account management Implementation Examples 1. Node.js Secrets Manager with Rotation // secrets-manager.js const AWS = require('aws-sdk'); const crypto = require('crypto');

class SecretsManager { constructor() { this.secretsManager = new AWS.SecretsManager({ region: process.env.AWS_REGION });

this.rotationSchedule = new Map();

}

/* * Generate new secret value / generateSecret(type = 'api_key', length = 32) { switch (type) { case 'api_key': return crypto.randomBytes(length).toString('hex');

  case 'password':
    // Generate strong password
    const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*';
    let password = '';
    for (let i = 0; i < length; i++) {
      password += chars.charAt(crypto.randomInt(chars.length));
    }
    return password;

  case 'jwt_secret':
    return crypto.randomBytes(64).toString('base64');

  default:
    return crypto.randomBytes(length).toString('base64');
}

}

/* * Store secret in AWS Secrets Manager / async createSecret(name, value, description = '') { const params = { Name: name, SecretString: JSON.stringify(value), Description: description };

try {
  const result = await this.secretsManager.createSecret(params).promise();
  return result;
} catch (error) {
  if (error.code === 'ResourceExistsException') {
    // Update existing secret
    return this.updateSecret(name, value);
  }
  throw error;
}

}

/* * Retrieve secret / async getSecret(name) { const params = { SecretId: name };

try {
  const data = await this.secretsManager.getSecretValue(params).promise();

  if ('SecretString' in data) {
    return JSON.parse(data.SecretString);
  }

  // Binary secret
  const buff = Buffer.from(data.SecretBinary, 'base64');
  return buff.toString('ascii');
} catch (error) {
  console.error(`Error retrieving secret ${name}:`, error);
  throw error;
}

}

/* * Update secret value / async updateSecret(name, value) { const params = { SecretId: name, SecretString: JSON.stringify(value) };

return this.secretsManager.updateSecret(params).promise();

}

/* * Rotate secret with zero downtime / async rotateSecret(name, type = 'api_key') { console.log(Starting rotation for secret: ${name});

try {
  // Step 1: Generate new secret
  const newValue = this.generateSecret(type);

  // Step 2: Store new version
  const currentSecret = await this.getSecret(name);

  // Keep old value temporarily for graceful transition
  const secretWithRotation = {
    current: newValue,
    previous: currentSecret.current || currentSecret,
    rotatedAt: new Date().toISOString()
  };

  await this.updateSecret(name, secretWithRotation);

  console.log(`New secret version created for: ${name}`);

  // Step 3: Wait for applications to pick up new secret
  await this.waitForPropagation(5000);

  // Step 4: Verify new secret works
  const verificationPassed = await this.verifySecret(name, newValue);

  if (!verificationPassed) {
    throw new Error('Secret verification failed');
  }

  // Step 5: Remove previous version after grace period
  setTimeout(async () => {
    await this.updateSecret(name, {
      current: newValue,
      rotatedAt: new Date().toISOString()
    });
    console.log(`Rotation completed for: ${name}`);
  }, 300000); // 5 minutes grace period

  return {
    success: true,
    secretName: name,
    rotatedAt: new Date().toISOString()
  };
} catch (error) {
  console.error(`Rotation failed for ${name}:`, error);

  // Rollback on failure
  await this.rollbackRotation(name);

  throw error;
}

}

/* * Schedule automatic rotation / async scheduleRotation(name, intervalDays = 90) { const intervalMs = intervalDays * 24 * 60 * 60 * 1000;

const rotationJob = setInterval(async () => {
  try {
    await this.rotateSecret(name);
    console.log(`Scheduled rotation completed for: ${name}`);
  } catch (error) {
    console.error(`Scheduled rotation failed for ${name}:`, error);
    // Alert operations team
    this.sendAlert(name, error);
  }
}, intervalMs);

this.rotationSchedule.set(name, rotationJob);

// AWS Secrets Manager automatic rotation
const params = {
  SecretId: name,
  RotationLambdaARN: process.env.ROTATION_LAMBDA_ARN,
  RotationRules: {
    AutomaticallyAfterDays: intervalDays
  }
};

await this.secretsManager.rotateSecret(params).promise();

}

/* * Rotate database credentials / async rotateDatabaseCredentials(secretName) { const credentials = await this.getSecret(secretName);

// Generate new password
const newPassword = this.generateSecret('password', 20);

// Update database user password
const connection = await this.connectToDatabase(credentials);

await connection.query(
  'ALTER USER ? IDENTIFIED BY ?',
  [credentials.username, newPassword]
);

// Update secret
await this.updateSecret(secretName, {
  username: credentials.username,
  password: newPassword,
  host: credentials.host,
  database: credentials.database,
  rotatedAt: new Date().toISOString()
});

await connection.end();

return { success: true };

}

/* * Rotate TLS certificate / async rotateTLSCertificate(domain) { // Use Let's Encrypt or internal CA const certbot = require('certbot');

try {
  // Request new certificate
  const newCert = await certbot.certonly({
    domains: [domain],
    email: process.env.ADMIN_EMAIL,
    agreeTos: true,
    renewByDefault: true
  });

  // Store in secrets manager
  await this.createSecret(`tls-cert-${domain}`, {
    certificate: newCert.certificate,
    privateKey: newCert.privateKey,
    chain: newCert.chain,
    issuedAt: new Date().toISOString(),
    expiresAt: newCert.expiresAt
  });

  // Update load balancer/web server
  await this.updateServerCertificate(domain, newCert);

  console.log(`TLS certificate rotated for: ${domain}`);

  return { success: true };
} catch (error) {
  console.error('Certificate rotation failed:', error);
  throw error;
}

}

async waitForPropagation(ms) { return new Promise(resolve => setTimeout(resolve, ms)); }

async verifySecret(name, value) { // Implement verification logic // Test API call, database connection, etc. return true; }

async rollbackRotation(name) { // Restore previous version console.log(Rolling back rotation for: ${name}); }

async sendAlert(secretName, error) { // Send to monitoring system console.error(ALERT: Rotation failed for ${secretName}, error); }

async connectToDatabase(credentials) { // Database connection logic return null; }

async updateServerCertificate(domain, cert) { // Update server configuration return null; } }

// Usage const secretsManager = new SecretsManager();

// Rotate API key async function rotateAPIKey() { await secretsManager.rotateSecret('api-key-external-service', 'api_key'); }

// Schedule automatic rotation async function setupRotationSchedule() { await secretsManager.scheduleRotation('database-credentials', 90); await secretsManager.scheduleRotation('api-keys', 30); }

// Rotate database credentials async function rotateDatabaseCreds() { await secretsManager.rotateDatabaseCredentials('rds-production'); }

module.exports = SecretsManager;

  1. Python Secrets Rotation with Vault

secrets_rotation.py

import hvac import secrets import string from datetime import datetime, timedelta from typing import Dict, Any import psycopg2 import boto3

class SecretsRotation: def init(self, vault_url: str, vault_token: str): self.vault_client = hvac.Client(url=vault_url, token=vault_token) self.ssm = boto3.client('ssm')

def generate_secret(self, secret_type: str = 'api_key', length: int = 32) -> str:
    """Generate new secret value"""
    if secret_type == 'api_key':
        return secrets.token_urlsafe(length)

    elif secret_type == 'password':
        # Strong password with all character types
        chars = string.ascii_letters + string.digits + string.punctuation
        return ''.join(secrets.choice(chars) for _ in range(length))

    elif secret_type == 'jwt_secret':
        return secrets.token_urlsafe(64)

    else:
        return secrets.token_bytes(length).hex()

def rotate_secret(self, path: str, secret_type: str = 'api_key') -> Dict[str, Any]:
    """Rotate secret with zero downtime"""
    print(f"Starting rotation for: {path}")

    try:
        # Read current secret
        current_secret = self.vault_client.secrets.kv.v2.read_secret(path=path)
        current_data = current_secret['data']['data']

        # Generate new value
        new_value = self.generate_secret(secret_type)

        # Store with both old and new values
        rotation_data = {
            'current': new_value,
            'previous': current_data.get('current', current_data.get('value')),
            'rotated_at': datetime.utcnow().isoformat()
        }

        self.vault_client.secrets.kv.v2.create_or_update_secret(
            path=path,
            secret=rotation_data
        )

        print(f"Secret rotated successfully: {path}")

        return {
            'success': True,
            'path': path,
            'rotated_at': rotation_data['rotated_at']
        }

    except Exception as e:
        print(f"Rotation failed for {path}: {e}")
        raise

def rotate_database_password(self, secret_path: str) -> Dict[str, Any]:
    """Rotate database credentials"""
    # Get current credentials
    secret = self.vault_client.secrets.kv.v2.read_secret(path=secret_path)
    creds = secret['data']['data']

    # Generate new password
    new_password = self.generate_secret('password', 20)

    # Connect to database
    conn = psycopg2.connect(
        host=creds['host'],
        database=creds['database'],
        user=creds['username'],
        password=creds['password']
    )

    cursor = conn.cursor()

    try:
        # Update password in database
        cursor.execute(
            f"ALTER USER {creds['username']} WITH PASSWORD %s",
            (new_password,)
        )
        conn.commit()

        # Update secret in Vault
        updated_creds = {
            **creds,
            'password': new_password,
            'rotated_at': datetime.utcnow().isoformat()
        }

        self.vault_client.secrets.kv.v2.create_or_update_secret(
            path=secret_path,
            secret=updated_creds
        )

        print(f"Database credentials rotated: {secret_path}")

        return {'success': True}

    finally:
        cursor.close()
        conn.close()

def schedule_rotation(self, path: str, interval_days: int = 90):
    """Schedule automatic rotation using AWS Lambda"""
    # Create rotation schedule in AWS Secrets Manager
    # or use cron job

    schedule_expression = f"rate({interval_days} days)"

    # This would trigger a Lambda function
    print(f"Rotation scheduled for {path}: every {interval_days} days")

def rotate_encryption_keys(self, key_id: str):
    """Rotate encryption keys"""
    kms = boto3.client('kms')

    # Enable automatic key rotation
    kms.enable_key_rotation(KeyId=key_id)

    print(f"Automatic rotation enabled for KMS key: {key_id}")

def audit_rotation_history(self, path: str) -> list:
    """Get rotation history"""
    versions = self.vault_client.secrets.kv.v2.read_secret_metadata(path=path)

    history = []
    for version, metadata in versions['data']['versions'].items():
        history.append({
            'version': version,
            'created_time': metadata['created_time'],
            'deleted': metadata.get('deletion_time') is not None
        })

    return sorted(history, key=lambda x: x['created_time'], reverse=True)

Usage

if name == 'main': rotation = SecretsRotation( vault_url='http://localhost:8200', vault_token='your-token' )

# Rotate API key
rotation.rotate_secret('api-keys/external-service', 'api_key')

# Rotate database credentials
rotation.rotate_database_password('database/production')

# Schedule rotations
rotation.schedule_rotation('api-keys/external-service', 30)
rotation.schedule_rotation('database/production', 90)

# View history
history = rotation.audit_rotation_history('api-keys/external-service')
print(f"Rotation history: {history}")
  1. Kubernetes Secrets Rotation

secrets-rotation-cronjob.yaml

apiVersion: batch/v1 kind: CronJob metadata: name: secrets-rotation namespace: production spec: schedule: "0 2 * * 0" # Weekly at 2 AM Sunday jobTemplate: spec: template: spec: serviceAccountName: secrets-rotator containers: - name: rotate image: secrets-rotator:latest env: - name: VAULT_ADDR value: "http://vault:8200" - name: VAULT_TOKEN valueFrom: secretKeyRef: name: vault-token key: token command: - /bin/sh - -c - | # Rotate secrets python /app/rotate_secrets.py \ --secret database-password \ --secret api-keys \ --secret tls-certificates

      restartPolicy: OnFailure

apiVersion: v1 kind: ServiceAccount metadata: name: secrets-rotator namespace: production


apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: secrets-rotator namespace: production rules: - apiGroups: [""] resources: ["secrets"] verbs: ["get", "list", "update", "patch"]


apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: secrets-rotator namespace: production subjects: - kind: ServiceAccount name: secrets-rotator roleRef: kind: Role name: secrets-rotator apiGroup: rbac.authorization.k8s.io

Best Practices ✅ DO Automate rotation Use grace periods Verify new secrets Maintain rotation audit trail Implement rollback procedures Monitor rotation failures Use managed services (AWS Secrets Manager) Test rotation procedures ❌ DON'T Hardcode secrets Share secrets Skip verification Rotate without grace period Ignore rotation failures Store secrets in version control Rotation Schedule API Keys: 30-90 days Database Passwords: 90 days TLS Certificates: Before expiry Encryption Keys: 1 year Service Account Tokens: 90 days Zero-Downtime Strategy Generate new secret Store with versioning Grace period (both versions valid) Verification Deprecate old version Remove after grace period Resources AWS Secrets Manager Rotation HashiCorp Vault NIST Key Management

返回排行榜