feat: add deep research planning and postal batch runs
Add a dedicated Deep Research view with postal-area preview overlays, batch execution, and bundled map results. Also add postal dataset import tooling and fix local API networking and research insert issues needed to support the new workflow.
This commit is contained in:
+26
-1
@@ -2,19 +2,43 @@ import Fastify from 'fastify';
|
||||
import cookie from '@fastify/cookie';
|
||||
import cors from '@fastify/cors';
|
||||
import { getEnv } from './config/env.js';
|
||||
import { deepResearchRoutes } from './routes/deep-research.js';
|
||||
import { authRoutes } from './routes/auth.js';
|
||||
import { healthRoutes } from './routes/health.js';
|
||||
import { searchJobRoutes } from './routes/search-jobs.js';
|
||||
|
||||
function parseAllowedOrigins(rawOrigins: string) {
|
||||
return rawOrigins
|
||||
.split(',')
|
||||
.map((origin) => origin.trim())
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
function resolveCorsOrigin(origin: string | undefined, allowedOrigins: string[], isProduction: boolean) {
|
||||
if (!origin) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!isProduction) {
|
||||
return origin;
|
||||
}
|
||||
|
||||
return allowedOrigins.includes(origin) ? origin : false;
|
||||
}
|
||||
|
||||
export async function buildApp() {
|
||||
const env = getEnv();
|
||||
const allowedOrigins = parseAllowedOrigins(env.APP_ORIGIN);
|
||||
const app = Fastify({
|
||||
logger: true,
|
||||
});
|
||||
|
||||
await app.register(cors, {
|
||||
origin: env.APP_ORIGIN,
|
||||
origin(origin, callback) {
|
||||
callback(null, resolveCorsOrigin(origin, allowedOrigins, env.NODE_ENV === 'production'));
|
||||
},
|
||||
credentials: true,
|
||||
methods: ['GET', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE', 'OPTIONS'],
|
||||
});
|
||||
|
||||
await app.register(cookie, {
|
||||
@@ -25,6 +49,7 @@ export async function buildApp() {
|
||||
await app.register(healthRoutes, { prefix: '/api' });
|
||||
await app.register(authRoutes, { prefix: '/api' });
|
||||
await app.register(searchJobRoutes, { prefix: '/api' });
|
||||
await app.register(deepResearchRoutes, { prefix: '/api' });
|
||||
|
||||
return app;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,21 @@
|
||||
import { existsSync } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import dotenv from 'dotenv';
|
||||
import { z } from 'zod';
|
||||
|
||||
const projectRoot = path.resolve(path.dirname(fileURLToPath(import.meta.url)), '../../..');
|
||||
const envFiles = [
|
||||
path.join(projectRoot, '.env.local'),
|
||||
path.join(projectRoot, '.env'),
|
||||
];
|
||||
|
||||
for (const envFile of envFiles) {
|
||||
if (existsSync(envFile)) {
|
||||
dotenv.config({ path: envFile, override: false });
|
||||
}
|
||||
}
|
||||
|
||||
const envSchema = z.object({
|
||||
NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
|
||||
APP_HOST: z.string().default('0.0.0.0'),
|
||||
|
||||
@@ -0,0 +1,230 @@
|
||||
import type { Pool, PoolClient } from 'pg';
|
||||
import type { DeepResearchBatchDetail, DeepResearchBatchSummary, DeepResearchChildJobSummary, JobStatus } from '../../../shared/types.js';
|
||||
import { listSearchJobsForBatch } from '../search/repository.js';
|
||||
|
||||
type DbClient = Pool | PoolClient;
|
||||
|
||||
type DeepResearchBatchRow = {
|
||||
id: string;
|
||||
user_id: string;
|
||||
pin_lat: number;
|
||||
pin_lng: number;
|
||||
base_postal_code: string | null;
|
||||
country_code: string | null;
|
||||
propagation: number;
|
||||
business_type: string;
|
||||
keywords: string | null;
|
||||
status: JobStatus;
|
||||
total_postal_areas: number;
|
||||
total_results: number;
|
||||
child_job_count: number;
|
||||
completed_job_count: number;
|
||||
failed_job_count: number;
|
||||
started_at: string | null;
|
||||
completed_at: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
function mapDeepResearchBatchRow(row: DeepResearchBatchRow): DeepResearchBatchSummary {
|
||||
return {
|
||||
id: row.id,
|
||||
userId: row.user_id,
|
||||
pinLat: row.pin_lat,
|
||||
pinLng: row.pin_lng,
|
||||
basePostalCode: row.base_postal_code ?? undefined,
|
||||
countryCode: row.country_code ?? undefined,
|
||||
propagation: row.propagation,
|
||||
businessType: row.business_type,
|
||||
keywords: row.keywords ?? undefined,
|
||||
status: row.status,
|
||||
totalPostalAreas: row.total_postal_areas,
|
||||
totalResults: row.total_results,
|
||||
childJobCount: row.child_job_count,
|
||||
completedJobCount: row.completed_job_count,
|
||||
failedJobCount: row.failed_job_count,
|
||||
startedAt: row.started_at ?? undefined,
|
||||
completedAt: row.completed_at ?? undefined,
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
};
|
||||
}
|
||||
|
||||
const batchSummarySelect = `
|
||||
select
|
||||
batch.id,
|
||||
batch.user_id,
|
||||
batch.pin_lat,
|
||||
batch.pin_lng,
|
||||
batch.base_postal_code,
|
||||
batch.country_code,
|
||||
batch.propagation,
|
||||
batch.business_type,
|
||||
batch.keywords,
|
||||
batch.status,
|
||||
batch.total_postal_areas,
|
||||
batch.total_results,
|
||||
count(job.id)::int as child_job_count,
|
||||
count(*) filter (where job.status = 'completed')::int as completed_job_count,
|
||||
count(*) filter (where job.status = 'failed')::int as failed_job_count,
|
||||
batch.started_at,
|
||||
batch.completed_at,
|
||||
batch.created_at,
|
||||
batch.updated_at
|
||||
from public.deep_research_batches batch
|
||||
left join public.search_jobs job on job.deep_research_batch_id = batch.id
|
||||
`;
|
||||
|
||||
export async function createDeepResearchBatch(
|
||||
db: DbClient,
|
||||
userId: string,
|
||||
input: {
|
||||
pinLat: number;
|
||||
pinLng: number;
|
||||
basePostalCode: string;
|
||||
countryCode: string;
|
||||
propagation: number;
|
||||
businessType: string;
|
||||
keywords?: string;
|
||||
totalPostalAreas: number;
|
||||
},
|
||||
) {
|
||||
const startedAt = new Date().toISOString();
|
||||
const result = await db.query<{ id: string }>(
|
||||
`
|
||||
insert into public.deep_research_batches (
|
||||
user_id,
|
||||
pin_lat,
|
||||
pin_lng,
|
||||
pin_geom,
|
||||
base_postal_code,
|
||||
country_code,
|
||||
propagation,
|
||||
business_type,
|
||||
keywords,
|
||||
status,
|
||||
total_postal_areas,
|
||||
total_results,
|
||||
started_at,
|
||||
created_at,
|
||||
updated_at
|
||||
)
|
||||
values (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
ST_SetSRID(ST_MakePoint($3, $2), 4326)::geography,
|
||||
$4,
|
||||
$5,
|
||||
$6,
|
||||
$7,
|
||||
$8,
|
||||
'running',
|
||||
$9,
|
||||
0,
|
||||
$10,
|
||||
$10,
|
||||
$10
|
||||
)
|
||||
returning id
|
||||
`,
|
||||
[
|
||||
userId,
|
||||
input.pinLat,
|
||||
input.pinLng,
|
||||
input.basePostalCode,
|
||||
input.countryCode,
|
||||
input.propagation,
|
||||
input.businessType,
|
||||
input.keywords ?? null,
|
||||
input.totalPostalAreas,
|
||||
startedAt,
|
||||
],
|
||||
);
|
||||
|
||||
return result.rows[0].id;
|
||||
}
|
||||
|
||||
export async function finalizeDeepResearchBatch(
|
||||
db: DbClient,
|
||||
batchId: string,
|
||||
input: { status: JobStatus; totalResults: number },
|
||||
) {
|
||||
const completedAt = new Date().toISOString();
|
||||
await db.query(
|
||||
`
|
||||
update public.deep_research_batches
|
||||
set status = $2,
|
||||
total_results = $3,
|
||||
completed_at = $4,
|
||||
updated_at = $4
|
||||
where id = $1
|
||||
`,
|
||||
[batchId, input.status, input.totalResults, completedAt],
|
||||
);
|
||||
}
|
||||
|
||||
export async function failDeepResearchBatch(db: DbClient, batchId: string) {
|
||||
await db.query(
|
||||
`
|
||||
update public.deep_research_batches
|
||||
set status = 'failed', updated_at = now()
|
||||
where id = $1
|
||||
`,
|
||||
[batchId],
|
||||
);
|
||||
}
|
||||
|
||||
export async function listDeepResearchBatchesForUser(db: DbClient, userId: string): Promise<DeepResearchBatchSummary[]> {
|
||||
const result = await db.query<DeepResearchBatchRow>(
|
||||
`${batchSummarySelect}
|
||||
where batch.user_id = $1
|
||||
group by batch.id
|
||||
order by batch.created_at desc
|
||||
`,
|
||||
[userId],
|
||||
);
|
||||
|
||||
return result.rows.map(mapDeepResearchBatchRow);
|
||||
}
|
||||
|
||||
export async function getDeepResearchBatchSummaryForUser(db: DbClient, userId: string, batchId: string) {
|
||||
const result = await db.query<DeepResearchBatchRow>(
|
||||
`${batchSummarySelect}
|
||||
where batch.user_id = $1 and batch.id = $2
|
||||
group by batch.id
|
||||
limit 1
|
||||
`,
|
||||
[userId, batchId],
|
||||
);
|
||||
|
||||
if (result.rowCount === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return mapDeepResearchBatchRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function getDeepResearchBatchDetailForUser(db: DbClient, userId: string, batchId: string): Promise<DeepResearchBatchDetail | null> {
|
||||
const batch = await getDeepResearchBatchSummaryForUser(db, userId, batchId);
|
||||
if (!batch) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const childJobs = await listSearchJobsForBatch(db, userId, batchId);
|
||||
const mappedChildJobs: DeepResearchChildJobSummary[] = childJobs.map((job) => ({
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
postalCode: job.postalCode,
|
||||
status: job.status,
|
||||
totalResults: job.totalResults,
|
||||
createdAt: job.createdAt,
|
||||
completedAt: job.completedAt,
|
||||
}));
|
||||
|
||||
return {
|
||||
...batch,
|
||||
childJobs: mappedChildJobs,
|
||||
jobIds: mappedChildJobs.map((job) => job.id),
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,99 @@
|
||||
import type { Pool } from 'pg';
|
||||
import type { CreateDeepResearchBatchRequest, DeepResearchBatchDetail, DeepResearchBatchSummary, DeepResearchPreviewRequest, JobStatus } from '../../../shared/types.js';
|
||||
import { listPostalAreasByPropagation, findPostalAreaContainingPoint } from '../postal/repository.js';
|
||||
import { previewDeepResearchForPoint } from '../postal/service.js';
|
||||
import { runSearchForPostalArea } from '../search/run-search.js';
|
||||
import {
|
||||
createDeepResearchBatch,
|
||||
failDeepResearchBatch,
|
||||
finalizeDeepResearchBatch,
|
||||
getDeepResearchBatchDetailForUser,
|
||||
listDeepResearchBatchesForUser,
|
||||
} from './repository.js';
|
||||
|
||||
function toRadiusKm(searchRadiusMeters: number | null) {
|
||||
return Math.min(50, Math.max(1, Math.ceil((searchRadiusMeters ?? 1000) / 1000)));
|
||||
}
|
||||
|
||||
export async function listDeepResearchBatches(db: Pool, userId: string): Promise<DeepResearchBatchSummary[]> {
|
||||
return listDeepResearchBatchesForUser(db, userId);
|
||||
}
|
||||
|
||||
export async function getDeepResearchBatchDetail(db: Pool, userId: string, batchId: string): Promise<DeepResearchBatchDetail | null> {
|
||||
return getDeepResearchBatchDetailForUser(db, userId, batchId);
|
||||
}
|
||||
|
||||
export async function createDeepResearchBatchForUser(
|
||||
db: Pool,
|
||||
userId: string,
|
||||
input: CreateDeepResearchBatchRequest,
|
||||
): Promise<DeepResearchBatchDetail> {
|
||||
const preview = await previewDeepResearchForPoint(db, input as DeepResearchPreviewRequest);
|
||||
const baseArea = await findPostalAreaContainingPoint(db, input.lat, input.lng);
|
||||
|
||||
if (!baseArea) {
|
||||
throw new Error('No supported postal area was found for the selected pin.');
|
||||
}
|
||||
|
||||
const areaRows = await listPostalAreasByPropagation(db, baseArea.id, input.propagation);
|
||||
const batchId = await createDeepResearchBatch(db, userId, {
|
||||
pinLat: input.lat,
|
||||
pinLng: input.lng,
|
||||
basePostalCode: preview.baseArea.postalCode,
|
||||
countryCode: preview.countryCode,
|
||||
propagation: input.propagation,
|
||||
businessType: input.businessType,
|
||||
keywords: input.keywords,
|
||||
totalPostalAreas: preview.totalAreas,
|
||||
});
|
||||
|
||||
let totalResults = 0;
|
||||
let hadFailures = false;
|
||||
|
||||
try {
|
||||
for (const area of areaRows) {
|
||||
if (typeof area.centroid_lat !== 'number' || typeof area.centroid_lng !== 'number') {
|
||||
hadFailures = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await runSearchForPostalArea(db, userId, {
|
||||
name: `${input.businessType} in ${area.display_name || area.postal_code}`,
|
||||
city: area.display_name || area.postal_code,
|
||||
address: area.display_name || area.postal_code,
|
||||
postalCode: area.postal_code,
|
||||
countryCode: area.country_code,
|
||||
radiusKm: toRadiusKm(area.search_radius_m),
|
||||
businessType: input.businessType,
|
||||
keywords: input.keywords,
|
||||
lat: area.centroid_lat,
|
||||
lng: area.centroid_lng,
|
||||
deepResearchBatchId: batchId,
|
||||
postalAreaId: area.id,
|
||||
queryContextTerms: [area.postal_code, area.country_code],
|
||||
});
|
||||
|
||||
totalResults += result.totalResults;
|
||||
} catch {
|
||||
hadFailures = true;
|
||||
}
|
||||
}
|
||||
|
||||
const finalStatus: JobStatus = hadFailures ? 'failed' : 'completed';
|
||||
await finalizeDeepResearchBatch(db, batchId, {
|
||||
status: finalStatus,
|
||||
totalResults,
|
||||
});
|
||||
} catch (error) {
|
||||
await failDeepResearchBatch(db, batchId);
|
||||
throw error;
|
||||
}
|
||||
|
||||
const detail = await getDeepResearchBatchDetailForUser(db, userId, batchId);
|
||||
if (!detail) {
|
||||
throw new Error('Failed to load the created deep research batch.');
|
||||
}
|
||||
|
||||
return detail;
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
import 'dotenv/config';
|
||||
import { buildApp } from './app.js';
|
||||
import { getEnv } from './config/env.js';
|
||||
|
||||
|
||||
@@ -0,0 +1,86 @@
|
||||
import type { Pool, PoolClient } from 'pg';
|
||||
|
||||
type DbClient = Pool | PoolClient;
|
||||
|
||||
export type PostalAreaRow = {
|
||||
id: string;
|
||||
country_code: string;
|
||||
postal_code: string;
|
||||
normalized_postal_code: string;
|
||||
display_name: string | null;
|
||||
centroid_lat: number | null;
|
||||
centroid_lng: number | null;
|
||||
search_radius_m: number | null;
|
||||
propagation_ring: number;
|
||||
geom_json: string;
|
||||
};
|
||||
|
||||
export async function findPostalAreaContainingPoint(db: DbClient, lat: number, lng: number) {
|
||||
const result = await db.query<PostalAreaRow>(
|
||||
`
|
||||
select
|
||||
id,
|
||||
country_code,
|
||||
postal_code,
|
||||
normalized_postal_code,
|
||||
display_name,
|
||||
ST_Y(centroid::geometry) as centroid_lat,
|
||||
ST_X(centroid::geometry) as centroid_lng,
|
||||
search_radius_m,
|
||||
0 as propagation_ring,
|
||||
ST_AsGeoJSON(geom) as geom_json
|
||||
from public.postal_areas
|
||||
where ST_Covers(geom, ST_SetSRID(ST_MakePoint($2, $1), 4326))
|
||||
order by ST_Area(geom) asc
|
||||
limit 1
|
||||
`,
|
||||
[lat, lng],
|
||||
);
|
||||
|
||||
if (result.rowCount === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return result.rows[0];
|
||||
}
|
||||
|
||||
export async function listPostalAreasByPropagation(db: DbClient, basePostalAreaId: string, propagation: number) {
|
||||
const result = await db.query<PostalAreaRow>(
|
||||
`
|
||||
with recursive walk as (
|
||||
select id, 0::int as propagation_ring
|
||||
from public.postal_areas
|
||||
where id = $1
|
||||
|
||||
union
|
||||
|
||||
select neighbor.neighbor_postal_area_id, walk.propagation_ring + 1
|
||||
from walk
|
||||
join public.postal_area_neighbors neighbor on neighbor.postal_area_id = walk.id
|
||||
where walk.propagation_ring < $2
|
||||
),
|
||||
ranked as (
|
||||
select id, min(propagation_ring)::int as propagation_ring
|
||||
from walk
|
||||
group by id
|
||||
)
|
||||
select
|
||||
area.id,
|
||||
area.country_code,
|
||||
area.postal_code,
|
||||
area.normalized_postal_code,
|
||||
area.display_name,
|
||||
ST_Y(area.centroid::geometry) as centroid_lat,
|
||||
ST_X(area.centroid::geometry) as centroid_lng,
|
||||
area.search_radius_m,
|
||||
ranked.propagation_ring,
|
||||
ST_AsGeoJSON(area.geom) as geom_json
|
||||
from ranked
|
||||
join public.postal_areas area on area.id = ranked.id
|
||||
order by ranked.propagation_ring asc, area.postal_code asc
|
||||
`,
|
||||
[basePostalAreaId, propagation],
|
||||
);
|
||||
|
||||
return result.rows;
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
import type { Pool } from 'pg';
|
||||
import type { DeepResearchOverlayProperties, DeepResearchPreview, DeepResearchPreviewRequest, GeoJsonFeatureCollection, PostalAreaPreview } from '../../../shared/types.js';
|
||||
import { findPostalAreaContainingPoint, listPostalAreasByPropagation } from './repository.js';
|
||||
|
||||
function mapPreviewArea(row: {
|
||||
id: string;
|
||||
country_code: string;
|
||||
postal_code: string;
|
||||
normalized_postal_code: string;
|
||||
display_name: string | null;
|
||||
centroid_lat: number | null;
|
||||
centroid_lng: number | null;
|
||||
propagation_ring: number;
|
||||
}) : PostalAreaPreview {
|
||||
return {
|
||||
id: row.id,
|
||||
countryCode: row.country_code,
|
||||
postalCode: row.postal_code,
|
||||
normalizedPostalCode: row.normalized_postal_code,
|
||||
displayName: row.display_name || row.postal_code,
|
||||
propagationRing: row.propagation_ring,
|
||||
centroidLat: row.centroid_lat,
|
||||
centroidLng: row.centroid_lng,
|
||||
};
|
||||
}
|
||||
|
||||
export async function previewDeepResearchForPoint(db: Pool, input: DeepResearchPreviewRequest): Promise<DeepResearchPreview> {
|
||||
const baseAreaRow = await findPostalAreaContainingPoint(db, input.lat, input.lng);
|
||||
|
||||
if (!baseAreaRow) {
|
||||
throw new Error('No supported postal area was found for the selected pin.');
|
||||
}
|
||||
|
||||
if (!['US', 'CA'].includes(baseAreaRow.country_code)) {
|
||||
throw new Error(`Unsupported country code '${baseAreaRow.country_code}' for deep research.`);
|
||||
}
|
||||
|
||||
const areaRows = await listPostalAreasByPropagation(db, baseAreaRow.id, input.propagation);
|
||||
const areas = areaRows.map(mapPreviewArea);
|
||||
const baseArea = areas.find((area) => area.id === baseAreaRow.id) ?? mapPreviewArea(baseAreaRow);
|
||||
|
||||
const overlay: GeoJsonFeatureCollection<DeepResearchOverlayProperties> = {
|
||||
type: 'FeatureCollection',
|
||||
features: areaRows.map((row) => ({
|
||||
type: 'Feature',
|
||||
geometry: JSON.parse(row.geom_json) as { type: 'Polygon' | 'MultiPolygon'; coordinates: unknown },
|
||||
properties: {
|
||||
postalAreaId: row.id,
|
||||
countryCode: row.country_code,
|
||||
postalCode: row.postal_code,
|
||||
displayName: row.display_name || row.postal_code,
|
||||
propagationRing: row.propagation_ring,
|
||||
},
|
||||
})),
|
||||
};
|
||||
|
||||
return {
|
||||
baseArea,
|
||||
areas,
|
||||
overlay,
|
||||
propagation: input.propagation,
|
||||
countryCode: baseArea.countryCode,
|
||||
totalAreas: areas.length,
|
||||
estimatedChildJobs: areas.length,
|
||||
businessType: input.businessType,
|
||||
keywords: input.keywords,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
import type { FastifyPluginAsync } from 'fastify';
|
||||
import { ZodError, z } from 'zod';
|
||||
import { requireAuth } from '../auth/middleware.js';
|
||||
import { getDbPool } from '../db/pool.js';
|
||||
import { createDeepResearchBatchForUser, getDeepResearchBatchDetail, listDeepResearchBatches } from '../deep-research/service.js';
|
||||
import { previewDeepResearchForPoint } from '../postal/service.js';
|
||||
|
||||
const previewSchema = z.object({
|
||||
lat: z.number().finite().min(-90).max(90),
|
||||
lng: z.number().finite().min(-180).max(180),
|
||||
propagation: z.coerce.number().int().min(0).max(5),
|
||||
businessType: z.string().trim().min(1),
|
||||
keywords: z.string().trim().optional(),
|
||||
});
|
||||
|
||||
const batchParamsSchema = z.object({
|
||||
batchId: z.string().uuid(),
|
||||
});
|
||||
|
||||
export const deepResearchRoutes: FastifyPluginAsync = async (app) => {
|
||||
app.post('/deep-research/preview', { preHandler: requireAuth }, async (request, reply) => {
|
||||
try {
|
||||
const payload = previewSchema.parse(request.body);
|
||||
const preview = await previewDeepResearchForPoint(getDbPool(), payload);
|
||||
return reply.send({ preview });
|
||||
} catch (error) {
|
||||
if (error instanceof ZodError) {
|
||||
return reply.code(400).send({ error: error.issues[0]?.message || 'Invalid deep research preview payload.' });
|
||||
}
|
||||
|
||||
if (error instanceof Error && error.message.includes('postal area')) {
|
||||
return reply.code(404).send({ error: error.message });
|
||||
}
|
||||
|
||||
request.log.error(error);
|
||||
return reply.code(500).send({ error: error instanceof Error ? error.message : 'Failed to preview deep research.' });
|
||||
}
|
||||
});
|
||||
|
||||
app.post('/deep-research/batches', { preHandler: requireAuth }, async (request, reply) => {
|
||||
try {
|
||||
const payload = previewSchema.parse(request.body);
|
||||
const batch = await createDeepResearchBatchForUser(getDbPool(), request.authUser!.id, payload);
|
||||
return reply.code(201).send({ batch });
|
||||
} catch (error) {
|
||||
if (error instanceof ZodError) {
|
||||
return reply.code(400).send({ error: error.issues[0]?.message || 'Invalid deep research batch payload.' });
|
||||
}
|
||||
|
||||
if (error instanceof Error && error.message.includes('postal area')) {
|
||||
return reply.code(404).send({ error: error.message });
|
||||
}
|
||||
|
||||
request.log.error(error);
|
||||
return reply.code(500).send({ error: error instanceof Error ? error.message : 'Failed to run deep research.' });
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/deep-research/batches', { preHandler: requireAuth }, async (request) => {
|
||||
const batches = await listDeepResearchBatches(getDbPool(), request.authUser!.id);
|
||||
return { batches };
|
||||
});
|
||||
|
||||
app.get('/deep-research/batches/:batchId', { preHandler: requireAuth }, async (request, reply) => {
|
||||
try {
|
||||
const { batchId } = batchParamsSchema.parse(request.params);
|
||||
const batch = await getDeepResearchBatchDetail(getDbPool(), request.authUser!.id, batchId);
|
||||
|
||||
if (!batch) {
|
||||
return reply.code(404).send({ error: 'Deep research batch not found.' });
|
||||
}
|
||||
|
||||
return { batch };
|
||||
} catch (error) {
|
||||
if (error instanceof ZodError) {
|
||||
return reply.code(400).send({ error: error.issues[0]?.message || 'Invalid deep research batch id.' });
|
||||
}
|
||||
|
||||
request.log.error(error);
|
||||
return reply.code(500).send({ error: 'Failed to load deep research batch.' });
|
||||
}
|
||||
});
|
||||
};
|
||||
@@ -23,6 +23,89 @@ export async function createSearchJob(db: DbClient, userId: string, payload: Run
|
||||
return mapSearchJobRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function createSearchJobForCoordinates(
|
||||
db: DbClient,
|
||||
userId: string,
|
||||
input: {
|
||||
name: string;
|
||||
city?: string | null;
|
||||
address?: string | null;
|
||||
postalCode?: string | null;
|
||||
countryCode?: string | null;
|
||||
radiusKm: number;
|
||||
businessType: string;
|
||||
keywords?: string | null;
|
||||
lat: number;
|
||||
lng: number;
|
||||
deepResearchBatchId?: string | null;
|
||||
postalAreaId?: string | null;
|
||||
},
|
||||
) {
|
||||
const now = new Date().toISOString();
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
insert into public.search_jobs (
|
||||
user_id,
|
||||
deep_research_batch_id,
|
||||
postal_area_id,
|
||||
name,
|
||||
city,
|
||||
address,
|
||||
postal_code,
|
||||
country_code,
|
||||
radius_km,
|
||||
business_type,
|
||||
keywords,
|
||||
status,
|
||||
total_results,
|
||||
search_center_geom,
|
||||
started_at,
|
||||
created_at,
|
||||
updated_at
|
||||
)
|
||||
values (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6,
|
||||
$7,
|
||||
$8,
|
||||
$9,
|
||||
$10,
|
||||
$11,
|
||||
'running',
|
||||
0,
|
||||
ST_SetSRID(ST_MakePoint($13, $12), 4326)::geography,
|
||||
$14,
|
||||
$14,
|
||||
$14
|
||||
)
|
||||
returning id, user_id, name, city, address, postal_code, radius_km, business_type, keywords,
|
||||
status, total_results, started_at, completed_at, created_at, updated_at
|
||||
`,
|
||||
[
|
||||
userId,
|
||||
input.deepResearchBatchId ?? null,
|
||||
input.postalAreaId ?? null,
|
||||
input.name,
|
||||
input.city ?? null,
|
||||
input.address ?? null,
|
||||
input.postalCode ?? null,
|
||||
input.countryCode ?? null,
|
||||
input.radiusKm,
|
||||
input.businessType,
|
||||
input.keywords ?? null,
|
||||
input.lat,
|
||||
input.lng,
|
||||
now,
|
||||
],
|
||||
);
|
||||
|
||||
return mapSearchJobRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function updateSearchJobCenter(db: DbClient, jobId: string, lat: number, lng: number) {
|
||||
await db.query(
|
||||
`
|
||||
@@ -76,7 +159,11 @@ export async function upsertBusiness(db: DbClient, userId: string, business: Bus
|
||||
values (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8, $9,
|
||||
$10, $11, $12, $13, $14, $15, $16,
|
||||
case when $15 is not null and $16 is not null then ST_SetSRID(ST_MakePoint($16, $15), 4326)::geography else null end,
|
||||
case
|
||||
when $15::double precision is not null and $16::double precision is not null
|
||||
then ST_SetSRID(ST_MakePoint($16::double precision, $15::double precision), 4326)::geography
|
||||
else null::geography
|
||||
end,
|
||||
$17::jsonb, $18, $19, $20
|
||||
)
|
||||
on conflict (user_id, source, external_source_id)
|
||||
@@ -161,6 +248,21 @@ export async function listSearchJobsForUser(db: DbClient, userId: string, limit
|
||||
return result.rows.map(mapSearchJobRow);
|
||||
}
|
||||
|
||||
export async function listSearchJobsForBatch(db: DbClient, userId: string, batchId: string) {
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
select id, user_id, name, city, address, postal_code, radius_km, business_type, keywords,
|
||||
status, total_results, started_at, completed_at, created_at, updated_at
|
||||
from public.search_jobs
|
||||
where user_id = $1 and deep_research_batch_id = $2
|
||||
order by created_at asc
|
||||
`,
|
||||
[userId, batchId],
|
||||
);
|
||||
|
||||
return result.rows.map(mapSearchJobRow);
|
||||
}
|
||||
|
||||
export async function getSearchJobForUser(db: DbClient, userId: string, jobId: string) {
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
|
||||
+121
-35
@@ -1,15 +1,76 @@
|
||||
import type { Pool } from 'pg';
|
||||
import { getEnv } from '../config/env.js';
|
||||
import { buildBusinessPayload, collectPlaces, geocodeLocation } from './google-places.js';
|
||||
import { completeSearchJob, createSearchJob, failSearchJob, updateSearchJobCenter, upsertBusiness, upsertSearchJobResult } from './repository.js';
|
||||
import { completeSearchJob, createSearchJob, createSearchJobForCoordinates, failSearchJob, updateSearchJobCenter, upsertBusiness, upsertSearchJobResult } from './repository.js';
|
||||
import type { RunSearchInput, RunSearchResult } from './types.js';
|
||||
|
||||
export async function runSearchForUser(db: Pool, userId: string, payload: RunSearchInput): Promise<RunSearchResult> {
|
||||
function buildMatchedKeywords(keywords?: string) {
|
||||
return keywords
|
||||
? keywords
|
||||
.split(',')
|
||||
.map((keyword) => keyword.trim())
|
||||
.filter(Boolean)
|
||||
: [];
|
||||
}
|
||||
|
||||
async function executeSearchJobAtCoordinates(
|
||||
db: Pool,
|
||||
input: {
|
||||
jobId: string;
|
||||
userId: string;
|
||||
lat: number;
|
||||
lng: number;
|
||||
radiusKm: number;
|
||||
businessType: string;
|
||||
keywords?: string;
|
||||
queryContextTerms?: string[];
|
||||
},
|
||||
) {
|
||||
const env = getEnv();
|
||||
|
||||
if (!env.GOOGLE_MAPS_SERVER_KEY) {
|
||||
throw new Error('GOOGLE_MAPS_SERVER_KEY is required for running research.');
|
||||
}
|
||||
|
||||
const places = await collectPlaces({
|
||||
apiKey: env.GOOGLE_MAPS_SERVER_KEY,
|
||||
textQuery: [input.businessType, input.keywords, ...(input.queryContextTerms ?? [])].filter(Boolean).join(' '),
|
||||
lat: input.lat,
|
||||
lng: input.lng,
|
||||
radiusKm: input.radiusKm,
|
||||
});
|
||||
|
||||
const matchedKeywords = buildMatchedKeywords(input.keywords);
|
||||
const capturedAt = new Date().toISOString();
|
||||
let totalResults = 0;
|
||||
|
||||
for (const [index, place] of places.entries()) {
|
||||
if (!place.id || !place.displayName?.text) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const businessId = await upsertBusiness(db, input.userId, buildBusinessPayload(place, input.businessType));
|
||||
await upsertSearchJobResult(db, {
|
||||
userId: input.userId,
|
||||
searchJobId: input.jobId,
|
||||
businessId,
|
||||
matchedKeywords: matchedKeywords.length > 0 ? matchedKeywords : null,
|
||||
rank: index + 1,
|
||||
capturedAt,
|
||||
});
|
||||
|
||||
totalResults += 1;
|
||||
}
|
||||
|
||||
return completeSearchJob(db, input.jobId, totalResults);
|
||||
}
|
||||
|
||||
export async function runSearchForUser(db: Pool, userId: string, payload: RunSearchInput): Promise<RunSearchResult> {
|
||||
const job = await createSearchJob(db, userId, payload);
|
||||
const jobId = job.id;
|
||||
|
||||
try {
|
||||
const env = getEnv();
|
||||
if (!env.GOOGLE_MAPS_SERVER_KEY) {
|
||||
throw new Error('GOOGLE_MAPS_SERVER_KEY is required for running research.');
|
||||
}
|
||||
@@ -17,49 +78,74 @@ export async function runSearchForUser(db: Pool, userId: string, payload: RunSea
|
||||
const geocoded = await geocodeLocation(payload.location, env.GOOGLE_MAPS_SERVER_KEY);
|
||||
await updateSearchJobCenter(db, jobId, geocoded.lat, geocoded.lng);
|
||||
|
||||
const places = await collectPlaces({
|
||||
apiKey: env.GOOGLE_MAPS_SERVER_KEY,
|
||||
textQuery: [payload.businessType, payload.keywords].filter(Boolean).join(' '),
|
||||
const completedJob = await executeSearchJobAtCoordinates(db, {
|
||||
jobId,
|
||||
userId,
|
||||
lat: geocoded.lat,
|
||||
lng: geocoded.lng,
|
||||
radiusKm: payload.radiusKm,
|
||||
businessType: payload.businessType,
|
||||
keywords: payload.keywords,
|
||||
});
|
||||
|
||||
const matchedKeywords = payload.keywords
|
||||
? payload.keywords
|
||||
.split(',')
|
||||
.map((keyword) => keyword.trim())
|
||||
.filter(Boolean)
|
||||
: [];
|
||||
|
||||
const capturedAt = new Date().toISOString();
|
||||
let totalResults = 0;
|
||||
|
||||
for (const [index, place] of places.entries()) {
|
||||
if (!place.id || !place.displayName?.text) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const businessId = await upsertBusiness(db, userId, buildBusinessPayload(place, payload.businessType));
|
||||
await upsertSearchJobResult(db, {
|
||||
userId,
|
||||
searchJobId: jobId,
|
||||
businessId,
|
||||
matchedKeywords: matchedKeywords.length > 0 ? matchedKeywords : null,
|
||||
rank: index + 1,
|
||||
capturedAt,
|
||||
});
|
||||
|
||||
totalResults += 1;
|
||||
}
|
||||
|
||||
const completedJob = await completeSearchJob(db, jobId, totalResults);
|
||||
return {
|
||||
job: completedJob,
|
||||
totalResults,
|
||||
totalResults: completedJob.totalResults,
|
||||
};
|
||||
} catch (error) {
|
||||
await failSearchJob(db, jobId);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
export async function runSearchForPostalArea(db: Pool, userId: string, input: {
|
||||
name: string;
|
||||
city?: string | null;
|
||||
address?: string | null;
|
||||
postalCode?: string | null;
|
||||
countryCode?: string | null;
|
||||
radiusKm: number;
|
||||
businessType: string;
|
||||
keywords?: string;
|
||||
lat: number;
|
||||
lng: number;
|
||||
deepResearchBatchId: string;
|
||||
postalAreaId: string;
|
||||
queryContextTerms?: string[];
|
||||
}): Promise<RunSearchResult> {
|
||||
const job = await createSearchJobForCoordinates(db, userId, {
|
||||
name: input.name,
|
||||
city: input.city,
|
||||
address: input.address,
|
||||
postalCode: input.postalCode,
|
||||
countryCode: input.countryCode,
|
||||
radiusKm: input.radiusKm,
|
||||
businessType: input.businessType,
|
||||
keywords: input.keywords,
|
||||
lat: input.lat,
|
||||
lng: input.lng,
|
||||
deepResearchBatchId: input.deepResearchBatchId,
|
||||
postalAreaId: input.postalAreaId,
|
||||
});
|
||||
|
||||
try {
|
||||
const completedJob = await executeSearchJobAtCoordinates(db, {
|
||||
jobId: job.id,
|
||||
userId,
|
||||
lat: input.lat,
|
||||
lng: input.lng,
|
||||
radiusKm: input.radiusKm,
|
||||
businessType: input.businessType,
|
||||
keywords: input.keywords,
|
||||
queryContextTerms: input.queryContextTerms,
|
||||
});
|
||||
|
||||
return {
|
||||
job: completedJob,
|
||||
totalResults: completedJob.totalResults,
|
||||
};
|
||||
} catch (error) {
|
||||
await failSearchJob(db, job.id);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import 'dotenv/config';
|
||||
import { getBoss, stopBoss } from './db/boss.js';
|
||||
import { getEnv } from './config/env.js';
|
||||
import { registerJobs } from './jobs/register-jobs.js';
|
||||
|
||||
Reference in New Issue
Block a user