start of wcs olap importer
This commit is contained in:
parent
b6b28a96cd
commit
467334fb2d
|
@ -0,0 +1,59 @@
|
|||
import argparse
|
||||
import ConfigParser
|
||||
import os
|
||||
import elasticsearch
|
||||
import urlparse
|
||||
import logging
|
||||
import logging.config
|
||||
from . import wcs_api
|
||||
from .feeder import WcsEsFeeder
|
||||
import locale
|
||||
|
||||
|
||||
def main():
    """Synchronize a w.c.s. instance's data with an ElasticSearch server.

    Defaults are read from ~/.wcs_es.ini: each section named after an
    instance URL provides per-instance option defaults, and an optional
    [loggers] section carries a full logging configuration.
    """
    locale.setlocale(locale.LC_ALL, '')
    config = ConfigParser.ConfigParser()
    config_file = os.path.expanduser('~/.wcs_es.ini')
    if os.path.exists(config_file):
        config.read(config_file)
        # the ini file can also embed a logging.config fileConfig setup
        if config.has_section('loggers'):
            logging.config.fileConfig(config_file)
    # sections whose name is an URL are known w.c.s. instances
    urls = [url for url in config.sections()
            if url.startswith(('http://', 'https://'))]
    # first parse only --url, so the matching config section can provide
    # defaults for the remaining options added below
    parser = argparse.ArgumentParser(description='Engine ES with W.C.S. data', add_help=False)
    parser.add_argument('--url', help='url of the w.c.s. instance', required=not urls,
                        default=(urls or [None])[0])
    args, rest = parser.parse_known_args()
    defaults = {}
    if args.url and config.has_section(args.url):
        defaults = dict(config.items(args.url))
    parser.add_argument("-h", "--help", action="help", help="show this help message and exit")
    parser.add_argument('--orig', help='origin of the request for signatures',
                        required='orig' not in defaults)
    parser.add_argument('--key', help='HMAC key for signatures', required='key' not in defaults)
    group = parser.add_mutually_exclusive_group(
        required='email' not in defaults and 'name_id' not in defaults)
    group.add_argument('--email', help='email for authentication')
    group.add_argument('--name-id', help='NameID for authentication')
    parser.add_argument('--es-host', help='ElasticSearch hostname', default='localhost')
    parser.add_argument('--es-port', help='ElasticSearch port', type=int, default=9200)
    parser.add_argument('--es-no-recreate', help='do not recreate all indexes, just update them',
                        dest='es_recreate', default=True, action='store_false')
    if defaults:
        parser.set_defaults(**defaults)

    args = parser.parse_args()
    api = wcs_api.WcsApi(url=args.url, orig=args.orig, key=args.key, email=args.email,
                         name_id=args.name_id)
    # index names are derived from the instance hostname (dots are not
    # allowed in ES index names)
    base_index_name = urlparse.urlparse(args.url).netloc.split(':')[0].replace('.', '_')
    es_hosts = [{'host': args.es_host, 'port': args.es_port, 'use_ssl': False}]
    es = elasticsearch.Elasticsearch(es_hosts)
    logger = logging.getLogger('wcs-es')
    logger.info('starting synchronizing w.c.s. at %r with ES at %s:%s', args.url, args.es_host,
                args.es_port)
    feeder = WcsEsFeeder(api, es, base_index_name, recreate=args.es_recreate, logger=logger)
    feeder.feed()
    logger.info('finished')


if __name__ == '__main__':
    main()
