在具有时间限制的事件数据中查找子序列的出现

德达鲁斯

我具有以下格式的事件数据:

event     A A A A A C B C D A A A B
timestamp 0 3 4 4 5 5 6 7 7 8 8 9 10

鉴于序列列表S和事件E,我怎样才能有效地找到的非重叠出现SE是一个时间窗口内W,在发生的每个事件的时间间隔内L从以前的事件?

结果示例S = {A, AA, AAA, AAB, BB, CA}, W=3, L=2

occurrences:
A: [0, 3, 4, 4, 5, 8, 8, 9]
AA: [(3,4), (4,5), (8,8)]
AAA: [(3,4,4), (8,8,9)]
AAB: [(4,5,6), (8,9,10)]
BB: []
CA: [(7,8)]

如您所见,事件不必是连续的(即序列中的所有元素都出现在序列中)。时间戳仅显示为整数。

退休43

如果您跟踪有效的迄今不完整的子序列,并在它们完成或无法再完成时立即将其忘记,则可以对数据进行一次传递。为此,我编写了一个Sequence跟踪

  • 序列名称
  • 它的事件发生的索引,以确定它是否与先前完成的序列重叠
  • 事件发生的时间,因为这是我们的输出,因此我们需要它们检查我们的约束
  • 我们当前在序列名称中的位置,以便我们知道下一个事件应该是什么以及序列何时完成,以及
  • 一个标志,如果序列超出了我们的窗口/长度限制,则会忘记该序列。

events = 'AAAAACBCDAAAB'
timestamps = [0, 3, 4, 4, 5, 5, 6, 7, 7, 8, 8, 9, 10]

SEQUENCES = {'A', 'AA', 'AAA', 'AAB', 'BB', 'CA'}
WINDOW = 3
LENGTH = 2

class Sequence:
    def __init__(self, seq, starting_index, starting_time):
        self.sequence = seq
        self.pos = 0
        self.indices = [starting_index]
        self.times = [starting_time]
        self.has_expired = False

    def is_next_event_acceptable(self, event, time):
        if self.sequence[self.pos+1] != event:
            return False
        else:
            if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
                self.has_expired = True
                return False
            return True

    def add_event_if_acceptable(self, event, index, time):
        if self.is_next_event_acceptable(event, time):
            self.pos += 1
            self.indices.append(index)
            self.times.append(time)

    def is_complete(self):
        return len(self.sequence) == self.pos + 1

    def __repr__(self):
        seq = list(self.sequence)
        seq.insert(self.pos, '[')
        seq.insert(self.pos + 2, ']')
        return ''.join(seq)


def find_non_overlapping_subsequences(events, timestamps):
    working_sequences = []
    results = {s: {'seq': [], 'last_index': -1} for s in SEQUENCES}

    for index, (event, time) in enumerate(zip(events, timestamps)):
        # First work with any present sequences in the queue
        # and then introduce any new ones
        for Seq in working_sequences:
            Seq.add_event_if_acceptable(event, index, time)
        for seq in SEQUENCES:
            if seq.startswith(event):
                working_sequences.append(Sequence(seq, index, time))
        # Any successfully completed sequences, or sequences
        # that can't be completed anymore are to be removed
        seq_idx_to_remove = []
        for i, Seq in enumerate(working_sequences):
            if Seq.has_expired:
                seq_idx_to_remove.append(i)
            elif Seq.is_complete():
                seq_idx_to_remove.append(i)
                # Only add the sequence to the results if the indices
                # aren't overlapping with the previous one
                sequence, times, indices = Seq.sequence, Seq.times, Seq.indices
                if results[sequence]['last_index'] < indices[0]:
                    results[sequence]['seq'].append(times)
                    results[sequence]['last_index'] = indices[-1]
        # We must remove the items in reverse order so that
        # we don't disturb the 'forward' ordering
        for i in seq_idx_to_remove[::-1]:
            del working_sequences[i]

    return results

results = find_non_overlapping_subsequences(events, timestamps)
for key, value in sorted(results.items()):
    print(key, value['seq'])

输出量

A [[0], [3], [4], [4], [5], [8], [8], [9]]
AA [[3, 4], [4, 5], [8, 8]]
AAA [[3, 4, 4], [8, 8, 9]]
AAB [[4, 5, 6], [8, 8, 10]]
BB []
CA [[7, 8]]

更新资料

对于较长的事件系列,这可能会花费很长时间,这取决于您在每个步骤中必须考虑多少个序列。这意味着序列寿命越长,在每次迭代中您必须检查的越多。

  • 如果一个序列需要完成更多的事件,则将需要更多的迭代。
  • 的长度越长SEQUENCES,您将在每个步骤中引入的新序列越多。
  • 如果窗口或长度持续时间较长,则序列将在过期之前存活更长的时间。
  • 您拥有的事件越独特(如果它们在系列中统一存在),完成给定序列所需的时间就越长。例如,如果您在每次迭代中仅遇到As和Bs(例如,字母中的任何一个字母),则序列“ AB”将更快地完成。

