Blue Team - Defensive Security
Blue Team refers to the defensive side of cybersecurity. Blue team professionals focus on protecting systems, detecting threats, and responding to security incidents. They work to strengthen defenses and maintain the security posture of an organization.
Core Responsibilities
1. Security Monitoring
Continuously monitor systems for suspicious activities and potential threats.
2. Incident Response
Respond to and mitigate security incidents when they occur.
3. Threat Intelligence
Gather and analyze information about potential threats.
4. Security Hardening
Strengthen systems against attacks through configuration and patching.
5. Vulnerability Management
Identify and remediate security weaknesses.
Key Activities
Security Information and Event Management (SIEM)
Example: Setting up Log Monitoring
import re
from datetime import datetime
class LogMonitor:
    """Count failed-login log lines per user/IP pair and alert on brute force.

    Each log line is matched against ``FAILED_LOGIN_PATTERN``. Matching lines
    increment a counter keyed by ``"<username>:<ip>"``; once the counter
    reaches ``failed_login_threshold`` an alert is printed and the IP is
    handed to ``block_ip``.
    """

    # The word "user" is optional so that both "... for user admin from IP ..."
    # and "... for admin from IP ..." are recognised. Usernames may contain
    # word characters, dots and hyphens.
    FAILED_LOGIN_PATTERN = re.compile(
        r'Failed login attempt for (?:user )?([\w.-]+) from IP (\d+\.\d+\.\d+\.\d+)'
    )

    def __init__(self, failed_login_threshold=5):
        """Create a monitor.

        Args:
            failed_login_threshold: Number of failed attempts from one
                user/IP pair at which an alert is raised (default 5).
        """
        self.failed_login_threshold = failed_login_threshold
        # Maps "<username>:<ip>" -> number of failed attempts seen so far.
        self.failed_attempts = {}

    def analyze_log(self, log_entry):
        """Inspect one log line and update the failed-attempt counters.

        Lines that do not describe a failed login are ignored. The counters
        are never reset, so every further failed attempt at or above the
        threshold triggers another alert.

        Args:
            log_entry: A single log line as a string.
        """
        match = self.FAILED_LOGIN_PATTERN.search(log_entry)
        if match is None:
            return

        username, ip_address = match.groups()

        # Track failed attempts
        key = f"{username}:{ip_address}"
        self.failed_attempts[key] = self.failed_attempts.get(key, 0) + 1

        # Alert if threshold reached
        if self.failed_attempts[key] >= self.failed_login_threshold:
            self.trigger_alert(username, ip_address)

    def trigger_alert(self, username, ip_address):
        """Print a brute-force alert and pass the source IP to ``block_ip``.

        Args:
            username: Account name taken from the log line.
            ip_address: Source IP address taken from the log line.
        """
        alert = {
            'timestamp': datetime.now(),
            'severity': 'HIGH',
            'type': 'Brute Force Attack',
            'username': username,
            'ip_address': ip_address,
            'action': 'Block IP and notify admin'
        }
        print(f"ALERT: {alert}")
        self.block_ip(ip_address)

    def block_ip(self, ip_address):
        """Placeholder for a firewall blocklist update; currently only prints.

        Args:
            ip_address: IP address that should be blocked.
        """
        # Add to firewall blocklist
        print(f"Blocking IP: {ip_address}")
# Usage: replay the same failed-login line five times, which equals the
# monitor's default threshold of 5 attempts.
monitor = LogMonitor()
logs = ["Failed login attempt for admin from IP 192.168.1.100"] * 5
for log in logs:
    monitor.analyze_log(log)
