In the previous post in this series, I described a complete architecture to summarize and analyze tweets at scale.
Some of you have reached out to ask about the ingestion process and the integration with Twitter, so I have decided to present a simplified ingestion architecture that shows how it can be done more efficiently.

Architecture
(1) The integration part of the architecture remains the same: we use Amazon ECS on Fargate and a Python script, packaged as a Docker container, as the central infrastructure to retrieve the raw tweets. Additionally, we use Amazon ECR to store the Docker image privately.

Using Terraform, the ECR component is straightforward to provision:
resource "aws_ecr_repository" "aws-ecr" {
name = "${var.app_name}-${var.app_environment}-ecr"
}
Creating the ECS cluster is more involved, as it comprises a few moving parts:
- VPC
- Cluster
- Task
- Execution Role
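Before we get to the task, here is a sketch of the cluster and the execution role; the resource names and the managed-policy attachment are assumptions on my side:
resource "aws_ecs_cluster" "main" {
  name = "${var.app_name}-${var.app_environment}-cluster"
}

# Execution role so ECS can pull the image from ECR and write logs
resource "aws_iam_role" "ecs_task_execution_role" {
  name = "${var.app_name}-execution-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "ecs-tasks.amazonaws.com" }
    }]
  })
}

resource "aws_iam_role_policy_attachment" "ecs_task_execution" {
  role       = aws_iam_role.ecs_task_execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy"
}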
Let’s have a look at the task definition, which is the most interesting part.

The task definition is required to run Docker containers in Amazon ECS and, as shown in the sketch after this list, we define a few parameters:
- Docker image URL, stored in our private ECR
- CPU assigned to the task
- Memory assigned to the task
- Networking mode: we select the “awsvpc” mode, so the task is allocated its own elastic network interface (ENI) and a primary private IPv4 address
- portMappings: we are just using the HTTP port for this example
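Putting those parameters together, the task definition could look like this; the sizing values and container name are illustrative, and the execution role matches the sketch above:
resource "aws_ecs_task_definition" "twitter_task" {
  family                   = "${var.app_name}-task"
  requires_compatibilities = ["FARGATE"]
  network_mode             = "awsvpc" # each task gets its own ENI and private IPv4 address
  cpu                      = 256
  memory                   = 512
  execution_role_arn       = aws_iam_role.ecs_task_execution_role.arn

  container_definitions = jsonencode([{
    name      = "${var.app_name}-container"
    image     = "${aws_ecr_repository.aws-ecr.repository_url}:latest" # from our private ECR
    essential = true
    portMappings = [{
      containerPort = 80 # just the HTTP port for this example
      hostPort      = 80
    }]
  }])
}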
To deploy the task in the cluster, we first look up the latest active revision of its task definition:
data "aws_ecs_task_definition" "main" {
  task_definition = aws_ecs_task_definition.twitter_task.family
}
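The data source lets us pin the service to that latest revision. A service resource along these lines would then keep the task running in the cluster; the subnet and security-group names are assumptions:
resource "aws_ecs_service" "main" {
  name            = "${var.app_name}-service"
  cluster         = aws_ecs_cluster.main.id
  task_definition = "${aws_ecs_task_definition.twitter_task.family}:${data.aws_ecs_task_definition.main.revision}"
  desired_count   = 1
  launch_type     = "FARGATE"

  network_configuration {
    subnets          = aws_subnet.public[*].id
    security_groups  = [aws_security_group.ecs_tasks.id]
    assign_public_ip = true # so the task can pull from ECR without NAT or VPC endpoints
  }
}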

The task is just the instantiation of a task definition within a cluster; we can define multiple containers in a single task if we need to implement something like the sidecar pattern.
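As an illustration, a sidecar is simply a second entry in container_definitions; the log-router below uses the AWS Fluent Bit image, and the wiring is only a sketch:
container_definitions = jsonencode([
  {
    name      = "app"
    image     = "${aws_ecr_repository.aws-ecr.repository_url}:latest"
    essential = true
  },
  {
    # Sidecar shipping logs next to the main container
    name      = "log-router"
    image     = "public.ecr.aws/aws-observability/aws-for-fluent-bit:stable"
    essential = false
  }
])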

Now that the container is running within our task, we can fetch live tweets from the service.

(2) I have simplified the ingestion part of the architecture, removing Amazon Kinesis Data Analytics and Amazon Kinesis Data Streams and ingesting the raw tweets directly with Amazon Kinesis Data Firehose, which allows near real-time ingestion into S3. As an additional exercise, you can use a Lambda function to format the tweets at ingestion time; Firehose can invoke it for each batch of records (a sketch follows the stream definition below).

Once we have created the Firehose delivery stream tweet_stream, providing the bucket ARN and a role so the service can access S3, the tweets will start being ingested in near real-time.
resource "aws_kinesis_firehose_delivery_stream" "firehose_tweet_stream" {
name = "tweet_stream"
destination = "s3"
s3_configuration {
role_arn = aws_iam_role.firehose_role.arn
bucket_arn = aws_s3_bucket.tweets_storage.arn
}
}
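For the Lambda exercise mentioned above, the stream definition would switch to the extended_s3 destination, which supports a processing configuration; a sketch assuming a transformation function defined elsewhere as aws_lambda_function.transform:
resource "aws_kinesis_firehose_delivery_stream" "firehose_tweet_stream" {
  name        = "tweet_stream"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose_role.arn
    bucket_arn = aws_s3_bucket.tweets_storage.arn

    # Firehose invokes the Lambda function on each batch of records
    processing_configuration {
      enabled = true

      processors {
        type = "Lambda"
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = aws_lambda_function.transform.arn
        }
      }
    }
  }
}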
The Firehose service provides some valuable metrics to monitor the ingestion process.

(3) Finally, the tweets are ingested into the S3 bucket. Remember, “the frequency of data delivery to Amazon S3 is determined by the S3 buffer size and buffer interval value you configured for your delivery stream. Kinesis Data Firehose buffers incoming data before delivering it to Amazon S3. You can configure the values for S3 buffer size (1 MB to 128 MB) or buffer interval (60 to 900 seconds), and the condition satisfied first triggers data delivery to Amazon S3” – from AWS Documentation.
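These buffering hints map directly onto the Terraform configuration of the stream; a sketch extending the s3_configuration from above (note that newer provider versions rename the attributes to buffering_size and buffering_interval):
s3_configuration {
  role_arn        = aws_iam_role.firehose_role.arn
  bucket_arn      = aws_s3_bucket.tweets_storage.arn
  buffer_size     = 5  # MB, 1 to 128; whichever threshold is hit first triggers delivery
  buffer_interval = 60 # seconds, 60 to 900
}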

Integration with Twitter
The last part of the puzzle is integrating with Twitter through its API. For that, you need an account on Twitter’s Developer Platform and, depending on the functionality you want to use, a paid subscription.

To access the Twitter API, I recommend Tweepy, a Python library that makes the API easier to work with.
As a first step, instantiate a Firehose client; remember, don’t store the keys in the code. Use a credentials store, like Secrets Manager (a sketch follows the client setup below).
import boto3

firehose_client = boto3.client(
    'firehose',
    region_name='eu-west-1',                      # enter your region
    aws_access_key_id=aws_access_key_id,          # your AWS access key id
    aws_secret_access_key=aws_secret_access_key,  # your AWS secret access key
)
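Here is a minimal sketch of the Secrets Manager approach; the secret name and its JSON keys are assumptions:
import json
import boto3

# Fetch the Twitter credentials from Secrets Manager instead of hardcoding them
secrets_client = boto3.client('secretsmanager', region_name='eu-west-1')
secret = json.loads(
    secrets_client.get_secret_value(SecretId='twitter/api-credentials')['SecretString']
)
bearer_token = secret['bearer_token']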
Then, using Tweepy, we search for the tweets we are interested in and ingest them into the Firehose stream:
import logging
import tweepy

logger = logging.getLogger(__name__)

# Authenticate against the Twitter API v2
client = tweepy.Client(bearer_token=bearer_token)

logger.info("Searching and ingesting Tweets")
response = client.search_recent_tweets("AWS", max_results=10)
tweets = response.data
for tweet in tweets:
    firehose_client.put_record(
        DeliveryStreamName="tweet_stream",
        Record={
            # Firehose expects bytes; newline-delimit so tweets stay separated in S3
            'Data': (tweet.text + "\n").encode('utf-8')
        }
    )
Well, I hope this article helps you get started. Have fun! 🙂