|
|
@ -0,0 +1,64 @@
|
|||
import logging
|
||||
from . import wcs_api
|
||||
|
||||
from sqlalchemy import create_engine, MetaData, Table, Column, UnicodeText, DATETIME, Index, INTEGER
|
||||
from sqlalchemy.dialects.postgresql import HSTORE
|
||||
|
||||
class WcsOlapFeeder(object):
    """Feed w.c.s. form data into a PostgreSQL database for OLAP querying.

    One `forms` table holds every formdata row; free-form field values go
    into an HSTORE column indexed with GIN for filtering.
    """

    def __init__(self, api, db_url, base_index_name, recreate=False, logger=None):
        self.api = api
        self.engine = create_engine(db_url)
        self.meta = MetaData()
        # when True, previously loaded rows for a formdef are deleted
        # before reloading
        self.recreate = recreate
        self.logger = logger or logging.getLogger(__name__)
        self.initialize_base_table()

    def initialize_base_table(self):
        """Declare the `forms` table and its indexes, creating them if needed."""
        self.base_table = Table(
            'forms', self.meta,
            Column('id', INTEGER, nullable=False),
            Column('formdef', UnicodeText, nullable=False),
            Column('receipt_time', DATETIME, nullable=False),
            # fixed: parenthesis placement made this a syntax error
            Column('fields', HSTORE, nullable=True))
        Index('base_index', self.base_table.c.formdef, self.base_table.c.receipt_time)
        Index('filter_index', self.base_table.c.formdef, self.base_table.c.receipt_time,
              self.base_table.c.fields, postgresql_using='gin')
        self.base_table.create(self.engine, check_first=True)

    def feed(self):
        """Load every formdef exposed by the w.c.s. API."""
        for formdef in self.api.get_formdefs():
            # fixed: previously logged self.formdef_index, an attribute
            # that is never set (leftover from the ES feeder)
            self.logger.info('feeding formdef %r', formdef.slug)
            self.feed_formdef(formdef)

    def feed_formdef(self, formdef):
        """Load all formdatas of one formdef into the base table."""
        self.logger.info('start loading data for formdef %r', formdef.slug)
        conn = self.engine.connect()
        if self.recreate:
            # column access must go through .c; the statement was also
            # missing its closing parentheses
            conn.execute(self.base_table.delete().where(
                self.base_table.c.formdef == formdef.slug))
        try:
            datas = formdef.datas
        except wcs_api.WcsApiError as e:
            self.logger.error('unable to get formdatas for formdef %r: %s', formdef.slug, e)
        else:
            for data in datas:
                # TODO: insert a row per formdata (id, formdef slug,
                # receipt_time, HSTORE fields) — not implemented yet
                pass

    def configure_formdef_mapping(self, index, doc_type, formdef):
        # NOTE(review): this method and configure_field use self.es, which
        # is never set on this class — leftover from the ES feeder; confirm
        # whether they should be removed or ported to SQL.
        self.configure_field(index, doc_type, ['display_id'], {'type': 'long'})
        for field in formdef.schema.fields:
            if field['type'] == 'map' and field.get('varname'):
                self.configure_field(index, doc_type, ['fields', field['varname']],
                                     {'type': 'geo_point'})
            if field['type'] in ('item', 'string') and field.get('varname'):
                self.configure_field(index, doc_type, ['fields', field['varname']],
                                     {'type': 'string', 'index': 'not_analyzed'})

    def configure_field(self, index, doc_type, field_path, defn):
        """Put an ES mapping for the field at field_path (see NOTE above)."""
        assert field_path

        # build a nested {'properties': {part: ...}} envelope around defn
        body = {}
        cursor = body
        for part in field_path:
            cursor['properties'] = {}
            cursor['properties'][part] = cursor = {}
        cursor.update(defn)
        self.es.indices.put_mapping(index=index, doc_type=doc_type, body=body)
|
|
@ -0,0 +1,72 @@
|
|||
import datetime
|
||||
import base64
|
||||
import hmac
|
||||
import hashlib
|
||||
import urllib
|
||||
import random
|
||||
import urlparse
|
||||
|
||||
'''Simple signature scheme for query strings'''
|
||||
|
||||
|
||||
def sign_url(url, key, algo='sha256', timestamp=None, nonce=None):
    """Return *url* with its query string replaced by a signed one."""
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    signed_query = sign_query(query, key, algo, timestamp, nonce)
    return urlparse.urlunparse((scheme, netloc, path, params, signed_query, fragment))