Intrusion Detection System (IDS)
Example: Simple Network Traffic Analyzer
from scapy.all import sniff, IP, TCP
from collections import defaultdict
import time
class SimpleIDS:
    """Minimal port-scan detector driven by sniffed packets.

    For every source IP it keeps the destination ports touched during the
    last ``time_window`` seconds and reports a scan once the number of
    distinct ports reaches ``port_scan_threshold``.
    """

    def __init__(self):
        """Initialise the thresholds and the per-source bookkeeping."""
        self.connection_count = defaultdict(int)
        self.port_scan_threshold = 10
        self.time_window = 60  # seconds
        # Maps source IP -> list of {'port': ..., 'time': ...} records.
        self.scan_attempts = defaultdict(list)

    def detect_port_scan(self, packet):
        """Record the packet's destination port and alert on a likely scan.

        Packets without both a TCP and an IP layer are ignored. The alert is
        raised on every packet for which the distinct-port count within the
        window is at or above the threshold.

        Args:
            packet: A sniffed packet exposing ``haslayer`` and layer indexing.
        """
        if not (packet.haslayer(TCP) and packet.haslayer(IP)):
            return

        source = packet[IP].src
        port = packet[TCP].dport
        now = time.time()

        # Remember this access attempt.
        history = self.scan_attempts[source]
        history.append({'port': port, 'time': now})

        # Keep only the attempts that are still inside the time window.
        recent = [
            entry for entry in history
            if now - entry['time'] < self.time_window
        ]
        self.scan_attempts[source] = recent

        # A scan is many different ports from one source.
        distinct_ports = len({entry['port'] for entry in recent})
        if distinct_ports >= self.port_scan_threshold:
            self.alert_port_scan(source, distinct_ports)

    def alert_port_scan(self, src_ip, port_count):
        """Print a port-scan alert for the given source.

        Args:
            src_ip: Source IP address of the suspected scanner.
            port_count: Number of distinct ports seen within the window.
        """
        print("⚠️ PORT SCAN DETECTED!")
        print(f" Source IP: {src_ip}")
        print(f" Ports scanned: {port_count}")
        print(" Action: Blocking IP and notifying security team")

    def analyze_packet(self, packet):
        """Packet callback: run every detector on ``packet``."""
        self.detect_port_scan(packet)
# Usage (requires root/admin privileges)
# ids = SimpleIDS()
# sniff(prn=ids.analyze_packet, filter="tcp", count=100)
Vulnerability Scanning
Example: Basic Web Application Scanner
import requests
from urllib.parse import urljoin
class VulnerabilityScanner:
    """Very small web scanner: SQL injection, reflected XSS, security headers.

    Findings are collected in ``vulnerabilities`` (a list of dicts). Requests
    that could not be completed are collected in ``errors`` so that an
    unreachable target is not mistaken for a clean one.
    """

    # Substrings (compared case-insensitively) that indicate a database error
    # leaked into the response body.
    SQL_ERROR_SIGNATURES = (
        "SQL syntax",
        "mysql_fetch",
        "ORA-",
        "PostgreSQL",
        "SQLite",
    )

    def __init__(self, target_url):
        """Create a scanner for ``target_url``.

        Args:
            target_url: Base URL that every check is run against.
        """
        self.target_url = target_url
        self.vulnerabilities = []
        # One dict per failed request: {'check', 'url', 'error'}.
        self.errors = []

    def _fetch(self, check, url, params=None):
        """GET ``url`` and return the response, or None if the request failed.

        Query parameters are passed through ``params`` so that ``requests``
        URL-encodes them; this keeps payloads containing spaces, quotes or
        ``#`` intact and also works when ``url`` already has a query string.
        Failures are appended to ``self.errors`` instead of being discarded.
        """
        try:
            return requests.get(url, params=params, timeout=5)
        except requests.RequestException as exc:
            self.errors.append({'check': check, 'url': url, 'error': str(exc)})
            return None

    def check_sql_injection(self, url):
        """Test for SQL injection vulnerabilities.

        Sends each payload as the ``id`` query parameter and looks for a
        database error signature in the response body.

        Args:
            url: URL to probe.

        Returns:
            True as soon as one payload produces a database error signature
            (a finding is recorded), otherwise False.
        """
        # NOTE(review): the last payload is destructive if the target really
        # is injectable; only run this against systems you may damage.
        payloads = ["'", "1' OR '1'='1", "'; DROP TABLE users--"]

        for payload in payloads:
            response = self._fetch('SQL Injection', url, {'id': payload})
            if response is None:
                continue  # request failed; try the next payload

            body = response.text.lower()
            if any(sig.lower() in body for sig in self.SQL_ERROR_SIGNATURES):
                self.vulnerabilities.append({
                    'type': 'SQL Injection',
                    'severity': 'CRITICAL',
                    'url': response.url,  # the URL actually requested
                    'payload': payload
                })
                return True
        return False

    def check_xss(self, url):
        """Test for XSS vulnerabilities.

        Sends each payload as the ``search`` query parameter and checks
        whether it is echoed back unchanged in the response body.

        Args:
            url: URL to probe.

        Returns:
            True as soon as one payload is reflected verbatim (a finding is
            recorded), otherwise False.
        """
        payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')"
        ]

        for payload in payloads:
            response = self._fetch('Cross-Site Scripting (XSS)', url,
                                   {'search': payload})
            if response is None:
                continue  # request failed; try the next payload

            if payload in response.text:
                self.vulnerabilities.append({
                    'type': 'Cross-Site Scripting (XSS)',
                    'severity': 'HIGH',
                    'url': response.url,  # the URL actually requested
                    'payload': payload
                })
                return True
        return False

    def check_security_headers(self):
        """Check the target's response for missing security headers.

        Records one finding per absent header. If the target cannot be
        fetched the failure is recorded in ``errors`` and no header findings
        are produced.
        """
        response = self._fetch('Missing Security Header', self.target_url)
        if response is None:
            return

        headers = response.headers
        # Header name -> severity reported when the header is absent.
        required_headers = {
            'X-Frame-Options': 'MEDIUM',
            'X-Content-Type-Options': 'MEDIUM',
            'Strict-Transport-Security': 'HIGH',
            'Content-Security-Policy': 'HIGH',
            'X-XSS-Protection': 'MEDIUM'
        }

        for header, severity in required_headers.items():
            if header not in headers:
                self.vulnerabilities.append({
                    'type': 'Missing Security Header',
                    'severity': severity,
                    'header': header,
                    'recommendation': f'Add {header} header'
                })

    def scan(self):
        """Run all vulnerability checks.

        Returns:
            The list of findings accumulated on this scanner.
        """
        print(f"Scanning {self.target_url}...")

        self.check_sql_injection(self.target_url)
        self.check_xss(self.target_url)
        self.check_security_headers()

        return self.vulnerabilities

    def generate_report(self):
        """Print the vulnerability report, plus any checks that failed to run."""
        print("\n" + "="*60)
        print("VULNERABILITY SCAN REPORT")
        print("="*60)
        print(f"Target: {self.target_url}")
        print(f"Total Vulnerabilities Found: {len(self.vulnerabilities)}\n")

        for i, vuln in enumerate(self.vulnerabilities, 1):
            print(f"{i}. {vuln['type']} - Severity: {vuln['severity']}")
            for key, value in vuln.items():
                if key not in ('type', 'severity'):
                    print(f" {key}: {value}")
            print()

        # Only shown when something went wrong, so a clean run prints exactly
        # the findings section above.
        if self.errors:
            print(f"Checks not completed: {len(self.errors)}")
            for err in self.errors:
                print(f" {err['check']}: {err['url']} ({err['error']})")
            print()
