Christian Schmied
097c4b3d84
All checks were successful
go/folderwatcher/pipeline/head This commit looks good
204 lines
4.7 KiB
Go
204 lines
4.7 KiB
Go
package folderwatcher
|
|
|
|
import (
|
|
"github.com/sirupsen/logrus"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fsnotify/fsnotify"
|
|
)
|
|
|
|
var logger = logrus.WithField("package", "folderwatcher")
|
|
|
|
const DebounceTime = 2 * time.Second
|
|
|
|
type FolderWatcher struct {
|
|
watcher *fsnotify.Watcher
|
|
Config Config
|
|
regex *regexp.Regexp
|
|
quit chan struct{}
|
|
mutex sync.Mutex
|
|
wg sync.WaitGroup
|
|
debounceWg sync.WaitGroup
|
|
debounceMap map[string]*time.Timer
|
|
debouncedChan chan string
|
|
existingList []string
|
|
enqueuedFiles int
|
|
}
|
|
|
|
func NewFolderWatcher(conf Config, includeExisting bool, quit chan struct{}) (*FolderWatcher, error) {
|
|
watcher, err := fsnotify.NewWatcher()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
matches, err := filepath.Glob(conf.Folder)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var matchedFolders []string
|
|
for _, path := range matches {
|
|
stat, err := os.Stat(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if stat.IsDir() {
|
|
matchedFolders = append(matchedFolders, path)
|
|
}
|
|
}
|
|
|
|
for _, path := range matchedFolders {
|
|
err = watcher.Add(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var regex *regexp.Regexp
|
|
if len(conf.Pattern) > 0 {
|
|
regex, err = regexp.Compile("^" + conf.Pattern + "$")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var folderWatcher = &FolderWatcher{
|
|
watcher: watcher,
|
|
Config: conf,
|
|
regex: regex,
|
|
quit: quit,
|
|
debounceMap: make(map[string]*time.Timer),
|
|
debouncedChan: make(chan string),
|
|
existingList: []string{},
|
|
}
|
|
|
|
if includeExisting {
|
|
for _, path := range matchedFolders {
|
|
entries, err := os.ReadDir(path)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, entry := range entries {
|
|
if !entry.IsDir() {
|
|
fullFilePath := filepath.Join(path, entry.Name())
|
|
if folderWatcher.matchesPattern(fullFilePath) {
|
|
folderWatcher.existingList = append(folderWatcher.existingList, fullFilePath)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
slices.SortFunc(folderWatcher.existingList, func(a, b string) int {
|
|
infoA, _ := os.Stat(a)
|
|
infoB, _ := os.Stat(b)
|
|
return infoA.ModTime().Compare(infoB.ModTime())
|
|
})
|
|
}
|
|
|
|
return folderWatcher, nil
|
|
}
|
|
|
|
type FileHandler func(path string) (triggerCallback bool, err error)
|
|
type HandlerCallback func(path string, err error)
|
|
|
|
func (w *FolderWatcher) Watch(handler FileHandler, callback HandlerCallback) {
|
|
defer w.watcher.Close()
|
|
initialWait := sync.WaitGroup{}
|
|
|
|
handlerWg := sync.WaitGroup{}
|
|
handlerWg.Add(1)
|
|
// Start File Handling
|
|
go func() {
|
|
defer handlerWg.Done()
|
|
for file := range w.debouncedChan {
|
|
triggerCallback, err := handler(file)
|
|
if err != nil {
|
|
logger.Warn("error during handle:", err)
|
|
}
|
|
if triggerCallback {
|
|
callback(file, err)
|
|
}
|
|
w.wg.Done() // mark enqueued file as done
|
|
}
|
|
}()
|
|
|
|
if len(w.existingList) > 0 {
|
|
logger.Infof("got %d existing files", len(w.existingList))
|
|
initialWait.Add(1)
|
|
go func() {
|
|
defer initialWait.Done()
|
|
for _, path := range w.existingList {
|
|
w.enqueue(path)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Start Event Handling
|
|
for stopped := false; !stopped; {
|
|
select {
|
|
case <-w.quit:
|
|
logger.Info("Stopping watcher")
|
|
w.debounceWg.Wait() // wait for debounce to settle
|
|
w.wg.Wait() // wait for existing files to be handled
|
|
close(w.debouncedChan) //close the debounced chan to mark it as done
|
|
stopped = true
|
|
case event, ok := <-w.watcher.Events:
|
|
initialWait.Wait() // wait that all initial are enqueued, before handling of first event
|
|
if !ok {
|
|
stopped = true
|
|
break
|
|
}
|
|
logger.Debug("event:", event)
|
|
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) {
|
|
if w.matchesPattern(event.Name) {
|
|
w.enqueueDebounce(event.Name)
|
|
}
|
|
}
|
|
case err, ok := <-w.watcher.Errors:
|
|
if !ok {
|
|
stopped = true
|
|
break
|
|
}
|
|
logger.Warn("error:", err)
|
|
}
|
|
}
|
|
handlerWg.Wait() // wait for filehandler to be closed, implies debouncedChan is closed
|
|
logger.Infof("watcher handled %d files", w.enqueuedFiles)
|
|
}
|
|
|
|
func (w *FolderWatcher) matchesPattern(fileName string) bool {
|
|
baseName := filepath.Base(fileName)
|
|
return w.regex == nil || w.regex.MatchString(baseName)
|
|
}
|
|
|
|
func (w *FolderWatcher) enqueue(f string) {
|
|
w.wg.Add(1)
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
if timer, ok := w.debounceMap[f]; ok && timer != nil {
|
|
timer.Stop()
|
|
}
|
|
delete(w.debounceMap, f)
|
|
w.enqueuedFiles = w.enqueuedFiles + 1
|
|
w.debouncedChan <- f
|
|
}
|
|
|
|
func (w *FolderWatcher) enqueueDebounce(f string) {
|
|
logger.Infof("Debounced Enqueue of %s", f)
|
|
w.mutex.Lock()
|
|
defer w.mutex.Unlock()
|
|
|
|
if timer, ok := w.debounceMap[f]; ok && timer != nil {
|
|
timer.Stop()
|
|
} else {
|
|
w.debounceWg.Add(1)
|
|
}
|
|
w.debounceMap[f] = time.AfterFunc(DebounceTime, func() {
|
|
defer w.debounceWg.Done()
|
|
w.enqueue(f)
|
|
})
|
|
}
|