Workflow Message Granule Writes
Overview
When an AWS Step Function event occurs for a Cumulus workflow, or a write is attempted via the sf-sqs-report task, a message is dispatched to the sfEventSqsToDbRecordsInputQueue for processing.
Messages on the sfEventSqsToDbRecordsInputQueue (which correspond to lambda invocations or workflow events) are processed in batches of 10, and the sfEventSqsToDbRecords Lambda is triggered for each batch. For each message, the Lambda first attempts to write the corresponding execution/PDR, then attempts to write the granule records associated with the message.
For each granule in the batch, one of the following occurs:
- The granule is written successfully.
- The granule write is dropped, due to asynchronous write constraints.
- The lambda fails to write the granule in an unexpected way (e.g. lambda failure, AWS outage, etc.). In this case, the message will become visible again after the sfEventSqsToDbRecordsInputQueue visibility timeout, which is currently set as a function of the rds_connection_timing_configuration terraform variable: (var.rds_connection_timing_configuration.acquireTimeoutMillis / 1000) + 60.
- The granule fails to write due to a schema violation, database connection issue or other expected/caught error. The message is immediately written to the Dead Letter Archive for manual intervention/investigation.
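As a rough illustration of the visibility timeout expression in the list above, the following Python sketch evaluates it for a sample input. The function name and the sample acquireTimeoutMillis value are hypothetical; the real value comes from the deployment's terraform configuration.

```python
def visibility_timeout_seconds(acquire_timeout_millis: float) -> float:
    """Mirror (acquireTimeoutMillis / 1000) + 60 from the terraform expression."""
    return (acquire_timeout_millis / 1000) + 60


# Hypothetical sample value, not a Cumulus default.
print(visibility_timeout_seconds(90_000))  # 150.0
```

With this sample value, a message whose write failed unexpectedly would become visible on the queue again after 150 seconds.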
Caveats
- Non-bulk Cumulus API granule operations are not constrained by this logic and do not use the SQS update queue. They are instead invoked synchronously and follow expected RESTful logic without any asynchronous write constraints or default message values.
- This information is correct as of release v16 of Cumulus Core. Please review the CHANGELOG and migration instructions for updated features/changes/bugfixes.
Granule Write Constraints
For each granule to be written, the following constraints apply:
- granuleId must be unique.
  - A granule write will not be allowed if the granuleId already exists in the database for another collection. Granules in this state will be rejected and will wind up in the Dead Letter Archive.
- The message granule must match the API Granule schema.
  - If it does not, the write will be rejected, the granule status will be updated to failed, and the message will wind up in the Dead Letter Archive.
- If the granule is being updated to a running/queued status:
  - Only status, timestamp, updated_at and created_at are updated. All other values are retained as they currently exist in the database.
  - The write will only be allowed if the following are true, else the write request will be ignored as out-of-order/stale:
    - The granule createdAt value is newer than or the same as the existing record.
    - If the granule is being updated to running, the execution the granule is being associated with doesn't already exist in the following states: completed, failed.
    - If the granule is being updated to queued, the execution the granule is being associated with does not exist in any state in the database.
- If the granule is being updated to a failed/completed state:
  - All fields provided will override existing values in the database, if any.
  - The write will only be allowed if the following are true, else the write request will be ignored as out-of-order/stale:
    - The granule createdAt value is newer than or the same as the existing record.
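The ordering rules above can be sketched as a small decision function. This is a minimal illustration only, not the Cumulus implementation: the function names, argument shapes, and the use of None for "no record exists" are all assumptions, and the granuleId uniqueness and schema checks are left out.

```python
from typing import Optional

TERMINAL_EXECUTION_STATES = {"completed", "failed"}
PARTIAL_UPDATE_FIELDS = ("status", "timestamp", "updated_at", "created_at")


def should_write_granule(
    new_status: str,
    new_created_at: int,
    existing_created_at: Optional[int],
    execution_status: Optional[str],
) -> bool:
    """Return True if the write passes the ordering rules described above.

    existing_created_at is None when no granule record exists yet.
    execution_status is None when the execution has no database record.
    """
    # A write carrying an older createdAt than the stored record is stale.
    if existing_created_at is not None and new_created_at < existing_created_at:
        return False
    if new_status == "running":
        # Ignore the write if the associated execution already finished.
        return execution_status not in TERMINAL_EXECUTION_STATES
    if new_status == "queued":
        # Ignore the write if the associated execution exists in any state.
        return execution_status is None
    # completed / failed writes only need the createdAt check.
    return True


def merge_granule(existing: dict, incoming: dict) -> dict:
    """Apply an allowed write: partial for running/queued, full otherwise."""
    if incoming["status"] in ("running", "queued"):
        updates = {k: incoming[k] for k in PARTIAL_UPDATE_FIELDS if k in incoming}
    else:
        updates = incoming
    return {**existing, **updates}
```

For example, a running update against a granule whose execution is already completed would be ignored, while a completed update with an equal createdAt would overwrite every field it provides.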
 
 
Message Granule Write Behavior
The granule object values are set based on the incoming Cumulus Message values (unless otherwise specified the message values overwrite the granule payload values):
| Column | Value | 
|---|---|
| collection | Derived from meta.collection.name and meta.collection.version | 
| createdAt | Defaults to cumulus_meta.workflow_start_time, else payload.granule.createdAt |
| duration | Calculated based on the delta between cumulus_meta.workflow_start_time and when the database message writes |
| error | Object taken directly from the message.error object |
| execution | Derived from cumulus_meta.state_machine and cumulus_meta.execution_name |
| files | Taken directly from payload.granule.files. If files is null, set it to an empty list [] |
| pdrName | Taken directly from payload.pdr.name | 
| processingEndDateTime | Derived from AWS API interrogation (sfn().describeExecution) based on execution value |
| processingStartDateTime | Derived from AWS API interrogation (sfn().describeExecution) based on execution value |
| productVolume | Sums the values of the passed in payload.granules.files.size. Does not validate against S3 |
| provider | Inferred from meta.provider value in cumulus message |
| published | Taken directly from granule.published; if not specified or null is specified, defaults to false |
| queryFields | Object taken directly from meta.granule.queryFields | 
| status | Uses meta.status if provided, else payload.granule.status |
| timeStamp | Set to the date-time value for the sfEventSqsToDbRecords invocation |
| timeToArchive | Taken from payload.granule.post_to_cmr_duration/1000, provided by Core task or user task. Value will be set to zero if no value is set |
| timeToPreprocess | Taken from payload.granule.sync_granule_duration, provided by Core task or user task. Value will be set to zero if no value is set |
| updatedAt | Set to the date-time value for the sfEventSqsToDbRecords invocation |
| beginningDateTime | See: CMR Temporal Values section below | 
| endingDateTime | See: CMR Temporal Values section below | 
| productionDateTime | See: CMR Temporal Values section below | 
| lastUpdateDateTime | See: CMR Temporal Values section below | 
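To make the fallback rules in the table more concrete, here is a hedged Python sketch covering a subset of the rows (createdAt, files, productVolume, published, status, timeToArchive, timeToPreprocess, updatedAt). The helper names, the separate granule argument, and treating a missing file size as 0 are assumptions made for illustration; Cumulus itself is not implemented this way.

```python
from typing import Any


def _get(obj: dict, *path: str) -> Any:
    """Walk nested dict keys, returning None if any key is missing."""
    current: Any = obj
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def granule_fields_from_message(message: dict, granule: dict, now_ms: int) -> dict:
    """Derive a subset of granule fields using the fallbacks in the table."""
    start = _get(message, "cumulus_meta", "workflow_start_time")
    files = granule.get("files") or []  # null files become an empty list
    published = granule.get("published")
    post_to_cmr = granule.get("post_to_cmr_duration")
    sync = granule.get("sync_granule_duration")
    return {
        "createdAt": start if start is not None else granule.get("createdAt"),
        "files": files,
        # Assumption: a file without a size contributes 0 to the total.
        "productVolume": sum((f.get("size") or 0) for f in files),
        "published": published if published is not None else False,
        "status": _get(message, "meta", "status") or granule.get("status"),
        "timeToArchive": post_to_cmr / 1000 if post_to_cmr else 0,
        "timeToPreprocess": sync if sync else 0,
        "updatedAt": now_ms,
    }


# Hypothetical message fragment and granule payload.
message = {
    "cumulus_meta": {"workflow_start_time": 1700000000000},
    "meta": {"status": "completed"},
}
granule = {"files": None, "post_to_cmr_duration": 2500}
record = granule_fields_from_message(message, granule, now_ms=1700000005000)
```

In this example the null files value becomes an empty list, published falls back to false, and timeToArchive is 2.5 because post_to_cmr_duration is divided by 1000.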
CMR Temporal Values
The following fields are generated based on values in the associated granule CMR file, if available:
- beginningDateTime
  - If there is a beginning and end DateTime:
    - UMMG: TemporalExtent.RangeDateTime.BeginningDateTime
    - ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimePeriod:gml:beginPosition
  - If not:
    - UMMG: TemporalExtent.SingleDateTime
    - ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimeInstant.gml:timePosition
- endingDateTime
  - If there is a beginning and end DateTime:
    - UMMG: TemporalExtent.RangeDateTime.EndingDateTime
    - ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimePeriod:gml:endPosition
  - If not:
    - UMMG: TemporalExtent.SingleDateTime
    - ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimeInstant.gml:timePosition
- productionDateTime
  - UMMG: DataGranule.ProductionDateTime
  - ISO: gmd:identificationInfo:gmd:dataQualityInfo.gmd:DQ_DataQuality.gmd:lineage.gmd:LI_Lineage.gmd:processStep.gmi:LE_ProcessStep.gmd:dateTime.gco:DateTime
- lastUpdateDateTime
  - UMMG: Given DataGranule.ProductionDateTime values where Type is in Update, Insert, Create, select most recent value.
  - ISO: Given a node matching gmd:MD_DataIdentification.gmd:citation.gmd:CI_Citation.gmd:title.gco:CharacterString === UpdateTime, use gmd:identificationInfo:gmd:MD_DataIdentification.gmd:citation.gmd:CI_Citation.gmd:date.gmd:CI_Date.gmd:date.gco:DateTime
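For the UMMG case, the range-versus-single selection can be sketched as below. This is an illustrative sketch, assuming the UMM-G metadata has already been parsed into a Python dict and that the end of a range is stored at TemporalExtent.RangeDateTime.EndingDateTime (the UMM-G field name). The function name is hypothetical, and lastUpdateDateTime and the ISO XML paths are not covered.

```python
def ummg_temporal_values(ummg: dict) -> dict:
    """Pick beginning/ending/production date-times from parsed UMM-G metadata."""
    temporal = ummg.get("TemporalExtent") or {}
    range_dt = temporal.get("RangeDateTime") or {}
    if range_dt.get("BeginningDateTime") and range_dt.get("EndingDateTime"):
        # Both ends of a range are present: use them directly.
        beginning = range_dt["BeginningDateTime"]
        ending = range_dt["EndingDateTime"]
    else:
        # Otherwise fall back to the single date-time for both values.
        beginning = ending = temporal.get("SingleDateTime")
    data_granule = ummg.get("DataGranule") or {}
    return {
        "beginningDateTime": beginning,
        "endingDateTime": ending,
        "productionDateTime": data_granule.get("ProductionDateTime"),
    }
```

A granule with only TemporalExtent.SingleDateTime set would therefore get the same value for beginningDateTime and endingDateTime.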