# Usage
# scanner = VulnerabilityScanner("http://example.com")
# scanner.scan()
# scanner.generate_report()
Security Hardening
Example: System Hardening Checklist Script
import os
import subprocess
class SystemHardening:
    """Run a handful of host hardening checks and print a report.

    Every check appends one ``(name, status, message)`` tuple to ``checks``,
    where ``status`` is ``'PASS'``, ``'FAIL'``, ``'WARN'`` or ``'ERROR'``.
    """

    def __init__(self):
        """Start with an empty result list."""
        self.checks = []

    @staticmethod
    def _run(command):
        """Run ``command`` with LC_ALL=C so its output is not localised."""
        env = dict(os.environ, LC_ALL='C')
        return subprocess.run(command, capture_output=True, text=True, env=env)

    @staticmethod
    def _ufw_is_active(status_output):
        """Return True only if the output has a ``Status: active`` line.

        A plain substring test for "active" is wrong because
        "Status: inactive" contains it as well.
        """
        for line in status_output.splitlines():
            label, sep, value = line.partition(':')
            if sep and label.strip().lower() == 'status':
                return value.strip().lower() == 'active'
        return False

    @staticmethod
    def _ssh_config_issues(config_text):
        """Return the list of risky settings explicitly enabled in sshd_config.

        Comment and blank lines are skipped, keywords are compared
        case-insensitively, and keyword/value may be separated by any
        whitespace or a single ``=``. The first occurrence of a keyword is
        the one evaluated. Parsing stops at the first ``Match`` line, so only
        the settings before it are considered. Only explicit ``yes`` values
        are reported; settings that are absent are not evaluated.
        """
        settings = {}
        for raw in config_text.splitlines():
            line = raw.strip()
            if not line or line.startswith('#'):
                continue
            parts = line.replace('=', ' ', 1).split(None, 1)
            if len(parts) < 2:
                continue
            keyword = parts[0].lower()
            if keyword == 'match':
                break
            settings.setdefault(keyword, parts[1].strip().strip('"').lower())

        issues = []
        if settings.get('permitrootlogin') == 'yes':
            issues.append('Root login is enabled')
        if settings.get('passwordauthentication') == 'yes':
            issues.append('Password authentication is enabled')
        if settings.get('permitemptypasswords') == 'yes':
            issues.append('Empty passwords are permitted')
        return issues

    @staticmethod
    def _count_upgradable(apt_output):
        """Count package lines in ``apt list --upgradable`` output.

        Package lines start with ``name/suite``; the "Listing..." header and
        blank lines do not, so they are never counted (and empty output
        yields 0 rather than a negative number).
        """
        return sum(
            1 for line in apt_output.splitlines()
            if '/' in line.partition(' ')[0]
        )

    @staticmethod
    def _password_expiry_disabled(login_defs_text):
        """Return True if login.defs does not enforce password expiration.

        That is the case when there is no uncommented, numeric
        ``PASS_MAX_DAYS`` entry, or when its value is negative or at least
        99999. Any whitespace may separate the keyword from the value.
        """
        max_days = None
        for raw in login_defs_text.splitlines():
            fields = raw.split('#', 1)[0].split()
            if len(fields) >= 2 and fields[0] == 'PASS_MAX_DAYS':
                try:
                    # NOTE(review): the last parseable entry is used here;
                    # confirm how duplicates are resolved on the target OS.
                    max_days = int(fields[1])
                except ValueError:
                    continue
        return max_days is None or max_days < 0 or max_days >= 99999

    def check_firewall_status(self):
        """Check if the ufw firewall is enabled."""
        try:
            result = self._run(['ufw', 'status'])
        except (OSError, subprocess.SubprocessError) as e:
            self.checks.append(('Firewall', 'ERROR', str(e)))
            return

        if result.returncode != 0:
            # e.g. not run as root: report the problem instead of "inactive".
            detail = (result.stderr.strip()
                      or f'ufw exited with status {result.returncode}')
            self.checks.append(('Firewall', 'ERROR', detail))
        elif self._ufw_is_active(result.stdout):
            self.checks.append(('Firewall', 'PASS', 'Firewall is active'))
        else:
            self.checks.append(('Firewall', 'FAIL', 'Firewall is not active'))

    def check_ssh_config(self):
        """Check /etc/ssh/sshd_config for explicitly enabled risky options."""
        ssh_config = '/etc/ssh/sshd_config'

        try:
            with open(ssh_config, 'r', encoding='utf-8', errors='replace') as f:
                config = f.read()
        except OSError as e:
            self.checks.append(('SSH Config', 'ERROR', str(e)))
            return

        issues = self._ssh_config_issues(config)
        if issues:
            self.checks.append(('SSH Config', 'FAIL', ', '.join(issues)))
        else:
            self.checks.append(('SSH Config', 'PASS', 'SSH is properly configured'))

    def check_updates(self):
        """Check for available system updates via ``apt list --upgradable``."""
        try:
            result = self._run(['apt', 'list', '--upgradable'])
        except (OSError, subprocess.SubprocessError) as e:
            self.checks.append(('System Updates', 'ERROR', str(e)))
            return

        if result.returncode != 0:
            detail = (result.stderr.strip()
                      or f'apt exited with status {result.returncode}')
            self.checks.append(('System Updates', 'ERROR', detail))
            return

        updates = self._count_upgradable(result.stdout)
        if updates > 0:
            self.checks.append(('System Updates', 'WARN',
                                f'{updates} updates available'))
        else:
            self.checks.append(('System Updates', 'PASS',
                                'System is up to date'))

    def check_password_policy(self):
        """Check the password expiration setting in /etc/login.defs."""
        try:
            with open('/etc/login.defs', 'r', encoding='utf-8',
                      errors='replace') as f:
                config = f.read()
        except OSError as e:
            self.checks.append(('Password Policy', 'ERROR', str(e)))
            return

        if self._password_expiry_disabled(config):
            self.checks.append(('Password Policy', 'FAIL',
                                'Password expiration not set'))
        else:
            self.checks.append(('Password Policy', 'PASS',
                                'Password policy configured'))

    def run_audit(self):
        """Run all hardening checks, then print the report."""
        print("Running Security Hardening Audit...\n")

        self.check_firewall_status()
        self.check_ssh_config()
        self.check_updates()
        self.check_password_policy()

        self.generate_report()

    def generate_report(self):
        """Print a summary line followed by one line per recorded check."""
        print("="*70)
        print("SYSTEM HARDENING AUDIT REPORT")
        print("="*70)

        passed = sum(1 for c in self.checks if c[1] == 'PASS')
        failed = sum(1 for c in self.checks if c[1] == 'FAIL')
        warnings = sum(1 for c in self.checks if c[1] == 'WARN')

        print(f"\nSummary: {passed} Passed | {failed} Failed | {warnings} Warnings\n")

        # Built once; unknown statuses fall back to '?'.
        status_symbol = {
            'PASS': '✓',
            'FAIL': '✗',
            'WARN': '⚠',
            'ERROR': '!'
        }
        for check, status, message in self.checks:
            print(f"{status_symbol.get(status, '?')} {check:20} [{status:5}] {message}")
