引言:为什么需要一个精准的降雨预告系统?

在现代城市生活中,天气变化对我们的日常出行、工作安排和生活决策有着至关重要的影响。特别是在像襄阳这样气候多变的地区,突如其来的降雨可能会打乱您的出行计划,甚至带来安全隐患。传统的天气预报往往只能提供大范围、低精度的信息,无法满足个人对实时、精准天气数据的需求。

想象一下:您正准备出门参加一场重要的商务会议,或者计划周末与家人去郊外野餐。如果此时能提前知道精确到小时的降雨概率、降雨强度和持续时间,您就能做出更明智的决策——是提前出发避开雨峰,还是调整行程携带雨具,甚至是重新安排活动。这就是一个实时更新、精准预报的降雨查询系统的核心价值所在。

本文将详细介绍如何构建一个”襄阳降雨预告查询系统”,该系统将结合现代气象数据API、实时更新机制和用户友好的界面,帮助襄阳市民和游客实现”出行无忧”。我们将从技术架构、数据获取、实时更新机制、用户界面设计等多个维度进行深入探讨,并提供完整的代码示例,帮助您理解并实现这样一个实用的系统。

系统架构概述

核心组件

一个完整的降雨预告查询系统通常包含以下几个核心组件:

  1. 数据源层:负责获取原始气象数据,通常来自专业的气象API服务
  2. 数据处理层:对原始数据进行清洗、转换和分析,提取关键降雨信息
  3. 存储层:保存历史数据和实时数据,支持快速查询
  4. 应用层:提供用户界面和API接口,展示预报信息
  5. 更新机制:确保数据的实时性和准确性

技术栈选择

为了实现一个高效、可靠的系统,我们可以选择以下技术栈:

  • 后端开发:Python(Flask/Django)或Node.js
  • 数据存储:PostgreSQL(关系型数据库)或MongoDB(文档型数据库)
  • 前端开发:React/Vue.js 或简单的HTML/CSS/JavaScript
  • 数据获取:requests库(Python)或axios(JavaScript)
  • 定时任务:APScheduler(Python)或node-cron(Node.js)
  • 部署:Docker容器化 + 云服务器(如阿里云、腾讯云)

系统架构图

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   气象API数据源  │ → │  数据处理模块    │ → │  数据存储模块    │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                             ↓
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  用户查询接口    │ ← │  实时更新机制    │ ← │  应用服务层      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                             ↓
                    ┌─────────────────┐
                    │  用户界面展示    │
                    └─────────────────┘

数据获取:连接气象API

选择合适的气象数据源

要实现精准的降雨预报,首先需要选择一个可靠的气象数据提供商。以下是几个常用的选项:

  1. OpenWeatherMap:提供全球天气数据,包括降雨概率、降水量等,有免费和付费版本
  2. 中国气象局API:官方数据,准确性高,但可能需要申请权限
  3. 和风天气:国内知名的天气数据服务商,提供丰富的API接口
  4. AccuWeather:国际知名气象公司,数据质量优秀

对于襄阳地区的用户,我们推荐使用和风天气中国气象局API,因为它们对国内城市的支持更好,数据更新频率更高。

API调用示例(Python)

以下是一个使用Python调用和风天气API获取襄阳降雨预报的完整示例:

import requests
import json
import time
from datetime import datetime, timedelta

class XiangyangWeatherAPI:
    def __init__(self, api_key):
        """
        初始化天气API客户端
        :param api_key: 和风天气API密钥
        """
        self.api_key = api_key
        self.base_url = "https://devapi.qweather.com/v7"
        self.city_code = "101200201"  # 襄阳的城市代码
        
    def get_realtime_weather(self):
        """
        获取襄阳实时天气数据
        """
        url = f"{self.base_url}/weather/now"
        params = {
            'location': self.city_code,
            'key': self.api_key
        }
        
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            
            if data['code'] == '200':
                return {
                    'success': True,
                    'temp': data['now']['temp'],
                    'weather': data['now']['text'],
                    'wind_dir': data['now']['windDir'],
                    'wind_scale': data['now']['windScale'],
                    'humidity': data['now']['humidity'],
                    'precipitation': data['now']['precip'],
                    'observation_time': data['now']['obsTime']
                }
            else:
                return {'success': False, 'error': data['code']}
                
        except requests.exceptions.RequestException as e:
            return {'success': False, 'error': str(e)}
    
    def get_hourly_forecast(self, hours=24):
        """
        获取襄阳未来24小时逐小时预报
        """
        url = f"{self.base_url}/weather/24h"
        params = {
            'location': self.city_code,
            'key': self.api_key
        }
        
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            
            if data['code'] == '200':
                hourly_data = []
                for hour in data['hourly'][:hours]:
                    hourly_data.append({
                        'time': hour['fxTime'],
                        'temp': hour['temp'],
                        'weather': hour['text'],
                        'precip_probability': hour.get('pop', '0'),  # 降雨概率
                        'precip_amount': hour.get('precip', '0'),   # 预计降水量
                        'wind_dir': hour['windDir'],
                        'wind_speed': hour['windSpeed']
                    })
                return {'success': True, 'hourly_data': hourly_data}
            else:
                return {'success': False, 'error': data['code']}
                
        except requests.exceptions.RequestException as e:
            return {'success': False, 'error': str(e)}
    
    def get_daily_forecast(self, days=7):
        """
        获取襄阳未来7天天气预报
        """
        url = f"{self.base_url}/weather/7d"
        params = {
            'location': self.city_code,
            'key': self.api_key
        }
        
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            
            if data['code'] == '200':
                daily_data = []
                for day in data['daily'][:days]:
                    daily_data.append({
                        'date': day['fxDate'],
                        'temp_max': day['tempMax'],
                        'temp_min': day['tempMin'],
                        'weather_day': day['textDay'],
                        'weather_night': day['textNight'],
                        'precip_probability': day.get('pop', '0'),
                        'wind_dir_day': day['windDirDay'],
                        'wind_scale_day': day['windScaleDay']
                    })
                return {'success': True, 'daily_data': daily_data}
            else:
                return {'success': False, 'error': data['code']}
                
        except requests.exceptions.RequestException as e:
            return {'success': False, 'error': str(e)}
    
    def get_rain_alert(self, threshold=0.5, hours=6):
        """
        获取降雨预警:检查未来N小时内是否有降雨
        :param threshold: 降雨量阈值(毫米)
        :param hours: 检查的时间范围(小时)
        """
        forecast = self.get_hourly_forecast(hours)
        if not forecast['success']:
            return forecast
        
        rain_alerts = []
        for hour in forecast['hourly_data']:
            precip = float(hour['precip_amount'])
            if precip >= threshold:
                rain_alerts.append({
                    'time': hour['time'],
                    'precipitation': precip,
                    'weather': hour['weather']
                })
        
        if rain_alerts:
            return {
                'success': True,
                'has_rain': True,
                'alerts': rain_alerts,
                'message': f"未来{hours}小时内有降雨,请注意携带雨具!"
            }
        else:
            return {
                'success': True,
                'has_rain': False,
                'message': f"未来{hours}小时内无明显降雨,可以放心出行。"
            }

