fix(watcher): copy files instead of moving them, leave originals for Obsidian
Files dropped into brain/raw/ are now copied to processed/ or failed/ rather than moved. A .processed or .failed marker is written next to the original so the watcher skips it on subsequent polls without deleting it. This keeps Syncthing-synced Obsidian vaults intact after ingestion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,7 @@ package watcher
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -72,6 +73,14 @@ func processDir(ctx context.Context, cfg Config, date string) []error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip files that have already been processed or permanently failed.
|
||||||
|
if _, err := os.Stat(path + ".processed"); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if _, err := os.Stat(path + ".failed"); err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err := processFile(ctx, cfg, path, date); err != nil {
|
if err := processFile(ctx, cfg, path, date); err != nil {
|
||||||
errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err))
|
errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err))
|
||||||
}
|
}
|
||||||
@@ -83,7 +92,9 @@ func processDir(ctx context.Context, cfg Config, date string) []error {
|
|||||||
return errs
|
return errs
|
||||||
}
|
}
|
||||||
|
|
||||||
// processFile reads a file, calls pipeline.Run, moves it to processed/ or failed/.
|
// processFile reads a file, calls pipeline.Run, copies it to processed/ or failed/,
|
||||||
|
// and writes a marker file next to the original so the watcher skips it next poll.
|
||||||
|
// The original file is never deleted, keeping Syncthing-connected vaults (e.g. Obsidian) intact.
|
||||||
func processFile(ctx context.Context, cfg Config, path, date string) error {
|
func processFile(ctx context.Context, cfg Config, path, date string) error {
|
||||||
filename := filepath.Base(path)
|
filename := filepath.Base(path)
|
||||||
source := deriveSource(filename)
|
source := deriveSource(filename)
|
||||||
@@ -95,46 +106,71 @@ func processFile(ctx context.Context, cfg Config, path, date string) error {
|
|||||||
|
|
||||||
_, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false)
|
_, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false)
|
||||||
if runErr != nil {
|
if runErr != nil {
|
||||||
// Move to failed/.
|
// Copy to failed/ and leave a .failed marker so we don't retry.
|
||||||
failedDir := filepath.Join(cfg.BrainDir, "raw", "failed")
|
failedDir := filepath.Join(cfg.BrainDir, "raw", "failed")
|
||||||
if mkErr := os.MkdirAll(failedDir, 0o755); mkErr != nil {
|
if mkErr := os.MkdirAll(failedDir, 0o755); mkErr != nil {
|
||||||
return fmt.Errorf("mkdir failed dir: %w", mkErr)
|
return fmt.Errorf("mkdir failed dir: %w", mkErr)
|
||||||
}
|
}
|
||||||
dest := filepath.Join(failedDir, filename)
|
dest := filepath.Join(failedDir, filename)
|
||||||
if mvErr := os.Rename(path, dest); mvErr != nil {
|
if cpErr := copyFile(path, dest); cpErr != nil {
|
||||||
return fmt.Errorf("move to failed: %w", mvErr)
|
return fmt.Errorf("copy to failed: %w", cpErr)
|
||||||
|
}
|
||||||
|
if mkErr := os.WriteFile(path+".failed", []byte(runErr.Error()), 0o644); mkErr != nil {
|
||||||
|
slog.Error("watcher: failed to write .failed marker", "error", mkErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Warn("watcher: file failed, moved to failed/", "file", filename, "error", runErr)
|
slog.Warn("watcher: file failed", "file", filename, "error", runErr)
|
||||||
|
|
||||||
if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
|
if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
|
||||||
slog.Error("watcher: failed to write log entry", "error", logErr)
|
slog.Error("watcher: failed to write log entry", "error", logErr)
|
||||||
}
|
}
|
||||||
// Return nil: the file was quarantined successfully; the error was already
|
// Return nil: quarantine succeeded; error already logged.
|
||||||
// logged. Returning runErr would cause processDir to log it again at Error level.
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Move to processed/YYYY-MM-DD/.
|
// Copy to processed/YYYY-MM-DD/ and leave a .processed marker so we don't re-ingest.
|
||||||
processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date)
|
processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date)
|
||||||
if err := os.MkdirAll(processedDir, 0o755); err != nil {
|
if err := os.MkdirAll(processedDir, 0o755); err != nil {
|
||||||
return fmt.Errorf("mkdir processed dir: %w", err)
|
return fmt.Errorf("mkdir processed dir: %w", err)
|
||||||
}
|
}
|
||||||
dest := filepath.Join(processedDir, filename)
|
dest := filepath.Join(processedDir, filename)
|
||||||
if _, err := os.Stat(dest); err == nil {
|
if _, err := os.Stat(dest); err == nil {
|
||||||
// File already exists in processed; append timestamp to avoid overwriting the archive.
|
// Archive copy already exists; append timestamp to avoid overwriting.
|
||||||
ext := filepath.Ext(filename)
|
ext := filepath.Ext(filename)
|
||||||
base := strings.TrimSuffix(filename, ext)
|
base := strings.TrimSuffix(filename, ext)
|
||||||
dest = filepath.Join(processedDir, base+"-"+time.Now().UTC().Format("150405")+ext)
|
dest = filepath.Join(processedDir, base+"-"+time.Now().UTC().Format("150405")+ext)
|
||||||
}
|
}
|
||||||
if err := os.Rename(path, dest); err != nil {
|
if err := copyFile(path, dest); err != nil {
|
||||||
return fmt.Errorf("move to processed: %w", err)
|
return fmt.Errorf("copy to processed: %w", err)
|
||||||
|
}
|
||||||
|
if err := os.WriteFile(path+".processed", []byte(date), 0o644); err != nil {
|
||||||
|
slog.Error("watcher: failed to write .processed marker", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.Info("watcher: file processed", "file", filename, "source", source)
|
slog.Info("watcher: file processed", "file", filename, "source", source)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// copyFile copies the contents of src into dst. dst is created if it does not
// exist and truncated if it does (os.Create semantics). src is only read and
// is never modified or removed.
//
// If streaming the data or closing dst fails, the incomplete dst is removed on
// a best-effort basis so a truncated copy is not left behind. Every returned
// error wraps the underlying cause and names the step that failed.
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return fmt.Errorf("open src: %w", err)
	}
	// src is opened read-only, so a Close error carries no information.
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return fmt.Errorf("create dst: %w", err)
	}

	if _, err := io.Copy(out, in); err != nil {
		// Best-effort cleanup: the copy error is the one worth reporting.
		_ = out.Close()
		_ = os.Remove(dst)
		return fmt.Errorf("copy: %w", err)
	}

	// Close exactly once and check it: buffered write errors can surface here.
	if err := out.Close(); err != nil {
		_ = os.Remove(dst) // best effort; dst may be incomplete
		return fmt.Errorf("close dst: %w", err)
	}
	return nil
}
|
||||||
|
|
||||||
// deriveSource turns a filename into a human-readable source name.
|
// deriveSource turns a filename into a human-readable source name.
|
||||||
// "shape-up-book.md" → "Shape Up Book"
|
// "shape-up-book.md" → "Shape Up Book"
|
||||||
func deriveSource(filename string) string {
|
func deriveSource(filename string) string {
|
||||||
|
|||||||
@@ -81,11 +81,15 @@ func TestStart_ProcessesFile(t *testing.T) {
|
|||||||
}
|
}
|
||||||
time.Sleep(20 * time.Millisecond)
|
time.Sleep(20 * time.Millisecond)
|
||||||
}
|
}
|
||||||
require.True(t, found, "file should be moved to processed/")
|
require.True(t, found, "file should be copied to processed/")
|
||||||
|
|
||||||
// Original file should be gone.
|
// Original file should still exist (copy, not move — keeps Obsidian vault intact).
|
||||||
_, err := os.Stat(rawFile)
|
_, err := os.Stat(rawFile)
|
||||||
assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
|
assert.NoError(t, err, "original file should remain in raw/")
|
||||||
|
|
||||||
|
// A .processed marker should exist next to the original.
|
||||||
|
_, err = os.Stat(rawFile + ".processed")
|
||||||
|
assert.NoError(t, err, ".processed marker should be written")
|
||||||
|
|
||||||
// Wiki page should exist.
|
// Wiki page should exist.
|
||||||
wikiPath := filepath.Join(brainDir, "wiki", "sources", "shape-up-book.md")
|
wikiPath := filepath.Join(brainDir, "wiki", "sources", "shape-up-book.md")
|
||||||
@@ -130,11 +134,15 @@ func TestStart_MovesToFailedOnError(t *testing.T) {
|
|||||||
}
|
}
|
||||||
time.Sleep(20 * time.Millisecond)
|
time.Sleep(20 * time.Millisecond)
|
||||||
}
|
}
|
||||||
require.True(t, found, "file should be moved to failed/")
|
require.True(t, found, "file should be copied to failed/")
|
||||||
|
|
||||||
// Original file should be gone from raw/.
|
// Original file should still exist (copy, not move — keeps Obsidian vault intact).
|
||||||
_, err := os.Stat(rawFile)
|
_, err := os.Stat(rawFile)
|
||||||
assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
|
assert.NoError(t, err, "original file should remain in raw/")
|
||||||
|
|
||||||
|
// A .failed marker should exist next to the original.
|
||||||
|
_, err = os.Stat(rawFile + ".failed")
|
||||||
|
assert.NoError(t, err, ".failed marker should be written")
|
||||||
|
|
||||||
// log.md should contain a watcher error entry.
|
// log.md should contain a watcher error entry.
|
||||||
logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
|
logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
|
||||||
|
|||||||
Reference in New Issue
Block a user