# Usage (requires root privileges)
# auditor = SystemHardening()
# auditor.run_audit()
Blue Team Tools
1. SIEM Solutions
- Splunk: Enterprise log management and analysis
- ELK Stack: Elasticsearch, Logstash, Kibana
- IBM QRadar: Security intelligence platform
- ArcSight: Security event management
2. Intrusion Detection/Prevention
- Snort: Open-source IDS/IPS
- Suricata: Network threat detection
- OSSEC: Host-based IDS
- Zeek (formerly Bro): Network security monitor
3. Endpoint Protection
- CrowdStrike: Endpoint detection and response
- Carbon Black: Endpoint security platform
- Microsoft Defender: Built-in Windows protection
- Sophos: Comprehensive endpoint security
4. Network Monitoring
- Wireshark: Packet analyzer
- Nagios: Infrastructure monitoring
- Zabbix: Enterprise monitoring
- PRTG: Network monitoring tool
5. Vulnerability Management
- Nessus: Vulnerability scanner
- OpenVAS: Open-source vulnerability scanner
- Qualys: Cloud-based security platform
- Rapid7 Nexpose: Vulnerability management
Incident Response Playbook
Phase 1: Preparation
class IncidentResponseTeam:
    """Preparation phase of the playbook: contacts and readiness flags."""

    def __init__(self):
        """Set up the escalation contacts and mark nothing as verified yet."""
        # NOTE(review): the addresses look redacted ("[email protected]");
        # replace them with the real contact addresses before use.
        self.contacts = dict(
            security_lead='[email protected]',
            it_admin='[email protected]',
            legal='[email protected]',
            pr='[email protected]',
        )
        self.tools_ready = False
        self.backups_verified = False

    def prepare(self):
        """Ensure team is ready for incidents.

        Runs the four preparation steps in order. The step methods are not
        defined in this class body; they are looked up on ``self`` when
        called.
        """
        preparation_steps = (
            'verify_contact_list',
            'check_tools',
            'verify_backups',
            'review_procedures',
        )
        for step_name in preparation_steps:
            getattr(self, step_name)()
