212 lines
7.0 KiB
Python
212 lines
7.0 KiB
Python
# chrono - agendas system
|
||
# Copyright (C) 2017 Entr'ouvert
|
||
#
|
||
# This program is free software: you can redistribute it and/or modify it
|
||
# under the terms of the GNU Affero General Public License as published
|
||
# by the Free Software Foundation, either version 3 of the License, or
|
||
# (at your option) any later version.
|
||
#
|
||
# This program is distributed in the hope that it will be useful,
|
||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
# GNU Affero General Public License for more details.
|
||
#
|
||
# You should have received a copy of the GNU Affero General Public License
|
||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||
|
||
import bisect
|
||
import collections
|
||
|
||
Interval = collections.namedtuple('Interval', ['begin', 'end'])
|
||
|
||
|
||
class IntervalSet:
|
||
"""Store a set made of an union of disjoint open/closed intervals (it
|
||
currently does not really care about their openness), i.e.
|
||
|
||
S = [a[0], b[0]] ∪ ∪ [a[n-1], b[n-1]]
|
||
|
||
where
|
||
|
||
forall i < n. a_i < b_i
|
||
forall i < (n-1). b_i < a[i+1]
|
||
"""
|
||
|
||
__slots__ = ['begin', 'end']
|
||
|
||
def __init__(self, iterable=(), already_sorted=False):
|
||
"""
|
||
Initialize a new IntervalSet from a list of Interval or 2-tuple.
|
||
|
||
Iterable will be sorted, if it's already sorted use the
|
||
from_ordered() classmethod.
|
||
|
||
It's faster than using add() because intervals are merged as we
|
||
traverse the list, and self.begin and self.end are built in O(n)
|
||
time where len(iterable) = n.
|
||
"""
|
||
if not already_sorted:
|
||
iterable = sorted(iterable)
|
||
self.begin = []
|
||
self.end = []
|
||
last_begin, last_end = None, None
|
||
for begin, end in iterable:
|
||
# check good order property along the way
|
||
if last_begin and not (last_begin < begin or last_begin == begin and last_end <= end):
|
||
raise ValueError('not well ordered: ! %s <= %s' % ((last_begin, last_end), (begin, end)))
|
||
if self.begin and begin <= self.end[-1]:
|
||
self.end[-1] = max(self.end[-1], end)
|
||
else:
|
||
self.begin.append(begin)
|
||
self.end.append(end)
|
||
last_begin, last_end = begin, end
|
||
|
||
@classmethod
|
||
def simple(cls, begin, end=Ellipsis):
|
||
begin, end = cls._begin_or_interval(begin, end)
|
||
|
||
if hasattr(begin, 'keep_only_weekday_and_time'):
|
||
begin = begin.keep_only_weekday_and_time()
|
||
end = end.keep_only_weekday_and_time()
|
||
|
||
return cls.from_ordered([(begin, end)])
|
||
|
||
@classmethod
|
||
def from_ordered(cls, iterable):
|
||
return cls(iterable, already_sorted=True)
|
||
|
||
def __repr__(self):
|
||
return repr(list(map(tuple, self)))
|
||
|
||
@classmethod
|
||
def _begin_or_interval(cls, begin, end=Ellipsis):
|
||
if end is Ellipsis:
|
||
return begin
|
||
else:
|
||
return begin, end
|
||
|
||
def add(self, begin, end=Ellipsis):
|
||
"""Add a new interval to the set, eventually merging it with actual
|
||
ones.
|
||
|
||
It uses bisect_left() to maintaint the ordering of intervals.
|
||
"""
|
||
begin, end = self._begin_or_interval(begin, end)
|
||
# insert an interval by merging with previous and following intervals
|
||
# if they overlap
|
||
i = bisect.bisect_left(self.begin, begin)
|
||
# merge with previous intervals
|
||
while i > 0 and begin <= self.end[i - 1]:
|
||
# [begin, end] overlaps previous interval
|
||
# so remove it to merge them.
|
||
previous_begin = self.begin.pop(i - 1)
|
||
previous_end = self.end.pop(i - 1)
|
||
i = i - 1
|
||
if previous_begin < begin:
|
||
# but it does not include it, so replace if begin by previous.begin
|
||
begin = previous_begin
|
||
# [begin, end] is completely included in previous interval,
|
||
# replace end by previous.end
|
||
end = max(end, previous_end)
|
||
break
|
||
# merge with following
|
||
while i < len(self.begin) and self.begin[i] <= end:
|
||
# [begin, end] overlaps next interval, so remove it to merge them.
|
||
next_end = self.end.pop(i)
|
||
self.begin.pop(i)
|
||
if end < next_end:
|
||
# but it does not include it, so replace end by next.end
|
||
end = next_end
|
||
# no symetry with the previous "while" loop as .bisect_left()
|
||
# garanty that begin is left or equal to next.begin.
|
||
break
|
||
self.begin.insert(i, begin)
|
||
self.end.insert(i, end)
|
||
|
||
def overlaps(self, begin, end=Ellipsis):
|
||
"""
|
||
Find if the [begin, end] has a non-empty (or closed) overlaps with
|
||
one of the contained intervals.
|
||
"""
|
||
begin, end = self._begin_or_interval(begin, end)
|
||
# look for non-zero size overlap
|
||
i = bisect.bisect_left(self.begin, begin)
|
||
if i > 0 and begin < self.end[i - 1]:
|
||
return True
|
||
if i < len(self.begin) and self.begin[i] < end:
|
||
return True
|
||
return False
|
||
|
||
def __iter__(self):
|
||
"""
|
||
Generate the ordered list of included intervals as 2-tuples.
|
||
"""
|
||
return map(Interval._make, zip(self.begin, self.end))
|
||
|
||
def __eq__(self, other):
|
||
if isinstance(other, self.__class__):
|
||
return list(self) == list(other)
|
||
if hasattr(other, '__iter__'):
|
||
return self == self.cast(other)
|
||
return False
|
||
|
||
def __bool__(self):
|
||
return bool(self.begin)
|
||
|
||
@classmethod
|
||
def cast(cls, value):
|
||
if isinstance(value, cls):
|
||
return value
|
||
return cls(value)
|
||
|
||
def __sub__(self, other):
|
||
l1 = iter(self)
|
||
l2 = iter(self.cast(other))
|
||
|
||
def gen():
|
||
state = 3
|
||
while True:
|
||
if state & 1:
|
||
c1 = next(l1, None)
|
||
if state & 2:
|
||
c2 = next(l2, None)
|
||
if not c1:
|
||
break
|
||
if not c2:
|
||
yield c1
|
||
for c1 in l1:
|
||
yield c1
|
||
break
|
||
if c1[1] <= c2[0]:
|
||
yield c1
|
||
state = 1
|
||
elif c2[1] <= c1[0]:
|
||
state = 2
|
||
elif c1[1] < c2[1]:
|
||
if c1[0] < c2[0]:
|
||
yield (c1[0], c2[0])
|
||
state = 1
|
||
elif c2[1] < c1[1]:
|
||
if c1[0] < c2[0]:
|
||
yield (c1[0], c2[0])
|
||
c1 = (c2[1], c1[1])
|
||
state = 2
|
||
elif c1[1] == c2[1]:
|
||
if c1[0] < c2[0]:
|
||
yield (c1[0], c2[0])
|
||
state = 3
|
||
else:
|
||
raise AssertionError('not reachable')
|
||
|
||
return self.__class__.from_ordered(gen())
|
||
|
||
def min(self):
|
||
if self:
|
||
return self.begin[0]
|
||
return None
|
||
|
||
def max(self):
|
||
if self:
|
||
return self.end[-1]
|
||
return None
|