diff --git a/geolinks/__init__.py b/geolinks/__init__.py index d5026a2..281c35e 100644 --- a/geolinks/__init__.py +++ b/geolinks/__init__.py @@ -29,8 +29,17 @@ import logging import sys - import click +from owslib.wms import WebMapService as WMS +from owslib.wfs import WebFeatureService as WFS +from owslib.ogcapi.features import Features as OAPIF +from owslib.ogcapi.coverages import Coverages as OAPIC +from owslib.ogcapi.records import Records as OAPIR +from owslib.wcs import WebCoverageService as WCS +from owslib.csw import CatalogueServiceWeb as CSW +from owslib.wps import WebProcessingService as WPS +from owslib.sos import SensorObservationService as SOS +from owslib.wmts import WebMapTileService as WMTS LOGGER = logging.getLogger(__name__) @@ -49,9 +58,9 @@ def callback(ctx, param, value): return click.option('--verbosity', '-v', type=click.Choice(logging_options), help='Verbosity', + default='WARNING', callback=callback)(f) - def inurl(needles, haystack, position='any'): """convenience function to make string.find return bool""" @@ -78,7 +87,7 @@ def inurl(needles, haystack, position='any'): return False -def sniff_link(url): +def sniff_link(url, extended=False, first=True): """performs basic heuristics to detect what the URL is""" protocol = None @@ -123,7 +132,106 @@ def sniff_link(url): elif inurl(['kml', 'kmz'], link, 'end'): protocol = 'OGC:KML' else: - LOGGER.info('No link type detected') + if (extended): + protocol = [] + #for each servicetype, head out to see if it is valid + try: + wms = WMS(link) + if (wms.identification.type == 'OGC:WMS'): + if (first): + return wms.identification.type + else: + protocol.append(wms.identification.type) + except: + pass # No need to log? + try: + wmts = WMTS(link) + if (wmts.identification.type == 'OGC:WMTS'): + if (first): + return wmts.identification.type + else: + protocol.append(wmts.identification.type) + except: + pass + try: + wps = WPS(link, verbose=False, skip_caps=True) + wps.getcapabilities() + if (wps.identification.type == 'OGC:WPS'): + if (first): + return wps.identification.type + else: + protocol.append(wps.identification.type) + except: + pass + try: + wfs = WFS(link) + if (wfs.identification.type == 'OGC:WFS'): + if (first): + return wfs.identification.type + else: + protocol.append(wfs.identification.type) + except: + pass + try: + csw = CSW('http://geodiscover.cgdi.ca/wes/serviceManagerCSW/csw') + if (csw.identification.type == 'OGC:CSW'): + if (first): + return csw.identification.type + else: + protocol.append(csw.identification.type) + except: + pass + try: + wcs = WCS(link) + if (wcs.identification.type == 'OGC:WCS'): + if (first): + return wcs.identification.type + else: + protocol.append(wcs.identification.type) + except: + pass + try: + sos = SOS(link) + if (sos.identification.type == 'OGC:SOS'): + if (first): + return sos.identification.type + else: + protocol.append(sos.identification.type) + except: + pass + try: + oapir = OAPIR(link) + if (oapir.conformance()): + if (first): + return "OGCAPI:records" + else: + protocol.append("OGCAPI:records") + except: + pass + try: + oapif = OAPIF(link) + if (oapir.conformance()): + if (first): + return "OGCAPI:features" + else: + protocol.append("OGCAPI:features") + except: + pass + try: + oapic = OAPIC(link) + if (oapir.conformance()): + if (first): + return "OGCAPI:coverages" + else: + protocol.append("OGCAPI:coverages") + except: + pass + + if len(protocol) == 1: + protocol = protocol[0] + + else: + LOGGER.info('No link type detected') return protocol @@ -142,16 +250,24 @@ def link(): @click.command() @click.argument('link') +@click.option("--probe", show_default=True, default=False, help="Probe the link to evaluate its type") +@click.option("--first", show_default=True, default=True, help="Use the first protocol identified") @CLICK_OPTION_VERBOSITY -def sniff(link, verbosity): + + +def sniff(link, probe=False, first=True, verbosity='WARNING'): """Sniff link""" click.echo(f'Sniffing link: {link}') - link_type = sniff_link(link) - - click.echo(f'Link type: {link_type}') + link_type = sniff_link(link, probe, first) + if (not link_type): + click.echo(f'No type detected') + elif isinstance(link_type, str): + click.echo(f'Link type: {link_type}') + else: + click.echo(f'Link types: {", ".join(link_type)}') link.add_command(sniff) cli.add_command(link) diff --git a/requirements.txt b/requirements.txt index dca9a90..384b6d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ click +owslib \ No newline at end of file diff --git a/tests/run_tests.py b/tests/run_tests.py index 5b4e1a7..789eb67 100644 --- a/tests/run_tests.py +++ b/tests/run_tests.py @@ -59,7 +59,7 @@ def test_link_types(self): """simple link type tests""" for test in self.test_data['test_data']: - self.assertEqual(sniff_link(test['link']), test['expected'], + self.assertEqual(sniff_link(test['link'], probe=test.get('probe',False), first=test.get('first',True)), test['expected'], 'Expected %s and %s to be equal' % (test['link'], test['expected'])) diff --git a/tests/test_data.json b/tests/test_data.json index dbd2f7e..d8addc9 100644 --- a/tests/test_data.json +++ b/tests/test_data.json @@ -1,6 +1,8 @@ { "test_data": [ {"link": "http://host/wms?service=WMS", "expected": "OGC:WMS"}, + {"link": "https://maps.isric.org/mapserv?map=/map/bdod.map", "expected": "", "probe": "False"}, + {"link": "https://maps.isric.org/mapserv?map=/map/bdod.map", "expected": "OGC:WMS", "probe": "True"}, {"link": "http://host/ows?service=WFS", "expected": "OGC:WFS"}, {"link": "http://host/ows?service=WCS", "expected": "OGC:WCS"}, {"link": "http://host/ows?service=WPS", "expected": "OGC:WPS"},