From 553335d75d27e86a63dd2437863b3a637345a02e Mon Sep 17 00:00:00 2001 From: Rob Watson Date: Tue, 24 May 2022 21:20:28 +0200 Subject: [PATCH] Implement TweetPersister daemon --- daemon/daemon.go | 30 +++ daemon/tweet_persister.go | 105 +++++++++++ daemon/tweet_persister_test.go | 176 ++++++++++++++++++ generated/mocks/Store.go | 42 +++++ generated/mocks/TwitterAPIClient.go | 58 +++++- generated/store/models.go | 11 ++ generated/store/queries.sql.go | 54 ++++++ httpserver/handler.go | 11 +- httpserver/handler_test.go | 12 +- main.go | 13 +- ...20523031402_add_elon_tweets_table.down.sql | 1 + ...0220523031402_add_elon_tweets_table.up.sql | 11 ++ sql/queries.sql | 9 + twitter/types.go | 13 -- twitter/api.go => twitterapi/client.go | 115 +++++++----- .../api_test.go => twitterapi/client_test.go | 56 ++++-- twitterapi/types.go | 45 +++++ 17 files changed, 670 insertions(+), 92 deletions(-) create mode 100644 daemon/daemon.go create mode 100644 daemon/tweet_persister.go create mode 100644 daemon/tweet_persister_test.go create mode 100644 sql/migrations/20220523031402_add_elon_tweets_table.down.sql create mode 100644 sql/migrations/20220523031402_add_elon_tweets_table.up.sql delete mode 100644 twitter/types.go rename twitter/api.go => twitterapi/client.go (55%) rename twitter/api_test.go => twitterapi/client_test.go (57%) create mode 100644 twitterapi/types.go diff --git a/daemon/daemon.go b/daemon/daemon.go new file mode 100644 index 0000000..34a96d5 --- /dev/null +++ b/daemon/daemon.go @@ -0,0 +1,30 @@ +package daemon + +import ( + "context" + "time" + + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" + "go.uber.org/zap" +) + +// Run blocks until the provided context is terminated. +func Run(ctx context.Context, store twitterapi.Store, apiClient twitterapi.APIClient, logger *zap.SugaredLogger) error { + tweetPersister := TweetPersister{store: store, apiClient: apiClient, logger: logger} + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if n, err := tweetPersister.PersistTweets(ctx); err != nil { + logger.With("err", err, "inserted", n).Error("error updating tweets") + } else { + logger.With("inserted", n).Info("tweets updated") + } + case <-ctx.Done(): + logger.With("err", ctx.Err()).Info("context complete, exiting") + return nil + } + } +} diff --git a/daemon/tweet_persister.go b/daemon/tweet_persister.go new file mode 100644 index 0000000..ac5daaf --- /dev/null +++ b/daemon/tweet_persister.go @@ -0,0 +1,105 @@ +package daemon + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "time" + + "git.netflux.io/rob/elon-eats-my-tweets/generated/store" + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" + "github.com/jackc/pgx/v4" + "go.uber.org/zap" +) + +const formatDecimal = 10 +const elonID = "44196397" + +// TweetPersister fetches tweets from twitter and persists them in the database. +type TweetPersister struct { + store twitterapi.Store + apiClient twitterapi.APIClient + logger *zap.SugaredLogger +} + +// NewTweetPersister creates a new NewTweetPersister. +func NewTweetPersister(store twitterapi.Store, apiClient twitterapi.APIClient, logger *zap.SugaredLogger) *TweetPersister { + return &TweetPersister{store: store, apiClient: apiClient, logger: logger} +} + +// PersistTweets fetches tweets from Twitter and persists them in the database. +// In the case there are no tweets in the database, the single most recent +// tweet will be fetched. Otherwise, the maximum tweets returnable by the +// Twitter API will be fetched starting from the most recent known tweet. If +// there are more tweets available, multiple calls to this method will be +// required to fetch them all. +func (tu *TweetPersister) PersistTweets(ctx context.Context) (int, error) { + lastKnownTweet, err := tu.store.GetLastElonTweet(ctx) + if err != nil && err != pgx.ErrNoRows { + return 0, fmt.Errorf("error fetching last Elon tweet: %v", err) + } + + // we have no Elon tweets at all, this is the first run. Grab the most recent + // tweet, and mark it as processed. + if err == pgx.ErrNoRows { + return tu.insertInitialTweet(ctx) + } + + sinceID := strconv.FormatInt(lastKnownTweet.TwitterID, formatDecimal) + var n int + newTweets, err := tu.apiClient.GetTweets(elonID, sinceID) + if err != nil { + return n, fmt.Errorf("error fetching latest Elon tweets: %v", err) + } + + for _, tweet := range newTweets { + if _, storeErr := tu.insertTweet(ctx, tweet, false); storeErr != nil { + return n, fmt.Errorf("error inserting tweet: %v", storeErr) + } + + n++ + } + + return n, nil +} + +func (tu *TweetPersister) insertInitialTweet(ctx context.Context) (int, error) { + tweet, err := tu.apiClient.GetLastTweet(elonID) + if err == twitterapi.ErrNoTweets { + tu.logger.Warn("Twitter API returned empty success response, no tweets available.") + return 0, nil + } else if err != nil { + return 0, fmt.Errorf("error fetching initial tweet: %v", err) + } + if _, err = tu.insertTweet(ctx, tweet, true); err != nil { + return 0, fmt.Errorf("error inserting initial tweet: %v", err) + } + + return 1, nil +} + +func (tu *TweetPersister) insertTweet(ctx context.Context, tweet *twitterapi.Tweet, markAsProcessed bool) (*store.ElonTweet, error) { + twitterID, err := strconv.ParseInt(tweet.ID, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing twitter ID: %v", err) + } + + params := store.UpsertElonTweetParams{ + TwitterID: twitterID, + Text: tweet.Text, + PostedAt: tweet.CreatedAt, + } + if markAsProcessed { + params.ProcessedAt = sql.NullTime{Time: time.Now().UTC(), Valid: true} + } + + storeTweet, err := tu.store.UpsertElonTweet(ctx, params) + if err != nil { + return nil, fmt.Errorf("error upserting tweet: %v", err) + } + + tu.logger.With("twitter_id", twitterID, "text", tweet.Text).Infof("new tweet inserted") + + return &storeTweet, nil +} diff --git a/daemon/tweet_persister_test.go b/daemon/tweet_persister_test.go new file mode 100644 index 0000000..6efcac5 --- /dev/null +++ b/daemon/tweet_persister_test.go @@ -0,0 +1,176 @@ +package daemon_test + +import ( + "context" + "errors" + "testing" + "time" + + "git.netflux.io/rob/elon-eats-my-tweets/daemon" + "git.netflux.io/rob/elon-eats-my-tweets/generated/mocks" + "git.netflux.io/rob/elon-eats-my-tweets/generated/store" + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" + "github.com/jackc/pgx/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/zap" +) + +func TestTweetUpdaterUpdateTweetsInitialTweet(t *testing.T) { + utcLoc, _ := time.LoadLocation("UTC") + createdAt := time.Date(2022, 1, 1, 12, 12, 23, 0, utcLoc) + + testCases := []struct { + name string + lastKnownTweetErr error + fetchedTweet *twitterapi.Tweet + fetchedTweetErr error + insertTweetErr error + wantCount int + wantErr string + }{ + { + name: "error fetching last known tweet", + lastKnownTweetErr: errors.New("database error"), + wantCount: 0, + wantErr: "error fetching last Elon tweet: database error", + }, + { + name: "no tweet available from the API", + lastKnownTweetErr: pgx.ErrNoRows, + fetchedTweetErr: twitterapi.ErrNoTweets, + wantCount: 0, + wantErr: "", + }, + { + name: "error fetching tweet from API", + lastKnownTweetErr: pgx.ErrNoRows, + fetchedTweetErr: errors.New("API error"), + wantCount: 0, + wantErr: "error fetching initial tweet: API error", + }, + { + name: "error inserting tweet", + lastKnownTweetErr: pgx.ErrNoRows, + fetchedTweet: &twitterapi.Tweet{ID: "101", Text: "bar", CreatedAt: createdAt}, + insertTweetErr: errors.New("boom"), + wantCount: 0, + wantErr: "error inserting initial tweet: error upserting tweet: boom", + }, + { + name: "success", + lastKnownTweetErr: pgx.ErrNoRows, + fetchedTweet: &twitterapi.Tweet{ID: "101", Text: "bar", CreatedAt: createdAt}, + wantCount: 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetLastElonTweet", mock.Anything).Return(store.ElonTweet{}, tc.lastKnownTweetErr) + mockStore. + On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool { + return params.TwitterID == 101 && params.Text == "bar" && params.PostedAt.Equal(createdAt) && params.ProcessedAt.Valid + })). + Return(store.ElonTweet{}, tc.insertTweetErr) + + var mockAPIClient mocks.TwitterAPIClient + mockAPIClient.On("GetLastTweet", "44196397").Return(tc.fetchedTweet, tc.fetchedTweetErr) + if tc.wantErr == "" { + defer mockAPIClient.AssertExpectations(t) + } + + updater := daemon.NewTweetPersister(&mockStore, &mockAPIClient, zap.NewNop().Sugar()) + n, err := updater.PersistTweets(context.Background()) + + assert.Equal(t, tc.wantCount, n) + if tc.wantErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.wantErr) + } + }) + } +} + +func TestTweetUpdaterUpdateTweets(t *testing.T) { + utcLoc, _ := time.LoadLocation("UTC") + createdAt := time.Date(2022, 1, 1, 12, 12, 23, 0, utcLoc) + + lastKnownTweet := store.ElonTweet{TwitterID: 100, Text: "foo", PostedAt: createdAt, CreatedAt: createdAt} + + testCases := []struct { + name string + lastKnownTweetErr error + fetchedTweets []*twitterapi.Tweet + fetchedTweetsErr error + insertTweetErr error + wantCount int + wantErr string + }{ + { + name: "error fetching tweets", + fetchedTweetsErr: errors.New("whale"), + wantCount: 0, + wantErr: "error fetching latest Elon tweets: whale", + }, + { + name: "error inserting tweet", + fetchedTweets: []*twitterapi.Tweet{{ID: "101", Text: "bar", CreatedAt: createdAt}}, + insertTweetErr: errors.New("database error"), + wantCount: 0, + wantErr: "error inserting tweet: error upserting tweet: database error", + }, + { + name: "success", + fetchedTweets: []*twitterapi.Tweet{ + {ID: "101", Text: "bar", CreatedAt: createdAt}, + {ID: "102", Text: "baz", CreatedAt: createdAt}, + {ID: "103", Text: "qux", CreatedAt: createdAt}, + }, + wantCount: 3, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var mockStore mocks.Store + mockStore.On("GetLastElonTweet", mock.Anything).Return(lastKnownTweet, nil) + mockStore. + On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool { + return params.TwitterID == 101 && params.Text == "bar" && params.PostedAt.Equal(createdAt) && !params.ProcessedAt.Valid + })). + Return(store.ElonTweet{}, tc.insertTweetErr) + mockStore. + On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool { + return params.TwitterID == 102 && params.Text == "baz" && params.PostedAt.Equal(createdAt) && !params.ProcessedAt.Valid + })). + Return(store.ElonTweet{}, tc.insertTweetErr) + mockStore. + On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool { + return params.TwitterID == 103 && params.Text == "qux" && params.PostedAt.Equal(createdAt) && !params.ProcessedAt.Valid + })). + Return(store.ElonTweet{}, tc.insertTweetErr) + if tc.wantErr == "" { + defer mockStore.AssertExpectations(t) + } + + var mockAPIClient mocks.TwitterAPIClient + mockAPIClient.On("GetTweets", "44196397", "100").Return(tc.fetchedTweets, tc.fetchedTweetsErr) + if tc.wantErr == "" { + defer mockAPIClient.AssertExpectations(t) + } + + updater := daemon.NewTweetPersister(&mockStore, &mockAPIClient, zap.NewNop().Sugar()) + n, err := updater.PersistTweets(context.Background()) + + assert.Equal(t, tc.wantCount, n) + if tc.wantErr == "" { + assert.NoError(t, err) + } else { + assert.EqualError(t, err, tc.wantErr) + } + }) + } +} diff --git a/generated/mocks/Store.go b/generated/mocks/Store.go index 128ded2..4f16b2e 100644 --- a/generated/mocks/Store.go +++ b/generated/mocks/Store.go @@ -37,6 +37,48 @@ func (_m *Store) CreateUser(_a0 context.Context, _a1 store.CreateUserParams) (st return r0, r1 } +// GetLastElonTweet provides a mock function with given fields: _a0 +func (_m *Store) GetLastElonTweet(_a0 context.Context) (store.ElonTweet, error) { + ret := _m.Called(_a0) + + var r0 store.ElonTweet + if rf, ok := ret.Get(0).(func(context.Context) store.ElonTweet); ok { + r0 = rf(_a0) + } else { + r0 = ret.Get(0).(store.ElonTweet) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UpsertElonTweet provides a mock function with given fields: _a0, _a1 +func (_m *Store) UpsertElonTweet(_a0 context.Context, _a1 store.UpsertElonTweetParams) (store.ElonTweet, error) { + ret := _m.Called(_a0, _a1) + + var r0 store.ElonTweet + if rf, ok := ret.Get(0).(func(context.Context, store.UpsertElonTweetParams) store.ElonTweet); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Get(0).(store.ElonTweet) + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, store.UpsertElonTweetParams) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewStore creates a new instance of Store. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. func NewStore(t testing.TB) *Store { mock := &Store{} diff --git a/generated/mocks/TwitterAPIClient.go b/generated/mocks/TwitterAPIClient.go index 109b4c3..e011e4f 100644 --- a/generated/mocks/TwitterAPIClient.go +++ b/generated/mocks/TwitterAPIClient.go @@ -7,24 +7,47 @@ import ( mock "github.com/stretchr/testify/mock" - twitter "git.netflux.io/rob/elon-eats-my-tweets/twitter" + twitterapi "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" ) -// TwitterAPIClient is an autogenerated mock type for the TwitterAPIClient type +// TwitterAPIClient is an autogenerated mock type for the APIClient type type TwitterAPIClient struct { mock.Mock } +// GetLastTweet provides a mock function with given fields: _a0 +func (_m *TwitterAPIClient) GetLastTweet(_a0 string) (*twitterapi.Tweet, error) { + ret := _m.Called(_a0) + + var r0 *twitterapi.Tweet + if rf, ok := ret.Get(0).(func(string) *twitterapi.Tweet); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*twitterapi.Tweet) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetMe provides a mock function with given fields: -func (_m *TwitterAPIClient) GetMe() (*twitter.User, error) { +func (_m *TwitterAPIClient) GetMe() (*twitterapi.User, error) { ret := _m.Called() - var r0 *twitter.User - if rf, ok := ret.Get(0).(func() *twitter.User); ok { + var r0 *twitterapi.User + if rf, ok := ret.Get(0).(func() *twitterapi.User); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*twitter.User) + r0 = ret.Get(0).(*twitterapi.User) } } @@ -38,6 +61,29 @@ func (_m *TwitterAPIClient) GetMe() (*twitter.User, error) { return r0, r1 } +// GetTweets provides a mock function with given fields: _a0, _a1 +func (_m *TwitterAPIClient) GetTweets(_a0 string, _a1 string) ([]*twitterapi.Tweet, error) { + ret := _m.Called(_a0, _a1) + + var r0 []*twitterapi.Tweet + if rf, ok := ret.Get(0).(func(string, string) []*twitterapi.Tweet); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*twitterapi.Tweet) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string, string) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewTwitterAPIClient creates a new instance of TwitterAPIClient. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. func NewTwitterAPIClient(t testing.TB) *TwitterAPIClient { mock := &TwitterAPIClient{} diff --git a/generated/store/models.go b/generated/store/models.go index 84e0275..5238900 100644 --- a/generated/store/models.go +++ b/generated/store/models.go @@ -5,11 +5,22 @@ package store import ( + "database/sql" "time" "github.com/google/uuid" ) +type ElonTweet struct { + ID uuid.UUID + TwitterID int64 + Text string + PostedAt time.Time + ProcessedAt sql.NullTime + CreatedAt time.Time + UpdatedAt time.Time +} + type User struct { ID uuid.UUID TwitterID string diff --git a/generated/store/queries.sql.go b/generated/store/queries.sql.go index bc9dc23..8a7c8e1 100644 --- a/generated/store/queries.sql.go +++ b/generated/store/queries.sql.go @@ -7,6 +7,7 @@ package store import ( "context" + "database/sql" "time" ) @@ -55,6 +56,25 @@ func (q *Queries) CreateUser(ctx context.Context, arg CreateUserParams) (User, e return i, err } +const getLastElonTweet = `-- name: GetLastElonTweet :one +SELECT id, twitter_id, text, posted_at, processed_at, created_at, updated_at from elon_tweets ORDER BY twitter_id DESC LIMIT 1 +` + +func (q *Queries) GetLastElonTweet(ctx context.Context) (ElonTweet, error) { + row := q.db.QueryRow(ctx, getLastElonTweet) + var i ElonTweet + err := row.Scan( + &i.ID, + &i.TwitterID, + &i.Text, + &i.PostedAt, + &i.ProcessedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + const getUserByTwitterID = `-- name: GetUserByTwitterID :one SELECT id, twitter_id, username, name, access_token, refresh_token, delete_tweets_enabled, delete_tweets_num_per_iteration, created_at, updated_at FROM users WHERE twitter_id = $1 ` @@ -76,3 +96,37 @@ func (q *Queries) GetUserByTwitterID(ctx context.Context, twitterID string) (Use ) return i, err } + +const upsertElonTweet = `-- name: UpsertElonTweet :one +INSERT INTO elon_tweets (twitter_id, text, posted_at, processed_at, created_at, updated_at) + VALUES ($1, $2, $3, $4, NOW(), NOW()) + ON CONFLICT (twitter_id) DO NOTHING + RETURNING id, twitter_id, text, posted_at, processed_at, created_at, updated_at +` + +type UpsertElonTweetParams struct { + TwitterID int64 + Text string + PostedAt time.Time + ProcessedAt sql.NullTime +} + +func (q *Queries) UpsertElonTweet(ctx context.Context, arg UpsertElonTweetParams) (ElonTweet, error) { + row := q.db.QueryRow(ctx, upsertElonTweet, + arg.TwitterID, + arg.Text, + arg.PostedAt, + arg.ProcessedAt, + ) + var i ElonTweet + err := row.Scan( + &i.ID, + &i.TwitterID, + &i.Text, + &i.PostedAt, + &i.ProcessedAt, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} diff --git a/httpserver/handler.go b/httpserver/handler.go index aa33d92..f735f5a 100644 --- a/httpserver/handler.go +++ b/httpserver/handler.go @@ -1,7 +1,6 @@ package httpserver //go:generate mockery --recursive --srcpkg github.com/gorilla/sessions --name Store --structname SessionStore --filename SessionStore.go --output ../generated/mocks -//go:generate mockery --recursive --name TwitterAPIClient --filename TwitterAPIClient.go --output ../generated/mocks import ( "context" @@ -13,7 +12,7 @@ import ( "git.netflux.io/rob/elon-eats-my-tweets/config" "git.netflux.io/rob/elon-eats-my-tweets/generated/store" "git.netflux.io/rob/elon-eats-my-tweets/templates" - "git.netflux.io/rob/elon-eats-my-tweets/twitter" + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" "github.com/go-chi/chi" "github.com/go-chi/chi/middleware" "github.com/google/uuid" @@ -59,12 +58,12 @@ func getCurrentUser(r *http.Request, sessionStore sessions.Store) (CurrentUser, } type TwitterAPIClient interface { - GetMe() (*twitter.User, error) + GetMe() (*twitterapi.User, error) } type handler struct { - store twitter.Store - twitterAPIClientFunc func(c *http.Client) TwitterAPIClient + store twitterapi.Store + twitterAPIClientFunc func(c *http.Client) twitterapi.APIClient oauth2Config *oauth2.Config renderer *templates.Renderer sessionStore sessions.Store @@ -72,7 +71,7 @@ type handler struct { logger *zap.SugaredLogger } -func NewHandler(cfg config.Config, store twitter.Store, twitterAPIClientFunc func(c *http.Client) TwitterAPIClient, sessionStore sessions.Store, tokenGenerator TokenGenerator, logger *zap.Logger) http.Handler { +func NewHandler(cfg config.Config, store twitterapi.Store, twitterAPIClientFunc func(c *http.Client) twitterapi.APIClient, sessionStore sessions.Store, tokenGenerator TokenGenerator, logger *zap.Logger) http.Handler { r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.RealIP) diff --git a/httpserver/handler_test.go b/httpserver/handler_test.go index 3335b55..adbe350 100644 --- a/httpserver/handler_test.go +++ b/httpserver/handler_test.go @@ -11,7 +11,7 @@ import ( "git.netflux.io/rob/elon-eats-my-tweets/generated/mocks" "git.netflux.io/rob/elon-eats-my-tweets/generated/store" "git.netflux.io/rob/elon-eats-my-tweets/httpserver" - "git.netflux.io/rob/elon-eats-my-tweets/twitter" + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" "github.com/google/uuid" "github.com/gorilla/sessions" "github.com/stretchr/testify/assert" @@ -67,7 +67,7 @@ func TestGetIndex(t *testing.T) { handler := httpserver.NewHandler( config.Config{}, &mocks.Store{}, - func(*http.Client) httpserver.TwitterAPIClient { return &mocks.TwitterAPIClient{} }, + func(*http.Client) twitterapi.APIClient { return &mocks.TwitterAPIClient{} }, &mockSessionStore, &mockTokenGenerator{}, zap.NewNop(), @@ -124,7 +124,7 @@ func TestGetLogin(t *testing.T) { }, }, &mocks.Store{}, - func(*http.Client) httpserver.TwitterAPIClient { return &mocks.TwitterAPIClient{} }, + func(*http.Client) twitterapi.APIClient { return &mocks.TwitterAPIClient{} }, &mockSessionStore, &mockTokenGenerator{tokens: []string{"state", "pkceVerifier"}}, zap.NewNop(), @@ -268,7 +268,7 @@ func TestGetCallback(t *testing.T) { mockSessionStore.On("Save", mock.Anything, mock.Anything, sess).Return(tc.sessionSaveError) var mockTwitterClient mocks.TwitterAPIClient - mockTwitterClient.On("GetMe").Return(&twitter.User{ID: "1", Name: "foo", Username: "Foo Bar"}, tc.getTwitterUserError) + mockTwitterClient.On("GetMe").Return(&twitterapi.User{ID: "1", Name: "foo", Username: "Foo Bar"}, tc.getTwitterUserError) var mockStore mocks.Store mockStore.On("CreateUser", mock.Anything, mock.MatchedBy(func(params store.CreateUserParams) bool { @@ -304,7 +304,7 @@ func TestGetCallback(t *testing.T) { }, }, &mockStore, - func(*http.Client) httpserver.TwitterAPIClient { return &mockTwitterClient }, + func(*http.Client) twitterapi.APIClient { return &mockTwitterClient }, &mockSessionStore, nil, zap.NewNop(), @@ -339,7 +339,7 @@ func TestPostLogout(t *testing.T) { handler := httpserver.NewHandler( config.Config{}, &mocks.Store{}, - func(*http.Client) httpserver.TwitterAPIClient { return &mocks.TwitterAPIClient{} }, + func(*http.Client) twitterapi.APIClient { return &mocks.TwitterAPIClient{} }, &mockSessionStore, nil, zap.NewNop(), diff --git a/main.go b/main.go index 0a5989d..0ef18d6 100644 --- a/main.go +++ b/main.go @@ -8,9 +8,10 @@ import ( "net/http" "git.netflux.io/rob/elon-eats-my-tweets/config" + "git.netflux.io/rob/elon-eats-my-tweets/daemon" "git.netflux.io/rob/elon-eats-my-tweets/generated/store" "git.netflux.io/rob/elon-eats-my-tweets/httpserver" - "git.netflux.io/rob/elon-eats-my-tweets/twitter" + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" "github.com/gorilla/sessions" "github.com/jackc/pgx/v4/pgxpool" "go.uber.org/zap" @@ -35,10 +36,18 @@ func main() { defer dbconn.Close() store := store.New(dbconn) + // TODO: separate daemon and webserver in separate binaries (or via flags). + go daemon.Run( + context.Background(), + store, + twitterapi.NewClientWithBearerToken(cfg.Twitter.BearerToken), + logger.Sugar().Named("daemon"), + ) + handler := httpserver.NewHandler( cfg, store, - func(c *http.Client) httpserver.TwitterAPIClient { return twitter.NewAPIClient(c) }, + func(c *http.Client) twitterapi.APIClient { return twitterapi.NewClient(c) }, sessions.NewCookieStore([]byte(cfg.SessionKey)), httpserver.RandomTokenGenerator{}, logger, diff --git a/sql/migrations/20220523031402_add_elon_tweets_table.down.sql b/sql/migrations/20220523031402_add_elon_tweets_table.down.sql new file mode 100644 index 0000000..2b774b2 --- /dev/null +++ b/sql/migrations/20220523031402_add_elon_tweets_table.down.sql @@ -0,0 +1 @@ +DROP TABLE elon_tweets; diff --git a/sql/migrations/20220523031402_add_elon_tweets_table.up.sql b/sql/migrations/20220523031402_add_elon_tweets_table.up.sql new file mode 100644 index 0000000..f4ca149 --- /dev/null +++ b/sql/migrations/20220523031402_add_elon_tweets_table.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE elon_tweets ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + twitter_id bigint NOT NULL, + text TEXT NOT NULL, + posted_at TIMESTAMP WITH TIME ZONE NOT NULL, + processed_at TIMESTAMP WITH TIME ZONE, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + updated_at TIMESTAMP WITH TIME ZONE NOT NULL +); + +CREATE UNIQUE INDEX index_elon_tweets_on_twitter_id ON elon_tweets (twitter_id); diff --git a/sql/queries.sql b/sql/queries.sql index 8ffeacf..eb21da2 100644 --- a/sql/queries.sql +++ b/sql/queries.sql @@ -8,3 +8,12 @@ INSERT INTO users (twitter_id, username, name, access_token, refresh_token, crea -- name: GetUserByTwitterID :one SELECT * FROM users WHERE twitter_id = $1; + +-- name: GetLastElonTweet :one +SELECT * from elon_tweets ORDER BY twitter_id DESC LIMIT 1; + +-- name: UpsertElonTweet :one +INSERT INTO elon_tweets (twitter_id, text, posted_at, processed_at, created_at, updated_at) + VALUES ($1, $2, $3, $4, NOW(), NOW()) + ON CONFLICT (twitter_id) DO NOTHING + RETURNING *; diff --git a/twitter/types.go b/twitter/types.go deleted file mode 100644 index 6194646..0000000 --- a/twitter/types.go +++ /dev/null @@ -1,13 +0,0 @@ -package twitter - -//go:generate mockery --recursive --name Store --output ../generated/mocks - -import ( - "context" - - "git.netflux.io/rob/elon-eats-my-tweets/generated/store" -) - -type Store interface { - CreateUser(context.Context, store.CreateUserParams) (store.User, error) -} diff --git a/twitter/api.go b/twitterapi/client.go similarity index 55% rename from twitter/api.go rename to twitterapi/client.go index 8ea8eed..0ec7e08 100644 --- a/twitter/api.go +++ b/twitterapi/client.go @@ -1,28 +1,34 @@ -package twitter - -//go:generate mockery --recursive --name Getter --output ../generated/mocks +package twitterapi import ( "encoding/json" + "errors" "fmt" "net/http" "time" ) -// ElonID is the Twitter ID of @elonmusk. -const ElonID = "44196397" - -// User represents a Twitter user. -type User struct { - ID string - Name string - Username string +type Getter interface { + Get(string) (*http.Response, error) } -// Tweet represents a tweet. -type Tweet struct { - ID string - Text string +// NewClient returns a new APIClient. +func NewClient(httpclient Getter) APIClient { + return &apiClient{httpclient} +} + +// NewAPIClient returns a new APIClient which will authenticate with the +// provided bearer token. +func NewClientWithBearerToken(bearerToken string) APIClient { + return &apiClient{&http.Client{ + Timeout: time.Second * 5, + Transport: &bearerTokenTransport{bearerToken: bearerToken}, + }} +} + +// apiClient implements APIClient. +type apiClient struct { + Getter } // bearerTokenTransport implements http.RoundTripper. @@ -55,31 +61,8 @@ func cloneRequest(r *http.Request) *http.Request { return r2 } -type Getter interface { - Get(string) (*http.Response, error) -} - -// NewAPIClient returns a new APIClient. -func NewAPIClient(httpclient Getter) *APIClient { - return &APIClient{httpclient} -} - -// NewAPIClient returns a new APIClient which will authenticate with the -// provided bearer token. -func NewAPIClientWithBearerToken(bearerToken string) *APIClient { - return &APIClient{&http.Client{ - Timeout: time.Second * 5, - Transport: &bearerTokenTransport{bearerToken: bearerToken}, - }} -} - -// APIClient interacts with the Twitter API V2. -type APIClient struct { - Getter -} - // GetMe returns the currently authenticated user. -func (c *APIClient) GetMe() (*User, error) { +func (c *apiClient) GetMe() (*User, error) { type oauthResponse struct { Data *User `json:"data"` } @@ -101,13 +84,61 @@ func (c *APIClient) GetMe() (*User, error) { return oauthResp.Data, nil } -// GetElonTweets returns the latest tweets for a given user. -func (c *APIClient) GetTweets(userID string, sinceID string) ([]*Tweet, error) { +var ErrNoTweets = errors.New("no tweets available") + +// GetLastTweet returns the most recent tweet for a given user. If no tweets +// are available, ErrNoTweets will be returned. +func (c *apiClient) GetLastTweet(userID string) (*Tweet, error) { + type oauthResponse struct { + Data []*Tweet `json:"data"` + Meta TwitterMetadata `json:"meta"` + } + + apiURL := "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at" + + resp, err := c.Get(apiURL) + if err != nil { + return nil, fmt.Errorf("error fetching resource: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("error fetching resource: status code %d", resp.StatusCode) + } + + var oauthResp oauthResponse + if err = json.NewDecoder(resp.Body).Decode(&oauthResp); err != nil { + return nil, fmt.Errorf("error decoding resource: %v", err) + } + + if oauthResp.Meta.ResultCount == 0 { + return nil, ErrNoTweets + } + + // the order of returned tweets seems to be chronological, but it isn't + // documented so use the metadata instead. + for _, tweet := range oauthResp.Data { + if tweet.ID == oauthResp.Meta.NewestID { + return tweet, nil + } + } + + return nil, errors.New("error fetching latest tweet: could not match newest_id") +} + +// GetTweets returns the latest tweets for a given user, up to the maximum +// batch size of 100 allowable by the Twitter API. +func (c *apiClient) GetTweets(userID string, sinceID string) ([]*Tweet, error) { type oauthResponse struct { Data []*Tweet `json:"data"` } - resp, err := c.Get(fmt.Sprintf("https://api.twitter.com/2/users/%s/tweets?since_id=%s", userID, sinceID)) + apiURL := "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100" + if sinceID == "" { + } else { + apiURL += "&since_id=" + sinceID + } + + resp, err := c.Get(apiURL) if err != nil { return nil, fmt.Errorf("error fetching resource: %v", err) } diff --git a/twitter/api_test.go b/twitterapi/client_test.go similarity index 57% rename from twitter/api_test.go rename to twitterapi/client_test.go index 0f66791..ebce87b 100644 --- a/twitter/api_test.go +++ b/twitterapi/client_test.go @@ -1,4 +1,6 @@ -package twitter_test +package twitterapi_test + +//go:generate mockery --recursive --name Getter --output ../generated/mocks import ( "errors" @@ -6,9 +8,10 @@ import ( "net/http" "strings" "testing" + "time" "git.netflux.io/rob/elon-eats-my-tweets/generated/mocks" - "git.netflux.io/rob/elon-eats-my-tweets/twitter" + "git.netflux.io/rob/elon-eats-my-tweets/twitterapi" "github.com/stretchr/testify/assert" ) @@ -18,14 +21,14 @@ func TestGetMe(t *testing.T) { responseStatusCode int responseBody io.Reader responseErr error - wantUser *twitter.User + wantUser *twitterapi.User wantErr string }{ { name: "successful request", responseStatusCode: 200, responseBody: strings.NewReader(`{"Data": {"id": "1", "name": "foo", "username": "foo bar"}}`), - wantUser: &twitter.User{ID: "1", Name: "foo", Username: "foo bar"}, + wantUser: &twitterapi.User{ID: "1", Name: "foo", Username: "foo bar"}, }, { name: "network error", @@ -54,7 +57,7 @@ func TestGetMe(t *testing.T) { getter. On("Get", "https://api.twitter.com/2/users/me"). Return(&http.Response{StatusCode: tc.responseStatusCode, Body: io.NopCloser(tc.responseBody)}, tc.responseErr) - client := twitter.NewAPIClient(&getter) + client := twitterapi.NewClient(&getter) user, err := client.GetMe() assert.Equal(t, tc.wantUser, user) @@ -69,32 +72,50 @@ func TestGetMe(t *testing.T) { } func TestGetTweets(t *testing.T) { - const ( - userID = "1" - sinceID = "2" - ) + const userID = "1" + utcLoc, _ := time.LoadLocation("UTC") + testCases := []struct { name string + sinceID string + wantAPIURL string responseStatusCode int responseBody io.Reader responseErr error - wantTweets []*twitter.Tweet + wantTweets []*twitterapi.Tweet wantErr string }{ { - name: "successful request", + name: "successful request, empty since_id", + wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100", responseStatusCode: 200, - responseBody: strings.NewReader(`{"Data": [{"id": "101", "text": "foo"}, {"id": "102", "text": "bar"}]}`), - wantTweets: []*twitter.Tweet{{ID: "101", Text: "foo"}, {ID: "102", Text: "bar"}}, + responseBody: strings.NewReader(`{"Data": [{"id": "101", "text": "foo", "created_at": "2019-06-04T23:12:08.000Z"}, {"id": "102", "text": "bar", "created_at": "2019-06-04T23:12:08.000Z"}]}`), + wantTweets: []*twitterapi.Tweet{ + {ID: "101", Text: "foo", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)}, + {ID: "102", Text: "bar", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)}, + }, + }, + { + name: "successful request, non-empty since_id", + sinceID: "2", + wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100&since_id=2", + responseStatusCode: 200, + responseBody: strings.NewReader(`{"Data": [{"id": "101", "text": "foo", "created_at": "2019-06-04T23:12:08.000Z"}, {"id": "102", "text": "bar", "created_at": "2019-06-04T23:12:08.000Z"}]}`), + wantTweets: []*twitterapi.Tweet{ + {ID: "101", Text: "foo", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)}, + {ID: "102", Text: "bar", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)}, + }, }, { name: "network error", + wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100", responseErr: errors.New("network error"), wantTweets: nil, wantErr: "error fetching resource: network error", }, { name: "500 response", + wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100", responseStatusCode: 500, responseBody: strings.NewReader("whale"), wantTweets: nil, @@ -102,6 +123,7 @@ func TestGetTweets(t *testing.T) { }, { name: "decoder error", + wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100", responseStatusCode: 200, responseBody: strings.NewReader(""), wantTweets: nil, @@ -113,13 +135,13 @@ func TestGetTweets(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var getter mocks.Getter getter. - On("Get", "https://api.twitter.com/2/users/"+userID+"/tweets?since_id="+sinceID). + On("Get", tc.wantAPIURL). Return(&http.Response{StatusCode: tc.responseStatusCode, Body: io.NopCloser(tc.responseBody)}, tc.responseErr) - client := twitter.NewAPIClient(&getter) + client := twitterapi.NewClient(&getter) + + tweets, err := client.GetTweets(userID, tc.sinceID) - tweets, err := client.GetTweets(userID, sinceID) assert.Equal(t, tc.wantTweets, tweets) - if tc.wantErr == "" { assert.NoError(t, err) } else { diff --git a/twitterapi/types.go b/twitterapi/types.go new file mode 100644 index 0000000..f7658be --- /dev/null +++ b/twitterapi/types.go @@ -0,0 +1,45 @@ +package twitterapi + +//go:generate mockery --recursive --name Store --output ../generated/mocks +//go:generate mockery --recursive --name APIClient --structname TwitterAPIClient --filename TwitterAPIClient.go --output ../generated/mocks + +import ( + "context" + "time" + + "git.netflux.io/rob/elon-eats-my-tweets/generated/store" +) + +// Store is a persistent store. +type Store interface { + CreateUser(context.Context, store.CreateUserParams) (store.User, error) + GetLastElonTweet(context.Context) (store.ElonTweet, error) + UpsertElonTweet(context.Context, store.UpsertElonTweetParams) (store.ElonTweet, error) +} + +// APIClient is a client for the Twitter API. +type APIClient interface { + GetLastTweet(string) (*Tweet, error) + GetTweets(string, string) ([]*Tweet, error) + GetMe() (*User, error) +} + +// TwitterMetadata contains the metadata returned in Twitter API responses. +type TwitterMetadata struct { + ResultCount int `json:"result_count"` + NewestID string `json:"newest_id"` +} + +// User represents a Twitter user as returned from the Twitter API. +type User struct { + ID string `json:"id"` + Name string `json:"name"` + Username string `json:"username"` +} + +// Tweet represents a tweet as returned from the Twitter API. +type Tweet struct { + ID string `json:"id"` + Text string `json:"text"` + CreatedAt time.Time `json:"created_at"` +}