Build a Carrier Vetting View
Insurance underwriters, freight brokers, and compliance teams evaluate carriers across five dimensions: authority, insurance, safety, fleet operations, and fraud risk. This guide shows you how to extract each signal from a single API response and assemble a production-ready vetting function.
What you'll build
Authority Check
Active status, age, revocations, new entrant flag
Insurance Validation
BIPD vs required, cargo coverage, indicator flags
Safety Score
ISS tiers, all 7 BASICs, OOS rates, crash history
Fleet Analysis
Power units, drivers, CDL count, MCS-150 freshness
Fraud Detection
Risk score, chameleon patterns, contact churn
Overall Verdict
Pass / Review / Fail with reasons
Prerequisites
Fetch the carrier profile
Start with a DOT number lookup. A single API call returns everything you need — identity, authority, insurance, safety, fleet, and contact history.
import requests
API_KEY = "YOUR_API_KEY"  # placeholder: substitute your own API key


def fetch_carrier(dot_number: str, timeout: float = 10.0) -> dict | None:
    """Fetch a full carrier profile by DOT number.

    Args:
        dot_number: DOT number sent as the ``dot_number`` query parameter.
        timeout: Seconds to wait for the server before giving up. Requests
            applies no timeout unless one is passed, so without this a
            stalled connection would block the caller indefinitely.

    Returns:
        The first entry of the response's ``items`` list, or ``None`` when
        that list is empty.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within ``timeout``.
        KeyError: If the response body has no ``items`` key.
    """
    resp = requests.get(
        "https://api.carrierok.com/v2/profile",
        params={"dot_number": dot_number},
        headers={"Authorization": f"Bearer {API_KEY}"},
        timeout=timeout,
    )
    resp.raise_for_status()
    items = resp.json()["items"]
    return items[0] if items else None
carrier = fetch_carrier("568253")
if carrier is None:
    # fetch_carrier returns None when the lookup yields no items, so the
    # profile must not be subscripted in that case.
    print("Carrier not found")
else:
    print(f"Evaluating: {carrier['legal_name']}")

# The profile for Hansen & Adkins (DOT 568253) returns 280+ fields. The sections below show exactly which fields to check and how to interpret them.
Verify authority status
Operating authority is the first gate. If it isn't active, the carrier cannot legally haul regulated commodities for hire.
def check_authority(c: dict) -> dict:
    """Check authority status, age, and revocation history.

    Only the authority status itself is treated as critical: ``pass`` is
    False when common authority is not "Active" or contract authority is
    anything other than "Active" / "None". Prior revocations are still
    reported in ``issues`` so the carrier is surfaced for review, but they
    no longer fail the check on their own.

    Args:
        c: Carrier profile. ``authority_common`` and ``authority_contract``
            are required; ``authority_age_common``, ``total_revocations``
            and ``days_since_last_revocation`` are optional.

    Returns:
        A dict with ``status``, ``age_days``, ``age_months``,
        ``is_new_entrant``, ``revocations``, ``pass`` and ``issues``.

    Raises:
        KeyError: If ``authority_common`` or ``authority_contract`` is
            missing from the profile.
    """
    status = c["authority_common"]
    contract = c["authority_contract"]

    # Status problems are the only findings that fail the authority gate.
    critical = []
    if status != "Active":
        critical.append(f"Common authority: {status}")
    if contract not in ("Active", "None"):
        critical.append(f"Contract authority: {contract}")

    # Authority age in days; under 540 days (~18 months) is a new entrant.
    age_days = int(c.get("authority_age_common", 0))

    # Revocation history is a red flag worth reviewing, not a hard failure.
    warnings = []
    revocations = int(c.get("total_revocations", 0))
    if revocations > 0:
        warnings.append(
            f"{revocations} prior revocation(s), "
            f"last {c.get('days_since_last_revocation', '?')} days ago"
        )

    return {
        "status": status,
        "age_days": age_days,
        "age_months": age_days // 30,
        "is_new_entrant": age_days < 540,
        "revocations": revocations,
        "pass": not critical,
        "issues": critical + warnings,
    }


