certspotter/monitor/daemon.go

149 lines
4.0 KiB
Go
Raw Normal View History

// Copyright (C) 2023 Opsmate, Inc.
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License, v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// This software is distributed WITHOUT A WARRANTY OF ANY KIND.
// See the Mozilla Public License for details.
package monitor
import (
"context"
"errors"
"fmt"
"golang.org/x/sync/errgroup"
"log"
insecurerand "math/rand"
"software.sslmate.com/src/certspotter/loglist"
"time"
)
const (
reloadLogListIntervalMin = 3 * time.Second // TODO-3: use 30 * time.Minute
reloadLogListIntervalMax = 9 * time.Second // TODO-3: use 90 * time.Minute
healthCheckInterval = 24 * time.Hour
)
func randomDuration(min, max time.Duration) time.Duration {
return min + time.Duration(insecurerand.Int63n(int64(max-min+1)))
}
func reloadLogListInterval() time.Duration {
return randomDuration(reloadLogListIntervalMin, reloadLogListIntervalMax)
}
type task struct {
stop context.CancelFunc
}
type daemon struct {
config *Config
taskgroup *errgroup.Group
tasks map[LogID]task
logsLoadedAt time.Time
logListToken *loglist.ModificationToken
}
func (daemon *daemon) healthCheck(ctx context.Context) error { // TODO-2
return nil
}
func (daemon *daemon) startTask(ctx context.Context, ctlog *loglist.Log) task {
ctx, cancel := context.WithCancel(ctx)
daemon.taskgroup.Go(func() error {
defer cancel()
err := monitorLogContinously(ctx, daemon.config, ctlog)
if daemon.config.Verbose {
log.Printf("task for log %s stopped with error %s", ctlog.URL, err)
}
if ctx.Err() == context.Canceled && errors.Is(err, context.Canceled) {
return nil
} else {
return fmt.Errorf("error while monitoring %s: %w", ctlog.URL, err)
}
})
return task{stop: cancel}
}
func (daemon *daemon) loadLogList(ctx context.Context) error {
newLogList, newToken, err := getLogList(ctx, daemon.config.LogListSource, daemon.logListToken)
if errors.Is(err, loglist.ErrNotModified) {
log.Printf("log list %q not modified", daemon.config.LogListSource)
return nil
} else if err != nil {
return err
}
if daemon.config.Verbose {
log.Printf("fetched %d logs from %q", len(newLogList), daemon.config.LogListSource)
}
for logID, task := range daemon.tasks {
if _, exists := newLogList[logID]; exists {
continue
}
if daemon.config.Verbose {
log.Printf("stopping task for log %s", logID.Base64String())
}
task.stop()
delete(daemon.tasks, logID)
}
for logID, ctlog := range newLogList {
if _, isRunning := daemon.tasks[logID]; isRunning {
continue
}
if daemon.config.Verbose {
log.Printf("starting task for log %s (%s)", logID.Base64String(), ctlog.URL)
}
daemon.tasks[logID] = daemon.startTask(ctx, ctlog)
}
daemon.logsLoadedAt = time.Now()
daemon.logListToken = newToken
return nil
}
func (daemon *daemon) run(ctx context.Context) error {
if err := prepareStateDir(daemon.config.StateDir); err != nil {
return fmt.Errorf("error preparing state directory: %w", err)
}
if err := daemon.loadLogList(ctx); err != nil {
return fmt.Errorf("error loading log list: %w", err)
}
reloadLogListTicker := time.NewTicker(reloadLogListInterval())
defer reloadLogListTicker.Stop()
healthCheckTicker := time.NewTicker(healthCheckInterval)
defer healthCheckTicker.Stop()
for ctx.Err() == nil {
select {
case <-ctx.Done():
case <-reloadLogListTicker.C:
if err := daemon.loadLogList(ctx); err != nil {
recordError(fmt.Errorf("error reloading log list (will try again later): %w", err))
}
reloadLogListTicker.Reset(reloadLogListInterval())
case <-healthCheckTicker.C:
if err := daemon.healthCheck(ctx); err != nil {
return err
}
}
}
return ctx.Err()
}
func Run(ctx context.Context, config *Config) error {
group, ctx := errgroup.WithContext(ctx)
daemon := &daemon{
config: config,
taskgroup: group,
tasks: make(map[LogID]task),
}
group.Go(func() error { return daemon.run(ctx) })
return group.Wait()
}