Phase 2: Detection & Analysis
def analyze_incident(self, alert):
    """Analyze security alert and turn it into an incident record.

    Args:
        alert: The alert to analyse; it is passed unchanged to the
            assessment helpers on ``self``.

    Returns:
        A dict with the keys ``id``, ``timestamp``, ``severity``, ``type``,
        ``affected_systems`` and ``indicators``.
    """
    # Evaluated one by one, in the same order as the keys of the record.
    incident_id = generate_incident_id()
    detected_at = datetime.now()
    severity = self.assess_severity(alert)
    category = self.classify_incident(alert)
    systems = self.identify_affected_systems(alert)
    iocs = self.extract_iocs(alert)

    return {
        'id': incident_id,
        'timestamp': detected_at,
        'severity': severity,
        'type': category,
        'affected_systems': systems,
        'indicators': iocs,
    }
Phase 3: Containment
def contain_incident(self, incident):
    """Contain the security incident according to its type.

    Handles the types ``'malware'``, ``'data_breach'`` and ``'ddos'``; any
    other type results in no action.

    Args:
        incident: Incident record; ``incident['type']`` selects the action
            and ``incident['affected_systems']`` is used for malware.
    """
    kind = incident['type']

    if kind == 'malware':
        # Cut the infected hosts off from the rest of the environment.
        self.isolate_infected_systems(incident['affected_systems'])
        return

    if kind == 'data_breach':
        self.revoke_compromised_credentials()
        self.block_exfiltration_channels()
        return

    if kind == 'ddos':
        self.enable_rate_limiting()
        self.activate_cdn_protection()