# | Field | What to check |
|---|---|
| `authority_common` | Must be "Active" |
| `authority_age_common` | < 540 days = new entrant |
| `total_revocations` | > 0 is a red flag |
| `authority_common_pending` | true = authority under review |
| `indicator_authority` | true = authority is active & valid |
Validate insurance coverage
BIPD coverage is the minimum liability insurance required by FMCSA. Carriers with coverage below the required minimum are a compliance and liability risk.
def check_insurance(c: dict) -> dict:
    """Compare BIPD and cargo coverage on file against the required amounts.

    A coverage type is only checked when its required amount is greater
    than zero; a missing field counts as 0.

    Args:
        c: Carrier profile. Reads the ``insurance_bipd_*`` and
            ``insurance_cargo_*`` amounts plus ``indicator_insurance``.

    Returns:
        A dict with the BIPD amounts, ``bipd_adequate``, ``cargo_on_file``,
        ``indicator``, ``pass`` (True only when no shortfall was found) and
        the list of ``issues``.
    """

    def _amount(field: str) -> int:
        # Absent amounts are treated as zero coverage / no requirement.
        return int(c.get(field, 0))

    bipd_filed = _amount("insurance_bipd_on_file")
    bipd_needed = _amount("insurance_bipd_required")
    cargo_filed = _amount("insurance_cargo_on_file")
    cargo_needed = _amount("insurance_cargo_required")

    # A shortfall exists only when there is a requirement and it is unmet.
    bipd_short = bipd_needed > 0 and bipd_filed < bipd_needed
    cargo_short = cargo_needed > 0 and cargo_filed < cargo_needed

    problems = []
    if bipd_short:
        problems.append(
            f"BIPD on file (${bipd_filed:,}) "
            f"below required (${bipd_needed:,})"
        )
    if cargo_short:
        problems.append(
            f"Cargo on file (${cargo_filed:,}) "
            f"below required (${cargo_needed:,})"
        )

    return {
        "bipd_on_file": bipd_filed,
        "bipd_required": bipd_needed,
        "bipd_adequate": not bipd_short,
        "cargo_on_file": cargo_filed,
        "indicator": c.get("indicator_insurance", False),
        "pass": not problems,
        "issues": problems,
    }


# BIPD minimums by cargo type
The `insurance_bipd_required` field already reflects the carrier's specific requirement based on their authority type. See the BIPD glossary entry for the full table.
Assess safety risk
Safety scoring uses three data sources: the ISS score (composite risk), individual BASIC percentiles (7 categories), and OOS rates (inspection outcomes). Together they give you a complete picture of carrier safety.
# FMCSA intervention thresholds for general (non-passenger, non-hazmat)
# carriers, as fractions (higher = worse): 65% for Unsafe Driving, HOS
# Compliance and Crash Indicator; 80% for Vehicle Maintenance, Controlled
# Substances, Driver Fitness and HM Compliance.
# NOTE(review): the "vehicle_maintence" spelling is kept because each key is
# interpolated into a profile field name (basic_percentile_<key>); confirm it
# matches the API schema before correcting it.
BASIC_THRESHOLDS = {
    "unsafe_driving": 0.65,
    "hours_of_service": 0.65,
    "vehicle_maintence": 0.80,
    "controlled_substance": 0.80,
    "driver_fitness": 0.80,
    "hazardous_materials": 0.80,
    "crash_indicator": 0.65,
}

# Derived from the threshold table so the two can never drift apart.
BASIC_CATEGORIES = list(BASIC_THRESHOLDS)

# An OOS rate above this multiple of the national average is reported.
OOS_ALERT_MULTIPLIER = 1.5


