File: //usr/local/bin/check_backups.py
#!/usr/local/bin/python3.12
import os
import sys
import syslog
import socket
import yaml
import subprocess
import json
from datetime import datetime, timezone
# Oldest acceptable age of the newest snapshot, in hours (3 days).
BACKUP_AGE_THRESHOLD_HOURS = 3 * 24
# File to which log_result() appends one timestamped line per reported result.
RESULT_LOG = '/root/logs/check_backups.log'
def log_result(message):
    """Append a timestamped result line to the RESULT_LOG file.

    The line has the form "<local ISO-8601 timestamp> <message>".

    Writing the result log is best-effort: the parent directory is created
    when missing, and any OS-level failure (permissions, full disk, path is
    not a directory, ...) is reported to syslog instead of being raised, so
    that a logging problem cannot abort the backup check itself.

    Args:
        message: Text to record; written verbatim after the timestamp.
    """
    timestamp = datetime.now().isoformat()
    try:
        log_dir = os.path.dirname(RESULT_LOG)
        if log_dir:
            # A fresh host may not have the log directory yet.
            os.makedirs(log_dir, exist_ok=True)
        # Explicit encoding so non-ASCII profile names do not depend on the
        # locale of the environment (e.g. cron) the script runs in.
        with open(RESULT_LOG, 'a', encoding='utf-8') as f:
            f.write(f"{timestamp} {message}\n")
    except OSError as e:
        syslog.syslog(syslog.LOG_ERR,
                      f"restic-backup-check: Cannot write result log {RESULT_LOG}: {e}")
def get_restic_profiles():
    """Load the profile sections from the resticprofile configuration file.

    Every top-level key of /etc/resticprofile/profiles.yaml whose value is a
    mapping is treated as a profile, except for the sections that configure
    resticprofile itself.

    Returns:
        dict mapping profile name to its configuration dict. An empty dict is
        returned when the file does not exist, is empty, cannot be read or
        parsed, or does not contain a mapping at the top level. All failures
        other than a missing or empty file are reported to syslog.
    """
    config_file = '/etc/resticprofile/profiles.yaml'
    # Top-level sections that are not profiles. 'mixins' is a mapping and
    # would otherwise be mistaken for a profile; 'includes' is listed for
    # completeness.
    # NOTE(review): profiles defined in files pulled in via 'includes' are
    # not discovered here - confirm whether that matters for this host.
    reserved_sections = {'version', 'global', 'groups', 'includes', 'mixins'}

    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)
    except FileNotFoundError:
        # No configuration at all: nothing to check, nothing to report.
        return {}
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        syslog.syslog(syslog.LOG_ERR, f"restic-backup-check: Error reading config file: {e}")
        return {}

    if config is None:
        # safe_load() returns None for an empty document.
        return {}
    if not isinstance(config, dict):
        syslog.syslog(syslog.LOG_ERR,
                      "restic-backup-check: Error reading config file: "
                      f"expected a mapping at top level, got {type(config).__name__}")
        return {}

    return {
        name: section
        for name, section in config.items()
        if name not in reserved_sections and isinstance(section, dict)
    }