|
||||
|
||||
|
||||
def sign_query(query, key, algo='sha256', timestamp=None, nonce=None):
    """Append algo/timestamp/nonce parameters and an HMAC signature to *query*."""
    if timestamp is None:
        timestamp = datetime.datetime.utcnow()
    timestamp = timestamp.strftime('%Y-%m-%dT%H:%M:%SZ')
    if nonce is None:
        # 128 random bits; strip the '0x' prefix and the trailing 'L'
        # of Python 2 longs
        nonce = hex(random.SystemRandom().getrandbits(128))[2:-1]
    extra = urllib.urlencode((
        ('algo', algo),
        ('timestamp', timestamp),
        ('nonce', nonce)))
    new_query = query + '&' + extra if query else extra
    # the signature covers the full query, extra parameters included
    signature = base64.b64encode(sign_string(new_query, key, algo=algo))
    return new_query + '&signature=' + urllib.quote(signature)
|
||||
|
||||
|
||||
def sign_string(s, key, algo='sha256', timedelta=30):
    """Return the raw HMAC digest of *s* keyed with *key*.

    NOTE: the timedelta parameter is accepted but unused here.
    """
    if isinstance(key, unicode):
        key = key.encode('utf-8')
    mac = hmac.HMAC(key, digestmod=getattr(hashlib, algo), msg=s)
    return mac.digest()
|
||||
|
||||
|
||||
def check_url(url, key, known_nonce=None, timedelta=30):
    """Check the signature carried by *url*'s query string.

    Fixed: known_nonce and timedelta are now forwarded to check_query
    instead of being silently dropped.
    """
    parsed = urlparse.urlparse(url, 'https')
    return check_query(parsed.query, key, known_nonce=known_nonce, timedelta=timedelta)
|
||||
|
||||
|
||||
def check_query(query, key, known_nonce=None, timedelta=30):
    """Verify a query string signed by sign_query.

    Returns False when the query is malformed, the nonce was already seen
    (per the known_nonce callable), the timestamp is more than *timedelta*
    seconds away from now, or the signature does not verify.
    """
    parsed = urlparse.parse_qs(query)
    try:
        signature = base64.b64decode(parsed['signature'][0])
        algo = parsed['algo'][0]
        timestamp = parsed['timestamp'][0]
        # fixed: parse_qs maps each key to a *list* of values; the nonce
        # was previously kept as a list, not a string
        nonce = parsed['nonce'][0]
    except (KeyError, IndexError, TypeError):
        # missing parameter or undecodable signature: not a valid query
        return False
    timestamp = datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')
    unsigned_query = query.split('&signature=')[0]
    if known_nonce is not None and known_nonce(nonce):
        return False
    if abs(datetime.datetime.utcnow() - timestamp) > datetime.timedelta(seconds=timedelta):
        return False
    return check_string(unsigned_query, signature, key, algo=algo)
|
||||
|
||||
|
||||
def check_string(s, signature, key, algo='sha256'):
    """Compare *signature* against the expected HMAC of *s*, in constant time."""
    expected = sign_string(s, key, algo=algo)
    if len(expected) != len(signature):
        return False
    # accumulate XOR differences so the running time does not depend on
    # where the first mismatching byte is
    delta = 0
    for given_byte, expected_byte in zip(signature, expected):
        delta |= ord(given_byte) ^ ord(expected_byte)
    return delta == 0
|
|
@ -0,0 +1,113 @@
|
|||
import requests
|
||||
import urlparse
|
||||
import urllib
|
||||
import datetime
|
||||
|
||||
from . import signature
|
||||
|
||||
|
||||
class WcsApiError(Exception):
    """Raised for failed HTTP requests or invalid JSON from the w.c.s. API."""
    pass
|
||||
|
||||
|
||||
class BaseObject(object):
    """Generic API object: attributes are taken verbatim from the JSON payload."""

    def __init__(self, wcs_api, **kwargs):
        # name-mangled to _BaseObject__wcs_api, so json() filters it out
        self.__wcs_api = wcs_api
        self.__dict__.update(**kwargs)

    def json(self):
        """Return a dict of the public attributes (underscore names stripped).

        Fixed: the original copied __dict__ then deleted keys while
        iterating d.keys(), which raises RuntimeError under Python 3;
        building a new dict avoids the mutation-during-iteration pattern.
        """
        return dict((key, value) for key, value in self.__dict__.items()
                    if not key.startswith('_'))
