Use Cases & Examples
9n9s is designed to monitor anything that needs to run reliably and on time. This page covers common use cases, implementation patterns, and practical examples to help you understand where 9n9s fits in your infrastructure.
Heartbeat Monitoring Use Cases
Heartbeat monitors excel at tracking processes that can actively signal their status - perfect for scheduled tasks, background jobs, and any process that can make an HTTP request.
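For reference, every heartbeat example below follows the same pulse pattern: hit the monitor's /start endpoint when work begins, the base pulse URL on success (optionally with a JSON payload), and /fail on error. A minimal sketch in Python (the UUID and the do_work function are placeholders):

```python
import requests

HEARTBEAT_URL = "https://pulse.9n9s.com/your-monitor-uuid"  # placeholder pulse URL

def do_work():
    # Placeholder for the actual task (cleanup, backup, report generation, ...)
    pass

def run_job():
    # Signal start, do the work, then signal success or failure
    requests.get(f"{HEARTBEAT_URL}/start", timeout=10)
    try:
        do_work()
        # Signal success, optionally attaching a JSON payload of metrics
        requests.post(HEARTBEAT_URL, json={"status": "ok"}, timeout=10)
    except Exception as exc:
        # Signal failure with error details, then re-raise so the job itself still fails
        requests.post(f"{HEARTBEAT_URL}/fail", json={"error": str(exc)}, timeout=10)
        raise

if __name__ == "__main__":
    run_job()
```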
Scheduled Tasks & Cron Jobs
System Maintenance Tasks
Use Case: Monitor critical system maintenance scripts that run on schedules.
```bash
#!/bin/bash
# Daily cleanup script - /etc/cron.daily/cleanup

# Start signal
curl -fsS https://pulse.9n9s.com/uuid-cleanup-job/start

# Count old log files before deleting them so the summary is accurate
CLEANED_FILES=$(find /var/log -name "*.log" -mtime +30 | wc -l)

# Cleanup operations
find /var/log -name "*.log" -mtime +30 -delete
find /tmp -name "*.tmp" -mtime +1 -delete
apt autoremove -y
apt autoclean

# Success signal with summary
curl -fsS -X POST \
  -d "Cleanup completed. Removed $CLEANED_FILES old log files." \
  https://pulse.9n9s.com/uuid-cleanup-job
```

Monitor Configuration:
- Schedule: `0 2 * * *` (daily at 2 AM)
- Grace Period: 30 minutes
- Expected Runtime: 5-15 minutes
- Tags: `environment:production`, `type:maintenance`, `criticality:medium`
Database Backups
Use Case: Ensure database backups complete successfully and on time.
```python
#!/usr/bin/env python3
# PostgreSQL backup script

import subprocess
import requests
import datetime
import json
import os
import time

HEARTBEAT_URL = "https://pulse.9n9s.com/uuid-db-backup"

try:
    start_time = time.time()

    # Signal backup start
    requests.get(f"{HEARTBEAT_URL}/start")

    # Perform backup
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"/backups/db_backup_{timestamp}.sql"

    result = subprocess.run([
        "pg_dump",
        "-h", "localhost",
        "-U", "backup_user",
        "-d", "production_db",
        "-f", backup_file
    ], capture_output=True, text=True, check=True)

    # Verify backup file
    backup_size = os.path.getsize(backup_file)

    # Signal success with metrics
    payload = {
        "backup_file": backup_file,
        "size_mb": backup_size / (1024 * 1024),
        "duration_seconds": time.time() - start_time,
        "timestamp": timestamp
    }

    requests.post(f"{HEARTBEAT_URL}", json=payload)

except subprocess.CalledProcessError as e:
    # Signal failure with error details
    error_payload = {
        "error": str(e),
        "stderr": e.stderr,
        "exit_code": e.returncode
    }
    requests.post(f"{HEARTBEAT_URL}/fail", json=error_payload)
    raise
```

Monitor Configuration:
- Schedule: `0 3 * * *` (daily at 3 AM)
- Grace Period: 2 hours (large databases may take time)
- Expected Runtime: 30 minutes - 2 hours
- Payload Metrics: Extract `size_mb`, `duration_seconds`
ETL and Data Processing
Data Pipeline Monitoring
Use Case: Monitor multi-stage data processing pipelines.
```python
# Apache Airflow DAG with 9n9s monitoring
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests

def send_heartbeat(context, status="success", **kwargs):
    """Send heartbeat pulse with task context"""
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    execution_date = context['execution_date']

    payload = {
        "dag_id": dag_id,
        "task_id": task_id,
        "execution_date": str(execution_date),
        "duration": context['task_instance'].duration,
        "status": status,
        **kwargs,  # extra fields such as records_extracted or error
    }

    if status == "success":
        requests.post("https://pulse.9n9s.com/uuid-etl-pipeline", json=payload)
    else:
        requests.post("https://pulse.9n9s.com/uuid-etl-pipeline/fail", json=payload)

def extract_data(**context):
    # Extract data from source systems
    try:
        # ... extraction logic ...
        send_heartbeat(context, "success", records_extracted=1000)
    except Exception as e:
        send_heartbeat(context, "failure", error=str(e))
        raise

def transform_data(**context):
    # Transform and clean data
    try:
        # ... transformation logic ...
        send_heartbeat(context, "success", records_transformed=950)
    except Exception as e:
        send_heartbeat(context, "failure", error=str(e))
        raise

def load_data(**context):
    # Load data to destination
    try:
        # ... loading logic ...
        send_heartbeat(context, "success", records_loaded=950)
    except Exception as e:
        send_heartbeat(context, "failure", error=str(e))
        raise

dag = DAG(
    'daily_etl_pipeline',
    default_args={
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Daily ETL pipeline with 9n9s monitoring',
    schedule_interval='0 1 * * *',
    catchup=False
)

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag
)

# Set dependencies
extract_task >> transform_task >> load_task
```

Background Workers & Queue Processors
Message Queue Consumers
Use Case: Monitor background workers processing jobs from queues.
```javascript
// Node.js worker with 9n9s monitoring
const Bull = require("bull");
const axios = require("axios");

const emailQueue = new Bull("email processing");
const HEARTBEAT_URL = "https://pulse.9n9s.com/uuid-email-worker";

let lastHeartbeat = Date.now();
let processedCount = 0;
let errorCount = 0;

// Send periodic heartbeat every 5 minutes
setInterval(async () => {
  try {
    await axios.post(HEARTBEAT_URL, {
      processed_since_last_heartbeat: processedCount,
      errors_since_last_heartbeat: errorCount,
      queue_size: await emailQueue.count(),
      worker_memory_mb: process.memoryUsage().heapUsed / 1024 / 1024,
      uptime_seconds: process.uptime(),
    });

    processedCount = 0;
    errorCount = 0;
    lastHeartbeat = Date.now();
  } catch (error) {
    console.error("Failed to send heartbeat:", error);
  }
}, 5 * 60 * 1000);

// Process jobs
emailQueue.process(async (job) => {
  try {
    await sendEmail(job.data);
    processedCount++;
  } catch (error) {
    errorCount++;
    throw error;
  }
});

// Handle graceful shutdown
process.on("SIGTERM", async () => {
  await axios.post(`${HEARTBEAT_URL}/fail`, {
    reason: "Worker shutdown",
    processed_total: processedCount,
    errors_total: errorCount,
  });
  process.exit(0);
});
```

Monitor Configuration:
- Schedule: every 10 minutes (expect a heartbeat every 5 minutes, with a 5-minute grace period)
- Tags: `service:email`, `type:worker`, `environment:production`
- Payload Metrics: `processed_count`, `error_count`, `queue_size`, `memory_mb`
Serverless Functions
AWS Lambda Monitoring
Use Case: Monitor serverless function executions and cold starts.
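The handler below calls an is_cold_start() helper that is not defined in the snippet. One common way to implement it (a sketch, not part of any 9n9s SDK) relies on Lambda reusing the Python process between invocations, so a module-level flag is true only for the first call in a container:

```python
# Hypothetical helper: the module is imported once per container, so the flag
# starts as True and flips to False after the first invocation.
_COLD_START = True

def is_cold_start() -> bool:
    global _COLD_START
    was_cold = _COLD_START
    _COLD_START = False
    return was_cold
```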
```python
import json
import time
import requests
import os

HEARTBEAT_URL = os.environ['NINES_HEARTBEAT_URL']

def lambda_handler(event, context):
    """AWS Lambda function with 9n9s monitoring"""
    start_time = time.time()

    try:
        # Signal function start
        requests.get(f"{HEARTBEAT_URL}/start")

        # Your function logic here
        result = process_data(event)

        # Calculate metrics
        duration_ms = (time.time() - start_time) * 1000
        memory_used = context.memory_limit_in_mb

        # Signal success with context
        payload = {
            "duration_ms": duration_ms,
            "memory_limit_mb": memory_used,
            "records_processed": len(event.get('Records', [])),
            "cold_start": is_cold_start(),
            "aws_request_id": context.aws_request_id
        }

        requests.post(HEARTBEAT_URL, json=payload)

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }

    except Exception as e:
        # Signal failure
        error_payload = {
            "error": str(e),
            "error_type": type(e).__name__,
            "duration_ms": (time.time() - start_time) * 1000,
            "aws_request_id": context.aws_request_id
        }

        requests.post(f"{HEARTBEAT_URL}/fail", json=error_payload)
        raise
```

CI/CD and Deployment Monitoring
Build Pipeline Monitoring
Use Case: Monitor CI/CD pipeline stages and deployments.
```yaml
# GitHub Actions workflow with 9n9s monitoring
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Signal deployment start
        run: |
          curl -fsS https://pulse.9n9s.com/${{ secrets.DEPLOYMENT_MONITOR_ID }}/start

      - name: Run tests
        run: npm test

      - name: Build application
        run: npm run build

      - name: Deploy to production
        run: |
          # Deployment commands
          kubectl apply -f k8s/
          kubectl rollout status deployment/app

      - name: Signal deployment success
        if: success()
        run: |
          curl -fsS -X POST \
            -d '{"commit": "${{ github.sha }}", "environment": "production"}' \
            https://pulse.9n9s.com/${{ secrets.DEPLOYMENT_MONITOR_ID }}

      - name: Signal deployment failure
        if: failure()
        run: |
          curl -fsS -X POST \
            -d '{"commit": "${{ github.sha }}", "error": "Deployment failed", "step": "${{ github.job }}"}' \
            https://pulse.9n9s.com/${{ secrets.DEPLOYMENT_MONITOR_ID }}/fail
```

Uptime Monitoring Use Cases
Uptime monitors proactively check your services from external perspectives, perfect for websites, APIs, and any publicly accessible endpoints.
Website Monitoring
E-commerce Site Monitoring
Use Case: Monitor critical pages of an e-commerce website.
Homepage Monitor:
- URL: `https://shop.example.com`
- Frequency: Every 1 minute
- Assertions:
  - Status code: `200`
  - Response time: `< 2000ms`
  - Content contains: `"Shop Now"`
  - SSL certificate expires: `> 14 days`
Checkout Process Monitor:
- URL: `https://shop.example.com/checkout`
- Method: `POST`
- Headers: `Content-Type: application/json`
- Body: `{"test": true, "items": [{"id": "test-item"}]}`
- Assertions:
  - Status code: `200` or `302`
  - Response time: `< 5000ms`
  - Content contains: `"payment"`
Payment Gateway Monitor:
- URL: `https://payments.example.com/health`
- Frequency: Every 30 seconds
- Assertions:
  - Status code: `200`
  - Response body JSON: `$.status == "healthy"`
  - Response time: `< 1000ms`
API Health Monitoring
REST API Endpoint Monitoring
Use Case: Monitor critical API endpoints with different authentication and validation requirements.
Public API Health Check:
```yaml
# 9n9s.yml configuration
projects:
  api-services:
    uptime:
      - name: "Public API Health"
        url: "https://api.example.com/health"
        frequency: "30s"
        assertions:
          - type: "STATUS_CODE"
            operator: "EQUALS"
            value: "200"
          - type: "RESPONSE_TIME"
            operator: "LESS_THAN"
            value: "500"
          - type: "RESPONSE_BODY"
            operator: "MATCHES_JSON"
            value: "$.status == 'healthy'"
        tags:
          service: api
          environment: production
          criticality: high
```

Authenticated API Endpoint:
- name: "User Profile API" url: "https://api.example.com/v1/user/profile" method: "GET" headers: Authorization: "Bearer {{env.API_TEST_TOKEN}}" Content-Type: "application/json" frequency: "5m" assertions: - type: "STATUS_CODE" operator: "IN" value: ["200", "401"] # 401 is acceptable if token expires - type: "RESPONSE_TIME" operator: "LESS_THAN" value: "2000" - type: "RESPONSE_BODY" operator: "CONTAINS" value: "user_id"GraphQL API Monitoring:
- name: "GraphQL API" url: "https://api.example.com/graphql" method: "POST" headers: Content-Type: "application/json" body: | { "query": "query HealthCheck { health { status version } }" } frequency: "2m" assertions: - type: "STATUS_CODE" operator: "EQUALS" value: "200" - type: "RESPONSE_BODY" operator: "MATCHES_JSON" value: "$.data.health.status == 'ok'"Infrastructure Monitoring
Database Connection Monitoring
Use Case: Monitor database connectivity through health check endpoints.
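This pattern assumes your API exposes a health endpoint whose JSON body reports database connectivity. A minimal sketch of such an endpoint (Flask and psycopg2 are assumptions, not part of 9n9s; adapt to your stack):

```python
from flask import Flask, jsonify
import psycopg2

app = Flask(__name__)

@app.route("/health/database")
def database_health():
    try:
        # Open and immediately close a connection to verify the database is reachable
        conn = psycopg2.connect(host="localhost", dbname="production_db",
                                user="health_check", connect_timeout=3)
        conn.close()
        status, code = "connected", 200
    except Exception:
        status, code = "disconnected", 503

    # Response shape matches the MATCHES_JSON assertion in the monitor below:
    # $.database.postgresql.status == 'connected'
    return jsonify({"database": {"postgresql": {"status": status}}}), code

if __name__ == "__main__":
    app.run(port=8080)
```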
- name: "PostgreSQL Health" url: "https://api.example.com/health/database" frequency: "1m" assertions: - type: "STATUS_CODE" operator: "EQUALS" value: "200" - type: "RESPONSE_BODY" operator: "MATCHES_JSON" value: "$.database.postgresql.status == 'connected'" - type: "RESPONSE_TIME" operator: "LESS_THAN" value: "5000" # Database checks may be slowerCDN and Asset Monitoring
Use Case: Monitor CDN performance and asset availability.
- name: "CDN Asset Availability" url: "https://cdn.example.com/assets/main.css" frequency: "5m" regions: ["us-east-1", "eu-west-1", "ap-southeast-1"] assertions: - type: "STATUS_CODE" operator: "EQUALS" value: "200" - type: "RESPONSE_TIME" operator: "LESS_THAN" value: "1000" - type: "RESPONSE_HEADER" operator: "CONTAINS" header: "Cache-Control" value: "max-age"Advanced Use Cases
Multi-Service Health Monitoring
Use Case: Monitor complex distributed systems with dependencies.
```python
# Comprehensive health check with dependency monitoring
import time
import requests
import json
from datetime import datetime
from typing import Dict, List

class ServiceHealthMonitor:
    def __init__(self, heartbeat_url: str):
        self.heartbeat_url = heartbeat_url
        self.services = {}

    def check_service_health(self, name: str, url: str, timeout: int = 5) -> Dict:
        """Check individual service health"""
        try:
            start_time = time.time()
            response = requests.get(url, timeout=timeout)
            duration = (time.time() - start_time) * 1000

            return {
                "name": name,
                "status": "healthy" if response.status_code == 200 else "unhealthy",
                "response_time_ms": duration,
                "status_code": response.status_code,
                "details": response.json() if response.headers.get('content-type', '').startswith('application/json') else None
            }
        except Exception as e:
            return {
                "name": name,
                "status": "unhealthy",
                "error": str(e),
                "response_time_ms": None
            }

    def run_health_checks(self):
        """Run health checks for all services"""
        services_to_check = [
            ("database", "http://localhost:8080/health/db"),
            ("redis", "http://localhost:8080/health/redis"),
            ("rabbitmq", "http://localhost:8080/health/rabbitmq"),
            ("elasticsearch", "http://localhost:8080/health/elasticsearch")
        ]

        results = []
        overall_healthy = True

        for name, url in services_to_check:
            result = self.check_service_health(name, url)
            results.append(result)
            if result["status"] != "healthy":
                overall_healthy = False

        # Send comprehensive health report
        payload = {
            "overall_status": "healthy" if overall_healthy else "degraded",
            "services": results,
            "healthy_count": sum(1 for r in results if r["status"] == "healthy"),
            "total_count": len(results),
            "check_timestamp": datetime.utcnow().isoformat()
        }

        endpoint = self.heartbeat_url if overall_healthy else f"{self.heartbeat_url}/fail"
        requests.post(endpoint, json=payload)

# Run every 5 minutes
if __name__ == "__main__":
    monitor = ServiceHealthMonitor("https://pulse.9n9s.com/uuid-service-health")
    monitor.run_health_checks()
```

Kubernetes Pod Monitoring
Use Case: Monitor Kubernetes pods and deployments.
```yaml
# Kubernetes CronJob with 9n9s monitoring
apiVersion: batch/v1
kind: CronJob
metadata:
  name: data-backup-job
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: backup
              image: postgres:15
              env:
                - name: HEARTBEAT_URL
                  valueFrom:
                    secretKeyRef:
                      name: monitoring-secrets
                      key: backup-heartbeat-url
              command:
                - /bin/bash
                - -c
                - |
                  # Signal job start
                  curl -fsS "${HEARTBEAT_URL}/start"

                  # Perform backup
                  pg_dump -h $DB_HOST -U $DB_USER $DB_NAME > /backup/backup_$(date +%Y%m%d_%H%M%S).sql

                  if [ $? -eq 0 ]; then
                    # Signal success
                    BACKUP_SIZE=$(du -h /backup/*.sql | tail -1 | cut -f1)
                    curl -fsS -X POST -d "{\"backup_size\": \"$BACKUP_SIZE\", \"pod\": \"$HOSTNAME\"}" "${HEARTBEAT_URL}"
                  else
                    # Signal failure
                    curl -fsS -X POST -d "{\"error\": \"Backup failed\", \"pod\": \"$HOSTNAME\"}" "${HEARTBEAT_URL}/fail"
                    exit 1
                  fi
          restartPolicy: OnFailure
```

Third-Party Service Integration
Use Case: Monitor third-party service integrations and APIs.
```python
# Monitor multiple third-party services
import os
import requests

class ThirdPartyServiceMonitor:
    def __init__(self):
        self.services = {
            "stripe": {
                "url": "https://api.stripe.com/v1/charges",
                "headers": {"Authorization": f"Bearer {os.environ['STRIPE_SECRET_KEY']}"},
                "heartbeat": "https://pulse.9n9s.com/uuid-stripe-integration"
            },
            "sendgrid": {
                "url": "https://api.sendgrid.com/v3/user/profile",
                "headers": {"Authorization": f"Bearer {os.environ['SENDGRID_API_KEY']}"},
                "heartbeat": "https://pulse.9n9s.com/uuid-sendgrid-integration"
            },
            "aws_s3": {
                "url": "https://s3.amazonaws.com",
                "heartbeat": "https://pulse.9n9s.com/uuid-s3-integration"
            }
        }

    def check_service(self, name: str, config: dict):
        """Check individual third-party service"""
        try:
            response = requests.get(
                config["url"],
                headers=config.get("headers", {}),
                timeout=10
            )

            # Service-specific validation
            if name == "stripe" and response.status_code == 200:
                payload = {
                    "service": "stripe",
                    "api_version": response.headers.get("Stripe-Version"),
                    "response_time_ms": response.elapsed.total_seconds() * 1000
                }
                requests.post(config["heartbeat"], json=payload)

            elif name == "sendgrid" and response.status_code == 200:
                payload = {
                    "service": "sendgrid",
                    "response_time_ms": response.elapsed.total_seconds() * 1000
                }
                requests.post(config["heartbeat"], json=payload)

        except Exception as e:
            # Signal failure
            error_payload = {
                "service": name,
                "error": str(e),
                "error_type": type(e).__name__
            }
            requests.post(f"{config['heartbeat']}/fail", json=error_payload)
```

Best Practices by Use Case
High-Frequency Operations
Section titled “High-Frequency Operations”- Use appropriate grace periods to account for natural variance
- Batch heartbeats for very frequent operations (every few seconds); see the sketch after this list
- Monitor workers rather than individual jobs for high-volume queues
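A sketch of the batching approach, assuming a worker loop that handles many jobs per second (the pulse URL and handle_job helper are placeholders): count work locally and pulse once per interval instead of once per job.

```python
import time
import requests

HEARTBEAT_URL = "https://pulse.9n9s.com/your-worker-uuid"  # placeholder pulse URL
PULSE_INTERVAL = 300  # seconds between pulses, regardless of job volume

processed = 0
last_pulse = time.monotonic()

def handle_job(job):
    # Placeholder for per-job work that may run many times per second
    pass

def process(job):
    global processed, last_pulse
    handle_job(job)
    processed += 1

    # Pulse at most once per interval and report how many jobs were handled
    if time.monotonic() - last_pulse >= PULSE_INTERVAL:
        requests.post(HEARTBEAT_URL, json={"processed": processed}, timeout=10)
        processed = 0
        last_pulse = time.monotonic()
```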
Critical Infrastructure
- Multiple monitoring approaches: Combine heartbeat and uptime monitors
- Staged alerting: Different thresholds for warnings vs. critical alerts
- Redundant checks: Monitor from multiple angles and locations
Development vs. Production
- Different schedules: More frequent checks in production
- Environment-specific tags: Separate alerting rules by environment
- Graduated response: Warnings in staging, immediate alerts in production
Team Coordination
- Shared monitoring dashboards: Keep teams informed of service health
- Escalation procedures: Route alerts to appropriate team members
- Documentation: Maintain runbooks linked to monitoring alerts
These use cases demonstrate the flexibility and power of 9n9s across different scenarios. Whether you’re monitoring simple cron jobs or complex distributed systems, 9n9s provides the visibility and reliability assurance you need to maintain operational confidence.