Phase 4: Eradication
def eradicate_threat(self, incident):
    """Remove threat from environment.

    Runs the four eradication steps in a fixed order.

    Args:
        incident: Incident record; not read by this method.
    """
    eradication_steps = (
        'remove_malware',
        'patch_vulnerabilities',
        'reset_compromised_accounts',
        'update_security_rules',
    )
    for step_name in eradication_steps:
        getattr(self, step_name)()
Phase 5: Recovery
def recover_systems(self, incident):
    """Restore normal operations.

    Runs the four recovery steps in a fixed order.

    Args:
        incident: Incident record; not read by this method.
    """
    recovery_steps = (
        'restore_from_clean_backups',
        'verify_system_integrity',
        'monitor_for_reinfection',
        'gradually_restore_services',
    )
    for step_name in recovery_steps:
        getattr(self, step_name)()
Phase 6: Lessons Learned
def post_incident_review(self, incident):
    """Document and learn from incident.

    Args:
        incident: Incident record. ``incident['description']`` is used when
            present; a record without that key gets a placeholder text
            instead of raising ``KeyError``.

    Returns:
        A dict with the keys ``what_happened``, ``root_cause``,
        ``response_effectiveness``, ``improvements`` and
        ``updated_procedures``.
    """
    # An incident record is not guaranteed to carry a description, so a
    # missing key must not abort the review.
    try:
        what_happened = incident['description']
    except KeyError:
        what_happened = 'No description recorded'

    report = {
        'what_happened': what_happened,
        'root_cause': self.determine_root_cause(incident),
        'response_effectiveness': self.evaluate_response(),
        'improvements': self.identify_improvements(),
        'updated_procedures': self.update_procedures()
    }
    return report
Security Metrics & KPIs
class SecurityMetrics:
    """Helpers for computing common security KPIs from lists of records."""

    def __init__(self):
        """Start with an empty metrics store."""
        self.metrics = {}

    def calculate_mttr(self, incidents):
        """Mean Time to Respond.

        Args:
            incidents: Records that each carry a ``response_time`` value.

        Returns:
            The average ``response_time``, or 0 when ``incidents`` is empty.
        """
        durations = [record['response_time'] for record in incidents]
        if incidents:
            return sum(durations) / len(incidents)
        return 0

    def calculate_mttd(self, incidents):
        """Mean Time to Detect.

        Args:
            incidents: Records that each carry a ``detection_time`` value.

        Returns:
            The average ``detection_time``, or 0 when ``incidents`` is empty.
        """
        durations = [record['detection_time'] for record in incidents]
        if incidents:
            return sum(durations) / len(incidents)
        return 0

    def vulnerability_remediation_rate(self, vulnerabilities):
        """Percentage of vulnerabilities fixed.

        Args:
            vulnerabilities: Records that each carry a ``status`` value.

        Returns:
            The share of records whose status is ``'fixed'`` as a percentage,
            or 0 when ``vulnerabilities`` is empty.
        """
        remediated = len([v for v in vulnerabilities if v['status'] == 'fixed'])
        if vulnerabilities:
            return (remediated / len(vulnerabilities)) * 100
        return 0

    def security_awareness_score(self, phishing_tests):
        """Employee security awareness.

        Args:
            phishing_tests: Records that each carry a ``clicked`` flag.

        Returns:
            The share of tests that were not clicked as a percentage, or 0
            when ``phishing_tests`` is empty.
        """
        resisted = len([t for t in phishing_tests if not t['clicked']])
        if phishing_tests:
            return (resisted / len(phishing_tests)) * 100
        return 0
Best Practices
- Continuous Monitoring: Never stop watching your systems
- Defense in Depth: Multiple layers of security controls
- Least Privilege: Grant minimum necessary access
- Regular Updates: Keep all systems patched
- Backup Strategy: Regular, tested backups
- Documentation: Maintain detailed procedures
- Training: Regular security awareness training
- Threat Intelligence: Stay informed about new threats
- Automation: Automate repetitive security tasks
- Collaboration: Work closely with Red Team for improvements
Career Path
- Security Analyst: Monitor and analyze security events
- Incident Responder: Handle security incidents
- Security Engineer: Build and maintain security systems
- SOC Analyst: Security Operations Center monitoring
- Threat Hunter: Proactively search for threats
- Security Architect: Design security infrastructure
- CISO: Chief Information Security Officer
- Go back to Cyber Security
- Return to Home