Skip to content
Go back

Daily server traffic usage

Updated:

使用 Python脚本统计每日服务器流量使用情况并发送到 Telegram

先决条件

Python 脚本

点击展开Python代码
import requests
import datetime

PROMETHEUS_URL = "http://localhost:9090"
TELEGRAM_TOKEN = "123456789:ABCDEF..."
TELEGRAM_CHAT_ID = "12345678"

# PromQL 查询(总流量 MB)
#QUERY = 'sum by (instance) (increase(node_network_receive_bytes_total{device="eth0"}[1d]) + increase(node_network_transmit_bytes_total{device="eth0"}[1d])) / 1024 / 1024'
QUERY = 'sum by (instance) (increase(node_network_receive_bytes_total{device!~"lo|docker.*|veth.*|br.*"}[1d]) + increase(node_network_transmit_bytes_total{device!~"lo|docker.*|veth.*|br.*"}[1d])) / 1024 / 1024'

def query_prometheus(query):
    url = f"{PROMETHEUS_URL}/api/v1/query"
    r = requests.get(url, params={'query': query})
    return r.json()

def format_result(data):
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    lines = [f"📊 每日流量报告 - {yesterday}"]
   # lines = [f"📊 每日流量报告 - {datetime.date.today()}"]
    for item in data['data']['result']:
        instance = item['metric']['instance']
        mb = float(item['value'][1])
        lines.append(f"🖥️ {instance}: {mb:.2f} MB")
    return "\n".join(lines)

def send_telegram_message(text):
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    data = {"chat_id": TELEGRAM_CHAT_ID, "text": text}
    r = requests.post(url, data=data)
    return r.json()

if __name__ == "__main__":
    data = query_prometheus(QUERY)
    text = format_result(data)
    print("Sending message:\n", text)
    send_telegram_message(text)

crontab 定时发送

10 0 * * * /usr/bin/python3 /path/to/your/script.py

每日流量统计告警通知示例

telegram告警通知


Share this post on:

Next Post
Docker deploys Prometheus Grafana Alertmanager monitoring system