Skip to main content
Version: v13.0.0

Ingest Notification in Workflows

On deployment, an SQS queue and three SNS topics, one for executions, granules, and PDRs, are created and used for handling notification messages related to the workflow.

The ingest notification reporting SQS queue is populated via a Cloudwatch rule for any Step Function execution state transitions. The sfEventSqsToDbRecords Lambda consumes this queue. The queue and Lambda are included in the cumulus module and the Cloudwatch rule in the workflow module and are included by default in a Cumulus deployment.

The sfEventSqsToDbRecords Lambda function reads from the sfEventSqsToDbRecordsInputQueue queue and updates the RDS database records for granules, executions, and PDRs. When the records are updated, messages are posted to the three SNS topics. This Lambda is invoked both when the workflow starts and when it reaches a terminal state (completion or failure).

Diagram of architecture for reporting workflow ingest notifications from AWS Step Functions

Sending SQS messages to report status

Publishing granule/PDR reports directly to the SQS queue

If you have a non-Cumulus workflow or process ingesting data and would like to update the status of your granules or PDRs, you can publish directly to the reporting SQS queue. Publishing messages to this queue will result in those messages being stored as granule/PDR records in the Cumulus database and having the status of those granules/PDRs being visible on the Cumulus dashboard. The queue does have certain expectations as it expects a Cumulus Message nested within a Cloudwatch Step Function Event object.

Posting directly to the queue will require knowing the queue URL. Assuming that you are using the cumulus module for your deployment, you can get the queue URL by adding them to outputs.tf for your Terraform deployment as in our example deployment:

output "stepfunction_event_reporter_queue_url" {
value = module.cumulus.stepfunction_event_reporter_queue_url
}

output "report_executions_sns_topic_arn" {
value = module.cumulus.report_executions_sns_topic_arn
}
output "report_granules_sns_topic_arn" {
value = module.cumulus.report_executions_sns_topic_arn
}
output "report_pdrs_sns_topic_arn" {
value = module.cumulus.report_pdrs_sns_topic_arn
}

Then, when you run terraform deploy, you should see the topic ARNs printed to your console:

Outputs:
...
stepfunction_event_reporter_queue_url = https://sqs.us-east-1.amazonaws.com/xxxxxxxxx/<prefix>-sfEventSqsToDbRecordsInputQueue
report_executions_sns_topic_arn = arn:aws:sns:us-east-1:xxxxxxxxx:<prefix>-report-executions-topic
report_granules_sns_topic_arn = arn:aws:sns:us-east-1:xxxxxxxxx:<prefix>-report-executions-topic
report_pdrs_sns_topic_arn = arn:aws:sns:us-east-1:xxxxxxxxx:<prefix>-report-pdrs-topic

Once you have the queue URL, you can use the AWS SDK for your language of choice to publish messages to the topic. The expected format of these messages is that of a Cloudwatch Step Function event containing a Cumulus message. For SUCCEEDED events, the Cumulus message is expected to be in detail.output. For all other events statuses, a Cumulus Message is expected in detail.input. The Cumulus Message populating these fields MUST be a JSON string, not an object. Messages that do not conform to the schemas will fail to be created as records.

If you are not seeing records persist to the database or show up in the Cumulus dashboard, you can investigate the Cloudwatch logs of the SQS consumer Lambda:

  • /aws/lambda/<prefix>-sfEventSqsToDbRecords

In a workflow

As described above, ingest notifications will automatically be published to the SNS topics on workflow start and completion/failure, so you should not include a workflow step to publish the initial or final status of your workflows.

However, if you want to report your ingest status at any point during a workflow execution, you can add a workflow step using the SfSqsReport Lambda. In the following example from cumulus-tf/parse_pdr_workflow.tf, the ParsePdr workflow is configured to use the SfSqsReport Lambda, primarily to update the PDR ingestion status.

Note: ${sf_sqs_report_task_arn} is an interpolated value referring to a Terraform resource. See the example deployment code for the ParsePdr workflow.

  "PdrStatusReport": {
"Parameters": {
"cma": {
"event.$": "$",
"ReplaceConfig": {
"FullMessage": true
},
"task_config": {
"cumulus_message": {
"input": "{$}"
}
}
}
},
"ResultPath": null,
"Type": "Task",
"Resource": "${sf_sqs_report_task_arn}",
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException"
],
"IntervalSeconds": 2,
"MaxAttempts": 6,
"BackoffRate": 2
}
],
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"ResultPath": "$.exception",
"Next": "WorkflowFailed"
}
],
"Next": "WaitForSomeTime"
},

Subscribing additional listeners to SNS topics

Additional listeners to SNS topics can be configured in a .tf file for your Cumulus deployment. Shown below is configuration that subscribes an additional Lambda function (test_lambda) to receive messages from the report_executions SNS topic. To subscribe to the report_granules or report_pdrs SNS topics instead, simply replace report_executions in the code block below with either of those values.

resource "aws_lambda_function" "test_lambda" {
function_name = "${var.prefix}-testLambda"
filename = "./testLambda.zip"
source_code_hash = filebase64sha256("./testLambda.zip")
handler = "index.handler"
role = module.cumulus.lambda_processing_role_arn
runtime = "nodejs10.x"
}

resource "aws_sns_topic_subscription" "test_lambda" {
topic_arn = module.cumulus.report_executions_sns_topic_arn
protocol = "lambda"
endpoint = aws_lambda_function.test_lambda.arn
}

resource "aws_lambda_permission" "test_lambda" {
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.test_lambda.arn
principal = "sns.amazonaws.com"
source_arn = module.cumulus.report_executions_sns_topic_arn
}

SNS message format

Subscribers to the SNS topics can expect to find the published message in the SNS event at Records[0].Sns.Message. The message will be a JSON stringified version of the ingest notification record for an execution or a PDR. For granules, the message will be a JSON stringified object with ingest notification record in the record property and the event type as the event property.

The ingest notification record of the execution, granule, or PDR should conform to the data model schema for the given record type.

Summary

Workflows can be configured to send SQS messages at any point using the sf-sqs-report task.

Additional listeners can be easily configured to trigger when messages are sent to the SNS topics.