Intermediate · 15 min

Monitor Carrier Authority

A carrier's operating authority and insurance status can change at any time. This guide shows you how to build an automated monitoring pipeline that catches authority revocations, insurance lapses, and safety deterioration before they become your problem.

Why automated monitoring matters

22% of carriers had an authority or insurance change within 12 months

$20M+ average nuclear verdict in trucking, often citing inadequate carrier vetting

4x daily: CarrierOk pulls from FMCSA source data, so you always have the latest status

1. Add carriers to your monitoring list

The Monitoring API tracks carriers by their profile ID, which is the carrier's DOT number and docket number joined with a hyphen (e.g., 568253-MC277621).

curl -X POST "https://api.carrierok.com/v2/monitoring/add" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "profile_ids": [
      "568253-MC277621",
      "2247158-MC764145",
      "1234567-MC999999"
    ]
  }'
Response: 200 OK
{
  "status": "Profiles added or updated successfully"
}

Building profile IDs

If you only have DOT numbers, look up each carrier first with the Profiles API and build the ID from dot_number + docket: f"{carrier['dot_number']}-{carrier['docket']}"
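As a rough sketch of the tip above, the helpers below build profile IDs and the request body for the add call. The function names build_profile_id and build_add_payload are illustrative and not part of the API; the carrier dicts are assumed to carry the dot_number and docket keys mentioned above.

```python
def build_profile_id(carrier: dict) -> str:
    """Join DOT number and docket with a hyphen, e.g. 568253-MC277621."""
    return f"{carrier['dot_number']}-{carrier['docket']}"


def build_add_payload(carriers: list) -> dict:
    """Build the JSON body for POST /v2/monitoring/add.

    Duplicate IDs are dropped while keeping the original order.
    """
    ids = [build_profile_id(c) for c in carriers]
    return {"profile_ids": list(dict.fromkeys(ids))}


carriers = [
    {"dot_number": 568253, "docket": "MC277621"},
    {"dot_number": 2247158, "docket": "MC764145"},
    {"dot_number": 568253, "docket": "MC277621"},  # duplicate, dropped
]
payload = build_add_payload(carriers)
print(payload)
```

Send the resulting payload as the JSON body of POST /v2/monitoring/add, with the same headers as the curl call in this step.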
2. List your monitored carriers

Poll your monitoring feed for carriers with recent changes. In the response, each key of the items object is a monitored carrier profile ID, and its value lists the fields that changed since you last acknowledged them. Use this to drive your daily compliance checks.

curl "https://api.carrierok.com/v2/monitoring/list" \
  -H "Authorization: Bearer YOUR_API_KEY"
Response: 200 OK
{
  "total_count": 2,
  "items": {
    "568253-MC277621": [
      {
        "safety_rating_desc_current": "Satisfactory",
        "safety_rating_desc_prior": "Conditional",
        "safety_rating_desc_changed": true
      }
    ],
    "2247158-MC764145": [
      {
        "insurance_bipd_on_file_current": 1000000,
        "insurance_bipd_on_file_prior": 750000,
        "insurance_bipd_on_file_changed": true
      }
    ]
  }
}
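One way to read this response is to turn each flagged field into a one-line summary. The sketch below assumes every tracked field follows the _current / _prior / _changed naming seen in the sample above; summarize_changes is a hypothetical helper, not an API call.

```python
def summarize_changes(feed: dict) -> list:
    """Turn a /v2/monitoring/list response into one line per changed field."""
    lines = []
    for profile_id, entries in feed.get("items", {}).items():
        for entry in entries:
            for key, changed in entry.items():
                # Only look at "<field>_changed" flags that are true
                if not key.endswith("_changed") or not changed:
                    continue
                field = key[: -len("_changed")]
                prior = entry.get(f"{field}_prior")
                current = entry.get(f"{field}_current")
                lines.append(f"{profile_id}: {field} {prior!r} -> {current!r}")
    return lines


sample = {
    "total_count": 2,
    "items": {
        "568253-MC277621": [{
            "safety_rating_desc_current": "Satisfactory",
            "safety_rating_desc_prior": "Conditional",
            "safety_rating_desc_changed": True,
        }],
        "2247158-MC764145": [{
            "insurance_bipd_on_file_current": 1000000,
            "insurance_bipd_on_file_prior": 750000,
            "insurance_bipd_on_file_changed": True,
        }],
    },
}
for line in summarize_changes(sample):
    print(line)
```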
3. Build a daily compliance check

This script pulls the carriers that changed, fetches a fresh profile for each, and checks it against a set of compliance rules. Run it on a daily cron, a Cloud Function timer, or any scheduler.

daily_check.py
import requests
import time
from datetime import datetime

API_KEY = "YOUR_API_KEY"
BASE = "https://api.carrierok.com"

def get_monitored_carriers():
    """Fetch the carriers that changed since we last acknowledged them."""
    resp = requests.get(
        f"{BASE}/v2/monitoring/list",
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    resp.raise_for_status()
    data = resp.json()
    return list(data["items"].keys())

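def lookup_carrier(dot_number: str, max_retries: int = 5):
    """Fetch a fresh profile for a single carrier, retrying on HTTP 429."""
    for attempt in range(max_retries):
        resp = requests.get(
            f"{BASE}/v2/profile",
            params={"dot_number": dot_number},
            headers={"Authorization": f"Bearer {API_KEY}"},
            timeout=30,
        )
        if resp.status_code != 429 or attempt == max_retries - 1:
            break
        # Rate limited: back off exponentially (2s, 4s, 8s, ...) and retry
        time.sleep(2 * 2 ** attempt)
    # Raises if the last response is still an error (including a final 429)
    resp.raise_for_status()
    items = resp.json()["items"]
    return items[0] if items else None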

def check_carrier(carrier: dict) -> list[dict]:
    """Run compliance rules against a carrier profile.
    Returns a list of alerts (empty = all clear).
    """
    alerts = []
    dot = carrier["dot_number"]
    name = carrier["legal_name"]

    # 1. Authority status
    if carrier["authority_common"] != "Active":
        alerts.append({
            "severity": "critical",
            "rule": "authority_revoked",
            "message": f"{name} (DOT {dot}): common authority "
                       f"is {carrier['authority_common']}",
        })

    # 2. Insurance coverage
    bipd_on_file = int(carrier.get("insurance_bipd_on_file", 0))
    bipd_required = int(carrier.get("insurance_bipd_required", 0))
    if bipd_required > 0 and bipd_on_file < bipd_required:
        alerts.append({
            "severity": "critical",
            "rule": "insurance_lapse",
            "message": f"{name} (DOT {dot}): BIPD on file "
                       f"(${bipd_on_file:,}) < required (${bipd_required:,})",
        })

    # 3. ISS score spike
    iss = int(carrier.get("iss_value", 0))
    if iss >= 75:
        alerts.append({
            "severity": "high",
            "rule": "iss_high",
            "message": f"{name} (DOT {dot}): ISS score is {iss} "
                       f"(high risk threshold: 75)",
        })

    # 4. Vehicle OOS rate more than 1.5x the national average
    #    (both values are treated as fractions, e.g. 0.22 = 22%)
    veh_oos = float(carrier.get("inspections_vehicle_out_of_service_pct", 0))
    natl_avg = float(carrier.get("natl_avg_oos_vehicle", 0.22))
    if veh_oos > natl_avg * 1.5:
        alerts.append({
            "severity": "medium",
            "rule": "oos_elevated",
            "message": f"{name} (DOT {dot}): vehicle OOS rate "
                       f"{veh_oos:.1%} vs national avg {natl_avg:.1%}",
        })

    # 5. New entrant (< 18 months / 540 days)
    dot_age = int(carrier.get("dot_age", 9999))
    if dot_age < 540:
        alerts.append({
            "severity": "info",
            "rule": "new_entrant",
            "message": f"{name} (DOT {dot}): new entrant — "
                       f"registered {dot_age} days ago",
        })

    return alerts

def run_daily_check():
    """Main monitoring loop."""
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Starting daily check")

    profile_ids = get_monitored_carriers()
    print(f"  {len(profile_ids)} carriers with changes to check")

    all_alerts = []

    for pid in profile_ids:
        dot = pid.split("-")[0]
        carrier = lookup_carrier(dot)
        if not carrier:
            all_alerts.append({
                "severity": "warning",
                "rule": "not_found",
                "message": f"DOT {dot}: carrier not found",
            })
            continue

        alerts = check_carrier(carrier)
        all_alerts.extend(alerts)

    # Summary
    critical = [a for a in all_alerts if a["severity"] == "critical"]
    print(f"  Finished: {len(all_alerts)} alerts "
          f"({len(critical)} critical)")

    if all_alerts:
        send_alerts(all_alerts)

    return all_alerts

def send_alerts(alerts: list[dict]):
    """Send alerts via your preferred channel."""
    # Replace with your webhook, email, or Slack integration
    for alert in alerts:
        level = alert["severity"].upper()
        print(f"  [{level}] {alert['message']}")

if __name__ == "__main__":
    run_daily_check()

Rate limiting

Each profile lookup counts as one API request. If you're monitoring hundreds of carriers, add a small delay between requests or use the batch approach described in the Rate Limits docs. The script above includes automatic retry on 429 responses.
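A small delay between requests could look like the sketch below. The helper name throttled_lookups and the 0.25-second default are placeholders, not documented limits; tune the delay to your plan. The fetch argument stands in for lookup_carrier from the script above.

```python
import time
from typing import Callable, Iterable


def throttled_lookups(
    dot_numbers: Iterable[str],
    fetch: Callable[[str], object],
    delay: float = 0.25,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch(dot) for each DOT number, pausing `delay` seconds between calls."""
    results = {}
    for i, dot in enumerate(dot_numbers):
        if i > 0:
            sleep(delay)  # no pause before the first request
        results[dot] = fetch(dot)
    return results
```

In the daily script this would be called as throttled_lookups(dots, lookup_carrier). The sleep parameter exists only so the pause can be swapped out in tests.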
4. Define your monitoring rules

The script above includes five core rules. The table below lists those five plus three more you can add, with the field each rule reads and the condition that triggers it:

| Rule | Field | Condition |
| --- | --- | --- |
| Authority revoked | authority_common | != "Active" |
| BIPD lapse | insurance_bipd_on_file | < insurance_bipd_required |
| ISS spike | iss_value | >= 75 |
| OOS elevated | inspections_vehicle_out_of_service_pct | > natl_avg_oos_vehicle × 1.5 |
| BASIC alert | basic_alert_unsafe_driving | == true |
| Safety downgrade | safety_rating_desc | == "Unsatisfactory" |
| New entrant | dot_age | < 540 (days) |
| Stale MCS-150 | mcs150_date | more than 2 years ago |

See the ISS score, BASIC percentile, and OOS rate glossary entries for context on these thresholds.
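The three rules that the daily script does not cover (BASIC alert, safety downgrade, stale MCS-150) could be added along these lines. This is a sketch under assumptions: the severity levels are arbitrary choices, check_extra_rules is a hypothetical helper, and mcs150_date is assumed to be an ISO date string (YYYY-MM-DD), which you should confirm against a real profile response.

```python
from datetime import date, datetime
from typing import Optional


def check_extra_rules(carrier: dict, today: Optional[date] = None) -> list:
    """Apply the BASIC alert, safety downgrade, and stale MCS-150 rules."""
    today = today or date.today()
    name = carrier.get("legal_name", "Unknown carrier")
    dot = carrier.get("dot_number", "?")
    alerts = []

    def add(severity: str, rule: str, detail: str) -> None:
        alerts.append({
            "severity": severity,
            "rule": rule,
            "message": f"{name} (DOT {dot}): {detail}",
        })

    if carrier.get("basic_alert_unsafe_driving") is True:
        add("high", "basic_alert", "Unsafe Driving BASIC is in alert status")

    if carrier.get("safety_rating_desc") == "Unsatisfactory":
        add("critical", "safety_downgrade", "safety rating is Unsatisfactory")

    raw = carrier.get("mcs150_date")
    if raw:
        # Assumed format: YYYY-MM-DD
        filed = datetime.strptime(raw, "%Y-%m-%d").date()
        age_days = (today - filed).days
        if age_days > 730:  # roughly 2 years
            add("medium", "mcs150_stale", f"MCS-150 last filed {age_days} days ago")

    return alerts
```

Extending all_alerts with check_extra_rules(carrier) next to the existing check_carrier(carrier) call would route these alerts through the same channel.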

5. Connect alerting

Replace the send_alerts() function with your preferred channel. Here are common patterns:

Slack webhook

Python
SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../xxx"

def send_alerts(alerts: list[dict]):
    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "info": ":information_source:",
    }

    blocks = []
    for alert in alerts:
        emoji = severity_emoji.get(alert["severity"], ":white_circle:")
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"{emoji} *[{alert['severity'].upper()}]* "
                        f"{alert['message']}",
            },
        })

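    # Slack allows at most 50 blocks per message, so send in chunks
    for i in range(0, len(blocks), 50):
        resp = requests.post(
            SLACK_WEBHOOK, json={"blocks": blocks[i:i + 50]}, timeout=10
        )
        resp.raise_for_status()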

Generic webhook

Python
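from datetime import datetime, timezone

WEBHOOK_URL = "https://your-app.com/api/carrier-alerts"

def send_alerts(alerts: list[dict]):
    resp = requests.post(
        WEBHOOK_URL,
        json={
            "source": "carrierok-monitor",
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "alerts": alerts,
        },
        headers={"Authorization": "Bearer YOUR_WEBHOOK_SECRET"},
        timeout=10,
    )
    resp.raise_for_status()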
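
Email

Email is the third channel named in the workflow summary. A minimal sketch using only the Python standard library might look like this; the SMTP host, port, addresses, and credentials are placeholders, and build_alert_email is a hypothetical helper.

```python
import smtplib
from email.message import EmailMessage

SMTP_HOST = "smtp.example.com"      # placeholder
SMTP_PORT = 587                     # placeholder (STARTTLS port)
SENDER = "alerts@example.com"       # placeholder
RECIPIENT = "compliance@example.com"  # placeholder


def build_alert_email(alerts: list) -> EmailMessage:
    """Build one plain-text email summarizing all alerts."""
    critical = sum(1 for a in alerts if a["severity"] == "critical")
    msg = EmailMessage()
    msg["Subject"] = f"Carrier monitoring: {len(alerts)} alerts ({critical} critical)"
    msg["From"] = SENDER
    msg["To"] = RECIPIENT
    msg.set_content(
        "\n".join(f"[{a['severity'].upper()}] {a['message']}" for a in alerts)
    )
    return msg


def send_alerts(alerts: list):
    """Send the summary over SMTP with STARTTLS."""
    with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=30) as smtp:
        smtp.starttls()
        smtp.login("SMTP_USER", "SMTP_PASSWORD")  # placeholders
        smtp.send_message(build_alert_email(alerts))
```

Keeping message construction separate from sending makes the formatting easy to check without an SMTP server.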

Scheduling

For a simple setup, use a cron job (0 6 * * * for 6 AM daily). For cloud deployments, use GCP Cloud Scheduler + Cloud Functions, AWS EventBridge + Lambda, or any CI/CD platform with scheduled workflows.
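If neither cron nor a cloud scheduler is available, a long-running process can approximate the same 6 AM schedule. The sketch below is one possible approach, not a recommended deployment: seconds_until and run_forever are hypothetical helpers, and the naive local-time arithmetic ignores daylight-saving shifts.

```python
import time
from datetime import datetime, timedelta


def seconds_until(hour: int, now: datetime) -> float:
    """Seconds from `now` until the next occurrence of hour:00."""
    target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # today's slot has passed, use tomorrow's
    return (target - now).total_seconds()


def run_forever(job, hour: int = 6):
    """Sleep until the next hour:00 local time, run the job, and repeat."""
    while True:
        time.sleep(seconds_until(hour, datetime.now()))
        job()
```

With the daily script, this would be started as run_forever(run_daily_check).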

The complete workflow

1. Add carriers: POST /v2/monitoring/add
2. Daily cron runs: fetch profiles
3. Check rules: authority, insurance, safety
4. Alert: Slack, webhook, email