folderwatcher/folderwatcher.go
Christian Schmied 366152c483
All checks were successful
go/folderwatcher/pipeline/head This commit looks good
init
2024-09-26 12:34:36 +02:00

156 lines
3.3 KiB
Go

// Package folderwatcher watches the directories matching a glob pattern and
// passes created or written files to a handler once they have been quiet for
// a debounce period.
package folderwatcher
import (
"github.com/sirupsen/logrus"
"os"
"path/filepath"
"regexp"
"sync"
"time"
"github.com/fsnotify/fsnotify"
)
// logger is the package-wide log entry; every message carries the field
// package=folderwatcher.
var logger = logrus.WithField("package", "folderwatcher")
// DebounceTime is the delay between the last create/write event seen for a
// file and the moment that file is handed over for handling. Every further
// event for the same file restarts the delay.
const DebounceTime time.Duration = 2 * time.Second
// FolderWatcher observes a set of directories through fsnotify and forwards
// matching files, debounced per path, to the handler passed to Watch.
type FolderWatcher struct {
	// watcher is the underlying fsnotify watcher; Watch closes it on return.
	watcher *fsnotify.Watcher
	// Config is the configuration the watcher was created from.
	Config Config
	// regex filters files by base name; nil means every file is accepted.
	regex *regexp.Regexp
	// quit makes Watch leave its event loop once it becomes readable.
	quit chan struct{}

	// mutex guards debounceMap and enqueuedFiles.
	mutex sync.Mutex
	// wg counts files that are debouncing or waiting to be handled.
	wg sync.WaitGroup
	// debounceMap holds the pending debounce timer of each file path.
	debounceMap map[string]*time.Timer
	// debouncedChan carries paths whose debounce timer has expired.
	debouncedChan chan string
	// enqueuedFiles counts the paths pushed onto debouncedChan so far.
	enqueuedFiles int
}
// NewFolderWatcher creates a watcher for every directory matched by the glob
// conf.Folder.
//
// conf.Pattern, when non-empty, is a regular expression that the base name of
// a file must match in full for the file to be reported. When includeExisting
// is true, the regular files already present in the matched directories are
// enqueued as if they had just been created. quit is the channel Watch
// listens on to stop.
//
// On error no watcher is returned and all resources acquired so far are
// released.
func NewFolderWatcher(conf Config, includeExisting bool, quit chan struct{}) (*FolderWatcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	// Release the watcher on every error path below; without this each
	// failed construction leaks the OS-level watch handles.
	constructed := false
	defer func() {
		if !constructed {
			// Best-effort cleanup: the original error is the one worth reporting.
			_ = watcher.Close()
		}
	}()

	matches, err := filepath.Glob(conf.Folder)
	if err != nil {
		return nil, err
	}

	// Only directories can be watched; other glob matches are ignored.
	var matchedFolders []string
	for _, path := range matches {
		stat, err := os.Stat(path)
		if err != nil {
			return nil, err
		}
		if stat.IsDir() {
			matchedFolders = append(matchedFolders, path)
		}
	}
	for _, path := range matchedFolders {
		if err := watcher.Add(path); err != nil {
			return nil, err
		}
	}

	var regex *regexp.Regexp
	if len(conf.Pattern) > 0 {
		// The non-capturing group keeps the anchors bound to the whole
		// pattern; without it "a|b" would compile to "^a|b$" and accept
		// partial matches.
		regex, err = regexp.Compile("^(?:" + conf.Pattern + ")$")
		if err != nil {
			return nil, err
		}
	}

	// Collect existing files before enqueueing anything, so a failing
	// ReadDir cannot leave debounce timers running for a watcher that is
	// never returned.
	var existingFiles []string
	if includeExisting {
		for _, dir := range matchedFolders {
			entries, err := os.ReadDir(dir)
			if err != nil {
				return nil, err
			}
			for _, entry := range entries {
				if !entry.IsDir() {
					existingFiles = append(existingFiles, filepath.Join(dir, entry.Name()))
				}
			}
		}
	}

	folderWatcher := &FolderWatcher{
		watcher:       watcher,
		Config:        conf,
		regex:         regex,
		quit:          quit,
		debounceMap:   make(map[string]*time.Timer),
		debouncedChan: make(chan string),
	}
	for _, file := range existingFiles {
		folderWatcher.foundFileEvent(file)
	}

	constructed = true
	return folderWatcher, nil
}
type (
	// FileHandler processes one debounced file. Watch invokes the
	// HandlerCallback afterwards only when triggerCallback is true; a non-nil
	// err is logged by Watch as a warning.
	FileHandler func(path string) (triggerCallback bool, err error)

	// HandlerCallback is invoked by Watch after a FileHandler asked for it.
	// It receives the handled path and the error the handler returned, which
	// may be nil.
	HandlerCallback func(path string, err error)
)
// Watch runs the event loop and blocks until quit becomes readable or the
// fsnotify channels are closed. The underlying watcher is closed on return.
//
// Create and write events are debounced per file. Once a file's debounce
// delay has expired, handler is called with its path; if handler returns
// triggerCallback == true, callback is called with the same path and the
// handler's error. Handler errors are logged as warnings.
//
// After the loop has stopped, files that are still debouncing or waiting are
// handled before Watch returns.
func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) {
	defer w.watcher.Close()

	// handle processes one debounced file and marks it as finished.
	handle := func(file string) {
		triggerCallback, err := handler(file)
		if err != nil {
			logger.Warn("error during handle:", err)
		}
		if triggerCallback {
			callback(file, err)
		}
		w.wg.Done()
	}

	for stopped := false; !stopped; {
		select {
		case <-w.quit:
			logger.Info("Stopping watcher")
			stopped = true
		case event, ok := <-w.watcher.Events:
			if !ok {
				stopped = true
				break // leaves the select only; the loop ends via stopped
			}
			logger.Debug("event:", event)
			if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
				w.foundFileEvent(event.Name)
			}
		case file := <-w.debouncedChan:
			handle(file)
		case err, ok := <-w.watcher.Errors:
			if !ok {
				stopped = true
				break // leaves the select only; the loop ends via stopped
			}
			logger.Warn("error:", err)
		}
	}

	// Pending debounce timers still send on debouncedChan and only handle
	// calls wg.Done, so a bare wg.Wait here would block forever. Keep
	// receiving until the WaitGroup has reached zero.
	drained := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(drained)
	}()
	for pending := true; pending; {
		select {
		case file := <-w.debouncedChan:
			handle(file)
		case <-drained:
			pending = false
		}
	}

	w.mutex.Lock()
	handled := w.enqueuedFiles
	w.mutex.Unlock()
	logger.Infof("watcher handled %d files", handled)
}
// foundFileEvent enqueues fileName for debounced handling, unless a filter
// regex is configured and the file's base name does not match it.
func (w *FolderWatcher) foundFileEvent(fileName string) {
	if w.regex != nil && !w.regex.MatchString(filepath.Base(fileName)) {
		return
	}
	w.enqueueDebounce(fileName)
}
// enqueueDebounce (re)starts the debounce timer for f. When the timer expires
// without being superseded by a newer event, f is sent on debouncedChan.
//
// Each file that ends up on debouncedChan is matched by exactly one wg.Add
// here; the receiver is expected to call wg.Done after handling it.
func (w *FolderWatcher) enqueueDebounce(f string) {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if pending, ok := w.debounceMap[f]; ok && pending != nil {
		// Stop may fail when the timer has already fired; the identity
		// check inside the callback covers that case.
		pending.Stop()
	} else {
		w.wg.Add(1)
	}

	// timer is assigned while the mutex is held and read by the callback
	// under the same mutex, so the callback always sees the final value.
	var timer *time.Timer
	timer = time.AfterFunc(DebounceTime, func() {
		w.mutex.Lock()
		if w.debounceMap[f] != timer {
			// A newer event replaced this timer after it had fired. Only
			// the newest timer may deliver, otherwise the file would be
			// sent twice for a single wg.Add.
			w.mutex.Unlock()
			return
		}
		delete(w.debounceMap, f)
		w.enqueuedFiles++
		w.mutex.Unlock()

		// Send without holding the mutex: the receiving loop also calls
		// enqueueDebounce, and blocking here with the lock held would
		// deadlock both sides.
		w.debouncedChan <- f
	})
	w.debounceMap[f] = timer
}