"""
Created on August 16, 2019
@author: tcanham
"""
import struct
import zlib
from fprime.common.models.serialize.type_exceptions import TypeMismatchException
from fprime.common.models.serialize.numerical_types import U8Type, U16Type, U32Type
[docs]class SeqBinaryWriter:
"""
Write out the Binary (ASTERIA) form of sequencer file.
"""
def __init__(self, timebase=0xFFFF):
"""
Constructor
"""
self.__fd = None
self.__timebase = timebase
[docs] def open(self, filename):
"""
Open the ASCII file
"""
self.__fd = open(filename, "wb")
[docs] def __binaryCmdRecord(self, cmd_obj):
"""
Return the binary command record the sequencer is expecting.
@todo: Currently the command descriptor is always zero for immediate commands and execution time tags needed in
command objects and seq_panel.
"""
def __time_tag(cmd_obj):
"""
TODO: support a timebase in the cmd obj? This is mission specific, so it is tough to handle. For now
I am hardcoding this to 2 which is TB_NONE
"""
# return TimeType(timeBase=2, seconds=cmd_obj.getSeconds(), useconds=cmd_obj.getUseconds()).serialize()
# TKC - new command time format
return (
U32Type(cmd_obj.getSeconds()).serialize()
+ U32Type(cmd_obj.getUseconds()).serialize()
)
def __descriptor(cmd_obj):
# subtract 1 from the value because enum34 enums start at 1, and this can't be changed
return U8Type(cmd_obj.getDescriptor().value - 1).serialize()
def __command(cmd_obj):
command = U32Type(
0
).serialize() # serialize combuffer type enum: FW_PACKET_COMMAND
command += U32Type(cmd_obj.getOpCode()).serialize() # serialize opcode
# Command arguments
for arg in cmd_obj.getArgs():
command += arg[2].serialize()
return command
def __length(command):
return U32Type(len(command)).serialize()
def __print(byteBuffer):
print("Byte buffer size: %d" % len(byteBuffer))
for entry in range(0, len(byteBuffer)):
print(
"Byte %d: 0x%02X (%c)"
% (
entry,
struct.unpack("B", byteBuffer[entry])[0],
struct.unpack("B", byteBuffer[entry])[0],
)
)
# This is no longer in the sequence file format.
# def __checksum(data):
# csum = 0
# for entry in range(0,len(data)):
# byte = struct.unpack("B",data[entry])[0]
# csum += byte
# return U64Type(long(csum)).serialize()
# Form header:
descriptor = __descriptor(cmd_obj)
time = __time_tag(cmd_obj)
header = descriptor + time
# Command opcode:
command = __command(cmd_obj)
# Command length:
length = __length(command)
# Checksum:
# This is no longer in the sequence file format.
# checksum = __checksum(header + length + command)
# Debug printing (comment out when not debugging):
# print "descriptor:"
# __print(descriptor)
# print "time:"
# __print(time)
# print "length:"
# __print(length)
# print "command:"
# __print(command)
# print "total record:"
# __print(header + checksum + length + command)
# Construct the record:
return header + length + command
[docs] def write(self, seq_cmds_list):
"""
Write out each command record
"""
num_records = len(seq_cmds_list)
sequence = b""
for cmd in seq_cmds_list:
sequence += self.__binaryCmdRecord(cmd)
size = len(sequence)
if self.__timebase == 0xFFFF:
tb_txt = b"ANY"
else:
tb_txt = bytes(self.__timebase)
print("Sequence is %d bytes with timebase %s" % (size, tb_txt))
header = b""
header += U32Type(
size + 4
).serialize() # Write out size of the sequence file in bytes here
header += U32Type(num_records).serialize() # Write number of records
header += U16Type(self.__timebase).serialize() # Write time base
header += U8Type(0xFF).serialize() # write time context
sequence = header + sequence # Write the list of command records here
# compute CRC. Ported from Utils/Hassh/libcrc/libcrc.h (update_crc_32)
crc = self.computeCrc(sequence)
print("CRC: %d (0x%04X)" % (crc, crc))
try:
sequence += U32Type(crc).serialize()
except TypeMismatchException as typeErr:
print("Exception: %s" % typeErr.getMsg())
raise
# Write the list of command records here
self.__fd.write(sequence)
[docs] def close(self):
"""
Close the Binary file
"""
self.__fd.close()
[docs] def computeCrc(self, buff):
# See http://stackoverflow.com/questions/30092226/how-to-calculate-crc32-with-python-to-match-online-results
# RE: signed to unsigned CRC
return zlib.crc32(buff) % (1 << 32)
[docs]class SeqAsciiWriter:
"""
Write out the ASCII record form of sequencer file.
"""
def __init__(self):
"""
Constructor
"""
self.__fd = None
[docs] def open(self, filename):
"""
Open the ASCII file
"""
self.__fd = open(filename, "w")
[docs] def __getCmdString(self, cmd_obj):
"""
For an command return it stringified.
"""
mnemonic = cmd_obj.getMnemonic()
opcode = cmd_obj.getOpCode()
args = cmd_obj.getArgs()
#
cmd = "{} (0x{:x})".format(mnemonic, int(opcode))
for arg in args:
cmd += ", %s" % arg[2].val
return cmd
[docs] def write(self, seq_cmds_list):
"""
Write out each record as it appears in the listbox widget.
"""
for cmd in seq_cmds_list:
self.__fd.write(self.__getCmdString(cmd) + "\n")
[docs] def close(self):
"""
Close the ASCII file
"""
self.__fd.close()