Description
Describe the bug
When playing back a recorded file with the playback board, the timestamps in the incoming data are close to the current wall-clock time, not the timestamps from the original file. Over time the current time gets further and further out of sync with the incoming data's timestamps, leading to weird behavior.
time_before = datetime.now()
new_data = self.board_shim.get_board_data(self.sampling_rate, BrainFlowPresets.DEFAULT_PRESET)
time_after = datetime.now()
print(f"\n Actual time before get_board_data: {time_before}")
print(f" Actual time after get_board_data: {time_after}")
print(f"\n New data start timestamp: {datetime.fromtimestamp(new_data[30][0])}")
print(f" New data end timestamp: {datetime.fromtimestamp(new_data[30][-1])}")
print(f"\n New data start timestamp in csv: {datetime.fromtimestamp(self.loaded_data.iloc[self.points_collected - len(new_data[0]), 30])}")
print(f" New data end timestamp in csv: {datetime.fromtimestamp(self.loaded_data.iloc[self.points_collected - 1, 30])}")

Actual time before get_board_data: 2025-04-06 22:22:06.182844
Actual time after get_board_data: 2025-04-06 22:22:06.182980
New data start timestamp: 2025-04-06 22:22:04.870278
New data end timestamp: 2025-04-06 22:22:05.863121
New data start timestamp in csv: 2025-03-30 02:15:41.193330
New data end timestamp in csv: 2025-03-30 02:15:41.193330

Here you can see that BrainFlow writes a playback time into the data rather than the actual timestamp from the csv: it uses a timestamp (22:22:04) close to the actual current time (22:22:06).
However, over time the timestamp that BrainFlow writes drifts further and further from the current time. This means it's not playing back at the file's original sampling rate.
Actual time before get_board_data: 2025-04-06 22:22:41.630676
Actual time after get_board_data: 2025-04-06 22:22:41.630860
New data start timestamp: 2025-04-06 22:22:27.866087
New data end timestamp: 2025-04-06 22:22:28.152609
New data start timestamp in csv: 2025-03-30 02:16:03.435820
New data end timestamp in csv: 2025-03-30 02:16:03.435820

Here you can see the actual time (22:22:41) is 14 seconds ahead of the playback data time (22:22:27).
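The drift can be quantified directly from the two polls logged above: subtract the newest data timestamp from the wall clock at each poll. A minimal sketch using the values from these logs (`lag_seconds` is a hypothetical helper, not a BrainFlow API):

```python
from datetime import datetime

def lag_seconds(wall_clock: datetime, data_end: datetime) -> float:
    """Seconds by which the newest playback sample trails the wall clock."""
    return (wall_clock - data_end).total_seconds()

# Values taken from the two polls logged above:
early = lag_seconds(datetime(2025, 4, 6, 22, 22, 6, 182980),
                    datetime(2025, 4, 6, 22, 22, 5, 863121))
late = lag_seconds(datetime(2025, 4, 6, 22, 22, 41, 630860),
                   datetime(2025, 4, 6, 22, 22, 28, 152609))
print(f"lag at first poll: {early:.2f} s")  # ~0.32 s
print(f"lag ~35 s later:   {late:.2f} s")   # ~13.48 s
```

The lag grows by roughly 13 seconds over 35 seconds of wall time, which matches the slowdown described above.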
This also gets confusing when there is a gap in the timestamps of the original csv.
For example, if the csv contains a 30 second gap, the gap plays back in less than 30 seconds whenever the playback has already fallen behind real time.
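For reference, gaps like this can be located offline by diffing the timestamp column of the raw file. A sketch under the same assumptions as the script below (tab-separated BrainFlow CSV, column 30 holds the Unix timestamps, Cyton Daisy nominal 125 Hz; `find_gaps` is a hypothetical helper):

```python
import numpy as np
import pandas as pd

def find_gaps(csv_path, ts_col=30, sampling_rate=125, factor=5.0):
    """Return (index, gap_seconds) for every place where consecutive
    timestamps jump by more than `factor` nominal sample intervals."""
    ts = pd.read_csv(csv_path, sep='\t', header=None,
                     dtype=float).iloc[:, ts_col].to_numpy()
    dt = np.diff(ts)
    big = np.where(dt > factor / sampling_rate)[0]
    return [(int(i), float(dt[i])) for i in big]
```

With the 30-second-gap file, this should report a single entry whose gap length matches the "gap length in csv data" figures in the logs below.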
---Gap detected ---
Index of gap start: 2912
-- Gap ended--
New data received at 2025-04-06 22:22:58.158187
Timestamps of start and end of gap:
Playback data start: 2025-04-06 22:22:28.152609
Playback data end : 2025-04-06 22:22:58.157839
30.01 seconds, gap length in playback data
csv start: 2025-03-30 02:16:03.435820
csv end : 2025-03-30 02:16:33.435820
30.00 seconds, gap length in csv data
actual time gap start: 2025-04-06 22:22:43.219246
actual time gap end : 2025-04-06 22:22:58.158187
14.94 seconds, brainflow played back the gap

Above, you can see the gap in the data is 30 seconds, but BrainFlow played it back in only about 15 seconds, because the playback was already delayed by 15 seconds (as shown in the logs earlier).
But when I move the gap to an earlier index in the data, such as index 5, the played-back gap does match the real gap length:
---Gap detected ---
Index of gap start: 5
-- Gap ended--
New data received at 2025-04-06 22:39:33.123416
Timestamps of start and end of gap:
Playback data start: 2025-04-06 22:39:03.119650
Playback data end : 2025-04-06 22:39:33.122972
30.00 seconds, gap length in playback data
csv start: 2025-03-30 02:16:03.435820
csv end : 2025-03-30 02:16:33.435820
30.00 seconds, gap length in csv data
actual time gap start: 2025-04-06 22:39:03.122410
actual time gap end : 2025-04-06 22:39:33.123416
30.00 seconds, brainflow played back the gap
Index at which new data starts: 6
Length of new data: 1

To sum up, this seems to be an issue with the sampling rate during playback.
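The summary can be checked against the logs themselves: between the two polls shown earlier, the data timestamps advance much less than the wall clock does, i.e. the effective playback speed is well below 1.0 even though `playback_speed = 1` is set. A sketch (`playback_speed_ratio` is a hypothetical helper; the two spans are computed from the timestamps logged above):

```python
def playback_speed_ratio(data_span_s: float, wall_span_s: float) -> float:
    """Data time advanced per second of wall time (1.0 = real time)."""
    return data_span_s / wall_span_s

# Between the two polls above, the data end timestamp advanced
# 22:22:28.152609 - 22:22:05.863121 = 22.289488 s, while the wall clock
# advanced 22:22:41.630860 - 22:22:06.182980 = 35.447880 s.
ratio = playback_speed_ratio(22.289488, 35.447880)
print(f"effective playback speed: {ratio:.3f}x")  # ~0.629x, well below real time
```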
To Reproduce
Below you'll find the script I used to play back the data. The data comes from a Cyton Daisy recording.
Data examples:
30 second gap early on, no delay: https://drive.google.com/file/d/1HSzQ47NAS6nJrrFPXf51EwRjJMPFbr7c/view?usp=sharing
30 second gap later on, 15 second delay: https://drive.google.com/drive/home?dmr=1&ec=wgc-drive-globalnav-goto
Script
import logging
import time
from datetime import datetime
from brainflow.board_shim import BoardShim, BrainFlowInputParams, BoardIds, BrainFlowPresets
import numpy as np
import torch
import pandas as pd
# data_file_path = "data/realtime_inference_test/BrainFlow-RAW_2025-03-29_23-14-54_0.csv"
# data_file_path = "data/cyton_BrainFlow-fake-consecutive-numbers.csv"
data_file_path = "data/cyton_BrainFlow-gap_short.csv"
# data_file_path = "data/cyton_BrainFlow-adjusted-timestamps.csv"
class DataAcquisition:
"""Handles board setup and data collection"""
def __init__(self, file_path: str):
self.file_path = file_path
self.loaded_data = None
self.board_shim = None
self.sampling_rate = None
self.buffer_size = 450000 # Move buffer size to class
self.points_collected = 0
self.recording_start_time = None
self.in_gap = False
self.gap_start_time = None
def setup_board(self):
"""Initialize and setup the board for data collection"""
BoardShim.enable_dev_board_logger()
logging.basicConfig(level=logging.DEBUG)
params = BrainFlowInputParams()
params.board_id = BoardIds.PLAYBACK_FILE_BOARD
params.master_board = BoardIds.CYTON_DAISY_BOARD
# params.master_board = BoardIds.GANGLION_BOARD
params.file = self.file_path
params.playback_file_max_count = 1
params.playback_speed = 1
params.playback_file_offset = 0
self.board_shim = BoardShim(BoardIds.PLAYBACK_FILE_BOARD, params)
try:
self.board_shim.prepare_session()
self.sampling_rate = BoardShim.get_sampling_rate(self.board_shim.get_board_id())
return self.board_shim
except Exception as e:
logging.error(f"Failed to setup board: {str(e)}")
raise
def start_stream(self):
"""Start the data stream"""
if not self.board_shim:
raise RuntimeError("Board not initialized. Call setup_board first.")
try:
self.board_shim.start_stream(self.buffer_size)
self.recording_start_time = time.time()
initial_count = self.board_shim.get_board_data_count()
return initial_count
except Exception as e:
logging.error(f"Failed to start stream: {str(e)}")
raise
def get_initial_data(self):
"""Get initial data from the board"""
if not self.board_shim:
raise RuntimeError("Board not initialized. Call setup_board first.")
try:
initial_data = self.board_shim.get_board_data(self.sampling_rate, BrainFlowPresets.DEFAULT_PRESET)
self.points_collected += len(initial_data[0])
return initial_data
except Exception as e:
logging.error(f"Failed to get initial data: {str(e)}")
raise
def get_new_data(self):
"""Get new data from the board"""
if not self.board_shim:
raise RuntimeError("Board not initialized. Call setup_board first.")
try:
time_before = datetime.now()
new_data = self.board_shim.get_board_data(self.sampling_rate, BrainFlowPresets.DEFAULT_PRESET)
time_after = datetime.now()
if new_data.size > 0:
self.points_collected += len(new_data[0])
print(f"\n Actual time before get_board_data: {time_before}")
print(f" Actual time after get_board_data: {time_after}")
# log the current time and the start and end timestamps of the new_data
print(f"\n New data start timestamp: {datetime.fromtimestamp(new_data[30][0])}")
print(f" New data end timestamp: {datetime.fromtimestamp(new_data[30][-1])}")
# log the start and end in the csv file
print(f"\n New data start timestamp in csv: {datetime.fromtimestamp(self.loaded_data.iloc[self.points_collected - len(new_data[0]), 30])}")
print(f" New data end timestamp in csv: {datetime.fromtimestamp(self.loaded_data.iloc[self.points_collected - 1, 30])}")
return new_data
except Exception as e:
logging.error(f"Failed to get new data: {str(e)}")
raise
def release(self):
"""Release the board session"""
if self.board_shim and self.board_shim.is_prepared():
try:
self.board_shim.release_session()
logging.info('Session released successfully')
except Exception as e:
logging.error(f"Failed to release session: {str(e)}")
raise
def load_csv_data(self):
"""Load data from CSV file for debugging purposes."""
try:
self.loaded_data = pd.read_csv(self.file_path,
sep='\t', # Use tab as separator
dtype=float,
header=None) # Treat first row as data
print(f"Loaded {len(self.loaded_data)} samples from {self.file_path}")
return True
except Exception as e:
print(f"Error loading CSV file: {e}")
self.loaded_data = None
return False
class BufferManager:
"""Manages data buffers and their processing"""
def __init__(self, board_shim, sampling_rate):
self.board_shim = board_shim
self.sampling_rate = sampling_rate
# Buffer configuration
self.points_per_epoch = 30 * sampling_rate
self.points_per_step = 5 * sampling_rate
self.buffer_start = 0
self.buffer_end = 30
self.buffer_step = 5
# Initialize channels and buffers
self.all_channels, self.timestamp_channel, self.all_channels_with_timestamp = self._init_channels()
self.all_previous_data = [[] for _ in range(len(self.all_channels_with_timestamp))]
# Buffer tracking
self.points_collected = 0
self.last_processed_buffer = -1
self.processed_buffer_start_idx = [[] for _ in range(6)] # 6 buffers
# Initialize hidden states for each buffer
self.buffer_hidden_states = [
[torch.zeros(10, 1, 256) for _ in range(7)] # 7 hidden states for 7 combinations
for _ in range(6) # 6 buffers (0s to 25s in 5s steps)
]
def _init_channels(self):
"""Initialize channel information"""
all_channels = self.board_shim.get_exg_channels(self.board_shim.get_board_id())
timestamp_channel = self.board_shim.get_timestamp_channel(self.board_shim.get_board_id())
all_channels_with_timestamp = list(all_channels)
if timestamp_channel is not None and timestamp_channel not in all_channels:
all_channels_with_timestamp.append(timestamp_channel)
return all_channels, timestamp_channel, all_channels_with_timestamp
def add_data(self, new_data, is_initial=False):
"""Add new data to the buffer"""
if new_data.size == 0:
return False
# Validate data
if np.any(np.isnan(new_data)) or np.any(np.isinf(new_data)):
logging.warning("Data contains NaN or infinite values!")
return False
# Update points collected
self.points_collected += len(new_data[0])
# Update all_previous_data
if not is_initial:
for i, channel in enumerate(self.all_channels_with_timestamp):
self.all_previous_data[i].extend(new_data[channel].tolist())
else:
for i, channel in enumerate(self.all_channels_with_timestamp):
self.all_previous_data[i] = new_data[channel].tolist()
# Load and compare with original CSV data
try:
# Read TSV file and convert to numeric values
csv_data = pd.read_csv(data_file_path,
sep='\t', # Use tab as separator
dtype=float,
header=None) # Add this to treat first row as data
# Validate first value
if csv_data.iloc[0, 0] != 0:
print("\nFirst value mismatch!")
print(f"Expected: 0, Got: {csv_data.iloc[0, 0]}")
return False
original_channel_1 = csv_data.iloc[:len(self.all_previous_data[0]), 1].to_numpy()
current_channel_1 = np.array(self.all_previous_data[0])
if not np.allclose(original_channel_1, current_channel_1, rtol=1e-10, atol=1e-10):
print("\nData mismatch detected!")
print(f"Length of new data: {len(new_data[0])}")
# Find indices where the data differs
mismatch_mask = ~np.isclose(original_channel_1, current_channel_1, rtol=1e-10, atol=1e-10)
mismatch_indices = np.where(mismatch_mask)[0]
if len(mismatch_indices) > 0:
print("\nFirst 5 mismatches:")
for idx in mismatch_indices[:5]:
print(f"Index {idx}:")
print(f" Original value: {original_channel_1[idx]}")
print(f" Current value: {current_channel_1[idx]}")
print(f" Difference: {abs(original_channel_1[idx] - current_channel_1[idx])}")
return False
except Exception as e:
logging.error(f"Error comparing with CSV: {str(e)}")
return False
return True
def _format_relative_time(self, seconds):
"""Format seconds into a readable time string."""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
if hours > 0:
return f"{hours}h {minutes}m {secs:.3f}s"
elif minutes > 0:
return f"{minutes}m {secs:.3f}s"
else:
return f"{secs:.3f}s"
def main():
# Create data acquisition instance
data_acquisition = DataAcquisition(data_file_path)
# Load CSV data for validation
if not data_acquisition.load_csv_data():
print("Warning: Failed to load CSV data for validation")
try:
# Setup and start board
board_shim = data_acquisition.setup_board()
initial_count = data_acquisition.start_stream()
# Create buffer manager and set recording start time
buffer_manager = BufferManager(board_shim, data_acquisition.sampling_rate)
# Get initial data
initial_data = data_acquisition.get_initial_data()
buffer_manager.add_data(initial_data, is_initial=True)
while True:
# Get new data
new_data = data_acquisition.get_new_data()
if new_data.size == 0:
# Only log if this is the start of a gap (i.e., we had data before)
if not hasattr(data_acquisition, 'in_gap') or not data_acquisition.in_gap:
current_time = datetime.now()
print(f"\n---Gap detected ---")
timestamp_index = buffer_manager.all_channels_with_timestamp.index(buffer_manager.timestamp_channel)
last_timestamp = buffer_manager.all_previous_data[timestamp_index][-1]
index_of_last_timestamp = len(buffer_manager.all_previous_data[1]) - 1
csv_timestamp = data_acquisition.loaded_data.iloc[index_of_last_timestamp, buffer_manager.timestamp_channel]
print(f"Index of gap start: {len(buffer_manager.all_previous_data[1])-1}")
data_acquisition.in_gap = True
data_acquisition.gap_start_playback_data_timestamp = last_timestamp
data_acquisition.gap_start_csv_timestamp = csv_timestamp
data_acquisition.gap_detected_at = current_time
continue
# If we were in a gap and now have data, log the gap end
if hasattr(data_acquisition, 'in_gap') and data_acquisition.in_gap:
current_time = datetime.now()
data_acquisition.gap_stop_detected_at = current_time
print(f"\n -- Gap ended-- ")
print(f" New data received at {current_time}")
new_timestamp = new_data[buffer_manager.timestamp_channel][0]
gap_duration = new_timestamp - data_acquisition.gap_start_playback_data_timestamp
index_of_new_timestamp = len(buffer_manager.all_previous_data[1])
csv_timestamp = data_acquisition.loaded_data.iloc[index_of_new_timestamp, buffer_manager.timestamp_channel]
# log the seconds for the gap in the csv data
print(f" Timestamps of start and end of gap:")
print(f" Playback data start: {datetime.fromtimestamp(data_acquisition.gap_start_playback_data_timestamp).strftime('%Y-%m-%d %H:%M:%S.%f')}")
print(f" Playback data end : {datetime.fromtimestamp(new_timestamp).strftime('%Y-%m-%d %H:%M:%S.%f')}")
print(f" {gap_duration:.2f} seconds, gap length in playback data")
print(f"\n csv start: {datetime.fromtimestamp(data_acquisition.gap_start_csv_timestamp).strftime('%Y-%m-%d %H:%M:%S.%f')}")
print(f" csv end : {datetime.fromtimestamp(csv_timestamp).strftime('%Y-%m-%d %H:%M:%S.%f')}")
print(f" {csv_timestamp - data_acquisition.gap_start_csv_timestamp:.2f} seconds, gap length in csv data")
# log the actual gap length in seconds
print(f"\n actual time gap start: {data_acquisition.gap_detected_at}")
print(f" actual time gap end : {data_acquisition.gap_stop_detected_at}")
print(f" {(data_acquisition.gap_stop_detected_at - data_acquisition.gap_detected_at).total_seconds():.2f} seconds, brainflow played back the gap")
print(f"\n Index at which new data starts: {index_of_new_timestamp}")
print(f" Length of new data: {len(new_data[0])}")
data_acquisition.in_gap = False
# Add data to buffer
if not buffer_manager.add_data(new_data):
continue
# Process buffers
time.sleep(0.1)
except BaseException:
logging.warning('Exception', exc_info=True)
finally:
logging.info('End')
data_acquisition.release()
if __name__ == '__main__':
main()

Info (please complete the following information):
- Cyton Daisy, ID: 2
- Mac OS 15.3.1
- Python 3.9.20
- Architecture: macOS-15.3.1-arm64-arm-64bit
- Pip list:
Package Name Version
brainflow 5.16.0
certifi 2025.1.31
charset-normalizer 3.4.1
contourpy 1.3.0
cycler 0.12.1
decorator 5.2.1
filelock 3.17.0
fonttools 4.56.0
fsspec 2025.2.0
gmpy2 2.1.2
gssc 0.0.9
h5py 3.12.1
idna 3.10
importlib_resources 6.5.2
Jinja2 3.1.5
joblib 1.4.2
kiwisolver 1.4.7
lazy_loader 0.4
MarkupSafe 3.0.2
matplotlib 3.9.4
mne 1.8.0
Expected behavior
I expect the actual time to stay in sync with the data time, or at least documentation describing what to expect from playback file timestamps.
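As a possible workaround until this is clarified (my own suggestion, not something BrainFlow provides): since `playback_file_max_count = 1` means samples arrive in file order, the board-written timestamps can be replaced with the original CSV timestamps, matched by running sample index. A sketch, assuming the timestamp row index and a running counter like `points_collected` in the script (taken before it is incremented):

```python
import numpy as np

def restamp(data: np.ndarray, csv_ts: np.ndarray,
            points_before: int, ts_row: int = 30) -> np.ndarray:
    """Overwrite the playback timestamps in a get_board_data() chunk with
    the original CSV timestamps, matched by running sample index."""
    n = data.shape[1]
    out = data.copy()
    out[ts_row] = csv_ts[points_before:points_before + n]
    return out
```

In the script above this would be called right after `get_board_data`, with `csv_ts` being the timestamp column of `self.loaded_data`.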