Implement TweetPersister daemon
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Rob Watson 2022-05-24 21:20:28 +02:00
parent 993f07c08e
commit 553335d75d
17 changed files with 670 additions and 92 deletions

30
daemon/daemon.go Normal file
View File

@ -0,0 +1,30 @@
package daemon
import (
"context"
"time"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"go.uber.org/zap"
)
// Run blocks until the provided context is terminated.
func Run(ctx context.Context, store twitterapi.Store, apiClient twitterapi.APIClient, logger *zap.SugaredLogger) error {
tweetPersister := TweetPersister{store: store, apiClient: apiClient, logger: logger}
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if n, err := tweetPersister.PersistTweets(ctx); err != nil {
logger.With("err", err, "inserted", n).Error("error updating tweets")
} else {
logger.With("inserted", n).Info("tweets updated")
}
case <-ctx.Done():
logger.With("err", ctx.Err()).Info("context complete, exiting")
return nil
}
}
}

105
daemon/tweet_persister.go Normal file
View File

@ -0,0 +1,105 @@
package daemon
import (
"context"
"database/sql"
"fmt"
"strconv"
"time"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"github.com/jackc/pgx/v4"
"go.uber.org/zap"
)
const formatDecimal = 10
const elonID = "44196397"
// TweetPersister fetches tweets from twitter and persists them in the database.
type TweetPersister struct {
store twitterapi.Store
apiClient twitterapi.APIClient
logger *zap.SugaredLogger
}
// NewTweetPersister creates a new NewTweetPersister.
func NewTweetPersister(store twitterapi.Store, apiClient twitterapi.APIClient, logger *zap.SugaredLogger) *TweetPersister {
return &TweetPersister{store: store, apiClient: apiClient, logger: logger}
}
// PersistTweets fetches tweets from Twitter and persists them in the database.
// In the case there are no tweets in the database, the single most recent
// tweet will be fetched. Otherwise, the maximum tweets returnable by the
// Twitter API will be fetched starting from the most recent known tweet. If
// there are more tweets available, multiple calls to this method will be
// required to fetch them all.
func (tu *TweetPersister) PersistTweets(ctx context.Context) (int, error) {
lastKnownTweet, err := tu.store.GetLastElonTweet(ctx)
if err != nil && err != pgx.ErrNoRows {
return 0, fmt.Errorf("error fetching last Elon tweet: %v", err)
}
// we have no Elon tweets at all, this is the first run. Grab the most recent
// tweet, and mark it as processed.
if err == pgx.ErrNoRows {
return tu.insertInitialTweet(ctx)
}
sinceID := strconv.FormatInt(lastKnownTweet.TwitterID, formatDecimal)
var n int
newTweets, err := tu.apiClient.GetTweets(elonID, sinceID)
if err != nil {
return n, fmt.Errorf("error fetching latest Elon tweets: %v", err)
}
for _, tweet := range newTweets {
if _, storeErr := tu.insertTweet(ctx, tweet, false); storeErr != nil {
return n, fmt.Errorf("error inserting tweet: %v", storeErr)
}
n++
}
return n, nil
}
func (tu *TweetPersister) insertInitialTweet(ctx context.Context) (int, error) {
tweet, err := tu.apiClient.GetLastTweet(elonID)
if err == twitterapi.ErrNoTweets {
tu.logger.Warn("Twitter API returned empty success response, no tweets available.")
return 0, nil
} else if err != nil {
return 0, fmt.Errorf("error fetching initial tweet: %v", err)
}
if _, err = tu.insertTweet(ctx, tweet, true); err != nil {
return 0, fmt.Errorf("error inserting initial tweet: %v", err)
}
return 1, nil
}
func (tu *TweetPersister) insertTweet(ctx context.Context, tweet *twitterapi.Tweet, markAsProcessed bool) (*store.ElonTweet, error) {
twitterID, err := strconv.ParseInt(tweet.ID, 10, 64)
if err != nil {
return nil, fmt.Errorf("error parsing twitter ID: %v", err)
}
params := store.UpsertElonTweetParams{
TwitterID: twitterID,
Text: tweet.Text,
PostedAt: tweet.CreatedAt,
}
if markAsProcessed {
params.ProcessedAt = sql.NullTime{Time: time.Now().UTC(), Valid: true}
}
storeTweet, err := tu.store.UpsertElonTweet(ctx, params)
if err != nil {
return nil, fmt.Errorf("error upserting tweet: %v", err)
}
tu.logger.With("twitter_id", twitterID, "text", tweet.Text).Infof("new tweet inserted")
return &storeTweet, nil
}