def _parse_snapshot_time(raw):
    """Convert a snapshot 'time' string into a timezone-aware UTC datetime."""
    text = raw.strip()
    if text[-1:] in ('Z', 'z'):
        text = text[:-1] + '+00:00'
    head, dot, tail = text.partition('.')
    if dot:
        # tail is "<fraction digits><optional UTC offset>". Normalise the
        # fraction to exactly six digits so parsing does not depend on how
        # lenient the running interpreter's fromisoformat() is, and so that
        # negative offsets are handled the same way as positive ones.
        digits = 0
        while digits < len(tail) and tail[digits] in '0123456789':
            digits += 1
        fraction = tail[:digits][:6].ljust(6, '0')
        text = f"{head}.{fraction}{tail[digits:]}"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Timestamps without an offset are treated as UTC.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def get_latest_backup_info(profile_name, timeout=15):
    """Get information about the latest backup snapshot for a profile.

    Runs "resticprofile --profile <name> --no-lock snapshots --json
    --latest 1" and inspects its JSON output.

    Args:
        profile_name: Name of the resticprofile profile to query.
        timeout: Seconds to wait for the resticprofile command.

    Returns:
        dict with keys 'timestamp' (the snapshot's original time string),
        'age_hours' (float, relative to the current UTC time) and 'id' if
        successful;
        None if the snapshot list is empty;
        dict with an 'error' key if the check failed. Error values are
        'command_failed', 'no_json_output', 'json_decode_error',
        'null_response', 'unexpected_response_type', 'missing_key',
        'timeout' and 'unexpected_error'. Every failure is also written to
        syslog.
    """
    cmd = ['/usr/local/bin/resticprofile', '--profile', profile_name, '--no-lock',
           'snapshots', '--json', '--latest', '1']
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        if result.returncode != 0:
            syslog.syslog(syslog.LOG_ERR,
                          f"restic-backup-check: resticprofile command failed for profile={profile_name} "
                          f"returncode={result.returncode} stderr={result.stderr[:200]}")
            return {'error': 'command_failed'}

        # The JSON document is expected on a single line, possibly surrounded
        # by other output. A bare "null" is kept as a candidate so it is
        # reported as a null response rather than as missing output.
        stripped_lines = (line.strip() for line in result.stdout.splitlines())
        candidates = [line for line in stripped_lines
                      if line.startswith(('[', '{')) or line == 'null']
        if not candidates:
            syslog.syslog(syslog.LOG_ERR,
                          f"restic-backup-check: No JSON output found for profile={profile_name} "
                          f"stdout_length={len(result.stdout)} stdout_preview={result.stdout[:200]}")
            return {'error': 'no_json_output'}

        # A line may start with '[' without being JSON (e.g. a bracketed log
        # prefix), so use the first candidate that actually decodes.
        snapshots = None
        first_decode_error = None
        for candidate in candidates:
            try:
                snapshots = json.loads(candidate)
            except json.JSONDecodeError as e:
                if first_decode_error is None:
                    first_decode_error = e
                continue
            break
        else:
            syslog.syslog(syslog.LOG_ERR,
                          f"restic-backup-check: JSON decode error for profile={profile_name} "
                          f"error={first_decode_error} json_line={candidates[0][:200]}")
            return {'error': 'json_decode_error'}

        if snapshots is None:
            syslog.syslog(syslog.LOG_ERR,
                          f"restic-backup-check: Null response for profile={profile_name}")
            return {'error': 'null_response'}
        if not isinstance(snapshots, list):
            syslog.syslog(syslog.LOG_ERR,
                          f"restic-backup-check: Unexpected response type for profile={profile_name} "
                          f"type={type(snapshots).__name__}")
            return {'error': 'unexpected_response_type'}
        if not snapshots:
            syslog.syslog(syslog.LOG_ERR,
                          f"restic-backup-check: No snapshots found for profile={profile_name} (empty snapshot list)")
            return None

        # "--latest 1" keeps one snapshot per host/path group, so the list
        # can hold several entries; pick the newest by timestamp instead of
        # relying on the output order.
        latest_snapshot, snapshot_time_utc = max(
            ((snapshot, _parse_snapshot_time(snapshot['time'])) for snapshot in snapshots),
            key=lambda pair: pair[1],
        )
        now_utc = datetime.now(timezone.utc)
        age_hours = (now_utc - snapshot_time_utc).total_seconds() / 3600
        return {
            'timestamp': latest_snapshot['time'],
            'age_hours': age_hours,
            'id': latest_snapshot.get('id', ''),
        }
    except KeyError as e:
        syslog.syslog(syslog.LOG_ERR,
                      f"restic-backup-check: Missing key in snapshot data for profile={profile_name} error={e}")
        return {'error': 'missing_key'}
    except subprocess.TimeoutExpired:
        syslog.syslog(syslog.LOG_ERR,
                      f"restic-backup-check: Timeout getting backup info for profile={profile_name}")
        return {'error': 'timeout'}
    except Exception as e:
        # Boundary handler: a broken profile (missing binary, malformed
        # timestamp, unexpected entry type, ...) must not abort the checks
        # of the remaining profiles.
        syslog.syslog(syslog.LOG_ERR,
                      f"restic-backup-check: Unexpected error for profile={profile_name} error={e}")
        return {'error': 'unexpected_error'}
def main():
    """Check every configured restic profile and report the overall result.

    For each profile returned by get_restic_profiles() the latest snapshot is
    looked up via get_latest_backup_info(). A profile is flagged when no
    snapshot exists, when the lookup failed, or when the snapshot is older
    than BACKUP_AGE_THRESHOLD_HOURS. Each finding goes to syslog; the summary
    goes to the result log and to stderr (problems) or stdout (all good).

    Exits with status 1 when at least one profile was flagged. Returns
    normally when everything is current, and also when no profiles are
    configured (after emitting a warning).
    """
    host = socket.gethostname()
    problems = []

    configured = get_restic_profiles()
    if not configured:
        notice = "restic-backup-check: No restic profiles found in /etc/resticprofile/profiles.yaml"
        syslog.syslog(syslog.LOG_WARNING, notice)
        log_result(f"WARNING: {notice}")
        print(f"WARNING: {notice}", file=sys.stderr)
        return

    for name in configured:
        info = get_latest_backup_info(name)

        if info is None:
            # The repository answered but holds no snapshot for this profile.
            syslog.syslog(
                syslog.LOG_ERR,
                f"restic-backup-failure: profile={name} error=no_backup_found hostname={host}",
            )
            problems.append(f"ERROR: Profile '{name}': No backup found")
        elif isinstance(info, dict) and 'error' in info:
            # The lookup itself failed; pass the error code through.
            reason = info['error']
            syslog.syslog(
                syslog.LOG_ERR,
                f"restic-backup-failure: profile={name} error={reason} hostname={host}",
            )
            problems.append(f"ERROR: Profile '{name}': Failed to check backup status ({reason})")
        else:
            hours_old = info.get('age_hours', 0)
            days_old = hours_old / 24
            taken_at = info.get('timestamp', 'unknown')
            if hours_old > BACKUP_AGE_THRESHOLD_HOURS:
                syslog.syslog(
                    syslog.LOG_WARNING,
                    f"restic-backup-warning: profile={name} age={days_old:.1f} days "
                    f"last_backup={taken_at} hostname={host}",
                )
                problems.append(
                    f"WARNING: Profile '{name}': Backup is {days_old:.1f} days old (last backup: {taken_at})"
                )

    if not problems:
        log_result("OK: All backups are current")
        print("OK: All backups are current", file=sys.stdout)
        return

    print("\n".join(problems), file=sys.stderr)
    summary = '; '.join(problems)
    log_result(f"FAIL: {len(problems)} issue(s) - {summary}")
    sys.exit(1)
if __name__ == "__main__":
    # Run the check only when executed as a script, not when imported.
    main()