106 lines
3.2 KiB
Go
106 lines
3.2 KiB
Go
|
package daemon
|
||
|
|
||
|
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strconv"
	"time"

	"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
	"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
	"github.com/jackc/pgx/v4"
	"go.uber.org/zap"
)
|
||
|
|
||
|
const (
	// formatDecimal is the numeric base (10) used when converting a stored
	// tweet ID to its string form.
	formatDecimal = 10

	// elonID is the account identifier handed to the Twitter API client when
	// fetching tweets (presumably Elon Musk's user ID, going by the name).
	elonID = "44196397"
)
|
||
|
|
||
|
// TweetPersister fetches tweets from twitter and persists them in the database.
|
||
|
type TweetPersister struct {
|
||
|
store twitterapi.Store
|
||
|
apiClient twitterapi.APIClient
|
||
|
logger *zap.SugaredLogger
|
||
|
}
|
||
|
|
||
|
// NewTweetPersister creates a new NewTweetPersister.
|
||
|
func NewTweetPersister(store twitterapi.Store, apiClient twitterapi.APIClient, logger *zap.SugaredLogger) *TweetPersister {
|
||
|
return &TweetPersister{store: store, apiClient: apiClient, logger: logger}
|
||
|
}
|
||
|
|
||
|
// PersistTweets fetches tweets from Twitter and persists them in the database.
|
||
|
// In the case there are no tweets in the database, the single most recent
|
||
|
// tweet will be fetched. Otherwise, the maximum tweets returnable by the
|
||
|
// Twitter API will be fetched starting from the most recent known tweet. If
|
||
|
// there are more tweets available, multiple calls to this method will be
|
||
|
// required to fetch them all.
|
||
|
func (tu *TweetPersister) PersistTweets(ctx context.Context) (int, error) {
|
||
|
lastKnownTweet, err := tu.store.GetLastElonTweet(ctx)
|
||
|
if err != nil && err != pgx.ErrNoRows {
|
||
|
return 0, fmt.Errorf("error fetching last Elon tweet: %v", err)
|
||
|
}
|
||
|
|
||
|
// we have no Elon tweets at all, this is the first run. Grab the most recent
|
||
|
// tweet, and mark it as processed.
|
||
|
if err == pgx.ErrNoRows {
|
||
|
return tu.insertInitialTweet(ctx)
|
||
|
}
|
||
|
|
||
|
sinceID := strconv.FormatInt(lastKnownTweet.TwitterID, formatDecimal)
|
||
|
var n int
|
||
|
newTweets, err := tu.apiClient.GetTweets(elonID, sinceID)
|
||
|
if err != nil {
|
||
|
return n, fmt.Errorf("error fetching latest Elon tweets: %v", err)
|
||
|
}
|
||
|
|
||
|
for _, tweet := range newTweets {
|
||
|
if _, storeErr := tu.insertTweet(ctx, tweet, false); storeErr != nil {
|
||
|
return n, fmt.Errorf("error inserting tweet: %v", storeErr)
|
||
|
}
|
||
|
|
||
|
n++
|
||
|
}
|
||
|
|
||
|
return n, nil
|
||
|
}
|
||
|
|
||
|
func (tu *TweetPersister) insertInitialTweet(ctx context.Context) (int, error) {
|
||
|
tweet, err := tu.apiClient.GetLastTweet(elonID)
|
||
|
if err == twitterapi.ErrNoTweets {
|
||
|
tu.logger.Warn("Twitter API returned empty success response, no tweets available.")
|
||
|
return 0, nil
|
||
|
} else if err != nil {
|
||
|
return 0, fmt.Errorf("error fetching initial tweet: %v", err)
|
||
|
}
|
||
|
if _, err = tu.insertTweet(ctx, tweet, true); err != nil {
|
||
|
return 0, fmt.Errorf("error inserting initial tweet: %v", err)
|
||
|
}
|
||
|
|
||
|
return 1, nil
|
||
|
}
|
||
|
|
||
|
func (tu *TweetPersister) insertTweet(ctx context.Context, tweet *twitterapi.Tweet, markAsProcessed bool) (*store.ElonTweet, error) {
|
||
|
twitterID, err := strconv.ParseInt(tweet.ID, 10, 64)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error parsing twitter ID: %v", err)
|
||
|
}
|
||
|
|
||
|
params := store.UpsertElonTweetParams{
|
||
|
TwitterID: twitterID,
|
||
|
Text: tweet.Text,
|
||
|
PostedAt: tweet.CreatedAt,
|
||
|
}
|
||
|
if markAsProcessed {
|
||
|
params.ProcessedAt = sql.NullTime{Time: time.Now().UTC(), Valid: true}
|
||
|
}
|
||
|
|
||
|
storeTweet, err := tu.store.UpsertElonTweet(ctx, params)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("error upserting tweet: %v", err)
|
||
|
}
|
||
|
|
||
|
tu.logger.With("twitter_id", twitterID, "text", tweet.Text).Infof("new tweet inserted")
|
||
|
|
||
|
return &storeTweet, nil
|
||
|
}
|