使用 Python脚本统计每日服务器流量使用情况并发送到 Telegram
先决条件
Python 脚本
- TELEGRAM_TOKEN 填写你获取的 Token,TELEGRAM_CHAT_ID 填写你的Chat ID
- 如果要显示GB流量,就在代码中改 QUERY ,在后面加上 / 1024
点击展开Python代码
import requests
import datetime
PROMETHEUS_URL = "http://localhost:9090"
TELEGRAM_TOKEN = "123456789:ABCDEF..."
TELEGRAM_CHAT_ID = "12345678"
# PromQL 查询(总流量 MB)
#QUERY = 'sum by (instance) (increase(node_network_receive_bytes_total{device="eth0"}[1d]) + increase(node_network_transmit_bytes_total{device="eth0"}[1d])) / 1024 / 1024'
QUERY = 'sum by (instance) (increase(node_network_receive_bytes_total{device!~"lo|docker.*|veth.*|br.*"}[1d]) + increase(node_network_transmit_bytes_total{device!~"lo|docker.*|veth.*|br.*"}[1d])) / 1024 / 1024'
def query_prometheus(query):
url = f"{PROMETHEUS_URL}/api/v1/query"
r = requests.get(url, params={'query': query})
return r.json()
def format_result(data):
yesterday = datetime.date.today() - datetime.timedelta(days=1)
lines = [f"📊 每日流量报告 - {yesterday}"]
# lines = [f"📊 每日流量报告 - {datetime.date.today()}"]
for item in data['data']['result']:
instance = item['metric']['instance']
mb = float(item['value'][1])
lines.append(f"🖥️ {instance}: {mb:.2f} MB")
return "\n".join(lines)
def send_telegram_message(text):
url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
data = {"chat_id": TELEGRAM_CHAT_ID, "text": text}
r = requests.post(url, data=data)
return r.json()
if __name__ == "__main__":
data = query_prometheus(QUERY)
text = format_result(data)
print("Sending message:\n", text)
send_telegram_message(text)
crontab 定时发送
- 在每天凌晨 00:10 执行任务,统计“过去 1 天”的流量
10 0 * * * /usr/bin/python3 /path/to/your/script.py
每日流量统计告警通知示例