# 使用示例
if __name__ == "__main__":
    # 替换为您的API密钥
    API_KEY = "your_api_key_here"
    weather_api = XiangyangWeatherAPI(API_KEY)
    
    print("=== 襄阳实时天气 ===")
    realtime = weather_api.get_realtime_weather()
    if realtime['success']:
        print(f"当前温度:{realtime['temp']}°C")
        print(f"天气状况:{realtime['weather']}")
        print(f"降水量:{realtime['precipitation']}mm")
        print(f"观测时间:{realtime['observation_time']}")
    
    print("\n=== 未来6小时降雨预警 ===")
    alert = weather_api.get_rain_alert(threshold=0.1, hours=6)
    if alert['success']:
        print(alert['message'])
        if alert['has_rain']:
            for a in alert['alerts']:
                print(f"  {a['time']}: {a['weather']}, 降水量{a['precipitation']}mm")
    
    print("\n=== 未来24小时逐小时预报 ===")
    hourly = weather_api.get_hourly_forecast(24)
    if hourly['success']:
        for hour in hourly['hourly_data'][:12]:  # 显示前12小时
            print(f"{hour['time'][-5:]} | {hour['temp']}°C | {hour['weather']} | 降雨概率: {hour['precip_probability']}%")

代码说明

  1. 类结构XiangyangWeatherAPI类封装了所有与天气相关的API调用
  2. 实时数据get_realtime_weather()方法获取当前天气状况,包括温度、天气现象、降水量等
  3. 逐小时预报get_hourly_forecast()方法提供未来24小时的详细预报,特别关注降雨概率和降水量
  4. 多日预报get_daily_forecast()方法提供未来7天的天气概况
  5. 智能预警get_rain_alert()方法是一个实用功能,可以自动检测未来几小时内是否有降雨,并给出明确的出行建议

数据存储与管理

数据库设计

为了保存历史数据和缓存实时数据,我们需要设计合理的数据库结构。以下是使用PostgreSQL的表设计:

