Skip to content

model.geoinfo

model.geoinfo

Enrich connection records with GeoIP and ASN data from MaxMind databases.

Expected files in the configured data_dir:

- GeoLite2-City.mmdb
- GeoLite2-ASN.mmdb

When database files are missing or cannot be opened, run in best-effort mode and return None for unavailable fields.

_EMPTY_RESULT = {'lat': None, 'lon': None, 'city': None, 'country': None, 'asn': None, 'asn_org': None} module-attribute

GeoResult

Bases: TypedDict

Define a normalized lookup result for the application.

Source code in model/geoinfo.py
21
22
23
24
25
26
27
28
29
class GeoResult(TypedDict):
    """Define a normalized lookup result for the application.

    Every field is None when the corresponding value is unavailable, for
    example because a database is not open or the record lacks the entry.
    """

    # Latitude / longitude read from the City record's "location" block.
    lat: float | None
    lon: float | None
    # English ("en") city and country names read from the City record.
    city: str | None
    country: str | None
    # Autonomous system number and organization read from the ASN record.
    asn: int | None
    asn_org: str | None

GeoDbPaths dataclass

Store file paths for the MaxMind databases.

Source code in model/geoinfo.py
42
43
44
45
46
47
@dataclass(frozen=True)
class GeoDbPaths:
    """Store file paths for the MaxMind databases.

    Instances are frozen, so the paths cannot be reassigned after creation.
    """

    # Location of the City database (GeoInfo builds it as data_dir / CITY_DB_NAME).
    city_db: Path
    # Location of the ASN database (GeoInfo builds it as data_dir / ASN_DB_NAME).
    asn_db: Path

GeoInfo

Enrich connection dictionaries with best-effort GeoIP and ASN data.

Run without databases, cache lookups per IP (LRU), and support reload when .mmdb files appear on disk.