|
||||
|
||||
|
||||
class FormData(BaseObject):
    """One form submission, augmented with derived date facets in json()."""

    def json(self):
        """Return the public attributes plus day/month/hour facet columns."""
        payload = super(FormData, self).json()
        formdef = payload.pop('formdef')
        received = datetime.datetime.strptime(payload['receipt_time'], '%Y-%m-%dT%H:%M:%SZ')
        # derived columns for faceting; %A/%B are locale-dependent names
        payload['receipt_time__dow'] = received.strftime('%A')
        payload['receipt_time__dow_int'] = int(received.strftime('%w'))
        payload['receipt_time__month'] = received.strftime('%B')
        payload['receipt_time__month_int'] = int(received.strftime('%m'))
        payload['receipt_time__hour'] = int(received.strftime('%H'))
        payload['formdef_slug'] = formdef.slug
        return payload

    def __repr__(self):
        return '<{klass} {display_id!r}>'.format(
            klass=self.__class__.__name__, display_id=self.id)
|
||||
|
||||
|
||||
class FormDef(BaseObject):
    """A form definition; fetches its submissions and schema on demand."""

    def __init__(self, wcs_api, **kwargs):
        # duplicates BaseObject.__init__ so the mangled attribute is
        # _FormDef__wcs_api, the name read by the properties below
        self.__wcs_api = wcs_api
        self.__dict__.update(**kwargs)

    def __unicode__(self):
        return self.title

    @property
    def datas(self):
        """All formdatas of this formdef, each linked back to it."""
        formdatas = self.__wcs_api.get_formdata(self.slug)
        for formdata in formdatas:
            formdata.formdef = self
        return formdatas

    @property
    def schema(self):
        """The formdef schema, fetched from the API."""
        return self.__wcs_api.get_schema(self.slug)

    def __repr__(self):
        return '<{klass} {slug!r}>'.format(klass=self.__class__.__name__, slug=self.slug)
|
||||
|
||||
|
||||
class WcsApi(object):
    """Minimal client for the w.c.s. JSON API, using HMAC-signed URLs."""

    def __init__(self, url, orig, key, name_id=None, email=None, verify=False):
        self.url = url
        self.orig = orig  # origin identifier sent with every request
        self.key = key    # HMAC key used to sign request URLs
        self.email = email
        self.name_id = name_id
        self.verify = verify  # whether to verify the server TLS certificate

    @property
    def formdefs_url(self):
        return urlparse.urljoin(self.url, 'api/formdefs/')

    @property
    def forms_url(self):
        return urlparse.urljoin(self.url, 'api/forms/')

    def get_json(self, *url_parts):
        """GET the signed URL built from url_parts and return its JSON body.

        Raises WcsApiError when the request fails or the body is not JSON.
        """
        url = reduce(lambda x, y: urlparse.urljoin(x, y), url_parts)
        params = {'orig': self.orig}
        if self.email:
            params['email'] = self.email
        if self.name_id:
            params['NameID'] = self.name_id
        query_string = urllib.urlencode(params)
        presigned_url = url + ('&' if '?' in url else '?') + query_string
        signed_url = signature.sign_url(presigned_url, self.key)
        try:
            # fixed: honour the verify flag instead of a hard-coded False
            response = requests.get(signed_url, verify=self.verify)
            response.raise_for_status()
        except requests.RequestException as e:
            raise WcsApiError('GET request failed', signed_url, e)
        else:
            try:
                return response.json()
            except ValueError as e:
                raise WcsApiError('Invalid JSON content', signed_url, e)

    def get_formdefs(self):
        """Return all form definitions as FormDef objects."""
        return [FormDef(wcs_api=self, **d) for d in self.get_json(self.formdefs_url)]

    def get_formdata(self, slug):
        """Return every (full) formdata of the formdef *slug*."""
        return [FormData(wcs_api=self, **d) for d in self.get_json(self.forms_url, slug + '/',
                                                                   'list?full=on')]

    def get_schema(self, slug):
        """Return the schema of the formdef *slug* as a BaseObject."""
        return BaseObject(wcs_api=self, **self.get_json(self.formdefs_url, slug + '/', 'schema'))
|
Loading…
Reference in New Issue