Version: Next

Workflow Message Granule Writes

Overview

When an AWS Step Function event occurs for a Cumulus workflow, or a write is attempted via the sf-sqs-report task, a message is dispatched to the sfEventSqsToDbRecordsInputQueue for processing.

Messages on the sfEventSqsToDbRecordsInputQueue (which correspond to Lambda invocations or workflow events) are processed in batches of 10, and the sfEventSqsToDbRecords Lambda is triggered for each batch. For each message, the Lambda first attempts to write the corresponding execution/PDR, then attempts to write the granule records associated with the message.

For each granule in the batch, one of the following occurs:

  • The granule is written successfully.
  • The granule write is dropped, due to asynchronous write constraints.
  • The Lambda fails to write the granule in an unexpected way (e.g. Lambda failure, AWS outage). In this case, the message containing the granule becomes visible again after the sfEventSqsToDbRecordsInputQueue visibility timeout, which is currently set, in seconds, as a function of the rds_connection_timing_configuration Terraform variable: (var.rds_connection_timing_configuration.acquireTimeoutMillis / 1000) + 60
  • The granule fails to write due to a schema violation, database connection issue or other expected/caught error. The message is immediately written to the Dead Letter Archive for manual intervention/investigation.
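The visibility timeout formula above can be worked through with a small sketch. The function name and the sample acquireTimeoutMillis value are hypothetical, chosen only to illustrate the arithmetic; check your own Terraform configuration for the real setting.

```python
def visibility_timeout_seconds(acquire_timeout_millis: int) -> float:
    """Apply the documented formula: (acquireTimeoutMillis / 1000) + 60."""
    return acquire_timeout_millis / 1000 + 60


# Hypothetical setting of 90 seconds, used only as an example input.
print(visibility_timeout_seconds(90_000))  # 150.0
```

With this input, a message whose write failed unexpectedly would become visible for reprocessing 150 seconds after it was received.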

Caveats

  • Non-bulk Cumulus API granule operations are not constrained by this logic and do not use the SQS update queue. They are instead invoked synchronously and follow expected RESTful logic without any asynchronous write constraints or default message values.
  • This information is correct as of release v16 of Cumulus Core. Please review the CHANGELOG and migration instructions for updated features/changes/bugfixes.

Granule Write Constraints

For each granule to be written, the following constraints apply:

  • granuleId must be unique.

    A granule write is not allowed if the granuleId already exists in the database for another collection. Granules in this state are rejected and end up in the Dead Letter Archive.

  • Message granule must match the API Granule schema.

    If it does not, the write is rejected, the granule status is updated to failed, and the message ends up in the Dead Letter Archive.

  • If the granule is being updated to a running/queued status:

    • Only status, timestamp, updated_at and created_at are updated. All other values are retained as they currently exist in the database.
    • The write will only be allowed if the following are true, else the write request will be ignored as out-of-order/stale:
      • The granule createdAt value is the same as or newer than that of the existing record.
      • If the granule is being updated to running, the execution the granule is being associated with doesn’t already exist in the following states: completed, failed.
      • If the granule is being updated to queued, the execution the granule is being associated with does not exist in any state in the database.
  • If the granule is being updated to a failed/completed state:

    • All fields provided will override existing values in the database, if any.
    • The write will only be allowed if the following are true, else the write request will be ignored as out-of-order/stale:
      • The granule createdAt value is the same as or newer than that of the existing record.
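The ordering checks above can be summarized in a minimal sketch. This is not the Cumulus implementation: the function name, argument names, and the use of None for "no existing record" are assumptions made for illustration, and the granuleId uniqueness and schema checks are deliberately left out.

```python
from typing import Optional

# Execution states that block a late "running" update, per the list above.
TERMINAL_EXECUTION_STATES = {"completed", "failed"}


def is_write_allowed(
    new_status: str,
    new_created_at: int,
    existing_created_at: Optional[int],
    execution_status: Optional[str],
) -> bool:
    """Return True if the write passes the out-of-order/stale checks.

    existing_created_at is None when no granule record exists yet (assumed
    to pass the createdAt check); execution_status is None when the
    execution is not in the database.
    """
    # A write older than the stored record is stale for every status.
    if existing_created_at is not None and new_created_at < existing_created_at:
        return False
    if new_status == "running":
        # Blocked if the execution has already finished.
        return execution_status not in TERMINAL_EXECUTION_STATES
    if new_status == "queued":
        # Allowed only if the execution is not recorded in any state.
        return execution_status is None
    # completed / failed: only the createdAt check applies.
    return True


print(is_write_allowed("running", 10, 5, "completed"))  # False
print(is_write_allowed("queued", 10, 10, None))         # True
```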

Message Granule Write Behavior

The granule object values are set based on the incoming Cumulus Message values (unless otherwise specified, the message values overwrite the granule payload values):

| Column | Value |
| --- | --- |
| collection | Derived from meta.collection.name and meta.collection.version |
| createdAt | Defaults to cumulus_meta.workflow_start_time, else payload.granule.createdAt |
| duration | Calculated based on the delta between cumulus_meta.workflow_start_time and when the database message writes |
| error | Object taken directly from the message.error object |
| execution | Derived from cumulus_meta.state_machine and cumulus_meta.execution_name |
| files | Taken directly from payload.granule.files. If files is null, set to an empty list [] |
| pdrName | Taken directly from payload.pdr.name |
| processingEndDateTime | Derived from AWS API interrogation (sfn().describeExecution) based on execution value |
| processingStartDateTime | Derived from AWS API interrogation (sfn().describeExecution) based on execution value |
| productVolume | Sums the values of the passed in payload.granules.files.size. Does not validate against S3 |
| provider | Inferred from meta.provider value in the Cumulus message |
| published | Taken directly from granule.published. Defaults to false if not specified or null |
| queryFields | Object taken directly from meta.granule.queryFields |
| status | Uses meta.status if provided, else payload.granule.status |
| timeStamp | Set to the date-time value for the sfEventSqsToDbRecords invocation |
| timeToArchive | Taken from payload.granule.post_to_cmr_duration/1000, provided by a Core task or user task. Set to 0 if no value is set |
| timeToPreprocess | Taken from payload.granule.sync_granule_duration, provided by a Core task or user task. Set to 0 if no value is set |
| updatedAt | Set to the date-time value for the sfEventSqsToDbRecords invocation |
| beginningDateTime | See: CMR Temporal Values section below |
| endingDateTime | See: CMR Temporal Values section below |
| productionDateTime | See: CMR Temporal Values section below |
| lastUpdateDateTime | See: CMR Temporal Values section below |
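A few of the simpler mappings above can be sketched as follows. This is an illustration under stated assumptions, not the sfEventSqsToDbRecords code: the function name is hypothetical, the message is treated as a plain dictionary with a single granule at payload.granule, and files without a size are assumed to count as 0.

```python
from typing import Any, Dict


def derive_granule_fields(message: Dict[str, Any]) -> Dict[str, Any]:
    """Derive a subset of granule record fields from a Cumulus message dict."""
    meta = message.get("meta", {})
    cumulus_meta = message.get("cumulus_meta", {})
    granule = message.get("payload", {}).get("granule", {})

    files = granule.get("files") or []  # null files become an empty list
    published = granule.get("published")
    start_time = cumulus_meta.get("workflow_start_time")
    post_to_cmr = granule.get("post_to_cmr_duration")
    sync_duration = granule.get("sync_granule_duration")

    return {
        # workflow_start_time wins; fall back to the payload value.
        "createdAt": start_time if start_time is not None else granule.get("createdAt"),
        "files": files,
        # Sum of reported file sizes; nothing is checked against S3.
        "productVolume": sum(f.get("size", 0) for f in files),
        "published": False if published is None else published,
        # meta.status takes precedence over the payload status.
        "status": meta.get("status") or granule.get("status"),
        "timeToArchive": post_to_cmr / 1000 if post_to_cmr else 0,
        "timeToPreprocess": sync_duration if sync_duration else 0,
    }


example_message = {
    "cumulus_meta": {"workflow_start_time": 1700000000000},
    "meta": {"status": "running"},
    "payload": {
        "granule": {
            "files": [{"size": 100}, {"size": 250}],
            "status": "queued",
            "post_to_cmr_duration": 4500,
        }
    },
}
fields = derive_granule_fields(example_message)
print(fields["status"], fields["productVolume"], fields["timeToArchive"])
```

In the example, meta.status overrides the payload status, the two file sizes are summed, and the missing published and sync_granule_duration values fall back to their defaults.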

CMR Temporal Values

The following fields are generated based on values in the associated granule CMR file, if available:

  • beginningDateTime

    • If there is a beginning and end DateTime:

      • UMMG: TemporalExtent.RangeDateTime.BeginningDateTime
      • ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimePeriod:gml:beginPosition
    • If not:

      • UMMG: TemporalExtent.SingleDateTime
      • ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimeInstant.gml:timePosition
  • endingDateTime

    • If there is a beginning and end DateTime:

      • UMMG: TemporalExtent.RangeDateTime.EndingDateTime
      • ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimePeriod:gml:endPosition
    • If not:

      • UMMG: TemporalExtent.SingleDateTime
      • ISO: gmd:MD_DataIdentification.gmd:extent.gmd:EX_Extent.gmd:temporalElement.gmd:EX_TemporalExtent.gmd:extent.gml:TimeInstant.gml:timePosition
  • productionDateTime

    • UMMG: DataGranule.ProductionDateTime
    • ISO: gmd:identificationInfo:gmd:dataQualityInfo.gmd:DQ_DataQuality.gmd:lineage.gmd:LI_Lineage.gmd:processStep.gmi:LE_ProcessStep.gmd:dateTime.gco:DateTime
  • lastUpdateDateTime

    • UMMG: Given DataGranule.ProductionDateTime values where Type is in Update, Insert, Create, select the most recent value.

    • ISO: Given a node matching gmd:MD_DataIdentification.gmd:citation.gmd:CI_Citation.gmd:title.gco:CharacterString === UpdateTime, use gmd:identificationInfo:gmd:MD_DataIdentification.gmd:citation.gmd:CI_Citation.gmd:date.gmd:CI_Date.gmd:date.gco:DateTime
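For the UMMG case, the range-versus-single fallback described above can be sketched like this. The function name is hypothetical, the metadata is assumed to be an already-parsed UMM-G dictionary, and the range end is assumed to be read from TemporalExtent.RangeDateTime.EndingDateTime; lastUpdateDateTime and ISO parsing are not covered.

```python
from typing import Any, Dict, Optional


def umm_g_temporal_values(umm: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Pick beginning, ending, and production date-times from UMM-G metadata."""
    temporal = umm.get("TemporalExtent", {})
    date_range = temporal.get("RangeDateTime") or {}

    if date_range.get("BeginningDateTime") and date_range.get("EndingDateTime"):
        # Both ends of the range are present: use them directly.
        beginning = date_range["BeginningDateTime"]
        ending = date_range["EndingDateTime"]
    else:
        # Otherwise fall back to the single date-time for both fields.
        beginning = ending = temporal.get("SingleDateTime")

    return {
        "beginningDateTime": beginning,
        "endingDateTime": ending,
        "productionDateTime": umm.get("DataGranule", {}).get("ProductionDateTime"),
    }


ranged = umm_g_temporal_values({
    "TemporalExtent": {
        "RangeDateTime": {
            "BeginningDateTime": "2020-01-01T00:00:00Z",
            "EndingDateTime": "2020-01-02T00:00:00Z",
        }
    },
    "DataGranule": {"ProductionDateTime": "2020-01-03T00:00:00Z"},
})
single = umm_g_temporal_values(
    {"TemporalExtent": {"SingleDateTime": "2021-06-01T12:00:00Z"}}
)
print(ranged["endingDateTime"], single["endingDateTime"])
```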