[02/10] importer: Add command to import geofeeds into the database

Message ID 20220927164847.3409646-2-michael.tremer@ipfire.org
State Accepted
Commit 0fcdd68928942372db111ac94112f8a161f2d04e
Headers
Series [01/10] importer: Store geofeed URLs from RIR data

Commit Message

Michael Tremer Sept. 27, 2022, 4:48 p.m. UTC
  Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
---
 src/scripts/location-importer.in | 143 +++++++++++++++++++++++++++++++
 1 file changed, 143 insertions(+)
  

Patch

diff --git a/src/scripts/location-importer.in b/src/scripts/location-importer.in
index 5bd5da3..ddec376 100644
--- a/src/scripts/location-importer.in
+++ b/src/scripts/location-importer.in
@@ -18,6 +18,7 @@ 
 ###############################################################################
 
 import argparse
+import concurrent.futures
 import ipaddress
 import json
 import logging
@@ -95,6 +96,11 @@  class CLI(object):
 		update_announcements.add_argument("server", nargs=1,
 			help=_("Route Server to connect to"), metavar=_("SERVER"))
 
+		# Update geofeeds
+		update_geofeeds = subparsers.add_parser("update-geofeeds",
+			help=_("Update Geofeeds"))
+		update_geofeeds.set_defaults(func=self.handle_update_geofeeds)
+
 		# Update overrides
 		update_overrides = subparsers.add_parser("update-overrides",
 			help=_("Update overrides"),
@@ -183,6 +189,25 @@  class CLI(object):
 				CREATE INDEX IF NOT EXISTS networks_search ON networks USING GIST(network inet_ops);
 
 				-- geofeeds
+				CREATE TABLE IF NOT EXISTS geofeeds(
+					id serial primary key,
+					url text,
+					status integer default null,
+					updated_at timestamp without time zone default null
+				);
+				CREATE UNIQUE INDEX IF NOT EXISTS geofeeds_unique
+					ON geofeeds(url);
+				CREATE TABLE IF NOT EXISTS geofeed_networks(
+					geofeed_id integer references geofeeds(id) on delete cascade,
+					network inet,
+					country text,
+					region text,
+					city text
+				);
+				CREATE INDEX IF NOT EXISTS geofeed_networks_geofeed_id
+					ON geofeed_networks(geofeed_id);
+				CREATE INDEX IF NOT EXISTS geofeed_networks_search
+					ON geofeed_networks(network);
 				CREATE TABLE IF NOT EXISTS network_geofeeds(network inet, url text);
 				CREATE UNIQUE INDEX IF NOT EXISTS network_geofeeds_unique
 					ON network_geofeeds(network);
@@ -1253,6 +1278,124 @@  class CLI(object):
 				# Otherwise return the line
 				yield line
 
+	def handle_update_geofeeds(self, ns):
+		# Fetch all Geofeeds that require an update
+		geofeeds = self.db.query("""
+			SELECT
+				id,
+				url
+			FROM
+				geofeeds
+			WHERE
+				updated_at IS NULL
+			OR
+				updated_at <= CURRENT_TIMESTAMP - INTERVAL '1 week'
+			ORDER BY
+				id
+		""")
+
+		with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
+			results = executor.map(self._fetch_geofeed, geofeeds)
+
+			for result in results:
+				print(result)
+
+	def _fetch_geofeed(self, geofeed):
+		log.debug("Fetching Geofeed %s" % geofeed.url)
+
+		with self.db.transaction():
+			# Open the URL
+			try:
+				req = urllib.request.Request(geofeed.url, headers={
+					"User-Agent" : "location/%s" % location.__version__,
+
+					# We expect some plain text file in CSV format
+					"Accept" : "text/csv, text/plain",
+				})
+
+				# XXX set proxy
+
+				# Send the request
+				with urllib.request.urlopen(req, timeout=10) as f:
+					# Remove any previous data
+					self.db.execute("DELETE FROM geofeed_networks \
+						WHERE geofeed_id = %s", geofeed.id)
+
+					# Read the output line by line
+					for line in f:
+						line = line.decode()
+
+						# Strip any newline
+						line = line.rstrip()
+
+						# Skip empty lines
+						if not line:
+							continue
+
+						# Try to parse the line
+						try:
+							fields = line.split(",", 5)
+						except ValueError:
+							log.debug("Could not parse line: %s" % line)
+							continue
+
+						# Check if we have enough fields
+						if len(fields) < 4:
+							log.debug("Not enough fields in line: %s" % line)
+							continue
+
+						# Fetch all fields
+						network, country, region, city, = fields[:4]
+
+						# Try to parse the network
+						try:
+							network = ipaddress.ip_network(network, strict=False)
+						except ValueError:
+							log.debug("Could not parse network: %s" % network)
+							continue
+
+						# XXX Check the country code
+
+						# Write this into the database
+						self.db.execute("""
+							INSERT INTO
+								geofeed_networks (
+									geofeed_id,
+									network,
+									country,
+									region,
+									city
+								)
+							VALUES (%s, %s, %s, %s, %s)""",
+							geofeed.id,
+							"%s" % network,
+							country,
+							region,
+							city,
+						)
+
+			# Catch any HTTP errors
+			except urllib.request.HTTPError as e:
+				self.db.execute("UPDATE geofeeds SET status = %s \
+					WHERE id = %s", e.code, geofeed.id)
+
+			# Catch any other errors
+			except urllib.request.URLError as e:
+				log.error("Could not fetch URL %s: %s" % (geofeed.url, e))
+
+			# Mark the geofeed as updated
+			else:
+				self.db.execute("""
+					UPDATE
+						geofeeds
+					SET
+						updated_at = CURRENT_TIMESTAMP,
+						status = NULL
+					WHERE
+						id = %s""",
+					geofeed.id,
+				)
+
 	def handle_update_overrides(self, ns):
 		with self.db.transaction():
 			# Only drop manually created overrides, as we can be reasonably sure to have them,