View File

@ -0,0 +1,176 @@
package daemon_test
import (
"context"
"errors"
"testing"
"time"
"git.netflux.io/rob/elon-eats-my-tweets/daemon"
"git.netflux.io/rob/elon-eats-my-tweets/generated/mocks"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
)
func TestTweetUpdaterUpdateTweetsInitialTweet(t *testing.T) {
utcLoc, _ := time.LoadLocation("UTC")
createdAt := time.Date(2022, 1, 1, 12, 12, 23, 0, utcLoc)
testCases := []struct {
name string
lastKnownTweetErr error
fetchedTweet *twitterapi.Tweet
fetchedTweetErr error
insertTweetErr error
wantCount int
wantErr string
}{
{
name: "error fetching last known tweet",
lastKnownTweetErr: errors.New("database error"),
wantCount: 0,
wantErr: "error fetching last Elon tweet: database error",
},
{
name: "no tweet available from the API",
lastKnownTweetErr: pgx.ErrNoRows,
fetchedTweetErr: twitterapi.ErrNoTweets,
wantCount: 0,
wantErr: "",
},
{
name: "error fetching tweet from API",
lastKnownTweetErr: pgx.ErrNoRows,
fetchedTweetErr: errors.New("API error"),
wantCount: 0,
wantErr: "error fetching initial tweet: API error",
},
{
name: "error inserting tweet",
lastKnownTweetErr: pgx.ErrNoRows,
fetchedTweet: &twitterapi.Tweet{ID: "101", Text: "bar", CreatedAt: createdAt},
insertTweetErr: errors.New("boom"),
wantCount: 0,
wantErr: "error inserting initial tweet: error upserting tweet: boom",
},
{
name: "success",
lastKnownTweetErr: pgx.ErrNoRows,
fetchedTweet: &twitterapi.Tweet{ID: "101", Text: "bar", CreatedAt: createdAt},
wantCount: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var mockStore mocks.Store
mockStore.On("GetLastElonTweet", mock.Anything).Return(store.ElonTweet{}, tc.lastKnownTweetErr)
mockStore.
On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool {
return params.TwitterID == 101 && params.Text == "bar" && params.PostedAt.Equal(createdAt) && params.ProcessedAt.Valid
})).
Return(store.ElonTweet{}, tc.insertTweetErr)
var mockAPIClient mocks.TwitterAPIClient
mockAPIClient.On("GetLastTweet", "44196397").Return(tc.fetchedTweet, tc.fetchedTweetErr)
if tc.wantErr == "" {
defer mockAPIClient.AssertExpectations(t)
}
updater := daemon.NewTweetPersister(&mockStore, &mockAPIClient, zap.NewNop().Sugar())
n, err := updater.PersistTweets(context.Background())
assert.Equal(t, tc.wantCount, n)
if tc.wantErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.wantErr)
}
})
}
}
func TestTweetUpdaterUpdateTweets(t *testing.T) {
utcLoc, _ := time.LoadLocation("UTC")
createdAt := time.Date(2022, 1, 1, 12, 12, 23, 0, utcLoc)
lastKnownTweet := store.ElonTweet{TwitterID: 100, Text: "foo", PostedAt: createdAt, CreatedAt: createdAt}
testCases := []struct {
name string
lastKnownTweetErr error
fetchedTweets []*twitterapi.Tweet
fetchedTweetsErr error
insertTweetErr error
wantCount int
wantErr string
}{
{
name: "error fetching tweets",
fetchedTweetsErr: errors.New("whale"),
wantCount: 0,
wantErr: "error fetching latest Elon tweets: whale",
},
{
name: "error inserting tweet",
fetchedTweets: []*twitterapi.Tweet{{ID: "101", Text: "bar", CreatedAt: createdAt}},
insertTweetErr: errors.New("database error"),
wantCount: 0,
wantErr: "error inserting tweet: error upserting tweet: database error",
},
{
name: "success",
fetchedTweets: []*twitterapi.Tweet{
{ID: "101", Text: "bar", CreatedAt: createdAt},
{ID: "102", Text: "baz", CreatedAt: createdAt},
{ID: "103", Text: "qux", CreatedAt: createdAt},
},
wantCount: 3,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var mockStore mocks.Store
mockStore.On("GetLastElonTweet", mock.Anything).Return(lastKnownTweet, nil)
mockStore.
On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool {
return params.TwitterID == 101 && params.Text == "bar" && params.PostedAt.Equal(createdAt) && !params.ProcessedAt.Valid
})).
Return(store.ElonTweet{}, tc.insertTweetErr)
mockStore.
On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool {
return params.TwitterID == 102 && params.Text == "baz" && params.PostedAt.Equal(createdAt) && !params.ProcessedAt.Valid
})).
Return(store.ElonTweet{}, tc.insertTweetErr)
mockStore.
On("UpsertElonTweet", mock.Anything, mock.MatchedBy(func(params store.UpsertElonTweetParams) bool {
return params.TwitterID == 103 && params.Text == "qux" && params.PostedAt.Equal(createdAt) && !params.ProcessedAt.Valid
})).
Return(store.ElonTweet{}, tc.insertTweetErr)
if tc.wantErr == "" {
defer mockStore.AssertExpectations(t)
}
var mockAPIClient mocks.TwitterAPIClient
mockAPIClient.On("GetTweets", "44196397", "100").Return(tc.fetchedTweets, tc.fetchedTweetsErr)
if tc.wantErr == "" {
defer mockAPIClient.AssertExpectations(t)
}
updater := daemon.NewTweetPersister(&mockStore, &mockAPIClient, zap.NewNop().Sugar())
n, err := updater.PersistTweets(context.Background())
assert.Equal(t, tc.wantCount, n)
if tc.wantErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, tc.wantErr)
}
})
}
}