尽管以上因素最终定义了每个迭代步骤可能需要多长时间,但是可以进行一些优化。在每一步中,我们都要检查所有当前不完整的序列,working_sequences并检查新事件对它们的影响。但是,如果我们重做Sequence该类,则每次更新序列时,我们都可以计算下一个事件是什么。然后,在每个步骤中,我们都可以根据该事实对这些序列进行分类。这样,如果下一个事件为“ A”,我们将仅检查接受该事件的所有序列。这也可以方便地拆分已完成或已过期的序列。

第二种方法(影响较小的优化方法)是预先计算从特定事件开始的所有序列,这样我们就不必SEQUENCES每次都进行迭代

这应该避免任何不必要的检查并提高整体性能。但是,最坏的情况仍然与上面的简单版本相同。例如,如果90%的事件为“ A”,而90%的起始事件或序列的下一个事件为“ A”,则与之前相比,这仍需要90%的时间。

代码中的以下更改反映了这些优化。我还假定时间戳严格增加,因此indices可以简化依赖该属性的任何内容

EXPIRED = '#'
COMPLETED = '='

class Sequence:
    def __init__(self, seq, starting_time):
        self.sequence = seq
        self.pos = 0
        self.times = [starting_time]
        self.has_expired = False
        self.next_event = self.next_event_query()

    def is_next_event_acceptable(self, event, time):
        if self.next_event != event:
            return False
        if time - self.times[0] > WINDOW or time - self.times[-1] > LENGTH:
            self.has_expired = True
            return False
        return True

    def update_sequence(self, event, time):
        if self.is_next_event_acceptable(event, time):
            self.pos += 1
            self.times.append(time)
        self.next_event = self.next_event_query()

    def next_event_query(self):
        if self.has_expired:
            return EXPIRED
        return COMPLETED if len(self.sequence) == self.pos + 1 else self.sequence[self.pos+1]

    def __repr__(self):
        seq = list(self.sequence)
        seq.insert(self.pos, '[')
        seq.insert(self.pos + 2, ']')
        return ''.join(seq)


def find_non_overlapping_subsequences(events, timestamps): 
    unique_events = set(events)
    starting_events = {}
    for seq in SEQUENCES:
        unique_events.update(seq)
        first_event = seq[0]
        if first_event not in starting_events:
            starting_events[first_event] = []
        starting_events[first_event].append(seq)
    for e in unique_events:
        if e not in starting_events:
            starting_events[e] = []

    all_symbols = ''.join(unique_events) + EXPIRED + COMPLETED
    working_sequences = {event: [] for event in all_symbols}
    next_event_lists = {event: [] for event in all_symbols}
    results = {s: {'seq': [], 'last_time': timestamps[0]-1} for s in SEQUENCES}

    for event, time in zip(events, timestamps):
        next_event_lists[event] = []
        for S in working_sequences[event]:
            S.update_sequence(event, time)
            next_event_lists[S.next_event].append(S)
        for seq in starting_events[event]:
            S = Sequence(seq, time)
            next_event_lists[S.next_event].append(S)
        for S in next_event_lists[COMPLETED]:
            # Only add the sequence to the results if the timestamps
            # don't overlap with the previous one
            sequence, times = S.sequence, S.times
            if results[sequence]['last_time'] < times[0]:
                results[sequence]['seq'].append(times)
                results[sequence]['last_time'] = times[-1]
        next_event_lists[EXPIRED] = []
        next_event_lists[COMPLETED] = []
        working_sequences = next_event_lists.copy()

    return results

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章

有时间限制的计算

ggplot2中具有时间序列的月和日的时间序列在年份中具有方面

C3.js-具有时间的时间序列无法解析

R中具有时间条件的子集数据集

R中具有时间序列模型(ARIMA-ARCH)的分位数回归

将数据帧中带有时间戳的多行事件转换为具有开始和结束日期时间的单行

具有时间间隔的无功ObjC发射序列

8086有时间限制的编程

R中具有时间序列的2SLS(带有dynlm包?)

如何在MySQL中获取具有时间限制的数据

如何添加具有时间增量的数据帧?

在FullCalendar v4中如何从具有时间数据类型的数据库中获取事件

预测现有时间序列数据

Laravel嵌套:事件(带有时间轴)和子事件(带有时间轴)

提取在同一行中同时具有时间戳和位置的数据框

删除具有时间序列条件的重复项

R使用facet_wrap绘制具有时间间隔的时间序列

如何从具有时间戳列的数据库表中检索

具有时间限制的CouchDB视图

如何基于先验信息在具有时间维度的R数据帧中包括缺失的观测值?

node.js:具有时间间隔的货币受限并行执行(速率限制)

Linux在文件夹中查找文件并将其移动到具有时间前缀的新位置

Python Websocket-具有时间和消息限制

具有时间序列数据意义的自定义楼层/天花板

带有时间序列数据的 Amchart 散点图

pandas 重新采样以获得具有时间序列数据的月平均值

GTM 事件数据中是否有时间戳功能?

Python使用查找所有时间戳方法从txt文件中读取数据

具有时间限制的从 ec2 到 s3 存储桶的数据传输