Source code in model/geoinfo.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class GeoInfo:
    """Enrich connection dictionaries with best-effort GeoIP and ASN data.

    Run without databases, cache lookups per IP (LRU), and support reload when
    .mmdb files appear on disk.
    """

    CITY_DB_NAME: Final[str] = "GeoLite2-City.mmdb"
    ASN_DB_NAME: Final[str] = "GeoLite2-ASN.mmdb"

    def __init__(
        self,
        data_dir: Path,
        *,
        cache_size: int = 10_000,
        silent: bool = True,
    ) -> None:
        """Initialize GeoInfo.

        Args:
            data_dir: Directory containing GeoLite2 .mmdb files.
            cache_size: Maximum number of IP lookup results kept in memory.
                Negative values are clamped to 0, which disables caching.
            silent: When False, raise on database open errors.
        """
        self._cache_size = max(0, int(cache_size))
        self._silent = bool(silent)

        data_dir = Path(data_dir)
        self._paths = GeoDbPaths(
            city_db=data_dir / self.CITY_DB_NAME,
            asn_db=data_dir / self.ASN_DB_NAME,
        )

        self._city_reader: maxminddb.Reader | None = None
        self._asn_reader: maxminddb.Reader | None = None

        # Cache: IP string -> GeoResult, ordered from least to most recently used.
        self._ip_cache: OrderedDict[str, GeoResult] = OrderedDict()

        self._open_readers()

    # Properties

    @property
    def paths(self) -> GeoDbPaths:
        """Return configured database file paths."""
        return self._paths

    @property
    def enabled(self) -> bool:
        """Return True if at least one database is available."""
        return self.city_enabled or self.asn_enabled

    @property
    def city_enabled(self) -> bool:
        """Return True when the City database is open (lat/lon, city, country)."""
        return self._city_reader is not None

    @property
    def asn_enabled(self) -> bool:
        """Return True when the ASN database is open (asn, asn_org)."""
        return self._asn_reader is not None

    # Lifecycle

    def reload(self) -> bool:
        """Reopen database readers from disk and drop cached lookups.

        The per-IP cache is cleared because entries computed against the
        previous set of databases (or with no database at all) would
        otherwise keep being served after new files became available.

        Returns:
            True if at least one database is available after reload.
        """
        self.close()
        self._ip_cache.clear()
        self._open_readers()
        return self.enabled

    def close(self) -> None:
        """Close any open MaxMind DB readers.

        Both reader slots are reset before closing, and the ASN reader is
        closed even when closing the City reader raises, so a failure on one
        reader cannot leave the other one open.
        """
        city_reader, self._city_reader = self._city_reader, None
        asn_reader, self._asn_reader = self._asn_reader, None
        try:
            if city_reader is not None:
                city_reader.close()
        finally:
            if asn_reader is not None:
                asn_reader.close()

    def __enter__(self) -> GeoInfo:
        """Return self for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close database readers on context exit."""
        self.close()

    # Public API

    def enrich(self, connections: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Enrich connection dictionaries in-place using raddr_ip.

        Entries that are not dicts, or whose "raddr_ip" is missing, empty, or
        not a string, are left untouched. Nothing is modified when no
        database is open.

        Args:
            connections: List of connection dicts.

        Returns:
            The same list object, enriched in-place.
        """
        if not isinstance(connections, list) or not connections:
            return connections

        if not self.enabled:
            return connections

        for conn in connections:
            if not isinstance(conn, dict):
                continue

            ip = conn.get("raddr_ip")
            if not isinstance(ip, str) or not ip:
                continue

            geo = self.lookup(ip)
            for key in ("lat", "lon", "city", "country", "asn", "asn_org"):
                conn[key] = geo[key]

        return connections

    def lookup(self, ip: str) -> GeoResult:
        """Look up GeoIP and ASN data for an IP address.

        The returned dict is always independent of the cache, so callers may
        modify it freely.

        Args:
            ip: IP address string; an empty value yields an all-None result.

        Returns:
            GeoResult with None for unavailable fields.
        """
        if not ip:
            return dict(_EMPTY_RESULT)

        cached = self._ip_cache_get(ip)
        if cached is not None:
            return dict(cached)

        result: GeoResult = dict(_EMPTY_RESULT)

        if self._city_reader is not None:
            self._fill_city(ip, result)

        if self._asn_reader is not None:
            self._fill_asn(ip, result)

        # Store a private copy: cache hits hand out copies, so the miss path
        # must not expose the cached object to the caller either.
        self._ip_cache_put(ip, dict(result))
        return result

    # Internal helpers

    def _open_readers(self) -> None:
        """Open database readers for the files that exist.

        Each database is handled independently. Errors while opening are
        swallowed in silent mode and re-raised otherwise.
        """
        self._city_reader = None
        self._asn_reader = None

        try:
            if self._paths.city_db.is_file():
                self._city_reader = maxminddb.open_database(self._paths.city_db)
        except Exception:
            # Best-effort by design: a broken file just leaves the reader unset.
            if not self._silent:
                raise

        try:
            if self._paths.asn_db.is_file():
                self._asn_reader = maxminddb.open_database(self._paths.asn_db)
        except Exception:
            if not self._silent:
                raise

    def _fill_city(self, ip: str, result: GeoResult) -> None:
        """Fill city-related fields into result if available.

        A lookup error or a non-dict record leaves result unchanged.
        """
        try:
            record = self._city_reader.get(ip) if self._city_reader else None
        except Exception:
            # Lookup failures are treated as "no data" for this IP.
            return

        if not isinstance(record, dict):
            return

        loc = record.get("location")
        if isinstance(loc, dict):
            lat = loc.get("latitude")
            lon = loc.get("longitude")
            result["lat"] = float(lat) if isinstance(lat, (int, float)) else None
            result["lon"] = float(lon) if isinstance(lon, (int, float)) else None

        city_block = record.get("city")
        if isinstance(city_block, dict):
            names = city_block.get("names")
            if isinstance(names, dict):
                name = names.get("en")
                result["city"] = name if isinstance(name, str) and name else None

        country_block = record.get("country")
        if isinstance(country_block, dict):
            names = country_block.get("names")
            if isinstance(names, dict):
                name = names.get("en")
                result["country"] = name if isinstance(name, str) and name else None

    def _fill_asn(self, ip: str, result: GeoResult) -> None:
        """Fill ASN-related fields into result if available.

        A lookup error or a non-dict record leaves result unchanged.
        """
        try:
            record = self._asn_reader.get(ip) if self._asn_reader else None
        except Exception:
            # Lookup failures are treated as "no data" for this IP.
            return

        if not isinstance(record, dict):
            return

        asn_num = record.get("autonomous_system_number")
        result["asn"] = int(asn_num) if isinstance(asn_num, int) else None

        org = record.get("autonomous_system_organization")
        result["asn_org"] = org if isinstance(org, str) and org else None

    # LRU cache

    def _ip_cache_get(self, ip: str) -> GeoResult | None:
        """Return cached value and refresh LRU order.

        Returns None when caching is disabled or the IP is not cached.
        """
        if self._cache_size <= 0:
            return None
        val = self._ip_cache.get(ip)
        if val is None:
            return None
        # Mark as most recently used.
        self._ip_cache.move_to_end(ip, last=True)
        return val

    def _ip_cache_put(self, ip: str, result: GeoResult) -> None:
        """Insert into cache and evict least-recently-used items if needed."""
        if self._cache_size <= 0:
            return
        self._ip_cache[ip] = result
        self._ip_cache.move_to_end(ip, last=True)
        # The front of the OrderedDict holds the least recently used entry.
        while len(self._ip_cache) > self._cache_size:
            self._ip_cache.popitem(last=False)

paths property

Return configured database file paths.

enabled property

Return True if at least one database is available.

city_enabled property

Return True when the City database is open (lat/lon, city, country).

asn_enabled property

Return True when the ASN database is open (asn, asn_org).

__init__(data_dir, *, cache_size=10000, silent=True)

Initialize GeoInfo.

Parameters:

Name Type Description Default
data_dir Path

Directory containing GeoLite2 .mmdb files.

required
cache_size int

Maximum number of IP lookup results kept in memory.

10000
silent bool

When False, raise on database open errors.

True
Source code in model/geoinfo.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def __init__(
    self,
    data_dir: Path,
    *,
    cache_size: int = 10_000,
    silent: bool = True,
) -> None:
    """Set up database paths, reader slots, and the lookup cache.

    Args:
        data_dir: Directory expected to hold the GeoLite2 .mmdb files.
        cache_size: Upper bound on the number of cached per-IP results;
            values below zero are stored as 0.
        silent: When False, errors raised while opening a database propagate.
    """
    requested_size = int(cache_size)
    self._cache_size = requested_size if requested_size > 0 else 0
    self._silent = bool(silent)

    base_dir = Path(data_dir)
    city_path = base_dir / self.CITY_DB_NAME
    asn_path = base_dir / self.ASN_DB_NAME
    self._paths = GeoDbPaths(city_db=city_path, asn_db=asn_path)

    # Readers stay None until _open_readers() finds and opens the files.
    self._city_reader: maxminddb.Reader | None = None
    self._asn_reader: maxminddb.Reader | None = None

    # Per-IP result cache (IP string -> GeoResult).
    self._ip_cache: OrderedDict[str, GeoResult] = OrderedDict()

    self._open_readers()

reload()

Reopen database readers from disk.

Returns:

Type Description
bool

True if at least one database is available after reload.

Source code in model/geoinfo.py
115
116
117
118
119
120
121
122
123
def reload(self) -> bool:
    """Reopen database readers from disk and drop cached lookups.

    The per-IP cache is cleared because entries computed against the
    previous set of databases (or with no database at all) would otherwise
    keep being served after new files became available.

    Returns:
        True if at least one database is available after reload.
    """
    self.close()
    self._ip_cache.clear()
    self._open_readers()
    return self.enabled

close()

Close any open MaxMind DB readers.

Source code in model/geoinfo.py
125
126
127
128
129
130
131
132
def close(self) -> None:
    """Close any open MaxMind DB readers.

    Both reader slots are reset before closing, and the ASN reader is closed
    even when closing the City reader raises, so a failure on one reader
    cannot leave the other one open.
    """
    city_reader, self._city_reader = self._city_reader, None
    asn_reader, self._asn_reader = self._asn_reader, None
    try:
        if city_reader is not None:
            city_reader.close()
    finally:
        if asn_reader is not None:
            asn_reader.close()

__enter__()

Return self for use as a context manager.

Source code in model/geoinfo.py
134
135
136
def __enter__(self) -> GeoInfo:
    """Return self for use as a context manager.

    No extra setup happens here; the instance itself is bound by ``with``.
    """
    return self

__exit__(exc_type, exc, tb)

Close database readers on context exit.

Source code in model/geoinfo.py
138
139
140
def __exit__(self, exc_type, exc, tb) -> None:
    """Close database readers on context exit.

    The exception details are not inspected, and returning None means an
    exception raised inside the ``with`` block is not suppressed.
    """
    self.close()

enrich(connections)

Enrich connection dictionaries in-place using raddr_ip.

Parameters:

Name Type Description Default
connections list[dict[str, Any]]

List of connection dicts.

required

Returns:

Type Description
list[dict[str, Any]]

The same list object, enriched in-place.

Source code in model/geoinfo.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def enrich(self, connections: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Add geo and ASN fields to each connection dict, keyed on raddr_ip.

    Entries that are not dicts, or whose "raddr_ip" is missing, empty, or not
    a string, are skipped. The input is returned untouched when it is not a
    non-empty list or when no database is open.

    Args:
        connections: List of connection dicts.

    Returns:
        The same list object, with matching entries updated in-place.
    """
    has_entries = isinstance(connections, list) and bool(connections)
    if not has_entries or not self.enabled:
        return connections

    for entry in connections:
        if not isinstance(entry, dict):
            continue

        remote_ip = entry.get("raddr_ip")
        if isinstance(remote_ip, str) and remote_ip:
            geo = self.lookup(remote_ip)
            # Copy the six result fields onto the connection.
            for field in ("lat", "lon", "city", "country", "asn", "asn_org"):
                entry[field] = geo[field]

    return connections

lookup(ip)

Look up GeoIP and ASN data for an IP address.

Returns:

Type Description
GeoResult

GeoResult with None for unavailable fields.

Source code in model/geoinfo.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def lookup(self, ip: str) -> GeoResult:
    """Look up GeoIP and ASN data for an IP address.

    The returned dict is always independent of the cache, so callers may
    modify it freely.

    Args:
        ip: IP address string; an empty value yields an all-None result.

    Returns:
        GeoResult with None for unavailable fields.
    """
    if not ip:
        return dict(_EMPTY_RESULT)

    cached = self._ip_cache_get(ip)
    if cached is not None:
        return dict(cached)

    result: GeoResult = dict(_EMPTY_RESULT)

    if self._city_reader is not None:
        self._fill_city(ip, result)

    if self._asn_reader is not None:
        self._fill_asn(ip, result)

    # Store a private copy: cache hits hand out copies, so the miss path
    # must not expose the cached object to the caller either.
    self._ip_cache_put(ip, dict(result))
    return result