View File

@ -37,6 +37,48 @@ func (_m *Store) CreateUser(_a0 context.Context, _a1 store.CreateUserParams) (st
return r0, r1
}
// GetLastElonTweet provides a mock function with given fields: _a0
func (_m *Store) GetLastElonTweet(_a0 context.Context) (store.ElonTweet, error) {
ret := _m.Called(_a0)
var r0 store.ElonTweet
if rf, ok := ret.Get(0).(func(context.Context) store.ElonTweet); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(store.ElonTweet)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// UpsertElonTweet provides a mock function with given fields: _a0, _a1
func (_m *Store) UpsertElonTweet(_a0 context.Context, _a1 store.UpsertElonTweetParams) (store.ElonTweet, error) {
ret := _m.Called(_a0, _a1)
var r0 store.ElonTweet
if rf, ok := ret.Get(0).(func(context.Context, store.UpsertElonTweetParams) store.ElonTweet); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Get(0).(store.ElonTweet)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, store.UpsertElonTweetParams) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewStore creates a new instance of Store. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewStore(t testing.TB) *Store {
mock := &Store{}

View File

@ -7,24 +7,47 @@ import (
mock "github.com/stretchr/testify/mock"
twitter "git.netflux.io/rob/elon-eats-my-tweets/twitter"
twitterapi "git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
)
// TwitterAPIClient is an autogenerated mock type for the TwitterAPIClient type
// TwitterAPIClient is an autogenerated mock type for the APIClient type
type TwitterAPIClient struct {
mock.Mock
}
// GetLastTweet provides a mock function with given fields: _a0
func (_m *TwitterAPIClient) GetLastTweet(_a0 string) (*twitterapi.Tweet, error) {
ret := _m.Called(_a0)
var r0 *twitterapi.Tweet
if rf, ok := ret.Get(0).(func(string) *twitterapi.Tweet); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*twitterapi.Tweet)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetMe provides a mock function with given fields:
func (_m *TwitterAPIClient) GetMe() (*twitter.User, error) {
func (_m *TwitterAPIClient) GetMe() (*twitterapi.User, error) {
ret := _m.Called()
var r0 *twitter.User
if rf, ok := ret.Get(0).(func() *twitter.User); ok {
var r0 *twitterapi.User
if rf, ok := ret.Get(0).(func() *twitterapi.User); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*twitter.User)
r0 = ret.Get(0).(*twitterapi.User)
}
}
@ -38,6 +61,29 @@ func (_m *TwitterAPIClient) GetMe() (*twitter.User, error) {
return r0, r1
}
// GetTweets provides a mock function with given fields: _a0, _a1
func (_m *TwitterAPIClient) GetTweets(_a0 string, _a1 string) ([]*twitterapi.Tweet, error) {
ret := _m.Called(_a0, _a1)
var r0 []*twitterapi.Tweet
if rf, ok := ret.Get(0).(func(string, string) []*twitterapi.Tweet); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*twitterapi.Tweet)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// NewTwitterAPIClient creates a new instance of TwitterAPIClient. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewTwitterAPIClient(t testing.TB) *TwitterAPIClient {
mock := &TwitterAPIClient{}

View File

@ -5,11 +5,22 @@
package store
import (
"database/sql"
"time"
"github.com/google/uuid"
)
type ElonTweet struct {
ID uuid.UUID
TwitterID int64
Text string
PostedAt time.Time
ProcessedAt sql.NullTime
CreatedAt time.Time
UpdatedAt time.Time
}
type User struct {
ID uuid.UUID
TwitterID string

View File

@ -7,6 +7,7 @@ package store
import (
"context"
"database/sql"
"time"
)
@ -55,6 +56,25 @@ func (q *Queries) CreateUser(ctx context.Context, arg CreateUserParams) (User, e
return i, err
}
const getLastElonTweet = `-- name: GetLastElonTweet :one
SELECT id, twitter_id, text, posted_at, processed_at, created_at, updated_at from elon_tweets ORDER BY twitter_id DESC LIMIT 1
`
func (q *Queries) GetLastElonTweet(ctx context.Context) (ElonTweet, error) {
row := q.db.QueryRow(ctx, getLastElonTweet)
var i ElonTweet
err := row.Scan(
&i.ID,
&i.TwitterID,
&i.Text,
&i.PostedAt,
&i.ProcessedAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getUserByTwitterID = `-- name: GetUserByTwitterID :one
SELECT id, twitter_id, username, name, access_token, refresh_token, delete_tweets_enabled, delete_tweets_num_per_iteration, created_at, updated_at FROM users WHERE twitter_id = $1
`
@ -76,3 +96,37 @@ func (q *Queries) GetUserByTwitterID(ctx context.Context, twitterID string) (Use
)
return i, err
}
const upsertElonTweet = `-- name: UpsertElonTweet :one
INSERT INTO elon_tweets (twitter_id, text, posted_at, processed_at, created_at, updated_at)
VALUES ($1, $2, $3, $4, NOW(), NOW())
ON CONFLICT (twitter_id) DO NOTHING
RETURNING id, twitter_id, text, posted_at, processed_at, created_at, updated_at
`
type UpsertElonTweetParams struct {
TwitterID int64
Text string
PostedAt time.Time
ProcessedAt sql.NullTime
}
func (q *Queries) UpsertElonTweet(ctx context.Context, arg UpsertElonTweetParams) (ElonTweet, error) {
row := q.db.QueryRow(ctx, upsertElonTweet,
arg.TwitterID,
arg.Text,
arg.PostedAt,
arg.ProcessedAt,
)
var i ElonTweet
err := row.Scan(
&i.ID,
&i.TwitterID,
&i.Text,
&i.PostedAt,
&i.ProcessedAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}

View File

@ -1,7 +1,6 @@
package httpserver
//go:generate mockery --recursive --srcpkg github.com/gorilla/sessions --name Store --structname SessionStore --filename SessionStore.go --output ../generated/mocks
//go:generate mockery --recursive --name TwitterAPIClient --filename TwitterAPIClient.go --output ../generated/mocks
import (
"context"
@ -13,7 +12,7 @@ import (
"git.netflux.io/rob/elon-eats-my-tweets/config"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
"git.netflux.io/rob/elon-eats-my-tweets/templates"
"git.netflux.io/rob/elon-eats-my-tweets/twitter"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/google/uuid"
@ -59,12 +58,12 @@ func getCurrentUser(r *http.Request, sessionStore sessions.Store) (CurrentUser,
}
type TwitterAPIClient interface {
GetMe() (*twitter.User, error)
GetMe() (*twitterapi.User, error)
}
type handler struct {
store twitter.Store
twitterAPIClientFunc func(c *http.Client) TwitterAPIClient
store twitterapi.Store
twitterAPIClientFunc func(c *http.Client) twitterapi.APIClient
oauth2Config *oauth2.Config
renderer *templates.Renderer
sessionStore sessions.Store
@ -72,7 +71,7 @@ type handler struct {
logger *zap.SugaredLogger
}
func NewHandler(cfg config.Config, store twitter.Store, twitterAPIClientFunc func(c *http.Client) TwitterAPIClient, sessionStore sessions.Store, tokenGenerator TokenGenerator, logger *zap.Logger) http.Handler {
func NewHandler(cfg config.Config, store twitterapi.Store, twitterAPIClientFunc func(c *http.Client) twitterapi.APIClient, sessionStore sessions.Store, tokenGenerator TokenGenerator, logger *zap.Logger) http.Handler {
r := chi.NewRouter()
r.Use(middleware.RequestID)
r.Use(middleware.RealIP)

View File

@ -11,7 +11,7 @@ import (
"git.netflux.io/rob/elon-eats-my-tweets/generated/mocks"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
"git.netflux.io/rob/elon-eats-my-tweets/httpserver"
"git.netflux.io/rob/elon-eats-my-tweets/twitter"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"github.com/google/uuid"
"github.com/gorilla/sessions"
"github.com/stretchr/testify/assert"
@ -67,7 +67,7 @@ func TestGetIndex(t *testing.T) {
handler := httpserver.NewHandler(
config.Config{},
&mocks.Store{},
func(*http.Client) httpserver.TwitterAPIClient { return &mocks.TwitterAPIClient{} },
func(*http.Client) twitterapi.APIClient { return &mocks.TwitterAPIClient{} },
&mockSessionStore,
&mockTokenGenerator{},
zap.NewNop(),
@ -124,7 +124,7 @@ func TestGetLogin(t *testing.T) {
},
},
&mocks.Store{},
func(*http.Client) httpserver.TwitterAPIClient { return &mocks.TwitterAPIClient{} },
func(*http.Client) twitterapi.APIClient { return &mocks.TwitterAPIClient{} },
&mockSessionStore,
&mockTokenGenerator{tokens: []string{"state", "pkceVerifier"}},
zap.NewNop(),
@ -268,7 +268,7 @@ func TestGetCallback(t *testing.T) {
mockSessionStore.On("Save", mock.Anything, mock.Anything, sess).Return(tc.sessionSaveError)
var mockTwitterClient mocks.TwitterAPIClient
mockTwitterClient.On("GetMe").Return(&twitter.User{ID: "1", Name: "foo", Username: "Foo Bar"}, tc.getTwitterUserError)
mockTwitterClient.On("GetMe").Return(&twitterapi.User{ID: "1", Name: "foo", Username: "Foo Bar"}, tc.getTwitterUserError)
var mockStore mocks.Store
mockStore.On("CreateUser", mock.Anything, mock.MatchedBy(func(params store.CreateUserParams) bool {
@ -304,7 +304,7 @@ func TestGetCallback(t *testing.T) {
},
},
&mockStore,
func(*http.Client) httpserver.TwitterAPIClient { return &mockTwitterClient },
func(*http.Client) twitterapi.APIClient { return &mockTwitterClient },
&mockSessionStore,
nil,
zap.NewNop(),
@ -339,7 +339,7 @@ func TestPostLogout(t *testing.T) {
handler := httpserver.NewHandler(
config.Config{},
&mocks.Store{},
func(*http.Client) httpserver.TwitterAPIClient { return &mocks.TwitterAPIClient{} },
func(*http.Client) twitterapi.APIClient { return &mocks.TwitterAPIClient{} },
&mockSessionStore,
nil,
zap.NewNop(),

13
main.go
View File

@ -8,9 +8,10 @@ import (
"net/http"
"git.netflux.io/rob/elon-eats-my-tweets/config"
"git.netflux.io/rob/elon-eats-my-tweets/daemon"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
"git.netflux.io/rob/elon-eats-my-tweets/httpserver"
"git.netflux.io/rob/elon-eats-my-tweets/twitter"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"github.com/gorilla/sessions"
"github.com/jackc/pgx/v4/pgxpool"
"go.uber.org/zap"
@ -35,10 +36,18 @@ func main() {
defer dbconn.Close()
store := store.New(dbconn)
// TODO: separate daemon and webserver in separate binaries (or via flags).
go daemon.Run(
context.Background(),
store,
twitterapi.NewClientWithBearerToken(cfg.Twitter.BearerToken),
logger.Sugar().Named("daemon"),
)
handler := httpserver.NewHandler(
cfg,
store,
func(c *http.Client) httpserver.TwitterAPIClient { return twitter.NewAPIClient(c) },
func(c *http.Client) twitterapi.APIClient { return twitterapi.NewClient(c) },
sessions.NewCookieStore([]byte(cfg.SessionKey)),
httpserver.RandomTokenGenerator{},
logger,

View File

@ -0,0 +1 @@
DROP TABLE elon_tweets;

View File

@ -0,0 +1,11 @@
CREATE TABLE elon_tweets (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
twitter_id bigint NOT NULL,
text TEXT NOT NULL,
posted_at TIMESTAMP WITH TIME ZONE NOT NULL,
processed_at TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL
);
CREATE UNIQUE INDEX index_elon_tweets_on_twitter_id ON elon_tweets (twitter_id);

View File

@ -8,3 +8,12 @@ INSERT INTO users (twitter_id, username, name, access_token, refresh_token, crea
-- name: GetUserByTwitterID :one
SELECT * FROM users WHERE twitter_id = $1;
-- name: GetLastElonTweet :one
SELECT * from elon_tweets ORDER BY twitter_id DESC LIMIT 1;
-- name: UpsertElonTweet :one
INSERT INTO elon_tweets (twitter_id, text, posted_at, processed_at, created_at, updated_at)
VALUES ($1, $2, $3, $4, NOW(), NOW())
ON CONFLICT (twitter_id) DO NOTHING
RETURNING *;

View File

@ -1,13 +0,0 @@
package twitter
//go:generate mockery --recursive --name Store --output ../generated/mocks
import (
"context"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
)
type Store interface {
CreateUser(context.Context, store.CreateUserParams) (store.User, error)
}

View File

@ -1,28 +1,34 @@
package twitter
//go:generate mockery --recursive --name Getter --output ../generated/mocks
package twitterapi
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
)
// ElonID is the Twitter ID of @elonmusk.
const ElonID = "44196397"
// User represents a Twitter user.
type User struct {
ID string
Name string
Username string
type Getter interface {
Get(string) (*http.Response, error)
}
// Tweet represents a tweet.
type Tweet struct {
ID string
Text string
// NewClient returns a new APIClient.
func NewClient(httpclient Getter) APIClient {
return &apiClient{httpclient}
}
// NewAPIClient returns a new APIClient which will authenticate with the
// provided bearer token.
func NewClientWithBearerToken(bearerToken string) APIClient {
return &apiClient{&http.Client{
Timeout: time.Second * 5,
Transport: &bearerTokenTransport{bearerToken: bearerToken},
}}
}
// apiClient implements APIClient.
type apiClient struct {
Getter
}
// bearerTokenTransport implements http.RoundTripper.
@ -55,31 +61,8 @@ func cloneRequest(r *http.Request) *http.Request {
return r2
}
type Getter interface {
Get(string) (*http.Response, error)
}
// NewAPIClient returns a new APIClient.
func NewAPIClient(httpclient Getter) *APIClient {
return &APIClient{httpclient}
}
// NewAPIClient returns a new APIClient which will authenticate with the
// provided bearer token.
func NewAPIClientWithBearerToken(bearerToken string) *APIClient {
return &APIClient{&http.Client{
Timeout: time.Second * 5,
Transport: &bearerTokenTransport{bearerToken: bearerToken},
}}
}
// APIClient interacts with the Twitter API V2.
type APIClient struct {
Getter
}
// GetMe returns the currently authenticated user.
func (c *APIClient) GetMe() (*User, error) {
func (c *apiClient) GetMe() (*User, error) {
type oauthResponse struct {
Data *User `json:"data"`
}
@ -101,13 +84,61 @@ func (c *APIClient) GetMe() (*User, error) {
return oauthResp.Data, nil
}
// GetElonTweets returns the latest tweets for a given user.
func (c *APIClient) GetTweets(userID string, sinceID string) ([]*Tweet, error) {
var ErrNoTweets = errors.New("no tweets available")
// GetLastTweet returns the most recent tweet for a given user. If no tweets
// are available, ErrNoTweets will be returned.
func (c *apiClient) GetLastTweet(userID string) (*Tweet, error) {
type oauthResponse struct {
Data []*Tweet `json:"data"`
Meta TwitterMetadata `json:"meta"`
}
apiURL := "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at"
resp, err := c.Get(apiURL)
if err != nil {
return nil, fmt.Errorf("error fetching resource: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("error fetching resource: status code %d", resp.StatusCode)
}
var oauthResp oauthResponse
if err = json.NewDecoder(resp.Body).Decode(&oauthResp); err != nil {
return nil, fmt.Errorf("error decoding resource: %v", err)
}
if oauthResp.Meta.ResultCount == 0 {
return nil, ErrNoTweets
}
// the order of returned tweets seems to be chronological, but it isn't
// documented so use the metadata instead.
for _, tweet := range oauthResp.Data {
if tweet.ID == oauthResp.Meta.NewestID {
return tweet, nil
}
}
return nil, errors.New("error fetching latest tweet: could not match newest_id")
}
// GetTweets returns the latest tweets for a given user, up to the maximum
// batch size of 100 allowable by the Twitter API.
func (c *apiClient) GetTweets(userID string, sinceID string) ([]*Tweet, error) {
type oauthResponse struct {
Data []*Tweet `json:"data"`
}
resp, err := c.Get(fmt.Sprintf("https://api.twitter.com/2/users/%s/tweets?since_id=%s", userID, sinceID))
apiURL := "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100"
if sinceID == "" {
} else {
apiURL += "&since_id=" + sinceID
}
resp, err := c.Get(apiURL)
if err != nil {
return nil, fmt.Errorf("error fetching resource: %v", err)
}

View File

@ -1,4 +1,6 @@
package twitter_test
package twitterapi_test
//go:generate mockery --recursive --name Getter --output ../generated/mocks
import (
"errors"
@ -6,9 +8,10 @@ import (
"net/http"
"strings"
"testing"
"time"
"git.netflux.io/rob/elon-eats-my-tweets/generated/mocks"
"git.netflux.io/rob/elon-eats-my-tweets/twitter"
"git.netflux.io/rob/elon-eats-my-tweets/twitterapi"
"github.com/stretchr/testify/assert"
)
@ -18,14 +21,14 @@ func TestGetMe(t *testing.T) {
responseStatusCode int
responseBody io.Reader
responseErr error
wantUser *twitter.User
wantUser *twitterapi.User
wantErr string
}{
{
name: "successful request",
responseStatusCode: 200,
responseBody: strings.NewReader(`{"Data": {"id": "1", "name": "foo", "username": "foo bar"}}`),
wantUser: &twitter.User{ID: "1", Name: "foo", Username: "foo bar"},
wantUser: &twitterapi.User{ID: "1", Name: "foo", Username: "foo bar"},
},
{
name: "network error",
@ -54,7 +57,7 @@ func TestGetMe(t *testing.T) {
getter.
On("Get", "https://api.twitter.com/2/users/me").
Return(&http.Response{StatusCode: tc.responseStatusCode, Body: io.NopCloser(tc.responseBody)}, tc.responseErr)
client := twitter.NewAPIClient(&getter)
client := twitterapi.NewClient(&getter)
user, err := client.GetMe()
assert.Equal(t, tc.wantUser, user)
@ -69,32 +72,50 @@ func TestGetMe(t *testing.T) {
}
func TestGetTweets(t *testing.T) {
const (
userID = "1"
sinceID = "2"
)
const userID = "1"
utcLoc, _ := time.LoadLocation("UTC")
testCases := []struct {
name string
sinceID string
wantAPIURL string
responseStatusCode int
responseBody io.Reader
responseErr error
wantTweets []*twitter.Tweet
wantTweets []*twitterapi.Tweet
wantErr string
}{
{
name: "successful request",
name: "successful request, empty since_id",
wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100",
responseStatusCode: 200,
responseBody: strings.NewReader(`{"Data": [{"id": "101", "text": "foo"}, {"id": "102", "text": "bar"}]}`),
wantTweets: []*twitter.Tweet{{ID: "101", Text: "foo"}, {ID: "102", Text: "bar"}},
responseBody: strings.NewReader(`{"Data": [{"id": "101", "text": "foo", "created_at": "2019-06-04T23:12:08.000Z"}, {"id": "102", "text": "bar", "created_at": "2019-06-04T23:12:08.000Z"}]}`),
wantTweets: []*twitterapi.Tweet{
{ID: "101", Text: "foo", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)},
{ID: "102", Text: "bar", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)},
},
},
{
name: "successful request, non-empty since_id",
sinceID: "2",
wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100&since_id=2",
responseStatusCode: 200,
responseBody: strings.NewReader(`{"Data": [{"id": "101", "text": "foo", "created_at": "2019-06-04T23:12:08.000Z"}, {"id": "102", "text": "bar", "created_at": "2019-06-04T23:12:08.000Z"}]}`),
wantTweets: []*twitterapi.Tweet{
{ID: "101", Text: "foo", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)},
{ID: "102", Text: "bar", CreatedAt: time.Date(2019, 6, 4, 23, 12, 8, 0, utcLoc)},
},
},
{
name: "network error",
wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100",
responseErr: errors.New("network error"),
wantTweets: nil,
wantErr: "error fetching resource: network error",
},
{
name: "500 response",
wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100",
responseStatusCode: 500,
responseBody: strings.NewReader("whale"),
wantTweets: nil,
@ -102,6 +123,7 @@ func TestGetTweets(t *testing.T) {
},
{
name: "decoder error",
wantAPIURL: "https://api.twitter.com/2/users/" + userID + "/tweets?tweet.fields=created_at&max_results=100",
responseStatusCode: 200,
responseBody: strings.NewReader("<html></html>"),
wantTweets: nil,
@ -113,13 +135,13 @@ func TestGetTweets(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var getter mocks.Getter
getter.
On("Get", "https://api.twitter.com/2/users/"+userID+"/tweets?since_id="+sinceID).
On("Get", tc.wantAPIURL).
Return(&http.Response{StatusCode: tc.responseStatusCode, Body: io.NopCloser(tc.responseBody)}, tc.responseErr)
client := twitter.NewAPIClient(&getter)
client := twitterapi.NewClient(&getter)
tweets, err := client.GetTweets(userID, tc.sinceID)
tweets, err := client.GetTweets(userID, sinceID)
assert.Equal(t, tc.wantTweets, tweets)
if tc.wantErr == "" {
assert.NoError(t, err)
} else {

45
twitterapi/types.go Normal file
View File

@ -0,0 +1,45 @@
package twitterapi
//go:generate mockery --recursive --name Store --output ../generated/mocks
//go:generate mockery --recursive --name APIClient --structname TwitterAPIClient --filename TwitterAPIClient.go --output ../generated/mocks
import (
"context"
"time"
"git.netflux.io/rob/elon-eats-my-tweets/generated/store"
)
// Store is a persistent store.
type Store interface {
CreateUser(context.Context, store.CreateUserParams) (store.User, error)
GetLastElonTweet(context.Context) (store.ElonTweet, error)
UpsertElonTweet(context.Context, store.UpsertElonTweetParams) (store.ElonTweet, error)
}
// APIClient is a client for the Twitter API.
type APIClient interface {
GetLastTweet(string) (*Tweet, error)
GetTweets(string, string) ([]*Tweet, error)
GetMe() (*User, error)
}
// TwitterMetadata contains the metadata returned in Twitter API responses.
type TwitterMetadata struct {
ResultCount int `json:"result_count"`
NewestID string `json:"newest_id"`
}
// User represents a Twitter user as returned from the Twitter API.
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Username string `json:"username"`
}
// Tweet represents a tweet as returned from the Twitter API.
type Tweet struct {
ID string `json:"id"`
Text string `json:"text"`
CreatedAt time.Time `json:"created_at"`
}