#!/usr/bin/python2 # -*- coding: utf-8 -*- ############################################################################### # $Id: validate_gpkg.py 3a387ceeca8dcb9dea0f3934f65ab13360233473 2019-03-12 18:09:43 +0100 Even Rouault $ # # Project: GDAL/OGR # Purpose: Test compliance of GeoPackage database w.r.t GeoPackage spec # Author: Even Rouault <even.rouault at spatialys.com> # ############################################################################### # Copyright (c) 2017, Even Rouault <even.rouault at spatialys.com> # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the "Software"), # to deal in the Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, distribute, sublicense, # and/or sell copies of the Software, and to permit persons to whom the # Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. ############################################################################### import datetime import os import sqlite3 import struct import sys # GDAL may be used for checks on tile content for the tiled gridded extension. # If not available, those tests will be skipped try: from osgeo import gdal has_gdal = True except ImportError: has_gdal = False def _esc_literal(literal): return literal.replace("'", "''") def _esc_id(identifier): return '"' + identifier.replace('"', "\"\"") + '"' def _is_valid_data_type(typ): return typ in ('BOOLEAN', 'TINYINT', 'SMALLINT', 'MEDIUMINT', 'INT', 'INTEGER', 'FLOAT', 'DOUBLE', 'REAL', 'TEXT', 'BLOB', 'DATE', 'DATETIME') or \ typ.startswith('TEXT(') or typ.startswith('BLOB(') class GPKGCheckException(Exception): pass class GPKGChecker(object): EXT_GEOM_TYPES = ('CIRCULARSTRING', 'COMPOUNDCURVE', 'CURVEPOLYGON', 'MULTICURVE', 'MULTISURFACE', 'CURVE', 'SURFACE') def __init__(self, filename, abort_at_first_error=True, verbose=False): self.filename = filename self.extended_pragma_info = False self.abort_at_first_error = abort_at_first_error self.verbose = verbose self.errors = [] def _log(self, msg): if self.verbose: print(msg) def _assert(self, cond, req, msg): # self._log('Verified requirement %s' % req) if not cond: self.errors += [(req, msg)] if self.abort_at_first_error: if req: raise GPKGCheckException('Req %s: %s' % (str(req), msg)) else: raise GPKGCheckException(msg) return cond def _check_structure(self, columns, expected_columns, req, table_name): self._assert(len(columns) == len(expected_columns), req, 'Table %s has %d columns, whereas %d are expected' % (table_name, len(columns), len(expected_columns))) for (_, expected_name, expected_type, expected_notnull, expected_default, expected_pk) in expected_columns: found = False for (_, name, typ, notnull, default, pk) in columns: if name != expected_name: continue if expected_type == 'INTEGER' and expected_pk: expected_notnull = 1 if typ == 'INTEGER' and pk: notnull = 1 if not self.extended_pragma_info and expected_pk > 1: expected_pk = 1 self._assert(typ == expected_type, req, 'Wrong type for %s of %s. Expected %s, got %s' % (name, table_name, expected_type, typ)) self._assert(notnull == expected_notnull, req, ('Wrong notnull for %s of %s. ' + 'Expected %s, got %s') % (name, table_name, expected_notnull, notnull)) self._assert(default == expected_default, req, ('Wrong default for %s of %s. ' + 'Expected %s, got %s') % (name, table_name, expected_default, default)) self._assert(pk == expected_pk, req, 'Wrong pk for %s of %s. Expected %s, got %s' % (name, table_name, expected_pk, pk)) found = True break self._assert(found, req, 'Column %s of %s not found!' % (expected_name, table_name)) def _check_gpkg_spatial_ref_sys(self, c): self._log('Checking gpkg_spatial_ref_sys') c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_spatial_ref_sys'") if not self._assert(c.fetchone() is not None, 10, "gpkg_spatial_ref_sys table missing"): return c.execute("PRAGMA table_info(gpkg_spatial_ref_sys)") columns = c.fetchall() has_definition_12_063 = False for (_, name, _, _, _, _) in columns: if name == 'definition_12_063': has_definition_12_063 = True c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'") row = None if c.fetchone() is not None: c.execute("SELECT scope FROM gpkg_extensions WHERE " "extension_name = 'gpkg_crs_wkt'") row = c.fetchone() if row: scope, = row self._assert(scope == 'read-write', 145, 'scope of gpkg_crs_wkt extension should be read-write') self._assert( has_definition_12_063, 145, "gpkg_spatial_ref_sys should have a definition_12_063 column, " "as gpkg_crs_wkt extension is declared") else: self._assert( not has_definition_12_063, 145, "gpkg_extensions should declare gpkg_crs_wkt extension " "as gpkg_spatial_ref_sys has a definition_12_063 column") if has_definition_12_063: expected_columns = [ (0, 'srs_name', 'TEXT', 1, None, 0), (1, 'srs_id', 'INTEGER', 1, None, 1), (2, 'organization', 'TEXT', 1, None, 0), (3, 'organization_coordsys_id', 'INTEGER', 1, None, 0), (4, 'definition', 'TEXT', 1, None, 0), (5, 'description', 'TEXT', 0, None, 0), (6, 'definition_12_063', 'TEXT', 1, None, 0) ] else: expected_columns = [ (0, 'srs_name', 'TEXT', 1, None, 0), (1, 'srs_id', 'INTEGER', 1, None, 1), (2, 'organization', 'TEXT', 1, None, 0), (3, 'organization_coordsys_id', 'INTEGER', 1, None, 0), (4, 'definition', 'TEXT', 1, None, 0), (5, 'description', 'TEXT', 0, None, 0) ] self._check_structure(columns, expected_columns, 10, 'gpkg_spatial_ref_sys') if has_definition_12_063: c.execute("SELECT srs_id, organization, organization_coordsys_id, " "definition, definition_12_063 " "FROM gpkg_spatial_ref_sys " "WHERE srs_id IN (-1, 0, 4326) ORDER BY srs_id") else: c.execute("SELECT srs_id, organization, organization_coordsys_id, " "definition FROM gpkg_spatial_ref_sys " "WHERE srs_id IN (-1, 0, 4326) ORDER BY srs_id") ret = c.fetchall() self._assert(len(ret) == 3, 11, 'There should be at least 3 records in ' 'gpkg_spatial_ref_sys') if len(ret) != 3: return self._assert(ret[0][1] == 'NONE', 11, 'wrong value for organization for srs_id = -1: %s' % ret[0][1]) self._assert(ret[0][2] == -1, 11, 'wrong value for organization_coordsys_id for ' 'srs_id = -1: %s' % ret[0][2]) self._assert(ret[0][3] == 'undefined', 11, 'wrong value for definition for srs_id = -1: %s' % ret[0][3]) if has_definition_12_063: self._assert(ret[0][4] == 'undefined', 116, 'wrong value for definition_12_063 for ' + 'srs_id = -1: %s' % ret[0][4]) self._assert(ret[1][1] == 'NONE', 11, 'wrong value for organization for srs_id = 0: %s' % ret[1][1]) self._assert(ret[1][2] == 0, 11, 'wrong value for organization_coordsys_id for ' 'srs_id = 0: %s' % ret[1][2]) self._assert(ret[1][3] == 'undefined', 11, 'wrong value for definition for srs_id = 0: %s' % ret[1][3]) if has_definition_12_063: self._assert(ret[1][4] == 'undefined', 116, 'wrong value for definition_12_063 for ' + 'srs_id = 0: %s' % ret[1][4]) self._assert(ret[2][1].lower() == 'epsg', 11, 'wrong value for organization for srs_id = 4326: %s' % ret[2][1]) self._assert(ret[2][2] == 4326, 11, 'wrong value for organization_coordsys_id for ' 'srs_id = 4326: %s' % ret[2][2]) self._assert(ret[2][3] != 'undefined', 11, 'wrong value for definition for srs_id = 4326: %s' % ret[2][3]) if has_definition_12_063: self._assert(ret[2][4] != 'undefined', 116, 'wrong value for definition_12_063 for ' + 'srs_id = 4326: %s' % ret[2][4]) if has_definition_12_063: c.execute("SELECT srs_id FROM gpkg_spatial_ref_sys " "WHERE srs_id NOT IN (0, -1) AND " "definition = 'undefined' AND " "definition_12_063 = 'undefined'") rows = c.fetchall() for (srs_id, ) in rows: self._assert(False, 117, 'srs_id = %d has both definition and ' % srs_id + 'definition_12_063 undefined') def _check_gpkg_contents(self, c): self._log('Checking gpkg_contents') c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_contents'") self._assert(c.fetchone() is not None, 13, "gpkg_contents table missing") c.execute("PRAGMA table_info(gpkg_contents)") columns = c.fetchall() expected_columns = [ (0, 'table_name', 'TEXT', 1, None, 1), (1, 'data_type', 'TEXT', 1, None, 0), (2, 'identifier', 'TEXT', 0, None, 0), (3, 'description', 'TEXT', 0, "''", 0), (4, 'last_change', 'DATETIME', 1, "strftime('%Y-%m-%dT%H:%M:%fZ','now')", 0), (5, 'min_x', 'DOUBLE', 0, None, 0), (6, 'min_y', 'DOUBLE', 0, None, 0), (7, 'max_x', 'DOUBLE', 0, None, 0), (8, 'max_y', 'DOUBLE', 0, None, 0), (9, 'srs_id', 'INTEGER', 0, None, 0) ] self._check_structure(columns, expected_columns, 13, 'gpkg_contents') c.execute("SELECT 1 FROM gpkg_contents " "WHERE data_type IN ('features', 'tiles')") self._assert(c.fetchone() is not None, 17, 'gpkg_contents should at least have one table with ' 'data_type = features and/or tiles') c.execute("SELECT table_name, data_type FROM gpkg_contents " "WHERE data_type NOT IN " "('features', 'tiles', 'attributes', '2d-gridded-coverage')") ret = c.fetchall() self._assert(len(ret) == 0, 17, 'Unexpected data types in gpkg_contents: %s' % str(ret)) c.execute('SELECT table_name, last_change, srs_id FROM gpkg_contents') rows = c.fetchall() for (table_name, last_change, srs_id) in rows: c.execute("SELECT 1 FROM sqlite_master WHERE " "lower(name) = lower(?) AND type IN ('table', 'view')", (table_name,)) self._assert(c.fetchone() is not None, 14, ('table_name=%s in gpkg_contents is not a ' + 'table or view') % table_name) try: datetime.datetime.strptime( last_change, '%Y-%m-%dT%H:%M:%S.%fZ') except ValueError: self._assert(False, 15, ('last_change = %s for table_name = %s ' + 'is invalid datetime') % (last_change, table_name)) if srs_id is not None: c.execute('SELECT 1 FROM gpkg_spatial_ref_sys ' 'WHERE srs_id = ?', (srs_id, )) self._assert(c.fetchone() is not None, 14, ("table_name=%s has srs_id=%d in gpkg_contents " + "which isn't found in gpkg_spatial_ref_sys") % (table_name, srs_id)) def _check_vector_user_table(self, c, table_name): self._log('Checking vector user table ' + table_name) c.execute("SELECT column_name, z, m, geometry_type_name, srs_id " "FROM gpkg_geometry_columns WHERE table_name = ?", (table_name,)) rows_gpkg_geometry_columns = c.fetchall() self._assert(len(rows_gpkg_geometry_columns) == 1, 22, ('table_name = %s is not registered in ' + 'gpkg_geometry_columns') % table_name) geom_column_name = rows_gpkg_geometry_columns[0][0] z = rows_gpkg_geometry_columns[0][1] m = rows_gpkg_geometry_columns[0][2] geometry_type_name = rows_gpkg_geometry_columns[0][3] srs_id = rows_gpkg_geometry_columns[0][4] c.execute('PRAGMA table_info(%s)' % _esc_id(table_name)) base_geom_types = ('GEOMETRY', 'POINT', 'LINESTRING', 'POLYGON', 'MULTIPOINT', 'MULTILINESTRING', 'MULTIPOLYGON', 'GEOMETRYCOLLECTION') cols = c.fetchall() found_geom = False count_pkid = 0 for (_, name, typ, notnull, default, pk) in cols: if name.lower() == geom_column_name.lower(): found_geom = True self._assert( typ in base_geom_types or typ in GPKGChecker.EXT_GEOM_TYPES, 25, ('invalid type (%s) for geometry ' + 'column of table %s') % (typ, table_name)) self._assert(typ == geometry_type_name, 31, ('table %s has geometry column of type %s in ' + 'SQL and %s in geometry_type_name of ' + 'gpkg_geometry_columns') % (table_name, typ, geometry_type_name)) elif pk == 1: count_pkid += 1 self._assert(typ == 'INTEGER', 29, ('table %s has a PRIMARY KEY of type %s ' + 'instead of INTEGER') % (table_name, typ)) else: self._assert(_is_valid_data_type(typ), 5, ('table %s has column %s of unexpected type %s' % (table_name, name, typ))) self._assert(found_geom, 24, 'table %s has no %s column' % (table_name, geom_column_name)) self._assert(count_pkid == 1, 29, 'table %s has no INTEGER PRIMARY KEY' % table_name) self._assert(z in (0, 1, 2), 27, ("z value of %s is %d. " + "Expected 0, 1 or 2") % (table_name, z)) self._assert(m in (0, 1, 2), 27, ("m value of %s is %d. " + "Expected 0, 1 or 2") % (table_name, m)) if geometry_type_name in GPKGChecker.EXT_GEOM_TYPES: c.execute("SELECT 1 FROM gpkg_extensions WHERE " "extension_name = 'gpkg_geom_%s' AND " "table_name = ? AND column_name = ? AND " "scope = 'read-write'" % geometry_type_name, (table_name, geom_column_name)) self._assert(c.fetchone() is not None, 68, "gpkg_geom_%s extension should be declared for " "table %s" % (geometry_type_name, table_name)) wkb_geometries = base_geom_types + GPKGChecker.EXT_GEOM_TYPES c.execute("SELECT %s FROM %s " % (_esc_id(geom_column_name), _esc_id(table_name))) found_geom_types = set() for (blob,) in c.fetchall(): if blob is None: continue self._assert(len(blob) >= 8, 19, 'Invalid geometry') max_size_needed = min(len(blob), 8 + 4 * 2 * 8 + 5) blob_ar = struct.unpack('B' * max_size_needed, blob[0:max_size_needed]) self._assert(blob_ar[0] == ord('G'), 19, 'Invalid geometry') self._assert(blob_ar[1] == ord('P'), 19, 'Invalid geometry') self._assert(blob_ar[2] == 0, 19, 'Invalid geometry') flags = blob_ar[3] big_endian = (flags & 1) == 0 env_ind = (flags >> 1) & 7 self._assert(((flags >> 5) & 1) == 0, 19, 'Invalid geometry: ExtendedGeoPackageBinary not ' 'allowed') self._assert(env_ind <= 4, 19, 'Invalid geometry: invalid envelope indicator code') if big_endian: geom_srs_id = struct.unpack('>I' * 1, blob[4:8])[0] else: geom_srs_id = struct.unpack('<I' * 1, blob[4:8])[0] self._assert(srs_id == geom_srs_id, 33, ('table %s has geometries with SRID %d, ' + 'whereas only %d is expected') % (table_name, geom_srs_id, srs_id)) if env_ind == 0: coord_dim = 0 elif env_ind == 1: coord_dim = 2 elif env_ind == 2 or env_ind == 3: coord_dim = 3 else: coord_dim = 4 # if env_ind == 2 or env_ind == 4: # self._assert(z > 0, 19, # 'z found in geometry, but not in gpkg_geometry_columns') # if env_ind == 3 or env_ind == 4: # self._assert(m > 0, 19, # 'm found in geometry, but not in gpkg_geometry_columns') header_len = 8 + coord_dim * 2 * 8 self._assert(len(blob) >= header_len, 19, 'Invalid geometry') wkb_endianness = blob_ar[header_len] wkb_big_endian = (wkb_endianness == 0) if wkb_big_endian: wkb_geom_type = struct.unpack( '>I' * 1, blob[header_len + 1:header_len + 5])[0] else: wkb_geom_type = struct.unpack( '<I' * 1, blob[header_len + 1:header_len + 5])[0] self._assert(wkb_geom_type >= 0 and (wkb_geom_type % 1000) < len(wkb_geometries), 19, 'Invalid WKB geometry type') wkb_dim = int(wkb_geom_type / 1000) if z == 1: self._assert(wkb_dim == 1 or wkb_dim == 3, 19, 'geometry without Z found') if m == 1: self._assert(wkb_dim == 2 or wkb_dim == 3, 19, 'geometry without M found') if wkb_dim == 1 or wkb_dim == 3: # Z or ZM self._assert(z > 0, 19, 'z found in geometry, but not in ' 'gpkg_geometry_columns') if wkb_dim == 2 or wkb_dim == 3: # M or ZM self._assert(m > 0, 19, 'm found in geometry, but not in ' 'gpkg_geometry_columns') found_geom_types.add(wkb_geometries[wkb_geom_type % 1000]) if geometry_type_name in ('POINT', 'LINESTRING', 'POLYGON', 'MULTIPOINT', 'MULTILINESTRING', 'MULTIPOLYGON'): self._assert(not found_geom_types or found_geom_types == set([geometry_type_name]), 32, 'in table %s, found geometry types %s' % (table_name, str(found_geom_types))) elif geometry_type_name == 'GEOMETRYCOLLECTION': self._assert(not found_geom_types or not found_geom_types.difference( set(['GEOMETRYCOLLECTION', 'MULTIPOINT', 'MULTILINESTRING', 'MULTIPOLYGON', 'MULTICURVE', 'MULTISURFACE'])), 32, 'in table %s, found geometry types %s' % (table_name, str(found_geom_types))) elif geometry_type_name in ('CURVEPOLYGON', 'SURFACE'): self._assert(not found_geom_types or not found_geom_types.difference( set(['POLYGON', 'CURVEPOLYGON'])), 32, 'in table %s, found geometry types %s' % (table_name, str(found_geom_types))) elif geometry_type_name == 'MULTICURVE': self._assert(not found_geom_types or not found_geom_types.difference( set(['MULTILINESTRING', 'MULTICURVE'])), 32, 'in table %s, found geometry types %s' % (table_name, str(found_geom_types))) elif geometry_type_name == 'MULTISURFACE': self._assert(not found_geom_types or not found_geom_types.difference( set(['MULTIPOLYGON', 'MULTISURFACE'])), 32, 'in table %s, found geometry types %s' % (table_name, str(found_geom_types))) elif geometry_type_name == 'CURVE': self._assert(not found_geom_types or not found_geom_types.difference( set(['LINESTRING', 'CIRCULARSTRING', 'COMPOUNDCURVE'])), 32, 'in table %s, found geometry types %s' % (table_name, str(found_geom_types))) for geom_type in found_geom_types: if geom_type in GPKGChecker.EXT_GEOM_TYPES: c.execute("SELECT 1 FROM gpkg_extensions WHERE " "extension_name = 'gpkg_geom_%s' AND " "table_name = ? AND column_name = ? AND " "scope = 'read-write'" % geom_type, (table_name, geom_column_name)) self._assert(c.fetchone() is not None, 68, "gpkg_geom_%s extension should be declared for " "table %s" % (geom_type, table_name)) rtree_name = 'rtree_%s_%s' % (table_name, geom_column_name) c.execute("SELECT 1 FROM sqlite_master WHERE name = ?", (rtree_name,)) has_rtree = c.fetchone() is not None if has_rtree: c.execute("SELECT 1 FROM gpkg_extensions WHERE " "extension_name = 'gpkg_rtree_index' AND " "table_name=? AND column_name=? AND " "scope='write-only'", (table_name, geom_column_name)) self._assert(c.fetchone() is not None, 78, ("Table %s has a RTree, but not declared in " + "gpkg_extensions") % table_name) c.execute('PRAGMA table_info(%s)' % _esc_id(rtree_name)) columns = c.fetchall() expected_columns = [ (0, 'id', '', 0, None, 0), (1, 'minx', '', 0, None, 0), (2, 'maxx', '', 0, None, 0), (3, 'miny', '', 0, None, 0), (4, 'maxy', '', 0, None, 0) ] self._check_structure(columns, expected_columns, 77, rtree_name) c.execute("SELECT 1 FROM sqlite_master WHERE type = 'trigger' " + "AND name = '%s_insert'" % _esc_literal(rtree_name)) self._assert(c.fetchone() is not None, 75, "%s_insert trigger missing" % rtree_name) for i in range(4): c.execute("SELECT 1 FROM sqlite_master WHERE " + "type = 'trigger' " + "AND name = '%s_update%d'" % (_esc_literal(rtree_name), i + 1)) self._assert(c.fetchone() is not None, 75, "%s_update%d trigger missing" % (rtree_name, i + 1)) c.execute("SELECT 1 FROM sqlite_master WHERE type = 'trigger' " + "AND name = '%s_delete'" % _esc_literal(rtree_name)) self._assert(c.fetchone() is not None, 75, "%s_delete trigger missing" % rtree_name) def _check_features(self, c): self._log('Checking features') c.execute("SELECT 1 FROM gpkg_contents WHERE data_type = 'features'") if c.fetchone() is None: self._log('... No features table') return self._log('Checking gpkg_geometry_columns') c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_geometry_columns'") self._assert(c.fetchone() is not None, 21, "gpkg_geometry_columns table missing") c.execute("PRAGMA table_info(gpkg_geometry_columns)") columns = c.fetchall() expected_columns = [ (0, 'table_name', 'TEXT', 1, None, 1), (1, 'column_name', 'TEXT', 1, None, 2), (2, 'geometry_type_name', 'TEXT', 1, None, 0), (3, 'srs_id', 'INTEGER', 1, None, 0), (4, 'z', 'TINYINT', 1, None, 0), (5, 'm', 'TINYINT', 1, None, 0) ] self._check_structure(columns, expected_columns, 21, 'gpkg_geometry_columns') c.execute("SELECT table_name FROM gpkg_contents WHERE " "data_type = 'features'") rows = c.fetchall() for (table_name,) in rows: self._check_vector_user_table(c, table_name) c.execute("SELECT table_name, srs_id FROM gpkg_geometry_columns") rows = c.fetchall() for (table_name, srs_id) in rows: c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ? " + "AND data_type='features'", (table_name,)) ret = c.fetchall() self._assert(len(ret) == 1, 23, ('table_name = %s is registered in ' + 'gpkg_geometry_columns, but not in gpkg_contents') % table_name) c.execute('SELECT 1 FROM gpkg_spatial_ref_sys WHERE ' + 'srs_id = ?', (srs_id, )) self._assert(c.fetchone() is not None, 14, ("table_name=%s has srs_id=%d in " + "gpkg_geometry_columns which isn't found in " + "gpkg_spatial_ref_sys") % (table_name, srs_id)) def _check_attributes(self, c): self._log('Checking attributes') c.execute("SELECT table_name FROM gpkg_contents WHERE " "data_type = 'attributes'") rows = c.fetchall() if not rows: self._log('... No attributes table') for (table_name,) in rows: self._log('Checking attributes table ' + table_name) c.execute('PRAGMA table_info(%s)' % _esc_id(table_name)) cols = c.fetchall() count_pkid = 0 for (_, name, typ, _, _, pk) in cols: if pk == 1: count_pkid += 1 self._assert(typ == 'INTEGER', 119, ('table %s has a PRIMARY KEY of type %s ' + 'instead of INTEGER') % (table_name, typ)) else: self._assert(_is_valid_data_type(typ), 5, 'table %s has column %s of unexpected type %s' % (table_name, name, typ)) self._assert(count_pkid == 1, 119, 'table %s has no INTEGER PRIMARY KEY' % table_name) def _check_tile_user_table(self, c, table_name, data_type): self._log('Checking tile pyramid user table ' + table_name) c.execute("PRAGMA table_info(%s)" % _esc_id(table_name)) columns = c.fetchall() expected_columns = [ (0, 'id', 'INTEGER', 0, None, 1), (1, 'zoom_level', 'INTEGER', 1, None, 0), (2, 'tile_column', 'INTEGER', 1, None, 0), (3, 'tile_row', 'INTEGER', 1, None, 0), (4, 'tile_data', 'BLOB', 1, None, 0) ] self._check_structure(columns, expected_columns, 54, 'gpkg_tile_matrix_set') c.execute("SELECT DISTINCT zoom_level FROM %s" % _esc_id(table_name)) rows = c.fetchall() for (zoom_level, ) in rows: c.execute("SELECT 1 FROM gpkg_tile_matrix WHERE table_name = ? " "AND zoom_level = ?", (table_name, zoom_level)) self._assert(c.fetchone() is not None, 44, ("Table %s has data for zoom_level = %d, but no " + "corresponding row in gpkg_tile_matrix") % (table_name, zoom_level)) zoom_other_levels = False c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'") if c.fetchone() is not None: c.execute("SELECT column_name FROM gpkg_extensions WHERE " "table_name = ? " "AND extension_name = 'gpkg_zoom_other'", (table_name,)) row = c.fetchone() if row is not None: (column_name, ) = row self._assert(column_name == 'tile_data', 88, 'Wrong column_name in gpkg_extensions for ' 'gpkg_zoom_other') zoom_other_levels = True c.execute("SELECT zoom_level, pixel_x_size, pixel_y_size " "FROM gpkg_tile_matrix " "WHERE table_name = ? ORDER BY zoom_level", (table_name,)) rows = c.fetchall() prev_zoom_level = None prev_pixel_x_size = None prev_pixel_y_size = None for (zoom_level, pixel_x_size, pixel_y_size) in rows: if prev_pixel_x_size is not None: self._assert( pixel_x_size < prev_pixel_x_size and pixel_y_size < prev_pixel_y_size, 53, ('For table %s, pixel size are not consistent ' + 'with zoom_level') % table_name) if prev_zoom_level is not None and \ zoom_level == prev_zoom_level + 1 and not zoom_other_levels: self._assert( abs((pixel_x_size - prev_pixel_x_size / 2) / prev_pixel_x_size) < 1e-5, 35, "Expected pixel_x_size=%f for zoom_level=%d. Got %f" % (prev_pixel_x_size / 2, zoom_level, pixel_x_size)) self._assert( abs((pixel_y_size - prev_pixel_y_size / 2) / prev_pixel_y_size) < 1e-5, 35, "Expected pixel_y_size=%f for zoom_level=%d. Got %f" % (prev_pixel_y_size / 2, zoom_level, pixel_y_size)) prev_pixel_x_size = pixel_x_size prev_pixel_y_size = pixel_y_size prev_zoom_level = zoom_level c.execute("SELECT max_x - min_x, " " MIN(matrix_width * tile_width * pixel_x_size), " " MAX(matrix_width * tile_width * pixel_x_size), " " max_y - min_y, " " MIN(matrix_height * tile_height * pixel_y_size), " " MAX(matrix_height * tile_height * pixel_y_size) " "FROM gpkg_tile_matrix tm JOIN gpkg_tile_matrix_set tms " "ON tm.table_name = tms.table_name WHERE tm.table_name = ?", (table_name,)) rows = c.fetchall() if rows: (dx, min_dx, max_dx, dy, min_dy, max_dy) = rows[0] self._assert(abs((min_dx - dx) / dx) < 1e-3 and abs((max_dx - dx) / dx) < 1e-3 and abs((min_dy - dy) / dy) < 1e-3 and abs((max_dy - dy) / dy) < 1e-3, 45, ("Inconsistent values in gpkg_tile_matrix and " + "gpkg_tile_matrix_set for table %s") % table_name) c.execute("SELECT DISTINCT zoom_level FROM %s" % _esc_id(table_name)) rows = c.fetchall() for (zoom_level,) in rows: c.execute(("SELECT MIN(tile_column), MAX(tile_column), " + "MIN(tile_row), MAX(tile_row) FROM %s " + "WHERE zoom_level = %d") % (_esc_id(table_name), zoom_level)) min_col, max_col, min_row, max_row = c.fetchone() c.execute("SELECT matrix_width, matrix_height FROM " "gpkg_tile_matrix " "WHERE table_name = ? AND zoom_level = ?", (table_name, zoom_level)) rows2 = c.fetchall() if not rows2: self._assert(False, 55, "Invalid zoom_level in %s" % table_name) else: matrix_width, matrix_height = rows2[0] self._assert(min_col >= 0 and min_col < matrix_width, 56, "Invalid tile_col in %s" % table_name) self._assert(min_row >= 0 and min_row < matrix_height, 57, "Invalid tile_row in %s" % table_name) c.execute("SELECT tile_data FROM %s" % _esc_id(table_name)) found_webp = False for (blob,) in c.fetchall(): self._assert(blob is not None and len(blob) >= 12, 19, 'Invalid blob') max_size_needed = 12 blob_ar = struct.unpack('B' * max_size_needed, blob[0:max_size_needed]) is_jpeg = blob_ar[0:3] == (0xff, 0xd8, 0xff) is_png = blob_ar[0:4] == (0x89, 0x50, 0x4E, 0x47) is_webp = blob_ar[0:4] == (ord('R'), ord('I'), ord('F'), ord('F')) and \ blob_ar[8:12] == (ord('W'), ord('E'), ord('B'), ord('P')) is_tiff = blob_ar[0:4] == (0x49, 0x49, 0x2A, 0x00) or \ blob_ar[0:4] == (0x4D, 0x4D, 0x00, 0x2A) self._assert(is_jpeg or is_png or is_webp or is_tiff, 36, 'Unrecognized image mime type') if data_type == 'tiles': self._assert(is_jpeg or is_png or is_webp, 36, 'Unrecognized image mime type') elif data_type == '2d-gridded-coverage': self._assert(is_png or is_tiff, 36, 'Unrecognized image mime type') if is_webp: found_webp = True if found_webp: c.execute("SELECT 1 FROM gpkg_extensions WHERE " "table_name = ? AND column_name = 'tile_data' AND " "extension_name = 'gpkg_webp' AND " "scope = 'read-write'", (table_name, )) self._assert(c.fetchone() is not None, 91, ("Table %s has webp content, but not registered " "in gpkg_extensions" % table_name)) def _check_tiles(self, c): self._log('Checking tiles') c.execute("SELECT 1 FROM gpkg_contents WHERE data_type IN " "('tiles', '2d-gridded-coverage')") if c.fetchone() is None: self._log('... No tiles table') return self._log('Checking gpkg_tile_matrix_set ') c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_tile_matrix_set'") self._assert(c.fetchone() is not None, 38, "gpkg_tile_matrix_set table missing") c.execute("PRAGMA table_info(gpkg_tile_matrix_set)") columns = c.fetchall() expected_columns = [ (0, 'table_name', 'TEXT', 1, None, 1), (1, 'srs_id', 'INTEGER', 1, None, 0), (2, 'min_x', 'DOUBLE', 1, None, 0), (3, 'min_y', 'DOUBLE', 1, None, 0), (4, 'max_x', 'DOUBLE', 1, None, 0), (5, 'max_y', 'DOUBLE', 1, None, 0)] self._check_structure(columns, expected_columns, 38, 'gpkg_tile_matrix_set') c.execute("SELECT table_name, srs_id FROM gpkg_tile_matrix_set") rows = c.fetchall() for (table_name, srs_id) in rows: c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ? " + "AND data_type IN ('tiles', '2d-gridded-coverage')", (table_name,)) ret = c.fetchall() self._assert(len(ret) == 1, 39, ('table_name = %s is registered in ' + 'gpkg_tile_matrix_set, but not in gpkg_contents') % table_name) c.execute('SELECT 1 FROM gpkg_spatial_ref_sys WHERE srs_id = ?', (srs_id, )) self._assert(c.fetchone() is not None, 41, ("table_name=%s has srs_id=%d in " + "gpkg_tile_matrix_set which isn't found in " + "gpkg_spatial_ref_sys") % (table_name, srs_id)) self._log('Checking gpkg_tile_matrix') c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_tile_matrix'") self._assert(c.fetchone() is not None, 42, "gpkg_tile_matrix table missing") c.execute("PRAGMA table_info(gpkg_tile_matrix)") columns = c.fetchall() expected_columns = [ (0, 'table_name', 'TEXT', 1, None, 1), (1, 'zoom_level', 'INTEGER', 1, None, 2), (2, 'matrix_width', 'INTEGER', 1, None, 0), (3, 'matrix_height', 'INTEGER', 1, None, 0), (4, 'tile_width', 'INTEGER', 1, None, 0), (5, 'tile_height', 'INTEGER', 1, None, 0), (6, 'pixel_x_size', 'DOUBLE', 1, None, 0), (7, 'pixel_y_size', 'DOUBLE', 1, None, 0) ] self._check_structure(columns, expected_columns, 42, 'gpkg_tile_matrix') c.execute("SELECT table_name, zoom_level, matrix_width, " "matrix_height, tile_width, tile_height, pixel_x_size, " "pixel_y_size FROM gpkg_tile_matrix") rows = c.fetchall() for (table_name, zoom_level, matrix_width, matrix_height, tile_width, tile_height, pixel_x_size, pixel_y_size) in rows: c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ? " "AND data_type IN ('tiles', '2d-gridded-coverage')", (table_name,)) ret = c.fetchall() self._assert(len(ret) == 1, 43, ('table_name = %s is registered in ' + 'gpkg_tile_matrix, but not in gpkg_contents') % table_name) self._assert(zoom_level >= 0, 46, "Invalid zoom_level = %d for table %s" % (zoom_level, table_name)) self._assert(matrix_width > 0, 47, "Invalid matrix_width = %d for table %s" % (matrix_width, table_name)) self._assert(matrix_height > 0, 48, "Invalid matrix_height = %d for table %s" % (matrix_height, table_name)) self._assert(tile_width > 0, 49, "Invalid tile_width = %d for table %s" % (tile_width, table_name)) self._assert(tile_height > 0, 50, "Invalid tile_height = %d for table %s" % (tile_height, table_name)) self._assert(pixel_x_size > 0, 51, "Invalid pixel_x_size = %f for table %s" % (pixel_x_size, table_name)) self._assert(pixel_y_size > 0, 52, "Invalid pixel_y_size = %f for table %s" % (pixel_y_size, table_name)) c.execute("SELECT table_name, data_type FROM gpkg_contents WHERE " "data_type IN ('tiles', '2d-gridded-coverage')") rows = c.fetchall() for (table_name, data_type) in rows: self._check_tile_user_table(c, table_name, data_type) def _check_tiled_gridded_coverage_data(self, c): self._log('Checking tiled gridded elevation data') c.execute("SELECT table_name FROM gpkg_contents WHERE " "data_type = '2d-gridded-coverage'") tables = c.fetchall() if not tables: self._log('... No tiled gridded coverage table') return tables = [tables[i][0] for i in range(len(tables))] c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_2d_gridded_coverage_ancillary'") self._assert(c.fetchone() is not None, 'gpkg_2d_gridded_coverage#1', 'gpkg_2d_gridded_coverage_ancillary table is missing') c.execute("PRAGMA table_info(gpkg_2d_gridded_coverage_ancillary)") columns = c.fetchall() expected_columns = [ (0, 'id', 'INTEGER', 1, None, 1), (1, 'tile_matrix_set_name', 'TEXT', 1, None, 0), (2, 'datatype', 'TEXT', 1, "'integer'", 0), (3, 'scale', 'REAL', 1, '1.0', 0), (4, 'offset', 'REAL', 1, '0.0', 0), (5, 'precision', 'REAL', 0, '1.0', 0), (6, 'data_null', 'REAL', 0, None, 0), (7, 'grid_cell_encoding', 'TEXT', 0, "'grid-value-is-center'", 0), (8, 'uom', 'TEXT', 0, None, 0), (9, 'field_name', 'TEXT', 0, "'Height'", 0), (10, 'quantity_definition', 'TEXT', 0, "'Height'", 0) ] self._check_structure(columns, expected_columns, 'gpkg_2d_gridded_coverage#1', 'gpkg_2d_gridded_coverage_ancillary') c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_2d_gridded_tile_ancillary'") self._assert(c.fetchone() is not None, 'gpkg_2d_gridded_coverage#2', 'gpkg_2d_gridded_tile_ancillary table is missing') c.execute("PRAGMA table_info(gpkg_2d_gridded_tile_ancillary)") columns = c.fetchall() expected_columns = [ (0, 'id', 'INTEGER', 0, None, 1), (1, 'tpudt_name', 'TEXT', 1, None, 0), (2, 'tpudt_id', 'INTEGER', 1, None, 0), (3, 'scale', 'REAL', 1, '1.0', 0), (4, 'offset', 'REAL', 1, '0.0', 0), (5, 'min', 'REAL', 0, 'NULL', 0), (6, 'max', 'REAL', 0, 'NULL', 0), (7, 'mean', 'REAL', 0, 'NULL', 0), (8, 'std_dev', 'REAL', 0, 'NULL', 0) ] self._check_structure(columns, expected_columns, 'gpkg_2d_gridded_coverage#2', 'gpkg_2d_gridded_tile_ancillary') c.execute("SELECT srs_id, organization, organization_coordsys_id, " "definition FROM gpkg_spatial_ref_sys " "WHERE srs_id = 4979") ret = c.fetchall() self._assert(len(ret) == 1, 'gpkg_2d_gridded_coverage#3', "gpkg_spatial_ref_sys shall have a row for srs_id=4979") self._assert(ret[0][1].lower() == 'epsg', 'gpkg_2d_gridded_coverage#3', 'wrong value for organization for srs_id = 4979: %s' % ret[0][1]) self._assert(ret[0][2] == 4979, 'gpkg_2d_gridded_coverage#3', ('wrong value for organization_coordsys_id for ' + 'srs_id = 4979: %s') % ret[0][2]) c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'") self._assert(c.fetchone() is not None, 'gpkg_2d_gridded_coverage#6', 'gpkg_extensions does not exist') c.execute("SELECT table_name, column_name, definition, scope FROM " "gpkg_extensions WHERE " "extension_name = 'gpkg_2d_gridded_coverage'") rows = c.fetchall() self._assert(len(rows) == 2 + len(tables), 'gpkg_2d_gridded_coverage#6', "Wrong number of entries in gpkg_extensions with " "2d_gridded_coverage extension name") found_gpkg_2d_gridded_coverage_ancillary = False found_gpkg_2d_gridded_tile_ancillary = False expected_def = \ 'http://docs.opengeospatial.org/is/17-066r1/17-066r1.html' for (table_name, column_name, definition, scope) in rows: if table_name == 'gpkg_2d_gridded_coverage_ancillary': found_gpkg_2d_gridded_coverage_ancillary = True self._assert(column_name is None, 'gpkg_2d_gridded_coverage#6', "Wrong entry for " "gpkg_2d_gridded_coverage_ancillary " "in gpkg_extensions") self._assert(definition == expected_def, 'gpkg_2d_gridded_coverage#6', "Wrong entry (definition) for " "gpkg_2d_gridded_coverage_ancillary " "in gpkg_extensions") self._assert(scope == 'read-write', 'gpkg_2d_gridded_coverage#6', "Wrong entry for " "gpkg_2d_gridded_coverage_ancillary " "in gpkg_extensions") elif table_name == 'gpkg_2d_gridded_tile_ancillary': found_gpkg_2d_gridded_tile_ancillary = True self._assert(column_name is None, 'gpkg_2d_gridded_coverage#6', "Wrong entry for " "gpkg_2d_gridded_tile_ancillary " "in gpkg_extensions") self._assert(definition == expected_def, 'gpkg_2d_gridded_coverage#6', "Wrong entry (definition) for " "gpkg_2d_gridded_tile_ancillary " "in gpkg_extensions") self._assert(scope == 'read-write', 'gpkg_2d_gridded_coverage#6', "Wrong entry for " "gpkg_2d_gridded_tile_ancillary " "in gpkg_extensions") else: self._assert(table_name in tables, 'gpkg_2d_gridded_coverage#6', "Unexpected table_name registered for " + "2d_gridded_coverage: %s" % table_name) self._assert(column_name == 'tile_data', 'gpkg_2d_gridded_coverage#6', "Wrong entry for %s " % table_name + "in gpkg_extensions") self._assert(definition == expected_def, 'gpkg_2d_gridded_coverage#6', "Wrong entry (definition) for %s " % table_name + "in gpkg_extensions") self._assert(scope == 'read-write', 'gpkg_2d_gridded_coverage#6', "Wrong entry for %s " % table_name + "in gpkg_extensions") self._assert(found_gpkg_2d_gridded_coverage_ancillary, 'gpkg_2d_gridded_coverage#6', "gpkg_2d_gridded_coverage_ancillary not registered " "for 2d_gridded_coverage") self._assert(found_gpkg_2d_gridded_tile_ancillary, 'gpkg_2d_gridded_coverage#6', "gpkg_2d_gridded_tile_ancillary not registered " "for 2d_gridded_coverage") c.execute("SELECT tile_matrix_set_name, datatype FROM " "gpkg_2d_gridded_coverage_ancillary") rows = c.fetchall() self._assert(len(rows) == len(tables), 'gpkg_2d_gridded_coverage#7', "Wrong number of entries in " "gpkg_2d_gridded_coverage_ancillary") for (tile_matrix_set_name, datatype) in rows: self._assert(tile_matrix_set_name in tables, 'gpkg_2d_gridded_coverage#7', "Table %s has a row in " % tile_matrix_set_name + "gpkg_2d_gridded_coverage_ancillary, but not in " "gpkg_contents") c.execute('SELECT 1 FROM gpkg_tile_matrix_set WHERE ' 'table_name = ?', (tile_matrix_set_name,)) self._assert(c.fetchone() is not None, 'gpkg_2d_gridded_coverage#8', 'missing entry in gpkg_tile_matrix_set ' + 'for %s' % tile_matrix_set_name) self._assert(datatype in ('integer', 'float'), 'gpkg_2d_gridded_coverage#9', 'Unexpected datatype = %s' % datatype) for table in tables: c.execute("SELECT COUNT(*) FROM %s" % _esc_id(table)) count_tpudt = c.fetchone() c.execute("SELECT COUNT(*) FROM gpkg_2d_gridded_tile_ancillary " "WHERE tpudt_name = ?", (table, )) count_tile_ancillary = c.fetchone() self._assert(count_tpudt == count_tile_ancillary, 'gpkg_2d_gridded_coverage#10', ("Inconsistent number of rows in " + "gpkg_2d_gridded_tile_ancillary for %s") % table) c.execute("SELECT DISTINCT tpudt_name FROM " "gpkg_2d_gridded_tile_ancillary") rows = c.fetchall() for (tpudt_name, ) in rows: self._assert(tpudt_name in tables, 'gpkg_2d_gridded_coverage#11', "tpudt_name = %s is invalid" % tpudt_name) c.execute("SELECT tile_matrix_set_name FROM " "gpkg_2d_gridded_coverage_ancillary WHERE " "datatype = 'float'") rows = c.fetchall() for (tile_matrix_set_name, ) in rows: c.execute("SELECT 1 FROM gpkg_2d_gridded_tile_ancillary WHERE " "tpudt_name = ? AND " "NOT (offset == 0.0 AND scale == 1.0)", (tile_matrix_set_name,)) self._assert(len(c.fetchall()) == 0, 'gpkg_2d_gridded_coverage#9', "Wrong scale and offset values " + "for %s " % tile_matrix_set_name + "in gpkg_2d_gridded_coverage_ancillary") for table in tables: c.execute("SELECT 1 FROM gpkg_2d_gridded_tile_ancillary WHERE " + "tpudt_name = ? AND tpudt_id NOT IN (SELECT id FROM " + "%s)" % table, (table,)) self._assert(len(c.fetchall()) == 0, 'gpkg_2d_gridded_coverage#12', "tpudt_id in gpkg_2d_gridded_coverage_ancillary " + "not referencing an id from %s" % table) c.execute("SELECT tile_matrix_set_name, datatype FROM " "gpkg_2d_gridded_coverage_ancillary") rows = c.fetchall() warn_gdal_not_available = False for (table_name, datatype) in rows: c.execute("SELECT id, tile_data FROM %s" % _esc_id(table_name)) for (ident, blob) in c.fetchall(): self._assert(blob is not None and len(blob) >= 12, 19, 'Invalid blob') max_size_needed = 12 blob_ar = struct.unpack('B' * max_size_needed, blob[0:max_size_needed]) is_png = blob_ar[0:4] == (0x89, 0x50, 0x4E, 0x47) is_tiff = blob_ar[0:4] == (0x49, 0x49, 0x2A, 0x00) or \ blob_ar[0:4] == (0x4D, 0x4D, 0x00, 0x2A) if datatype == 'integer': self._assert(is_png, 'gpkg_2d_gridded_coverage#13', 'Tile for %s should be PNG' % table_name) if has_gdal: tmp_file = '/vsimem/temp_validate_gpkg.tif' try: blob = bytes(blob) except: blob = str(blob) gdal.FileFromMemBuffer(tmp_file, blob) ds = gdal.Open(tmp_file) try: self._assert(ds is not None, 'gpkg_2d_gridded_coverage#13', 'Invalid tile %d in %s' % (ident, table_name)) self._assert(ds.RasterCount == 1, 'gpkg_2d_gridded_coverage#13', 'Invalid tile %d in %s' % (ident, table_name)) self._assert(ds.GetRasterBand(1).DataType == gdal.GDT_UInt16, 'gpkg_2d_gridded_coverage#13', 'Invalid tile %d in %s' % (ident, table_name)) finally: gdal.Unlink(tmp_file) else: if not warn_gdal_not_available: warn_gdal_not_available = True self._log('GDAL not available. Req gpkg_2d_gridded_coverage#13 not tested') elif datatype == 'float': self._assert(is_tiff, 'gpkg_2d_gridded_coverage#14', 'Tile for %s should be TIFF' % table_name) if has_gdal: tmp_file = '/vsimem/temp_validate_gpkg.tif' try: blob = bytes(blob) except: blob = str(blob) gdal.FileFromMemBuffer(tmp_file, blob) ds = gdal.Open(tmp_file) try: self._assert(ds is not None, 'gpkg_2d_gridded_coverage#15', 'Invalid tile %d in %s' % (ident, table_name)) self._assert(ds.RasterCount == 1, 'gpkg_2d_gridded_coverage#16', 'Invalid tile %d in %s' % (ident, table_name)) self._assert(ds.GetRasterBand(1).DataType == gdal.GDT_Float32, 'gpkg_2d_gridded_coverage#17', 'Invalid tile %d in %s' % (ident, table_name)) compression = ds.GetMetadataItem('COMPRESSION', 'IMAGE_STRUCTURE') self._assert(compression is None or compression == 'LZW', 'gpkg_2d_gridded_coverage#18', 'Invalid tile %d in %s' % (ident, table_name)) ovr_count = ds.GetRasterBand(1).GetOverviewCount() self._assert(not ds.GetSubDatasets() and ovr_count == 0, 'gpkg_2d_gridded_coverage#19', 'Invalid tile %d in %s' % (ident, table_name)) (blockxsize, _) = \ ds.GetRasterBand(1).GetBlockSize() self._assert(blockxsize == ds.RasterXSize, 'gpkg_2d_gridded_coverage#20', 'Invalid tile %d in %s' % (ident, table_name)) finally: gdal.Unlink(tmp_file) else: if not warn_gdal_not_available: warn_gdal_not_available = True self._log('GDAL not available. ' 'Req gpkg_2d_gridded_coverage#15 to gpkg_2d_gridded_coverage#19 not tested') def _check_gpkg_extensions(self, c): self._log('Checking gpkg_extensions') c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'") if c.fetchone() is None: self._log('... No extensions') return c.execute("PRAGMA table_info(gpkg_extensions)") columns = c.fetchall() expected_columns = [ (0, 'table_name', 'TEXT', 0, None, 0), (1, 'column_name', 'TEXT', 0, None, 0), (2, 'extension_name', 'TEXT', 1, None, 0), (3, 'definition', 'TEXT', 1, None, 0), (4, 'scope', 'TEXT', 1, None, 0)] self._check_structure(columns, expected_columns, 58, 'gpkg_extensions') c.execute("SELECT table_name, column_name FROM gpkg_extensions WHERE " "table_name IS NOT NULL") rows = c.fetchall() for (table_name, column_name) in rows: # Doesn't work for gpkg_2d_gridded_coverage_ancillary # c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ?", \ # (table_name,) ) # ret = c.fetchall() # self._assert(len(ret) == 1, \ # 60, ('table_name = %s is registered in ' +\ # 'gpkg_extensions, but not in gpkg_contents') % table_name) if column_name is not None: try: c.execute('SELECT %s FROM %s' % (_esc_id(column_name), _esc_id(table_name))) c.fetchone() except: self._assert(False, 61, ("Column %s of table %s mentioned in " + "gpkg_extensions doesn't exist") % (column_name, table_name)) c.execute("SELECT extension_name FROM gpkg_extensions") rows = c.fetchall() KNOWN_EXTENSIONS = ['gpkg_rtree_index', 'gpkg_zoom_other', 'gpkg_webp', 'gpkg_metadata', 'gpkg_schema', 'gpkg_crs_wkt', 'gpkg_elevation_tiles', # deprecated one 'gpkg_2d_gridded_coverage' ] for geom_name in GPKGChecker.EXT_GEOM_TYPES: KNOWN_EXTENSIONS += ['gpkg_geom_' + geom_name] for (extension_name,) in rows: if extension_name.startswith('gpkg_'): self._assert(extension_name in KNOWN_EXTENSIONS, 62, "extension_name %s not valid" % extension_name) else: self._assert('_' in extension_name, 62, "extension_name %s not valid" % extension_name) author = extension_name[0:extension_name.find('_')] ext_name = extension_name[extension_name.find('_') + 1:] for x in author: self._assert((x >= 'a' and x <= 'z') or (x >= 'A' and x <= 'Z') or (x >= '0' and x <= '9'), 62, "extension_name %s not valid" % extension_name) for x in ext_name: self._assert((x >= 'a' and x <= 'z') or (x >= 'A' and x <= 'Z') or (x >= '0' and x <= '9') or x == '_', 62, "extension_name %s not valid" % extension_name) # c.execute("SELECT extension_name, definition FROM gpkg_extensions " # "WHERE definition NOT LIKE 'Annex %' AND " # "definition NOT LIKE 'http%' AND " # "definition NOT LIKE 'mailto:%' AND " # "definition NOT LIKE 'Extension Title%' ") # rows = c.fetchall() # for (extension_name, definition) in rows: # self._assert(False, 63, # "extension_name %s has invalid definition %s" % # (extension_name, definition)) c.execute("SELECT extension_name, scope FROM gpkg_extensions " "WHERE scope NOT IN ('read-write', 'write-only')") rows = c.fetchall() for (extension_name, scope) in rows: self._assert(False, 64, "extension_name %s has invalid scope %s" % (extension_name, scope)) c.execute("SELECT table_name, scope FROM gpkg_extensions " "WHERE extension_name = 'gpkg_rtree_index' ") rows = c.fetchall() for (table_name, scope) in rows: c.execute("SELECT 1 FROM gpkg_contents WHERE lower(table_name) = lower(?) " "AND data_type = 'features'", (table_name,)) self._assert(c.fetchone() is not None, 75, ('gpkg_extensions declares gpkg_rtree_index for %s,' + ' but this is not a features table') % table_name) self._assert(scope == 'write-only', 75, 'Invalid scope %s for gpkg_rtree_index' % scope) def _check_metadata(self, c): self._log('Checking gpkg_metadata') must_have_gpkg_metadata = False c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'") if c.fetchone() is not None: c.execute("SELECT scope FROM gpkg_extensions WHERE " "extension_name = 'gpkg_metadata'") row = c.fetchone() if row is not None: must_have_gpkg_metadata = True (scope, ) = row self._assert(scope == 'read-write', 140, "Wrong scope for gpkg_metadata in " "gpkg_extensions") c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_metadata'") if c.fetchone() is None: if must_have_gpkg_metadata: self._assert(False, 140, "gpkg_metadata table missing") else: self._log('... No metadata') return c.execute("PRAGMA table_info(gpkg_metadata)") columns = c.fetchall() expected_columns = [ (0, 'id', 'INTEGER', 1, None, 1), (1, 'md_scope', 'TEXT', 1, "'dataset'", 0), (2, 'md_standard_uri', 'TEXT', 1, None, 0), (3, 'mime_type', 'TEXT', 1, "'text/xml'", 0), (4, 'metadata', 'TEXT', 1, "''", 0) ] self._check_structure(columns, expected_columns, 93, 'gpkg_metadata') c.execute("SELECT 1 FROM sqlite_master " "WHERE name = 'gpkg_metadata_reference'") self._assert(c.fetchone() is not None, 95, "gpkg_metadata_reference is missing") c.execute("PRAGMA table_info(gpkg_metadata_reference)") columns = c.fetchall() expected_columns = [ (0, 'reference_scope', 'TEXT', 1, None, 0), (1, 'table_name', 'TEXT', 0, None, 0), (2, 'column_name', 'TEXT', 0, None, 0), (3, 'row_id_value', 'INTEGER', 0, None, 0), (4, 'timestamp', 'DATETIME', 1, "strftime('%Y-%m-%dT%H:%M:%fZ','now')", 0), (5, 'md_file_id', 'INTEGER', 1, None, 0), (6, 'md_parent_id', 'INTEGER', 0, None, 0) ] self._check_structure(columns, expected_columns, 95, 'gpkg_metadata_reference') c.execute("SELECT DISTINCT md_scope FROM gpkg_metadata WHERE " "md_scope NOT IN ('undefined', 'fieldSession', " "'collectionSession', 'series', 'dataset', 'featureType', " "'feature', 'attributeType', 'attribute', 'tile', " "'model', 'catalog', 'schema', 'taxonomy', 'software', " "'service', 'collectionHardware', 'nonGeographicDataset', " "'dimensionGroup')") rows = c.fetchall() for (md_scope, ) in rows: self._assert(False, 94, 'Invalid md_scope %s found' % md_scope) c.execute("SELECT DISTINCT reference_scope FROM " "gpkg_metadata_reference WHERE " "reference_scope NOT IN ('geopackage', 'table', " "'column', 'row', 'row/col')") rows = c.fetchall() for (md_scope, ) in rows: self._assert(False, 96, 'Invalid reference_scope %s found' % md_scope) c.execute("SELECT table_name FROM " "gpkg_metadata_reference WHERE " "reference_scope = 'geopackage' AND table_name is NOT NULL") rows = c.fetchall() for (table_name, ) in rows: self._assert(False, 97, "row in gpkg_metadata_reference with table_name " + "not null (%s)" % table_name + "but reference_scope = geopackage") c.execute("SELECT table_name FROM " "gpkg_metadata_reference WHERE " "reference_scope != 'geopackage'") rows = c.fetchall() for (table_name, ) in rows: self._assert(table_name is not None, 97, "row in gpkg_metadata_reference with null table_name") c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ?", (table_name,)) self._assert(c.fetchone() is not None, 97, "row in gpkg_metadata_reference with table_name " + "not null (%s) with no reference in " % table_name + "gpkg_contents but reference_scope != geopackage") c.execute("SELECT table_name FROM " "gpkg_metadata_reference WHERE " "reference_scope IN ('geopackage', 'table', 'row') " "AND column_name is NOT NULL") rows = c.fetchall() for (table_name, ) in rows: self._assert(False, 98, "row in gpkg_metadata_reference with column_name " + "not null (table=%s)" % table_name + "but reference_scope = geopackage, table or row") c.execute("SELECT table_name, column_name FROM " "gpkg_metadata_reference WHERE " "reference_scope NOT IN ('geopackage', 'table', 'row')") rows = c.fetchall() for (table_name, column_name) in rows: self._assert(column_name is not None, 98, "row in gpkg_metadata_reference with null " "column_name") try: c.execute("SELECT %s FROM %s" % (_esc_id(column_name), _esc_id(table_name))) except: self._assert(False, 98, "column %s of %s does not exist" % (column_name, table_name)) c.execute("SELECT table_name FROM " "gpkg_metadata_reference WHERE " "reference_scope IN ('geopackage', 'table', 'column') " "AND row_id_value is NOT NULL") rows = c.fetchall() for (table_name, ) in rows: self._assert(False, 99, "row in gpkg_metadata_reference with row_id_value " + "not null (table=%s)" % table_name + "but reference_scope = geopackage, table or column") c.execute("SELECT table_name, row_id_value FROM " "gpkg_metadata_reference WHERE " "reference_scope NOT IN ('geopackage', 'table', 'column')") rows = c.fetchall() for (table_name, row_id_value) in rows: self._assert(row_id_value is not None, 99, "row in gpkg_metadata_reference with null " "row_id_value") c.execute("SELECT 1 FROM %s WHERE ROWID = ?" % _esc_id(column_name), (row_id_value, )) self._assert(c.fetchone() is not None, 99, "row %s of %s does not exist" % (str(row_id_value), table_name)) c.execute("SELECT timestamp FROM gpkg_metadata_reference") rows = c.fetchall() for (timestamp, ) in rows: try: datetime.datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ') except ValueError: self._assert(False, 100, ('timestamp = %s in gpkg_metadata_reference' + 'is invalid datetime') % (timestamp)) c.execute("SELECT md_file_id FROM gpkg_metadata_reference") rows = c.fetchall() for (md_file_id, ) in rows: c.execute("SELECT 1 FROM gpkg_metadata WHERE id = ?", (md_file_id,)) self._assert(c.fetchone() is not None, 101, "md_file_id = %s " % str(md_file_id) + "does not have a row in gpkg_metadata") c.execute("SELECT md_parent_id FROM gpkg_metadata_reference " "WHERE md_parent_id IS NOT NULL") rows = c.fetchall() for (md_parent_id, ) in rows: c.execute("SELECT 1 FROM gpkg_metadata WHERE id = ?", (md_parent_id,)) self._assert(c.fetchone() is not None, 102, "md_parent_id = %s " % str(md_parent_id) + "does not have a row in gpkg_metadata") c.execute("SELECT md_file_id FROM " "gpkg_metadata_reference WHERE md_parent_id IS NOT NULL " "AND md_file_id = md_parent_id") rows = c.fetchall() for (md_file_id, ) in rows: self._assert(False, 102, "Row with md_file_id = md_parent_id = %s " % str(md_file_id)) def _check_schema(self, c): # Partial: doesn't check gpkg_data_column_constraints self._log('Checking gpkg_schema (partial)') must_have_gpkg_schema = False c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'") if c.fetchone() is not None: c.execute("SELECT scope FROM gpkg_extensions WHERE " "extension_name = 'gpkg_schema'") row = c.fetchone() if row is not None: must_have_gpkg_schema = True (scope, ) = row self._assert(scope == 'read-write', 141, "Wrong scope for gpkg_schema in " "gpkg_extensions") c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_data_columns'") if c.fetchone() is None: if must_have_gpkg_schema: self._log("gpkg_data_columns table missing. Not forbidden by requirements, but odd") else: self._log('... No schema') return c.execute("PRAGMA table_info(gpkg_data_columns)") columns = c.fetchall() expected_columns = [ (0, 'table_name', 'TEXT', 1, None, 1), (1, 'column_name', 'TEXT', 1, None, 2), (2, 'name', 'TEXT', 0, None, 0), (3, 'title', 'TEXT', 0, None, 0), (4, 'description', 'TEXT', 0, None, 0), (5, 'mime_type', 'TEXT', 0, None, 0), (6, 'constraint_name', 'TEXT', 0, None, 0) ] self._check_structure(columns, expected_columns, 107, 'gpkg_data_columns') c.execute("SELECT table_name, column_name FROM gpkg_data_columns") rows = c.fetchall() for (table_name, column_name) in rows: c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ?", (table_name,)) self._assert(c.fetchone(), 104, ("table_name = %s " % table_name + "in gpkg_data_columns refer to non-existing " + "table/view in gpkg_contents")) try: c.execute("SELECT %s FROM %s" % (_esc_id(column_name), _esc_id(table_name))) except sqlite3.OperationalError: self._assert(False, 105, ("table_name = %s, " % table_name + "column_name = %s " % column_name + "in gpkg_data_columns refer to non-existing " + "column")) def check(self): self._assert(os.path.exists(self.filename), None, "%s does not exist" % self.filename) self._assert(self.filename.lower().endswith('.gpkg'), 3, "filename extension isn't .gpkg'") with open(self.filename, 'rb') as f: f.seek(68, 0) application_id = struct.unpack('B' * 4, f.read(4)) gp10 = struct.unpack('B' * 4, 'GP10'.encode('ASCII')) gp11 = struct.unpack('B' * 4, 'GP11'.encode('ASCII')) gpkg = struct.unpack('B' * 4, 'GPKG'.encode('ASCII')) self._assert(application_id in (gp10, gp11, gpkg), 2, ("Wrong application_id: %s. " + "Expected one of GP10, GP11, GPKG") % str(application_id)) if application_id == gpkg: f.seek(60, 0) user_version = f.read(4) expected_version = 10200 user_version = struct.unpack('>I', user_version)[0] self._assert(user_version >= expected_version, 2, 'Wrong user_version: %d. Expected >= %d' % (user_version, expected_version)) conn = sqlite3.connect(':memory:') c = conn.cursor() c.execute('CREATE TABLE foo(one TEXT, two TEXT, ' 'CONSTRAINT pk PRIMARY KEY (one, two))') c.execute('PRAGMA table_info(foo)') rows = c.fetchall() if rows[1][5] == 2: self.extended_pragma_info = True c.close() conn.close() conn = sqlite3.connect(self.filename) c = conn.cursor() try: try: c.execute('SELECT 1 FROM sqlite_master') c.fetchone() except: self._assert(False, 1, 'not a sqlite3 database') c.execute('PRAGMA foreign_key_check') ret = c.fetchall() self._assert(len(ret) == 0, 7, 'foreign_key_check failed: %s' % str(ret)) c.execute('PRAGMA integrity_check') self._assert(c.fetchone()[0] == 'ok', 6, 'integrity_check failed') self._check_gpkg_spatial_ref_sys(c) self._check_gpkg_contents(c) self._check_features(c) self._check_tiles(c) self._check_attributes(c) self._check_tiled_gridded_coverage_data(c) self._check_gpkg_extensions(c) self._check_metadata(c) self._check_schema(c) finally: c.close() conn.close() def check(filename, abort_at_first_error=True, verbose=False): checker = GPKGChecker(filename, abort_at_first_error=abort_at_first_error, verbose=verbose) checker.check() return checker.errors def Usage(): print('validate_gpkg.py [[-v]|[-q]] [-k] my.gpkg') print('') print('-q: quiet mode') print('-k: (try to) keep going when error is encountered') sys.exit(1) if __name__ == '__main__': filename = None verbose = False abort_at_first_error = True if len(sys.argv) == 1: Usage() for arg in sys.argv[1:]: if arg == '-k': abort_at_first_error = False elif arg == '-q': verbose = False elif arg == '-v': verbose = True elif arg[0] == '-': Usage() else: filename = arg if filename is None: Usage() ret = check(filename, abort_at_first_error=abort_at_first_error, verbose=verbose) if not abort_at_first_error: if not ret: sys.exit(0) else: for (req, msg) in ret: if req: print('Req %d: %s' % (req, msg)) else: print(msg) sys.exit(1)