This repository has been archived on 2022-05-25. You can view files and clone it, but cannot push or open issues or pull requests.
elon-eats-my-tweets/daemon/tweet_persister.go

106 lines
3.2 KiB
Go

package daemon
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strconv"
	"time"

	"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
	"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
	"github.com/jackc/pgx/v4"
	"go.uber.org/zap"
)
const (
	// formatDecimal is the numeric base handed to strconv.FormatInt when a
	// stored tweet ID is rendered as a string.
	formatDecimal = 10

	// elonID is the account ID passed to the API client whenever tweets are
	// requested.
	elonID = "44196397"
)
// TweetPersister fetches tweets from twitter and persists them in the database.
// Build one with NewTweetPersister.
type TweetPersister struct {
// store is queried for the last known tweet and receives upserted tweets.
store twitterapi.Store
// apiClient fetches either the latest tweet or the tweets after a given ID.
apiClient twitterapi.Client
// logger receives a warning when no tweets are available and an info entry
// for every inserted tweet.
logger *zap.SugaredLogger
}
// NewTweetPersister builds a TweetPersister that persists through store,
// fetches through apiClient and logs through logger. The arguments are kept
// as given; nothing is validated here.
func NewTweetPersister(store twitterapi.Store, apiClient twitterapi.Client, logger *zap.SugaredLogger) *TweetPersister {
	persister := new(TweetPersister)
	persister.store = store
	persister.apiClient = apiClient
	persister.logger = logger

	return persister
}
// PersistTweets fetches tweets from Twitter and persists them in the database.
//
// In the case there are no tweets in the database, the single most recent
// tweet will be fetched. Otherwise, the maximum tweets returnable by the
// Twitter API will be fetched starting from the most recent known tweet. If
// there are more tweets available, multiple calls to this method will be
// required to fetch them all.
//
// It returns the number of tweets inserted by this call. When an insert fails
// part-way through a batch, the count of tweets inserted before the failure is
// returned together with the error. Returned errors wrap their cause, so
// callers can inspect them with errors.Is / errors.As.
func (tu *TweetPersister) PersistTweets(ctx context.Context) (int, error) {
	lastKnownTweet, err := tu.store.GetLastElonTweet(ctx)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// We have no Elon tweets at all, this is the first run. Grab the most
		// recent tweet, and mark it as processed.
		return tu.insertInitialTweet(ctx)
	case err != nil:
		return 0, fmt.Errorf("error fetching last Elon tweet: %w", err)
	}

	sinceID := strconv.FormatInt(lastKnownTweet.TwitterID, formatDecimal)
	newTweets, err := tu.apiClient.GetTweets(elonID, sinceID)
	if err != nil {
		return 0, fmt.Errorf("error fetching latest Elon tweets: %w", err)
	}

	// NOTE(review): tweets are stored in the order the client returns them. If
	// that order is newest-first and an insert fails mid-batch, the next call's
	// sinceID may already be past the tweets that were not stored — confirm the
	// ordering of GetTweets and the semantics of GetLastElonTweet.
	var n int
	for _, tweet := range newTweets {
		if _, storeErr := tu.insertTweet(ctx, tweet, false); storeErr != nil {
			return n, fmt.Errorf("error inserting tweet: %w", storeErr)
		}
		n++
	}

	return n, nil
}
// insertInitialTweet handles the first run, when no tweets are stored yet. It
// asks the API client for the latest tweet and stores it already marked as
// processed.
//
// It returns 1 when a tweet was stored. When the client reports
// twitterapi.ErrNoTweets a warning is logged and (0, nil) is returned, so an
// account without tweets is not treated as a failure. Any other error is
// wrapped and returned with a count of 0.
func (tu *TweetPersister) insertInitialTweet(ctx context.Context) (int, error) {
	tweet, err := tu.apiClient.GetLastTweet(elonID)
	if errors.Is(err, twitterapi.ErrNoTweets) {
		tu.logger.Warn("Twitter API returned empty success response, no tweets available.")
		return 0, nil
	}
	if err != nil {
		return 0, fmt.Errorf("error fetching initial tweet: %w", err)
	}

	if _, err = tu.insertTweet(ctx, tweet, true); err != nil {
		return 0, fmt.Errorf("error inserting initial tweet: %w", err)
	}

	return 1, nil
}
// insertTweet upserts a single tweet into the store and logs the insertion.
//
// The tweet's string ID is parsed as a base-10 int64 before storing. When
// markAsProcessed is true the row's ProcessedAt is set to the current UTC
// time; otherwise it is left at its zero (invalid) value.
//
// It returns a pointer to the row as returned by the store. An error is
// returned when tweet is nil, when its ID cannot be parsed, or when the upsert
// fails; parse and store errors are wrapped so the cause stays inspectable.
func (tu *TweetPersister) insertTweet(ctx context.Context, tweet *twitterapi.Tweet, markAsProcessed bool) (*store.ElonTweet, error) {
	// Guard against a nil entry from the API client rather than panicking on
	// the field accesses below.
	if tweet == nil {
		return nil, errors.New("error inserting tweet: tweet is nil")
	}

	twitterID, err := strconv.ParseInt(tweet.ID, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("error parsing twitter ID: %w", err)
	}

	params := store.UpsertElonTweetParams{
		TwitterID: twitterID,
		Text:      tweet.Text,
		PostedAt:  tweet.CreatedAt,
	}
	if markAsProcessed {
		params.ProcessedAt = sql.NullTime{Time: time.Now().UTC(), Valid: true}
	}

	storeTweet, err := tu.store.UpsertElonTweet(ctx, params)
	if err != nil {
		return nil, fmt.Errorf("error upserting tweet: %w", err)
	}

	// Infow attaches the key/value pairs directly to this entry, avoiding the
	// child logger that With would create for every tweet.
	tu.logger.Infow("new tweet inserted", "twitter_id", twitterID, "text", tweet.Text)

	return &storeTweet, nil
}