我有一个二进制文件,其中包含来自传感器的时间序列。数据格式如下:
#1(t0) #2(t0) #3(t0) ... #n(t0) #1(t1) #2(t1) #3(t1) ... #n(t1) ...
一次,来自 n 个传感器的测量数据以二进制格式存储在文件中。我想重建传感器的时间序列,使得
#1(t0) #1(t1) #1(t2) ...
从#1(t0) 到#1(t1) 的距离,步幅是固定且已知的,传感器的数量也是已知的。以下代码是我的实现。我的实现试图一次获取一个数据,而不是那么快。有什么方法可以像 MPI 中的集体 io 一样提高读取非连续数据的速度?
def collect_signal(fp, channel_no, stride, dtype):
byteSize = np.dtype(dtype).itemsize
fp.seek(0,2) # go to the file end
eof = fp.tell() # get the eof address
fp.seek(0,0) # rewind
fp.seek(0 + channel_no,0) # starting point per each channel
signal = []
while True:
start = fp.tell()
sample = np.frombuffer(fp.read(byteSize), dtype=dtype)
signal.append(sample[0])
if fp.tell() == eof or fp.tell() + stride > eof:
break;
else:
fp.seek(start + stride, 0)
return signal
这个更简单的代码可能会更快。您可能还想考虑使用mmap
将文件映射到进程的地址空间,这样可以绕过内核 I/O 调用层。
def collect_signal(fp, channel_no, stride, dtype):
byte_size = np.dtype(dtype).itemsize
fp.seek(channel_no, 0)
# Assuming that your read will always return an entire sample
# or an empty string.
for sample in iter(lambda: fp.read(byte_size), ''):
sample = np.frombuffer(sample, dtype=dtype)
signal.append(sample[0])
fp.seek(stride, 1)
frombuffer
如果您知道有多少个频道,另一种选择可能是让您为您处理步幅。这涉及在每一步将稍微多一点的数据读入内存,但如果输入被缓冲,则您可能已经将比fp.read
实际返回的数据更多的数据读入缓冲区。
def collect_signal(fp, channel_no, stride, type):
byte_size = np.dtype(dtype).itemsize)
offset = channel_no * byte_size
while True:
sample = fp.read(byte_size * numChannels)
if not sample:
break
sample = np.frombuffer(sample, dtype=dtype, count=1, offset=offset)
signal.append(sample[0])
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句