From 097c4b3d84867abf7292a82faf3fbec898830d36 Mon Sep 17 00:00:00 2001 From: Christian Schmied Date: Thu, 10 Oct 2024 16:12:29 +0200 Subject: [PATCH] implement in order of creation for oldFiles --- folderwatcher.go | 102 +++++++++++----- folderwatcher_globbing_test.go | 162 ++++++++++++++++++++++++++ folderwatcher_oldFilesInOrder_test.go | 87 ++++++++++++++ folderwatcher_test.go | 152 ------------------------ 4 files changed, 324 insertions(+), 179 deletions(-) create mode 100644 folderwatcher_globbing_test.go create mode 100644 folderwatcher_oldFilesInOrder_test.go diff --git a/folderwatcher.go b/folderwatcher.go index 596ff7e..e454da3 100644 --- a/folderwatcher.go +++ b/folderwatcher.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "regexp" + "slices" "sync" "time" @@ -22,8 +23,10 @@ type FolderWatcher struct { quit chan struct{} mutex sync.Mutex wg sync.WaitGroup + debounceWg sync.WaitGroup debounceMap map[string]*time.Timer debouncedChan chan string + existingList []string enqueuedFiles int } @@ -71,6 +74,7 @@ func NewFolderWatcher(conf Config, includeExisting bool, quit chan struct{}) (*F quit: quit, debounceMap: make(map[string]*time.Timer), debouncedChan: make(chan string), + existingList: []string{}, } if includeExisting { @@ -81,10 +85,18 @@ func NewFolderWatcher(conf Config, includeExisting bool, quit chan struct{}) (*F } for _, entry := range entries { if !entry.IsDir() { - folderWatcher.foundFileEvent(filepath.Join(path, entry.Name())) + fullFilePath := filepath.Join(path, entry.Name()) + if folderWatcher.matchesPattern(fullFilePath) { + folderWatcher.existingList = append(folderWatcher.existingList, fullFilePath) + } } } } + slices.SortFunc(folderWatcher.existingList, func(a, b string) int { + infoA, _ := os.Stat(a) + infoB, _ := os.Stat(b) + return infoA.ModTime().Compare(infoB.ModTime()) + }) } return folderWatcher, nil @@ -95,21 +107,14 @@ type HandlerCallback func(path string, err error) func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) { defer w.watcher.Close() - for stopped := false; !stopped; { - select { - case <-w.quit: - logger.Info("Stopping watcher") - stopped = true - case event, ok := <-w.watcher.Events: - if !ok { - stopped = true - break - } - logger.Debug("event:", event) - if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) { - w.foundFileEvent(event.Name) - } - case file, _ := <-w.debouncedChan: + initialWait := sync.WaitGroup{} + + handlerWg := sync.WaitGroup{} + handlerWg.Add(1) + // Start File Handling + go func() { + defer handlerWg.Done() + for file := range w.debouncedChan { triggerCallback, err := handler(file) if err != nil { logger.Warn("error during handle:", err) @@ -117,7 +122,42 @@ func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) { if triggerCallback { callback(file, err) } - w.wg.Done() + w.wg.Done() // mark enqueued file as done + } + }() + + if len(w.existingList) > 0 { + logger.Infof("got %d existing files", len(w.existingList)) + initialWait.Add(1) + go func() { + defer initialWait.Done() + for _, path := range w.existingList { + w.enqueue(path) + } + }() + } + + // Start Event Handling + for stopped := false; !stopped; { + select { + case <-w.quit: + logger.Info("Stopping watcher") + w.debounceWg.Wait() // wait for debounce to settle + w.wg.Wait() // wait for existing files to be handled + close(w.debouncedChan) //close the debounced chan to mark it as done + stopped = true + case event, ok := <-w.watcher.Events: + initialWait.Wait() // wait that all initial are enqueued, before handling of first event + if !ok { + stopped = true + break + } + logger.Debug("event:", event) + if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) { + if w.matchesPattern(event.Name) { + w.enqueueDebounce(event.Name) + } + } case err, ok := <-w.watcher.Errors: if !ok { stopped = true @@ -126,31 +166,39 @@ func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) { logger.Warn("error:", err) } } - w.wg.Wait() + handlerWg.Wait() // wait for filehandler to be closed, implies debouncedChan is closed logger.Infof("watcher handled %d files", w.enqueuedFiles) } -func (w *FolderWatcher) foundFileEvent(fileName string) { +func (w *FolderWatcher) matchesPattern(fileName string) bool { baseName := filepath.Base(fileName) - if w.regex == nil || w.regex.MatchString(baseName) { - w.enqueueDebounce(fileName) + return w.regex == nil || w.regex.MatchString(baseName) +} + +func (w *FolderWatcher) enqueue(f string) { + w.wg.Add(1) + w.mutex.Lock() + defer w.mutex.Unlock() + if timer, ok := w.debounceMap[f]; ok && timer != nil { + timer.Stop() } + delete(w.debounceMap, f) + w.enqueuedFiles = w.enqueuedFiles + 1 + w.debouncedChan <- f } func (w *FolderWatcher) enqueueDebounce(f string) { + logger.Infof("Debounced Enqueue of %s", f) w.mutex.Lock() defer w.mutex.Unlock() if timer, ok := w.debounceMap[f]; ok && timer != nil { timer.Stop() } else { - w.wg.Add(1) + w.debounceWg.Add(1) } w.debounceMap[f] = time.AfterFunc(DebounceTime, func() { - w.mutex.Lock() - defer w.mutex.Unlock() - delete(w.debounceMap, f) - w.enqueuedFiles = w.enqueuedFiles + 1 - w.debouncedChan <- f + defer w.debounceWg.Done() + w.enqueue(f) }) } diff --git a/folderwatcher_globbing_test.go b/folderwatcher_globbing_test.go new file mode 100644 index 0000000..a6feb00 --- /dev/null +++ b/folderwatcher_globbing_test.go @@ -0,0 +1,162 @@ +package folderwatcher + +import ( + "context" + "os" + "path/filepath" + "strconv" + "sync" + "testing" + "time" +) + +func TestGlobbingN1(t *testing.T) { + tmpPath, err := os.MkdirTemp("", ".folderwatcher_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpPath) + quitChan := make(chan struct{}) + + _ = os.MkdirAll(filepath.Join(tmpPath, "g0"), os.ModePerm) + _ = os.MkdirAll(filepath.Join(tmpPath, "g1"), os.ModePerm) + _ = os.MkdirAll(filepath.Join(tmpPath, "g2"), os.ModePerm) + + filesIn := []string{"A", "B", "C"} + var filesOut []string + + wg := sync.WaitGroup{} + + conf := Config{ + Folder: filepath.Join(tmpPath, "*"), + } + + watcher, err := NewFolderWatcher(conf, true, quitChan) + if err != nil { + t.Fatal(err) + } + + go watcher.Watch(func(filePath string) (bool, error) { + if !filepath.IsAbs(filePath) { + t.Errorf("file %s is not an absolute path", filePath) + } + rel, err := filepath.Rel(tmpPath, filePath) + if err != nil { + return true, err + } + filesOut = append(filesOut, rel) + wg.Done() + return true, nil + }, func(s string, err error) { + }) + + for i, f := range filesIn { + wg.Add(1) + _ = os.WriteFile(filepath.Join(tmpPath, "g"+strconv.Itoa(i), f), []byte{0}, os.ModePerm) + } + + finishedChan := make(chan struct{}) + go func() { + wg.Wait() + finishedChan <- struct{}{} + }() + + ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer ctxCancel() + + select { + case <-finishedChan: + case <-ctx.Done(): + t.Error(ctx.Err()) + } + quitChan <- struct{}{} + + if len(filesOut) != len(filesIn) { + t.Errorf("filesOut length %d != %d", len(filesOut), len(filesIn)) + } + for i, f := range filesIn { + if !Contains(filesOut, filepath.Join("g"+strconv.Itoa(i), f)) { + t.Errorf("File %s not found in %s", f, filesOut) + } + } +} + +func TestGlobbingN2(t *testing.T) { + tmpPath, err := os.MkdirTemp("", ".folderwatcher_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpPath) + quitChan := make(chan struct{}) + + _ = os.MkdirAll(filepath.Join(tmpPath, "g0", "s0"), os.ModePerm) + _ = os.MkdirAll(filepath.Join(tmpPath, "g0", "s2"), os.ModePerm) + _ = os.MkdirAll(filepath.Join(tmpPath, "g1", "s1"), os.ModePerm) + _ = os.MkdirAll(filepath.Join(tmpPath, "g1", "s3"), os.ModePerm) + _ = os.MkdirAll(filepath.Join(tmpPath, "gX"), os.ModePerm) + + filesIn := []string{"A", "B", "C", "D"} + var filesOut []string + + wg := sync.WaitGroup{} + + conf := Config{ + Folder: filepath.Join(tmpPath, "*", "*"), + } + + watcher, err := NewFolderWatcher(conf, true, quitChan) + if err != nil { + t.Fatal(err) + } + + go watcher.Watch(func(filePath string) (bool, error) { + if !filepath.IsAbs(filePath) { + t.Errorf("file %s is not an absolute path", filePath) + } + rel, err := filepath.Rel(tmpPath, filePath) + if err != nil { + return true, err + } + filesOut = append(filesOut, rel) + wg.Done() + return true, nil + }, func(s string, err error) { + }) + + for i, f := range filesIn { + wg.Add(1) + _ = os.WriteFile(filepath.Join(tmpPath, "g"+strconv.Itoa(i%2), "s"+strconv.Itoa(i), f), []byte{0}, os.ModePerm) + } + + wg.Add(1) + _ = os.WriteFile(filepath.Join(tmpPath, "gX", "foo"), []byte{0}, os.ModePerm) + + finishedChan := make(chan struct{}) + go func() { + wg.Wait() + finishedChan <- struct{}{} + }() + + ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer ctxCancel() + + select { + case <-finishedChan: + case <-ctx.Done(): + wg.Done() // ctx Will Timeout, because gX/foo will always be remaining + // t.Error(ctx.Err()) + } + quitChan <- struct{}{} + + if len(filesOut) != len(filesIn) { + t.Errorf("filesOut length %d != %d", len(filesOut), len(filesIn)) + } + for i, f := range filesIn { + if !Contains(filesOut, filepath.Join("g"+strconv.Itoa(i%2), "s"+strconv.Itoa(i), f)) { + t.Errorf("File %s not found in %s", f, filesOut) + } + } + if Contains(filesOut, filepath.Join("gX", "foo")) { + t.Errorf("File %s found in %s", filepath.Join("gX", "foo"), filesOut) + } +} diff --git a/folderwatcher_oldFilesInOrder_test.go b/folderwatcher_oldFilesInOrder_test.go new file mode 100644 index 0000000..da7bd1e --- /dev/null +++ b/folderwatcher_oldFilesInOrder_test.go @@ -0,0 +1,87 @@ +package folderwatcher + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +func TestOldFilesInCreationOrder(t *testing.T) { + tmpPath, err := os.MkdirTemp("", ".folderwatcher_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpPath) + quitChan := make(chan struct{}) + + filesIn := []string{"2", "C", "B", "A", "1"} + makeAbsFilePaths(t, tmpPath, &filesIn) + var filesOut []string + + wg := sync.WaitGroup{} + + conf := Config{ + Folder: tmpPath, + } + + for _, f := range filesIn { + wg.Add(1) + _ = os.WriteFile(f, []byte{0}, os.ModePerm) + time.Sleep(10 * time.Millisecond) + } + + watcher, err := NewFolderWatcher(conf, true, quitChan) + if err != nil { + t.Fatal(err) + } + + go watcher.Watch(func(filePath string) (bool, error) { + if !filepath.IsAbs(filePath) { + t.Errorf("file %s is not an absolute path", filePath) + } + filesOut = append(filesOut, filePath) + wg.Done() + return true, nil + }, func(s string, err error) { + }) + + finishedChan := make(chan struct{}) + go func() { + wg.Wait() + finishedChan <- struct{}{} + }() + + ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer ctxCancel() + + select { + case <-finishedChan: + case <-ctx.Done(): + t.Error(ctx.Err()) + } + quitChan <- struct{}{} + + if len(filesOut) != len(filesIn) { + t.Errorf("filesOut length %d != %d", len(filesOut), len(filesIn)) + } + + filesInS := fmt.Sprintf("%s", filesIn) + filesOutS := fmt.Sprintf("%s", filesOut) + if filesInS != filesOutS { + t.Errorf("filesIn != filesOut %s != %s", filesIn, filesOut) + } +} + +func makeAbsFilePaths(t *testing.T, rootPath string, filesIn *[]string) { + t.Helper() + for i, f := range *filesIn { //remap filesIn to Full Path + (*filesIn)[i] = filepath.Join(rootPath, f) + if !filepath.IsAbs((*filesIn)[i]) { + t.Fatalf("file %s is not an absolute path", (*filesIn)[i]) + } + } +} diff --git a/folderwatcher_test.go b/folderwatcher_test.go index ab842ed..6e3543a 100644 --- a/folderwatcher_test.go +++ b/folderwatcher_test.go @@ -4,7 +4,6 @@ import ( "context" "os" "path/filepath" - "strconv" "sync" "testing" "time" @@ -314,157 +313,6 @@ func TestOldAndNewFileHandlingWithoutExisting(t *testing.T) { } } -func TestGlobbingN1(t *testing.T) { - tmpPath, err := os.MkdirTemp("", ".folderwatcher_test") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmpPath) - quitChan := make(chan struct{}) - - _ = os.MkdirAll(filepath.Join(tmpPath, "g0"), os.ModePerm) - _ = os.MkdirAll(filepath.Join(tmpPath, "g1"), os.ModePerm) - _ = os.MkdirAll(filepath.Join(tmpPath, "g2"), os.ModePerm) - - filesIn := []string{"A", "B", "C"} - var filesOut []string - - wg := sync.WaitGroup{} - - conf := Config{ - Folder: filepath.Join(tmpPath, "*"), - } - - watcher, err := NewFolderWatcher(conf, true, quitChan) - if err != nil { - t.Fatal(err) - } - - go watcher.Watch(func(filePath string) (bool, error) { - if !filepath.IsAbs(filePath) { - t.Errorf("file %s is not an absolute path", filePath) - } - rel, err := filepath.Rel(tmpPath, filePath) - if err != nil { - return true, err - } - filesOut = append(filesOut, rel) - wg.Done() - return true, nil - }, func(s string, err error) { - }) - - for i, f := range filesIn { - wg.Add(1) - _ = os.WriteFile(filepath.Join(tmpPath, "g"+strconv.Itoa(i), f), []byte{0}, os.ModePerm) - } - - finishedChan := make(chan struct{}) - go func() { - wg.Wait() - finishedChan <- struct{}{} - }() - - ctx, ctxCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer ctxCancel() - - select { - case <-finishedChan: - case <-ctx.Done(): - t.Error(ctx.Err()) - } - quitChan <- struct{}{} - - if len(filesOut) != len(filesIn) { - t.Errorf("filesOut length %d != %d", len(filesOut), len(filesIn)) - } - for i, f := range filesIn { - if !Contains(filesOut, filepath.Join("g"+strconv.Itoa(i), f)) { - t.Errorf("File %s not found in %s", f, filesOut) - } - } -} - -func TestGlobbingN2(t *testing.T) { - tmpPath, err := os.MkdirTemp("", ".folderwatcher_test") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmpPath) - quitChan := make(chan struct{}) - - _ = os.MkdirAll(filepath.Join(tmpPath, "g0", "s0"), os.ModePerm) - _ = os.MkdirAll(filepath.Join(tmpPath, "g0", "s2"), os.ModePerm) - _ = os.MkdirAll(filepath.Join(tmpPath, "g1", "s1"), os.ModePerm) - _ = os.MkdirAll(filepath.Join(tmpPath, "g1", "s3"), os.ModePerm) - _ = os.MkdirAll(filepath.Join(tmpPath, "gX"), os.ModePerm) - - filesIn := []string{"A", "B", "C", "D"} - var filesOut []string - - wg := sync.WaitGroup{} - - conf := Config{ - Folder: filepath.Join(tmpPath, "*", "*"), - } - - watcher, err := NewFolderWatcher(conf, true, quitChan) - if err != nil { - t.Fatal(err) - } - - go watcher.Watch(func(filePath string) (bool, error) { - if !filepath.IsAbs(filePath) { - t.Errorf("file %s is not an absolute path", filePath) - } - rel, err := filepath.Rel(tmpPath, filePath) - if err != nil { - return true, err - } - filesOut = append(filesOut, rel) - wg.Done() - return true, nil - }, func(s string, err error) { - }) - - for i, f := range filesIn { - wg.Add(1) - _ = os.WriteFile(filepath.Join(tmpPath, "g"+strconv.Itoa(i%2), "s"+strconv.Itoa(i), f), []byte{0}, os.ModePerm) - } - - wg.Add(1) - _ = os.WriteFile(filepath.Join(tmpPath, "gX", "foo"), []byte{0}, os.ModePerm) - - finishedChan := make(chan struct{}) - go func() { - wg.Wait() - finishedChan <- struct{}{} - }() - - ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer ctxCancel() - - select { - case <-finishedChan: - case <-ctx.Done(): - wg.Done() // ctx Will Timeout, because gX/foo will always be remaining - // t.Error(ctx.Err()) - } - quitChan <- struct{}{} - - if len(filesOut) != len(filesIn) { - t.Errorf("filesOut length %d != %d", len(filesOut), len(filesIn)) - } - for i, f := range filesIn { - if !Contains(filesOut, filepath.Join("g"+strconv.Itoa(i%2), "s"+strconv.Itoa(i), f)) { - t.Errorf("File %s not found in %s", f, filesOut) - } - } - if Contains(filesOut, filepath.Join("gX", "foo")) { - t.Errorf("File %s found in %s", filepath.Join("gX", "foo"), filesOut) - } -} - func Contains[T comparable](slice []T, item T) bool { for _, v := range slice { if v == item {