Recipe: bulk-import events from a spreadsheet (idempotent upsert + live schedule)
Recipe: bulk-import events from a spreadsheet (idempotent upsert + live schedule)
Recipe: bulk-import events from a spreadsheet (idempotent upsert + live schedule)
Prompt exampleImport every row of "Nomad Event Scheduler AIH26.xlsx" into the event scheduler: create an event per row with its venue, slug and short description, convert the local start/end times to UTC, attach the live schedule (primary input, slate, archive folder, external outputs), and make a re-run update the same events instead of creating duplicates.
Customers routinely hand us a spreadsheet of events and ask us to load it into the event
scheduler. This recipe turns that into a repeatable flow: discover the event entity's
fields, print a mapping plan, dry-run one row so a human can validate the exact
payloads, then idempotently upsert every row (keyed on slug) and attach each event's
live schedule.
Safety (Class C). Events are global account objects with no folder anchor, so this is non-prod only (validated on
dev-06). Tear down anything you create withdelete_event. Production imports must be a human-reviewed, explicitly-approved action.
The writes per row
We do not use create_and_update_event - its update path is a no-op, it misspells the
datetime keys, and it swallows the required-eventType validation error. Instead each row is the proven contract sequence:
create_content(EVENT_CD)(only when the slug is new) → a blank event whosecontentIdwe keep (update_contentreturnsNoneeven on success -). On a re-run the slug already resolves to an existingcontentId, so we skip this and update it.update_content(event CD, one properties-only patch matchingContentUpdateModel) →name,slug,eventType(required ref - omit it and the write409s),startDatetime,endDatetime,venue,shortDescription. This is where theslugidempotency key is written.add_live_schedule_to_event→primary_live_input,archive_folder,slate_video,postroll_video,external_output_profiles- attached only for rows whose slate name resolves on the target (the schedule's requiredpostroll_videoreuses that same resolved slate -). The live schedule is a separate object (not event content), so it is read back withget_live_schedulefor verification - never trust the call's return, which isNoneon success and failure. Every reference is resolved by name on the target deployment (the sheet's GUID columns hold the source env's ids): the slate/postroll via theassetCD, the archive folder via a folder search, and the primary input / external output profiles viaget_live_inputs/get_live_output_profiles. A name that does not resolve is skipped and surfaced as a diff in the verification report.preroll/backup/ secure-output are left null per the import plan.
Mapping plan (spreadsheet column → destination)
Every row's eventType defaults to "Auto Scheduled" (a customer assumption - resolved
once against the eventType CD), so the sheet's Event Type column is not read.
| Spreadsheet column | Destination (write) | How it is resolved |
|---|---|---|
Name | event.name (write 2) | verbatim |
Slug | event.slug (write 2) | verbatim - idempotency key |
| (constant) | event.eventType (write 2) | "Auto Scheduled" → {id, description} via the eventType CD (required) |
Date + Start | event.startDatetime (write 2) | combine date + time-of-day, convert to UTC Z |
Date + End | event.endDatetime (write 2) | combine date + time-of-day, convert to UTC Z |
Duration | (cross-check End) | optional sanity check only |
Timezone | (offset source) | name → utcOffset via the timeZone CD |
Short Description | event.shortDescription (write 2) | verbatim |
Venue | event.venue (write 2) | name → {id, description} via the venue CD (auto-created when CREATE_MISSING_REFS - see below) |
Slate Video | slate_video + postroll_video (write 3) | name → {id, description} via the asset CD; the same resolved slate also fills the required postroll_video |
Archive Folder | archive_folder (write 3) | name (leaf segment of the slash path) → {id} via a folder search on the target |
Primary Input Channel | primary_live_input (write 3) | name → {id, description} via get_live_inputs on the target |
External Output Profile / … 2 | external_output_profiles[] (write 3) | each name → {id, description} via get_live_output_profiles on the target |
* ID (Archive Folder ID, Primary Input Channel ID, External Output ID / … 2) | (not written) | source-deployment GUIDs - ignored. Foreign on any other deployment, so write 3 resolves the matching name column on the target instead |
Every name-only column is resolved by name on the target deployment, never by the sheet's
GUID: the event refs (Venue, Timezone) via their content definition, and the live-schedule
refs (Slate Video, Archive Folder, Primary Input Channel, External Output Profile[ 2]) via
the asset CD / a folder search / get_live_inputs / get_live_output_profiles. The sheet's
parallel * ID columns hold the source env's ids and are not used (they would 404 on the
target). A live-schedule name that does not resolve on the target is skipped (its ref left
unset) and surfaced as a diff in the verification report - per the import rule. Run
get_content_definition(EVENT_CD) first and print contentFields[*].properties.propertyName to
confirm the field names against this table (see reference/schema.md for the offline field list).
Casing quirk. The CD reports
propertyNamein PascalCase (Name,Slug,StartDatetime,EventType, …), but the keys you write withupdate_contentand filter on withsearchare camelCase (name,slug,startDatetime,eventType). Write/read camelCase; first-letter-lower the CD names before comparing.
Create missing sub-tables (opt-in)
A Venue (or, by extension, Performer) on the sheet may not yet exist on the deployment -
on dev-06 the sample's "East Lawn" resolves to nothing, so by default event.venue is left
unset (a deliberate, fail-safe miss; we never guess an id). When the customer wants those
sub-tables created on the fly, set CREATE_MISSING_REFS = True and ensure_references
runs a pre-pass before any event is written:
- It loops the whole sheet once and collects the unique, non-blank names per
reference column listed in
REFERENCE_SPECS(the column → sub-table-CD map; ship withVenue, addPerformer: PERFORMER_CDas needed). - For each name it
searches the CD; on a miss it creates the record the same contract way as an event -create_content→ one properties-onlyupdate_content({name, slug})- so each distinct venue/performer is created exactly once, not per row. - It returns a
{(cd, name) → ref}cache;build_payloads/importRowthen look the ref up from that cache, so every event reuses the resolved-or-created id.
Not assets.
REFERENCE_SPECScovers true reference sub-tables only.Slate Videoresolves against theassetCD but is never auto-created - media must be ingested (uploaded), not fabricated from a name - so it stays a plainresolve_by_namelookup.
Teardown (Class C). Created venues/performers are global account objects with no folder anchor (like events), and for a real import they are meant to persist as reference data. In a non-prod test, delete anything you create with
delete_content(sdk, id, cd)(the reference-record analogue ofdelete_event). On prod, creating reference data must be an explicitly human-approved action -CREATE_MISSING_REFSdefaults toFalse.
Datetime → UTC
Date is a calendar date and Start/End are times-of-day. Cell types vary by column in
real sheets - openpyxl (with data_only=True) returns Python datetime/date/time
objects, raw Excel serial numbers, or plain text depending on how each cell was authored. In
the AIH26 sample, for example, Date/Start come back as datetimes while End is the string
"4:30 PM", so the helper below accepts all three. The Timezone name resolves to a
utcOffset on the timeZone CD; UTC = local − offset (east-positive). That offset lives under
the search item's nested identifiers bag and arrives as a string in hours (e.g. "-6"
for MDT), so the resolver reads identifiers.utcOffset and casts it.
Confirm at the dry-run: the
utcOffsetunit (hours here) and sign are verified by eyeballing the first row's printedstartDatetimeagainst the sheet. FlipOFFSET_IS_MINUTES/ the subtraction if a deployment ever reports it differently.
Verify the import (pull-back + diff)
Writing is not believing. After the upserts, verify_import pulls every event back - the
event through the search API and its live schedule through get_live_schedule - and diffs both
against the source so you can see, per row, per field, what actually landed versus what the sheet
said. It runs by default at the tail of run_import (verify=True); call it standalone any time to
audit a prior import.
- For each row it rebuilds the same source-derived payloads the write used (
build_payloads), so "expected" is exactly what the import intended - not a second, drifting transcription. - It
searches the event byslugand reads the item'sidentifiersbag, which already carries every written field -name,slug,startDatetime,endDatetime,shortDescription, and thevenue/eventTyperefs as{id, description}(confirmed ondev-06; seereference/return-shapes.md). No extra content read is needed - the search result is the pulled-back record. diff_eventcompares field-by-field, normalizing references to theirid(so a differingdescriptionlabel is not a false diff) and checking only the keys we wrote (untouched platform fields never show up as noise). A missing record reports as one(record): MISSINGrow.- The live schedule is a separate object, invisible to the event content and the search bag,
so
verify_importreads it back withget_live_schedule(event_id)- the only trustworthy signal, sinceadd_live_schedule_to_eventreturnsNoneon success and failure.diff_schedulecompares the resolvedslate_video/postroll_video/archive_folder/primary_live_input/external_output_profilesbyid. A row that intended a schedule but has none attached reports(live schedule): MISSING; every name that did not resolve on the target surfaces as its ownUNRESOLVED ON TARGETdiff (per the import rule: skip + report).
The result is a {slug: [{field, source, platform}]} report (event and live-schedule diffs
merged per row), printed for a human to decide how to resolve - there is no automatic
reconciliation. Typical resolutions:
- Platform drifted from the sheet (someone edited the event in the UI): re-run the import,
which re-applies the source (source-wins, idempotent on
slug). - The sheet is wrong (bad date/offset, mistyped venue): fix the sheet, then re-import.
- A row never landed (
(record): MISSING): inspect the write error for that slug and retry. - A live-schedule ref is
UNRESOLVED ON TARGET: the named slate/input/folder/profile does not exist on this deployment - create it (or correct the name in the sheet), then re-import.
Search indexing is near-real-time, not transactional. The
identifiersbag is reindexed just after the write (note itsindexedDate), so a verify fired immediately could read a field a beat stale. In an interactive run this is moot; in automation, poll the pulled-back value until it settles (the smoke test does) before treating a diff as real.
Python
# Components: get_content_definition, search, create_content, update_content, delete_content,
# add_live_schedule_to_event, get_live_schedule, get_live_inputs,
# get_live_output_profiles, delete_event.
import datetime as dt
import json
import re
from openpyxl import load_workbook
EVENT_CD = "412a30e3-73ee-4eae-b739-e1fc87601c7d" # generic event entity
EVENT_TYPE_CD = "0ee492a3-7875-4288-8690-f9895a44cb43"
VENUE_CD = "7684c940-6532-44f9-bc7f-6d7d7da72c2f"
PERFORMER_CD = "33cec5ca-6170-4237-842b-78bf1ef17932" # generic extension (see REFERENCE_SPECS)
TIMEZONE_CD = "6ffc9376-af95-4f70-864e-1b00b8f8a283"
ASSET_CD = "3ff29f61-bd0b-4c17-b855-49db5a292aeb"
EVENT_TYPE_NAME = "Auto Scheduled" # customer assumption: every row uses this eventType
EXCEL_EPOCH = dt.datetime(1899, 12, 30) # Excel day 0 (1900 leap-year bug baked in)
OFFSET_IS_MINUTES = False # timeZone.utcOffset is in HOURS (dev-06); CONFIRM at dry-run
# Opt-in: create absent sub-table records (venue/performer/...) instead of leaving the event
# field unset. ensure_references() pre-creates each unique name ONCE, then events reuse the ids.
CREATE_MISSING_REFS = False
# Name-only reference columns eligible for auto-create -> the sub-table CD each resolves against.
# Extend per deployment (e.g. "Performer": PERFORMER_CD). Assets are NOT here: media must be
# ingested, never fabricated from a name. Each sub-table is created minimally (name + slug).
REFERENCE_SPECS = {
"Venue": VENUE_CD,
}
def read_rows(path):
"""Yield each event row as a dict keyed by header name (row 1 is the header)."""
ws = load_workbook(path, data_only=True).active
rows = ws.iter_rows(values_only=True)
headers = [str(h).strip() if h is not None else "" for h in next(rows)]
for raw in rows:
if any(c is not None for c in raw):
yield {h: v for h, v in zip(headers, raw) if h}
# Cells vary BY COLUMN in real sheets: Date/Start may be datetimes while End is TEXT
# ("4:30 PM"). Accept datetime/date/time, Excel serials, AND strings.
STR_DT_FORMATS = ("%I:%M %p", "%I:%M:%S %p", "%H:%M", "%H:%M:%S",
"%m/%d/%Y", "%Y-%m-%d", "%m/%d/%y")
def _to_datetime(value):
"""Coerce an openpyxl cell (datetime/date/time, Excel serial, OR text) to a naive datetime."""
if isinstance(value, dt.datetime):
return value
if isinstance(value, dt.date):
return dt.datetime(value.year, value.month, value.day)
if isinstance(value, dt.time):
return EXCEL_EPOCH + dt.timedelta(hours=value.hour, minutes=value.minute,
seconds=value.second)
if isinstance(value, str): # e.g. "4:30 PM" / "6/22/2026"
for fmt in STR_DT_FORMATS:
try:
return dt.datetime.strptime(value.strip(), fmt) # time-only -> 1900 placeholder
except ValueError:
pass
raise ValueError(f"unrecognized date/time text: {value!r}")
return EXCEL_EPOCH + dt.timedelta(days=float(value)) # raw Excel serial
def combine_local(date_cell, time_cell):
"""Combine a date cell + a time-of-day cell into one naive local datetime."""
d, t = _to_datetime(date_cell), _to_datetime(time_cell)
return dt.datetime(d.year, d.month, d.day, t.hour, t.minute, t.second)
def to_utc_iso(date_cell, time_cell, offset):
"""Local date+time (+ tz offset) -> ISO-8601 UTC 'Z'. UTC = local - offset(east)."""
local = combine_local(date_cell, time_cell)
delta = dt.timedelta(minutes=offset) if OFFSET_IS_MINUTES else dt.timedelta(hours=offset)
return (local - delta).strftime("%Y-%m-%dT%H:%M:%SZ")
def ref(target_id, description=None):
"""Reference/lookup wire shape: {id, description} (description optional).
Strips a string id defensively - sheet GUID cells can carry trailing whitespace/newlines
(the AIH26 sample's `External Output ID 2` ends in "\\n").
"""
if isinstance(target_id, str):
target_id = target_id.strip()
return {"id": target_id, "description": description} if description else {"id": target_id}
def resolve_by_name(sdk, content_definition_id, name, field="name"):
"""Resolve an entity record by exact name to a {id, description} ref; None if blank/absent."""
if not name:
return None
flt = [
{"fieldName": "contentDefinitionId", "operator": "Equals", "values": content_definition_id},
{"fieldName": field, "operator": "Equals", "values": name},
]
items = (search(sdk, filters=flt, size=1) or {}).get("items", [])
return ref(items[0]["id"], name) if items else None
def load_live_index(sdk):
"""Fetch the TARGET's live inputs + output profiles ONCE; index each by lowercased name.
These are account-global lists (get_live_inputs / get_live_output_profiles), NOT content, so
they are not searchable - and the sheet's source GUIDs are foreign to other deployments. We
therefore resolve a Primary Input Channel / External Output Profile BY NAME against the target.
"""
def index(objs):
out = {}
for o in (objs or []):
name = (o or {}).get("name")
if name:
out[name.strip().lower()] = ref(o["id"], name)
return out
return {"inputs": index(get_live_inputs(sdk)),
"profiles": index(get_live_output_profiles(sdk))}
def live_ref(live_index, kind, name):
"""Resolve a live input / output profile from the load_live_index cache by name (None on miss)."""
if not name:
return None
return live_index[kind].get(name.strip().lower())
def resolve_folder_by_name(sdk, path):
"""Resolve an archive FOLDER by name to a {id, description} ref; None if absent on the target.
The sheet carries a slash path (e.g. "Content/Public/.../AIH-2026"); folders index under their
LEAF segment, so we search that. Narrow the filter by asset/folder type if a name is ambiguous.
"""
if not path:
return None
leaf = path.strip().strip("/").split("/")[-1]
flt = [{"fieldName": "name", "operator": "Equals", "values": leaf}]
items = (search(sdk, filters=flt, size=1) or {}).get("items", [])
return ref(items[0]["id"], leaf) if items else None
def slugify(name):
"""Lowercase + hyphenate a display name into a slug for a NEW sub-table record."""
return re.sub(r"[^a-z0-9]+", "-", name.strip().lower()).strip("-")
def create_reference(sdk, content_definition_id, name):
"""Create a minimal sub-table record (name + slug) and return its {id, description} ref.
Same contract write as an event: create_content mints a blank row, one properties-only
update_content populates it (update_content returns None even on success - drift #20), so
we keep the contentId from create. Tear these down with delete_content in non-prod.
"""
content_id = create_content(sdk, content_definition_id)["contentId"]
update_content(sdk, content_id, content_definition_id, {"name": name, "slug": slugify(name)})
return ref(content_id, name)
def ensure_references(sdk, rows, create_missing=False):
"""Pre-pass: resolve every unique name-only reference ONCE; optionally create the missing ones.
Loops the WHOLE sheet first so each distinct venue/performer is touched once (not per row),
returning a cache keyed (cd, name_lower) -> {id, description} | None. With create_missing the
absent records are CREATED here, so the later event writes just reuse the new ids; without it
an absent name stays None and the event's reference field is left unset.
"""
cache = {}
for column, cd in REFERENCE_SPECS.items():
names = sorted({(r.get(column) or "").strip() for r in rows} - {""})
for name in names:
existing = resolve_by_name(sdk, cd, name)
if existing is None and create_missing:
existing = create_reference(sdk, cd, name)
print(f"created {column}: {name!r} -> {existing['id']}")
cache[(cd, name.lower())] = existing
return cache
def cached_ref(cache, content_definition_id, name):
"""Look up a pre-resolved reference from the ensure_references cache (None if blank/absent)."""
if not name:
return None
return cache.get((content_definition_id, name.strip().lower()))
def resolve_timezone_offset(sdk, name):
"""Search the timeZone CD by name; return its utcOffset in HOURS (float) or None.
The value lives under the search item's nested `identifiers` bag and comes back as a
STRING (e.g. "-6"), so read `identifiers.utcOffset` and cast it ().
"""
if not name:
return None
flt = [
{"fieldName": "contentDefinitionId", "operator": "Equals", "values": TIMEZONE_CD},
{"fieldName": "name", "operator": "Equals", "values": name},
]
items = (search(sdk, filters=flt, size=1) or {}).get("items", [])
if not items:
return None
raw = (items[0].get("identifiers") or {}).get("utcOffset")
return float(raw) if raw not in (None, "") else None
def assert_unique_slugs(rows):
"""Fail-closed: abort the whole import if any slug repeats in the sheet."""
seen, dupes = set(), set()
for r in rows:
slug = (r.get("Slug") or "").strip()
(dupes if slug in seen else seen).add(slug)
if dupes:
raise ValueError(f"duplicate slugs in spreadsheet (aborting import): {sorted(dupes)}")
def find_event_id_by_slug(sdk, slug):
"""Idempotency probe: the existing event contentId for slug, or None."""
flt = [
{"fieldName": "contentDefinitionId", "operator": "Equals", "values": EVENT_CD},
{"fieldName": "slug", "operator": "Equals", "values": slug},
]
items = (search(sdk, filters=flt, size=1) or {}).get("items", [])
return items[0]["id"] if items else None
def build_payloads(sdk, row, event_type, refs, live_index):
"""Resolve one sheet row into (props, schedule) for the content + live-schedule writes.
Name-only sub-table refs (venue, ...) come from the `refs` cache built by ensure_references,
so they are resolved/created ONCE for the whole sheet rather than re-searched per row. Every
live-schedule ref is resolved BY NAME on the TARGET (`live_index` + search), since the sheet's
GUID columns hold the SOURCE deployment's ids. Names that miss are recorded under
`schedule["unresolved"]` so the write skips them and verify surfaces them as diffs.
"""
offset = resolve_timezone_offset(sdk, row.get("Timezone")) or 0
props = { # write 2: ONE properties-only patch; only non-null keys are sent
"name": row["Name"],
"slug": row["Slug"],
"eventType": event_type, # REQUIRED ref or the write 409s
"startDatetime": to_utc_iso(row["Date"], row["Start"], offset), # NOTE: lowercase 't'
"endDatetime": to_utc_iso(row["Date"], row["End"], offset),
"shortDescription": row.get("Short Description"),
"venue": cached_ref(refs, VENUE_CD, row.get("Venue")), # pre-resolved (maybe auto-created)
}
# Write 3: resolve every ref by NAME on the target. The slate doubles as the REQUIRED postroll
# (drift #23). Track names that do not resolve so verify can report them (per the import rule:
# skip the unresolved ref and surface it as a diff).
slate = resolve_by_name(sdk, ASSET_CD, row.get("Slate Video"))
primary = live_ref(live_index, "inputs", row.get("Primary Input Channel"))
archive = resolve_folder_by_name(sdk, row.get("Archive Folder"))
profiles, unresolved = [], {}
for col in ("External Output Profile", "External Output Profile 2"):
name = (row.get(col) or "").strip()
if not name:
continue
resolved = live_ref(live_index, "profiles", name)
if resolved:
profiles.append(resolved)
else:
unresolved.setdefault("external_output_profiles", []).append(name)
for field, name, resolved in (("slate_video", row.get("Slate Video"), slate),
("primary_live_input", row.get("Primary Input Channel"), primary),
("archive_folder", row.get("Archive Folder"), archive)):
if (name or "").strip() and not resolved:
unresolved[field] = name.strip()
schedule = {
"slate_video": slate,
"postroll_video": slate, # REQUIRED (drift #23): reuse the resolved slate
"archive_folder": archive,
"primary_live_input": primary,
"external_output_profiles": profiles or None,
"unresolved": unresolved, # {field: name(s)} that did not resolve on the target
}
return props, schedule
def dry_run(sdk, row, event_type, refs, live_index):
"""Print the EXACT payloads for one row so a human can validate before the full import."""
props, schedule = build_payloads(sdk, row, event_type, refs, live_index)
print("PROPS :", json.dumps({k: v for k, v in props.items() if v is not None}, indent=2))
print("SCHEDULE :", json.dumps({k: v for k, v in schedule.items() if v}, indent=2))
return props, schedule
def import_row(sdk, row, event_type, refs, live_index):
"""Idempotent upsert of one event + its live schedule; return the event contentId."""
props, schedule = build_payloads(sdk, row, event_type, refs, live_index)
event_id = find_event_id_by_slug(sdk, props["slug"]) # id -> update in place
if not event_id: # else mint a blank event
event_id = create_content(sdk, EVENT_CD)["contentId"]
# update_content returns None even on success () - keep event_id from create.
update_content(sdk, event_id, EVENT_CD, {k: v for k, v in props.items() if v is not None})
# Write 3 needs the REQUIRED postroll (drift #23) = the resolved slate. Attach ONLY when the
# slate name resolves on the target; otherwise skip and let verify report what was unresolved.
# add_live_schedule_to_event returns None on success AND failure (drift #24) - never trust it.
if schedule["slate_video"]:
add_live_schedule_to_event(sdk, event_id,
slate_video=schedule["slate_video"],
postroll_video=schedule["postroll_video"],
archive_folder=schedule["archive_folder"],
primary_live_input=schedule["primary_live_input"],
external_output_profiles=schedule["external_output_profiles"])
elif schedule["unresolved"]:
print(f" [{props['slug']}] live schedule SKIPPED (unresolved on target): {schedule['unresolved']}")
return event_id
def fetch_event_by_slug(sdk, slug):
"""Pull one event BACK via the SEARCH API; return its `identifiers` bag (the indexed field
values) or None. That bag already carries every field we wrote - name/slug/startDatetime/
endDatetime/shortDescription and the venue/eventType refs as {id, description} - so the
verification needs no extra content read (confirmed on dev-06; see return-shapes.md)."""
flt = [
{"fieldName": "contentDefinitionId", "operator": "Equals", "values": EVENT_CD},
{"fieldName": "slug", "operator": "Equals", "values": slug},
]
items = (search(sdk, filters=flt, size=1) or {}).get("items", [])
return items[0].get("identifiers") if items else None
def _normalize(value):
"""Reduce a field to a comparable form: a {id, ...} ref -> its id; a list -> normalized list."""
if isinstance(value, dict):
return value.get("id", value)
if isinstance(value, list):
return [_normalize(v) for v in value]
return value
def diff_event(expected, actual):
"""Diff the source-derived `expected` props against the pulled-back `actual` identifiers bag.
Returns a list of {field, source, platform} rows (refs compared by id); [] means they agree.
Only the keys we wrote are checked, so untouched platform fields never show as spurious diffs.
"""
if actual is None:
return [{"field": "(record)", "source": "present", "platform": "MISSING"}]
diffs = []
for key, exp in expected.items():
if _normalize(exp) != _normalize(actual.get(key)):
diffs.append({"field": key, "source": _normalize(exp), "platform": _normalize(actual.get(key))})
return diffs
# Expected schedule key -> the key it lands under in the get_live_schedule read-back (refs by id).
SCHEDULE_FIELDS = {
"slate_video": "slateVideo",
"postroll_video": "postrollVideo",
"archive_folder": "archiveFolderAsset",
"primary_live_input": "primaryLiveInput",
"external_output_profiles": "externalOutputProfiles",
}
def diff_schedule(expected, landed, unresolved):
"""Diff the source-derived live-schedule refs (by id) against the get_live_schedule read-back.
The live schedule is a SEPARATE object (not in the event content / search bag), so it is read
via get_live_schedule. add_live_schedule_to_event returns None on success AND failure (drift
#24), making this read the only trustworthy signal. Every name that did not resolve on the
target (`unresolved`) surfaces as its own diff; [] means everything that could land, landed.
"""
diffs = [{"field": field, "source": name, "platform": "UNRESOLVED ON TARGET"}
for field, name in (unresolved or {}).items()]
if landed is None:
# Nothing attached: a diff only if the source intended a schedule (the slate resolved -> the
# required postroll existed). A pure unresolved-name row is already reported above.
if expected.get("slate_video"):
diffs.append({"field": "(live schedule)", "source": "present", "platform": "MISSING"})
return diffs
for exp_key, landed_key in SCHEDULE_FIELDS.items():
exp = expected.get(exp_key)
if not exp or exp_key in (unresolved or {}):
continue # skipped/unresolved refs are reported above
if _normalize(exp) != _normalize(landed.get(landed_key)):
diffs.append({"field": exp_key, "source": _normalize(exp),
"platform": _normalize(landed.get(landed_key))})
return diffs
def verify_import(sdk, rows, event_type, refs, live_index):
"""Post-import check: pull each row's event + live schedule BACK and diff them against the source.
Rebuilds each row's source-derived payloads the SAME way the write did (build_payloads), so the
comparison reflects exactly what the import intended. Diffs the event via the search pull-back
AND the live schedule via get_live_schedule (a separate object); unresolved-on-target names
surface as diffs too. Prints a per-row, per-field report and returns {slug: [diffs]}. Resolution
stays a HUMAN decision: re-run the import (source wins), or fix the sheet - the diff says which.
"""
report = {}
for row in rows:
props, schedule = build_payloads(sdk, row, event_type, refs, live_index)
expected = {k: v for k, v in props.items() if v is not None}
slug = expected["slug"]
diffs = diff_event(expected, fetch_event_by_slug(sdk, slug))
event_id = find_event_id_by_slug(sdk, slug)
landed = get_live_schedule(sdk, event_id) if event_id else None # separate object (drift #24)
diffs += diff_schedule(schedule, landed, schedule["unresolved"])
if diffs:
report[slug] = diffs
if not report:
print(f"VERIFY: all {len(rows)} row(s) match the source (event + live schedule).")
else:
print(f"VERIFY: {len(report)} row(s) differ from the source (decide how to resolve):")
for slug, diffs in report.items():
for d in diffs:
print(f" [{slug}] {d['field']}: source={d['source']!r} platform={d['platform']!r}")
return report
def run_import(sdk, path, dry_run_first=True, create_missing_refs=CREATE_MISSING_REFS, verify=True):
"""Discover -> dup-scan -> resolve eventType -> ensure sub-tables -> index target live refs ->
dry-run row 1 -> upsert all -> verify (pull event + live schedule back and diff vs source)."""
rows = list(read_rows(path))
assert_unique_slugs(rows) # fail-closed before any write
fields = (get_content_definition(sdk, EVENT_CD) or {}).get("contentFields", [])
print("event fields:", [f["properties"]["propertyName"] for f in fields]) # mapping check
event_type = resolve_by_name(sdk, EVENT_TYPE_CD, EVENT_TYPE_NAME) # required for every row
if not event_type:
raise ValueError(f'eventType "{EVENT_TYPE_NAME}" not found on this deployment')
refs = ensure_references(sdk, rows, create_missing=create_missing_refs) # pre-pass: ONE per name
live_index = load_live_index(sdk) # target's live inputs + output profiles, by name
if dry_run_first and rows:
dry_run(sdk, rows[0], event_type, refs, live_index) # validate ONE row, then proceed
ids = [import_row(sdk, r, event_type, refs, live_index) for r in rows]
if verify:
verify_import(sdk, rows, event_type, refs, live_index) # pull back + diff; human resolves
return ids
def teardown_by_slug(sdk, slug):
"""Class C cleanup: delete the event created for slug (idempotent)."""
event_id = find_event_id_by_slug(sdk, slug)
if event_id:
delete_event(sdk, event_id, EVENT_CD)JavaScript
Same flow; parse the workbook with a JS xlsx reader (e.g. exceljs/xlsx) into row objects
keyed by header, then resolve/convert/upsert exactly as in Python. The reference shapes and the
write sequence are identical (createContent + properties-only updateContent). The
toUtcIso below assumes Excel serials; if your reader yields Date objects or text
cells (the sample's End is "4:30 PM"), coerce them first - mirror the Python _to_datetime
string branch so date/time text parses the same way.
// Components: getContentDefinition, search, createContent, updateContent, deleteContent,
// addLiveScheduleToEvent, getLiveSchedule, getLiveInputs, getLiveOutputProfiles,
// deleteEvent.
const EVENT_CD = "412a30e3-73ee-4eae-b739-e1fc87601c7d";
const EVENT_TYPE_CD = "0ee492a3-7875-4288-8690-f9895a44cb43";
const VENUE_CD = "7684c940-6532-44f9-bc7f-6d7d7da72c2f";
const PERFORMER_CD = "33cec5ca-6170-4237-842b-78bf1ef17932"; // generic extension (see REFERENCE_SPECS)
const TIMEZONE_CD = "6ffc9376-af95-4f70-864e-1b00b8f8a283";
const ASSET_CD = "3ff29f61-bd0b-4c17-b855-49db5a292aeb";
const EVENT_TYPE_NAME = "Auto Scheduled"; // customer assumption: every row uses this eventType
const EXCEL_EPOCH = Date.UTC(1899, 11, 30); // Excel day 0
const OFFSET_IS_MINUTES = false; // timeZone.utcOffset is in HOURS (dev-06); confirm at dry-run
// Opt-in: create absent sub-table records instead of leaving the event field unset.
const CREATE_MISSING_REFS = false;
// Name-only reference columns eligible for auto-create -> the sub-table CD each resolves against.
// Extend per deployment (e.g. Performer: PERFORMER_CD). Assets are excluded: media is ingested,
// never fabricated from a name. Each sub-table is created minimally (name + slug).
const REFERENCE_SPECS = { Venue: VENUE_CD };
function ref(targetId, description = null) {
// Strip a string id defensively - sheet GUID cells can carry trailing whitespace/newlines
// (the AIH26 sample's `External Output ID 2` ends in "\n").
if (typeof targetId === "string") targetId = targetId.trim();
return description ? { id: targetId, description } : { id: targetId };
}
function toUtcIso(dateSerial, timeFraction, offset) {
// Both cells as Excel serials: integer days + fractional day. UTC = local - offset(east).
const ms = EXCEL_EPOCH + (Math.trunc(dateSerial) + (timeFraction % 1)) * 86400000;
const off = OFFSET_IS_MINUTES ? offset * 60000 : offset * 3600000;
return new Date(ms - off).toISOString().replace(/\.\d{3}Z$/, "Z");
}
async function resolveByName(sdk, contentDefinitionId, name, field = "name") {
if (!name) return null;
const flt = [
{ fieldName: "contentDefinitionId", operator: "Equals", values: contentDefinitionId },
{ fieldName: field, operator: "Equals", values: name },
];
const items = ((await search(sdk, null, flt, null, 1)) || {}).items || [];
return items.length ? ref(items[0].id, name) : null;
}
async function loadLiveIndex(sdk) {
// Fetch the TARGET's live inputs + output profiles ONCE and index each by lowercased name.
// These are account-global lists (NOT content, so not searchable), and the sheet's source
// GUIDs are foreign to other deployments - so resolve those refs BY NAME against the target.
const index = (objs) => {
const out = new Map();
for (const o of objs || []) {
const name = o && o.name;
if (name) out.set(name.trim().toLowerCase(), ref(o.id, name));
}
return out;
};
return { inputs: index(await getLiveInputs(sdk)),
profiles: index(await getLiveOutputProfiles(sdk)) };
}
function liveRef(liveIndex, kind, name) {
// Resolve a live input / output profile from the loadLiveIndex cache by name (null on miss).
if (!name) return null;
return liveIndex[kind].get(name.trim().toLowerCase()) || null;
}
async function resolveFolderByName(sdk, path) {
// Resolve an archive FOLDER by name to a {id, description} ref; null if absent on the target.
// The sheet carries a slash path (e.g. "Content/Public/.../AIH-2026"); folders index under
// their LEAF segment, so search that. Narrow by asset/folder type if a name is ambiguous.
if (!path) return null;
const leaf = path.trim().replace(/^\/+|\/+$/g, "").split("/").pop();
const flt = [{ fieldName: "name", operator: "Equals", values: leaf }];
const items = ((await search(sdk, null, flt, null, 1)) || {}).items || [];
return items.length ? ref(items[0].id, leaf) : null;
}
function slugify(name) {
// Lowercase + hyphenate a display name into a slug for a NEW sub-table record.
return name.trim().toLowerCase().replace(/[^a-z0-9]+/g, "-").replace(/^-+|-+$/g, "");
}
async function createReference(sdk, contentDefinitionId, name) {
// create_content mints a blank row; one properties-only updateContent populates name+slug
// (updateContent returns null even on success - drift #20). Tear down with deleteContent.
const contentId = (await createContent(sdk, contentDefinitionId)).contentId;
await updateContent(sdk, contentId, contentDefinitionId, { name, slug: slugify(name) });
return ref(contentId, name);
}
async function ensureReferences(sdk, rows, createMissing = false) {
// Pre-pass: resolve every unique name-only reference ONCE; optionally create the missing ones.
// Returns a Map keyed `${cd}\n${nameLower}` -> {id, description} | null, so the later event
// writes reuse ids instead of re-searching (or re-creating) per row.
const cache = new Map();
for (const [column, cd] of Object.entries(REFERENCE_SPECS)) {
const names = [...new Set(rows.map((r) => (r[column] || "").trim()).filter(Boolean))].sort();
for (const name of names) {
let existing = await resolveByName(sdk, cd, name);
if (existing == null && createMissing) {
existing = await createReference(sdk, cd, name);
console.log(`created ${column}: ${name} -> ${existing.id}`);
}
cache.set(`${cd}\n${name.toLowerCase()}`, existing);
}
}
return cache;
}
function cachedRef(cache, contentDefinitionId, name) {
// Look up a pre-resolved reference from the ensureReferences cache (null if blank/absent).
if (!name) return null;
return cache.get(`${contentDefinitionId}\n${name.trim().toLowerCase()}`) || null;
}
async function resolveTimezoneOffset(sdk, name) {
if (!name) return null;
const flt = [
{ fieldName: "contentDefinitionId", operator: "Equals", values: TIMEZONE_CD },
{ fieldName: "name", operator: "Equals", values: name },
];
const items = ((await search(sdk, null, flt, null, 1)) || {}).items || [];
if (!items.length) return null;
// utcOffset is a STRING in HOURS under the nested `identifiers` bag ().
const raw = (items[0].identifiers || {}).utcOffset;
return raw == null || raw === "" ? null : Number(raw);
}
function assertUniqueSlugs(rows) {
const seen = new Set(), dupes = new Set();
for (const r of rows) {
const slug = (r.Slug || "").trim();
if (seen.has(slug)) dupes.add(slug); else seen.add(slug);
}
if (dupes.size) throw new Error(`duplicate slugs (aborting): ${[...dupes].join(", ")}`);
}
async function findEventIdBySlug(sdk, slug) {
const flt = [
{ fieldName: "contentDefinitionId", operator: "Equals", values: EVENT_CD },
{ fieldName: "slug", operator: "Equals", values: slug },
];
const items = ((await search(sdk, null, flt, null, 1)) || {}).items || [];
return items.length ? items[0].id : null;
}
async function buildEventProps(sdk, row, eventType, refs) {
// Source-derived properties patch (write 2). Factored out so the WRITE and the VERIFY use
// identical expected values - keeping them from drifting apart. Only non-null keys are sent.
const offset = (await resolveTimezoneOffset(sdk, row.Timezone)) || 0;
const props = {
name: row.Name, slug: row.Slug, eventType, // eventType REQUIRED or it 409s
startDatetime: toUtcIso(row.Date, row.Start, offset), // NOTE: lowercase 't'
endDatetime: toUtcIso(row.Date, row.End, offset),
shortDescription: row["Short Description"],
venue: cachedRef(refs, VENUE_CD, row.Venue), // pre-resolved (maybe auto-created)
};
return Object.fromEntries(Object.entries(props).filter(([, v]) => v != null));
}
async function buildSchedule(sdk, row, liveIndex) {
// Resolve every live-schedule ref BY NAME on the TARGET (liveIndex + search); the sheet's GUID
// columns hold the SOURCE deployment's ids. The slate doubles as the REQUIRED postroll (drift
// #23). Names that miss go in `unresolved` so the write skips them and verify reports them.
const slate = await resolveByName(sdk, ASSET_CD, row["Slate Video"]);
const primary = liveRef(liveIndex, "inputs", row["Primary Input Channel"]);
const archive = await resolveFolderByName(sdk, row["Archive Folder"]);
const profiles = [], unresolved = {};
for (const col of ["External Output Profile", "External Output Profile 2"]) {
const name = (row[col] || "").trim();
if (!name) continue;
const resolved = liveRef(liveIndex, "profiles", name);
if (resolved) profiles.push(resolved);
else (unresolved.external_output_profiles ||= []).push(name);
}
for (const [field, name, resolved] of [["slate_video", row["Slate Video"], slate],
["primary_live_input", row["Primary Input Channel"], primary],
["archive_folder", row["Archive Folder"], archive]]) {
if ((name || "").trim() && !resolved) unresolved[field] = name.trim();
}
return {
slate_video: slate,
postroll_video: slate, // REQUIRED (drift #23): reuse the resolved slate
archive_folder: archive,
primary_live_input: primary,
external_output_profiles: profiles.length ? profiles : null,
unresolved, // {field: name(s)} that did not resolve on the target
};
}
async function importRow(sdk, row, eventType, refs, liveIndex) {
// Idempotent upsert: create-or-find the event, patch props, attach live schedule.
// eventType is the "Auto Scheduled" ref resolved ONCE by the caller; `refs` is the
// ensureReferences cache; `liveIndex` is the target's live inputs/profiles indexed by name.
let eventId = await findEventIdBySlug(sdk, row.Slug); // id -> update in place
if (!eventId) eventId = (await createContent(sdk, EVENT_CD)).contentId;
// updateContent returns null even on success () - keep eventId from create.
await updateContent(sdk, eventId, EVENT_CD, await buildEventProps(sdk, row, eventType, refs));
const s = await buildSchedule(sdk, row, liveIndex);
// Write 3 needs the REQUIRED postroll (drift #23) = the resolved slate. Attach ONLY when the
// slate name resolves on the target; otherwise skip and let verify report what was unresolved.
// addLiveScheduleToEvent returns null on success AND failure (drift #24) - never trust it.
if (s.slate_video) {
await addLiveScheduleToEvent(sdk, eventId, s.slate_video, null, s.postroll_video, null,
s.archive_folder, s.primary_live_input, null, null, null, s.external_output_profiles);
} else if (Object.keys(s.unresolved).length) {
console.log(` [${row.Slug}] live schedule SKIPPED (unresolved on target):`, s.unresolved);
}
return eventId;
}
async function fetchEventBySlug(sdk, slug) {
// Pull one event BACK via the SEARCH API; return its `identifiers` bag (the indexed field
// values) or null. That bag already carries every field we wrote - name/slug/startDatetime/
// endDatetime/shortDescription and the venue/eventType refs - so verify needs no extra read.
const flt = [
{ fieldName: "contentDefinitionId", operator: "Equals", values: EVENT_CD },
{ fieldName: "slug", operator: "Equals", values: slug },
];
const items = ((await search(sdk, null, flt, null, 1)) || {}).items || [];
return items.length ? items[0].identifiers : null;
}
function normalizeField(value) {
// Reduce a field to a comparable form: a {id, ...} ref -> its id; an array -> normalized array.
if (Array.isArray(value)) return value.map(normalizeField);
if (value && typeof value === "object") return value.id ?? value;
return value;
}
function diffEvent(expected, actual) {
// Diff source-derived `expected` props against the pulled-back `actual` identifiers bag.
// Returns [{field, source, platform}] (refs compared by id); [] means they agree.
if (actual == null) return [{ field: "(record)", source: "present", platform: "MISSING" }];
const diffs = [];
for (const [key, exp] of Object.entries(expected)) {
const e = normalizeField(exp), a = normalizeField(actual[key]);
if (JSON.stringify(e) !== JSON.stringify(a)) diffs.push({ field: key, source: e, platform: a });
}
return diffs;
}
// Expected schedule key -> the key it lands under in the getLiveSchedule read-back (refs by id).
const SCHEDULE_FIELDS = {
slate_video: "slateVideo",
postroll_video: "postrollVideo",
archive_folder: "archiveFolderAsset",
primary_live_input: "primaryLiveInput",
external_output_profiles: "externalOutputProfiles",
};
function diffSchedule(expected, landed, unresolved) {
// Diff source-derived live-schedule refs (by id) against the getLiveSchedule read-back. The
// schedule is a SEPARATE object (not in the event content/search bag); addLiveScheduleToEvent
// returns null on success AND failure (drift #24), so this read is the only trustworthy signal.
// Every name unresolved on the target surfaces as its own diff; [] means all that could, landed.
const diffs = Object.entries(unresolved || {}).map(
([field, name]) => ({ field, source: name, platform: "UNRESOLVED ON TARGET" }));
if (landed == null) {
if (expected.slate_video) // source intended a schedule (slate -> postroll existed)
diffs.push({ field: "(live schedule)", source: "present", platform: "MISSING" });
return diffs;
}
for (const [expKey, landedKey] of Object.entries(SCHEDULE_FIELDS)) {
const exp = expected[expKey];
if (!exp || expKey in (unresolved || {})) continue; // skipped/unresolved reported above
const e = normalizeField(exp), a = normalizeField(landed[landedKey]);
if (JSON.stringify(e) !== JSON.stringify(a)) diffs.push({ field: expKey, source: e, platform: a });
}
return diffs;
}
async function verifyImport(sdk, rows, eventType, refs, liveIndex) {
// Post-import check: pull each row's event + live schedule BACK and diff them against the source.
// The event comes via the search bag; the live schedule via getLiveSchedule (a separate object).
// Prints a per-row, per-field report (source vs platform); returns {slug: [diffs]}. Resolution
// stays a HUMAN call: re-run the import to re-apply the sheet, or fix the sheet.
const report = {};
for (const row of rows) {
const expected = await buildEventProps(sdk, row, eventType, refs);
const schedule = await buildSchedule(sdk, row, liveIndex);
const diffs = diffEvent(expected, await fetchEventBySlug(sdk, expected.slug));
const eventId = await findEventIdBySlug(sdk, expected.slug);
const landed = eventId ? await getLiveSchedule(sdk, eventId) : null; // separate object (drift #24)
diffs.push(...diffSchedule(schedule, landed, schedule.unresolved));
if (diffs.length) report[expected.slug] = diffs;
}
const n = Object.keys(report).length;
if (!n) {
console.log(`VERIFY: all ${rows.length} row(s) match the source (event + live schedule).`);
} else {
console.log(`VERIFY: ${n} row(s) differ from the source (decide how to resolve):`);
for (const [slug, diffs] of Object.entries(report))
for (const d of diffs)
console.log(` [${slug}] ${d.field}: source=${JSON.stringify(d.source)} platform=${JSON.stringify(d.platform)}`);
}
return report;
}
async function runImport(sdk, rows, { dryRunFirst = true, createMissingRefs = CREATE_MISSING_REFS, verify = true } = {}) {
// Resolve eventType ONCE, ensure sub-tables ONCE, index target live refs, upsert every row
// (dup-scan first), then verify (pull event + live schedule back and diff vs source).
assertUniqueSlugs(rows);
const eventType = await resolveByName(sdk, EVENT_TYPE_CD, EVENT_TYPE_NAME);
if (!eventType) throw new Error(`eventType "${EVENT_TYPE_NAME}" not found on this deployment`);
const refs = await ensureReferences(sdk, rows, createMissingRefs); // pre-pass: ONE per name
const liveIndex = await loadLiveIndex(sdk); // target inputs + profiles, by name
void dryRunFirst; // dry-run is the human gate; print row[0] payloads here if desired
const ids = [];
for (const row of rows) ids.push(await importRow(sdk, row, eventType, refs, liveIndex));
if (verify) await verifyImport(sdk, rows, eventType, refs, liveIndex); // pull back + diff; human resolves
return ids;
}
async function teardownBySlug(sdk, slug) {
const eventId = await findEventIdBySlug(sdk, slug);
if (eventId) await deleteEvent(sdk, eventId, EVENT_CD);
}Notes
- Discover first.
get_content_definition(EVENT_CD)returnscontentFields[]; printingproperties.propertyNameconfirms the field names (slug,venue,shortDescription, …) against the mapping table before any write. The offline equivalent isreference/schema.md(event/eventType/venue field tables). - Write events via the primitives, not
create_and_update_event. That SDK wrapper's update path is a no-op, it misspellsstartDatetime/endDatetime, and it swallows the required-eventType409. We usecreate_content(new slugs) + a single properties-onlyupdate_contentinstead. eventTypeis required. Every row defaults to "Auto Scheduled" (customer assumption), resolved once to a{id, description}ref. Omit it and theupdate_contentwrite409s ("The event type is missing or invalid"), which the SDK swallows to a silent no-write.- Idempotency is the slug. Each row's
slugis the natural key:find_event_id_by_slugreturns an existingcontentId(→ update in place) orNone(→create_content), so re-running the import updates in place instead of duplicating.assert_unique_slugsaborts fail-closed if the sheet itself has duplicate slugs. - Validate one row before the batch.
dry_runprints the exactprops/schedulepayloads for row 1. Eyeball the convertedstartDatetimeagainst the sheet to confirm thetimeZone.utcOffsetunit (hours) and sign - it's read from the nestedidentifiersbag as a string and cast; flipOFFSET_IS_MINUTES/ the subtraction only if a deployment reports it differently - then run the full import. - Reference shapes.
eventType,venue,slate_video,primary_live_input,archive_folder, and eachexternal_output_profileselement are{id, description}dicts (a bare GUID is wrapped as{id}). The* IDcolumns are already-resolved GUIDs; the name-only columns are resolved by searching their CD. - Auto-create missing sub-tables (opt-in). With
CREATE_MISSING_REFS = True,ensure_referencespre-creates each unique unresolvedVenue/Performer(create_content+update_content({name, slug})) once for the whole sheet and caches the ids, so events reuse them. DefaultFalseleaves an unresolved reference unset. Assets are excluded (media is ingested, never name-fabricated). See Create missing sub-tables above. - Live schedule refs resolve by NAME on the target. Write 3 (
add_live_schedule_to_event) resolves the slate/input/folder/output-profile from the row's name columns against the target deployment (the sheet's parallel* IDcolumns are the source env's GUIDs - ignored). It attaches only when the slate name resolves, because the schedule's requiredpostroll_videoreuses that resolved slate; any name that misses is skipped and reported by verify.preroll/backup/secure-output andlongDescriptionare left null per the import plan - add them later by extendingbuild_payloads. update_contentis a partial patch (read-modify-write) - only the non-nullpropskeys are sent, so unrelated event fields are untouched. It returnsNoneeven on success, so the code keeps thecontentIdfromcreate_content. Seerecipes/content-update.md.- Verify by pulling back - event AND live schedule.
verify_import(default-on tail ofrun_import) re-searches each event byslug, reads the written fields straight from the result'sidentifiersbag, and reads the live schedule back withget_live_schedule(a separate object - the only trustworthy signal,). It diffs both against the source-derived payloads (refs compared byid, only written keys checked) and reports{slug: [{field, source, platform}]}- includingUNRESOLVED ON TARGETrows for any live-schedule name with no match - for a human to resolve (re-import, or fix the sheet/target); it never auto-reconciles. See Verify the import above. - Tear down what you create (Class C). Events have no folder anchor, so the run-root cascade
cannot reclaim them - delete each by slug with
delete_event/teardown_by_slug. Non-prod only (validated ondev-06); a production import must be explicitly human-approved.
