Custom Asset Processors

Guide for building a custom Lambda processor for Nomad Media — Lambda types, data structures, Redis state management, and deployment.

Nomad Media supports custom background processors implemented as AWS Lambda functions. When an asset is uploaded to a configured S3 bucket, the processing pipeline is triggered automatically. Custom processors integrate into this pipeline by following its standard patterns for input handling, state management via Redis, related asset registration, and manifest finalization.

Background

The processing pipeline uses background Lambdas that call AWS services (Rekognition, Transcribe, Textract, etc.) and write results to a manifest file (JSON). A custom processor follows the same conventions to ensure its output is available alongside built-in processor output.

Audience

This guide is for developers writing new Lambda processors for server-side asset processing. It covers how to create a Lambda, integrate it with the Nomad Media pipeline, and ensure that the resulting metadata is available in the asset manifest.

Lambda Types

There are four Lambda patterns used in the pipeline:

  1. Standard / Synchronous Lambda — Handles an SQS event. Starts a job, completes it, and exits. Input follows aws.SQSMessage format with a body matching the LambdaInput struct.

  2. Batch Generation Lambda — Used for processing that cannot be handled in a single Lambda invocation. Receives SQS input, creates multiple batches, and fires them into a Batch Handling Lambda.

  3. Batch Handling Lambda — Receives batches from a Batch Generation Lambda. Input format is specific to the Lambda. May hand off to external services (e.g., Rekognition) and then exit. If it calls another Lambda, it must supply LambdaInput as input.

  4. Pass-through Asynchronous Lambda — Handles events from sources other than SQS (CloudWatch Events, S3 Events, SNS). If it calls an SQS-consuming Lambda downstream, it must supply LambdaInput.

LambdaInput Structure

The standard input struct passed between Lambdas:

type LambdaInput struct {
    AssetID             string   `json:"assetId,omitempty"`
    BucketName          string   `json:"bucketName,omitempty"`
    ObjectKey           string   `json:"objectKey,omitempty"`
    CompleteSQSTriggers []string `json:"completedSqsTriggers,omitempty"`
    SourceAssetID       string   `json:"sourceAssetId,omitempty"`
    TriggeredBy         string   `json:"triggeredBy,omitempty"`
    // Rekognition-specific
    SegmentID           string   `json:"segmentID,omitempty"`
    SegmentOrder        int64    `json:"segmentOrder,omitempty"`
    RekognitionJobType  string   `json:"jobType,omitempty"`
    RequestID           string   `json:"requestId,omitempty"`
}

At minimum, assetId and triggeredBy must be populated.
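
As an illustration of the minimum-field rule, the sketch below decodes an SQS message body into LambdaInput and rejects input that lacks assetId or triggeredBy. The helper name parseLambdaInput and the processor name in the example are hypothetical, and the Rekognition-specific fields are left out for brevity.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// LambdaInput mirrors the struct above (Rekognition-specific fields omitted).
type LambdaInput struct {
	AssetID             string   `json:"assetId,omitempty"`
	BucketName          string   `json:"bucketName,omitempty"`
	ObjectKey           string   `json:"objectKey,omitempty"`
	CompleteSQSTriggers []string `json:"completedSqsTriggers,omitempty"`
	SourceAssetID       string   `json:"sourceAssetId,omitempty"`
	TriggeredBy         string   `json:"triggeredBy,omitempty"`
}

// parseLambdaInput is a hypothetical helper: it decodes an SQS message body
// and enforces the two required fields.
func parseLambdaInput(body string) (LambdaInput, error) {
	var in LambdaInput
	if err := json.Unmarshal([]byte(body), &in); err != nil {
		return LambdaInput{}, fmt.Errorf("decode LambdaInput: %w", err)
	}
	if in.AssetID == "" || in.TriggeredBy == "" {
		return LambdaInput{}, errors.New("LambdaInput requires assetId and triggeredBy")
	}
	return in, nil
}

func main() {
	body := `{"assetId":"4d5b02b2-b829-47c8-b8a3-eb5f68af4bd7","triggeredBy":"MyCustomProcessor"}`
	in, err := parseLambdaInput(body)
	fmt.Println(in.AssetID, in.TriggeredBy, err)
}
```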

Naming Conventions

Output file names follow the pattern <processortype>.<extension> (e.g., rekognitionimagedetecttext.json).

Files are stored under:

  • Metadata content bucket — for data accessed by the UI
  • Metadata archive bucket — for internal processing data not accessed by the UI

Both buckets use the same path structure:

<bucket>/<first-2>/<next-2>/<next-2>/<full-assetId>/<filename>

Example:

metadata-content/4d/5b/02/4d5b02b2-b829-47c8-b8a3-eb5f68af4bd7/manifest_asset.json

Bucket names are configured in system/metadataBucketSettings.
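
The path structure can be derived from the asset ID alone. The following is a minimal sketch, assuming the bucket name has already been read from configuration; metadataObjectPath is a hypothetical helper name, not part of the Nomad Media codebase.

```go
package main

import (
	"fmt"
	"strings"
)

// metadataObjectPath builds <bucket>/<first-2>/<next-2>/<next-2>/<assetId>/<filename>.
// It is an illustrative helper, not an existing Nomad Media function.
func metadataObjectPath(bucket, assetID, filename string) (string, error) {
	if len(assetID) < 6 {
		return "", fmt.Errorf("asset ID %q is too short to derive the path prefix", assetID)
	}
	parts := []string{bucket, assetID[0:2], assetID[2:4], assetID[4:6], assetID, filename}
	return strings.Join(parts, "/"), nil
}

func main() {
	p, err := metadataObjectPath("metadata-content", "4d5b02b2-b829-47c8-b8a3-eb5f68af4bd7", "manifest_asset.json")
	fmt.Println(p, err)
}
```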

Error Handling and Logging

Redis is used as the centralized state hub during processing. The Redis connection config is read from AWS Secrets Manager at <projectPrefix>/redis:

{ "Host": "<host>", "Port": "<port>" }

Logging format:

{
  "Level": "INFO",
  "Date": "2020-11-17T14:59:25.794Z",
  "LogSource": "",
  "Message": "",
  "RequestID": "",
  "LogName": "",
  "InstanceID": "",
  "Properties": {}
}

WARN and ERROR levels additionally include:

  • "error": ""
  • "errorVerbose": ""
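
One way to produce entries in this shape is a struct whose JSON tags match the field names above. This is a sketch only: the LogEntry type and newLogEntry helper are hypothetical, the meaning given to LogSource is a guess, and using the "%+v" rendering for errorVerbose is an assumption rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// LogEntry mirrors the logging format above. The error fields are omitted
// from the JSON output when empty.
type LogEntry struct {
	Level        string                 `json:"Level"`
	Date         string                 `json:"Date"`
	LogSource    string                 `json:"LogSource"`
	Message      string                 `json:"Message"`
	RequestID    string                 `json:"RequestID"`
	LogName      string                 `json:"LogName"`
	InstanceID   string                 `json:"InstanceID"`
	Properties   map[string]interface{} `json:"Properties"`
	Error        string                 `json:"error,omitempty"`
	ErrorVerbose string                 `json:"errorVerbose,omitempty"`
}

// newLogEntry is a hypothetical constructor. It stamps the entry with a UTC
// timestamp at millisecond precision and fills the error fields only for
// WARN and ERROR levels.
func newLogEntry(level, source, message string, err error) LogEntry {
	e := LogEntry{
		Level:      level,
		Date:       time.Now().UTC().Format("2006-01-02T15:04:05.000Z"),
		LogSource:  source,
		Message:    message,
		Properties: map[string]interface{}{},
	}
	if err != nil && (level == "WARN" || level == "ERROR") {
		e.Error = err.Error()
		e.ErrorVerbose = fmt.Sprintf("%+v", err) // assumption: verbose form of the same error
	}
	return e
}

func main() {
	entry := newLogEntry("ERROR", "MyCustomProcessor", "processing failed", fmt.Errorf("example failure"))
	line, _ := json.Marshal(entry)
	fmt.Println(string(line))
}
```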

Error handling rules:

  • Lambdas should never return an error — log it instead.
  • Lambdas should always dequeue the SQS message that spawned them, unless the error is an AWS throttling error.

Registering a Running Job in Redis

When your Lambda starts, write a key to Redis to register it as running:

<assetId>_job_<processorName>

Example: 6c495d10-2c72-4961-9edc-746f2b35bc75_job_RekognitionDetectLabels

When your Lambda finishes (success or error), delete this key.
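
The register-then-delete pattern can be sketched as a wrapper around the processor's work. The KeyStore interface and the in-memory memStore below are hypothetical stand-ins for whatever Redis client the Lambda uses, and the stored value "running" is an assumption, since this guide specifies only the key name.

```go
package main

import "fmt"

// KeyStore is a hypothetical minimal interface over a Redis client.
type KeyStore interface {
	Set(key, value string) error
	Del(key string) error
}

// memStore is an in-memory stand-in used only to keep this sketch runnable.
type memStore map[string]string

func (m memStore) Set(key, value string) error { m[key] = value; return nil }
func (m memStore) Del(key string) error        { delete(m, key); return nil }

// runningJobKey builds <assetId>_job_<processorName>.
func runningJobKey(assetID, processorName string) string {
	return assetID + "_job_" + processorName
}

// runAsRegisteredJob writes the running-job key, runs work, and removes the
// key afterwards whether work succeeded or failed.
func runAsRegisteredJob(store KeyStore, assetID, processorName string, work func() error) (err error) {
	key := runningJobKey(assetID, processorName)
	if setErr := store.Set(key, "running"); setErr != nil { // value is an assumption
		return fmt.Errorf("register %s: %w", key, setErr)
	}
	defer func() {
		if delErr := store.Del(key); delErr != nil && err == nil {
			err = fmt.Errorf("deregister %s: %w", key, delErr)
		}
	}()
	return work()
}

func main() {
	store := memStore{}
	err := runAsRegisteredJob(store, "6c495d10-2c72-4961-9edc-746f2b35bc75", "RekognitionDetectLabels", func() error {
		fmt.Println("keys while running:", len(store))
		return nil
	})
	fmt.Println("keys after finishing:", len(store), "error:", err)
}
```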

Related Assets

Any files generated by your processor must be registered as related assets in Redis before the manifest is finalized. The Redis key depends on file type:

File Type       Redis Key
Audio           <assetId>_RelatedAudio
Video           <assetId>_RelatedVideos
Image           <assetId>_RelatedImages
Anything else   <assetId>_RelatedAssets

If the key already exists, append to the array. Otherwise, create a new array with a single entry.
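
The key selection and append-or-create rule could look like the sketch below. It assumes the array is stored in Redis as a JSON-encoded string, which this guide does not state; relatedKey and appendJSONArray are hypothetical helper names. Reading, appending, and writing back is not atomic, so concurrent processors writing to the same key would need some form of coordination.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// relatedKey picks the Redis key for a generated file based on its type.
func relatedKey(assetID, fileType string) string {
	switch strings.ToLower(fileType) {
	case "audio":
		return assetID + "_RelatedAudio"
	case "video":
		return assetID + "_RelatedVideos"
	case "image":
		return assetID + "_RelatedImages"
	default:
		return assetID + "_RelatedAssets"
	}
}

// appendJSONArray appends entry to the JSON array held in existing.
// An empty existing value means the key was not present, so a new
// single-element array is created.
func appendJSONArray(existing string, entry interface{}) (string, error) {
	var items []json.RawMessage
	if existing != "" {
		if err := json.Unmarshal([]byte(existing), &items); err != nil {
			return "", fmt.Errorf("decode existing array: %w", err)
		}
	}
	raw, err := json.Marshal(entry)
	if err != nil {
		return "", fmt.Errorf("encode entry: %w", err)
	}
	items = append(items, raw)
	out, err := json.Marshal(items)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	key := relatedKey("4d5b02b2-b829-47c8-b8a3-eb5f68af4bd7", "image")
	value, err := appendJSONArray("", map[string]string{"id": "example"})
	fmt.Println(key, value, err)
}
```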

RelatedAsset struct:

type RelatedAsset struct {
    ID                  string    `json:"id,assetId,AssetID,omitempty"`
    JobID               string    `json:"jobID"`
    URL                 string    `json:"url,omitempty"`
    LanguageCode        string    `json:"languageCode,omitempty"`
    MetadataTypeDisplay string    `json:"metadataTypeDisplay,omitempty"`
    MetadataType        int       `json:"metadataType,omitempty"`
    Language            *Language `json:"language,omitempty"`
    Title               string    `json:"title,omitempty"`
}

type Language struct {
    ID             string `json:"id"`
    Description    string `json:"title"`
    LanguageName   string `json:"translatedTitle"`
    FiveLetterCode string `json:"iso5"`
    TwoLetterCode  string `json:"iso2"`
}

  • URL: <bucketName>::<objectKey>
  • MetadataType: numeric ID of the metadata type
  • MetadataTypeDisplay: written name of the metadata type
  • Title: MetadataTypeDisplay split by word boundaries and title-cased (e.g., RekognitionImageDetectText → Rekognition Image Detect Text)
  • Language: populate only when relevant (e.g., transcripts, subtitles)
  • JobID: the job ID that ran the processor
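
The URL and Title rules are simple string transformations. Here is a minimal sketch with hypothetical helper names; it splits wherever an upper-case letter follows a character that is not upper-case, so a run of capitals such as an acronym is kept together as one word.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// relatedAssetURL formats the URL field as <bucketName>::<objectKey>.
func relatedAssetURL(bucketName, objectKey string) string {
	return bucketName + "::" + objectKey
}

// titleFromDisplay turns a name such as "RekognitionImageDetectText" into
// "Rekognition Image Detect Text".
func titleFromDisplay(display string) string {
	runes := []rune(display)
	var b strings.Builder
	for i, r := range runes {
		if i == 0 {
			r = unicode.ToUpper(r) // make sure the first word is capitalized
		} else if unicode.IsUpper(r) && !unicode.IsUpper(runes[i-1]) {
			b.WriteRune(' ') // word boundary: upper-case after non-upper-case
		}
		b.WriteRune(r)
	}
	return b.String()
}

func main() {
	fmt.Println(titleFromDisplay("RekognitionImageDetectText"))
	fmt.Println(relatedAssetURL("metadata-content", "4d/5b/02/4d5b02b2-b829-47c8-b8a3-eb5f68af4bd7/rekognitionimagedetecttext.json"))
}
```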

Jobs

Each processor run must also be registered in Redis as a job. Append to <assetId>_Jobs (or create the array if it does not exist):

type Job struct {
    ID          string    `json:"id,omitempty"`
    Duration    string    `json:"duration,omitempty"`
    StartTime   time.Time `json:"startTime,omitempty"`
    EndTime     time.Time `json:"EndTime,omitempty"`
    ItemCount   int       `json:"itemCount,omitempty"`
    Name        string    `json:"name,omitempty"`
    Status      string    `json:"status,omitempty"`
    Description string    `json:"description,omitempty"`
    TriggeredBy string    `json:"triggeredBy,omitempty"`
}

  • Duration: elapsed run time, in seconds
  • StartTime / EndTime: set at Lambda start and finish
  • Name: descriptive Lambda name (e.g., Rekognition, not demo3-process-rekognition)
  • Status: Started, Error, or Completed
  • TriggeredBy: the Lambda that triggered this job
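
A job record is filled in two steps, at Lambda start and at Lambda finish. The sketch below assumes Duration is written as decimal seconds in string form (the guide specifies only the unit) and uses hypothetical helpers startJob and finishJob. How the job ID is generated is not covered here, so it is passed in.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
	"time"
)

// Job mirrors the struct above, including its JSON tags.
type Job struct {
	ID          string    `json:"id,omitempty"`
	Duration    string    `json:"duration,omitempty"`
	StartTime   time.Time `json:"startTime,omitempty"`
	EndTime     time.Time `json:"EndTime,omitempty"`
	ItemCount   int       `json:"itemCount,omitempty"`
	Name        string    `json:"name,omitempty"`
	Status      string    `json:"status,omitempty"`
	Description string    `json:"description,omitempty"`
	TriggeredBy string    `json:"triggeredBy,omitempty"`
}

// The three status values listed in this guide.
const (
	StatusStarted   = "Started"
	StatusError     = "Error"
	StatusCompleted = "Completed"
)

// startJob records the start time and marks the job as Started.
func startJob(id, name, triggeredBy string) Job {
	return Job{ID: id, Name: name, TriggeredBy: triggeredBy, Status: StatusStarted, StartTime: time.Now().UTC()}
}

// finishJob records the end time, the duration in seconds, and the final status.
func finishJob(job Job, runErr error) Job {
	job.EndTime = time.Now().UTC()
	seconds := job.EndTime.Sub(job.StartTime).Seconds()
	job.Duration = strconv.FormatFloat(seconds, 'f', 3, 64) // assumption: decimal seconds as a string
	job.Status = StatusCompleted
	if runErr != nil {
		job.Status = StatusError
	}
	return job
}

func main() {
	job := startJob("example-job-id", "My Custom Processor", "ProcessAsset")
	job = finishJob(job, nil)
	out, _ := json.Marshal(job)
	fmt.Println(string(out))
}
```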

PostProcessAsset — Finalizing the Manifest

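When your Lambda finishes and has deleted its own running-job key, check Redis for running-job keys that remain for the asset. MGET takes exact key names and does not expand patterns, so use a pattern scan, repeating it until the returned cursor is 0:

SCAN 0 MATCH <assetId>_job_* COUNT 100

  • If any jobs remain → exit. Another processor will handle finalization.
  • If no jobs remain → your processor is the last one. Post a message to the SQS URL at system/SQSQueueSettings -> postProcessAssetQueueUrl:

{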
  "assetId": "<the asset id being processed>",
  "triggeredBy": "<the name of your lambda>"
}

This triggers the PostProcessAsset Lambda, which assembles and registers the final manifest.
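
The finalization decision reduces to a check on the running-job keys that remain for the asset. The sketch below takes that key list as input and builds the message body; the function and type names are hypothetical, and actually sending the body to the queue (for example with the AWS SDK) is left out. It assumes the processor has already deleted its own running-job key.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// postProcessMessage is the body posted to the PostProcessAsset queue.
type postProcessMessage struct {
	AssetID     string `json:"assetId"`
	TriggeredBy string `json:"triggeredBy"`
}

// postProcessBody decides whether this processor is the last one running.
// It returns the message body and true when the message should be sent,
// or false when other jobs are still registered for the asset.
func postProcessBody(remainingJobKeys []string, assetID, lambdaName string) (string, bool, error) {
	if len(remainingJobKeys) > 0 {
		return "", false, nil // another processor will handle finalization
	}
	raw, err := json.Marshal(postProcessMessage{AssetID: assetID, TriggeredBy: lambdaName})
	if err != nil {
		return "", false, fmt.Errorf("encode post-process message: %w", err)
	}
	return string(raw), true, nil
}

func main() {
	body, send, err := postProcessBody(nil, "4d5b02b2-b829-47c8-b8a3-eb5f68af4bd7", "MyCustomProcessor")
	fmt.Println(body, send, err)
}
```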

Environment Variables

Variable             Required             Description
ProjectPrefix        Yes                  Unique string identifying the environment
configS3BucketPath   Yes                  <bucketName>/<objectKey>, the path to the environment config file in S3
AdminServiceUrl      If using .NET APIs   URL for Nomad Media Admin API
ApiUrl               If using .NET APIs   URL for Nomad Media Portal API
KeepRedisAsset       Testing only         Set to any value to prevent Redis cleanup after processing. Never use in production.
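
Reading these variables at startup might look like the following sketch. splitConfigPath is a hypothetical helper that separates configS3BucketPath into its bucket and object key at the first slash; treating any non-empty KeepRedisAsset value as "set" is an assumption.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// splitConfigPath splits "<bucketName>/<objectKey>" at the first slash.
// The object key may itself contain further slashes.
func splitConfigPath(path string) (bucket, key string, err error) {
	parts := strings.SplitN(path, "/", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("configS3BucketPath %q is not in <bucketName>/<objectKey> form", path)
	}
	return parts[0], parts[1], nil
}

func main() {
	fmt.Println("project prefix:", os.Getenv("ProjectPrefix"))

	bucket, key, err := splitConfigPath(os.Getenv("configS3BucketPath"))
	fmt.Println("config bucket:", bucket, "config key:", key, "error:", err)

	// Testing only: a non-empty value means Redis state is kept after processing.
	fmt.Println("keep Redis state:", os.Getenv("KeepRedisAsset") != "")
}
```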

Adding Your Processor to the System

A new processor requires a JobType entry to be added to the Processors and JobService.JobType structs. This is currently a manual step performed by the Nomad Media team. Contact your team liaison to register the new processor type before deployment.

To enable the processor on assets, add it to the processorList in the application/processors config section. See Turning On/Off Asset Processors for details.