def check_safety(c: dict) -> dict:
    """Evaluate ISS score, BASICs, and OOS rates.

    Args:
        c: Carrier profile. Reads ``iss_value``, the
            ``basic_percentile_<category>`` fields, the vehicle/driver
            out-of-service rates with their national averages, crash
            counts, ``risk_score`` and ``safety_rating_desc``. Percentiles
            and rates are presumably fractions in [0, 1], given that they
            are formatted with ``%`` — confirm against the API.

    Returns:
        A dict with the ISS score and tier, per-category BASIC percentiles,
        the list of BASICs at or above threshold, OOS rates, crash counts,
        safety rating, ``pass`` and ``issues``. ``pass`` is True when ISS
        is below 50 and no BASIC is at or above its threshold; OOS findings
        are reported in ``issues`` without affecting ``pass``.
    """
    issues = []

    # ── ISS Score ────────────────────────────────
    iss = int(c.get("iss_value", 0))
    if iss >= 75:
        iss_tier = "high"
        issues.append(f"ISS score {iss} — high risk")
    elif iss >= 50:
        iss_tier = "elevated"
        issues.append(f"ISS score {iss} — elevated risk")
    elif iss >= 25:
        iss_tier = "moderate"
    else:
        iss_tier = "low"

    # ── BASIC Percentiles ────────────────────────
    basics = {}
    basic_alerts = []
    for cat, threshold in BASIC_THRESHOLDS.items():
        pct = c.get(f"basic_percentile_{cat}")
        if pct is None:
            # No percentile reported for this category; nothing to compare.
            continue
        pct = float(pct)
        basics[cat] = pct
        if pct >= threshold:
            alert_label = cat.replace("_", " ").title()
            basic_alerts.append(alert_label)
            issues.append(
                f"BASIC {alert_label}: "
                f"{pct:.0%} >= {threshold:.0%} threshold"
            )

    # ── OOS Rates vs National Average ────────────
    veh_oos = float(c.get("inspections_vehicle_out_of_service_pct", 0))
    drv_oos = float(c.get("inspections_driver_out_of_service_pct", 0))
    natl_veh = float(c.get("natl_avg_oos_vehicle", 0.22))
    natl_drv = float(c.get("natl_avg_oos_driver", 0.067))
    if veh_oos > natl_veh * OOS_ALERT_MULTIPLIER:
        issues.append(
            f"Vehicle OOS rate {veh_oos:.1%} "
            f"exceeds 1.5x national avg ({natl_veh:.1%})"
        )
    # The driver rate gets the same comparison as the vehicle rate.
    if drv_oos > natl_drv * OOS_ALERT_MULTIPLIER:
        issues.append(
            f"Driver OOS rate {drv_oos:.1%} "
            f"exceeds 1.5x national avg ({natl_drv:.1%})"
        )

    # ── Crash History ────────────────────────────
    crashes = int(c.get("crashes_total", 0))
    fatalities = int(c.get("crash_fatalities", 0))

    return {
        "iss_score": iss,
        "iss_tier": iss_tier,
        "risk_score": c.get("risk_score", "Unknown"),
        "basics": basics,
        "basic_alerts": basic_alerts,
        "vehicle_oos_rate": veh_oos,
        "driver_oos_rate": drv_oos,
        "crashes_total": crashes,
        "crash_fatalities": fatalities,
        "safety_rating": c.get("safety_rating_desc", "Not Rated"),
        "pass": iss < 50 and not basic_alerts,
        "issues": issues,
    }


# ISS risk tiers
| ISS Range | Tier | Interpretation |
|---|---|---|
| 1 – 24 | Low | Below average inspection priority — relatively safer |
| 25 – 49 | Moderate | Average risk — review underlying BASICs |
| 50 – 74 | Elevated | Above average — likely BASIC threshold breaches |
| 75 – 100 | High | Top quartile risk — expect frequent inspections |
All 7 BASICs
The `basic_percentile_crash_indicator` and `basic_percentile_hazardous_materials` fields are exclusive to CarrierOk.
Analyze fleet and operations
Fleet composition reveals operational capacity and regulatory freshness. Power units (not total vehicle count) is the industry-standard fleet size metric.
from datetime import datetime, timedelta
def check_fleet(c: dict) -> dict:
    """Summarize fleet size and check how recently the MCS-150 was updated.

    Args:
        c: Carrier profile. Reads ``total_power_units``, ``total_drivers``,
            ``total_drivers_cdl``, ``cargo_carried`` and ``mcs150_date``
            (parsed as ``YYYY-MM-DD`` when non-empty).

    Returns:
        A dict with the fleet counts, a ``fleet_size`` label, the MCS-150
        date and staleness flag, ``pass`` (True when the MCS-150 is not
        stale and at least one power unit is reported) and ``issues``.

    Raises:
        ValueError: If ``mcs150_date`` is non-empty but not ``YYYY-MM-DD``.
    """
    problems = []

    units = int(c.get("total_power_units", 0))
    driver_count = int(c.get("total_drivers", 0))
    cdl_count = int(c.get("total_drivers_cdl", 0))

    # MCS-150 freshness: anything older than 730 days counts as stale.
    filed_on = c.get("mcs150_date", "")
    is_stale = bool(filed_on) and (
        datetime.now() - datetime.strptime(filed_on, "%Y-%m-%d")
        > timedelta(days=730)
    )
    if is_stale:
        problems.append(
            f"MCS-150 last updated {filed_on} "
            f"(over 2 years ago)"
        )

    # Size label: first tier whose upper bound covers the power-unit count.
    if units == 0:
        size_label = "unknown"
        problems.append("No power units reported")
    else:
        tiers = ((5, "owner-operator"), (50, "small"), (500, "mid-size"))
        size_label = next(
            (label for cap, label in tiers if units <= cap), "large"
        )

    return {
        "power_units": units,
        "drivers": driver_count,
        "cdl_drivers": cdl_count,
        "fleet_size": size_label,
        "cargo_carried": c.get("cargo_carried", ""),
        "mcs150_date": filed_on,
        "mcs150_stale": is_stale,
        "pass": not is_stale and units > 0,
        "issues": problems,
    }


