
Use Cases & Examples

9n9s is designed to monitor anything that needs to run reliably and on time. This page covers common use cases, implementation patterns, and practical examples to help you understand where 9n9s fits in your infrastructure.

Heartbeat monitors excel at tracking processes that can actively signal their own status: scheduled tasks, background jobs, and any other process that can make an HTTP request.
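
The pulse protocol used throughout the examples below is plain HTTP: hit the monitor's pulse URL to report success, and use the /start and /fail endpoints to mark run boundaries and failures. As a minimal sketch (the monitor UUID is a placeholder), a reusable Python helper might look like this:

import requests

PULSE_URL = "https://pulse.9n9s.com/your-monitor-uuid"  # placeholder monitor UUID

def pulse(status="success", payload=None):
    """Report a run to 9n9s: 'start' and 'fail' map to the /start and /fail
    endpoints; 'success' posts to the base pulse URL."""
    url = PULSE_URL if status == "success" else f"{PULSE_URL}/{status}"
    try:
        if payload is not None:
            requests.post(url, json=payload, timeout=10)
        else:
            requests.get(url, timeout=10)
    except requests.RequestException as exc:
        # Never let monitoring failures break the job itself
        print(f"heartbeat pulse failed: {exc}")

# Usage:
# pulse("start")
# ... do the actual work ...
# pulse("success", {"records_processed": 123})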

Use Case: Monitor critical system maintenance scripts that run on schedules.

#!/bin/bash
# Daily cleanup script - /etc/cron.daily/cleanup

# Start signal
curl -fsS https://pulse.9n9s.com/uuid-cleanup-job/start

# Count the log files we are about to remove, then run cleanup operations
CLEANED_FILES=$(find /var/log -name "*.log" -mtime +30 | wc -l)
find /var/log -name "*.log" -mtime +30 -delete
find /tmp -name "*.tmp" -mtime +1 -delete
apt autoremove -y
apt autoclean

# Success signal with summary
curl -fsS -X POST \
  -d "Cleanup completed. Removed $CLEANED_FILES old log files." \
  https://pulse.9n9s.com/uuid-cleanup-job

Monitor Configuration:

  • Schedule: 0 2 * * * (daily at 2 AM)
  • Grace Period: 30 minutes
  • Expected Runtime: 5-15 minutes
  • Tags: environment:production, type:maintenance, criticality:medium
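
How the schedule and grace period interact: the monitor expects a pulse around each scheduled run, and (assuming 9n9s adds the grace period on top of the scheduled time, as is typical for cron-style heartbeat monitors) only alerts once the grace window has also passed. A quick illustration using the third-party croniter package:

# Rough illustration of when this monitor would alert; assumes the grace
# period is added to each scheduled run time.
from datetime import datetime, timedelta
from croniter import croniter

schedule = "0 2 * * *"          # daily at 2 AM
grace = timedelta(minutes=30)

next_run = croniter(schedule, datetime(2024, 1, 1)).get_next(datetime)
alert_deadline = next_run + grace

print(f"Expected run: {next_run}, alert if no pulse by: {alert_deadline}")
# Expected run: 2024-01-01 02:00:00, alert if no pulse by: 2024-01-01 02:30:00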

Use Case: Ensure database backups complete successfully and on time.

#!/usr/bin/env python3
# PostgreSQL backup script
import subprocess
import requests
import datetime
import os
import time

HEARTBEAT_URL = "https://pulse.9n9s.com/uuid-db-backup"

try:
    start_time = time.time()

    # Signal backup start
    requests.get(f"{HEARTBEAT_URL}/start")

    # Perform backup
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"/backups/db_backup_{timestamp}.sql"
    subprocess.run([
        "pg_dump", "-h", "localhost", "-U", "backup_user",
        "-d", "production_db", "-f", backup_file
    ], capture_output=True, text=True, check=True)

    # Verify backup file
    backup_size = os.path.getsize(backup_file)

    # Signal success with metrics
    payload = {
        "backup_file": backup_file,
        "size_mb": backup_size / (1024 * 1024),
        "duration_seconds": time.time() - start_time,
        "timestamp": timestamp
    }
    requests.post(HEARTBEAT_URL, json=payload)

except subprocess.CalledProcessError as e:
    # Signal failure with error details
    error_payload = {
        "error": str(e),
        "stderr": e.stderr,
        "exit_code": e.returncode
    }
    requests.post(f"{HEARTBEAT_URL}/fail", json=error_payload)
    raise

Monitor Configuration:

  • Schedule: 0 3 * * * (daily at 3 AM)
  • Grace Period: 2 hours (large databases may take time)
  • Expected Runtime: 30 minutes - 2 hours
  • Payload Metrics: Extract size_mb, duration_seconds

Use Case: Monitor multi-stage data processing pipelines.

# Apache Airflow DAG with 9n9s monitoring
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests


def send_heartbeat(context, status="success", **kwargs):
    """Send heartbeat pulse with task context"""
    task_id = context['task_instance'].task_id
    dag_id = context['dag'].dag_id
    execution_date = context['execution_date']

    payload = {
        "dag_id": dag_id,
        "task_id": task_id,
        "execution_date": str(execution_date),
        "duration": context['task_instance'].duration,
        "status": status,
        **kwargs,  # extra metrics such as records_extracted or error details
    }

    if status == "success":
        requests.post("https://pulse.9n9s.com/uuid-etl-pipeline", json=payload)
    else:
        requests.post("https://pulse.9n9s.com/uuid-etl-pipeline/fail", json=payload)


def extract_data(**context):
    # Extract data from source systems
    try:
        # ... extraction logic ...
        send_heartbeat(context, "success", records_extracted=1000)
    except Exception as e:
        send_heartbeat(context, "failure", error=str(e))
        raise


def transform_data(**context):
    # Transform and clean data
    try:
        # ... transformation logic ...
        send_heartbeat(context, "success", records_transformed=950)
    except Exception as e:
        send_heartbeat(context, "failure", error=str(e))
        raise


def load_data(**context):
    # Load data to destination
    try:
        # ... loading logic ...
        send_heartbeat(context, "success", records_loaded=950)
    except Exception as e:
        send_heartbeat(context, "failure", error=str(e))
        raise


dag = DAG(
    'daily_etl_pipeline',
    default_args={
        'owner': 'data-team',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Daily ETL pipeline with 9n9s monitoring',
    schedule_interval='0 1 * * *',
    catchup=False
)

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag
)

# Set dependencies
extract_task >> transform_task >> load_task

Use Case: Monitor background workers processing jobs from queues.

// Node.js worker with 9n9s monitoring
const Bull = require("bull");
const axios = require("axios");

const emailQueue = new Bull("email processing");
const HEARTBEAT_URL = "https://pulse.9n9s.com/uuid-email-worker";

let lastHeartbeat = Date.now();
let processedCount = 0;
let errorCount = 0;

// Send periodic heartbeat every 5 minutes
setInterval(async () => {
  try {
    await axios.post(HEARTBEAT_URL, {
      processed_since_last_heartbeat: processedCount,
      errors_since_last_heartbeat: errorCount,
      queue_size: await emailQueue.count(),
      worker_memory_mb: process.memoryUsage().heapUsed / 1024 / 1024,
      uptime_seconds: process.uptime(),
    });
    processedCount = 0;
    errorCount = 0;
    lastHeartbeat = Date.now();
  } catch (error) {
    console.error("Failed to send heartbeat:", error);
  }
}, 5 * 60 * 1000);

// Process jobs (sendEmail is the worker's own email-sending function)
emailQueue.process(async (job) => {
  try {
    await sendEmail(job.data);
    processedCount++;
  } catch (error) {
    errorCount++;
    throw error;
  }
});

// Handle graceful shutdown
process.on("SIGTERM", async () => {
  await axios.post(`${HEARTBEAT_URL}/fail`, {
    reason: "Worker shutdown",
    processed_since_last_heartbeat: processedCount,
    errors_since_last_heartbeat: errorCount,
  });
  process.exit(0);
});

Monitor Configuration:

  • Schedule: every 5 minutes (matching the worker's heartbeat interval), with a 5-minute grace period, so an alert fires if no pulse arrives within roughly 10 minutes
  • Tags: service:email, type:worker, environment:production
  • Payload Metrics: processed_since_last_heartbeat, errors_since_last_heartbeat, queue_size, worker_memory_mb

Use Case: Monitor serverless function executions and cold starts.

import json
import time
import requests
import os

HEARTBEAT_URL = os.environ['NINES_HEARTBEAT_URL']


def lambda_handler(event, context):
    """AWS Lambda function with 9n9s monitoring"""
    start_time = time.time()

    try:
        # Signal function start
        requests.get(f"{HEARTBEAT_URL}/start")

        # Your function logic here (process_data and is_cold_start are
        # application-specific helpers defined elsewhere)
        result = process_data(event)

        # Calculate metrics
        duration_ms = (time.time() - start_time) * 1000
        memory_limit = context.memory_limit_in_mb

        # Signal success with context
        payload = {
            "duration_ms": duration_ms,
            "memory_limit_mb": memory_limit,
            "records_processed": len(event.get('Records', [])),
            "cold_start": is_cold_start(),
            "aws_request_id": context.aws_request_id
        }
        requests.post(HEARTBEAT_URL, json=payload)

        return {
            'statusCode': 200,
            'body': json.dumps(result)
        }
    except Exception as e:
        # Signal failure
        error_payload = {
            "error": str(e),
            "error_type": type(e).__name__,
            "duration_ms": (time.time() - start_time) * 1000,
            "aws_request_id": context.aws_request_id
        }
        requests.post(f"{HEARTBEAT_URL}/fail", json=error_payload)
        raise

Use Case: Monitor CI/CD pipeline stages and deployments.

# GitHub Actions workflow with 9n9s monitoring
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Signal deployment start
        run: |
          curl -fsS https://pulse.9n9s.com/${{ secrets.DEPLOYMENT_MONITOR_ID }}/start

      - name: Run tests
        run: npm test

      - name: Build application
        run: npm run build

      - name: Deploy to production
        run: |
          # Deployment commands
          kubectl apply -f k8s/
          kubectl rollout status deployment/app

      - name: Signal deployment success
        if: success()
        run: |
          curl -fsS -X POST \
            -H "Content-Type: application/json" \
            -d '{"commit": "${{ github.sha }}", "run_id": "${{ github.run_id }}", "environment": "production"}' \
            https://pulse.9n9s.com/${{ secrets.DEPLOYMENT_MONITOR_ID }}

      - name: Signal deployment failure
        if: failure()
        run: |
          curl -fsS -X POST \
            -H "Content-Type: application/json" \
            -d '{"commit": "${{ github.sha }}", "error": "Deployment failed", "job": "${{ github.job }}"}' \
            https://pulse.9n9s.com/${{ secrets.DEPLOYMENT_MONITOR_ID }}/fail

Uptime monitors proactively check your services from the outside, making them ideal for websites, APIs, and any publicly accessible endpoint.
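
Conceptually, each uptime check fetches the target URL from one or more probe locations and evaluates a list of assertions against the response. A rough Python sketch of the kinds of assertions used in the examples below (illustrative only, not the actual 9n9s probe):

import requests

def check_homepage():
    """Evaluate the same kinds of assertions an uptime monitor would."""
    resp = requests.get("https://shop.example.com", timeout=10)
    elapsed_ms = resp.elapsed.total_seconds() * 1000

    failures = []
    if resp.status_code != 200:
        failures.append(f"status code {resp.status_code} != 200")
    if elapsed_ms >= 2000:
        failures.append(f"response time {elapsed_ms:.0f}ms >= 2000ms")
    if "Shop Now" not in resp.text:
        failures.append('body does not contain "Shop Now"')

    return failures  # empty list means the check passed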

Use Case: Monitor critical pages of an e-commerce website.

Homepage Monitor:

  • URL: https://shop.example.com
  • Frequency: Every 1 minute
  • Assertions:
    • Status code: 200
    • Response time: < 2000ms
    • Content contains: "Shop Now"
    • SSL certificate expires: > 14 days

Checkout Process Monitor:

  • URL: https://shop.example.com/checkout
  • Method: POST
  • Headers: Content-Type: application/json
  • Body: {"test": true, "items": [{"id": "test-item"}]}
  • Assertions:
    • Status code: 200 or 302
    • Response time: < 5000ms
    • Content contains: "payment"

Payment Gateway Monitor:

  • URL: https://payments.example.com/health
  • Frequency: Every 30 seconds
  • Assertions:
    • Status code: 200
    • Response body JSON: $.status == "healthy"
    • Response time: < 1000ms

Use Case: Monitor critical API endpoints with different authentication and validation requirements.

Public API Health Check:

# 9n9s.yml configuration
projects:
  api-services:
    uptime:
      - name: "Public API Health"
        url: "https://api.example.com/health"
        frequency: "30s"
        assertions:
          - type: "STATUS_CODE"
            operator: "EQUALS"
            value: "200"
          - type: "RESPONSE_TIME"
            operator: "LESS_THAN"
            value: "500"
          - type: "RESPONSE_BODY"
            operator: "MATCHES_JSON"
            value: "$.status == 'healthy'"
        tags:
          service: api
          environment: production
          criticality: high

Authenticated API Endpoint:

- name: "User Profile API"
url: "https://api.example.com/v1/user/profile"
method: "GET"
headers:
Authorization: "Bearer {{env.API_TEST_TOKEN}}"
Content-Type: "application/json"
frequency: "5m"
assertions:
- type: "STATUS_CODE"
operator: "IN"
value: ["200", "401"] # 401 is acceptable if token expires
- type: "RESPONSE_TIME"
operator: "LESS_THAN"
value: "2000"
- type: "RESPONSE_BODY"
operator: "CONTAINS"
value: "user_id"

GraphQL API Monitoring:

- name: "GraphQL API"
url: "https://api.example.com/graphql"
method: "POST"
headers:
Content-Type: "application/json"
body: |
{
"query": "query HealthCheck { health { status version } }"
}
frequency: "2m"
assertions:
- type: "STATUS_CODE"
operator: "EQUALS"
value: "200"
- type: "RESPONSE_BODY"
operator: "MATCHES_JSON"
value: "$.data.health.status == 'ok'"

Use Case: Monitor database connectivity through health check endpoints.

- name: "PostgreSQL Health"
url: "https://api.example.com/health/database"
frequency: "1m"
assertions:
- type: "STATUS_CODE"
operator: "EQUALS"
value: "200"
- type: "RESPONSE_BODY"
operator: "MATCHES_JSON"
value: "$.database.postgresql.status == 'connected'"
- type: "RESPONSE_TIME"
operator: "LESS_THAN"
value: "5000" # Database checks may be slower

Use Case: Monitor CDN performance and asset availability.

- name: "CDN Asset Availability"
url: "https://cdn.example.com/assets/main.css"
frequency: "5m"
regions: ["us-east-1", "eu-west-1", "ap-southeast-1"]
assertions:
- type: "STATUS_CODE"
operator: "EQUALS"
value: "200"
- type: "RESPONSE_TIME"
operator: "LESS_THAN"
value: "1000"
- type: "RESPONSE_HEADER"
operator: "CONTAINS"
header: "Cache-Control"
value: "max-age"

Use Case: Monitor complex distributed systems with dependencies.

# Comprehensive health check with dependency monitoring
import time
from datetime import datetime
from typing import Dict

import requests


class ServiceHealthMonitor:
    def __init__(self, heartbeat_url: str):
        self.heartbeat_url = heartbeat_url
        self.services = {}

    def check_service_health(self, name: str, url: str, timeout: int = 5) -> Dict:
        """Check individual service health"""
        try:
            start_time = time.time()
            response = requests.get(url, timeout=timeout)
            duration = (time.time() - start_time) * 1000
            return {
                "name": name,
                "status": "healthy" if response.status_code == 200 else "unhealthy",
                "response_time_ms": duration,
                "status_code": response.status_code,
                "details": response.json() if response.headers.get('content-type', '').startswith('application/json') else None
            }
        except Exception as e:
            return {
                "name": name,
                "status": "unhealthy",
                "error": str(e),
                "response_time_ms": None
            }

    def run_health_checks(self):
        """Run health checks for all services"""
        services_to_check = [
            ("database", "http://localhost:8080/health/db"),
            ("redis", "http://localhost:8080/health/redis"),
            ("rabbitmq", "http://localhost:8080/health/rabbitmq"),
            ("elasticsearch", "http://localhost:8080/health/elasticsearch")
        ]

        results = []
        overall_healthy = True

        for name, url in services_to_check:
            result = self.check_service_health(name, url)
            results.append(result)
            if result["status"] != "healthy":
                overall_healthy = False

        # Send comprehensive health report
        payload = {
            "overall_status": "healthy" if overall_healthy else "degraded",
            "services": results,
            "healthy_count": sum(1 for r in results if r["status"] == "healthy"),
            "total_count": len(results),
            "check_timestamp": datetime.utcnow().isoformat()
        }

        endpoint = self.heartbeat_url if overall_healthy else f"{self.heartbeat_url}/fail"
        requests.post(endpoint, json=payload)


# Run every 5 minutes (e.g. from cron or a scheduler)
if __name__ == "__main__":
    monitor = ServiceHealthMonitor("https://pulse.9n9s.com/uuid-service-health")
    monitor.run_health_checks()

Use Case: Monitor Kubernetes pods and deployments.

# Kubernetes CronJob with 9n9s monitoring
apiVersion: batch/v1
kind: CronJob
metadata:
  name: data-backup-job
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: backup
              image: postgres:15
              env:
                - name: HEARTBEAT_URL
                  valueFrom:
                    secretKeyRef:
                      name: monitoring-secrets
                      key: backup-heartbeat-url
              command:
                - /bin/bash
                - -c
                - |
                  # Signal job start
                  curl -fsS "${HEARTBEAT_URL}/start"

                  # Perform backup
                  pg_dump -h $DB_HOST -U $DB_USER $DB_NAME > /backup/backup_$(date +%Y%m%d_%H%M%S).sql

                  if [ $? -eq 0 ]; then
                    # Signal success
                    BACKUP_SIZE=$(du -h /backup/*.sql | tail -1 | cut -f1)
                    curl -fsS -X POST -H "Content-Type: application/json" \
                      -d "{\"backup_size\": \"$BACKUP_SIZE\", \"pod\": \"$HOSTNAME\"}" "${HEARTBEAT_URL}"
                  else
                    # Signal failure
                    curl -fsS -X POST -H "Content-Type: application/json" \
                      -d "{\"error\": \"Backup failed\", \"pod\": \"$HOSTNAME\"}" "${HEARTBEAT_URL}/fail"
                    exit 1
                  fi
          restartPolicy: OnFailure

Use Case: Monitor third-party service integrations and APIs.

# Monitor multiple third-party services
import os

import requests


class ThirdPartyServiceMonitor:
    def __init__(self):
        self.services = {
            "stripe": {
                "url": "https://api.stripe.com/v1/charges",
                "headers": {"Authorization": f"Bearer {os.environ['STRIPE_SECRET_KEY']}"},
                "heartbeat": "https://pulse.9n9s.com/uuid-stripe-integration"
            },
            "sendgrid": {
                "url": "https://api.sendgrid.com/v3/user/profile",
                "headers": {"Authorization": f"Bearer {os.environ['SENDGRID_API_KEY']}"},
                "heartbeat": "https://pulse.9n9s.com/uuid-sendgrid-integration"
            },
            "aws_s3": {
                "url": "https://s3.amazonaws.com",
                "heartbeat": "https://pulse.9n9s.com/uuid-s3-integration"
            }
        }

    def check_service(self, name: str, config: dict):
        """Check individual third-party service"""
        try:
            response = requests.get(
                config["url"],
                headers=config.get("headers", {}),
                timeout=10
            )

            # Service-specific validation
            if name == "stripe" and response.status_code == 200:
                payload = {
                    "service": "stripe",
                    "api_version": response.headers.get("Stripe-Version"),
                    "response_time_ms": response.elapsed.total_seconds() * 1000
                }
                requests.post(config["heartbeat"], json=payload)
            elif name == "sendgrid" and response.status_code == 200:
                payload = {
                    "service": "sendgrid",
                    "response_time_ms": response.elapsed.total_seconds() * 1000
                }
                requests.post(config["heartbeat"], json=payload)
        except Exception as e:
            # Signal failure
            error_payload = {
                "service": name,
                "error": str(e),
                "error_type": type(e).__name__
            }
            requests.post(f"{config['heartbeat']}/fail", json=error_payload)

A few practices make these patterns more reliable in production:

  • Use appropriate grace periods to account for natural variance in run times
  • Batch heartbeats for very frequent operations (every few seconds); see the sketch after this list
  • Monitor workers rather than individual jobs for high-volume queues
  • Multiple monitoring approaches: Combine heartbeat and uptime monitors
  • Staged alerting: Different thresholds for warnings vs. critical alerts
  • Redundant checks: Monitor from multiple angles and locations
  • Different schedules: More frequent checks in production
  • Environment-specific tags: Separate alerting rules by environment
  • Graduated response: Warnings in staging, immediate alerts in production
  • Shared monitoring dashboards: Keep teams informed of service health
  • Escalation procedures: Route alerts to appropriate team members
  • Documentation: Maintain runbooks linked to monitoring alerts
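
As an illustration of the batching point above, a minimal sketch of a worker that counts operations locally and sends one summarizing pulse per interval instead of one pulse per operation (the pulse URL and interval are placeholders):

import threading
import requests

PULSE_URL = "https://pulse.9n9s.com/your-worker-uuid"  # placeholder monitor UUID
INTERVAL_SECONDS = 300  # one pulse every 5 minutes

_counts = {"processed": 0, "errors": 0}
_lock = threading.Lock()

def record(success=True):
    """Call this once per operation; it only touches local counters."""
    with _lock:
        _counts["processed" if success else "errors"] += 1

def flush():
    """Send one batched pulse summarizing everything since the last flush."""
    with _lock:
        payload = dict(_counts)
        _counts["processed"] = 0
        _counts["errors"] = 0
    try:
        requests.post(PULSE_URL, json=payload, timeout=10)
    except requests.RequestException:
        pass  # monitoring must never crash the worker
    threading.Timer(INTERVAL_SECONDS, flush).start()

# flush()  # start the reporting loop alongside the worker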

These use cases demonstrate the flexibility and power of 9n9s across different scenarios. Whether you’re monitoring simple cron jobs or complex distributed systems, 9n9s provides the visibility and reliability assurance you need to maintain operational confidence.