Add backoff during fetch errors

This commit is contained in:
Andrew Ayer 2016-02-22 14:11:47 -08:00
parent df6527b165
commit 94ccbc0a4f
1 changed files with 26 additions and 5 deletions

View File

@ -16,6 +16,11 @@ import (
type ProcessCallback func(*Scanner, *ct.LogEntry) type ProcessCallback func(*Scanner, *ct.LogEntry)
const (
FETCH_RETRIES = 10
FETCH_RETRY_WAIT = 1
)
// ScannerOptions holds configuration options for the Scanner // ScannerOptions holds configuration options for the Scanner
type ScannerOptions struct { type ScannerOptions struct {
// Number of entries to request in one batch from the Log // Number of entries to request in one batch from the Log
@ -77,16 +82,27 @@ func (s *Scanner) processerJob(id int, entries <-chan ct.LogEntry, processCert P
wg.Done() wg.Done()
} }
func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *MerkleTreeBuilder) { func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *MerkleTreeBuilder) error {
success := false success := false
// TODO(alcutter): give up after a while: retries := FETCH_RETRIES
retryWait := FETCH_RETRY_WAIT
for !success { for !success {
s.Log(fmt.Sprintf("Fetching entries %d to %d", r.start, r.end)) s.Log(fmt.Sprintf("Fetching entries %d to %d", r.start, r.end))
logEntries, err := s.logClient.GetEntries(r.start, r.end) logEntries, err := s.logClient.GetEntries(r.start, r.end)
if err != nil { if err != nil {
s.Warn(fmt.Sprintf("Problem fetching from log: %s", err.Error())) if retries == 0 {
continue s.Warn(fmt.Sprintf("Problem fetching entries %d to %d from log: %s", r.start, r.end, err.Error()))
return err
} else {
s.Log(fmt.Sprintf("Problem fetching entries %d to %d from log (will retry): %s", r.start, r.end, err.Error()))
time.Sleep(time.Duration(retryWait) * time.Second)
retries--
retryWait *= 2
continue
}
} }
retries = FETCH_RETRIES
retryWait = FETCH_RETRY_WAIT
for _, logEntry := range logEntries { for _, logEntry := range logEntries {
if treeBuilder != nil { if treeBuilder != nil {
treeBuilder.Add(hashLeaf(logEntry.LeafBytes)) treeBuilder.Add(hashLeaf(logEntry.LeafBytes))
@ -102,6 +118,7 @@ func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *M
success = true success = true
} }
} }
return nil
} }
// Worker function for fetcher jobs. // Worker function for fetcher jobs.
@ -110,6 +127,7 @@ func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *M
// |entries| channel for the processors to chew on. // |entries| channel for the processors to chew on.
// Will retry failed attempts to retrieve ranges indefinitely. // Will retry failed attempts to retrieve ranges indefinitely.
// Sends true over the |done| channel when the |ranges| channel is closed. // Sends true over the |done| channel when the |ranges| channel is closed.
/* disabled becuase error handling is broken
func (s *Scanner) fetcherJob(id int, ranges <-chan fetchRange, entries chan<- ct.LogEntry, wg *sync.WaitGroup) { func (s *Scanner) fetcherJob(id int, ranges <-chan fetchRange, entries chan<- ct.LogEntry, wg *sync.WaitGroup) {
for r := range ranges { for r := range ranges {
s.fetch(r, entries, nil) s.fetch(r, entries, nil)
@ -117,6 +135,7 @@ func (s *Scanner) fetcherJob(id int, ranges <-chan fetchRange, entries chan<- ct
s.Log(fmt.Sprintf("Fetcher %d finished", id)) s.Log(fmt.Sprintf("Fetcher %d finished", id))
wg.Done() wg.Done()
} }
*/
// Returns the smaller of |a| and |b| // Returns the smaller of |a| and |b|
func min(a int64, b int64) int64 { func min(a int64, b int64) int64 {
@ -256,7 +275,9 @@ func (s *Scanner) Scan(startIndex int64, endIndex int64, processCert ProcessCall
*/ */
for start := startIndex; start < int64(endIndex); { for start := startIndex; start < int64(endIndex); {
end := min(start+int64(s.opts.BatchSize), int64(endIndex)) - 1 end := min(start+int64(s.opts.BatchSize), int64(endIndex)) - 1
s.fetch(fetchRange{start, end}, jobs, treeBuilder) if err := s.fetch(fetchRange{start, end}, jobs, treeBuilder); err != nil {
return err
}
start = end + 1 start = end + 1
} }
close(jobs) close(jobs)