# A stale MCS-150 doesn't necessarily mean the carrier is unsafe, but it does indicate they may not be actively maintaining compliance — worth flagging for review.
Detect fraud signals
CarrierOk's risk_score field synthesizes 50+ signals into a single chameleon carrier risk classification. The contact_history array lets you detect suspicious patterns yourself.
def check_fraud(c: dict) -> dict:
    """Detect fraud indicators and chameleon carrier patterns.

    Three signals are checked: a "High" / "Very High" ``risk_score``, more
    than 10 combined name/phone/address/email changes, and a DOT number
    younger than 540 days that already has revocations on record.

    Args:
        c: Carrier profile. Reads ``risk_score``,
            ``risk_score_probability``, the four ``*_change_count``
            fields, ``dot_age`` and ``total_revocations``; all optional.

    Returns:
        A dict with ``risk_score``, ``risk_probability``, the total and
        per-type contact change counts, ``pass`` (True when no signal
        fired) and ``issues``.
    """
    issues = []

    risk = c.get("risk_score", "Unknown")
    risk_prob = c.get("risk_score_probability", 0)
    if risk in ("High", "Very High"):
        issues.append(f"Risk score: {risk} ({risk_prob:.0%} probability)")

    # Contact churn — frequent changes suggest identity manipulation.
    name_changes = int(c.get("name_change_count", 0))
    phone_changes = int(c.get("phone_change_count", 0))
    address_changes = int(c.get("address_change_count", 0))
    email_changes = int(c.get("email_change_count", 0))
    # Email changes count toward churn like every other contact field.
    total_changes = (
        name_changes + phone_changes + address_changes + email_changes
    )
    if total_changes > 10:
        issues.append(
            f"High contact churn: {name_changes} name, "
            f"{phone_changes} phone, {address_changes} address, "
            f"{email_changes} email changes"
        )

    # Revocations on a young DOT number = possible chameleon. A missing
    # dot_age defaults to 9999 so it never triggers this rule.
    dot_age = int(c.get("dot_age", 9999))
    revocations = int(c.get("total_revocations", 0))
    if dot_age < 540 and revocations > 0:
        issues.append(
            "Possible chameleon: new entity with prior revocations"
        )

    return {
        "risk_score": risk,
        "risk_probability": risk_prob,
        "contact_changes": total_changes,
        "name_changes": name_changes,
        "phone_changes": phone_changes,
        "address_changes": address_changes,
        "email_changes": email_changes,
        # Every trigger above appends an issue, so an empty list means pass.
        "pass": not issues,
        "issues": issues,
    }


