Nautobot Workshop Blog Series - Part 16 - Nautobot Device Lifecycle Management - Part 3
From Live Advisories to Persisted CVEs: Cisco PSIRT, Arista, and EoX
This week we wire the pipeline into real vendor sources. You’ll see how the job authenticates to Cisco PSIRT, maps OS versions to what Cisco expects, scrapes Arista advisories, normalizes dates and CVSS, and finally persists CVEs and Vulnerabilities in Nautobot’s DLM models. We’ll also pull Cisco EoX (hardware lifecycle) for devices and tie it all together with a run() orchestration that supports dry-run safely.
A note up front: it has been difficult to figure out which credentials the Cisco EoX API requires, so for now the EoX portion does not work because those API calls fail.
Cisco PSIRT OAuth: Getting a Token
```python
def get_cisco_token(self) -> str | None:
    client_id, client_secret = self._get_cisco_credentials()
    if not client_id or not client_secret:
        self.logger.warning("Missing Cisco API credentials in env: CISCO_CLIENT_ID and CISCO_CLIENT_SECRET.")
        return None
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    okta_url = "https://id.cisco.com/oauth2/default/v1/token"
    for data in (
        {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, "audience": "https://api.cisco.com"},
        {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret},
    ):
        try:
            r = requests.post(okta_url, data=data, headers=headers, timeout=30)
            if r.ok:
                token = r.json().get("access_token")
                if token:
                    return token
            else:
                self.logger.warning(f"Cisco token (Okta) attempt failed: {r.status_code} {r.text[:200]}")
        except Exception as exc:
            self.logger.warning(f"Cisco token request error (Okta): {exc}")
    legacy_url = "https://cloudsso.cisco.com/as/token.oauth2"
    try:
        r = requests.post(legacy_url, data={"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret}, headers=headers, timeout=30)
        if r.ok:
            token = r.json().get("access_token")
            if token:
                return token
        else:
            self.logger.warning(f"Cisco token fetch failed (legacy): {r.status_code} {r.text[:200]}")
    except Exception as exc:
        self.logger.warning(f"Cisco token request error (legacy): {exc}")
    return None
```
get_cisco_token negotiates a bearer token using Cisco’s Okta OAuth endpoint, then falls back to the legacy CloudSSO if needed.
- Tries Okta first (id.cisco.com/oauth2/default/v1/token), with and without an audience param.
- Falls back to legacy (cloudsso.cisco.com/as/token.oauth2) as a last resort.
- Uses environment-provided credentials (CISCO_CLIENT_ID, CISCO_CLIENT_SECRET).
- Logs partial response text on failure for debugging (first 200 chars).
What’s Happening?
- The method posts x-www-form-urlencoded credentials for grant_type=client_credentials.
- Any working response yields an access_token string.
- Errors are non-fatal; the job continues without Cisco data if token retrieval fails.
Caveats
- Make sure the credentials are valid and have PSIRT API access enabled in your Cisco developer account.
- Timeouts are set to 30s; adjust if your environment is proxy-constrained.
- Prefer Okta; legacy endpoint remains as compatibility fallback.
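To make the attempt ordering concrete, here is a minimal standalone sketch (the function name `cisco_token_payloads` is my own, not part of the job) of the two Okta form payloads the method tries, audience-bearing variant first:

```python
def cisco_token_payloads(client_id: str, client_secret: str) -> list[dict]:
    """Return the OAuth client_credentials payloads in the order get_cisco_token tries them."""
    base = {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret}
    # First attempt includes the audience parameter; the second omits it for tenants
    # that reject an explicit audience.
    return [{**base, "audience": "https://api.cisco.com"}, dict(base)]
```

Either payload is POSTed as `application/x-www-form-urlencoded`; the first non-error response containing `access_token` wins.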
Mapping Device Versions to Cisco’s Expected Versions
```python
@staticmethod
def _parse_version_tuple(version_text: str) -> tuple[int | None, int | None, int | None]:
    s = version_text.strip()
    m = re.search(r"(\d+)\.(\d+)(?:\.(\d+))?", s)
    if not m:
        return (None, None, None)
    major = int(m.group(1))
    minor = int(m.group(2))
    patch = int(m.group(3)) if m.group(3) else None
    if patch is None:
        m2 = re.search(r"\(\s*(\d+)\s*\)", s)
        if m2:
            patch = int(m2.group(1))
    return (major, minor, patch)

@staticmethod
def _parse_available_version_tuple(v: str) -> tuple[int | None, int | None, int | None]:
    m = re.match(r"^\s*(\d+)\.(\d+)(?:\.\s*(\d+))?", v.strip())
    if not m:
        m = re.match(r"^\s*(\d+)\.(\d+)(?:\.(\d+))?", v.strip())
    if not m:
        return (None, None, None)
    return (int(m.group(1)), int(m.group(2)), int(m.group(3)) if m.group(3) else None)

def _select_best_available_version(self, desired: tuple[int | None, int | None, int | None], available: list[str]) -> str | None:
    dmaj, dmin, dpat = desired
    if dmaj is None or dmin is None:
        return None
    candidates = []
    for v in available:
        maj, minr, pat = self._parse_available_version_tuple(v)
        if maj == dmaj and minr == dmin:
            candidates.append((pat if pat is not None else -1, v))
    if not candidates:
        return None
    if dpat is not None:
        exacts = [v for (p, v) in candidates if p == dpat]
        if exacts:
            return sorted(exacts)[-1]
    candidates.sort(key=lambda x: (x[0], x[1]))
    return candidates[-1][1]

def _get_cisco_versions(self, token: str, ostype: str) -> list[str]:
    if not hasattr(self, "_cisco_versions_cache"):
        self._cisco_versions_cache = {}
    if ostype in self._cisco_versions_cache:
        return self._cisco_versions_cache[ostype]
    url = "https://apix.cisco.com/security/advisories/v2/OS_version/OS_data"
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json", "User-Agent": "nautobot-vendor-cve-job"}
    versions = set()
    try:
        resp = requests.get(url, headers=headers, params={"OSType": ostype}, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        self._cisco_versions_cache[ostype] = []
        self.logger.warning(f"Cisco OS_version list request error for {ostype}: {exc}")
        return []

    def _walk(x):
        if isinstance(x, str):
            if re.match(r"^\d+\.\d+(?:\.\d+)?[A-Za-z0-9.\-()]*$", x):
                versions.add(x)
        elif isinstance(x, list):
            for y in x:
                _walk(y)
        elif isinstance(x, dict):
            for v in x.values():
                _walk(v)

    _walk(data)
    sorted_versions = sorted(
        versions,
        key=lambda vs: (
            tuple(val if val is not None else -1 for val in self._parse_available_version_tuple(vs)),
            vs,
        )
    )
    self._cisco_versions_cache[ostype] = sorted_versions
    return sorted_versions

def _map_version_for_cisco(self, token: str, ostype: str, sw_version: str) -> tuple[str | None, str]:
    desired = self._parse_version_tuple(sw_version)
    versions = self._get_cisco_versions(token, ostype)
    best = self._select_best_available_version(desired, versions)
    if best:
        return best, f"Mapped device '{sw_version}' -> '{best}' for OSType {ostype}"
    dmaj, dmin, _ = desired
    if dmaj is not None and dmin is not None:
        fallback = f"{dmaj}.{dmin}"
        return fallback, f"No exact {dmaj}.{dmin}.* match found. Falling back to '{fallback}' for OSType {ostype}"
    return None, "Could not parse a usable version from device; skipping Cisco query."
```
Cisco’s API requires versions formatted exactly as Cisco enumerates them per OSType (e.g., iosxe). These helpers normalize device-reported versions to the “closest” Cisco-recognized value.
Key pieces:
- _parse_version_tuple: Pulls major, minor, and a patch-like number. For Cisco IOS “parentheses” releases (e.g., 12.2(55)), it treats the number in parentheses as the patch if no dotted patch exists.
- _get_cisco_versions: Calls OS_version/OS_data and walks the returned JSON to collect all version strings Cisco recognizes for that OSType. Results are cached per OSType for reuse.
- _select_best_available_version: Given a desired (major, minor, patch) and a list of available strings, picks the best candidate:
- Only considers same major.minor.
- Prefers an exact patch match. If none, chooses the highest patch in that branch.
- _map_version_for_cisco: Orchestrates the above:
- Returns the selected best match.
- If none found, falls back to “major.minor” (e.g., 16.12).
- Logs a human-readable mapping message for transparency.
Caveats
- Version parsing is heuristic; weird strings can slip through. The code errs on the side of being permissive, then degrades to major.minor.
- The OS_version/OS_data endpoint is a “catalog” of acceptable versions, not advisories; we use it to avoid 404s and mismatches later.
- Maintain a consistent User-Agent and caching to reduce rate-limit exposure.
Cisco Advisories: Pulling CVEs By OSType + Version
```python
def pull_cisco_cves(self, software: SoftwareLCM | SimpleNamespace, device: Device) -> list[dict]:
    token = self.get_cisco_token()
    if not token:
        return []
    ostype = self._map_platform_to_cisco_ostype(device)
    if not ostype:
        self.logger.info(f"{device.name}: Platform '{device.platform.name if device.platform else ''}' not mapped to a Cisco OSType; skipping Cisco PSIRT.")
        return []
    mapped_version, mapping_msg = self._map_version_for_cisco(token, ostype, software.version)
    if not mapped_version:
        self.logger.warning(f"{device.name}: {mapping_msg}")
        return []
    self.logger.info(f"{device.name}: {mapping_msg}")
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json", "User-Agent": "nautobot-vendor-cve-job"}
    base_url = f"https://apix.cisco.com/security/advisories/v2/OSType/{ostype}"
    try:
        resp = requests.get(base_url, headers=headers, params={"version": mapped_version}, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as exc:
        self.logger.warning(f"Cisco API v2 failed ({ostype}, v={mapped_version}): {exc}")
        return []
    advisories = data.get("advisories", []) or []
    self.logger.info(f"Cisco API v2 returned {len(advisories)} advisories for {device.name} ({ostype} v={mapped_version}).")
    return advisories
```
pull_cisco_cves drives the PSIRT call once we have a token, an OSType (e.g., iosxe), and a mapped version.
What’s Happening?
- OSType is derived from platform via last week’s _map_platform_to_cisco_ostype helper.
- Version is mapped through the logic above to Cisco’s expectation.
- Calls https://apix.cisco.com/security/advisories/v2/OSType/{ostype}?version= …
- Returns a list of advisory dictionaries (data[“advisories”]) and logs the count.
Caveats
- If platform isn’t Cisco-like, or token fails, it returns an empty list cleanly.
- Network and API errors are caught and logged; the job continues on other devices.
Arista Advisories: Lightweight Scrape
```python
def pull_arista_cves(self, software: SoftwareLCM | SimpleNamespace) -> list[dict]:
    url = "https://www.arista.com/en/support/advisories-notices/security-advisories"
    try:
        resp = requests.get(url, headers={"User-Agent": "nautobot-vendor-cve-job"}, timeout=30)
        if not resp.ok:
            self.logger.warning(f"Arista scrape failed: {resp.status_code}")
            return []
    except Exception as exc:
        self.logger.warning(f"Arista advisories request error: {exc}")
        return []
    soup = BeautifulSoup(resp.text, "lxml")
    advisories = []
    for item in soup.select(".advisory-item, .item, article, li"):
        title_el = item.find(["h2", "h3", "a"])
        para_el = item.find("p")
        title = title_el.get_text(strip=True) if title_el else ""
        desc = para_el.get_text(strip=True) if para_el else ""
        text = f"{title}\n{desc}"
        if software.version in text:
            cve_ids = set(re.findall(r"CVE-\d{4}-\d{4,7}", text))
            for cve_id in cve_ids:
                advisories.append(
                    {
                        "advisoryIdentifier": cve_id,
                        "summary": title or desc or "Arista security advisory",
                        "cves": [cve_id],
                        "publicationUrl": url,
                        "firstPublished": None,
                        "lastUpdated": None,
                        "severity": "Unknown",
                    }
                )
    return advisories
```
Arista doesn’t provide a comparable JSON API, so pull_arista_cves performs a simple, targeted scrape.
What’s Happening?
- Downloads https://www.arista.com/en/support/advisories-notices/security-advisories.
- Parses the page with BeautifulSoup and scans item-like elements for text blobs.
- If the discovered software.version appears in the text, extracts CVE IDs with a regex.
- Returns advisories that look similar to the Cisco schema (advisoryIdentifier, summary, cves, etc.) so downstream code can be vendor-agnostic.
Caveats
- This is intentionally conservative to avoid false positives. It may miss advisories that don’t spell out the version plainly.
- HTML structure can change; keep an eye on selectors.
- Treats severity and dates as unknown when not present; enrichment can fill gaps later.
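The version-gated CVE extraction at the heart of the scrape can be shown in isolation (the function name `extract_cves` and the sample text below are made up for illustration):

```python
import re


def extract_cves(text: str, version: str) -> set:
    """Return CVE IDs found in a text blob, but only when the target version is mentioned."""
    if version not in text:
        # Conservative gate: no version match, no advisories attributed.
        return set()
    return set(re.findall(r"CVE-\d{4}-\d{4,7}", text))
```

This is why advisories that phrase affected versions as ranges ("4.28.0 through 4.28.5") can be missed; the gate is exact substring match by design.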
Dates: Normalizing Published and Last-Modified
```python
def _extract_date_from_keys(self, advisory: dict, keys: list[str], default: date | None = None) -> date | None:
    value = None
    for key in keys:
        if key in advisory and advisory[key]:
            value = advisory[key]
            break
    if isinstance(value, str):
        s = value.strip()
        try:
            s2 = s.replace("Z", "+00:00")
            dt = datetime.fromisoformat(s2)
            return dt.date()
        except Exception:
            pass
    # try common formats and substrings
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%Y/%m/%d", "%d-%b-%Y", "%b %d, %Y"):
        try:
            return datetime.strptime(str(value), fmt).date()
        except Exception:
            continue
    m = re.search(r"(\d{4})-(\d{2})-(\d{2})", str(value or ""))
    if m:
        try:
            return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        except Exception:
            pass
    return default

def _extract_published_date(self, advisory: dict) -> date:
    d = self._extract_date_from_keys(
        advisory,
        ["firstPublished", "firstPublishedDate", "publicationDate", "advisoryPublicationDate", "published"],
        default=None,
    )
    return d or date.today()

def _extract_last_modified_date(self, advisory: dict, fallback: date | None) -> date | None:
    d = self._extract_date_from_keys(
        advisory,
        ["lastUpdated", "lastModified", "lastModifiedDate", "advisoryLastUpdatedDate"],
        default=None,
    )
    return d or fallback
```
APIs and scrapes return dates in many shapes. These helpers standardize for DLM fields.
- _extract_date_from_keys: Given an advisory and a list of candidate keys, tries to parse ISO and several common formats; also scans for YYYY-MM-DD substrings.
- _extract_published_date: Prefers keys like firstPublished; defaults to today if none found.
- _extract_last_modified_date: Prefers lastUpdated-style keys; defaults to the published date.
Caveats
- Parsing is best-effort; unexpected formats will fall back cleanly.
- If you need timezone-aware storage, adapt before assignment.
CVSS: Pulling v2, v3, and Base Scores
```python
@staticmethod
def _to_float(val) -> float | None:
    try:
        return float(val)
    except Exception:
        return None

def _extract_cvss_for_cve(self, advisory: dict, cve_id: str) -> tuple[float | None, float | None, float | None]:
    cvss_base = cvss_v2 = cvss_v3 = None
    if isinstance(advisory.get("cve"), list):
        for c in advisory["cve"]:
            if isinstance(c, dict) and c.get("id") == cve_id:
                for k in ["cvssBaseScore", "baseScore", "cvss_score", "cvss"]:
                    val = c.get(k)
                    if isinstance(val, dict) and "baseScore" in val:
                        cvss_base = self._to_float(val["baseScore"]) or cvss_base
                    else:
                        cvss_base = self._to_float(val) or cvss_base
                for k in ["cvssV3BaseScore", "cvss_v3", "cvssV3"]:
                    val = c.get(k)
                    if isinstance(val, dict) and "baseScore" in val:
                        cvss_v3 = self._to_float(val["baseScore"]) or cvss_v3
                    else:
                        cvss_v3 = self._to_float(val) or cvss_v3
                for k in ["cvssV2BaseScore", "cvss_v2", "cvssV2"]:
                    val = c.get(k)
                    if isinstance(val, dict) and "baseScore" in val:
                        cvss_v2 = self._to_float(val["baseScore"]) or cvss_v2
                    else:
                        cvss_v2 = self._to_float(val) or cvss_v2
                break
    if cvss_base is None:
        for k in ["cvssBaseScore", "baseScore", "cvss_score", "cvss"]:
            val = advisory.get(k)
            if isinstance(val, dict) and "baseScore" in val:
                cvss_base = self._to_float(val["baseScore"])
            else:
                cvss_base = self._to_float(val)
            if cvss_base is not None:
                break
    if cvss_v3 is None:
        for k in ["cvssV3BaseScore", "cvss_v3", "cvssV3"]:
            val = advisory.get(k)
            if isinstance(val, dict) and "baseScore" in val:
                cvss_v3 = self._to_float(val["baseScore"])
            else:
                cvss_v3 = self._to_float(val)
            if cvss_v3 is not None:
                break
    if cvss_v2 is None:
        for k in ["cvssV2BaseScore", "cvss_v2", "cvssV2"]:
            val = advisory.get(k)
            if isinstance(val, dict) and "baseScore" in val:
                cvss_v2 = self._to_float(val["baseScore"])
            else:
                cvss_v2 = self._to_float(val)
            if cvss_v2 is not None:
                break
    if cvss_base is None:
        cvss_base = cvss_v3 if cvss_v3 is not None else cvss_v2
    return cvss_base, cvss_v2, cvss_v3
```
Vendors may place CVSS at different nesting levels.
- _extract_cvss_for_cve checks for a specific CVE within an “advisory.cve” list first, then falls back to top-level fields.
- Accepts numbers or dicts like {“baseScore”: 8.8}.
- Returns (cvss_base, cvss_v2, cvss_v3), using v3 or v2 as the base if an explicit base is absent.
Caveats
- Some advisories provide only qualitative severities; numeric CVSS will be None.
- This method is resilient by design—unknown shapes won’t break the job.
Persistence: CVEs, Vulnerabilities, and Affected Software
```python
def _get_status_for_model(self, model, candidates: list[str]) -> Status | None:
    try:
        qs = Status.objects.get_for_model(model)
        for name in candidates:
            s = qs.filter(name__iexact=name).first()
            if s:
                return s
        return qs.first() if qs.exists() else None
    except Exception:
        return None

def create_cves_and_vulns_for_software(
    self,
    advisories: list[dict],
    sv_lcm: SoftwareLCM | SimpleNamespace,
    vendor: str,
    commit: bool,
):
    """Create/update CVEs and Vulnerabilities for the given software; associate Affected Software, not platform."""
    if not advisories:
        return
    open_status = self._get_status_for_model(VulnerabilityLCM, ["Open", "Active"]) if commit else None
    cve_status_default = self._get_status_for_model(CVELCM, ["Active", "Published"]) if commit else None
    software_target = self._resolve_software_instance_for_vuln(sv_lcm, commit=commit)
    if commit and software_target is None:
        self.logger.warning("Cannot resolve suitable software instance for VulnerabilityLCM.software; skipping vulnerability creation.")
    if not commit and software_target is None:
        self.logger.info("[Dry-run] Would link vulnerability to resolved software instance (skipped FK resolution).")
    created_cves = 0
    created_vulns = 0
    for adv in advisories:
        cve_ids = []
        if isinstance(adv.get("cves"), list):
            cve_ids = [c for c in adv["cves"] if isinstance(c, str)]
        elif isinstance(adv.get("cve"), list):
            cve_ids = [c.get("id") for c in adv["cve"] if isinstance(c, dict) and c.get("id")]
        elif isinstance(adv.get("advisoryIdentifier", ""), str) and adv["advisoryIdentifier"].startswith("CVE-"):
            cve_ids = [adv["advisoryIdentifier"]]
        description = adv.get("summary") or adv.get("advisoryTitle") or ""
        link = adv.get("publicationUrl") or adv.get("url") or ""
        severity = adv.get("severity") or adv.get("sir") or ""
        published_date = self._extract_published_date(adv)
        last_modified_date = self._extract_last_modified_date(adv, fallback=published_date)
        for cve_id in set(filter(None, cve_ids)):
            cvss_base, cvss_v2, cvss_v3 = self._extract_cvss_for_cve(adv, cve_id)
            if not commit:
                exists = CVELCM.objects.filter(name=cve_id).exists()
                self.logger.info(
                    f"[Dry-run] Would {'update' if exists else 'create'} CVE {cve_id} with "
                    f"published_date={published_date}, last_modified_date={last_modified_date}, link={link or '-'}, "
                    f"severity={severity or '-'}, cvss={cvss_base}, cvss_v2={cvss_v2}, cvss_v3={cvss_v3}, "
                    f"status={(adv.get('status') or getattr(cve_status_default, 'name', '')) or '-'}; "
                    f"and {'ensure' if exists else 'create'} Vulnerability (software='{getattr(sv_lcm, 'version', '?')}'), "
                    f"plus associate CVE->Affected Software."
                )
                self._associate_cve_with_affected_software(
                    cve_obj=SimpleNamespace(name=cve_id, _meta=CVELCM._meta),
                    sv_lcm=sv_lcm,
                    software_target=software_target,
                    commit=False,
                )
                continue
            try:
                cve_status_obj = None
                adv_status_name = adv.get("status")
                if adv_status_name:
                    cve_status_obj = self._get_status_for_model(CVELCM, [str(adv_status_name)])
                if not cve_status_obj:
                    cve_status_obj = cve_status_default
                defaults = {
                    "published_date": published_date,
                    "last_modified_date": last_modified_date,
                    "severity": severity or "",
                    "cvss": cvss_base,
                    "cvss_v2": cvss_v2,
                    "cvss_v3": cvss_v3,
                    "status": cve_status_obj,
                }
                if description:
                    defaults["description"] = description[:1024]
                if link:
                    defaults["link"] = link
                cve_obj, created = CVELCM.objects.get_or_create(name=cve_id, defaults=defaults)
                if created:
                    created_cves += 1
                    self.logger.info(f"Created CVE {cve_id} from {vendor}")
                else:
                    patch_needed = False
                    if not getattr(cve_obj, "published_date", None):
                        cve_obj.published_date = published_date
                        patch_needed = True
                    if last_modified_date and not getattr(cve_obj, "last_modified_date", None):
                        cve_obj.last_modified_date = last_modified_date
                        patch_needed = True
                    if link and not getattr(cve_obj, "link", ""):
                        cve_obj.link = link
                        patch_needed = True
                    if description and not getattr(cve_obj, "description", ""):
                        cve_obj.description = description[:1024]
                        patch_needed = True
                    if severity and not getattr(cve_obj, "severity", ""):
                        cve_obj.severity = severity
                        patch_needed = True
                    if cvss_base is not None and getattr(cve_obj, "cvss", None) in (None, 0):
                        cve_obj.cvss = cvss_base
                        patch_needed = True
                    if cvss_v2 is not None and getattr(cve_obj, "cvss_v2", None) in (None, 0):
                        cve_obj.cvss_v2 = cvss_v2
                        patch_needed = True
                    if cvss_v3 is not None and getattr(cve_obj, "cvss_v3", None) in (None, 0):
                        cve_obj.cvss_v3 = cvss_v3
                        patch_needed = True
                    if cve_status_obj and not getattr(cve_obj, "status", None):
                        cve_obj.status = cve_status_obj
                        patch_needed = True
                    if patch_needed:
                        cve_obj.save()
                # Associate "Affected Software" on the CVE when supported
                self._associate_cve_with_affected_software(
                    cve_obj=cve_obj,
                    sv_lcm=sv_lcm,
                    software_target=software_target,
                    commit=True,
                )
                if software_target is None:
                    continue
                vuln_defaults = {}
                if open_status:
                    vuln_defaults["status"] = open_status
                vuln, v_created = VulnerabilityLCM.objects.get_or_create(
                    cve=cve_obj,
                    software=software_target,
                    defaults=vuln_defaults,
                )
                if v_created:
                    created_vulns += 1
                    self.logger.info(
                        f"Created vulnerability {cve_id} for software {getattr(software_target, 'version', getattr(sv_lcm, 'version', '?'))}"
                    )
            except Exception as exc:
                self.logger.warning(
                    f"Failed to persist CVE/Vulnerability for {cve_id} on software {getattr(sv_lcm, 'version', '?')}: {exc}"
                )
    prefix = "[Dry-run] " if not commit else ""
    self.logger.info(
        f"{prefix}Persisted CVEs: +{created_cves}, Vulnerabilities: +{created_vulns} for software {getattr(sv_lcm, 'version', '?')}"
    )
```
create_cves_and_vulns_for_software takes a vendor’s advisories and writes them into DLM models, relying on last week’s schema-introspection helpers.
What’s Happening?
- Resolves an appropriate software_target for VulnerabilityLCM.software, accommodating custom schemas.
- For each advisory:
- Extracts all CVE IDs from “cves”, “cve”, or “advisoryIdentifier”.
- Computes dates, severity, and CVSS.
- Dry-run:
- Logs whether it would create or update each CVE.
- Simulates “Affected Software” M2M association.
- Commit:
- get_or_create CVELCM by name, with defaults for published/mod dates, link, description, severity, cvss.
- Non-destructive updates: fills missing fields only, leaving existing populated fields intact.
- Associates “Affected Software” via an M2M if present on the CVE model (idempotent).
- Creates VulnerabilityLCM linking cve and software_target, with status defaulted to Open/Active when available.
Caveats
- Status resolution is best-effort; if your Status choices differ, adjust the candidates list.
- The function is idempotent and additive—safe to re-run as advisories update over time.
- If the environment’s VulnerabilityLCM.software FK points to a custom model, linking still works via platform+version or software references.
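The "fill missing fields only" update pattern is worth isolating, since it's what makes re-runs additive. Stripped of the Django models, it looks like this (the helper name `backfill` is mine; `SimpleNamespace` stands in for a saved CVE record):

```python
from types import SimpleNamespace


def backfill(obj, updates: dict) -> bool:
    """Set each attribute only if it is currently empty; return True if anything changed."""
    changed = False
    for field, value in updates.items():
        if value is not None and not getattr(obj, field, None):
            setattr(obj, field, value)
            changed = True
    return changed
```

Existing populated fields win over incoming advisory data, so a re-run enriches sparse records without clobbering manual edits; the `changed` flag mirrors `patch_needed` gating the `save()` call.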
Cisco EoX: Hardware Lifecycle Notices
```python
def pull_cisco_eox_for_pid(self, pid: str) -> list[dict]:
    """Query Cisco EoX v5 API for a single product ID (PID)."""
    token = self.get_cisco_token()
    if not token:
        return []
    url = f"https://apix.cisco.com/supporttools/eox/rest/5/EOXByProductID/1/{pid}"
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json", "User-Agent": "nautobot-vendor-cve-job"}
    try:
        resp = requests.get(url, headers=headers, timeout=30)
        if not resp.ok:
            self.logger.warning(f"Cisco EoX API failed for PID={pid}: {resp.status_code} {resp.text[:200]}")
            return []
        data = resp.json() or {}
        # Records array can be under "EOXRecord" or similar top-level keys
        records = data.get("EOXRecord") or data.get("EOXRecords") or data.get("Records") or []
        if isinstance(records, dict):
            records = [records]
        return records
    except Exception as exc:
        self.logger.warning(f"Cisco EoX API request error for PID={pid}: {exc}")
        return []

def upsert_hardware_notice_for_device(self, device: Device, commit: bool):
    """Create/update HardwareLCM for device.device_type using its part number via Cisco EoX."""
    if not HAS_HARDWARE_LCM:
        self.logger.info("HardwareLCM not available in this DLM version; skipping hardware notices.")
        return
    if not device.device_type:
        self.logger.info(f"{device.name}: No DeviceType; skipping EoX.")
        return
    pid = getattr(device.device_type, "part_number", None) or getattr(device.device_type, "model", None)
    if not pid:
        self.logger.info(f"{device.name}: DeviceType has no part_number; skipping EoX.")
        return
    records = self.pull_cisco_eox_for_pid(pid)
    if not records:
        self.logger.info(f"{device.name}: No EoX records for PID={pid}.")
        return
    part_field = self._hardware_part_field_name()
    if not part_field:
        self.logger.info("Could not determine HardwareLCM part-number field; skipping.")
        return
    field_map = self._hardware_date_field_map()
    for rec in records:
        mapped = self._eox_record_to_hardware_fields(rec)
        # Build filter for get_or_create
        filter_kwargs = {"device_type": device.device_type, part_field: pid}
        if not commit:
            exists = HardwareLCM.objects.filter(**filter_kwargs).exists()
            action = "update" if exists else "create"
            self.logger.info(
                f"[Dry-run] Would {action} HardwareLCM for device_type='{device.device_type.model}' "
                f"part='{pid}' with dates: release={mapped['release_date']}, "
                f"eosale={mapped['end_of_sale']}, eosupport={mapped['end_of_support']}, "
                f"eosw={mapped['end_of_sw_releases']}, eosec={mapped['end_of_security_patches']}, "
                f"url={mapped['documentation_url'] or '-'}"
            )
            continue
        try:
            obj, created = HardwareLCM.objects.get_or_create(**filter_kwargs)
            # Update fields if present in model
            changed = False
            for key, model_field in field_map.items():
                if not model_field:
                    continue
                new_value = mapped.get(key)
                if new_value is None:
                    continue
                if getattr(obj, model_field, None) != new_value:
                    setattr(obj, model_field, new_value)
                    changed = True
            if changed:
                obj.save()
            self.logger.info(
                f"{'Created' if created else 'Updated'} HardwareLCM for device_type='{device.device_type.model}' part='{pid}'"
            )
        except Exception as exc:
            self.logger.warning(f"Failed to upsert HardwareLCM for {device.device_type} / {pid}: {exc}")
```
We also enrich the hardware side via Cisco’s EoX API by device type PID.
- pull_cisco_eox_for_pid: Queries EOXByProductID v5, returns Records (handles arrays and singletons).
- upsert_hardware_notice_for_device:
- Derives the PID from DeviceType part_number or model.
- Maps Cisco fields into our canonical HardwareLCM fields using last week’s helper map.
- Dry-run: logs intended create/update with parsed dates.
- Commit: get_or_create HardwareLCM by device_type and part number; updates only changed fields.
Caveats
- Requires the optional HardwareLCM model; the code is gated by HAS_HARDWARE_LCM and no-ops otherwise.
- Dates use the same robust parsing as software.
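The record-shape normalization (EoX responses nest records under varying keys, sometimes as a single dict) is a small pure function worth testing on its own (the name `normalize_eox_records` is mine; the key list matches the job code):

```python
def normalize_eox_records(data: dict) -> list:
    """Flatten a Cisco EoX response into a list of record dicts, regardless of nesting key or shape."""
    records = data.get("EOXRecord") or data.get("EOXRecords") or data.get("Records") or []
    if isinstance(records, dict):
        # A single record may arrive as a bare dict; wrap it for uniform iteration.
        records = [records]
    return records
```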
Orchestration: run() with Safe Dry-Run
```python
def run(self, data=None, commit=True, **kwargs):
    # Accept both legacy (data dict) and new-style keyword args
    def _param(key, default=None):
        if key in kwargs:
            return kwargs.get(key, default)
        return (data or {}).get(key, default)

    devices = _param("devices")
    secrets_group = _param("secrets_group")
    dry_run = bool(_param("dry_run", False))
    include_hw = bool(_param("include_hardware_notices", False))
    effective_commit = bool(commit) and not dry_run
    if not effective_commit:
        self.logger.info("Dry-run enabled: no changes will be written to the database.")
    if not isinstance(secrets_group, SecretsGroup):
        self.logger.error("You must choose a valid Secrets Group for credentials.")
        return
    # Devices queryset: pass a queryset through unchanged; coerce lists of devices or PKs
    if devices:
        if hasattr(devices, "filter"):
            qs = devices
        else:
            try:
                pks = [d.pk for d in devices]
            except Exception:
                pks = list(devices)
            qs = Device.objects.filter(pk__in=pks)
    else:
        active_status = Status.objects.get_for_model(Device).filter(name__iexact="Active").first()
        if not active_status:
            self.logger.warning("No 'Active' status found; processing all devices.")
            devices_qs = Device.objects.all()
        else:
            devices_qs = Device.objects.filter(status=active_status)
        qs = devices_qs.distinct()
    processed = 0
    for device in qs:
        plat_name = (device.platform.name or "").lower() if device.platform else ""
        is_cisco_like = any(s in plat_name for s in ("cisco", "ios", "nx", "asa"))
        is_arista_like = any(s in plat_name for s in ("arista", "eos"))
        # Software CVEs
        if is_cisco_like or is_arista_like:
            sv_lcm = self.discover_software(device, secrets_group, commit=effective_commit)
            if sv_lcm:
                if is_cisco_like:
                    advisories = self.pull_cisco_cves(sv_lcm, device)
                    self.create_cves_and_vulns_for_software(advisories, sv_lcm, vendor="Cisco", commit=effective_commit)
                if is_arista_like:
                    advisories = self.pull_arista_cves(sv_lcm)
                    self.create_cves_and_vulns_for_software(advisories, sv_lcm, vendor="Arista", commit=effective_commit)
        # Hardware notices (Cisco EoX)
        if include_hw and is_cisco_like:
            self.upsert_hardware_notice_for_device(device, commit=effective_commit)
        processed += 1
    self.logger.info(f"Processed {processed} devices. Tip: run the NIST CVE enrichment job separately for comprehensive coverage.")


register_jobs(VendorCVEDiscoveryJob)
```
The run method ties everything together, supporting both legacy data dict and keyword args.
Flow
- Parameters:
- devices: optional queryset/list of devices; defaults to all “Active” devices.
- secrets_group: required, used to log into devices for discovery via NAPALM.
- dry_run: if true, no DB writes (effective_commit = False).
- include_hardware_notices: toggles the EoX step.
- For each device:
- Detect “Cisco-like” or “Arista-like” via platform name.
- Discover software version via NAPALM (last week’s discover_software).
- Cisco-like: map version, pull PSIRT advisories, persist CVEs/Vulnerabilities.
- Arista-like: scrape advisories, persist CVEs/Vulnerabilities.
- If include_hardware_notices and Cisco-like: upsert EoX HardwareLCM.
- Logs a summary count and suggests running the NIST enrichment job separately for broader coverage.
Caveats
- Requires a valid Secrets Group; the job exits early if missing.
- “Cisco-like”/“Arista-like” detection is name-based; refine if your platform names are custom.
- Dry-run is comprehensive—you get clear preview logs for create/update/link actions without touching the DB.
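The dual-style parameter handling in `run()` boils down to one lookup rule, shown here as a standalone function (the name `resolve_param` is mine; `_param` in the job closes over `data` and `kwargs` instead of taking them as arguments):

```python
def resolve_param(kwargs: dict, data, key: str, default=None):
    """Prefer new-style keyword args; fall back to the legacy data dict, then the default."""
    if key in kwargs:
        return kwargs[key]
    return (data or {}).get(key, default)
```

Keyword args always win when both styles supply the same key, so newer Nautobot job invocations behave consistently even if a stale `data` dict is also passed.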
Wrap-Up
At this point, the job connects to real vendor sources, maps versions correctly, normalizes data, and persists both software CVEs and hardware lifecycle notices—while remaining schema-aware and safe to dry-run. The design emphasizes:
- Normalization first: clean versions, dates, and scores to keep the DB consistent.
- Adaptability: introspection allows different DLM schemas to work out-of-the-box.
- Idempotency: re-runs enrich rather than overwrite.
Altogether, this is a concise, end-to-end example of using Nautobot fields, retrieving data from external APIs, normalizing the results, and persisting them back into Nautobot.
