Source code for fprime_gds.common.logger.data_logger

"""
@brief Class to log raw binary input and output as well as telemetry and events
"""

import os

import fprime_gds.common.handlers
from fprime_gds.common.data_types.ch_data import ChData
from fprime_gds.common.data_types.cmd_data import CmdData
from fprime_gds.common.data_types.event_data import EventData
from fprime_gds.common.data_types.pkt_data import PktData


class DataLogger(fprime_gds.common.handlers.DataHandler):
    """Write raw sent/received bytes and decoded data objects to log files.

    Five files are created inside ``logdir`` (each name is prepended with
    ``prefix``):

    * ``recv.bin``    -- raw bytes handed to :meth:`on_recv`
    * ``sent.bin``    -- raw bytes handed to :meth:`send`
    * ``channel.log`` -- text lines for ``ChData`` / ``PktData`` objects
    * ``event.log``   -- text lines for ``EventData`` objects
    * ``command.log`` -- text lines for ``CmdData`` objects

    Every write is flushed immediately so the files stay current on disk.
    """

    # Names of the attributes holding the open file handles. Kept in one
    # place so that cleanup can never drift out of sync with __init__.
    _FILE_ATTRS = ("f_r", "f_s", "f_telem", "f_event", "f_command")

    def __init__(self, logdir, verbose=False, csv=False, prefix=""):
        """Open (and truncate) all log files inside ``logdir``.

        Arguments:
            logdir {string} -- directory in which the log files are created
            verbose {bool} -- forwarded as ``verbose=`` to ``get_str`` of
                              logged data objects
            csv {bool} -- forwarded as ``csv=`` to ``get_str`` of logged
                          data objects
            prefix {string} -- text prepended to each log file name

        Raises:
            OSError -- if any of the files cannot be opened
        """
        self.logdir = logdir
        self.verbose = verbose
        self.csv = csv

        self.recv_file = prefix + "recv.bin"
        self.send_file = prefix + "sent.bin"
        self.telem_file = prefix + "channel.log"
        self.event_file = prefix + "event.log"
        self.command_file = prefix + "command.log"

        # Raw streams are binary; decoded logs are text. The "+" modes
        # truncate any existing file of the same name.
        self.f_r = open(os.path.join(self.logdir, self.recv_file), "wb+")
        self.f_s = open(os.path.join(self.logdir, self.send_file), "wb+")
        self.f_telem = open(os.path.join(self.logdir, self.telem_file), "w+")
        self.f_event = open(os.path.join(self.logdir, self.event_file), "w+")
        self.f_command = open(os.path.join(self.logdir, self.command_file), "w+")

    def __del__(self):
        """Close every log file that was successfully opened.

        ``__del__`` also runs when ``__init__`` raised part-way through, so
        handles that were never assigned are skipped instead of raising
        ``AttributeError``.
        """
        for attr in self._FILE_ATTRS:
            handle = getattr(self, attr, None)
            if handle is not None:
                handle.close()

    def _write_line(self, handle, data):
        """Write the string form of ``data`` plus a newline and flush."""
        handle.write(data.get_str(verbose=self.verbose, csv=self.csv) + "\n")
        handle.flush()

    def data_callback(self, data, sender=None):
        """Log a decoded data object to the file matching its type.

        Arguments:
            data -- object to log; ``ChData``/``PktData`` go to the channel
                    log, ``EventData`` to the event log and ``CmdData`` to
                    the command log. Any other type is ignored.
            sender -- unused by this logger
        """
        # Independent checks (not elif) mirror the original dispatch.
        if isinstance(data, (ChData, PktData)):
            self._write_line(self.f_telem, data)
        if isinstance(data, EventData):
            self._write_line(self.f_event, data)
        if isinstance(data, CmdData):
            self._write_line(self.f_command, data)

    def send(self, data, dest):
        """Send callback for the encoder; records outgoing raw bytes.

        Arguments:
            data {bin} -- binary data packet
            dest {string} -- where the data will be sent by the server
                             (not used by this logger)
        """
        self.f_s.write(data)
        self.f_s.flush()

    # Some data was received
    def on_recv(self, data):
        """Record raw bytes that were received on the socket server.

        Arguments:
            data {bin} -- binary data string that was received
        """
        self.f_r.write(data)
        self.f_r.flush()