use sqlx::SqlitePool; use tracing::info; use axum::body::Bytes; use crate::db::adjustment::add_adjustment_new_stock; use crate::db::inventory_item::add_inventory_item; #[derive(Debug, serde::Deserialize)] struct CatalogRecord { name: String, #[serde(alias = "qty")] quantity: f64, unit: String, fractional: bool, #[serde(alias = "reorder")] reorder_point: f64, #[serde(alias = "price")] unit_price: i64, #[serde(alias = "vetcove id")] vetcove_id: String, } pub async fn ingest_catalog_bytes(bytes: Bytes, db: SqlitePool, user_id: i64) -> anyhow::Result<()> { let reader = csv::Reader::from_reader(bytes.as_ref()); ingest_catalog(reader, db, user_id).await } pub async fn ingest_catalog(mut reader: csv::Reader, db: SqlitePool, user_id: i64) -> anyhow::Result<()> { let timestamp = chrono::Utc::now(); for result in reader.deserialize() { let record: CatalogRecord = result?; let new_entry_id = add_inventory_item(&db, &record.name, record.reorder_point, record.fractional, &record.unit, &None, ).await?; let new_positive_adjustment = add_adjustment_new_stock(&db, new_entry_id, user_id, timestamp, timestamp, record.quantity, record.unit_price).await?; info!("Added new item: {}/{} - {}", new_entry_id, new_positive_adjustment, record.name); } Ok(()) }