#!/usr/bin/env python
"""Prototype for encoding a location relative to brand outlets from OSM."""

import overpy
import numpy as np
import pandas as pd
from tqdm import tqdm

# brandname : overpass query filters
BRANDS: dict[str, str] = {
    "greggs": "[\"brand:wikidata\"=\"Q3403981\"]",
}

DATA_FOLDER = ""

# A list of (float, [float, ...]) pairs; see format_location for the
# textual form.
EncodedLocation = list[tuple[float, list[float]]]


def fetch_data(brand: str) -> list[tuple[float, float]]:
    """Fetch a list of locations from OSM.

    Args:
        brand: Key into ``BRANDS`` selecting the Overpass filter to apply.

    Returns:
        ``(lat, lon)`` pairs: the centre of every matching way, followed
        by every matching node.

    Raises:
        KeyError: If ``brand`` is not a key of ``BRANDS``.
        ValueError: If a returned element has no latitude or longitude.
    """
    api = overpy.Overpass()
    filters = BRANDS[brand]
    query = api.query(f"nwr{filters}; out center;")

    # NOTE(review): ``nwr`` also matches relations, but only ways and nodes
    # are collected here -- confirm whether relations should be included.
    raw_coords = [(way.center_lat, way.center_lon) for way in query.ways]
    raw_coords += [(node.lat, node.lon) for node in query.nodes]

    result = []
    for lat, lon in raw_coords:
        # Validate *before* converting: float(None) would raise TypeError
        # and hide the intended error below.
        if lat is None or lon is None:
            raise ValueError("Item missing coords!")
        result.append((float(lat), float(lon)))
    return result


def spherical_dist(pos1, pos2, r=6378137):
    """Calculate spherical distances between two arrays of coordinates.

    Args:
        pos1: Array-like whose last axis holds ``(lat, lon)`` in degrees.
        pos2: Array-like of the same layout; broadcast against ``pos1``.
        r: Sphere radius; the result is expressed in the same unit.

    Returns:
        Great-circle distance(s) along the sphere's surface, with the
        broadcast shape of the inputs minus the last axis.
    """
    pos1 = np.asarray(pos1) * np.pi / 180
    pos2 = np.asarray(pos2) * np.pi / 180
    cos_lat1 = np.cos(pos1[..., 0])
    cos_lat2 = np.cos(pos2[..., 0])
    cos_lat_d = np.cos(pos1[..., 0] - pos2[..., 0])
    cos_lon_d = np.cos(pos1[..., 1] - pos2[..., 1])
    # cos(d) = sin(lat1) sin(lat2) + cos(lat1) cos(lat2) cos(dlon), rewritten
    # via cos(lat1 - lat2) so that only cosines are needed.
    cos_angle = cos_lat_d - cos_lat1 * cos_lat2 * (1 - cos_lon_d)
    # Rounding can push the value slightly outside [-1, 1], which would make
    # arccos return NaN for (nearly) identical or antipodal points.
    return r * np.arccos(np.clip(cos_angle, -1.0, 1.0))


def encode(location: tuple[float, float]) -> EncodedLocation:
    """Encode a location.

    Currently a work in progress: it fetches the "greggs" locations, prints
    the distance from ``location`` to each of them and the three smallest
    distances, then returns a fixed placeholder value.

    Args:
        location: ``(lat, lon)`` in degrees, the same order as the pairs
            returned by ``fetch_data``.

    Returns:
        A placeholder ``EncodedLocation`` (stub).

    Raises:
        ValueError: If no locations were fetched.
    """
    greggs = np.array(fetch_data("greggs"))
    if greggs.size == 0:
        raise ValueError("No locations fetched; cannot encode.")

    # Find the closest greggs. ``location`` has shape (2,) and broadcasts
    # against the (n, 2) array, giving one distance per fetched location.
    distances = pd.Series(spherical_dist(np.asarray(location), greggs))
    print(distances)
    top3 = distances.sort_values().head(3)
    print(top3)

    # Stub
    return [
        (5., [1., 2., 3.]),
        (6., [4., 5., 6.]),
    ]


def decode(location: EncodedLocation) -> tuple[float, float]:
    """Decode into a location.

    Stub: ignores ``location`` and returns a fixed pair.
    """
    # NOTE(review): this pair looks like (lon, lat), whereas encode() is
    # called with (lat, lon) in main() -- confirm the intended order before
    # implementing.
    # Stub
    return (0.091659, 52.210796)


def format_location(location: EncodedLocation) -> str:
    """Format an encoded location as a string.

    Each ``(a, b)`` entry becomes ``"a:b0,b1,..."`` and entries are joined
    with ``";"``.
    """
    return ";".join(f"{a}:{','.join(map(str, b))}" for (a, b) in location)


def parse_location(location: str) -> EncodedLocation:
    """Parse a location string into an EncodedLocation.

    Stub: ignores ``location`` and returns a fixed placeholder value.
    """
    # Stub
    return [
        (5., [1., 2., 3.]),
        (6., [4., 5., 6.]),
    ]


def main():
    """Testing."""
    print("Running query...")
    print(format_location(encode((52.210796, 0.091659))))


if __name__ == "__main__":
    main()