folderwatcher/folderwatcher.go

216 lines
4.9 KiB
Go

package folderwatcher
import (
"fmt"
"github.com/sirupsen/logrus"
"os"
"path/filepath"
"regexp"
"slices"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// logger is the package-level logrus entry; every message logged through it
// carries the field package=folderwatcher.
var logger = logrus.WithField("package", "folderwatcher")
// DebounceTime is the delay used by enqueueDebounce: a path is enqueued only
// after this much time has passed since the last debounced event for it.
const DebounceTime = 2 * time.Second
// FolderWatcher watches one or more folders for created or written files whose
// base name matches an optional pattern, debounces the events per path and
// hands each path to a handler (see Watch).
type FolderWatcher struct {
// watcher is the underlying fsnotify watcher; Watch consumes its Events and
// Errors channels and closes it on return.
watcher *fsnotify.Watcher
// Config is the configuration the watcher was created with.
Config Config
// regex is compiled from Config.Pattern; nil means every file matches.
regex *regexp.Regexp
// quit makes Watch stop once a receive from it succeeds.
quit chan struct{}
// mutex is held by enqueue and enqueueDebounce while they touch debounceMap
// and enqueuedFiles.
mutex sync.Mutex
// wg counts paths that were enqueued but not yet finished by the handler.
wg sync.WaitGroup
// debounceWg counts paths that currently have a debounce timer outstanding.
debounceWg sync.WaitGroup
// debounceMap holds the debounce timer per path.
debounceMap map[string]*time.Timer
// debouncedChan carries enqueued paths to the handler goroutine started by
// Watch.
debouncedChan chan string
// existingList holds the matching files found by NewFolderWatcher when
// includeExisting is set, ordered by modification time.
existingList []string
// enqueuedFiles is the number of paths passed through enqueue so far.
enqueuedFiles int
}
// NewFolderWatcher creates a FolderWatcher for every directory matched by the
// glob conf.Folder.
//
// conf.Pattern, when non-empty, is a regular expression that must match the
// complete base name of a file for the file to be considered. When
// includeExisting is true, the matching files already present in the folders
// are collected, ordered by modification time (oldest first), and enqueued by
// Watch before any file-system event is processed. quit is the channel Watch
// receives from to learn that it must stop.
//
// An error is returned when the glob is malformed, no directory matches, a
// directory cannot be watched or read, or the pattern does not compile. The
// underlying fsnotify watcher is closed again on every error path.
func NewFolderWatcher(conf Config, includeExisting bool, quit chan struct{}) (*FolderWatcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	// Release the OS resources held by the watcher unless construction
	// succeeds.
	success := false
	defer func() {
		if !success {
			watcher.Close()
		}
	}()

	matches, err := filepath.Glob(conf.Folder)
	if err != nil {
		return nil, err
	}
	var matchedFolders []string
	for _, path := range matches {
		stat, err := os.Stat(path)
		if err != nil {
			return nil, err
		}
		if stat.IsDir() {
			matchedFolders = append(matchedFolders, path)
		}
	}
	if len(matchedFolders) == 0 {
		return nil, fmt.Errorf("no folders found matching '%s'", conf.Folder)
	}
	for _, path := range matchedFolders {
		if err := watcher.Add(path); err != nil {
			return nil, err
		}
	}

	var regex *regexp.Regexp
	if len(conf.Pattern) > 0 {
		// Wrap the pattern in a non-capturing group so the anchors apply to
		// the whole expression even when it contains a top-level alternation
		// ("a|b" must not become "^a|b$").
		regex, err = regexp.Compile("^(?:" + conf.Pattern + ")$")
		if err != nil {
			return nil, err
		}
	}

	folderWatcher := &FolderWatcher{
		watcher:       watcher,
		Config:        conf,
		regex:         regex,
		quit:          quit,
		debounceMap:   make(map[string]*time.Timer),
		debouncedChan: make(chan string),
		existingList:  []string{},
	}

	if includeExisting {
		// Record each modification time once, up front, so the sort does not
		// hit the file system and cannot trip over a file that disappears in
		// the meantime.
		type existingFile struct {
			path    string
			modTime time.Time
		}
		var existing []existingFile
		for _, folder := range matchedFolders {
			entries, err := os.ReadDir(folder)
			if err != nil {
				return nil, err
			}
			for _, entry := range entries {
				if entry.IsDir() {
					continue
				}
				fullFilePath := filepath.Join(folder, entry.Name())
				if !folderWatcher.matchesPattern(fullFilePath) {
					continue
				}
				info, err := os.Stat(fullFilePath)
				if err != nil {
					if os.IsNotExist(err) {
						// Removed between ReadDir and Stat: nothing to handle.
						continue
					}
					return nil, err
				}
				existing = append(existing, existingFile{path: fullFilePath, modTime: info.ModTime()})
			}
		}
		slices.SortFunc(existing, func(a, b existingFile) int {
			return a.modTime.Compare(b.modTime)
		})
		folderWatcher.existingList = make([]string, 0, len(existing))
		for _, file := range existing {
			folderWatcher.existingList = append(folderWatcher.existingList, file.path)
		}
	}

	success = true
	return folderWatcher, nil
}
// FileHandler processes the file at path. Watch logs a non-nil err as a
// warning and invokes the HandlerCallback only when triggerCallback is true.
type FileHandler func(path string) (triggerCallback bool, err error)
// HandlerCallback is invoked by Watch after a FileHandler call that returned
// triggerCallback == true; it receives the handled path and the error the
// handler returned (which may be nil).
type HandlerCallback func(path string, err error)
// Watch runs the watcher until a receive from the quit channel succeeds or the
// fsnotify event/error channels are closed, and blocks until every enqueued
// file has been handled.
//
// handler is called sequentially, on a dedicated goroutine, for each enqueued
// path; callback is called afterwards when handler returned
// triggerCallback == true. Files collected at construction time are enqueued
// first; Create and Write events for matching files are debounced by
// DebounceTime before being enqueued. The underlying fsnotify watcher is
// closed when Watch returns.
func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) {
	defer w.watcher.Close()

	var initialWait, handlerWg sync.WaitGroup

	// Start file handling.
	handlerWg.Add(1)
	go func() {
		defer handlerWg.Done()
		for file := range w.debouncedChan {
			triggerCallback, err := handler(file)
			if err != nil {
				logger.Warn("error during handle:", err)
			}
			if triggerCallback {
				callback(file, err)
			}
			w.wg.Done() // mark enqueued file as done
		}
	}()

	if len(w.existingList) > 0 {
		logger.Infof("got %d existing files", len(w.existingList))
		initialWait.Add(1)
		go func() {
			defer initialWait.Done()
			for _, path := range w.existingList {
				w.enqueue(path)
			}
		}()
	}

	// Start event handling.
loop:
	for {
		select {
		case <-w.quit:
			logger.Info("Stopping watcher")
			break loop
		case event, ok := <-w.watcher.Events:
			if !ok {
				break loop
			}
			// All existing files must be enqueued before the first event is
			// handled.
			initialWait.Wait()
			logger.Debug("event:", event)
			if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
				if w.matchesPattern(event.Name) {
					w.enqueueDebounce(event.Name)
				}
			}
		case err, ok := <-w.watcher.Errors:
			if !ok {
				break loop
			}
			logger.Warn("error:", err)
		}
	}

	// Shut down in dependency order, whatever ended the loop. Every producer
	// must be finished before the channel is closed, otherwise a late send
	// would panic; the channel must be closed, otherwise the handler goroutine
	// would never return.
	initialWait.Wait()     // existing files are all enqueued
	w.debounceWg.Wait()    // pending debounce timers have fired and enqueued
	w.wg.Wait()            // every enqueued file has been handled
	close(w.debouncedChan) // lets the handler goroutine leave its range loop
	handlerWg.Wait()       // handler goroutine has returned

	logger.Infof("watcher handled %d files", w.enqueuedFiles)
}
// matchesPattern reports whether the base name of fileName satisfies the
// watcher's pattern. A watcher without a pattern accepts every file.
func (w *FolderWatcher) matchesPattern(fileName string) bool {
	if w.regex == nil {
		return true
	}
	return w.regex.MatchString(filepath.Base(fileName))
}
// enqueue hands f to the handler goroutine. It registers f on w.wg, cancels
// and forgets any debounce timer recorded for f, counts the file, and then
// blocks until the handler goroutine receives the path from debouncedChan.
func (w *FolderWatcher) enqueue(f string) {
	w.wg.Add(1)

	w.mutex.Lock()
	if timer, ok := w.debounceMap[f]; ok && timer != nil {
		timer.Stop()
	}
	delete(w.debounceMap, f)
	w.enqueuedFiles++
	w.mutex.Unlock()

	// Send after releasing the mutex: the channel is unbuffered, so the send
	// blocks while the handler is busy, and holding the lock here would stall
	// enqueueDebounce (and with it the event loop) for that whole time.
	w.debouncedChan <- f
}
// enqueueDebounce schedules f to be enqueued once DebounceTime has elapsed.
// A timer already recorded for f is stopped and replaced, so a burst of events
// for one path restarts the delay; debounceWg is incremented only when no
// timer was recorded for f.
func (w *FolderWatcher) enqueueDebounce(f string) {
	logger.Infof("Debounced Enqueue of %s", f)

	w.mutex.Lock()
	defer w.mutex.Unlock()

	// A missing key yields a nil timer, so one nil check covers both "no
	// entry" and "nil entry".
	if pending := w.debounceMap[f]; pending == nil {
		w.debounceWg.Add(1)
	} else {
		pending.Stop()
	}

	fire := func() {
		defer w.debounceWg.Done()
		w.enqueue(f)
	}
	w.debounceMap[f] = time.AfterFunc(DebounceTime, fire)
}