74 lines
2.2 KiB
Python
74 lines
2.2 KiB
Python
# chrono - agendas system
|
|
# Copyright (C) 2023 Entr'ouvert
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import requests
|
|
from django.conf import settings
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
|
|
class AntsHubException(Exception):
|
|
pass
|
|
|
|
|
|
def make_http_session(retries=3):
|
|
session = requests.Session()
|
|
retry = Retry(
|
|
total=retries,
|
|
read=retries,
|
|
connect=retries,
|
|
backoff_factor=0.5,
|
|
status_forcelist=(502, 503),
|
|
)
|
|
adapter = HTTPAdapter(max_retries=retry)
|
|
session.mount('http://', adapter)
|
|
session.mount('https://', adapter)
|
|
return session
|
|
|
|
|
|
def make_url(path):
|
|
return f'{settings.CHRONO_ANTS_HUB_URL}{path}'
|
|
|
|
|
|
def ping(timeout=1):
|
|
session = make_http_session()
|
|
try:
|
|
response = session.get(make_url('ping/'), timeout=timeout)
|
|
response.raise_for_status()
|
|
err = response.json()['err']
|
|
if err != 0:
|
|
raise AntsHubException(err)
|
|
except requests.Timeout:
|
|
pass
|
|
except (TypeError, KeyError, requests.RequestException) as e:
|
|
raise AntsHubException(str(e))
|
|
|
|
|
|
def push_rendez_vous_disponibles(payload):
|
|
session = make_http_session()
|
|
try:
|
|
response = session.post(make_url('rendez-vous-disponibles/'), json=payload)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
err = data['err']
|
|
if err != 0:
|
|
raise AntsHubException(err)
|
|
return data
|
|
except requests.Timeout:
|
|
return True
|
|
except (TypeError, KeyError, requests.RequestException) as e:
|
|
raise AntsHubException(str(e))
|