Monitor Carrier Authority
A carrier's operating authority and insurance status can change at any time. This guide shows you how to build an automated monitoring pipeline that catches authority revocations, insurance lapses, and safety deterioration before they become your problem.
Why automated monitoring matters
22% of carriers had an authority or insurance change within 12 months.
$20M+ is the average nuclear verdict in trucking, often citing inadequate carrier vetting.
4x daily, CarrierOk pulls from FMCSA source data, so you always have the latest status.
Add carriers to your monitoring list
The Monitoring API tracks carriers by their profile ID, which is the carrier's DOT number and docket number joined with a hyphen (e.g., 568253-MC277621).
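As a minimal sketch of the ID format described above, the helpers below join and split a profile ID. The function names `build_profile_id` and `split_profile_id` are hypothetical and not part of the CarrierOk API, and the digit check on the DOT number is an assumption about valid input.

```python
def build_profile_id(dot_number, docket) -> str:
    """Join a DOT number and a docket number with a hyphen."""
    dot = str(dot_number).strip()
    doc = str(docket).strip()
    # Assumption: DOT numbers are purely numeric and dockets are non-empty.
    if not dot.isdigit():
        raise ValueError(f"DOT number must be numeric, got {dot_number!r}")
    if not doc:
        raise ValueError("docket number is required")
    return f"{dot}-{doc}"


def split_profile_id(profile_id: str) -> tuple[str, str]:
    """Split a profile ID back into (dot_number, docket)."""
    dot, _, docket = profile_id.partition("-")
    return dot, docket


if __name__ == "__main__":
    pid = build_profile_id(568253, "MC277621")
    print(pid, split_profile_id(pid))
```

Validating IDs before sending them keeps malformed entries out of the monitoring list.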
curl -X POST "https://api.carrierok.com/v2/monitoring/add" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "profile_ids": [
      "568253-MC277621",
      "2247158-MC764145",
      "1234567-MC999999"
    ]
  }'

Response:

{
  "status": "Profiles added or updated successfully"
}

Building profile IDs
Join dot_number and docket with a hyphen: f"{carrier['dot_number']}-{carrier['docket']}"

List your monitored carriers
Poll your monitoring feed for carriers with recent changes. In the response, each key of the items object is a monitored carrier profile ID, and its value lists the fields that changed since you last acknowledged them. Use this to drive your daily compliance checks.
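The response shape described above can be flattened into a per-carrier summary of what changed. This is a sketch only: `summarize_changes` is a hypothetical helper, and the `<field>_current` / `<field>_prior` / `<field>_changed` naming is inferred from the sample response in this guide rather than from a documented schema.

```python
def summarize_changes(feed: dict) -> dict[str, dict[str, tuple]]:
    """Map each profile ID to {field: (prior, current)} for flagged fields."""
    summary = {}
    for profile_id, entries in feed.get("items", {}).items():
        changed = {}
        for entry in entries:
            for key, flag in entry.items():
                # Assumption: a true "<field>_changed" flag marks a change.
                if key.endswith("_changed") and flag:
                    field = key[: -len("_changed")]
                    changed[field] = (
                        entry.get(f"{field}_prior"),
                        entry.get(f"{field}_current"),
                    )
        summary[profile_id] = changed
    return summary


if __name__ == "__main__":
    sample = {
        "total_count": 1,
        "items": {
            "568253-MC277621": [
                {
                    "safety_rating_desc_current": "Satisfactory",
                    "safety_rating_desc_prior": "Conditional",
                    "safety_rating_desc_changed": True,
                }
            ]
        },
    }
    for pid, fields in summarize_changes(sample).items():
        for name, (prior, current) in fields.items():
            print(f"{pid}: {name} changed from {prior!r} to {current!r}")
```

A summary like this is useful for logging which field triggered a re-check before the full profile is fetched.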
curl "https://api.carrierok.com/v2/monitoring/list" \
  -H "Authorization: Bearer YOUR_API_KEY"

Response:

{
  "total_count": 2,
  "items": {
    "568253-MC277621": [
      {
        "safety_rating_desc_current": "Satisfactory",
        "safety_rating_desc_prior": "Conditional",
        "safety_rating_desc_changed": true
      }
    ],
    "2247158-MC764145": [
      {
        "insurance_bipd_on_file_current": 1000000,
        "insurance_bipd_on_file_prior": 750000,
        "insurance_bipd_on_file_changed": true
      }
    ]
  }
}

Build a daily compliance check
This script pulls the carriers that changed, fetches a fresh profile for each, and checks it against a set of compliance rules. Run it on a daily cron, a Cloud Function timer, or any scheduler.
import requests
import time
from datetime import datetime

API_KEY = "YOUR_API_KEY"
BASE = "https://api.carrierok.com"


def get_monitored_carriers():
    """Fetch the carriers that changed since we last acknowledged them."""
    resp = requests.get(
        f"{BASE}/v2/monitoring/list",
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    resp.raise_for_status()
    data = resp.json()
    # Each key of "items" is a monitored profile ID
    return list(data["items"].keys())

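
def lookup_carrier(dot_number: str, max_retries: int = 5):
    """Fetch a fresh profile for a single carrier."""
    for attempt in range(max_retries + 1):
        resp = requests.get(
            f"{BASE}/v2/profile",
            params={"dot_number": dot_number},
            headers={"Authorization": f"Bearer {API_KEY}"},
        )
        if resp.status_code == 429 and attempt < max_retries:
            # Rate limited: wait, then retry. The retry count is bounded
            # so a persistent 429 cannot cause unbounded recursion.
            time.sleep(2)
            continue
        # Raises on any remaining error, including a final 429
        resp.raise_for_status()
        items = resp.json()["items"]
        return items[0] if items else None
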

def check_carrier(carrier: dict) -> list[dict]:
    """Run compliance rules against a carrier profile.

    Returns a list of alerts (empty = all clear).
    """
    alerts = []
    dot = carrier["dot_number"]
    name = carrier["legal_name"]

    # 1. Authority status
    if carrier["authority_common"] != "Active":
        alerts.append({
            "severity": "critical",
            "rule": "authority_revoked",
            "message": f"{name} (DOT {dot}): common authority "
                       f"is {carrier['authority_common']}",
        })

    # 2. Insurance coverage
    bipd_on_file = int(carrier.get("insurance_bipd_on_file", 0))
    bipd_required = int(carrier.get("insurance_bipd_required", 0))
    if bipd_required > 0 and bipd_on_file < bipd_required:
        alerts.append({
            "severity": "critical",
            "rule": "insurance_lapse",
            "message": f"{name} (DOT {dot}): BIPD on file "
                       f"(${bipd_on_file:,}) < required (${bipd_required:,})",
        })

    # 3. ISS score spike
    iss = int(carrier.get("iss_value", 0))
    if iss >= 75:
        alerts.append({
            "severity": "high",
            "rule": "iss_high",
            "message": f"{name} (DOT {dot}): ISS score is {iss} "
                       f"(high risk threshold: 75)",
        })

    # 4. Vehicle OOS rate more than 1.5x the national average
    veh_oos = float(carrier.get("inspections_vehicle_out_of_service_pct", 0))
    natl_avg = float(carrier.get("natl_avg_oos_vehicle", 0.22))
    if veh_oos > natl_avg * 1.5:
        alerts.append({
            "severity": "medium",
            "rule": "oos_elevated",
            "message": f"{name} (DOT {dot}): vehicle OOS rate "
                       f"{veh_oos:.1%} vs national avg {natl_avg:.1%}",
        })

    # 5. New entrant (< 18 months / 540 days)
    dot_age = int(carrier.get("dot_age", 9999))
    if dot_age < 540:
        alerts.append({
            "severity": "info",
            "rule": "new_entrant",
            "message": f"{name} (DOT {dot}): new entrant, "
                       f"registered {dot_age} days ago",
        })

    return alerts


def run_daily_check():
    """Main monitoring loop."""
    print(f"[{datetime.now():%Y-%m-%d %H:%M}] Starting daily check")
    profile_ids = get_monitored_carriers()
    print(f"  Checking {len(profile_ids)} carriers with recent changes")

    all_alerts = []
    for pid in profile_ids:
        # The DOT number is the part of the profile ID before the hyphen
        dot = pid.split("-")[0]
        carrier = lookup_carrier(dot)
        if not carrier:
            all_alerts.append({
                "severity": "warning",
                "rule": "not_found",
                "message": f"DOT {dot}: carrier not found",
            })
            continue
        alerts = check_carrier(carrier)
        all_alerts.extend(alerts)

    # Summary
    critical = [a for a in all_alerts if a["severity"] == "critical"]
    print(f"  Finished: {len(all_alerts)} alerts "
          f"({len(critical)} critical)")
    if all_alerts:
        send_alerts(all_alerts)
    return all_alerts


def send_alerts(alerts: list[dict]):
    """Send alerts via your preferred channel."""
    # Replace with your webhook, email, or Slack integration
    for alert in alerts:
        level = alert["severity"].upper()
        print(f"  [{level}] {alert['message']}")


if __name__ == "__main__":
    run_daily_check()

Rate limiting
lookup_carrier() pauses for two seconds and retries when the API returns HTTP 429 (Too Many Requests).
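A fixed two-second pause works for small lists. For larger ones, a bounded exponential backoff is a common alternative. The sketch below is generic and does not reflect documented CarrierOk limits: `RateLimited`, `backoff_delays`, and `call_with_backoff` are hypothetical names, and the delay values are assumptions to tune for your plan.

```python
import time


class RateLimited(Exception):
    """Hypothetical signal: raise this when a response has status 429."""


def backoff_delays(max_retries: int = 5, base: float = 1.0, cap: float = 30.0):
    """Delays that double on each retry, capped at `cap` seconds."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries)]


def call_with_backoff(func, *args, max_retries: int = 5, base: float = 1.0,
                      cap: float = 30.0, sleep=time.sleep, **kwargs):
    """Call func, retrying on RateLimited up to max_retries times."""
    for delay in backoff_delays(max_retries, base, cap):
        try:
            return func(*args, **kwargs)
        except RateLimited:
            sleep(delay)
    # Final attempt: let RateLimited propagate to the caller
    return func(*args, **kwargs)


if __name__ == "__main__":
    print(backoff_delays())
```

To use it, a lookup function would raise `RateLimited` when `resp.status_code == 429` and be invoked as `call_with_backoff(lookup, dot)`. Passing `sleep` as a parameter makes the retry logic testable without real waiting.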
Define your monitoring rules
The script above implements five of these rules. The table below lists the full set, with the profile field each rule reads and the condition that triggers it:
| Rule | Field | Condition |
|---|---|---|
| Authority revoked | authority_common | != "Active" |
| BIPD lapse | insurance_bipd_on_file | < insurance_bipd_required |
| ISS spike | iss_value | >= 75 |
| OOS elevated | inspections_vehicle_out_of_service_pct | > natl_avg × 1.5 |
| BASIC alert | basic_alert_unsafe_driving | == true |
| Safety downgrade | safety_rating_desc | == "Unsatisfactory" |
| New entrant | dot_age | < 540 |
| Stale MCS-150 | mcs150_date | > 2 years ago |
See the ISS score, BASIC percentile, and OOS rate glossary entries for context on these thresholds.
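Three rules in the table are not covered by the script: BASIC alert, safety downgrade, and stale MCS-150. A sketch of how they might be added follows. `check_extra_rules` is a hypothetical function; it assumes `basic_alert_unsafe_driving` is a boolean and `mcs150_date` is an ISO `YYYY-MM-DD` string, neither of which is confirmed here, and the severity levels are illustrative choices.

```python
from datetime import date, datetime
from typing import Optional


def check_extra_rules(carrier: dict, today: Optional[date] = None) -> list[dict]:
    """Evaluate the table's BASIC, safety rating, and MCS-150 rules."""
    today = today or date.today()
    label = (f"{carrier.get('legal_name', 'Unknown')} "
             f"(DOT {carrier.get('dot_number', '?')})")
    alerts = []

    # BASIC alert: assumed to be a boolean flag on the profile
    if carrier.get("basic_alert_unsafe_driving") is True:
        alerts.append({"severity": "high", "rule": "basic_alert",
                       "message": f"{label}: Unsafe Driving BASIC alert"})

    # Safety downgrade
    if carrier.get("safety_rating_desc") == "Unsatisfactory":
        alerts.append({"severity": "critical", "rule": "safety_downgrade",
                       "message": f"{label}: safety rating is Unsatisfactory"})

    # Stale MCS-150: filed more than 2 years (730 days) ago
    raw = carrier.get("mcs150_date")
    if raw:
        filed = datetime.strptime(raw, "%Y-%m-%d").date()  # assumed format
        age_days = (today - filed).days
        if age_days > 730:
            alerts.append({"severity": "medium", "rule": "stale_mcs150",
                           "message": f"{label}: MCS-150 filed "
                                      f"{age_days} days ago"})
    return alerts
```

The returned alerts use the same `severity` / `rule` / `message` shape as `check_carrier()`, so the two lists can be concatenated before alerting.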
Connect alerting
Replace the send_alerts() function with your preferred channel. Here are common patterns:
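Email is one such channel. As a rough sketch using only the Python standard library, the helper below builds a plain-text digest sorted by severity. `build_alert_email` and the `SEVERITY_ORDER` ranking are hypothetical, and the addresses and SMTP host in the comment are placeholders.

```python
from email.message import EmailMessage

# Assumed ranking; adjust to match your own escalation policy.
SEVERITY_ORDER = {"critical": 0, "high": 1, "warning": 2, "medium": 3, "info": 4}


def build_alert_email(alerts: list[dict], sender: str,
                      recipient: str) -> EmailMessage:
    """Build one digest email listing alerts, most severe first."""
    ordered = sorted(alerts,
                     key=lambda a: SEVERITY_ORDER.get(a["severity"], 99))
    critical = sum(1 for a in alerts if a["severity"] == "critical")

    msg = EmailMessage()
    msg["Subject"] = (f"Carrier monitoring: {len(alerts)} alerts "
                      f"({critical} critical)")
    msg["From"] = sender
    msg["To"] = recipient
    msg.set_content("\n".join(
        f"[{a['severity'].upper()}] {a['message']}" for a in ordered
    ))
    return msg


# Sending (placeholder host; requires a reachable SMTP server):
#   import smtplib
#   with smtplib.SMTP("localhost") as smtp:
#       smtp.send_message(build_alert_email(alerts, "from@example.com",
#                                           "to@example.com"))
```

Keeping message construction separate from delivery lets you test the digest format without a mail server.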
Slack webhook
SLACK_WEBHOOK = "https://hooks.slack.com/services/T.../B.../xxx"


def send_alerts(alerts: list[dict]):
    severity_emoji = {
        "critical": ":rotating_light:",
        "high": ":warning:",
        "medium": ":large_yellow_circle:",
        "info": ":information_source:",
    }
    # One Block Kit section per alert
    blocks = []
    for alert in alerts:
        emoji = severity_emoji.get(alert["severity"], ":white_circle:")
        blocks.append({
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"{emoji} *[{alert['severity'].upper()}]* "
                        f"{alert['message']}",
            },
        })
    requests.post(SLACK_WEBHOOK, json={"blocks": blocks})

Generic webhook
from datetime import datetime, timezone

WEBHOOK_URL = "https://your-app.com/api/carrier-alerts"


def send_alerts(alerts: list[dict]):
    requests.post(
        WEBHOOK_URL,
        json={
            "source": "carrierok-monitor",
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "alerts": alerts,
        },
        headers={"Authorization": "Bearer YOUR_WEBHOOK_SECRET"},
    )

Scheduling
Schedule the script with cron (for example, 0 6 * * * for 6 AM daily). For cloud deployments, use GCP Cloud Scheduler + Cloud Functions, AWS EventBridge + Lambda, or any CI/CD platform with scheduled workflows.

The complete workflow
1. Add carriers: POST /v2/monitoring/add
2. Daily cron runs: fetch profiles
3. Check rules: authority, insurance, safety
4. Alert: Slack, webhook, email