From 94ccbc0a4f7f63d35a26e475cd28f6f4ee674b9b Mon Sep 17 00:00:00 2001 From: Andrew Ayer Date: Mon, 22 Feb 2016 14:11:47 -0800 Subject: [PATCH] Add backoff during fetch errors --- scanner.go | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/scanner.go b/scanner.go index 81abd5c..10ffbaf 100644 --- a/scanner.go +++ b/scanner.go @@ -16,6 +16,11 @@ import ( type ProcessCallback func(*Scanner, *ct.LogEntry) +const ( + FETCH_RETRIES = 10 + FETCH_RETRY_WAIT = 1 +) + // ScannerOptions holds configuration options for the Scanner type ScannerOptions struct { // Number of entries to request in one batch from the Log @@ -77,16 +82,27 @@ func (s *Scanner) processerJob(id int, entries <-chan ct.LogEntry, processCert P wg.Done() } -func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *MerkleTreeBuilder) { +func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *MerkleTreeBuilder) error { success := false - // TODO(alcutter): give up after a while: + retries := FETCH_RETRIES + retryWait := FETCH_RETRY_WAIT for !success { s.Log(fmt.Sprintf("Fetching entries %d to %d", r.start, r.end)) logEntries, err := s.logClient.GetEntries(r.start, r.end) if err != nil { - s.Warn(fmt.Sprintf("Problem fetching from log: %s", err.Error())) - continue + if retries == 0 { + s.Warn(fmt.Sprintf("Problem fetching entries %d to %d from log: %s", r.start, r.end, err.Error())) + return err + } else { + s.Log(fmt.Sprintf("Problem fetching entries %d to %d from log (will retry): %s", r.start, r.end, err.Error())) + time.Sleep(time.Duration(retryWait) * time.Second) + retries-- + retryWait *= 2 + continue + } } + retries = FETCH_RETRIES + retryWait = FETCH_RETRY_WAIT for _, logEntry := range logEntries { if treeBuilder != nil { treeBuilder.Add(hashLeaf(logEntry.LeafBytes)) @@ -102,6 +118,7 @@ func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *M success = true } } + return nil } // 
Worker function for fetcher jobs. @@ -110,6 +127,7 @@ func (s *Scanner) fetch(r fetchRange, entries chan<- ct.LogEntry, treeBuilder *M // |entries| channel for the processors to chew on. // Will retry failed attempts to retrieve ranges indefinitely. // Sends true over the |done| channel when the |ranges| channel is closed. +/* disabled because error handling is broken func (s *Scanner) fetcherJob(id int, ranges <-chan fetchRange, entries chan<- ct.LogEntry, wg *sync.WaitGroup) { for r := range ranges { s.fetch(r, entries, nil) @@ -117,6 +135,7 @@ func (s *Scanner) fetcherJob(id int, ranges <-chan fetchRange, entries chan<- ct s.Log(fmt.Sprintf("Fetcher %d finished", id)) wg.Done() } +*/ // Returns the smaller of |a| and |b| func min(a int64, b int64) int64 { @@ -256,7 +275,9 @@ func (s *Scanner) Scan(startIndex int64, endIndex int64, processCert ProcessCall */ for start := startIndex; start < int64(endIndex); { end := min(start+int64(s.opts.BatchSize), int64(endIndex)) - 1 - s.fetch(fetchRange{start, end}, jobs, treeBuilder) + if err := s.fetch(fetchRange{start, end}, jobs, treeBuilder); err != nil { + return err + } start = end + 1 } close(jobs)