From a418a3686d527be1c575ebc30626cf78f46233d8 Mon Sep 17 00:00:00 2001
From: Andrew Ayer
Date: Thu, 4 Feb 2016 18:45:37 -0800
Subject: [PATCH] Initial commit

---
 cmd/.gitignore        |   3 +
 cmd/common.go         |  83 ++++++++++
 cmd/ctwatch/main.go   |  45 +++++
 cmd/sha1watch/main.go |  41 +++++
 helpers.go            | 267 ++++++++++++++++++++++++++++++
 scanner.go            | 370 ++++++++++++++++++++++++++++++
 6 files changed, 809 insertions(+)
 create mode 100644 cmd/.gitignore
 create mode 100644 cmd/common.go
 create mode 100644 cmd/ctwatch/main.go
 create mode 100644 cmd/sha1watch/main.go
 create mode 100644 helpers.go
 create mode 100644 scanner.go

diff --git a/cmd/.gitignore b/cmd/.gitignore
new file mode 100644
index 0000000..110a9fc
--- /dev/null
+++ b/cmd/.gitignore
@@ -0,0 +1,3 @@
+*/*
+!.gitignore
+!*.go
diff --git a/cmd/common.go b/cmd/common.go
new file mode 100644
index 0000000..0179221
--- /dev/null
+++ b/cmd/common.go
@@ -0,0 +1,83 @@
+package cmd
+
+import (
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"sync"
+
+	"src.agwa.name/ctwatch"
+	"github.com/google/certificate-transparency/go"
+	"github.com/google/certificate-transparency/go/client"
+)
+
+var batchSize = flag.Int("batch_size", 1000, "Max number of entries to request per call to get-entries")
+var numWorkers = flag.Int("num_workers", 2, "Number of concurrent matchers")
+var parallelFetch = flag.Int("parallel_fetch", 2, "Number of concurrent GetEntries fetches")
+var script = flag.String("script", "", "Script to execute when a matching certificate is found")
+var repo = flag.String("repo", "", "Directory of scanned certificates")
+var verbose = flag.Bool("verbose", false, "Be verbose")
+
+var printMutex sync.Mutex
+
+func logCallback (entry *ct.LogEntry) {
+	if *repo != "" {
+		alreadyPresent, err := ctwatch.WriteCertRepository(*repo, entry)
+		if err != nil {
+			log.Print(err)
+		}
+		if alreadyPresent {
+			return
+		}
+	}
+
+	if *script != "" {
+		if err := ctwatch.InvokeHookScript(*script, entry); err != nil {
+			log.Print(err)
+		}
+	} else {
+		printMutex.Lock()
+		ctwatch.DumpLogEntry(os.Stdout, entry)
+		fmt.Fprintf(os.Stdout, "\n")
+		printMutex.Unlock()
+	}
+}
+
+func Main(logUri string, stateFile string, matcher ctwatch.Matcher) {
+	startIndex, err := ctwatch.ReadStateFile(stateFile)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "%s: Error reading state file: %s: %s\n", os.Args[0], stateFile, err)
+		os.Exit(3)
+	}
+
+	os.Setenv("LOG_URI", logUri)
+
+	logClient := client.New(logUri)
+	opts := ctwatch.ScannerOptions{
+		Matcher:       matcher,
+		BatchSize:     *batchSize,
+		NumWorkers:    *numWorkers,
+		ParallelFetch: *parallelFetch,
+		Quiet:         !*verbose,
+	}
+	scanner := ctwatch.NewScanner(logClient, opts)
+
+	endIndex, err := scanner.TreeSize()
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "%s: Error contacting log: %s: %s\n", os.Args[0], logUri, err)
+		os.Exit(1)
+	}
+
+	if startIndex != -1 { // -1 means no state file yet: skip scanning and just record the current tree size below
+		if err := scanner.Scan(startIndex, endIndex, logCallback); err != nil {
+			fmt.Fprintf(os.Stderr, "%s: Error scanning log: %s: %s\n", os.Args[0], logUri, err)
+			os.Exit(1)
+		}
+	}
+
+	if err := ctwatch.WriteStateFile(stateFile, endIndex); err != nil {
+		fmt.Fprintf(os.Stderr, "%s: Error writing state file: %s: %s\n", os.Args[0], stateFile, err)
+		os.Exit(3)
+	}
+}
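
Note: the -script hook receives everything about a match through its environment. InvokeHookScript (helpers.go, below) exports LOG_INDEX, CERT_TYPE, SUBJECT_DN, ISSUER_DN, DNS_NAMES, SERIAL, PUBKEY_HASH, FINGERPRINT, NOT_BEFORE and NOT_AFTER, and Main above sets LOG_URI. Purely as an illustration (no such program ships with this patch), a hook written in Go might consume those variables like this:

package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"
)

func main() {
	// Variables exported by InvokeHookScript, plus LOG_URI from cmd/common.go.
	dnsNames := strings.Split(os.Getenv("DNS_NAMES"), ",") // comma-joined by InvokeHookScript
	notAfterUnix, err := strconv.ParseInt(os.Getenv("NOT_AFTER"), 10, 64)
	if err != nil {
		fmt.Fprintln(os.Stderr, "bad NOT_AFTER:", err)
		os.Exit(1) // a non-zero exit is reported by InvokeHookScript along with stderr
	}
	fmt.Printf("%s %s found in %s for %v (expires %s)\n",
		os.Getenv("CERT_TYPE"),   // "cert" or "precert"
		os.Getenv("FINGERPRINT"), // SHA-256 of the (pre)certificate
		os.Getenv("LOG_URI"),
		dnsNames,
		time.Unix(notAfterUnix, 0).UTC().Format(time.RFC3339))
}
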
fmt.Fprintf(os.Stderr, "Usage: %s [flags] log_uri state_file [domain ...]\n", os.Args[0]) + os.Exit(2) + } + + logUri := flag.Arg(0) + stateFile := flag.Arg(1) + + var domains []string + if flag.NArg() == 3 && flag.Arg(2) == "-" { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + domains = append(domains, scanner.Text()) + } + if err := scanner.Err(); err != nil { + fmt.Fprintf(os.Stderr, "Error reading standard input: %s\n", err) + os.Exit(1) + } + } else { + domains = flag.Args()[2:] + } + + var matcher ctwatch.Matcher + if len(domains) == 0 { + matcher = ctwatch.MatchAll{} + } else { + matcher = ctwatch.NewDomainMatcher(domains) + } + + cmd.Main(logUri, stateFile, matcher) +} diff --git a/cmd/sha1watch/main.go b/cmd/sha1watch/main.go new file mode 100644 index 0000000..d937b13 --- /dev/null +++ b/cmd/sha1watch/main.go @@ -0,0 +1,41 @@ +package main + +import ( + "flag" + "fmt" + "os" + "time" + + "github.com/google/certificate-transparency/go" + "github.com/google/certificate-transparency/go/x509" + + "src.agwa.name/ctwatch/cmd" +) + +type sha1Matcher struct { } + +func (m sha1Matcher) CertificateMatches(c *x509.Certificate) bool { + return c.NotBefore.After(time.Date(2016, time.January, 1, 0, 0, 0, 0, time.UTC)) && + (c.SignatureAlgorithm == x509.SHA1WithRSA || + c.SignatureAlgorithm == x509.MD5WithRSA || + c.SignatureAlgorithm == x509.MD2WithRSA || + c.SignatureAlgorithm == x509.DSAWithSHA1 || + c.SignatureAlgorithm == x509.ECDSAWithSHA1) +} + +func (m sha1Matcher) PrecertificateMatches(pc *ct.Precertificate) bool { + return m.CertificateMatches(&pc.TBSCertificate) +} + +func main() { + flag.Parse() + if flag.NArg() != 2 { + fmt.Fprintf(os.Stderr, "Usage: %s [flags] log_uri state_file\n", os.Args[0]) + os.Exit(2) + } + + logUri := flag.Arg(0) + stateFile := flag.Arg(1) + + cmd.Main(logUri, stateFile, &sha1Matcher{}) +} diff --git a/helpers.go b/helpers.go new file mode 100644 index 0000000..a528841 --- /dev/null +++ b/helpers.go @@ -0,0 +1,267 @@ +package ctwatch + +import ( + "fmt" + "log" + "time" + "os" + "os/exec" + "bytes" + "io" + "io/ioutil" + "math/big" + "path/filepath" + "strconv" + "strings" + "crypto/sha256" + "encoding/hex" + "encoding/pem" + + "github.com/google/certificate-transparency/go" + "github.com/google/certificate-transparency/go/x509" + "github.com/google/certificate-transparency/go/x509/pkix" +) + +func ReadStateFile (path string) (int64, error) { + content, err := ioutil.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return -1, nil + } + return -1, err + } + + startIndex, err := strconv.ParseInt(strings.TrimSpace(string(content)), 10, 64) + if err != nil { + return -1, err + } + + return startIndex, nil +} + +func WriteStateFile (path string, endIndex int64) error { + return ioutil.WriteFile(path, []byte(strconv.FormatInt(endIndex, 10) + "\n"), 0666) +} + +func appendDnArray (buf *bytes.Buffer, code string, values []string) { + for _, value := range values { + if buf.Len() != 0 { + buf.WriteString(", ") + } + buf.WriteString(code) + buf.WriteString("=") + buf.WriteString(value) + } +} + +func appendDnValue (buf *bytes.Buffer, code string, value string) { + if value != "" { + appendDnArray(buf, code, []string{value}) + } +} + +func formatDN (name pkix.Name) (string) { + // C=US, ST=UT, L=Salt Lake City, O=The USERTRUST Network, OU=http://www.usertrust.com, CN=UTN-USERFirst-Hardware + var buf bytes.Buffer + appendDnArray(&buf, "C", name.Country) + appendDnArray(&buf, "ST", name.Province) + appendDnArray(&buf, "L", name.Locality) + 
appendDnArray(&buf, "O", name.Organization) + appendDnArray(&buf, "OU", name.OrganizationalUnit) + appendDnValue(&buf, "CN", name.CommonName) + return buf.String() +} + +func allDNSNames (cert *x509.Certificate) []string { + dnsNames := []string{} + + if cert.Subject.CommonName != "" { + dnsNames = append(dnsNames, cert.Subject.CommonName) + } + + for _, dnsName := range cert.DNSNames { + if dnsName != cert.Subject.CommonName { + dnsNames = append(dnsNames, dnsName) + } + } + + return dnsNames +} + +func getRoot (chain []ct.ASN1Cert) *x509.Certificate { + if len(chain) > 0 { + root, err := x509.ParseCertificate(chain[len(chain)-1]) + if err == nil { + return root + } + log.Printf("Failed to parse root certificate: %s", err) + } + return nil +} + +func getSubjectOrganization (cert *x509.Certificate) string { + if cert != nil && len(cert.Subject.Organization) > 0 { + return cert.Subject.Organization[0] + } + return "" +} + +func formatSerial (serial *big.Int) string { + if serial != nil { + return fmt.Sprintf("%x", serial) + } else { + return "" + } +} + +func sha256hex (data []byte) string { + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]) +} + +func getRaw (entry *ct.LogEntry) []byte { + if entry.Precert != nil { + return entry.Precert.Raw + } else if entry.X509Cert != nil { + return entry.X509Cert.Raw + } else { + panic("getRaw: entry is neither precert nor x509") + } +} + +type certInfo struct { + IsPrecert bool + RootOrg string + SubjectDn string + IssuerDn string + DnsNames []string + Serial string + PubkeyHash string + Fingerprint string + NotBefore time.Time + NotAfter time.Time +} + +func makeCertInfo (entry *ct.LogEntry) certInfo { + var isPrecert bool + var cert *x509.Certificate + + if entry.Precert != nil { + isPrecert = true + cert = &entry.Precert.TBSCertificate + } else if entry.X509Cert != nil { + isPrecert = false + cert = entry.X509Cert + } else { + panic("makeCertInfo: entry is neither precert nor x509") + } + return certInfo { + IsPrecert: isPrecert, + RootOrg: getSubjectOrganization(getRoot(entry.Chain)), + SubjectDn: formatDN(cert.Subject), + IssuerDn: formatDN(cert.Issuer), + DnsNames: allDNSNames(cert), + Serial: formatSerial(cert.SerialNumber), + PubkeyHash: sha256hex(cert.RawSubjectPublicKeyInfo), + Fingerprint: sha256hex(getRaw(entry)), + NotBefore: cert.NotBefore, + NotAfter: cert.NotAfter, + } +} + +func (info *certInfo) TypeString () string { + if info.IsPrecert { + return "precert" + } else { + return "cert" + } +} + +func (info *certInfo) TypeFriendlyString () string { + if info.IsPrecert { + return "Pre-certificate" + } else { + return "Certificate" + } +} + +func DumpLogEntry (out io.Writer, entry *ct.LogEntry) { + info := makeCertInfo(entry) + + fmt.Fprintf(out, "%d:\n", entry.Index) + fmt.Fprintf(out, "\t Type = %s\n", info.TypeFriendlyString()) + fmt.Fprintf(out, "\t DNS Names = %v\n", info.DnsNames) + fmt.Fprintf(out, "\t Pubkey = %s\n", info.PubkeyHash) + fmt.Fprintf(out, "\t Fingerprint = %s\n", info.Fingerprint) + fmt.Fprintf(out, "\t Subject = %s\n", info.SubjectDn) + fmt.Fprintf(out, "\t Issuer = %s\n", info.IssuerDn) + fmt.Fprintf(out, "\tRoot Operator = %s\n", info.RootOrg) + fmt.Fprintf(out, "\t Serial = %s\n", info.Serial) + fmt.Fprintf(out, "\t Not Before = %s\n", info.NotBefore) + fmt.Fprintf(out, "\t Not After = %s\n", info.NotAfter) +} + +func InvokeHookScript (command string, entry *ct.LogEntry) error { + info := makeCertInfo(entry) + + cmd := exec.Command(command) + cmd.Env = append(os.Environ(), + "LOG_INDEX=" + 
+		"LOG_INDEX=" + strconv.FormatInt(entry.Index, 10),
+		"CERT_TYPE=" + info.TypeString(),
+		"SUBJECT_DN=" + info.SubjectDn,
+		"ISSUER_DN=" + info.IssuerDn,
+		"DNS_NAMES=" + strings.Join(info.DnsNames, ","),
+		"SERIAL=" + info.Serial,
+		"PUBKEY_HASH=" + info.PubkeyHash,
+		"FINGERPRINT=" + info.Fingerprint,
+		"NOT_BEFORE=" + strconv.FormatInt(info.NotBefore.Unix(), 10),
+		"NOT_AFTER=" + strconv.FormatInt(info.NotAfter.Unix(), 10))
+	stderrBuffer := bytes.Buffer{}
+	cmd.Stderr = &stderrBuffer
+	if err := cmd.Run(); err != nil {
+		if _, isExitError := err.(*exec.ExitError); isExitError {
+			return fmt.Errorf("Script failed: %s: %s", command, strings.TrimSpace(stderrBuffer.String()))
+		} else {
+			return fmt.Errorf("Failed to execute script: %s: %s", command, err)
+		}
+	}
+	return nil
+}
+
+func WriteCertRepository (repoPath string, entry *ct.LogEntry) (bool, error) {
+	fingerprint := sha256hex(getRaw(entry))
+	prefixPath := filepath.Join(repoPath, fingerprint[0:2])
+	var filenameSuffix string
+	if entry.Precert != nil {
+		filenameSuffix = ".precert.pem"
+	} else if entry.X509Cert != nil {
+		filenameSuffix = ".cert.pem"
+	}
+	if err := os.Mkdir(prefixPath, 0777); err != nil && !os.IsExist(err) {
+		return false, fmt.Errorf("Failed to create prefix directory %s: %s", prefixPath, err)
+	}
+	path := filepath.Join(prefixPath, fingerprint + filenameSuffix)
+	file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
+	if err != nil {
+		if os.IsExist(err) {
+			return true, nil
+		} else {
+			return false, fmt.Errorf("Failed to open %s for writing: %s", path, err)
+		}
+	}
+	if err := pem.Encode(file, &pem.Block{Type: "CERTIFICATE", Bytes: getRaw(entry)}); err != nil {
+		file.Close()
+		return false, fmt.Errorf("Error writing to %s: %s", path, err)
+	}
+	for _, chainCert := range entry.Chain {
+		if err := pem.Encode(file, &pem.Block{Type: "CERTIFICATE", Bytes: chainCert}); err != nil {
+			file.Close()
+			return false, fmt.Errorf("Error writing to %s: %s", path, err)
+		}
+	}
+	if err := file.Close(); err != nil {
+		return false, fmt.Errorf("Error writing to %s: %s", path, err)
+	}
+
+	return false, nil
+}
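
Note: WriteCertRepository shards files by the first two hex digits of the SHA-256 fingerprint and stores the logged (pre)certificate followed by its chain as concatenated PEM blocks. Purely as an illustrative sketch (not part of this patch, and the "repo" path is a placeholder), a consumer of a -repo directory might read entries back like this:

package main

import (
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"log"
	"path/filepath"

	"github.com/google/certificate-transparency/go/x509"
)

func main() {
	// Matches the layout written by WriteCertRepository:
	// <repo>/<first two hex digits>/<sha256 fingerprint>.cert.pem
	paths, err := filepath.Glob("repo/*/*.cert.pem") // "repo" is a placeholder path
	if err != nil {
		log.Fatal(err)
	}
	for _, path := range paths {
		data, err := ioutil.ReadFile(path)
		if err != nil {
			log.Fatal(err)
		}
		// The first PEM block is the logged certificate; the rest is its chain.
		block, _ := pem.Decode(data)
		if block == nil {
			continue
		}
		cert, err := x509.ParseCertificate(block.Bytes)
		if err != nil {
			log.Printf("%s: %s", path, err)
			continue
		}
		fmt.Printf("%s: %s\n", filepath.Base(path), cert.Subject.CommonName)
	}
}
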
diff --git a/scanner.go b/scanner.go
new file mode 100644
index 0000000..eb50e7a
--- /dev/null
+++ b/scanner.go
@@ -0,0 +1,370 @@
+package ctwatch
+
+import (
+	"container/list"
+	"fmt"
+	"log"
+	"sync"
+	"sync/atomic"
+	"time"
+	"strings"
+
+	"github.com/google/certificate-transparency/go"
+	"github.com/google/certificate-transparency/go/client"
+	"github.com/google/certificate-transparency/go/x509"
+)
+
+// Clients wishing to implement their own Matchers should implement this interface:
+type Matcher interface {
+	// CertificateMatches is called by the scanner for each X509 Certificate found in the log.
+	// The implementation should return |true| if the passed Certificate is interesting, and |false| otherwise.
+	CertificateMatches(*x509.Certificate) bool
+
+	// PrecertificateMatches is called by the scanner for each CT Precertificate found in the log.
+	// The implementation should return |true| if the passed Precertificate is interesting, and |false| otherwise.
+	PrecertificateMatches(*ct.Precertificate) bool
+}
+
+// MatchAll is a Matcher which will match every possible Certificate and Precertificate.
+type MatchAll struct{}
+
+func (m MatchAll) CertificateMatches(_ *x509.Certificate) bool {
+	return true
+}
+
+func (m MatchAll) PrecertificateMatches(_ *ct.Precertificate) bool {
+	return true
+}
+
+type DomainMatcher struct {
+	domains        []string
+	domainSuffixes []string
+}
+
+func NewDomainMatcher (domains []string) DomainMatcher {
+	m := DomainMatcher{}
+	for _, domain := range domains {
+		m.domains = append(m.domains, strings.ToLower(domain))
+		m.domainSuffixes = append(m.domainSuffixes, "." + strings.ToLower(domain))
+	}
+	return m
+}
+
+func (m DomainMatcher) dnsNameMatches (dnsName string) bool {
+	dnsNameLower := strings.ToLower(dnsName)
+	for _, domain := range m.domains {
+		if dnsNameLower == domain {
+			return true
+		}
+	}
+	for _, domainSuffix := range m.domainSuffixes {
+		if strings.HasSuffix(dnsNameLower, domainSuffix) {
+			return true
+		}
+	}
+	return false
+}
+
+func (m DomainMatcher) CertificateMatches(c *x509.Certificate) bool {
+	if m.dnsNameMatches(c.Subject.CommonName) {
+		return true
+	}
+	for _, dnsName := range c.DNSNames {
+		if m.dnsNameMatches(dnsName) {
+			return true
+		}
+	}
+	return false
+}
+
+func (m DomainMatcher) PrecertificateMatches(pc *ct.Precertificate) bool {
+	return m.CertificateMatches(&pc.TBSCertificate)
+}
+
+
+// ScannerOptions holds configuration options for the Scanner
+type ScannerOptions struct {
+	// Custom matcher for x509 Certificates; its methods will be called for
+	// each certificate and precertificate found during scanning.
+	Matcher Matcher
+
+	// Number of entries to request in one batch from the Log
+	BatchSize int
+
+	// Number of concurrent matchers to run
+	NumWorkers int
+
+	// Number of concurrent fetchers to run
+	ParallelFetch int
+
+	// Don't print any status messages to stdout
+	Quiet bool
+}
+
+// Creates a new ScannerOptions struct with sensible defaults
+func DefaultScannerOptions() *ScannerOptions {
+	return &ScannerOptions{
+		Matcher:       &MatchAll{},
+		BatchSize:     1000,
+		NumWorkers:    1,
+		ParallelFetch: 1,
+		Quiet:         false,
+	}
+}
+
+// Scanner is a tool to scan all the entries in a CT Log.
+type Scanner struct {
+	// Client used to talk to the CT log instance
+	logClient *client.LogClient
+
+	// Configuration options for this Scanner instance
+	opts ScannerOptions
+
+	// Size of tree at end of scan
+	latestTreeSize int64
+
+	// Stats
+	certsProcessed            int64
+	unparsableEntries         int64
+	entriesWithNonFatalErrors int64
+}
+
+// matcherJob represents the context for an individual matcher job.
+type matcherJob struct {
+	// The log entry returned by the log server
+	entry ct.LogEntry
+	// The index of the entry containing the LeafInput in the log
+	index int64
+}
+
+// fetchRange represents a range of certs to fetch from a CT log
+type fetchRange struct {
+	start int64
+	end   int64
+}
+
+// Takes the error returned by either x509.ParseCertificate() or
+// x509.ParseTBSCertificate() and determines if it's non-fatal or otherwise.
+// In the case of non-fatal errors, the error will be logged,
+// entriesWithNonFatalErrors will be incremented, and the return value will be
+// nil.
+// Fatal errors will be logged, unparsableEntries will be incremented, and the
+// fatal error itself will be returned.
+// When |err| is nil, this method does nothing.
+func (s *Scanner) handleParseEntryError(err error, entryType ct.LogEntryType, index int64) error {
+	if err == nil {
+		// No error to handle
+		return nil
+	}
+	switch err.(type) {
+	case x509.NonFatalErrors:
+		s.entriesWithNonFatalErrors++
+		// We'll make a note, but continue.
+ s.Warn(fmt.Sprintf("Non-fatal error in %+v at index %d: %s", entryType, index, err.Error())) + default: + s.unparsableEntries++ + s.Warn(fmt.Sprintf("Failed to parse in %+v at index %d : %s", entryType, index, err.Error())) + return err + } + return nil +} + +// Processes the given |entry| in the specified log. +func (s *Scanner) processEntry(entry ct.LogEntry, foundCert func(*ct.LogEntry)) { + atomic.AddInt64(&s.certsProcessed, 1) + switch entry.Leaf.TimestampedEntry.EntryType { + case ct.X509LogEntryType: + cert, err := x509.ParseCertificate(entry.Leaf.TimestampedEntry.X509Entry) + if err = s.handleParseEntryError(err, entry.Leaf.TimestampedEntry.EntryType, entry.Index); err != nil { + // We hit an unparseable entry, already logged inside handleParseEntryError() + return + } + if s.opts.Matcher.CertificateMatches(cert) { + entry.X509Cert = cert + foundCert(&entry) + } + case ct.PrecertLogEntryType: + c, err := x509.ParseTBSCertificate(entry.Leaf.TimestampedEntry.PrecertEntry.TBSCertificate) + if err = s.handleParseEntryError(err, entry.Leaf.TimestampedEntry.EntryType, entry.Index); err != nil { + // We hit an unparseable entry, already logged inside handleParseEntryError() + return + } + precert := &ct.Precertificate{ + Raw: entry.Chain[0], + TBSCertificate: *c, + IssuerKeyHash: entry.Leaf.TimestampedEntry.PrecertEntry.IssuerKeyHash, + } + if s.opts.Matcher.PrecertificateMatches(precert) { + entry.Precert = precert + foundCert(&entry) + } + } +} + +// Worker function to match certs. +// Accepts MatcherJobs over the |entries| channel, and processes them. +// Returns true over the |done| channel when the |entries| channel is closed. +func (s *Scanner) matcherJob(id int, entries <-chan matcherJob, foundCert func(*ct.LogEntry), wg *sync.WaitGroup) { + for e := range entries { + s.processEntry(e.entry, foundCert) + } + s.Log(fmt.Sprintf("Matcher %d finished", id)) + wg.Done() +} + +// Worker function for fetcher jobs. +// Accepts cert ranges to fetch over the |ranges| channel, and if the fetch is +// successful sends the individual LeafInputs out (as MatcherJobs) into the +// |entries| channel for the matchers to chew on. +// Will retry failed attempts to retrieve ranges indefinitely. +// Sends true over the |done| channel when the |ranges| channel is closed. +func (s *Scanner) fetcherJob(id int, ranges <-chan fetchRange, entries chan<- matcherJob, wg *sync.WaitGroup) { + for r := range ranges { + success := false + // TODO(alcutter): give up after a while: + for !success { + s.Log(fmt.Sprintf("Fetching entries %d to %d", r.start, r.end)) + logEntries, err := s.logClient.GetEntries(r.start, r.end) + if err != nil { + s.Warn(fmt.Sprintf("Problem fetching from log: %s", err.Error())) + continue + } + for _, logEntry := range logEntries { + logEntry.Index = r.start + entries <- matcherJob{logEntry, r.start} + r.start++ + } + if r.start > r.end { + // Only complete if we actually got all the leaves we were + // expecting -- Logs MAY return fewer than the number of + // leaves requested. + success = true + } + } + } + s.Log(fmt.Sprintf("Fetcher %d finished", id)) + wg.Done() +} + +// Returns the smaller of |a| and |b| +func min(a int64, b int64) int64 { + if a < b { + return a + } else { + return b + } +} + +// Returns the larger of |a| and |b| +func max(a int64, b int64) int64 { + if a > b { + return a + } else { + return b + } +} + +// Pretty prints the passed in number of |seconds| into a more human readable +// string. 
+func humanTime(seconds int) string {
+	nanos := time.Duration(seconds) * time.Second
+	hours := int(nanos / (time.Hour))
+	nanos %= time.Hour
+	minutes := int(nanos / time.Minute)
+	nanos %= time.Minute
+	seconds = int(nanos / time.Second)
+	s := ""
+	if hours > 0 {
+		s += fmt.Sprintf("%d hours ", hours)
+	}
+	if minutes > 0 {
+		s += fmt.Sprintf("%d minutes ", minutes)
+	}
+	if seconds > 0 {
+		s += fmt.Sprintf("%d seconds ", seconds)
+	}
+	return s
+}
+
+func (s Scanner) Log(msg string) {
+	if !s.opts.Quiet {
+		log.Print(msg)
+	}
+}
+
+func (s Scanner) Warn(msg string) {
+	log.Print(msg)
+}
+
+func (s *Scanner) TreeSize() (int64, error) {
+	latestSth, err := s.logClient.GetSTH()
+	if err != nil {
+		return 0, err
+	}
+	return int64(latestSth.TreeSize), nil
+}
+
+func (s *Scanner) Scan(startIndex int64, endIndex int64, foundCert func(*ct.LogEntry)) error {
+	s.Log("Starting up...\n")
+
+	s.certsProcessed = 0
+	s.unparsableEntries = 0
+	s.entriesWithNonFatalErrors = 0
+	ticker := time.NewTicker(time.Second)
+	startTime := time.Now()
+	fetches := make(chan fetchRange, 1000)
+	jobs := make(chan matcherJob, 100000)
+	go func() {
+		for range ticker.C {
+			throughput := float64(s.certsProcessed) / time.Since(startTime).Seconds()
+			remainingCerts := int64(endIndex) - int64(startIndex) - s.certsProcessed
+			remainingSeconds := int(float64(remainingCerts) / throughput)
+			remainingString := humanTime(remainingSeconds)
+			s.Log(fmt.Sprintf("Processed: %d certs (to index %d). Throughput: %3.2f ETA: %s\n", s.certsProcessed,
+				startIndex+int64(s.certsProcessed), throughput, remainingString))
+		}
+	}()
+
+	var ranges list.List
+	for start := startIndex; start < int64(endIndex); {
+		end := min(start+int64(s.opts.BatchSize), int64(endIndex)) - 1
+		ranges.PushBack(fetchRange{start, end})
+		start = end + 1
+	}
+	var fetcherWG sync.WaitGroup
+	var matcherWG sync.WaitGroup
+	// Start matcher workers
+	for w := 0; w < s.opts.NumWorkers; w++ {
+		matcherWG.Add(1)
+		go s.matcherJob(w, jobs, foundCert, &matcherWG)
+	}
+	// Start fetcher workers
+	for w := 0; w < s.opts.ParallelFetch; w++ {
+		fetcherWG.Add(1)
+		go s.fetcherJob(w, fetches, jobs, &fetcherWG)
+	}
+	for r := ranges.Front(); r != nil; r = r.Next() {
+		fetches <- r.Value.(fetchRange)
+	}
+	close(fetches)
+	fetcherWG.Wait()
+	close(jobs)
+	matcherWG.Wait()
+	s.Log(fmt.Sprintf("Completed %d certs in %s", s.certsProcessed, humanTime(int(time.Since(startTime).Seconds()))))
+	s.Log(fmt.Sprintf("%d unparsable entries, %d non-fatal errors", s.unparsableEntries, s.entriesWithNonFatalErrors))
+
+	return nil
+}
+
+// Creates a new Scanner instance using |client| to talk to the log, and taking
+// configuration options from |opts|.
+func NewScanner(client *client.LogClient, opts ScannerOptions) *Scanner {
+	var scanner Scanner
+	scanner.logClient = client
+	// Set a default match-everything matcher if none was provided:
+	if opts.Matcher == nil {
+		opts.Matcher = &MatchAll{}
+	}
+	scanner.opts = opts
+	return &scanner
+}
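
Note: the scanner can also be driven directly, without the cmd wrappers. The following is a minimal sketch (not part of this patch; the log URL is a placeholder) of using Scanner as a library with the default match-everything options:

package main

import (
	"log"
	"os"

	"github.com/google/certificate-transparency/go"
	"github.com/google/certificate-transparency/go/client"

	"src.agwa.name/ctwatch"
)

func main() {
	logClient := client.New("https://ct.example.com") // placeholder log URL
	opts := ctwatch.DefaultScannerOptions()           // MatchAll, BatchSize 1000, one worker, one fetcher
	scanner := ctwatch.NewScanner(logClient, *opts)

	treeSize, err := scanner.TreeSize()
	if err != nil {
		log.Fatalf("failed to fetch STH: %s", err)
	}

	// Scan the whole log, dumping each matching entry to stdout.
	err = scanner.Scan(0, treeSize, func(entry *ct.LogEntry) {
		ctwatch.DumpLogEntry(os.Stdout, entry)
	})
	if err != nil {
		log.Fatalf("scan failed: %s", err)
	}
}
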