218 lines
7.5 KiB
Python
218 lines
7.5 KiB
Python
"""Routines to search for maxima and zero crossings."""
|
|
|
|
from __future__ import print_function, division
|
|
|
|
from numpy import (add, append, argsort, concatenate, diff, flatnonzero,
|
|
linspace, multiply, reshape, sign)
|
|
from .constants import DAY_S
|
|
EPSILON = 0.001 / DAY_S
|
|
|
|
def find_discrete(start_time, end_time, f, epsilon=EPSILON, num=12):
|
|
"""Find the times at which a discrete function of time changes value.
|
|
|
|
This routine is used to find instantaneous events like sunrise,
|
|
transits, and the seasons. See :doc:`searches` for how to use it
|
|
yourself.
|
|
|
|
"""
|
|
ts = start_time.ts
|
|
jd0 = start_time.tt
|
|
jd1 = end_time.tt
|
|
if jd0 >= jd1:
|
|
raise ValueError('your start_time {0} is later than your end_time {1}'
|
|
.format(start_time, end_time))
|
|
|
|
step_days = getattr(f, 'step_days', None)
|
|
if step_days is None:
|
|
# Legacy "rough_period" attribute.
|
|
periods = (jd1 - jd0) / f.rough_period
|
|
if periods < 1.0:
|
|
periods = 1.0
|
|
sample_count = int(periods * num)
|
|
else:
|
|
# Insist on at least 2 samples even if the dates are less than
|
|
# step_days apart, so the range at least has endpoints.
|
|
sample_count = int((jd1 - jd0) / step_days) + 2
|
|
|
|
jd = linspace(jd0, jd1, sample_count)
|
|
return _find_discrete(ts, jd, f, epsilon, num)
|
|
|
|
# TODO: pass in `y` so it can be precomputed?
|
|
|
|
def _find_discrete(ts, jd, f, epsilon, num):
|
|
"""Algorithm core, for callers that already have a `jd` vector."""
|
|
end_mask = linspace(0.0, 1.0, num)
|
|
start_mask = end_mask[::-1]
|
|
o = multiply.outer
|
|
|
|
while True:
|
|
t = ts.tt_jd(jd)
|
|
y = f(t)
|
|
|
|
indices = flatnonzero(diff(y))
|
|
if not len(indices):
|
|
# Nothing found, so immediately return empty arrays.
|
|
ends = jd.take(indices)
|
|
y = y.take(indices)
|
|
break
|
|
|
|
starts = jd.take(indices)
|
|
ends = jd.take(indices + 1)
|
|
|
|
# Since we start with equal intervals, they all should fall
|
|
# below epsilon at around the same time; so for efficiency we
|
|
# only test the first pair.
|
|
if ends[0] - starts[0] <= epsilon:
|
|
y = y.take(indices + 1)
|
|
# Keep only the last of several zero crossings that might
|
|
# possibly be separated by less than epsilon.
|
|
mask = concatenate(((diff(ends) > 3.0 * epsilon), (True,)))
|
|
ends = ends[mask]
|
|
y = y[mask]
|
|
break
|
|
|
|
jd = o(starts, start_mask).flatten() + o(ends, end_mask).flatten()
|
|
|
|
return ts.tt_jd(ends), y
|
|
|
|
def find_minima(start_time, end_time, f, epsilon=1.0 / DAY_S, num=12):
|
|
"""Find the local minima in the values returned by a function of time.
|
|
|
|
This routine is used to find events like minimum elongation. See
|
|
:doc:`searches` for how to use it yourself.
|
|
|
|
"""
|
|
def g(t): return -f(t)
|
|
g.rough_period = getattr(f, 'rough_period', None)
|
|
g.step_days = getattr(f, 'step_days', None)
|
|
t, y = find_maxima(start_time, end_time, g, epsilon=1.0 / DAY_S, num=12)
|
|
return t, -y
|
|
|
|
def find_maxima(start_time, end_time, f, epsilon=1.0 / DAY_S, num=12):
|
|
"""Find the local maxima in the values returned by a function of time.
|
|
|
|
This routine is used to find events like highest altitude and
|
|
maximum elongation. See :doc:`searches` for how to use it yourself.
|
|
|
|
"""
|
|
# @@ @@_@@ @@_@@_@@_@@
|
|
# / \ / \ / \
|
|
# @@ @@ @@ @@ @@ @@
|
|
# +1 -1 +1 0 -1 +1 0 0 0 -1 sd = sign(diff(y))
|
|
# -2 -1 -1 -1 0 0 -1 diff(sign(diff(y))
|
|
|
|
ts = start_time.ts
|
|
jd0 = start_time.tt
|
|
jd1 = end_time.tt
|
|
|
|
if jd0 >= jd1:
|
|
raise ValueError('start_time {0} is not earlier than end_time {1}'
|
|
.format(start_time, end_time))
|
|
|
|
# We find maxima by investigating every point that is higher than
|
|
# both points next to it. This presents a problem: if the initial
|
|
# heights are, for example, [1.7, 1.1, 0.3, ...], there might be a
|
|
# maximum 1.8 hidden between the first two heights, but it would not
|
|
# meet the criteria for further investigation because we can't see
|
|
# whether the curve is on its way up or down to the left of 1.7. So
|
|
# we put an extra point out beyond each end of our range, then
|
|
# filter our final result to remove maxima that fall outside the
|
|
# range.
|
|
step_days = getattr(f, 'step_days', None)
|
|
if step_days is None:
|
|
bump = f.rough_period / num
|
|
bumps = int((jd1 - jd0) / bump) + 3
|
|
jd = linspace(jd0 - bump, jd1 + bump, bumps)
|
|
else:
|
|
# Insist on at least 3 samples, even for very close dates; and
|
|
# add 2 more to stand outside the range.
|
|
steps = int((jd1 - jd0) / step_days) + 3
|
|
real_step = (jd1 - jd0) / steps
|
|
jd = linspace(jd0 - real_step, jd1 + real_step, steps + 2)
|
|
|
|
end_alpha = linspace(0.0, 1.0, num)
|
|
start_alpha = end_alpha[::-1]
|
|
o = multiply.outer
|
|
|
|
while True:
|
|
t = ts.tt_jd(jd)
|
|
y = f(t)
|
|
|
|
# Since we start with equal intervals, they all should fall
|
|
# below epsilon at around the same time; so for efficiency we
|
|
# only test the first pair.
|
|
if t[1] - t[0] <= epsilon:
|
|
jd, y = _identify_maxima(jd, y)
|
|
|
|
# Filter out maxima that fell slightly outside our bounds.
|
|
keepers = (jd >= jd0) & (jd <= jd1)
|
|
jd = jd[keepers]
|
|
y = y[keepers]
|
|
|
|
# Keep only the first of several maxima that are separated
|
|
# by less than epsilon.
|
|
if len(jd):
|
|
mask = concatenate(((True,), diff(jd) > epsilon))
|
|
jd = jd[mask]
|
|
y = y[mask]
|
|
|
|
break
|
|
|
|
left, right = _choose_brackets(y)
|
|
|
|
if not len(left):
|
|
# No maxima found.
|
|
jd = y = y[0:0]
|
|
break
|
|
|
|
starts = jd.take(left)
|
|
ends = jd.take(right)
|
|
|
|
jd = o(starts, start_alpha).flatten() + o(ends, end_alpha).flatten()
|
|
jd = _remove_adjacent_duplicates(jd)
|
|
|
|
return ts.tt_jd(jd), y
|
|
|
|
def _choose_brackets(y):
|
|
"""Return the indices between which we should search for maxima of `y`."""
|
|
dsd = diff(sign(diff(y)))
|
|
indices = flatnonzero(dsd < 0)
|
|
left = reshape(add.outer(indices, [0, 1]), -1)
|
|
left = _remove_adjacent_duplicates(left)
|
|
right = left + 1
|
|
return left, right
|
|
|
|
def _identify_maxima(x, y):
|
|
"""Return the maxima we can see in the series y as simple points."""
|
|
dsd = diff(sign(diff(y)))
|
|
|
|
# Choose every point that is higher than the two adjacent points.
|
|
indices = flatnonzero(dsd == -2) + 1
|
|
peak_x = x.take(indices)
|
|
peak_y = y.take(indices)
|
|
|
|
# Also choose the midpoint between the edges of a plateau, if both
|
|
# edges are in view. First we eliminate runs of zeroes, then look
|
|
# for adjacent -1 values, then map those back to the main array.
|
|
indices = flatnonzero(dsd)
|
|
dsd2 = dsd.take(indices)
|
|
minus_ones = dsd2 == -1
|
|
plateau_indices = flatnonzero(minus_ones[:-1] & minus_ones[1:])
|
|
plateau_left_indices = indices.take(plateau_indices)
|
|
plateau_right_indices = indices.take(plateau_indices + 1) + 2
|
|
plateau_x = x.take(plateau_left_indices) + x.take(plateau_right_indices)
|
|
plateau_x /= 2.0
|
|
plateau_y = y.take(plateau_left_indices + 1)
|
|
|
|
x = concatenate((peak_x, plateau_x))
|
|
y = concatenate((peak_y, plateau_y))
|
|
indices = argsort(x)
|
|
return x[indices], y[indices]
|
|
|
|
def _remove_adjacent_duplicates(a):
|
|
if not len(a):
|
|
return a
|
|
mask = diff(a) != 0
|
|
mask = append(mask, [True])
|
|
return a[mask]
|