diff --git a/ingestion/internal/watcher/watcher.go b/ingestion/internal/watcher/watcher.go index a65db79..1e2b73c 100644 --- a/ingestion/internal/watcher/watcher.go +++ b/ingestion/internal/watcher/watcher.go @@ -4,6 +4,7 @@ package watcher import ( "context" "fmt" + "io" "log/slog" "os" "path/filepath" @@ -72,6 +73,14 @@ func processDir(ctx context.Context, cfg Config, date string) []error { return nil } + // Skip files that have already been processed or permanently failed. + if _, err := os.Stat(path + ".processed"); err == nil { + return nil + } + if _, err := os.Stat(path + ".failed"); err == nil { + return nil + } + if err := processFile(ctx, cfg, path, date); err != nil { errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err)) } @@ -83,7 +92,9 @@ func processDir(ctx context.Context, cfg Config, date string) []error { return errs } -// processFile reads a file, calls pipeline.Run, moves it to processed/ or failed/. +// processFile reads a file, calls pipeline.Run, copies it to processed/ or failed/, +// and writes a marker file next to the original so the watcher skips it next poll. +// The original file is never deleted, keeping Syncthing-connected vaults (e.g. Obsidian) intact. func processFile(ctx context.Context, cfg Config, path, date string) error { filename := filepath.Base(path) source := deriveSource(filename) @@ -95,46 +106,71 @@ func processFile(ctx context.Context, cfg Config, path, date string) error { _, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false) if runErr != nil { - // Move to failed/. + // Copy to failed/ and leave a .failed marker so we don't retry. failedDir := filepath.Join(cfg.BrainDir, "raw", "failed") if mkErr := os.MkdirAll(failedDir, 0o755); mkErr != nil { return fmt.Errorf("mkdir failed dir: %w", mkErr) } dest := filepath.Join(failedDir, filename) - if mvErr := os.Rename(path, dest); mvErr != nil { - return fmt.Errorf("move to failed: %w", mvErr) + if cpErr := copyFile(path, dest); cpErr != nil { + return fmt.Errorf("copy to failed: %w", cpErr) + } + if mkErr := os.WriteFile(path+".failed", []byte(runErr.Error()), 0o644); mkErr != nil { + slog.Error("watcher: failed to write .failed marker", "error", mkErr) } - slog.Warn("watcher: file failed, moved to failed/", "file", filename, "error", runErr) + slog.Warn("watcher: file failed", "file", filename, "error", runErr) if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil { slog.Error("watcher: failed to write log entry", "error", logErr) } - // Return nil: the file was quarantined successfully; the error was already - // logged. Returning runErr would cause processDir to log it again at Error level. + // Return nil: quarantine succeeded; error already logged. return nil } - // Move to processed/YYYY-MM-DD/. + // Copy to processed/YYYY-MM-DD/ and leave a .processed marker so we don't re-ingest. processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date) if err := os.MkdirAll(processedDir, 0o755); err != nil { return fmt.Errorf("mkdir processed dir: %w", err) } dest := filepath.Join(processedDir, filename) if _, err := os.Stat(dest); err == nil { - // File already exists in processed; append timestamp to avoid overwriting the archive. + // Archive copy already exists; append timestamp to avoid overwriting. ext := filepath.Ext(filename) base := strings.TrimSuffix(filename, ext) dest = filepath.Join(processedDir, base+"-"+time.Now().UTC().Format("150405")+ext) } - if err := os.Rename(path, dest); err != nil { - return fmt.Errorf("move to processed: %w", err) + if err := copyFile(path, dest); err != nil { + return fmt.Errorf("copy to processed: %w", err) + } + if err := os.WriteFile(path+".processed", []byte(date), 0o644); err != nil { + slog.Error("watcher: failed to write .processed marker", "error", err) } slog.Info("watcher: file processed", "file", filename, "source", source) return nil } +// copyFile copies src to dst, creating dst if it doesn't exist. +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return fmt.Errorf("open src: %w", err) + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return fmt.Errorf("create dst: %w", err) + } + defer out.Close() + + if _, err := io.Copy(out, in); err != nil { + return fmt.Errorf("copy: %w", err) + } + return out.Close() +} + // deriveSource turns a filename into a human-readable source name. // "shape-up-book.md" → "Shape Up Book" func deriveSource(filename string) string { diff --git a/ingestion/internal/watcher/watcher_test.go b/ingestion/internal/watcher/watcher_test.go index 4b20d39..8fd13a6 100644 --- a/ingestion/internal/watcher/watcher_test.go +++ b/ingestion/internal/watcher/watcher_test.go @@ -81,11 +81,15 @@ func TestStart_ProcessesFile(t *testing.T) { } time.Sleep(20 * time.Millisecond) } - require.True(t, found, "file should be moved to processed/") + require.True(t, found, "file should be copied to processed/") - // Original file should be gone. + // Original file should still exist (copy, not move — keeps Obsidian vault intact). _, err := os.Stat(rawFile) - assert.True(t, os.IsNotExist(err), "original file should be gone from raw/") + assert.NoError(t, err, "original file should remain in raw/") + + // A .processed marker should exist next to the original. + _, err = os.Stat(rawFile + ".processed") + assert.NoError(t, err, ".processed marker should be written") // Wiki page should exist. wikiPath := filepath.Join(brainDir, "wiki", "sources", "shape-up-book.md") @@ -130,11 +134,15 @@ func TestStart_MovesToFailedOnError(t *testing.T) { } time.Sleep(20 * time.Millisecond) } - require.True(t, found, "file should be moved to failed/") + require.True(t, found, "file should be copied to failed/") - // Original file should be gone from raw/. + // Original file should still exist (copy, not move — keeps Obsidian vault intact). _, err := os.Stat(rawFile) - assert.True(t, os.IsNotExist(err), "original file should be gone from raw/") + assert.NoError(t, err, "original file should remain in raw/") + + // A .failed marker should exist next to the original. + _, err = os.Stat(rawFile + ".failed") + assert.NoError(t, err, ".failed marker should be written") // log.md should contain a watcher error entry. logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))