-- 创建城市信息表
CREATE TABLE cities (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    city_code VARCHAR(20) UNIQUE NOT NULL,
    latitude DECIMAL(10, 8),
    longitude DECIMAL(11, 8),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 创建实时天气表
CREATE TABLE realtime_weather (
    id SERIAL PRIMARY KEY,
    city_code VARCHAR(20) NOT NULL,
    temperature DECIMAL(5, 2),
    weather_text VARCHAR(50),
    precipitation DECIMAL(6, 3),
    humidity INTEGER,
    wind_dir VARCHAR(20),
    wind_scale VARCHAR(10),
    observation_time TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (city_code) REFERENCES cities(city_code)
);

-- 创建逐小时预报表
CREATE TABLE hourly_forecast (
    id SERIAL PRIMARY KEY,
    city_code VARCHAR(20) NOT NULL,
    forecast_time TIMESTAMP NOT NULL,
    temperature DECIMAL(5, 2),
    weather_text VARCHAR(50),
    precip_probability DECIMAL(5, 2),
    precip_amount DECIMAL(6, 3),
    wind_dir VARCHAR(20),
    wind_speed DECIMAL(6, 2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (city_code) REFERENCES cities(city_code),
    UNIQUE(city_code, forecast_time)
);

-- 创建每日预报表
CREATE TABLE daily_forecast (
    id SERIAL PRIMARY KEY,
    city_code VARCHAR(20) NOT NULL,
    forecast_date DATE NOT NULL,
    temp_max DECIMAL(5, 2),
    temp_min DECIMAL(5, 2),
    weather_day VARCHAR(50),
    weather_night VARCHAR(50),
    precip_probability DECIMAL(5, 2),
    wind_dir_day VARCHAR(20),
    wind_scale_day VARCHAR(10),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (city_code) REFERENCES cities(city_code),
    UNIQUE(city_code, forecast_date)
);

-- 创建降雨预警表
CREATE TABLE rain_alerts (
    id SERIAL PRIMARY KEY,
    city_code VARCHAR(20) NOT NULL,
    alert_time TIMESTAMP NOT NULL,
    precip_amount DECIMAL(6, 3),
    weather_text VARCHAR(50),
    is_sent BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (city_code) REFERENCES cities(city_code)
);

-- 创建索引以提高查询性能
CREATE INDEX idx_hourly_forecast_time ON hourly_forecast(forecast_time);
CREATE INDEX idx_hourly_city_time ON hourly_forecast(city_code, forecast_time);
CREATE INDEX idx_daily_city_date ON daily_forecast(city_code, forecast_date);
CREATE INDEX idx_rain_alerts_city_time ON rain_alerts(city_code, alert_time);

Python数据库操作示例

import psycopg2
from psycopg2.extras import RealDictCursor
from datetime import datetime, timedelta
import logging

class WeatherDatabase:
    def __init__(self, db_config):
        """
        初始化数据库连接
        :param db_config: 数据库配置字典
        """
        self.db_config = db_config
        self.logger = logging.getLogger(__name__)
    
    def get_connection(self):
        """获取数据库连接"""
        try:
            conn = psycopg2.connect(**self.db_config)
            return conn
        except Exception as e:
            self.logger.error(f"数据库连接失败: {e}")
            raise
    
    def save_realtime_weather(self, weather_data):
        """
        保存实时天气数据
        :param weather_data: 从API获取的实时天气数据
        """
        if not weather_data.get('success'):
            return False
        
        conn = self.get_connection()
        try:
            with conn.cursor() as cur:
                insert_sql = """
                INSERT INTO realtime_weather 
                (city_code, temperature, weather_text, precipitation, humidity, wind_dir, wind_scale, observation_time)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """
                cur.execute(insert_sql, (
                    '101200201',  # 襄阳城市代码
                    weather_data['temp'],
                    weather_data['weather'],
                    weather_data['precipitation'],
                    weather_data['humidity'],
                    weather_data['wind_dir'],
                    weather_data['wind_scale'],
                    weather_data['observation_time']
                ))
                conn.commit()
                self.logger.info("实时天气数据保存成功")
                return True
        except Exception as e:
            conn.rollback()
            self.logger.error(f"保存实时天气数据失败: {e}")
            return False
        finally:
            conn.close()
    
    def save_hourly_forecast(self, hourly_data):
        """
        保存逐小时预报数据
        :param hourly_data: 从API获取的逐小时预报数据
        """
        if not hourly_data.get('success'):
            return False
        
        conn = self.get_connection()
        try:
            with conn.cursor() as cur:
                insert_sql = """
                INSERT INTO hourly_forecast 
                (city_code, forecast_time, temperature, weather_text, precip_probability, precip_amount, wind_dir, wind_speed)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (city_code, forecast_time) 
                DO UPDATE SET
                    temperature = EXCLUDED.temperature,
                    weather_text = EXCLUDED.weather_text,
                    precip_probability = EXCLUDED.precip_probability,
                    precip_amount = EXCLUDED.precip_amount,
                    wind_dir = EXCLUDED.wind_dir,
                    wind_speed = EXCLUDED.wind_speed,
                    created_at = CURRENT_TIMESTAMP
                """
                
                for hour in hourly_data['hourly_data']:
                    cur.execute(insert_sql, (
                        '101200201',
                        hour['time'],
                        hour['temp'],
                        hour['weather'],
                        hour['precip_probability'],
                        hour['precip_amount'],
                        hour['wind_dir'],
                        hour['wind_speed']
                    ))
                
                conn.commit()
                self.logger.info(f"保存{len(hourly_data['hourly_data'])}条逐小时预报数据成功")
                return True
        except Exception as e:
            conn.rollback()
            self.logger.error(f"保存逐小时预报数据失败: {e}")
            return False
        finally:
            conn.close()
    
    def get_upcoming_rain_forecast(self, hours=6, threshold=0.1):
        """
        查询未来N小时内的降雨预报
        :param hours: 查询时间范围(小时)
        :param threshold: 降雨量阈值(毫米)
        """
        conn = self.get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                query_sql = """
                SELECT forecast_time, temperature, weather_text, precip_amount, precip_probability
                FROM hourly_forecast
                WHERE city_code = %s 
                AND forecast_time BETWEEN %s AND %s
                AND precip_amount >= %s
                ORDER BY forecast_time
                """
                
                start_time = datetime.now()
                end_time = start_time + timedelta(hours=hours)
                
                cur.execute(query_sql, ('101200201', start_time, end_time, threshold))
                results = cur.fetchall()
                
                return {
                    'success': True,
                    'count': len(results),
                    'forecasts': results
                }
        except Exception as e:
            self.logger.error(f"查询降雨预报失败: {e}")
            return {'success': False, 'error': str(e)}
        finally:
            conn.close()
    
    def get_weather_summary(self, days=3):
        """
        获取未来N天的天气摘要
        """
        conn = self.get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                query_sql = """
                SELECT forecast_date, temp_max, temp_min, weather_day, weather_night, precip_probability
                FROM daily_forecast
                WHERE city_code = %s 
                AND forecast_date BETWEEN %s AND %s
                ORDER BY forecast_date
                """
                
                start_date = datetime.now().date()
                end_date = start_date + timedelta(days=days)
                
                cur.execute(query_sql, ('101200201', start_date, end_date))
                results = cur.fetchall()
                
                return {
                    'success': True,
                    'days': results
                }
        except Exception as e:
            self.logger.error(f"查询天气摘要失败: {e}")
            return {'success': False, 'error': str(e)}
        finally:
            conn.close()

实时更新机制

定时任务调度

为了实现数据的实时更新,我们需要设置定时任务,定期从气象API获取最新数据。以下是使用APScheduler的实现:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
import atexit
import logging
from datetime import datetime

class WeatherUpdateService:
    def __init__(self, weather_api, db_manager):
        """
        初始化天气更新服务
        :param weather_api: 天气API客户端实例
        :param db_manager: 数据库管理器实例
        """
        self.weather_api = weather_api
        self.db_manager = db_manager
        self.scheduler = BackgroundScheduler()
        self.logger = logging.getLogger(__name__)
        
        # 配置定时任务
        self.setup_jobs()
    
    def setup_jobs(self):
        """配置定时任务"""
        # 每15分钟更新一次实时天气
        self.scheduler.add_job(
            func=self.update_realtime_weather,
            trigger=IntervalTrigger(minutes=15),
            id='realtime_weather_update',
            name='实时天气数据更新',
            replace_existing=True
        )
        
        # 每小时更新一次逐小时预报
        self.scheduler.add_job(
            func=self.update_hourly_forecast,
            trigger=IntervalTrigger(hours=1),
            id='hourly_forecast_update',
            name='逐小时预报数据更新',
            replace_existing=True
        )
        
        # 每6小时更新一次多日预报
        self.scheduler.add_job(
            func=self.update_daily_forecast,
            trigger=IntervalTrigger(hours=6),
            id='daily_forecast_update',
            name='多日预报数据更新',
            replace_existing=True
        )
        
        # 每5分钟检查一次降雨预警
        self.scheduler.add_job(
            func=self.check_rain_alerts,
            trigger=IntervalTrigger(minutes=5),
            id='rain_alert_check',
            name='降雨预警检查',
            replace_existing=True
        )
    
    def update_realtime_weather(self):
        """更新实时天气数据"""
        self.logger.info(f"[{datetime.now()}] 开始更新实时天气...")
        try:
            result = self.weather_api.get_realtime_weather()
            if result['success']:
                self.db_manager.save_realtime_weather(result)
                self.logger.info("实时天气更新成功")
            else:
                self.logger.error(f"获取实时天气失败: {result.get('error')}")
        except Exception as e:
            self.logger.error(f"更新实时天气时发生错误: {e}")
    
    def update_hourly_forecast(self):
        """更新逐小时预报数据"""
        self.logger.info(f"[{datetime.now()}] 开始更新逐小时预报...")
        try:
            result = self.weather_api.get_hourly_forecast(24)
            if result['success']:
                self.db_manager.save_hourly_forecast(result)
                self.logger.info("逐小时预报更新成功")
            else:
                self.logger.error(f"获取逐小时预报失败: {result.get('error')}")
        except Exception as e:
            self.logger.error(f"更新逐小时预报时发生错误: {e}")
    
    def update_daily_forecast(self):
        """更新多日预报数据"""
        self.logger.info(f"[{datetime.now()}] 开始更新多日预报...")
        try:
            result = self.weather_api.get_daily_forecast(7)
            if result['success']:
                # 保存到数据库的实现(类似save_hourly_forecast)
                self.logger.info("多日预报更新成功")
            else:
                self.logger.error(f"获取多日预报失败: {result.get('error')}")
        except Exception as e:
            self.logger.error(f"更新多日预报时发生错误: {e}")
    
    def check_rain_alerts(self):
        """检查降雨预警并发送通知"""
        self.logger.info(f"[{datetime.now()}] 开始检查降雨预警...")
        try:
            # 检查未来6小时内的降雨
            alert = self.weather_api.get_rain_alert(threshold=0.1, hours=6)
            if alert['success'] and alert['has_rain']:
                # 保存预警到数据库
                self.save_rain_alert(alert['alerts'])
                
                # 这里可以添加发送通知的逻辑(邮件、短信、推送等)
                self.send_rain_notification(alert)
                
                self.logger.warning(f"检测到降雨预警: {alert['message']}")
            else:
                self.logger.info("未检测到降雨预警")
        except Exception as e:
            self.logger.error(f"检查降雨预警时发生错误: {e}")
    
    def save_rain_alert(self, alerts):
        """保存降雨预警到数据库"""
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor() as cur:
                insert_sql = """
                INSERT INTO rain_alerts (city_code, alert_time, precip_amount, weather_text)
                VALUES (%s, %s, %s, %s)
                """
                for alert in alerts:
                    cur.execute(insert_sql, (
                        '101200201',
                        alert['time'],
                        alert['precipitation'],
                        alert['weather']
                    ))
                conn.commit()
                self.logger.info(f"保存{len(alerts)}条降雨预警成功")
        except Exception as e:
            conn.rollback()
            self.logger.error(f"保存降雨预警失败: {e}")
        finally:
            conn.close()
    
    def send_rain_notification(self, alert):
        """
        发送降雨通知(示例实现)
        实际应用中可以集成邮件、短信、微信推送等
        """
        message = alert['message']
        for a in alert['alerts']:
            message += f"\n{a['time']}: {a['weather']}, 降水量{a['precipitation']}mm"
        
        # 这里可以添加实际的通知发送逻辑
        # 例如:发送邮件、短信、推送通知等
        self.logger.info(f"发送通知: {message}")
        
        # 示例:打印到控制台(实际应用中替换为真实的通知发送)
        print(f"\n{'='*50}")
        print(f"【降雨预警通知】")
        print(f"{'='*50}")
        print(message)
        print(f"{'='*50}\n")
    
    def start(self):
        """启动定时任务"""
        self.scheduler.start()
        self.logger.info("天气更新服务已启动")
    
    def shutdown(self):
        """关闭定时任务"""
        self.scheduler.shutdown()
        self.logger.info("天气更新服务已关闭")

# 使用示例
if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(level=logging.INFO)
    
    # 初始化组件
    API_KEY = "your_api_key_here"
    weather_api = XiangyangWeatherAPI(API_KEY)
    
    db_config = {
        'host': 'localhost',
        'database': 'weather_db',
        'user': 'postgres',
        'password': 'your_password'
    }
    db_manager = WeatherDatabase(db_config)
    
    # 创建更新服务
    update_service = WeatherUpdateService(weather_api, db_manager)
    
    # 启动服务
    update_service.start()
    
    # 保持程序运行
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        update_service.shutdown()

更新策略说明

  1. 实时天气:每15分钟更新一次,确保数据的及时性
  2. 逐小时预报:每小时更新一次,因为气象模型通常每小时更新一次
  3. 多日预报:每6小时更新一次,因为长期预报变化相对较慢
  4. 降雨预警:每5分钟检查一次,确保能及时发现降雨变化

数据缓存策略

为了减少API调用次数和提高响应速度,可以实现多级缓存:

from functools import lru_cache
import redis
import json

class WeatherCache:
    def __init__(self, redis_client=None):
        self.redis = redis_client
        self.local_cache = {}
    
    def get_cached_weather(self, city_code, data_type, max_age_minutes=10):
        """
        获取缓存的天气数据
        :param city_code: 城市代码
        :param data_type: 数据类型(realtime, hourly, daily)
        :param max_age_minutes: 缓存最大年龄(分钟)
        """
        cache_key = f"weather:{city_code}:{data_type}"
        
        # 先检查Redis缓存
        if self.redis:
            cached = self.redis.get(cache_key)
            if cached:
                data = json.loads(cached)
                timestamp = datetime.fromisoformat(data['timestamp'])
                if datetime.now() - timestamp < timedelta(minutes=max_age_minutes):
                    return data['payload']
        
        # 检查本地缓存
        if cache_key in self.local_cache:
            timestamp, payload = self.local_cache[cache_key]
            if datetime.now() - timestamp < timedelta(minutes=max_age_minutes):
                return payload
        
        return None
    
    def set_cached_weather(self, city_code, data_type, payload):
        """设置缓存"""
        cache_key = f"weather:{city_code}:{data_type}"
        data = {
            'timestamp': datetime.now().isoformat(),
            'payload': payload
        }
        
        # 设置Redis缓存(过期时间30分钟)
        if self.redis:
            self.redis.setex(cache_key, 1800, json.dumps(data))
        
        # 设置本地缓存
        self.local_cache[cache_key] = (datetime.now(), payload)

用户界面设计

Web界面实现

以下是一个基于Flask和Bootstrap的简单Web界面实现:

from flask import Flask, render_template, jsonify, request
from datetime import datetime, timedelta
import json

app = Flask(__name__)

# 假设已经初始化了weather_api和db_manager
weather_api = None  # 需要实际初始化
db_manager = None   # 需要实际初始化

@app.route('/')
def index():
    """首页:显示实时天气和降雨预警"""
    # 获取实时天气
    realtime = weather_api.get_realtime_weather()
    
    # 获取降雨预警
    rain_alert = weather_api.get_rain_alert(threshold=0.1, hours=6)
    
    # 获取未来3小时天气
    hourly = weather_api.get_hourly_forecast(3)
    
    return render_template('index.html', 
                         realtime=realtime, 
                         rain_alert=rain_alert,
                         hourly=hourly,
                         current_time=datetime.now())

@app.route('/forecast')
def forecast():
    """预报页面:显示详细预报"""
    # 获取24小时预报
    hourly = weather_api.get_hourly_forecast(24)
    
    # 获取7天预报
    daily = weather_api.get_daily_forecast(7)
    
    return render_template('forecast.html', 
                         hourly=hourly, 
                         daily=daily,
                         current_time=datetime.now())

@app.route('/api/realtime')
def api_realtime():
    """API:实时天气"""
    result = weather_api.get_realtime_weather()
    return jsonify(result)

@app.route('/api/rain_alert')
def api_rain_alert():
    """API:降雨预警"""
    hours = request.args.get('hours', 6, type=int)
    threshold = request.args.get('threshold', 0.1, type=float)
    result = weather_api.get_rain_alert(threshold=threshold, hours=hours)
    return jsonify(result)

@app.route('/api/hourly')
def api_hourly():
    """API:逐小时预报"""
    hours = request.args.get('hours', 24, type=int)
    result = weather_api.get_hourly_forecast(hours)
    return jsonify(result)

@app.route('/api/daily')
def api_daily():
    """API:多日预报"""
    days = request.args.get('days', 7, type=int)
    result = weather_api.get_daily_forecast(days)
    return jsonify(result)

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

HTML模板示例(index.html)

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>襄阳降雨预告查询系统</title>
    <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
    <style>
        .weather-card {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            border-radius: 15px;
            box-shadow: 0 10px 20px rgba(0,0,0,0.2);
        }
        .rain-alert {
            background: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
            color: white;
            border-radius: 15px;
            animation: pulse 2s infinite;
        }
        @keyframes pulse {
            0% { transform: scale(1); }
            50% { transform: scale(1.02); }
            100% { transform: scale(1); }
        }
        .forecast-item {
            background: white;
            border-radius: 10px;
            padding: 15px;
            margin-bottom: 10px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.1);
        }
        .temp-display {
            font-size: 3rem;
            font-weight: bold;
        }
        .weather-icon {
            font-size: 2rem;
        }
    </style>
</head>
<body>
    <div class="container py-4">
        <!-- 头部 -->
        <div class="row mb-4">
            <div class="col-12 text-center">
                <h1 class="display-4 fw-bold">襄阳降雨预告查询系统</h1>
                <p class="text-muted">实时更新 · 精准预报 · 助您出行无忧</p>
                <p class="text-muted">最后更新时间: {{ current_time.strftime('%Y-%m-%d %H:%M:%S') }}</p>
            </div>
        </div>

        <!-- 降雨预警 -->
        {% if rain_alert.success and rain_alert.has_rain %}
        <div class="row mb-4">
            <div class="col-12">
                <div class="rain-alert p-4 text-center">
                    <h3 class="fw-bold">⚠️ 降雨预警</h3>
                    <p class="mb-0 fs-5">{{ rain_alert.message }}</p>
                    <div class="mt-3">
                        {% for alert in rain_alert.alerts %}
                        <div class="alert alert-light mt-2 mb-0">
                            <strong>{{ alert.time[-5:] }}</strong> - {{ alert.weather }} ({{ alert.precipitation }}mm)
                        </div>
                        {% endfor %}
                    </div>
                </div>
            </div>
        </div>
        {% endif %}

        <!-- 实时天气 -->
        <div class="row mb-4">
            <div class="col-md-6 mb-3">
                <div class="weather-card p-4 h-100">
                    <h3 class="fw-bold mb-3">实时天气</h3>
                    {% if realtime.success %}
                    <div class="row align-items-center">
                        <div class="col-6">
                            <div class="temp-display">{{ realtime.temp }}°C</div>
                            <div class="weather-icon">{{ realtime.weather }}</div>
                        </div>
                        <div class="col-6">
                            <ul class="list-unstyled mb-0">
                                <li>降水: {{ realtime.precipitation }}mm</li>
                                <li>湿度: {{ realtime.humidity }}%</li>
                                <li>风向: {{ realtime.wind_dir }}</li>
                                <li>风力: {{ realtime.wind_scale }}级</li>
                            </ul>
                        </div>
                    </div>
                    {% else %}
                    <p class="text-center">暂无实时数据</p>
                    {% endif %}
                </div>
            </div>

            <!-- 未来3小时 -->
            <div class="col-md-6 mb-3">
                <div class="card h-100">
                    <div class="card-header bg-primary text-white">
                        <h5 class="mb-0">未来3小时</h5>
                    </div>
                    <div class="card-body">
                        {% if hourly.success %}
                        <div class="row text-center">
                            {% for hour in hourly.hourly_data %}
                            <div class="col-4 forecast-item">
                                <strong>{{ hour.time[-5:] }}</strong><br>
                                <span class="text-primary">{{ hour.temp }}°C</span><br>
                                <small>{{ hour.weather }}</small><br>
                                <small class="text-danger">{{ hour.precip_probability }}%</small>
                            </div>
                            {% endfor %}
                        </div>
                        {% else %}
                        <p class="text-center">暂无预报数据</p>
                        {% endif %}
                    </div>
                </div>
            </div>
        </div>

        <!-- 快捷操作 -->
        <div class="row">
            <div class="col-12 text-center">
                <a href="/forecast" class="btn btn-lg btn-primary me-2">查看详细预报</a>
                <button onclick="location.reload()" class="btn btn-lg btn-outline-secondary">刷新数据</button>
            </div>
        </div>

        <!-- 提示信息 -->
        <div class="row mt-4">
            <div class="col-12">
                <div class="alert alert-info">
                    <strong>💡 使用提示:</strong><br>
                    • 系统每15分钟自动更新实时数据<br>
                    • 降雨预警每5分钟检查一次<br>
                    • 点击"查看详细预报"可获取未来7天天气<br>
                    • 出行前请关注降雨概率和降水量
                </div>
            </div>
        </div>
    </div>

    <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
    <script>
        // 自动刷新功能(可选)
        // setTimeout(() => location.reload(), 300000); // 每5分钟自动刷新
    </script>
</body>
</html>

移动端适配

对于移动端用户,可以添加以下优化:

/* 移动端优化 */
@media (max-width: 768px) {
    .temp-display {
        font-size: 2rem;
    }
    .weather-icon {
        font-size: 1.5rem;
    }
    .forecast-item {
        padding: 10px;
        font-size: 0.9rem;
    }
    .container {
        padding: 10px;
    }
}

高级功能扩展

1. 智能出行建议

基于天气数据生成个性化出行建议:

class TravelAdvisor:
    def __init__(self, weather_api):
        self.weather_api = weather_api
    
    def get_travel_suggestions(self, activity_type="general"):
        """
        根据天气和活动类型生成出行建议
        :param activity_type: 活动类型(general, outdoor, driving, cycling)
        """
        # 获取天气数据
        realtime = self.weather_api.get_realtime_weather()
        hourly = self.weather_api.get_hourly_forecast(6)
        rain_alert = self.weather_api.get_rain_alert(threshold=0.1, hours=6)
        
        if not realtime['success'] or not hourly['success']:
            return {"success": False, "message": "无法获取天气数据"}
        
        suggestions = []
        warnings = []
        
        # 基础天气分析
        temp = float(realtime['temp'])
        precip = float(realtime['precipitation'])
        weather = realtime['weather']
        
        # 温度建议
        if temp < 10:
            suggestions.append("🌡️ 温度较低,请注意保暖,建议穿厚外套")
        elif temp > 30:
            suggestions.append("🌡️ 天气炎热,请注意防暑降温")
        elif temp > 25:
            suggestions.append("🌡️ 温度适宜,穿着轻便即可")
        
        # 降雨建议
        if rain_alert['success'] and rain_alert['has_rain']:
            warnings.append("🌧️ 未来几小时有降雨,请务必携带雨具")
            if precip > 0:
                warnings.append(f"⚠️ 当前正在降雨,降水量{precip}mm")
            
            # 分析降雨趋势
            rain_hours = [h for h in hourly['hourly_data'] if float(h['precip_amount']) > 0]
            if rain_hours:
                times = [h['time'][-5:] for h in rain_hours[:3]]
                warnings.append(f"⏰ 预计降雨时段: {', '.join(times)}")
        
        # 活动类型特定建议
        if activity_type == "outdoor":
            if rain_alert['success'] and rain_alert['has_rain']:
                suggestions.append("🚫 建议改期或选择室内活动")
            elif float(realtime['humidity']) > 80:
                suggestions.append("💧 湿度较高,户外活动可能感到闷热")
            else:
                suggestions.append("✅ 适合户外活动")
        
        elif activity_type == "driving":
            if rain_alert['success'] and rain_alert['has_rain']:
                warnings.append("🚗 雨天路滑,请减速慢行,保持安全车距")
            if float(realtime['wind_scale']) >= 4:
                warnings.append("🌬️ 风力较大,注意横风影响")
        
        elif activity_type == "cycling":
            if rain_alert['success'] and rain_alert['has_rain']:
                warnings.append("🚴 雨天骑行危险,建议改乘公共交通")
            if float(realtime['wind_scale']) >= 3:
                warnings.append("🌬️ 逆风骑行会比较费力")
        
        # 通用建议
        if not suggestions and not warnings:
            suggestions.append("✅ 天气良好,适合出行")
        
        return {
            "success": True,
            "temperature": temp,
            "weather": weather,
            "suggestions": suggestions,
            "warnings": warnings,
            "activity_type": activity_type
        }

# 使用示例
advisor = TravelAdvisor(weather_api)

# 通用出行建议
print("=== 通用出行建议 ===")
result = advisor.get_travel_suggestions("general")
if result['success']:
    print(f"当前天气: {result['weather']}, 温度: {result['temperature']}°C")
    for s in result['suggestions']:
        print(f"💡 {s}")
    for w in result['warnings']:
        print(f"⚠️ {w}")

# 户外活动建议
print("\n=== 户外活动建议 ===")
result = advisor.get_travel_suggestions("outdoor")
if result['success']:
    for s in result['suggestions']:
        print(f"💡 {s}")
    for w in result['warnings']:
        print(f"⚠️ {w}")

# 驾驶建议
print("\n=== 驾驶建议 ===")
result = advisor.get_travel_suggestions("driving")
if result['success']:
    for w in result['warnings']:
        print(f"⚠️ {w}")

2. 天气数据可视化

使用图表展示天气趋势:

import matplotlib.pyplot as plt
import io
import base64

class WeatherVisualizer:
    def __init__(self, weather_api):
        self.weather_api = weather_api
    
    def generate_temperature_chart(self, hours=24):
        """生成温度趋势图"""
        hourly = self.weather_api.get_hourly_forecast(hours)
        if not hourly['success']:
            return None
        
        times = [h['time'][-5:] for h in hourly['hourly_data']]
        temps = [float(h['temp']) for h in hourly['hourly_data']]
        
        plt.figure(figsize=(12, 6))
        plt.plot(times, temps, marker='o', linewidth=2, markersize=6)
        plt.title('襄阳未来24小时温度趋势', fontsize=16)
        plt.xlabel('时间', fontsize=12)
        plt.ylabel('温度 (°C)', fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        plt.tight_layout()
        
        # 保存到内存
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png', dpi=100)
        buffer.seek(0)
        image_base64 = base64.b64encode(buffer.read()).decode()
        plt.close()
        
        return f"data:image/png;base64,{image_base64}"
    
    def generate_precipitation_chart(self, hours=24):
        """生成降雨量柱状图"""
        hourly = self.weather_api.get_hourly_forecast(hours)
        if not hourly['success']:
            return None
        
        times = [h['time'][-5:] for h in hourly['hourly_data']]
        precip = [float(h['precip_amount']) for h in hourly['hourly_data']]
        
        plt.figure(figsize=(12, 6))
        bars = plt.bar(times, precip, color='skyblue', alpha=0.7)
        
        # 标记降雨时段
        for i, p in enumerate(precip):
            if p > 0:
                bars[i].set_color('red')
                bars[i].set_alpha(0.8)
        
        plt.title('襄阳未来24小时降雨量预测', fontsize=16)
        plt.xlabel('时间', fontsize=12)
        plt.ylabel('降水量 (mm)', fontsize=12)
        plt.grid(True, alpha=0.3, axis='y')
        plt.xticks(rotation=45)
        plt.tight_layout()
        
        buffer = io.BytesIO()
        plt.savefig(buffer, format='png', dpi=100)
        buffer.seek(0)
        image_base64 = base64.b64encode(buffer.read()).decode()
        plt.close()
        
        return f"data:image/png;base64,{image_base64}"

3. 多用户支持与个性化

class UserPreferenceManager:
    def __init__(self, db_manager):
        self.db_manager = db_manager
    
    def save_user_preference(self, user_id, city_code, notification_enabled, activity_types):
        """保存用户偏好设置"""
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor() as cur:
                insert_sql = """
                INSERT INTO user_preferences (user_id, city_code, notification_enabled, activity_types)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (user_id) DO UPDATE SET
                    city_code = EXCLUDED.city_code,
                    notification_enabled = EXCLUDED.notification_enabled,
                    activity_types = EXCLUDED.activity_types,
                    updated_at = CURRENT_TIMESTAMP
                """
                cur.execute(insert_sql, (user_id, city_code, notification_enabled, json.dumps(activity_types)))
                conn.commit()
                return True
        except Exception as e:
            conn.rollback()
            return False
        finally:
            conn.close()
    
    def get_user_preference(self, user_id):
        """获取用户偏好设置"""
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                query_sql = "SELECT * FROM user_preferences WHERE user_id = %s"
                cur.execute(query_sql, (user_id,))
                result = cur.fetchone()
                return result
        finally:
            conn.close()
    
    def send_personalized_notifications(self):
        """发送个性化通知"""
        # 获取所有启用通知的用户
        conn = self.db_manager.get_connection()
        try:
            with conn.cursor(cursor_factory=RealDictCursor) as cur:
                query_sql = """
                SELECT user_id, city_code, activity_types 
                FROM user_preferences 
                WHERE notification_enabled = TRUE
                """
                cur.execute(query_sql)
                users = cur.fetchall()
                
                for user in users:
                    # 为每个用户生成个性化建议
                    advisor = TravelAdvisor(weather_api)
                    activities = json.loads(user['activity_types'])
                    
                    for activity in activities:
                        suggestion = advisor.get_travel_suggestions(activity)
                        if suggestion['success'] and suggestion['warnings']:
                            # 发送通知(邮件、推送等)
                            self.send_notification(user['user_id'], suggestion)
        finally:
            conn.close()
    
    def send_notification(self, user_id, suggestion):
        """发送通知(示例)"""
        message = f"【天气提醒】\n"
        message += f"天气: {suggestion['weather']}, 温度: {suggestion['temperature']}°C\n"
        if suggestion['warnings']:
            message += "\n".join(suggestion['warnings'])
        
        # 实际应用中这里会调用推送服务
        print(f"发送给用户 {user_id}: {message}")

部署与运维

Docker部署

创建Dockerfile:

FROM python:3.9-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# 暴露端口
EXPOSE 5000

# 启动命令
CMD ["python", "app.py"]

docker-compose.yml:

version: '3.8'

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_DB: weather_db
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: your_password
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
  
  redis:
    image: redis:6-alpine
    ports:
      - "6379:6379"
  
  weather-app:
    build: .
    environment:
      DATABASE_URL: postgresql://postgres:your_password@postgres:5432/weather_db
      REDIS_URL: redis://redis:6379
      API_KEY: ${API_KEY}
    ports:
      - "5000:5000"
    depends_on:
      - postgres
      - redis
    restart: unless-stopped

volumes:
  postgres_data:

监控与日志

import logging
from logging.handlers import RotatingFileHandler

def setup_logging():
    """配置日志系统"""
    # 创建日志目录
    import os
    os.makedirs('logs', exist_ok=True)
    
    # 配置根日志器
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # 文件处理器(按大小轮转)
    file_handler = RotatingFileHandler(
        'logs/weather_app.log',
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setLevel(logging.INFO)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    
    # 格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # 添加处理器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

# 健康检查端点
@app.route('/health')
def health_check():
    """健康检查"""
    try:
        # 检查数据库连接
        db_manager.get_connection().close()
        
        # 检查API可用性
        test_result = weather_api.get_realtime_weather()
        
        return jsonify({
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "api_available": test_result['success']
        })
    except Exception as e:
        return jsonify({
            "status": "unhealthy",
            "error": str(e)
        }), 500

总结

通过本文的详细介绍,我们构建了一个完整的”襄阳降雨预告查询系统”,涵盖了从数据获取、存储、实时更新到用户界面的完整流程。系统的主要特点包括:

  1. 精准数据源:连接专业气象API,确保数据准确性
  2. 实时更新:定时任务机制保证数据及时性
  3. 智能预警:自动检测降雨风险,及时提醒用户
  4. 用户友好:直观的界面设计,清晰的出行建议
  5. 可扩展性:支持多用户、个性化设置和高级功能扩展

核心优势

  • 及时性:每15分钟更新实时数据,每5分钟检查降雨预警
  • 准确性:基于专业气象数据,提供精确到小时的预报
  • 实用性:结合具体活动类型给出个性化出行建议
  • 可靠性:多重缓存机制和错误处理确保系统稳定运行

未来扩展方向

  1. 机器学习预测:结合历史数据训练降雨预测模型
  2. 多城市支持:扩展到襄阳周边地区
  3. 社交功能:用户可以分享实时天气情况
  4. 集成更多服务:如交通、旅游、农业等垂直领域

这个系统不仅能帮助襄阳市民和游客更好地规划出行,还能为相关行业提供有价值的天气数据支持。通过持续优化和扩展,它将成为一个真正实用的智能天气服务平台。