Public Access
1
0

chore: improve postal data import observability

Add progress logging and a status script for postal imports and neighbor builds, and ignore local raw and generated postal datasets.
This commit is contained in:
pguerrerox
2026-04-12 23:22:36 +00:00
parent cc00a439bf
commit dc7686f507
11 changed files with 267 additions and 23 deletions
+62 -9
View File
@@ -1,13 +1,32 @@
import { getDbPool } from '../../server/src/db/pool.js';
import { createScriptLogger } from './postal-logging.js';
async function run() {
async function logPostalAreaCounts() {
const pool = getDbPool();
const client = await pool.connect();
const result = await pool.query<{ country_code: string; area_count: string }>(`
select country_code, count(*)::text as area_count
from public.postal_areas
group by country_code
order by country_code asc
`);
try {
await client.query('begin');
await client.query('truncate table public.postal_area_neighbors');
await client.query(`
return result.rows;
}
async function buildNeighborsForCountry(countryCode: 'US' | 'CA') {
const pool = getDbPool();
const logger = createScriptLogger(`postal-neighbors:${countryCode.toLowerCase()}`);
const sourceCountResult = await pool.query<{ area_count: string }>(
`select count(*)::text as area_count from public.postal_areas where country_code = $1`,
[countryCode],
);
const areaCount = sourceCountResult.rows[0]?.area_count ?? '0';
logger.info(`Starting neighbor build for ${countryCode} with ${areaCount} postal areas.`);
await pool.query(
`
insert into public.postal_area_neighbors (postal_area_id, neighbor_postal_area_id)
select source.id, neighbor.id
from public.postal_areas source
@@ -15,13 +34,47 @@ async function run() {
on source.country_code = neighbor.country_code
and source.id <> neighbor.id
and ST_Touches(source.geom, neighbor.geom)
`);
where source.country_code = $1
`,
[countryCode],
);
const neighborCountResult = await pool.query<{ neighbor_count: string }>(
`
select count(*)::text as neighbor_count
from public.postal_area_neighbors link
join public.postal_areas area on area.id = link.postal_area_id
where area.country_code = $1
`,
[countryCode],
);
logger.info(`Completed neighbor build for ${countryCode}. Built ${neighborCountResult.rows[0]?.neighbor_count ?? '0'} adjacency links.`);
}
async function run() {
const logger = createScriptLogger('postal-neighbors');
const pool = getDbPool();
const client = await pool.connect();
try {
logger.info('Gathering postal area counts before neighbor build.');
const counts = await logPostalAreaCounts();
counts.forEach((row) => logger.info(`Found ${row.area_count} postal areas for ${row.country_code}.`));
await client.query('begin');
logger.info('Clearing existing postal adjacency links.');
await client.query('truncate table public.postal_area_neighbors');
await client.query('commit');
const summary = await client.query<{ count: string }>('select count(*)::text as count from public.postal_area_neighbors');
console.log(`Built ${summary.rows[0]?.count ?? '0'} postal adjacency links.`);
await buildNeighborsForCountry('US');
await buildNeighborsForCountry('CA');
const summary = await pool.query<{ count: string }>('select count(*)::text as count from public.postal_area_neighbors');
logger.info(`Finished building postal neighbors. Total adjacency links: ${summary.rows[0]?.count ?? '0'}.`);
} catch (error) {
await client.query('rollback');
logger.error('Neighbor build failed.');
throw error;
} finally {
client.release();
+41
View File
@@ -0,0 +1,41 @@
import { getDbPool } from '../../server/src/db/pool.js';
import { createScriptLogger } from './postal-logging.js';
async function run() {
const logger = createScriptLogger('postal-status');
const pool = getDbPool();
try {
const postalAreasByCountry = await pool.query<{ country_code: string; area_count: string }>(`
select country_code, count(*)::text as area_count
from public.postal_areas
group by country_code
order by country_code asc
`);
const neighborCountsByCountry = await pool.query<{ country_code: string; neighbor_count: string }>(`
select area.country_code, count(*)::text as neighbor_count
from public.postal_area_neighbors link
join public.postal_areas area on area.id = link.postal_area_id
group by area.country_code
order by area.country_code asc
`);
const totalAreas = await pool.query<{ count: string }>('select count(*)::text as count from public.postal_areas');
const totalNeighbors = await pool.query<{ count: string }>('select count(*)::text as count from public.postal_area_neighbors');
logger.info(`Postal areas loaded: ${totalAreas.rows[0]?.count ?? '0'}`);
postalAreasByCountry.rows.forEach((row) => {
logger.info(` ${row.country_code}: ${row.area_count} postal areas`);
});
logger.info(`Postal neighbor links built: ${totalNeighbors.rows[0]?.count ?? '0'}`);
neighborCountsByCountry.rows.forEach((row) => {
logger.info(` ${row.country_code}: ${row.neighbor_count} adjacency links`);
});
} finally {
await pool.end();
}
}
await run();
+27 -5
View File
@@ -1,7 +1,8 @@
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { getDbPool } from '../../server/src/db/pool.js';
import { getFeatureGeometry, getStringProperty, normalizePostalCode, readFeatureCollection, type PostalDatasetConfig } from './postal-import-utils.js';
import { createScriptLogger } from './postal-logging.js';
import { assertGeometryIsWgs84, getFeatureGeometry, getStringProperty, normalizePostalCode, streamFeatureCollection, type PostalDatasetConfig } from './postal-import-utils.js';
const currentDir = path.dirname(fileURLToPath(import.meta.url));
const datasetsRoot = path.resolve(currentDir, '../datasets/postal');
@@ -23,33 +24,46 @@ const datasetConfigs: PostalDatasetConfig[] = [
},
];
const logEvery = Math.max(1, Number.parseInt(process.env.POSTAL_IMPORT_LOG_EVERY ?? '500', 10) || 500);
async function importDataset(config: PostalDatasetConfig) {
const pool = getDbPool();
const client = await pool.connect();
const logger = createScriptLogger(`postal-import:${config.countryCode.toLowerCase()}`);
let index = 0;
try {
const collection = await readFeatureCollection(config.filePath);
let insertedCount = 0;
let skippedCount = 0;
logger.info(`Starting ${config.label} import from ${config.filePath}`);
await client.query('begin');
for (const [index, feature] of collection.features.entries()) {
for await (const feature of streamFeatureCollection(config.filePath)) {
index += 1;
const rawPostalCode = getStringProperty(feature.properties, config.postalCodeKeys);
if (!rawPostalCode) {
skippedCount += 1;
if (index % logEvery === 0) {
logger.info(`Processed ${index} features, inserted ${insertedCount}, skipped ${skippedCount}`);
}
continue;
}
const normalizedPostalCode = normalizePostalCode(config.countryCode, rawPostalCode);
if (!normalizedPostalCode) {
skippedCount += 1;
if (index % logEvery === 0) {
logger.info(`Processed ${index} features, inserted ${insertedCount}, skipped ${skippedCount}`);
}
continue;
}
const displayName = getStringProperty(feature.properties, config.displayNameKeys) || normalizedPostalCode;
const geometry = getFeatureGeometry(feature, config.filePath, index);
assertGeometryIsWgs84(geometry, config.filePath, index);
await client.query(
`
@@ -72,7 +86,7 @@ async function importDataset(config: PostalDatasetConfig) {
$4,
ST_Multi(ST_SetSRID(ST_GeomFromGeoJSON($5), 4326)),
ST_Centroid(ST_SetSRID(ST_GeomFromGeoJSON($5), 4326))::geography,
greatest(1000, round(sqrt(ST_Area(ST_SetSRID(ST_GeomFromGeoJSON($5), 4326)::geography) / pi()))::integer),
greatest(1000, round(sqrt(ST_Area(ST_Transform(ST_SetSRID(ST_GeomFromGeoJSON($5), 4326), 3857)) / pi()))::integer),
$6::jsonb,
now(),
now()
@@ -98,12 +112,17 @@ async function importDataset(config: PostalDatasetConfig) {
);
insertedCount += 1;
if (index % logEvery === 0) {
logger.info(`Processed ${index} features, inserted ${insertedCount}, skipped ${skippedCount}`);
}
}
await client.query('commit');
console.log(`Imported ${insertedCount} ${config.label} areas from ${config.filePath}. Skipped ${skippedCount}.`);
logger.info(`Completed ${config.label} import. Inserted ${insertedCount} areas, skipped ${skippedCount}.`);
} catch (error) {
await client.query('rollback');
logger.error(`Import failed after processing ${index} features.`);
throw error;
} finally {
client.release();
@@ -111,10 +130,13 @@ async function importDataset(config: PostalDatasetConfig) {
}
async function run() {
const logger = createScriptLogger('postal-import');
for (const config of datasetConfigs) {
await importDataset(config);
}
logger.info('All postal datasets imported successfully.');
await getDbPool().end();
}
+62 -8
View File
@@ -1,4 +1,8 @@
import { readFile } from 'node:fs/promises';
import { createReadStream } from 'node:fs';
import { chain } from 'stream-chain';
import { parser } from 'stream-json';
import { pick } from 'stream-json/filters/pick';
import { streamArray } from 'stream-json/streamers/stream-array';
export type FeatureGeometry = {
type: 'Polygon' | 'MultiPolygon';
@@ -24,15 +28,49 @@ export type PostalDatasetConfig = {
displayNameKeys: string[];
};
export async function readFeatureCollection(filePath: string) {
const raw = await readFile(filePath, 'utf8');
const parsed = JSON.parse(raw) as GeoJsonFeatureCollection;
if (parsed.type !== 'FeatureCollection' || !Array.isArray(parsed.features)) {
throw new Error(`Dataset at ${filePath} is not a valid GeoJSON FeatureCollection.`);
function findFirstCoordinatePair(coordinates: unknown): [number, number] | null {
if (!Array.isArray(coordinates)) {
return null;
}
return parsed;
if (
coordinates.length >= 2 &&
typeof coordinates[0] === 'number' &&
Number.isFinite(coordinates[0]) &&
typeof coordinates[1] === 'number' &&
Number.isFinite(coordinates[1])
) {
return [coordinates[0], coordinates[1]];
}
for (const value of coordinates) {
const pair = findFirstCoordinatePair(value);
if (pair) {
return pair;
}
}
return null;
}
export async function *streamFeatureCollection(filePath: string) {
const pipeline = chain([
createReadStream(filePath, { encoding: 'utf8' }),
parser(),
pick({ filter: 'features' }) as any,
streamArray() as any,
]) as AsyncIterable<{ value: GeoJsonFeature }>;
let foundFeatures = false;
for await (const chunk of pipeline) {
foundFeatures = true;
yield chunk.value as GeoJsonFeature;
}
if (!foundFeatures) {
throw new Error(`Dataset at ${filePath} is not a valid GeoJSON FeatureCollection with a features array.`);
}
}
export function normalizePostalCode(countryCode: 'US' | 'CA', rawPostalCode: string) {
@@ -74,3 +112,19 @@ export function getFeatureGeometry(feature: GeoJsonFeature, filePath: string, in
return feature.geometry;
}
export function assertGeometryIsWgs84(geometry: FeatureGeometry, filePath: string, index: number) {
const pair = findFirstCoordinatePair(geometry.coordinates);
if (!pair) {
throw new Error(`Feature ${index} in ${filePath} does not contain readable coordinates.`);
}
const [lng, lat] = pair;
if (Math.abs(lng) > 180 || Math.abs(lat) > 90) {
throw new Error(
`Feature ${index} in ${filePath} is not in WGS84 lon/lat coordinates. Re-export the dataset to GeoJSON with EPSG:4326, for example with ogr2ogr -t_srs EPSG:4326.`,
);
}
}
+22
View File
@@ -0,0 +1,22 @@
export function createScriptLogger(prefix: string) {
const startedAt = Date.now();
function elapsed() {
const totalSeconds = Math.floor((Date.now() - startedAt) / 1000);
const minutes = Math.floor(totalSeconds / 60);
const seconds = totalSeconds % 60;
return `${minutes}m ${seconds}s`;
}
return {
info(message: string) {
console.log(`[${prefix}] ${message} (${elapsed()})`);
},
warn(message: string) {
console.warn(`[${prefix}] ${message} (${elapsed()})`);
},
error(message: string) {
console.error(`[${prefix}] ${message} (${elapsed()})`);
},
};
}
+7
View File
@@ -0,0 +1,7 @@
declare module 'stream-json/filters/pick' {
export function pick(options: { filter: string }): NodeJS.ReadWriteStream;
}
declare module 'stream-json/streamers/stream-array' {
export function streamArray(): NodeJS.ReadWriteStream;
}