This commit is contained in:
priv-r8s 2026-03-21 22:16:48 +00:00 committed by GitHub
commit 98b2fbc51e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 358 additions and 36 deletions

View file

@ -146,6 +146,18 @@ func (e ScrapeType) MarshalGQL(w io.Writer) {
fmt.Fprint(w, strconv.Quote(e.String()))
}
// HTTPError is the error type returned for non-2xx HTTP responses.
// Callers can inspect StatusCode to decide whether a single item in a
// batch should simply be skipped (e.g. 404, 500) or whether the error
// was a rate limit (429) that exhausted all retries.
type HTTPError struct {
StatusCode int
}

// Error implements the error interface, rendering the numeric status
// code together with its standard status text.
func (e *HTTPError) Error() string {
text := http.StatusText(e.StatusCode)
return fmt.Sprintf("http error %d: %s", e.StatusCode, text)
}
var (
// ErrMaxRedirects is returned if the max number of HTTP redirects are reached.
ErrMaxRedirects = errors.New("maximum number of HTTP redirects reached")

View file

@ -10,6 +10,7 @@ import (
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
@ -25,18 +26,27 @@ import (
const scrapeDefaultSleep = time.Second * 2
const (
// maxRateLimitRetries is the maximum number of retries when receiving HTTP 429 responses.
maxRateLimitRetries = 5
// rateLimitBaseDelay is the initial backoff delay for 429 retries.
rateLimitBaseDelay = time.Second * 2
// rateLimitMaxDelay caps the exponential backoff to prevent excessively long waits.
rateLimitMaxDelay = time.Minute
// rateLimitTotalTimeout bounds the total wall-clock time for a single loadURL call
// including all retry delays, so that rate-limit retries don't run indefinitely.
rateLimitTotalTimeout = 5 * time.Minute
)
// loadURL fetches loadURL over plain HTTP, or via Chrome CDP when the
// definition's driver options request it. HTTP 429 responses are retried
// with a backoff computed by rateLimitBackoff, bounded both by
// maxRateLimitRetries and by an overall rateLimitTotalTimeout deadline.
// Non-429 responses with status >= 400 are returned as *HTTPError.
func loadURL(ctx context.Context, loadURL string, client *http.Client, def Definition, globalConfig GlobalConfig) (io.Reader, error) {
driverOptions := def.DriverOptions
if driverOptions != nil && driverOptions.UseCDP {
// get the page using chrome dp
return urlFromCDP(ctx, loadURL, *driverOptions, globalConfig)
}
// NOTE(review): this diff view appears to interleave lines from the
// pre-change version; the request built here is rebuilt inside the retry
// loop below — confirm against the repository.
req, err := http.NewRequestWithContext(ctx, http.MethodGet, loadURL, nil)
if err != nil {
return nil, err
}
jar, err := def.jar()
if err != nil {
return nil, fmt.Errorf("error creating cookie jar: %w", err)
@ -47,44 +57,136 @@ func loadURL(ctx context.Context, loadURL string, client *http.Client, def Defin
return nil, fmt.Errorf("error parsing url %s: %w", loadURL, err)
}
// NOTE(review): the cookie/User-Agent handling below also appears in the
// retry loop; these pre-loop copies look like pre-change remnants of the
// diff rendering — verify against the repository.
// Fetch relevant cookies from the jar for url u and add them to the request
cookies := jar.Cookies(u)
for _, cookie := range cookies {
req.AddCookie(cookie)
}
userAgent := globalConfig.GetScraperUserAgent()
if userAgent != "" {
req.Header.Set("User-Agent", userAgent)
// Apply an overall deadline so retry delays don't run indefinitely.
ctx, cancel := context.WithTimeout(ctx, rateLimitTotalTimeout)
defer cancel()
// Each iteration issues one request; attempt 0 is the initial request
// and later iterations are rate-limit retries.
for attempt := 0; ; attempt++ {
// Requests cannot be reused after a failed attempt, so build a fresh
// one (with cookies and headers) on every iteration.
req, err := http.NewRequestWithContext(ctx, http.MethodGet, loadURL, nil)
if err != nil {
return nil, err
}
// Fetch relevant cookies from the jar for url u and add them to the request
cookies := jar.Cookies(u)
for _, cookie := range cookies {
req.AddCookie(cookie)
}
if userAgent != "" {
req.Header.Set("User-Agent", userAgent)
}
if driverOptions != nil { // setting the Headers after the UA allows us to override it from inside the scraper
for _, h := range driverOptions.Headers {
if h.Key != "" {
req.Header.Set(h.Key, h.Value)
logger.Debugf("[scraper] adding header <%s:%s>", h.Key, h.Value)
}
}
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusTooManyRequests {
resp.Body.Close()
// attempt counts from 0: attempt 0 is the initial request,
// attempts 1..maxRateLimitRetries are retries.
if attempt >= maxRateLimitRetries {
logger.Warnf("[scraper] rate limited on %s, all %d retries exhausted", loadURL, maxRateLimitRetries)
return nil, &HTTPError{StatusCode: resp.StatusCode}
}
delay := rateLimitBackoff(resp, attempt)
// A negative delay signals the server asked for a wait beyond our cap.
if delay < 0 {
logger.Warnf("[scraper] rate limited on %s, server requested wait exceeds maximum", loadURL)
return nil, &HTTPError{StatusCode: resp.StatusCode}
}
logger.Infof("[scraper] rate limited on %s (retry %d/%d), waiting %v", loadURL, attempt+1, maxRateLimitRetries, delay)
// Sleep for the backoff, but abort promptly if the overall deadline
// or the caller's context fires first.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(delay):
continue
}
}
if resp.StatusCode >= 400 {
resp.Body.Close()
return nil, &HTTPError{StatusCode: resp.StatusCode}
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
bodyReader := bytes.NewReader(body)
printCookies(jar, def, "Jar cookies found for scraper urls")
// Decode the body according to the response's declared charset.
return charset.NewReader(bodyReader, resp.Header.Get("Content-Type"))
}
}
// rateLimitBackoff calculates the delay before retrying a rate-limited request.
// The delay is the sum of the parsed Retry-After value (defaulting to
// rateLimitBaseDelay when absent) and an exponential backoff (2s, 4s, 8s, ...,
// capped at rateLimitMaxDelay). Returns -1 if the server's Retry-After exceeds
// rateLimitMaxDelay, signalling that the caller should stop retrying.
func rateLimitBackoff(resp *http.Response, attempt int) time.Duration {
retryAfter := parseRetryAfter(resp)
// If the server asks us to wait longer than our max, give up immediately.
if retryAfter > rateLimitMaxDelay {
return -1
}
if driverOptions != nil { // setting the Headers after the UA allows us to override it from inside the scraper
for _, h := range driverOptions.Headers {
if h.Key != "" {
req.Header.Set(h.Key, h.Value)
logger.Debugf("[scraper] adding header <%s:%s>", h.Key, h.Value)
}
// Exponential backoff: 2s, 4s, 8s, 16s, 32s, ...
// Guard against int64 overflow for large attempt values.
if attempt >= 30 {
return rateLimitMaxDelay
}
backoff := rateLimitBaseDelay << uint(attempt)
return clampDelay(retryAfter + backoff)
}
// parseRetryAfter extracts a duration from the Retry-After header.
// The header may carry either a delay in seconds or an HTTP-date; dates
// already in the past are ignored. Returns rateLimitBaseDelay if the
// header is absent or unparseable.
//
// NOTE(review): the diff rendering had spliced stray lines from the old
// loadURL body into this function; they are removed here.
func parseRetryAfter(resp *http.Response) time.Duration {
	retryAfter := resp.Header.Get("Retry-After")
	if retryAfter == "" {
		return rateLimitBaseDelay
	}
	// Try parsing as seconds.
	if seconds, err := strconv.Atoi(retryAfter); err == nil && seconds >= 0 {
		return time.Duration(seconds) * time.Second
	}
	// Try parsing as HTTP-date; only honour dates still in the future.
	if t, err := http.ParseTime(retryAfter); err == nil {
		if d := time.Until(t); d > 0 {
			return d
		}
	}
	return rateLimitBaseDelay
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
// clampDelay caps a duration to rateLimitMaxDelay.
func clampDelay(d time.Duration) time.Duration {
if d > rateLimitMaxDelay {
return rateLimitMaxDelay
}
bodyReader := bytes.NewReader(body)
printCookies(jar, def, "Jar cookies found for scraper urls")
return charset.NewReader(bodyReader, resp.Header.Get("Content-Type"))
return d
}
// func urlFromCDP uses chrome cdp and DOM to load and process the url

208
pkg/scraper/url_test.go Normal file
View file

@ -0,0 +1,208 @@
package scraper
import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"testing"
	"time"
)
func TestRateLimitBackoff_ExponentialWithoutHeader(t *testing.T) {
	// Without Retry-After, parseRetryAfter falls back to rateLimitBaseDelay
	// (2s), so the expected delay is retryAfter(2s) + (2s << attempt).
	cases := []struct {
		attempt int
		want    time.Duration
	}{
		{attempt: 0, want: 4 * time.Second},  // 2s + (2s << 0) = 4s
		{attempt: 1, want: 6 * time.Second},  // 2s + (2s << 1) = 6s
		{attempt: 2, want: 10 * time.Second}, // 2s + (2s << 2) = 10s
		{attempt: 3, want: 18 * time.Second}, // 2s + (2s << 3) = 18s
		{attempt: 4, want: 34 * time.Second}, // 2s + (2s << 4) = 34s
		{attempt: 5, want: time.Minute},      // 2s + (2s << 5) = 66s, clamped to 60s
		{attempt: 30, want: time.Minute},     // overflow guard
	}
	for _, c := range cases {
		t.Run(fmt.Sprintf("attempt_%d", c.attempt), func(t *testing.T) {
			r := &http.Response{Header: http.Header{}}
			if got := rateLimitBackoff(r, c.attempt); got != c.want {
				t.Errorf("attempt %d: got %v, want %v", c.attempt, got, c.want)
			}
		})
	}
}
func TestRateLimitBackoff_RetryAfterPlusBackoff(t *testing.T) {
	// With Retry-After: 10 the delay is 10s + (2s << attempt).
	cases := []struct {
		attempt int
		want    time.Duration
	}{
		{attempt: 0, want: 12 * time.Second}, // 10s + (2s << 0) = 12s
		{attempt: 1, want: 14 * time.Second}, // 10s + (2s << 1) = 14s
		{attempt: 2, want: 18 * time.Second}, // 10s + (2s << 2) = 18s
		{attempt: 3, want: 26 * time.Second}, // 10s + (2s << 3) = 26s
		{attempt: 4, want: 42 * time.Second}, // 10s + (2s << 4) = 42s
		{attempt: 5, want: time.Minute},      // 10s + (2s << 5) = 74s, clamped to 60s
	}
	for _, c := range cases {
		t.Run(fmt.Sprintf("attempt_%d", c.attempt), func(t *testing.T) {
			r := &http.Response{Header: http.Header{}}
			r.Header.Set("Retry-After", "10")
			if got := rateLimitBackoff(r, c.attempt); got != c.want {
				t.Errorf("attempt %d: got %v, want %v", c.attempt, got, c.want)
			}
		})
	}
}
func TestRateLimitBackoff_RetryAfterDate(t *testing.T) {
	// A Retry-After HTTP-date ~5s out plus the attempt-0 backoff (2s) should
	// land near 7s; allow a little slack for clock movement during the test.
	resp := &http.Response{Header: http.Header{}}
	deadline := time.Now().Add(5 * time.Second)
	resp.Header.Set("Retry-After", deadline.UTC().Format(http.TimeFormat))
	delay := rateLimitBackoff(resp, 0)
	if delay < 6*time.Second || delay > 8*time.Second {
		t.Errorf("got %v, want ~7s", delay)
	}
}
func TestRateLimitBackoff_RetryAfterTooLong(t *testing.T) {
	// A server-requested wait beyond rateLimitMaxDelay must yield the
	// give-up sentinel (-1) rather than a usable delay.
	resp := &http.Response{Header: http.Header{}}
	resp.Header.Set("Retry-After", "300") // 5 minutes, exceeds rateLimitMaxDelay
	if got := rateLimitBackoff(resp, 0); got != -1 {
		t.Errorf("got %v, want -1 (give up)", got)
	}
}
func TestClampDelay(t *testing.T) {
	// Below the cap the input passes through unchanged.
	got := clampDelay(30 * time.Second)
	if got != 30*time.Second {
		t.Errorf("got %v, want 30s", got)
	}
	// Above the cap the result is pinned at rateLimitMaxDelay.
	got = clampDelay(2 * time.Minute)
	if got != rateLimitMaxDelay {
		t.Errorf("got %v, want %v", got, rateLimitMaxDelay)
	}
}
func TestLoadURL_429RetriesAndSucceeds(t *testing.T) {
var attempts atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
n := attempts.Add(1)
if n <= 2 {
w.Header().Set("Retry-After", "0")
w.WriteHeader(http.StatusTooManyRequests)
return
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
fmt.Fprint(w, "<html><body>OK</body></html>")
}))
defer srv.Close()
ctx := context.Background()
def := Definition{}
gc := &testGlobalConfig{}
reader, err := loadURL(ctx, srv.URL, srv.Client(), def, gc)
if err != nil {
t.Fatalf("expected success after retries, got: %v", err)
}
if reader == nil {
t.Fatal("expected non-nil reader")
}
if got := attempts.Load(); got != 3 {
t.Errorf("expected 3 attempts, got %d", got)
}
}
func TestLoadURL_429ExhaustsRetries(t *testing.T) {
var attempts atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts.Add(1)
w.Header().Set("Retry-After", "0")
w.WriteHeader(http.StatusTooManyRequests)
}))
defer srv.Close()
// Use a tight context so exponential backoff doesn't make the test slow.
// With Retry-After: 0, delays are 2s, 4s, 8s, 16s (cumulative 30s),
// so 15s allows about 3 retries before the context deadline fires.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
def := Definition{}
gc := &testGlobalConfig{}
_, err := loadURL(ctx, srv.URL, srv.Client(), def, gc)
if err == nil {
t.Fatal("expected error after exhausting retries")
}
}
func TestLoadURL_429RetryAfterTooLong(t *testing.T) {
var attempts atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts.Add(1)
w.Header().Set("Retry-After", "300") // 5 minutes, exceeds max
w.WriteHeader(http.StatusTooManyRequests)
}))
defer srv.Close()
ctx := context.Background()
def := Definition{}
gc := &testGlobalConfig{}
_, err := loadURL(ctx, srv.URL, srv.Client(), def, gc)
if err == nil {
t.Fatal("expected error when Retry-After exceeds max")
}
httpErr, ok := err.(*HTTPError)
if !ok {
t.Fatalf("expected *HTTPError, got %T: %v", err, err)
}
if httpErr.StatusCode != 429 {
t.Errorf("expected status 429, got %d", httpErr.StatusCode)
}
// Should give up immediately without retrying
if got := attempts.Load(); got != 1 {
t.Errorf("expected 1 attempt (no retries), got %d", got)
}
}
func TestLoadURL_ContextCancellation(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusTooManyRequests)
}))
defer srv.Close()
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
def := Definition{}
gc := &testGlobalConfig{}
_, err := loadURL(ctx, srv.URL, srv.Client(), def, gc)
if err == nil {
t.Fatal("expected error from context cancellation")
}
}
// testGlobalConfig implements GlobalConfig for testing.
// Every getter returns its zero value so loadURL exercises its default
// code paths (no custom user agent, proxy, or exclude patterns).
type testGlobalConfig struct{}
func (c *testGlobalConfig) GetScraperUserAgent() string { return "" }
func (c *testGlobalConfig) GetScrapersPath() string { return "" }
func (c *testGlobalConfig) GetScraperCDPPath() string { return "" }
func (c *testGlobalConfig) GetScraperCertCheck() bool { return true }
func (c *testGlobalConfig) GetProxy() string { return "" }
func (c *testGlobalConfig) GetPythonPath() string { return "" }
func (c *testGlobalConfig) GetScraperExcludeTagPatterns() []string { return nil }