// Package folderwatcher watches the directories matched by a glob and hands
// each matching file to a handler once no further write/create event has been
// seen for that file for DebounceTime.
package folderwatcher

import (
	"os"
	"path/filepath"
	"regexp"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/sirupsen/logrus"
)

var logger = logrus.WithField("package", "folderwatcher")

// DebounceTime is how long a file must go without a new write/create event
// before it is passed to the handler.
const DebounceTime = 2 * time.Second

// FolderWatcher collects file events from an fsnotify watcher, debounces them
// per file path and delivers the settled paths to the Watch loop.
type FolderWatcher struct {
	watcher *fsnotify.Watcher
	Config  Config
	// regex is matched against file base names; nil means every file matches.
	regex *regexp.Regexp
	// quit stops the Watch loop as soon as a receive on it succeeds.
	quit chan struct{}
	// mutex guards debounceMap and enqueuedFiles.
	mutex sync.Mutex
	// wg counts files that are debouncing or waiting to be handled.
	wg sync.WaitGroup
	// debounceMap holds the pending debounce timer for each file path.
	debounceMap map[string]*time.Timer
	// debouncedChan carries paths whose debounce timer has expired.
	debouncedChan chan string
	// enqueuedFiles counts the paths sent to debouncedChan so far.
	enqueuedFiles int
}

// NewFolderWatcher creates a watcher for every directory matched by the glob
// conf.Folder. When conf.Pattern is non-empty it is compiled as a regular
// expression that must match a file's whole base name for the file to be
// reported. When includeExisting is true, the files already present in the
// matched directories are enqueued as if they had just been written.
//
// quit is the channel that later stops Watch. On error no watcher is left
// open and no file has been enqueued.
func NewFolderWatcher(conf Config, includeExisting bool, quit chan struct{}) (*FolderWatcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	created := false
	defer func() {
		if !created {
			// Best-effort cleanup: the construction error is the one worth
			// reporting, so a failure to close is deliberately dropped.
			_ = watcher.Close()
		}
	}()

	matches, err := filepath.Glob(conf.Folder)
	if err != nil {
		return nil, err
	}

	var matchedFolders []string
	for _, path := range matches {
		stat, err := os.Stat(path)
		if err != nil {
			return nil, err
		}
		if stat.IsDir() {
			matchedFolders = append(matchedFolders, path)
		}
	}

	for _, path := range matchedFolders {
		if err := watcher.Add(path); err != nil {
			return nil, err
		}
	}

	var regex *regexp.Regexp
	if len(conf.Pattern) > 0 {
		// The non-capturing group makes the anchors apply to the whole
		// pattern; without it "a|b" would compile to "^a|b$".
		regex, err = regexp.Compile("^(?:" + conf.Pattern + ")$")
		if err != nil {
			return nil, err
		}
	}

	// Read every directory before enqueueing anything, so that a failure
	// half-way through does not leave debounce timers running for a watcher
	// that is never returned.
	var existing []string
	if includeExisting {
		for _, path := range matchedFolders {
			entries, err := os.ReadDir(path)
			if err != nil {
				return nil, err
			}
			for _, entry := range entries {
				if !entry.IsDir() {
					existing = append(existing, filepath.Join(path, entry.Name()))
				}
			}
		}
	}

	folderWatcher := &FolderWatcher{
		watcher:       watcher,
		Config:        conf,
		regex:         regex,
		quit:          quit,
		debounceMap:   make(map[string]*time.Timer),
		debouncedChan: make(chan string),
	}
	for _, file := range existing {
		folderWatcher.foundFileEvent(file)
	}

	created = true
	return folderWatcher, nil
}

// FileHandler processes a settled file. triggerCallback reports whether the
// HandlerCallback should be invoked for this file.
type FileHandler func(path string) (triggerCallback bool, err error)

// HandlerCallback is invoked with the file path and the FileHandler's error
// (possibly nil) whenever the FileHandler returned triggerCallback == true.
type HandlerCallback func(path string, err error)

// Watch runs the event loop until a receive on the quit channel succeeds or
// the underlying watcher's Events or Errors channel is closed. Write and
// create events are debounced per file; each settled file is passed to
// handler, and to callback when handler asks for it.
//
// Before returning, Watch finishes every file that is still debouncing or
// waiting for delivery, then closes the underlying watcher.
func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) {
	defer w.watcher.Close()

	for stopped := false; !stopped; {
		select {
		case <-w.quit:
			logger.Info("Stopping watcher")
			stopped = true
		case event, ok := <-w.watcher.Events:
			if !ok {
				stopped = true
				break
			}
			logger.Debugf("event: %v", event)
			if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
				w.foundFileEvent(event.Name)
			}
		case file := <-w.debouncedChan:
			w.handleFile(file, handler, callback)
		case err, ok := <-w.watcher.Errors:
			if !ok {
				stopped = true
				break
			}
			logger.Warnf("error: %v", err)
		}
	}

	w.drainPending(handler, callback)

	w.mutex.Lock()
	handled := w.enqueuedFiles
	w.mutex.Unlock()
	logger.Infof("watcher handled %d files", handled)
}

// handleFile runs handler (and callback, if requested) for one settled file
// and marks that file as no longer pending.
func (w *FolderWatcher) handleFile(file string, handler FileHandler, callback HandlerCallback) {
	defer w.wg.Done()

	triggerCallback, err := handler(file)
	if err != nil {
		logger.Warnf("error during handle: %v", err)
	}
	if triggerCallback {
		callback(file, err)
	}
}

// drainPending handles the files that were still pending when the event loop
// stopped. The debounce timers block on debouncedChan until somebody
// receives, so merely waiting on the WaitGroup here would never return.
func (w *FolderWatcher) drainPending(handler FileHandler, callback HandlerCallback) {
	done := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(done)
	}()

	for {
		select {
		case file := <-w.debouncedChan:
			w.handleFile(file, handler, callback)
		case <-done:
			return
		}
	}
}

// foundFileEvent enqueues fileName for debounced handling if its base name
// matches the configured pattern, or unconditionally when no pattern is set.
func (w *FolderWatcher) foundFileEvent(fileName string) {
	baseName := filepath.Base(fileName)
	if w.regex == nil || w.regex.MatchString(baseName) {
		w.enqueueDebounce(fileName)
	}
}

// enqueueDebounce (re)starts the debounce timer for f. A file counts as one
// pending unit in the WaitGroup from its first event until it is handled, no
// matter how often its timer is restarted in between.
func (w *FolderWatcher) enqueueDebounce(f string) {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if pending, ok := w.debounceMap[f]; ok && pending != nil {
		// Stop may come too late if the timer has already fired; the
		// identity check in the callback below covers that case.
		pending.Stop()
	} else {
		w.wg.Add(1)
	}

	var timer *time.Timer
	timer = time.AfterFunc(DebounceTime, func() {
		w.mutex.Lock()
		if w.debounceMap[f] != timer {
			// Superseded by a newer event for the same file: the newer
			// timer owns the delivery and the WaitGroup unit.
			w.mutex.Unlock()
			return
		}
		delete(w.debounceMap, f)
		w.enqueuedFiles++
		w.mutex.Unlock()

		// Send without holding the mutex: the receiver is the Watch loop,
		// which may itself be waiting for the mutex in enqueueDebounce.
		w.debouncedChan <- f
	})
	w.debounceMap[f] = timer
}