
The Complete Guide to Cron Expressions | From Beginner to Expert

A comprehensive, in-depth guide to cron expression syntax, with 10 real-world case studies, debugging tips, best practices, and answers to common questions. Everything you need to master scheduled task management on Linux.


Real-World Examples

Case Study 1: Automated Database Backups

Problem

A startup needed a reliable daily backup of its PostgreSQL database, but the first few attempts failed: the cron log showed "command not found" errors.

Solution

The root cause: pg_dump depends on environment variables that cron does not set, and the script used relative paths. The fix is to set the environment variables at the top of the script, use absolute paths only, and add logging and error handling:


#!/bin/bash
# Database backup script

# Environment variables (cron provides a minimal environment)
export PGPASSWORD='your_password'
export PATH=/usr/local/bin:/usr/bin:/bin

# Configuration
DB_NAME="mydb"
BACKUP_DIR="/backups"
LOG_FILE="/var/log/db_backup.log"
DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_FILE="$BACKUP_DIR/${DB_NAME}_${DATE}.sql"

# Create the backup; send stderr to the log, not into the dump file
/usr/bin/pg_dump -h localhost -U postgres "$DB_NAME" > "$BACKUP_FILE" 2>> "$LOG_FILE"

# Check for success
if [ $? -eq 0 ]; then
  echo "Backup succeeded: $BACKUP_FILE" >> "$LOG_FILE"
  # Keep only the last 7 days of backups
  find "$BACKUP_DIR" -name "${DB_NAME}_*.sql" -mtime +7 -delete
else
  echo "Backup failed; check the error output" >> "$LOG_FILE"
  exit 1
fi
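
To put this on a schedule, install a crontab entry; a minimal sketch, assuming the script lives at /scripts/db_backup.sh (hypothetical path):

# Run the backup every day at 02:00
0 2 * * * /scripts/db_backup.sh >> /var/log/db_backup.log 2>&1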

Case Study 2: Microservice Health Checks

Problem

A microservice deployed on Kubernetes needed periodic health checks. The service occasionally hung and had to be restarted automatically.

Solution

Create a Kubernetes CronJob resource that calls the health-check endpoint on a schedule:


apiVersion: batch/v1
kind: CronJob
metadata:
  name: health-check
spec:
  schedule: "*/5 * * * *"  # every 5 minutes
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      template:
        spec:
          # Needs a ServiceAccount with RBAC rights to restart the Deployment
          serviceAccountName: health-check
          containers:
          - name: health-check
            # NOTE: curlimages/curl does not ship kubectl; in practice use an
            # image that contains both curl and kubectl
            image: curlimages/curl:latest
            imagePullPolicy: Always
            command:
            - /bin/sh
            - -c
            - |-
              response=$(curl -s -o /dev/null -w "%{http_code}" http://service:8080/health)
              if [ "$response" -ne 200 ]; then
                echo "Health check failed, HTTP status: $response"
                # Trigger the restart
                kubectl rollout restart deployment/my-service
              fi
          restartPolicy: OnFailure
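
Once the manifest is applied, a few kubectl commands verify the schedule and inspect individual runs (the file name is hypothetical):

kubectl apply -f health-check-cronjob.yaml
kubectl get cronjob health-check    # confirm the schedule was parsed
kubectl get jobs --watch            # watch runs appear every 5 minutes
kubectl logs job/<job-name>         # read the output of one run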

Case Study 3: Log Rotation and Archiving

Problem

The web server's log files grew quickly and disk space kept running out. Logs needed to be rotated, compressed, and cleaned up automatically.

Solution

Use logrotate, driven by cron, to manage the logs:


/var/log/nginx/*.log {
  daily
  missingok
  rotate 14
  compress
  delaycompress
  notifempty
  create 0640 www-data adm
  sharedscripts
  postrotate
    [ -f /var/run/nginx.pid ] && kill -USR1 $(cat /var/run/nginx.pid)
  endscript
  lastaction
    # Move compressed logs older than 7 days to the archive directory
    find /var/log/nginx -name "*.gz" -mtime +7 -exec mv {} /archive/nginx/ \;
  endscript
}

# Cron entry (most distributions install a daily logrotate job automatically)
0 0 * * * /usr/sbin/logrotate /etc/logrotate.conf
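
Before trusting the schedule, run logrotate in debug mode: it prints what it would do without rotating anything (assuming the rules above live in /etc/logrotate.d/nginx):

/usr/sbin/logrotate -d /etc/logrotate.d/nginx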

Case Study 4: Scheduled Email Reports

Problem

The finance team needed the previous day's revenue report by email at 8:00 every morning.

Solution

Create a script that generates the report and sends it by email:


#!/usr/bin/env python3
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import subprocess
from datetime import datetime, timedelta

def generate_report():
    """Generate the revenue report for the previous day."""
    yesterday = datetime.now() - timedelta(days=1)
    date_str = yesterday.strftime('%Y-%m-%d')

    # Run the SQL query
    cmd = f'''
    mysql -u reporter -ppassword -h db.internal -e "
    SELECT product, SUM(amount) as revenue
    FROM sales
    WHERE DATE(created_at) = '{date_str}'
    GROUP BY product
    "
    '''
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    return date_str, result.stdout

def send_email(date_str, report):
    """Send the report by email, dated for the day it covers."""
    msg = MIMEMultipart()
    msg['From'] = '[email protected]'
    msg['To'] = '[email protected]'
    msg['Subject'] = f'Revenue Report - {date_str}'

    body = f'''
    Revenue Report
    {'='*40}

    {report}

    {'='*40}
    This message was generated automatically; please do not reply.
    '''
    msg.attach(MIMEText(body, 'plain'))

    with smtplib.SMTP('smtp.company.com', 587) as server:
        server.starttls()
        server.login('[email protected]', 'password')
        server.send_message(msg)

if __name__ == '__main__':
    date_str, report = generate_report()
    send_email(date_str, report)
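
Schedule it with a crontab entry like this sketch (the script path is an assumption):

# Mail the report at 08:00 every day
0 8 * * * /usr/bin/python3 /scripts/daily_report.py >> /var/log/daily_report.log 2>&1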

Case Study 5: Synchronizing Distributed Scheduled Tasks

Problem

A scheduled task running on several servers must execute on only one of them at a time, to avoid duplicate processing.

Solution

Use a distributed lock to enforce mutual exclusion:


#!/bin/bash
# Distributed data-processing task

LOCK_KEY="data_processing.lock"
LOCK_TIMEOUT=3600  # 1-hour timeout
REDIS_HOST="redis.internal"

# Try to acquire the lock (SETNX replies 1 on success, 0 if the key exists;
# redis-cli's exit code is 0 either way, so we must check the reply)
acquire_lock() {
  local result
  result=$(redis-cli -h "$REDIS_HOST" SETNX "$LOCK_KEY" "$HOSTNAME:$$")
  [ "$result" = "1" ]
}

# Release the lock
release_lock() {
  redis-cli -h "$REDIS_HOST" DEL "$LOCK_KEY" > /dev/null
}

# Main logic
main() {
  echo "Starting data processing..."

  # Check whether an existing lock has timed out
  lock_time=$(redis-cli -h "$REDIS_HOST" GET "$LOCK_KEY.ttl")
  if [ -n "$lock_time" ]; then
    current_time=$(date +%s)
    if [ $((current_time - lock_time)) -gt $LOCK_TIMEOUT ]; then
      echo "Lock has timed out; taking it over"
      release_lock
    fi
  fi

  # Try to acquire the lock
  if ! acquire_lock; then
    owner=$(redis-cli -h "$REDIS_HOST" GET "$LOCK_KEY")
    echo "Task is already running on: $owner"
    exit 0
  fi

  # Record when the lock was taken, for the timeout check above
  redis-cli -h "$REDIS_HOST" SETEX "$LOCK_KEY.ttl" "$LOCK_TIMEOUT" "$(date +%s)" > /dev/null

  # Run the task
  python3 /scripts/process_data.py

  # Release the lock
  release_lock

  echo "Data processing finished"
}

# Run
main
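
The separate SETNX and SETEX calls leave a window: a crash between them produces a lock with no timestamp. Since Redis 2.6.12, SET can do both atomically; a sketch of acquire_lock rewritten that way:

# Acquire only if absent, with automatic expiry: no separate ttl key needed
acquire_lock() {
  local result
  result=$(redis-cli -h "$REDIS_HOST" SET "$LOCK_KEY" "$HOSTNAME:$$" NX EX "$LOCK_TIMEOUT")
  [ "$result" = "OK" ]
}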

Case Study 6: Progressive Rollout Script

Problem

All servers needed rolling updates without service impact: five machines per batch, two minutes between batches.

Solution

Use a cron-driven progressive deployment script:


#!/bin/bash
# Progressive deployment script

DEPLOYMENT_CONFIG="/etc/deployment/config.json"
CURRENT_BATCH=$(cat /var/run/deployment_batch 2>/dev/null || echo 0)
BATCH_SIZE=5
BATCH_DELAY=120  # 2 minutes between batches, enforced by the cron schedule

# Fetch the server list
servers=$(jq -r '.servers[]' "$DEPLOYMENT_CONFIG")
total_servers=$(echo "$servers" | wc -l)

# Compute the current batch (sed line addresses are 1-based)
start=$((CURRENT_BATCH * BATCH_SIZE + 1))
end=$((start + BATCH_SIZE - 1))

echo "Deploying batch $((CURRENT_BATCH + 1)): servers $start-$end"

# Deploy the current batch
echo "$servers" | sed -n "${start},${end}p" | while read -r server; do
  echo "Deploying to $server ..."
  ssh "$server" 'bash -s' < /scripts/update_service.sh
done

# Record progress
if [ "$end" -lt "$total_servers" ]; then
  echo $((CURRENT_BATCH + 1)) > /var/run/deployment_batch
  echo "Batch done; waiting for the next run..."
else
  echo "All batches deployed"
  rm -f /var/run/deployment_batch
fi
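
The two-minute gap comes from cron itself: install an every-two-minutes entry when a rollout starts and remove it once the script reports the final batch (the path is hypothetical):

*/2 * * * * /scripts/deploy_batch.sh >> /var/log/deploy.log 2>&1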

Case Study 7: Automatic Certificate Renewal

Problem

SSL certificates expire every 90 days; renewing them by hand is easy to forget and caused outages.

Solution

Use Let's Encrypt to renew them automatically:


#!/bin/bash
# Automatic SSL certificate renewal

DOMAINS="example.com www.example.com api.example.com"
EMAIL="[email protected]"
CERT_DIR="/etc/letsencrypt/live"
LOG_FILE="/var/log/cert_renewal.log"

log() {
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" >> "$LOG_FILE"
}

# Check how many days the certificate has left
check_cert_expiry() {
  local domain=$1
  local cert_file="$CERT_DIR/$domain/cert.pem"

  if [ ! -f "$cert_file" ]; then
    log "Certificate not found: $domain"
    echo 0
    return 1
  fi

  # Compute the days remaining
  expiry_date=$(openssl x509 -enddate -noout -in "$cert_file" | cut -d= -f2)
  expiry_epoch=$(date -d "$expiry_date" +%s)
  current_epoch=$(date +%s)
  days_left=$(( (expiry_epoch - current_epoch) / 86400 ))

  echo "$days_left"
}

# Renew the certificates
renew_cert() {
  log "Starting renewal for: $DOMAINS"

  # certbot takes one -d flag per domain
  local domain_flags=""
  for d in $DOMAINS; do
    domain_flags="$domain_flags -d $d"
  done

  certbot certonly --non-interactive --agree-tos \
    --email "$EMAIL" \
    --webroot -w /var/www/html \
    $domain_flags \
    >> "$LOG_FILE" 2>&1

  if [ $? -eq 0 ]; then
    log "Renewal succeeded"

    # Reload nginx to pick up the new certificate
    systemctl reload nginx

    # Send a notification
    echo "Certificate renewal succeeded" | mail -s "Certificate renewal" "$EMAIL"
  else
    log "Renewal failed"
    echo "Certificate renewal failed; check the log" | mail -s "Certificate renewal FAILED" "$EMAIL"
    exit 1
  fi
}

# Main logic
for domain in $DOMAINS; do
  days_left=$(check_cert_expiry "$domain")

  if [ "$days_left" -lt 30 ]; then
    log "Certificate for $domain expires in $days_left days; renewing"
    renew_cert
  else
    log "Certificate for $domain still has $days_left days; nothing to do"
  fi
done
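
A daily check is safe because the script only invokes certbot when fewer than 30 days remain; a sketch with an assumed script path:

# Check certificate expiry every day at 03:00
0 3 * * * /scripts/renew_certs.sh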

Case Study 8: Cache Warm-Up Task

Problem

An e-commerce site suffered low cache hit rates during traffic peaks and needed the cache warmed in advance.

Solution

Warm the cache for popular products automatically ahead of peak hours:


#!/usr/bin/env python3
import sys
import requests
from datetime import datetime

API_BASE = "https://api.example.com"
CACHE_ENDPOINT = f"{API_BASE}/cache/warmup"

def get_popular_products():
    """Fetch the list of popular products."""
    response = requests.get(f"{API_BASE}/analytics/popular", params={
        'limit': 1000,
        'time_range': '24h'
    })
    return response.json()['products']

def warmup_cache(product_ids):
    """Warm the cache in chunks of 50 products."""
    chunks = [product_ids[i:i+50] for i in range(0, len(product_ids), 50)]

    for chunk in chunks:
        try:
            response = requests.post(CACHE_ENDPOINT, json={
                'product_ids': chunk,
                'ttl': 3600
            }, timeout=30)

            if response.status_code == 200:
                print(f"Warmed {len(chunk)} products")
            else:
                print(f"Warm-up failed: {response.text}")
        except Exception as e:
            print(f"Warm-up error: {e}")

def main():
    print(f"[{datetime.now()}] Starting cache warm-up")

    try:
        # Fetch the popular products
        products = get_popular_products()
        product_ids = [p['id'] for p in products]

        print(f"Fetched {len(product_ids)} popular products")

        # Warm the cache
        warmup_cache(product_ids)

        print(f"[{datetime.now()}] Cache warm-up finished")

    except Exception as e:
        print(f"Warm-up failed: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()
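
Schedule the warm-up shortly before known peaks; the times and path here are assumptions:

# Warm the cache an hour before the morning and evening peaks
0 8,19 * * * /usr/bin/python3 /scripts/cache_warmup.py >> /var/log/cache_warmup.log 2>&1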

Case Study 9: Data Sync and Cleanup

Problem

Anonymized production data had to be synced to the test environment, keeping only the last 30 days of data.

Solution

Create a sync-and-cleanup script:


#!/bin/bash
# Sync the database to the test environment

PROD_DB="production"
TEST_DB="test_staging"
RETENTION_DAYS=30

log() {
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"
}

# Export production data (anonymized)
dump_data() {
  log "Exporting production data..."

  mysqldump -h prod-db.internal -u exporter -ppassword \
    --single-transaction \
    --quick \
    --lock-tables=false \
    --where="DATE(created_at) >= DATE(NOW()) - INTERVAL $RETENTION_DAYS DAY" \
    "$PROD_DB" \
    users orders products \
    | gzip > /tmp/dump.sql.gz

  # Anonymize sensitive fields (assumes these columns store JSON documents)
  zcat /tmp/dump.sql.gz | sed \
    -e 's/"\(email\|phone\|ssn\)":"[^"]*"/"\1":"***"/g' \
    -e 's/"\(credit_card\|password\)":"[^"]*"/"\1":"***"/g' \
    | gzip > /tmp/dump_anon.sql.gz

  log "Export and anonymization done"
}

# Import into the test environment
import_data() {
  log "Importing into the test environment..."

  gunzip < /tmp/dump_anon.sql.gz | mysql -h test-db.internal -u importer -ppassword "$TEST_DB"

  log "Import done"
}

# Remove temporary files
cleanup() {
  log "Removing temporary files..."
  rm -f /tmp/dump*.sql.gz
}

# Purge old data from the test environment
cleanup_old_data() {
  log "Purging old test data..."

  mysql -h test-db.internal -u admin -ppassword "$TEST_DB" <<EOF
DELETE FROM orders WHERE DATE(created_at) < DATE(NOW()) - INTERVAL $RETENTION_DAYS DAY;
DELETE FROM audit_logs WHERE DATE(created_at) < DATE(NOW()) - INTERVAL $RETENTION_DAYS DAY;
OPTIMIZE TABLE users, orders, products;
EOF

  log "Old data purged"
}

# Main flow
main() {
  log "===== Sync started ====="

  dump_data
  import_data
  cleanup_old_data
  cleanup

  log "===== Sync finished ====="
}

main
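
An off-hours weekly run keeps staging fresh without loading production during the day; the schedule and path are assumptions:

# Sync to staging every Sunday at 01:00
0 1 * * 0 /scripts/sync_to_staging.sh >> /var/log/staging_sync.log 2>&1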

Case Study 10: Monitoring and Alerting Integration

Problem

When a scheduled task fails, the ops team must be notified immediately through the existing monitoring stack.

Solution

Wrap the task with Prometheus metrics and Alertmanager alerts:


#!/bin/bash
# Task wrapper with monitoring integration

TASK_NAME="data_import"
TASK_COMMAND="/usr/bin/python3 /scripts/import.py"
LOG_FILE="/var/log/tasks/${TASK_NAME}.log"
METRICS_FILE="/var/lib/node_exporter/textfile_collector/${TASK_NAME}.prom"

# Initialize: write metrics to a temp file, move into place when done,
# so node_exporter never reads a half-written file
START_TIME=$(date +%s)
TMP_METRICS="${METRICS_FILE}.tmp"
echo "# HELP task_last_success_timestamp Last success time" > "$TMP_METRICS"
echo "# TYPE task_last_success_timestamp gauge" >> "$TMP_METRICS"

# Run the task
run_task() {
  echo "[$(date)] Starting task: $TASK_NAME" >> "$LOG_FILE"

  $TASK_COMMAND >> "$LOG_FILE" 2>&1
  EXIT_CODE=$?

  END_TIME=$(date +%s)
  DURATION=$((END_TIME - START_TIME))

  if [ $EXIT_CODE -eq 0 ]; then
    echo "[$(date)] Task succeeded in ${DURATION}s" >> "$LOG_FILE"

    # Record success metrics (label values must be double-quoted, so escape them)
    echo "task_last_success_timestamp{task=\"$TASK_NAME\"} $END_TIME" >> "$TMP_METRICS"
    echo "task_duration_seconds{task=\"$TASK_NAME\",status=\"success\"} $DURATION" >> "$TMP_METRICS"
    mv "$TMP_METRICS" "$METRICS_FILE"
  else
    echo "[$(date)] Task failed with exit code: $EXIT_CODE" >> "$LOG_FILE"

    # Record failure metrics
    echo "task_last_success_timestamp{task=\"$TASK_NAME\"} 0" >> "$TMP_METRICS"
    echo "task_duration_seconds{task=\"$TASK_NAME\",status=\"failed\"} $DURATION" >> "$TMP_METRICS"
    mv "$TMP_METRICS" "$METRICS_FILE"

    # Fire an alert
    curl -X POST http://alertmanager:9093/api/v1/alerts -d '[
      {
        "labels": {
          "alertname": "TaskFailed",
          "task": "'$TASK_NAME'",
          "severity": "critical"
        },
        "annotations": {
          "summary": "Task failed: '$TASK_NAME'",
          "description": "Exit code: '$EXIT_CODE', see log: '$LOG_FILE'"
        }
      }
    ]'

    exit 1
  fi
}

run_task
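
In the crontab, point at the wrapper rather than the raw command so every run is logged, measured, and alerted on (the wrapper path is hypothetical):

# Run the import every 15 minutes through the monitoring wrapper
*/15 * * * * /scripts/task_wrapper.sh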

Best Practices

Always use absolute paths: cron's working directory is not your shell's

Redirect output to a log file so failures leave a trace

Set the PATH environment variable explicitly; cron's default is minimal

Use a shebang line to specify the interpreter

Make scripts executable (chmod +x)

Test complex expressions before deploying them

The crontab sketch below puts all of these into practice.
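
A minimal crontab that follows these practices; the paths and schedule are hypothetical:

# Explicit environment: cron's defaults are minimal
PATH=/usr/local/bin:/usr/bin:/bin
MAILTO="[email protected]"

# Absolute paths, output captured in a log, tested schedule
30 4 * * 1-5 /opt/scripts/nightly_job.sh >> /var/log/nightly_job.log 2>&1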