# Contact history depth
The `contact_history` array contains every known address, phone, email, and name change with date ranges. CarrierOk cross-references this data across 4M+ entities to detect shared identifiers — a key chameleon carrier signal.
Assemble the verdict
Combine all checks into a single vetting function that returns a verdict with reasons. This is the function you call from your underwriting platform, broker TMS, or compliance dashboard.
def vet_carrier(dot_number: str) -> dict:
    """Complete carrier vetting — returns verdict with reasons.

    Verdict:
    - PASS: All checks clear
    - REVIEW: Non-critical issues found
    - FAIL: Critical issues (authority/insurance), or carrier not found

    Args:
        dot_number: DOT number of the carrier to vet.

    Returns:
        A dict with ``verdict``, ``dot_number``, ``legal_name``, the five
        per-section results (``authority``, ``insurance``, ``safety``,
        ``fleet``, ``fraud``) and the combined ``issues`` list. When the
        carrier is not found the same keys are present, with
        ``legal_name`` and the section results set to ``None``, plus a
        ``reason`` string.
    """
    carrier = fetch_carrier(dot_number)
    if not carrier:
        # Keep the same top-level shape as a successful result so callers
        # can read "issues" / "legal_name" without a KeyError.
        reason = "Carrier not found"
        return {
            "verdict": "FAIL",
            "reason": reason,
            "dot_number": dot_number,
            "legal_name": None,
            "authority": None,
            "insurance": None,
            "safety": None,
            "fleet": None,
            "fraud": None,
            "issues": [reason],
        }

    authority = check_authority(carrier)
    insurance = check_insurance(carrier)
    safety = check_safety(carrier)
    fleet = check_fleet(carrier)
    fraud = check_fraud(carrier)

    # Collect all issues, in section order.
    all_issues = (
        authority["issues"]
        + insurance["issues"]
        + safety["issues"]
        + fleet["issues"]
        + fraud["issues"]
    )

    # Authority and insurance are hard gates; any other failed section or
    # reported issue sends the carrier to manual review.
    if not (authority["pass"] and insurance["pass"]):
        verdict = "FAIL"
    elif all_issues or not (safety["pass"] and fleet["pass"] and fraud["pass"]):
        verdict = "REVIEW"
    else:
        verdict = "PASS"

    return {
        "verdict": verdict,
        "dot_number": dot_number,
        "legal_name": carrier["legal_name"],
        "authority": authority,
        "insurance": insurance,
        "safety": safety,
        "fleet": fleet,
        "fraud": fraud,
        "issues": all_issues,
    }
# ── Example usage ────────────────────────────────────
result = vet_carrier("568253")

if result.get("authority") is None:
    # No profile was found, so there are no per-section results to print;
    # report the verdict and the reason instead of raising KeyError.
    print(
        f"VERDICT: {result['verdict']} "
        f"({result.get('reason', 'carrier profile unavailable')})"
    )
else:
    print(f"\n{'='*50}")
    print(f"CARRIER: {result['legal_name']}")
    print(f"DOT: {result['dot_number']}")
    print(f"VERDICT: {result['verdict']}")
    print(f"{'='*50}")

    if result["issues"]:
        print(f"\nIssues ({len(result['issues'])}):")
        for issue in result["issues"]:
            print(f" - {issue}")
    else:
        print("\nNo issues found.")

    print(f"\nAuthority: {result['authority']['status']} "
          f"({result['authority']['age_months']} months)")
    print(f"Insurance: BIPD ${result['insurance']['bipd_on_file']:,} "
          f"/ ${result['insurance']['bipd_required']:,}")
    print(f"Safety: ISS {result['safety']['iss_score']} "
          f"({result['safety']['iss_tier']})")
    print(f"Fleet: {result['fleet']['power_units']} power units, "
          f"{result['fleet']['drivers']} drivers")
    print(f"Fraud: {result['fraud']['risk_score']} risk")

# Example output
==================================================
CARRIER: HANSEN & ADKINS AUTO TRANSPORT INC
DOT: 568253
VERDICT: REVIEW
==================================================
Issues (2):
- 2 prior revocation(s), last 7954 days ago
- Vehicle OOS rate 24.0% exceeds 1.5x national avg (22.3%)
Authority: Active (264 months)
Insurance: BIPD $1,000,000 / $1,000,000
Safety: ISS 45 (moderate)
Fleet: 897 power units, 897 drivers
Fraud: Low risk
Hansen & Adkins gets a REVIEW verdict — authority and insurance are solid, but there are 2 historical revocations and a slightly elevated vehicle OOS rate. An experienced underwriter would likely approve this carrier given the 20+ year authority history, Satisfactory safety rating, and low fraud risk.
Putting it all together
You now have a complete carrier vetting pipeline that checks:
Authority
Status, age, revocations
Insurance
BIPD vs required
Safety
ISS, BASICs, OOS
Fleet
Units, MCS-150
Fraud
Risk score, churn
Combine this with the monitoring guide to run this vetting logic daily across your entire carrier portfolio.