feat: migrate app to local Fastify and Postgres stack
Replace Supabase auth and search runtime with a local Fastify API, PostgreSQL/PostGIS schema, and local session handling. Scaffold the worker and deep-research foundations while keeping the existing research, dashboard, and map flows running on the new backend.
This commit is contained in:
@@ -0,0 +1,30 @@
|
||||
import Fastify from 'fastify';
|
||||
import cookie from '@fastify/cookie';
|
||||
import cors from '@fastify/cors';
|
||||
import { getEnv } from './config/env.js';
|
||||
import { authRoutes } from './routes/auth.js';
|
||||
import { healthRoutes } from './routes/health.js';
|
||||
import { searchJobRoutes } from './routes/search-jobs.js';
|
||||
|
||||
export async function buildApp() {
|
||||
const env = getEnv();
|
||||
const app = Fastify({
|
||||
logger: true,
|
||||
});
|
||||
|
||||
await app.register(cors, {
|
||||
origin: env.APP_ORIGIN,
|
||||
credentials: true,
|
||||
});
|
||||
|
||||
await app.register(cookie, {
|
||||
secret: env.COOKIE_SECRET,
|
||||
hook: 'onRequest',
|
||||
});
|
||||
|
||||
await app.register(healthRoutes, { prefix: '/api' });
|
||||
await app.register(authRoutes, { prefix: '/api' });
|
||||
await app.register(searchJobRoutes, { prefix: '/api' });
|
||||
|
||||
return app;
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
export const SESSION_COOKIE_NAME = 'leads4less_session';
|
||||
export const SESSION_TOKEN_BYTES = 32;
|
||||
@@ -0,0 +1,33 @@
|
||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||
import type { SessionUser } from '../../../shared/types.js';
|
||||
import { getDbPool } from '../db/pool.js';
|
||||
import { getSessionTokenFromRequest, getSessionUserByToken } from './sessions.js';
|
||||
|
||||
declare module 'fastify' {
|
||||
interface FastifyRequest {
|
||||
authUser: SessionUser | null;
|
||||
}
|
||||
}
|
||||
|
||||
export async function hydrateAuthUser(request: FastifyRequest) {
|
||||
const token = getSessionTokenFromRequest(request);
|
||||
|
||||
if (!token) {
|
||||
request.authUser = null;
|
||||
return null;
|
||||
}
|
||||
|
||||
const user = await getSessionUserByToken(getDbPool(), token);
|
||||
request.authUser = user;
|
||||
return user;
|
||||
}
|
||||
|
||||
export async function requireAuth(request: FastifyRequest, reply: FastifyReply) {
|
||||
const user = await hydrateAuthUser(request);
|
||||
|
||||
if (!user) {
|
||||
return reply.code(401).send({ error: 'Unauthorized' });
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
@@ -0,0 +1,9 @@
|
||||
import argon2 from 'argon2';
|
||||
|
||||
export async function hashPassword(password: string) {
|
||||
return argon2.hash(password);
|
||||
}
|
||||
|
||||
export async function verifyPassword(passwordHash: string, password: string) {
|
||||
return argon2.verify(passwordHash, password);
|
||||
}
|
||||
@@ -0,0 +1,111 @@
|
||||
import { createHash, randomBytes } from 'node:crypto';
|
||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||
import type { Pool, PoolClient } from 'pg';
|
||||
import type { SessionUser } from '../../../shared/types.js';
|
||||
import { getEnv } from '../config/env.js';
|
||||
import { SESSION_COOKIE_NAME, SESSION_TOKEN_BYTES } from './constants.js';
|
||||
|
||||
type DbClient = Pool | PoolClient;
|
||||
|
||||
type SessionRow = {
|
||||
session_id: string;
|
||||
id: string;
|
||||
email: string;
|
||||
display_name: string | null;
|
||||
avatar_url: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
function hashSessionToken(token: string) {
|
||||
return createHash('sha256').update(token).digest('hex');
|
||||
}
|
||||
|
||||
function sessionExpiryDate() {
|
||||
const env = getEnv();
|
||||
const expiresAt = new Date();
|
||||
expiresAt.setDate(expiresAt.getDate() + env.SESSION_TTL_DAYS);
|
||||
return expiresAt;
|
||||
}
|
||||
|
||||
function mapSessionRow(row: SessionRow): SessionUser {
|
||||
return {
|
||||
sessionId: row.session_id,
|
||||
id: row.id,
|
||||
email: row.email,
|
||||
displayName: row.display_name || row.email.split('@')[0] || 'User',
|
||||
avatarUrl: row.avatar_url,
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
};
|
||||
}
|
||||
|
||||
export function setSessionCookie(reply: FastifyReply, token: string, expiresAt: Date) {
|
||||
const env = getEnv();
|
||||
|
||||
reply.setCookie(SESSION_COOKIE_NAME, token, {
|
||||
path: '/',
|
||||
httpOnly: true,
|
||||
sameSite: 'lax',
|
||||
secure: env.NODE_ENV === 'production',
|
||||
expires: expiresAt,
|
||||
});
|
||||
}
|
||||
|
||||
export function clearSessionCookie(reply: FastifyReply) {
|
||||
reply.clearCookie(SESSION_COOKIE_NAME, {
|
||||
path: '/',
|
||||
sameSite: 'lax',
|
||||
});
|
||||
}
|
||||
|
||||
export function getSessionTokenFromRequest(request: FastifyRequest) {
|
||||
const cookies = request.cookies as Record<string, string | undefined>;
|
||||
return cookies[SESSION_COOKIE_NAME] ?? null;
|
||||
}
|
||||
|
||||
export async function createSession(db: DbClient, userId: string, metadata: { userAgent?: string; ipAddress?: string | null }) {
|
||||
const token = randomBytes(SESSION_TOKEN_BYTES).toString('hex');
|
||||
const tokenHash = hashSessionToken(token);
|
||||
const expiresAt = sessionExpiryDate();
|
||||
|
||||
const result = await db.query<{ id: string }>(
|
||||
`
|
||||
insert into public.sessions (user_id, token_hash, expires_at, last_seen_at, user_agent, ip_address)
|
||||
values ($1, $2, $3, now(), $4, $5)
|
||||
returning id
|
||||
`,
|
||||
[userId, tokenHash, expiresAt.toISOString(), metadata.userAgent ?? null, metadata.ipAddress ?? null],
|
||||
);
|
||||
|
||||
return {
|
||||
sessionId: result.rows[0].id,
|
||||
token,
|
||||
expiresAt,
|
||||
};
|
||||
}
|
||||
|
||||
export async function deleteSessionByToken(db: DbClient, token: string) {
|
||||
await db.query('delete from public.sessions where token_hash = $1', [hashSessionToken(token)]);
|
||||
}
|
||||
|
||||
export async function getSessionUserByToken(db: DbClient, token: string) {
|
||||
const tokenHash = hashSessionToken(token);
|
||||
const result = await db.query<SessionRow>(
|
||||
`
|
||||
select s.id as session_id, u.id, u.email, u.display_name, u.avatar_url, u.created_at, u.updated_at
|
||||
from public.sessions s
|
||||
join public.users u on u.id = s.user_id
|
||||
where s.token_hash = $1 and s.expires_at > now()
|
||||
limit 1
|
||||
`,
|
||||
[tokenHash],
|
||||
);
|
||||
|
||||
if (result.rowCount === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
await db.query('update public.sessions set last_seen_at = now() where token_hash = $1', [tokenHash]);
|
||||
return mapSessionRow(result.rows[0]);
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
import type { Pool, PoolClient } from 'pg';
|
||||
import type { AppUser } from '../../../shared/types.js';
|
||||
|
||||
type DbClient = Pool | PoolClient;
|
||||
|
||||
type UserRow = {
|
||||
id: string;
|
||||
email: string;
|
||||
password_hash: string;
|
||||
display_name: string | null;
|
||||
avatar_url: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export type UserWithPassword = AppUser & {
|
||||
passwordHash: string;
|
||||
};
|
||||
|
||||
export function normalizeEmail(email: string) {
|
||||
return email.trim().toLowerCase();
|
||||
}
|
||||
|
||||
export function mapUserRow(row: UserRow): UserWithPassword {
|
||||
return {
|
||||
id: row.id,
|
||||
email: row.email,
|
||||
displayName: row.display_name || row.email.split('@')[0] || 'User',
|
||||
avatarUrl: row.avatar_url,
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
passwordHash: row.password_hash,
|
||||
};
|
||||
}
|
||||
|
||||
export function toAppUser(user: UserWithPassword): AppUser {
|
||||
return {
|
||||
id: user.id,
|
||||
email: user.email,
|
||||
displayName: user.displayName,
|
||||
avatarUrl: user.avatarUrl,
|
||||
createdAt: user.createdAt,
|
||||
updatedAt: user.updatedAt,
|
||||
};
|
||||
}
|
||||
|
||||
export async function getUserByEmail(db: DbClient, email: string) {
|
||||
const normalizedEmail = normalizeEmail(email);
|
||||
const result = await db.query<UserRow>(
|
||||
`
|
||||
select id, email, password_hash, display_name, avatar_url, created_at, updated_at
|
||||
from public.users
|
||||
where lower(email) = $1
|
||||
limit 1
|
||||
`,
|
||||
[normalizedEmail],
|
||||
);
|
||||
|
||||
if (result.rowCount === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return mapUserRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function createUser(db: DbClient, input: { email: string; passwordHash: string; displayName?: string | null }) {
|
||||
const normalizedEmail = normalizeEmail(input.email);
|
||||
const result = await db.query<UserRow>(
|
||||
`
|
||||
insert into public.users (email, password_hash, display_name)
|
||||
values ($1, $2, $3)
|
||||
returning id, email, password_hash, display_name, avatar_url, created_at, updated_at
|
||||
`,
|
||||
[normalizedEmail, input.passwordHash, input.displayName?.trim() || null],
|
||||
);
|
||||
|
||||
return mapUserRow(result.rows[0]);
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
import { z } from 'zod';
|
||||
|
||||
const envSchema = z.object({
|
||||
NODE_ENV: z.enum(['development', 'test', 'production']).default('development'),
|
||||
APP_HOST: z.string().default('0.0.0.0'),
|
||||
APP_PORT: z.coerce.number().int().positive().default(4000),
|
||||
APP_ORIGIN: z.string().default('http://localhost:3000'),
|
||||
DATABASE_URL: z.string().min(1, 'DATABASE_URL is required'),
|
||||
COOKIE_SECRET: z.string().min(1, 'COOKIE_SECRET is required'),
|
||||
GOOGLE_MAPS_SERVER_KEY: z.string().optional(),
|
||||
PG_BOSS_SCHEMA: z.string().default('pgboss'),
|
||||
SESSION_TTL_DAYS: z.coerce.number().int().positive().default(30),
|
||||
});
|
||||
|
||||
export type AppEnv = z.infer<typeof envSchema>;
|
||||
|
||||
let cachedEnv: AppEnv | null = null;
|
||||
|
||||
export function getEnv(): AppEnv {
|
||||
if (cachedEnv) {
|
||||
return cachedEnv;
|
||||
}
|
||||
|
||||
cachedEnv = envSchema.parse(process.env);
|
||||
return cachedEnv;
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
import { PgBoss } from 'pg-boss';
|
||||
import { getEnv } from '../config/env.js';
|
||||
|
||||
let boss: PgBoss | null = null;
|
||||
|
||||
export async function getBoss() {
|
||||
if (boss) {
|
||||
return boss;
|
||||
}
|
||||
|
||||
const env = getEnv();
|
||||
boss = new PgBoss({
|
||||
connectionString: env.DATABASE_URL,
|
||||
schema: env.PG_BOSS_SCHEMA,
|
||||
});
|
||||
|
||||
await boss.start();
|
||||
return boss;
|
||||
}
|
||||
|
||||
export async function stopBoss() {
|
||||
if (!boss) {
|
||||
return;
|
||||
}
|
||||
|
||||
await boss.stop();
|
||||
boss = null;
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
import { Pool } from 'pg';
|
||||
import { getEnv } from '../config/env.js';
|
||||
|
||||
let pool: Pool | null = null;
|
||||
|
||||
export function getDbPool() {
|
||||
if (pool) {
|
||||
return pool;
|
||||
}
|
||||
|
||||
const env = getEnv();
|
||||
pool = new Pool({
|
||||
connectionString: env.DATABASE_URL,
|
||||
});
|
||||
|
||||
return pool;
|
||||
}
|
||||
@@ -0,0 +1,16 @@
|
||||
import 'dotenv/config';
|
||||
import { buildApp } from './app.js';
|
||||
import { getEnv } from './config/env.js';
|
||||
|
||||
const env = getEnv();
|
||||
const app = await buildApp();
|
||||
|
||||
try {
|
||||
await app.listen({
|
||||
host: env.APP_HOST,
|
||||
port: env.APP_PORT,
|
||||
});
|
||||
} catch (error) {
|
||||
app.log.error(error);
|
||||
process.exit(1);
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
export const RUN_SEARCH_JOB = 'search.run';
|
||||
export const RUN_DEEP_RESEARCH_BATCH_JOB = 'deep-research.run';
|
||||
@@ -0,0 +1,12 @@
|
||||
import type { PgBoss } from 'pg-boss';
|
||||
import { RUN_DEEP_RESEARCH_BATCH_JOB, RUN_SEARCH_JOB } from './names.js';
|
||||
|
||||
export async function registerJobs(boss: PgBoss) {
|
||||
await boss.work(RUN_SEARCH_JOB, async ([job]) => {
|
||||
console.warn(`Job ${RUN_SEARCH_JOB} is queued but not implemented yet`, job?.id);
|
||||
});
|
||||
|
||||
await boss.work(RUN_DEEP_RESEARCH_BATCH_JOB, async ([job]) => {
|
||||
console.warn(`Job ${RUN_DEEP_RESEARCH_BATCH_JOB} is queued but not implemented yet`, job?.id);
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,112 @@
|
||||
import type { FastifyPluginAsync, FastifyRequest } from 'fastify';
|
||||
import { ZodError, z } from 'zod';
|
||||
import { hashPassword, verifyPassword } from '../auth/passwords.js';
|
||||
import { clearSessionCookie, createSession, deleteSessionByToken, getSessionTokenFromRequest, getSessionUserByToken, setSessionCookie, } from '../auth/sessions.js';
|
||||
import { createUser, getUserByEmail, toAppUser } from '../auth/users.js';
|
||||
import { getDbPool } from '../db/pool.js';
|
||||
|
||||
const signUpSchema = z.object({
|
||||
email: z.string().email(),
|
||||
password: z.string().min(6),
|
||||
displayName: z.string().trim().min(1).max(120).optional(),
|
||||
});
|
||||
|
||||
const loginSchema = z.object({
|
||||
email: z.string().email(),
|
||||
password: z.string().min(1),
|
||||
});
|
||||
|
||||
function getRequestMetadata(request: FastifyRequest) {
|
||||
return {
|
||||
userAgent: request.headers['user-agent'],
|
||||
ipAddress: request.ip || null,
|
||||
};
|
||||
}
|
||||
|
||||
export const authRoutes: FastifyPluginAsync = async (app) => {
|
||||
app.get('/auth/me', async (request) => {
|
||||
const token = getSessionTokenFromRequest(request);
|
||||
|
||||
if (!token) {
|
||||
return { user: null };
|
||||
}
|
||||
|
||||
const user = await getSessionUserByToken(getDbPool(), token);
|
||||
return { user };
|
||||
});
|
||||
|
||||
app.post('/auth/signup', async (request, reply) => {
|
||||
try {
|
||||
const payload = signUpSchema.parse(request.body);
|
||||
const db = getDbPool();
|
||||
|
||||
const existingUser = await getUserByEmail(db, payload.email);
|
||||
if (existingUser) {
|
||||
return reply.code(409).send({ error: 'An account with that email already exists.' });
|
||||
}
|
||||
|
||||
const passwordHash = await hashPassword(payload.password);
|
||||
const user = await createUser(db, {
|
||||
email: payload.email,
|
||||
passwordHash,
|
||||
displayName: payload.displayName,
|
||||
});
|
||||
|
||||
const session = await createSession(db, user.id, getRequestMetadata(request));
|
||||
setSessionCookie(reply, session.token, session.expiresAt);
|
||||
|
||||
return reply.code(201).send({ user: { ...toAppUser(user), sessionId: session.sessionId } });
|
||||
} catch (error) {
|
||||
if (error instanceof ZodError) {
|
||||
return reply.code(400).send({ error: error.issues[0]?.message || 'Invalid signup payload.' });
|
||||
}
|
||||
|
||||
if ((error as { code?: string }).code === '23505') {
|
||||
return reply.code(409).send({ error: 'An account with that email already exists.' });
|
||||
}
|
||||
|
||||
request.log.error(error);
|
||||
return reply.code(500).send({ error: 'Failed to create account.' });
|
||||
}
|
||||
});
|
||||
|
||||
app.post('/auth/login', async (request, reply) => {
|
||||
try {
|
||||
const payload = loginSchema.parse(request.body);
|
||||
const db = getDbPool();
|
||||
const user = await getUserByEmail(db, payload.email);
|
||||
|
||||
if (!user) {
|
||||
return reply.code(401).send({ error: 'Invalid email or password.' });
|
||||
}
|
||||
|
||||
const isValidPassword = await verifyPassword(user.passwordHash, payload.password);
|
||||
if (!isValidPassword) {
|
||||
return reply.code(401).send({ error: 'Invalid email or password.' });
|
||||
}
|
||||
|
||||
const session = await createSession(db, user.id, getRequestMetadata(request));
|
||||
setSessionCookie(reply, session.token, session.expiresAt);
|
||||
|
||||
return { user: { ...toAppUser(user), sessionId: session.sessionId } };
|
||||
} catch (error) {
|
||||
if (error instanceof ZodError) {
|
||||
return reply.code(400).send({ error: error.issues[0]?.message || 'Invalid login payload.' });
|
||||
}
|
||||
|
||||
request.log.error(error);
|
||||
return reply.code(500).send({ error: 'Failed to sign in.' });
|
||||
}
|
||||
});
|
||||
|
||||
app.post('/auth/logout', async (request, reply) => {
|
||||
const token = getSessionTokenFromRequest(request);
|
||||
|
||||
if (token) {
|
||||
await deleteSessionByToken(getDbPool(), token);
|
||||
}
|
||||
|
||||
clearSessionCookie(reply);
|
||||
return { success: true };
|
||||
});
|
||||
};
|
||||
@@ -0,0 +1,9 @@
|
||||
import type { FastifyPluginAsync } from 'fastify';
|
||||
|
||||
export const healthRoutes: FastifyPluginAsync = async (app) => {
|
||||
app.get('/health', async () => ({
|
||||
ok: true,
|
||||
service: 'leads4less-api',
|
||||
timestamp: new Date().toISOString(),
|
||||
}));
|
||||
};
|
||||
@@ -0,0 +1,75 @@
|
||||
import type { FastifyPluginAsync } from 'fastify';
|
||||
import { z } from 'zod';
|
||||
import { requireAuth } from '../auth/middleware.js';
|
||||
import { getDbPool } from '../db/pool.js';
|
||||
import { listBusinessesForJobIds, listBusinessesForUser, listSearchJobResultLinksForUser, listSearchJobsForUser, getSearchJobForUser } from '../search/repository.js';
|
||||
import { runSearchForUser } from '../search/run-search.js';
|
||||
|
||||
const runSearchSchema = z.object({
|
||||
name: z.string().trim().min(1).max(160).optional(),
|
||||
location: z.string().trim().min(1),
|
||||
radiusKm: z.coerce.number().positive().max(50),
|
||||
businessType: z.string().trim().min(1),
|
||||
keywords: z.string().trim().optional(),
|
||||
});
|
||||
|
||||
const jobParamsSchema = z.object({
|
||||
jobId: z.string().uuid(),
|
||||
});
|
||||
|
||||
export const searchJobRoutes: FastifyPluginAsync = async (app) => {
|
||||
app.get('/search-jobs', { preHandler: requireAuth }, async (request) => {
|
||||
const limitValue = typeof request.query === 'object' && request.query && 'limit' in request.query ? Number((request.query as { limit?: string }).limit) : 100;
|
||||
const jobs = await listSearchJobsForUser(getDbPool(), request.authUser!.id, Number.isFinite(limitValue) ? Math.min(Math.max(limitValue, 1), 200) : 100);
|
||||
return { jobs };
|
||||
});
|
||||
|
||||
app.post('/search-jobs', { preHandler: requireAuth }, async (request, reply) => {
|
||||
try {
|
||||
const payload = runSearchSchema.parse(request.body);
|
||||
const result = await runSearchForUser(getDbPool(), request.authUser!.id, payload);
|
||||
return reply.code(201).send(result);
|
||||
} catch (error) {
|
||||
if (error instanceof z.ZodError) {
|
||||
return reply.code(400).send({ error: error.issues[0]?.message || 'Invalid research payload.' });
|
||||
}
|
||||
|
||||
request.log.error(error);
|
||||
return reply.code(500).send({ error: error instanceof Error ? error.message : 'Failed to run research.' });
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/search-jobs/:jobId', { preHandler: requireAuth }, async (request, reply) => {
|
||||
const { jobId } = jobParamsSchema.parse(request.params);
|
||||
const job = await getSearchJobForUser(getDbPool(), request.authUser!.id, jobId);
|
||||
|
||||
if (!job) {
|
||||
return reply.code(404).send({ error: 'Research job not found.' });
|
||||
}
|
||||
|
||||
return { job };
|
||||
});
|
||||
|
||||
app.get('/search-jobs/:jobId/businesses', { preHandler: requireAuth }, async (request, reply) => {
|
||||
const { jobId } = jobParamsSchema.parse(request.params);
|
||||
const businesses = await listBusinessesForJobIds(getDbPool(), request.authUser!.id, [jobId]);
|
||||
return { businesses };
|
||||
});
|
||||
|
||||
app.get('/search-job-results/links', { preHandler: requireAuth }, async (request) => {
|
||||
const links = await listSearchJobResultLinksForUser(getDbPool(), request.authUser!.id);
|
||||
return { links };
|
||||
});
|
||||
|
||||
app.get('/businesses', { preHandler: requireAuth }, async (request) => {
|
||||
const query = (request.query ?? {}) as { jobIds?: string | string[]; jobId?: string };
|
||||
const rawJobIds = Array.isArray(query.jobIds) ? query.jobIds : typeof query.jobIds === 'string' ? query.jobIds.split(',') : [];
|
||||
const jobIds = [...rawJobIds, ...(query.jobId ? [query.jobId] : [])].filter(Boolean);
|
||||
|
||||
const businesses = jobIds.length > 0
|
||||
? await listBusinessesForJobIds(getDbPool(), request.authUser!.id, jobIds)
|
||||
: await listBusinessesForUser(getDbPool(), request.authUser!.id);
|
||||
|
||||
return { businesses };
|
||||
});
|
||||
};
|
||||
@@ -0,0 +1,192 @@
|
||||
type AddressComponent = {
|
||||
longText?: string;
|
||||
shortText?: string;
|
||||
types?: string[];
|
||||
};
|
||||
|
||||
type Place = {
|
||||
id?: string;
|
||||
displayName?: { text?: string };
|
||||
formattedAddress?: string;
|
||||
location?: { latitude?: number; longitude?: number };
|
||||
rating?: number;
|
||||
userRatingCount?: number;
|
||||
websiteUri?: string;
|
||||
nationalPhoneNumber?: string;
|
||||
types?: string[];
|
||||
addressComponents?: AddressComponent[];
|
||||
};
|
||||
|
||||
type SearchPlacesResponse = {
|
||||
places: Place[];
|
||||
nextPageToken?: string;
|
||||
};
|
||||
|
||||
const PLACES_PAGE_SIZE = 20;
|
||||
const MAX_PLACES_PER_RUN = 60;
|
||||
const MAX_PLACE_PAGES = Math.ceil(MAX_PLACES_PER_RUN / PLACES_PAGE_SIZE);
|
||||
|
||||
function getAddressComponent(components: AddressComponent[] | undefined, type: string, useShort = false) {
|
||||
if (!components) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const match = components.find((component) => component.types?.includes(type));
|
||||
if (!match) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return useShort ? match.shortText || match.longText || null : match.longText || match.shortText || null;
|
||||
}
|
||||
|
||||
export type BusinessUpsertRecord = {
|
||||
externalSourceId: string | null;
|
||||
source: string;
|
||||
name: string;
|
||||
address: string | null;
|
||||
city: string | null;
|
||||
stateProvince: string | null;
|
||||
postalCode: string | null;
|
||||
country: string | null;
|
||||
phone: string | null;
|
||||
website: string | null;
|
||||
rating: number | null;
|
||||
reviewCount: number | null;
|
||||
category: string;
|
||||
latitude: number | null;
|
||||
longitude: number | null;
|
||||
metadataJson: Record<string, unknown>;
|
||||
firstSeenAt: string;
|
||||
lastSeenAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
export async function geocodeLocation(location: string, apiKey: string) {
|
||||
const url = new URL('https://maps.googleapis.com/maps/api/geocode/json');
|
||||
url.searchParams.set('address', location);
|
||||
url.searchParams.set('key', apiKey);
|
||||
|
||||
const response = await fetch(url);
|
||||
const payload = (await response.json()) as {
|
||||
status?: string;
|
||||
results?: Array<{ geometry?: { location?: { lat: number; lng: number } } }>;
|
||||
};
|
||||
|
||||
if (!response.ok || payload.status !== 'OK' || !payload.results?.[0]?.geometry?.location) {
|
||||
throw new Error('Unable to geocode the requested location');
|
||||
}
|
||||
|
||||
return payload.results[0].geometry.location as { lat: number; lng: number };
|
||||
}
|
||||
|
||||
async function searchPlaces(params: {
|
||||
apiKey: string;
|
||||
textQuery: string;
|
||||
lat: number;
|
||||
lng: number;
|
||||
radiusKm: number;
|
||||
pageToken?: string;
|
||||
}) {
|
||||
const response = await fetch('https://places.googleapis.com/v1/places:searchText', {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'X-Goog-Api-Key': params.apiKey,
|
||||
'X-Goog-FieldMask': [
|
||||
'places.id',
|
||||
'places.displayName',
|
||||
'places.formattedAddress',
|
||||
'places.location',
|
||||
'places.rating',
|
||||
'places.userRatingCount',
|
||||
'places.websiteUri',
|
||||
'places.nationalPhoneNumber',
|
||||
'places.types',
|
||||
'places.addressComponents',
|
||||
'nextPageToken',
|
||||
].join(','),
|
||||
},
|
||||
body: JSON.stringify({
|
||||
textQuery: params.textQuery,
|
||||
pageSize: PLACES_PAGE_SIZE,
|
||||
...(params.pageToken ? { pageToken: params.pageToken } : {}),
|
||||
locationBias: {
|
||||
circle: {
|
||||
center: {
|
||||
latitude: params.lat,
|
||||
longitude: params.lng,
|
||||
},
|
||||
radius: Math.min(params.radiusKm * 1000, 50000),
|
||||
},
|
||||
},
|
||||
}),
|
||||
});
|
||||
|
||||
const payload = (await response.json()) as {
|
||||
error?: { message?: string };
|
||||
places?: Place[];
|
||||
nextPageToken?: string;
|
||||
};
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(payload.error?.message || 'Places search failed');
|
||||
}
|
||||
|
||||
return {
|
||||
places: (payload.places || []) as Place[],
|
||||
nextPageToken: typeof payload.nextPageToken === 'string' ? payload.nextPageToken : undefined,
|
||||
} as SearchPlacesResponse;
|
||||
}
|
||||
|
||||
export async function collectPlaces(params: { apiKey: string; textQuery: string; lat: number; lng: number; radiusKm: number }) {
|
||||
const uniquePlaces = new Map<string, Place>();
|
||||
let nextPageToken: string | undefined;
|
||||
|
||||
for (let page = 0; page < MAX_PLACE_PAGES && uniquePlaces.size < MAX_PLACES_PER_RUN; page += 1) {
|
||||
const response = await searchPlaces({ ...params, pageToken: nextPageToken });
|
||||
|
||||
response.places.forEach((place) => {
|
||||
if (!place.id || uniquePlaces.has(place.id) || uniquePlaces.size >= MAX_PLACES_PER_RUN) {
|
||||
return;
|
||||
}
|
||||
|
||||
uniquePlaces.set(place.id, place);
|
||||
});
|
||||
|
||||
if (!response.nextPageToken || response.places.length === 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
nextPageToken = response.nextPageToken;
|
||||
}
|
||||
|
||||
return Array.from(uniquePlaces.values());
|
||||
}
|
||||
|
||||
export function buildBusinessPayload(place: Place, businessType: string): BusinessUpsertRecord {
|
||||
const now = new Date().toISOString();
|
||||
|
||||
return {
|
||||
externalSourceId: place.id ?? null,
|
||||
source: 'google_places',
|
||||
name: place.displayName?.text || 'Unknown business',
|
||||
address: place.formattedAddress ?? null,
|
||||
city: getAddressComponent(place.addressComponents, 'locality'),
|
||||
stateProvince: getAddressComponent(place.addressComponents, 'administrative_area_level_1', true),
|
||||
postalCode: getAddressComponent(place.addressComponents, 'postal_code'),
|
||||
country: getAddressComponent(place.addressComponents, 'country', true),
|
||||
phone: place.nationalPhoneNumber ?? null,
|
||||
website: place.websiteUri ?? null,
|
||||
rating: place.rating ?? null,
|
||||
reviewCount: place.userRatingCount ?? null,
|
||||
category: businessType,
|
||||
latitude: place.location?.latitude ?? null,
|
||||
longitude: place.location?.longitude ?? null,
|
||||
metadataJson: {
|
||||
google_types: place.types ?? [],
|
||||
},
|
||||
firstSeenAt: now,
|
||||
lastSeenAt: now,
|
||||
updatedAt: now,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,231 @@
|
||||
import type { Pool, PoolClient } from 'pg';
|
||||
import type { BusinessUpsertRecord } from './google-places.js';
|
||||
import { mapBusinessRow, mapSearchJobRow, type BusinessDto, type BusinessRow, type RunSearchInput, type SearchJobDto, type SearchJobResultLinkDto, type SearchJobRow, } from './types.js';
|
||||
|
||||
type DbClient = Pool | PoolClient;
|
||||
|
||||
export async function createSearchJob(db: DbClient, userId: string, payload: RunSearchInput) {
|
||||
const now = new Date().toISOString();
|
||||
const jobName = payload.name || `${payload.businessType} in ${payload.location}`;
|
||||
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
insert into public.search_jobs (
|
||||
user_id, name, city, radius_km, business_type, keywords, status, total_results, started_at, created_at, updated_at
|
||||
)
|
||||
values ($1, $2, $3, $4, $5, $6, 'running', 0, $7, $7, $7)
|
||||
returning id, user_id, name, city, address, postal_code, radius_km, business_type, keywords,
|
||||
status, total_results, started_at, completed_at, created_at, updated_at
|
||||
`,
|
||||
[userId, jobName, payload.location, payload.radiusKm, payload.businessType, payload.keywords ?? null, now],
|
||||
);
|
||||
|
||||
return mapSearchJobRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function updateSearchJobCenter(db: DbClient, jobId: string, lat: number, lng: number) {
|
||||
await db.query(
|
||||
`
|
||||
update public.search_jobs
|
||||
set search_center_geom = ST_SetSRID(ST_MakePoint($2, $1), 4326)::geography,
|
||||
updated_at = now()
|
||||
where id = $3
|
||||
`,
|
||||
[lat, lng, jobId],
|
||||
);
|
||||
}
|
||||
|
||||
export async function completeSearchJob(db: DbClient, jobId: string, totalResults: number) {
|
||||
const completedAt = new Date().toISOString();
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
update public.search_jobs
|
||||
set total_results = $2,
|
||||
status = 'completed',
|
||||
completed_at = $3,
|
||||
updated_at = $3
|
||||
where id = $1
|
||||
returning id, user_id, name, city, address, postal_code, radius_km, business_type, keywords,
|
||||
status, total_results, started_at, completed_at, created_at, updated_at
|
||||
`,
|
||||
[jobId, totalResults, completedAt],
|
||||
);
|
||||
|
||||
return mapSearchJobRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function failSearchJob(db: DbClient, jobId: string) {
|
||||
await db.query(
|
||||
`
|
||||
update public.search_jobs
|
||||
set status = 'failed', updated_at = now()
|
||||
where id = $1
|
||||
`,
|
||||
[jobId],
|
||||
);
|
||||
}
|
||||
|
||||
export async function upsertBusiness(db: DbClient, userId: string, business: BusinessUpsertRecord) {
|
||||
const result = await db.query<{ id: string }>(
|
||||
`
|
||||
insert into public.businesses (
|
||||
user_id, external_source_id, source, name, address, city, state_province, postal_code, country,
|
||||
phone, website, rating, review_count, category, latitude, longitude, geom, metadata_json,
|
||||
first_seen_at, last_seen_at, updated_at
|
||||
)
|
||||
values (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8, $9,
|
||||
$10, $11, $12, $13, $14, $15, $16,
|
||||
case when $15 is not null and $16 is not null then ST_SetSRID(ST_MakePoint($16, $15), 4326)::geography else null end,
|
||||
$17::jsonb, $18, $19, $20
|
||||
)
|
||||
on conflict (user_id, source, external_source_id)
|
||||
do update set
|
||||
name = excluded.name,
|
||||
address = excluded.address,
|
||||
city = excluded.city,
|
||||
state_province = excluded.state_province,
|
||||
postal_code = excluded.postal_code,
|
||||
country = excluded.country,
|
||||
phone = excluded.phone,
|
||||
website = excluded.website,
|
||||
rating = excluded.rating,
|
||||
review_count = excluded.review_count,
|
||||
category = excluded.category,
|
||||
latitude = excluded.latitude,
|
||||
longitude = excluded.longitude,
|
||||
geom = excluded.geom,
|
||||
metadata_json = excluded.metadata_json,
|
||||
last_seen_at = excluded.last_seen_at,
|
||||
updated_at = excluded.updated_at
|
||||
returning id
|
||||
`,
|
||||
[
|
||||
userId,
|
||||
business.externalSourceId,
|
||||
business.source,
|
||||
business.name,
|
||||
business.address,
|
||||
business.city,
|
||||
business.stateProvince,
|
||||
business.postalCode,
|
||||
business.country,
|
||||
business.phone,
|
||||
business.website,
|
||||
business.rating,
|
||||
business.reviewCount,
|
||||
business.category,
|
||||
business.latitude,
|
||||
business.longitude,
|
||||
JSON.stringify(business.metadataJson),
|
||||
business.firstSeenAt,
|
||||
business.lastSeenAt,
|
||||
business.updatedAt,
|
||||
],
|
||||
);
|
||||
|
||||
return result.rows[0].id;
|
||||
}
|
||||
|
||||
export async function upsertSearchJobResult(
|
||||
db: DbClient,
|
||||
input: { userId: string; searchJobId: string; businessId: string; matchedKeywords: string[] | null; rank: number; capturedAt: string },
|
||||
) {
|
||||
await db.query(
|
||||
`
|
||||
insert into public.search_job_results (user_id, search_job_id, business_id, matched_keywords, rank, captured_at)
|
||||
values ($1, $2, $3, $4, $5, $6)
|
||||
on conflict (search_job_id, business_id)
|
||||
do update set
|
||||
matched_keywords = excluded.matched_keywords,
|
||||
rank = excluded.rank,
|
||||
captured_at = excluded.captured_at
|
||||
`,
|
||||
[input.userId, input.searchJobId, input.businessId, input.matchedKeywords, input.rank, input.capturedAt],
|
||||
);
|
||||
}
|
||||
|
||||
export async function listSearchJobsForUser(db: DbClient, userId: string, limit = 100) {
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
select id, user_id, name, city, address, postal_code, radius_km, business_type, keywords,
|
||||
status, total_results, started_at, completed_at, created_at, updated_at
|
||||
from public.search_jobs
|
||||
where user_id = $1
|
||||
order by created_at desc
|
||||
limit $2
|
||||
`,
|
||||
[userId, limit],
|
||||
);
|
||||
|
||||
return result.rows.map(mapSearchJobRow);
|
||||
}
|
||||
|
||||
export async function getSearchJobForUser(db: DbClient, userId: string, jobId: string) {
|
||||
const result = await db.query<SearchJobRow>(
|
||||
`
|
||||
select id, user_id, name, city, address, postal_code, radius_km, business_type, keywords,
|
||||
status, total_results, started_at, completed_at, created_at, updated_at
|
||||
from public.search_jobs
|
||||
where user_id = $1 and id = $2
|
||||
limit 1
|
||||
`,
|
||||
[userId, jobId],
|
||||
);
|
||||
|
||||
if (result.rowCount === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return mapSearchJobRow(result.rows[0]);
|
||||
}
|
||||
|
||||
export async function listBusinessesForUser(db: DbClient, userId: string) {
|
||||
const result = await db.query<BusinessRow>(
|
||||
`
|
||||
select id, user_id, external_source_id, source, name, address, city, state_province, postal_code,
|
||||
country, phone, website, rating, review_count, category, hours_json, latitude, longitude,
|
||||
general_info, metadata_json, first_seen_at, last_seen_at, created_at, updated_at
|
||||
from public.businesses
|
||||
where user_id = $1
|
||||
order by created_at desc
|
||||
`,
|
||||
[userId],
|
||||
);
|
||||
|
||||
return result.rows.map(mapBusinessRow);
|
||||
}
|
||||
|
||||
export async function listSearchJobResultLinksForUser(db: DbClient, userId: string): Promise<SearchJobResultLinkDto[]> {
|
||||
const result = await db.query<{ business_id: string; search_job_id: string }>(
|
||||
`
|
||||
select business_id, search_job_id
|
||||
from public.search_job_results
|
||||
where user_id = $1
|
||||
`,
|
||||
[userId],
|
||||
);
|
||||
|
||||
return result.rows.map((row) => ({ businessId: row.business_id, searchJobId: row.search_job_id }));
|
||||
}
|
||||
|
||||
export async function listBusinessesForJobIds(db: DbClient, userId: string, jobIds: string[]): Promise<BusinessDto[]> {
|
||||
if (jobIds.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const result = await db.query<BusinessRow>(
|
||||
`
|
||||
select distinct b.id, b.user_id, b.external_source_id, b.source, b.name, b.address, b.city, b.state_province, b.postal_code,
|
||||
b.country, b.phone, b.website, b.rating, b.review_count, b.category, b.hours_json, b.latitude, b.longitude,
|
||||
b.general_info, b.metadata_json, b.first_seen_at, b.last_seen_at, b.created_at, b.updated_at
|
||||
from public.businesses b
|
||||
join public.search_job_results r on r.business_id = b.id
|
||||
where b.user_id = $1 and r.search_job_id = any($2::uuid[])
|
||||
order by b.created_at desc
|
||||
`,
|
||||
[userId, jobIds],
|
||||
);
|
||||
|
||||
return result.rows.map(mapBusinessRow);
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
import type { Pool } from 'pg';
|
||||
import { getEnv } from '../config/env.js';
|
||||
import { buildBusinessPayload, collectPlaces, geocodeLocation } from './google-places.js';
|
||||
import { completeSearchJob, createSearchJob, failSearchJob, updateSearchJobCenter, upsertBusiness, upsertSearchJobResult } from './repository.js';
|
||||
import type { RunSearchInput, RunSearchResult } from './types.js';
|
||||
|
||||
export async function runSearchForUser(db: Pool, userId: string, payload: RunSearchInput): Promise<RunSearchResult> {
|
||||
const env = getEnv();
|
||||
const job = await createSearchJob(db, userId, payload);
|
||||
const jobId = job.id;
|
||||
|
||||
try {
|
||||
if (!env.GOOGLE_MAPS_SERVER_KEY) {
|
||||
throw new Error('GOOGLE_MAPS_SERVER_KEY is required for running research.');
|
||||
}
|
||||
|
||||
const geocoded = await geocodeLocation(payload.location, env.GOOGLE_MAPS_SERVER_KEY);
|
||||
await updateSearchJobCenter(db, jobId, geocoded.lat, geocoded.lng);
|
||||
|
||||
const places = await collectPlaces({
|
||||
apiKey: env.GOOGLE_MAPS_SERVER_KEY,
|
||||
textQuery: [payload.businessType, payload.keywords].filter(Boolean).join(' '),
|
||||
lat: geocoded.lat,
|
||||
lng: geocoded.lng,
|
||||
radiusKm: payload.radiusKm,
|
||||
});
|
||||
|
||||
const matchedKeywords = payload.keywords
|
||||
? payload.keywords
|
||||
.split(',')
|
||||
.map((keyword) => keyword.trim())
|
||||
.filter(Boolean)
|
||||
: [];
|
||||
|
||||
const capturedAt = new Date().toISOString();
|
||||
let totalResults = 0;
|
||||
|
||||
for (const [index, place] of places.entries()) {
|
||||
if (!place.id || !place.displayName?.text) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const businessId = await upsertBusiness(db, userId, buildBusinessPayload(place, payload.businessType));
|
||||
await upsertSearchJobResult(db, {
|
||||
userId,
|
||||
searchJobId: jobId,
|
||||
businessId,
|
||||
matchedKeywords: matchedKeywords.length > 0 ? matchedKeywords : null,
|
||||
rank: index + 1,
|
||||
capturedAt,
|
||||
});
|
||||
|
||||
totalResults += 1;
|
||||
}
|
||||
|
||||
const completedJob = await completeSearchJob(db, jobId, totalResults);
|
||||
return {
|
||||
job: completedJob,
|
||||
totalResults,
|
||||
};
|
||||
} catch (error) {
|
||||
await failSearchJob(db, jobId);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,158 @@
|
||||
export type SearchJobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'stopped';
|
||||
|
||||
export type SearchJobDto = {
|
||||
id: string;
|
||||
userId: string;
|
||||
name: string;
|
||||
city?: string;
|
||||
address?: string;
|
||||
postalCode?: string;
|
||||
radiusKm: number;
|
||||
businessType: string;
|
||||
keywords?: string;
|
||||
status: SearchJobStatus;
|
||||
totalResults: number;
|
||||
startedAt?: string;
|
||||
completedAt?: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
export type BusinessDto = {
|
||||
id: string;
|
||||
userId: string;
|
||||
externalSourceId?: string;
|
||||
source: string;
|
||||
name: string;
|
||||
address?: string;
|
||||
city?: string;
|
||||
stateProvince?: string;
|
||||
postalCode?: string;
|
||||
country?: string;
|
||||
phone?: string;
|
||||
website?: string;
|
||||
rating?: number;
|
||||
reviewCount?: number;
|
||||
category?: string;
|
||||
hoursJson?: string;
|
||||
latitude?: number;
|
||||
longitude?: number;
|
||||
generalInfo?: string;
|
||||
metadataJson?: string;
|
||||
firstSeenAt?: string;
|
||||
lastSeenAt?: string;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
};
|
||||
|
||||
export type SearchJobResultLinkDto = {
|
||||
businessId: string;
|
||||
searchJobId: string;
|
||||
};
|
||||
|
||||
export type RunSearchInput = {
|
||||
name?: string;
|
||||
location: string;
|
||||
radiusKm: number;
|
||||
businessType: string;
|
||||
keywords?: string;
|
||||
};
|
||||
|
||||
export type RunSearchResult = {
|
||||
job: SearchJobDto;
|
||||
totalResults: number;
|
||||
};
|
||||
|
||||
export type SearchJobRow = {
|
||||
id: string;
|
||||
user_id: string;
|
||||
name: string;
|
||||
city: string | null;
|
||||
address: string | null;
|
||||
postal_code: string | null;
|
||||
radius_km: number;
|
||||
business_type: string;
|
||||
keywords: string | null;
|
||||
status: SearchJobStatus;
|
||||
total_results: number;
|
||||
started_at: string | null;
|
||||
completed_at: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export type BusinessRow = {
|
||||
id: string;
|
||||
user_id: string;
|
||||
external_source_id: string | null;
|
||||
source: string;
|
||||
name: string;
|
||||
address: string | null;
|
||||
city: string | null;
|
||||
state_province: string | null;
|
||||
postal_code: string | null;
|
||||
country: string | null;
|
||||
phone: string | null;
|
||||
website: string | null;
|
||||
rating: number | null;
|
||||
review_count: number | null;
|
||||
category: string | null;
|
||||
hours_json: Record<string, unknown> | null;
|
||||
latitude: number | null;
|
||||
longitude: number | null;
|
||||
general_info: string | null;
|
||||
metadata_json: Record<string, unknown> | null;
|
||||
first_seen_at: string | null;
|
||||
last_seen_at: string | null;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
};
|
||||
|
||||
export function mapSearchJobRow(row: SearchJobRow): SearchJobDto {
|
||||
return {
|
||||
id: row.id,
|
||||
userId: row.user_id,
|
||||
name: row.name,
|
||||
city: row.city ?? undefined,
|
||||
address: row.address ?? undefined,
|
||||
postalCode: row.postal_code ?? undefined,
|
||||
radiusKm: Number(row.radius_km),
|
||||
businessType: row.business_type,
|
||||
keywords: row.keywords ?? undefined,
|
||||
status: row.status,
|
||||
totalResults: row.total_results,
|
||||
startedAt: row.started_at ?? undefined,
|
||||
completedAt: row.completed_at ?? undefined,
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
};
|
||||
}
|
||||
|
||||
export function mapBusinessRow(row: BusinessRow): BusinessDto {
|
||||
return {
|
||||
id: row.id,
|
||||
userId: row.user_id,
|
||||
externalSourceId: row.external_source_id ?? undefined,
|
||||
source: row.source,
|
||||
name: row.name,
|
||||
address: row.address ?? undefined,
|
||||
city: row.city ?? undefined,
|
||||
stateProvince: row.state_province ?? undefined,
|
||||
postalCode: row.postal_code ?? undefined,
|
||||
country: row.country ?? undefined,
|
||||
phone: row.phone ?? undefined,
|
||||
website: row.website ?? undefined,
|
||||
rating: row.rating ?? undefined,
|
||||
reviewCount: row.review_count ?? undefined,
|
||||
category: row.category ?? undefined,
|
||||
hoursJson: row.hours_json ? JSON.stringify(row.hours_json) : undefined,
|
||||
latitude: row.latitude ?? undefined,
|
||||
longitude: row.longitude ?? undefined,
|
||||
generalInfo: row.general_info ?? undefined,
|
||||
metadataJson: row.metadata_json ? JSON.stringify(row.metadata_json) : undefined,
|
||||
firstSeenAt: row.first_seen_at ?? undefined,
|
||||
lastSeenAt: row.last_seen_at ?? undefined,
|
||||
createdAt: row.created_at,
|
||||
updatedAt: row.updated_at,
|
||||
};
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
import 'dotenv/config';
|
||||
import { getBoss, stopBoss } from './db/boss.js';
|
||||
import { getEnv } from './config/env.js';
|
||||
import { registerJobs } from './jobs/register-jobs.js';
|
||||
|
||||
const env = getEnv();
|
||||
const boss = await getBoss();
|
||||
|
||||
await registerJobs(boss);
|
||||
|
||||
console.log(`Leads4less worker started with pg-boss schema '${env.PG_BOSS_SCHEMA}'`);
|
||||
|
||||
const shutdown = async () => {
|
||||
await stopBoss();
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
process.on('SIGINT', () => void shutdown());
|
||||
process.on('SIGTERM', () => void shutdown());
|
||||
Reference in New Issue
Block a user