Guide to Google Cloud's Pub/Sub, Cloud Scheduler, Pub/Sub Lite, and Pub/Sub Schemas
- Author: Anurag Verma (@anurag_629)
A Comprehensive Guide to Google Cloud's Pub/Sub, Cloud Scheduler, Pub/Sub Lite, and Pub/Sub Schemas
In the era of distributed systems and microservices, reliable messaging and scheduling services are vital for building scalable and decoupled applications. Google Cloud offers a suite of services tailored for these needs:
- Pub/Sub: A global, real-time messaging service for event ingestion and delivery.
- Cloud Scheduler: A fully managed cron job service for automating tasks.
- Pub/Sub Lite: A cost-effective messaging solution for high-throughput workloads with predictable traffic patterns.
- Pub/Sub Schemas: A feature for enforcing message structure and validating data.
In this comprehensive guide, we'll explore these services in depth, covering:
- Introduction to Pub/Sub and Pub/Sub Schemas
- Setting Up Pub/Sub with Schemas
- Creating a Cloud Function Triggered by Pub/Sub
- Automating Tasks with Cloud Scheduler
- Getting Started with Pub/Sub Lite
- Conclusion
- References
Introduction to Pub/Sub and Pub/Sub Schemas
Pub/Sub Overview
Google Cloud Pub/Sub is a fully managed messaging service designed to ingest and deliver event data between applications. It decouples senders and receivers, allowing for asynchronous communication and scalable architectures.
Key Concepts:
- Topic: A named resource to which messages are sent by publishers.
- Subscription: A named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.
- Publisher: An application that creates and sends messages to a topic.
- Subscriber: An application that receives messages from a subscription to process them.
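To make these four concepts concrete, here is a toy in-memory model written with only the Python standard library. It is a sketch of the fan-out idea, not how Pub/Sub is implemented; the InMemoryBroker class and the topic and subscription names are hypothetical.

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Toy stand-in for Pub/Sub: a topic fans out to per-subscription queues."""

    def __init__(self):
        # topic name -> names of the subscriptions attached to it
        self._subscriptions_by_topic = defaultdict(list)
        # subscription name -> queue of messages not yet pulled
        self._queues = {}

    def create_subscription(self, topic, subscription):
        self._subscriptions_by_topic[topic].append(subscription)
        self._queues[subscription] = deque()

    def publish(self, topic, data):
        # Every subscription on the topic receives its own copy of the message.
        for subscription in self._subscriptions_by_topic[topic]:
            self._queues[subscription].append(data)

    def pull(self, subscription):
        # Return the oldest pending message, or None when the queue is empty.
        queue = self._queues[subscription]
        return queue.popleft() if queue else None


broker = InMemoryBroker()
broker.create_subscription("orders", "billing-sub")
broker.create_subscription("orders", "shipping-sub")
broker.publish("orders", b"order-42")

print(broker.pull("billing-sub"))   # b'order-42'
print(broker.pull("shipping-sub"))  # b'order-42'
print(broker.pull("billing-sub"))   # None
```

The publisher only knows the topic name, and each subscription drains independently, which is the decoupling described above.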
Pub/Sub Schemas
Pub/Sub Schemas allow you to define the structure of messages using schema definitions. This ensures that all messages published to a topic conform to a specific format, enabling data validation and reducing errors.
Benefits:
- Data Quality: Enforce message structure to ensure data integrity.
- Schema Evolution: Manage changes to data structures over time.
- Cross-Language Support: Use schemas defined in languages like Avro or Protocol Buffers.
Setting Up Pub/Sub with Schemas
In this section, we'll walk through creating a Pub/Sub schema, using it to define a topic, and then publishing messages that conform to the schema.
Prerequisites
- A Google Cloud project with billing enabled.
- Access to the Google Cloud Console and Cloud Shell.
Creating a Pub/Sub Schema
Let's create a schema named city-temp-schema with the following Avro schema definition:
{
  "type": "record",
  "name": "CityTemperature",
  "fields": [
    {
      "name": "city",
      "type": "string"
    },
    {
      "name": "temperature",
      "type": "double"
    },
    {
      "name": "pressure",
      "type": "int"
    },
    {
      "name": "time_position",
      "type": "string"
    }
  ]
}
Steps:
Navigate to Pub/Sub Schemas:
In the Google Cloud Console, click on Navigation Menu (☰) > Pub/Sub > Schemas.
Create a New Schema:
- Click Create Schema.
- Schema ID: Enter city-temp-schema.
- Type: Select Avro.
- Definition: Paste the schema definition above.
- Click Create.
Note: The schema defines a record with fields for city, temperature, pressure, and time position.
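To see what the schema constrains, the sketch below checks a Python dict against the record's four fields using only the standard library. It is not real Avro validation: it only handles the primitive types used in this schema, it does not check the 32-bit range of int, and accepting a Python int for a double field is an assumption. The validation_errors helper is a hypothetical name.

```python
import json

SCHEMA_JSON = """
{
  "type": "record",
  "name": "CityTemperature",
  "fields": [
    {"name": "city", "type": "string"},
    {"name": "temperature", "type": "double"},
    {"name": "pressure", "type": "int"},
    {"name": "time_position", "type": "string"}
  ]
}
"""

# Python types accepted for each Avro primitive used by this schema.
# Allowing int for "double" is a leniency assumption of this sketch.
AVRO_TO_PYTHON = {
    "string": (str,),
    "double": (int, float),
    "int": (int,),
}


def validation_errors(message, schema):
    """Return a list of problems; an empty list means the message fits."""
    errors = []
    for field in schema["fields"]:
        name, avro_type = field["name"], field["type"]
        if name not in message:
            errors.append(f"missing field: {name}")
            continue
        value = message[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, AVRO_TO_PYTHON[avro_type]):
            errors.append(f"{name}: expected {avro_type}, got {type(value).__name__}")
    return errors


schema = json.loads(SCHEMA_JSON)
good = {
    "city": "New York",
    "temperature": 22.5,
    "pressure": 1013,
    "time_position": "2023-10-01T12:00:00Z",
}
bad = {"city": "New York", "temperature": "warm", "pressure": 1013}

print(validation_errors(good, schema))  # []
print(validation_errors(bad, schema))
# ['temperature: expected double, got str', 'missing field: time_position']
```

Pub/Sub performs its own validation on the server when a topic has a schema attached; a local check like this is only useful for catching obvious mistakes before publishing.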
Creating a Pub/Sub Topic Using a Schema
Now, we'll create a topic named temp-topic that uses the city-temp-schema schema for message validation.
Steps:
Navigate to Pub/Sub Topics:
Click on Topics in the left-hand menu.
Create a New Topic:
- Click Create Topic.
- Topic ID: Enter temp-topic.
- Message Encoding: Choose JSON. Pub/Sub offers JSON or Binary as the message encoding; Avro is the schema type, not an encoding, and the sample message in this guide is JSON.
- Schema Settings:
  - Select Schema: Choose city-temp-schema.
  - Schema Enforcement: Ensure that messages are validated against the schema.
- Click Create.
Note: Any messages published to temp-topic must conform to the city-temp-schema schema.
Publishing Messages with Schema Validation
To publish messages that conform to the schema:
Use Cloud Shell or Client Library:
You can publish messages using the Google Cloud SDK or a client library (e.g., Python, Java).
Sample Message (JSON, matching the Avro schema):
{ "city": "New York", "temperature": 22.5, "pressure": 1013, "time_position": "2023-10-01T12:00:00Z" }
Publish the Message:
Publish the message to temp-topic with the Google Cloud SDK or a client library. Pub/Sub rejects any message that does not adhere to the schema.
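As one way to do this from Python, here is a minimal sketch that assumes the topic was created with JSON message encoding, that the google-cloud-pubsub package is installed, and that application default credentials are configured. The encode_reading and publish_reading helpers and the "your-project-id" placeholder are hypothetical names for illustration.

```python
import json


def encode_reading(city, temperature, pressure, time_position):
    """Serialize one reading as UTF-8 JSON bytes with the schema's four fields."""
    reading = {
        "city": city,
        "temperature": float(temperature),
        "pressure": int(pressure),
        "time_position": time_position,
    }
    return json.dumps(reading).encode("utf-8")


def publish_reading(project_id, topic_id, payload):
    """Publish payload bytes and return the server-assigned message ID."""
    # Imported here so encode_reading can be used without the client library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    future = publisher.publish(topic_path, payload)
    # result() blocks until the publish succeeds, or raises if it fails
    # (for example, when the message does not pass schema validation).
    return future.result()


payload = encode_reading("New York", 22.5, 1013, "2023-10-01T12:00:00Z")
print(payload.decode("utf-8"))

# Uncomment to publish for real; "your-project-id" is a placeholder.
# print(publish_reading("your-project-id", "temp-topic", payload))
```

Keeping the encoding step separate from the network call makes it easy to inspect exactly what will be sent before anything reaches the topic.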
Creating a Cloud Function Triggered by Pub/Sub
Cloud Functions can respond to events from various sources, including Pub/Sub topics. We'll create a Cloud Function named gcf-pubsub that triggers when messages are published to a topic named gcf-topic.
Steps:
Navigate to Cloud Functions:
Click on Navigation Menu > Cloud Functions.
Create a New Function:
- Click Create Function.
- Function Name: Enter gcf-pubsub.
- Environment: Choose 1st gen.
- Region: Select your preferred region.
- Click Next.
Configure Trigger:
- Trigger Type: Select Cloud Pub/Sub.
- Topic: Select gcf-topic. If it doesn't exist, create it.
- Click Next.
Function Code:
Runtime: Choose your preferred language (e.g., Python 3.9).
Entry Point: Enter pubsub, the name of the function defined in the code sample.
Code Sample (Python):
import base64


def pubsub(event, context):
    # With a 1st gen Pub/Sub trigger, the payload arrives base64-encoded in event['data'].
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
        print(f"Received message: {message}")
    else:
        print("No data found in the message.")
Click Deploy.
Test the Function:
- Publish a message to gcf-topic and verify that the function logs the message.
Note: Ensure that the service account associated with Cloud Functions has the necessary Pub/Sub permissions.
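Before deploying, you can exercise the handler locally by building the same kind of event dict the trigger delivers. The sketch below uses only the standard library; make_event and run_locally are hypothetical helpers, and the handler is a copy of the one from the Function Code step.

```python
import base64
import contextlib
import io


def pubsub(event, context):
    """Copy of the handler from the Function Code step."""
    if "data" in event:
        message = base64.b64decode(event["data"]).decode("utf-8")
        print(f"Received message: {message}")
    else:
        print("No data found in the message.")


def make_event(text):
    """Build an event dict with base64-encoded data, as the trigger would."""
    return {"data": base64.b64encode(text.encode("utf-8")).decode("ascii")}


def run_locally(event):
    """Invoke the handler and return what it printed, for easy assertions."""
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        pubsub(event, context=None)
    return captured.getvalue().strip()


print(run_locally(make_event("hello")))  # Received message: hello
print(run_locally({}))                   # No data found in the message.
```

This does not replace testing the deployed function, but it catches decoding mistakes without a deploy cycle.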
Automating Tasks with Cloud Scheduler
Cloud Scheduler allows you to schedule jobs to run at specified times or intervals. We'll set up a cron job that publishes messages to a Pub/Sub topic.
Enabling the Cloud Scheduler API
Enable API:
- Navigate to APIs & Services > Library.
- Search for Cloud Scheduler API.
- Click Enable.
Creating a Scheduled Job
Navigate to Cloud Scheduler:
Click on Navigation Menu > Cloud Scheduler.
Create a Job:
- Click Create Job.
- Name: Enter my-scheduled-job.
- Frequency: Use cron syntax (e.g., */5 * * * * to run every 5 minutes).
- Timezone: Select your timezone.
- Click Continue.
Configure the Target:
- Target Type: Select Pub/Sub.
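- Topic: Choose temp-topic or any topic you prefer. If you choose temp-topic, the message body must be JSON that conforms to city-temp-schema; a plain-text body will be rejected by schema validation.
- Message Body: Enter a message, e.g., Automated message from Cloud Scheduler (for a topic without a schema).
- Click Create.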
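If the cron syntax is unfamiliar, the sketch below expands a single cron field into the values it matches, using the minute field */5 from the frequency above. It is a simplified, hypothetical helper that covers only the forms listed in its docstring, not a full cron parser.

```python
def expand_cron_field(field, low, high):
    """Expand one cron field into the sorted list of values it matches.

    Supports "*", "*/step", "a-b", "a-b/step", single numbers, and
    comma-separated combinations of those.
    """
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return sorted(values)


# Minute field of "*/5 * * * *": minutes run from 0 to 59.
print(expand_cron_field("*/5", 0, 59))
# [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
```

The remaining four fields (hour, day of month, month, day of week) are all *, so the job fires at each of those minutes in every hour of every day.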
Verifying Job Execution
Check Job Status:
In the Cloud Scheduler dashboard, you should see your job listed with its next run time.
Verify Messages:
Use Cloud Shell or a subscriber to pull messages from the topic to confirm the job is publishing messages as expected.
Getting Started with Pub/Sub Lite
Pub/Sub Lite is designed for workloads with predictable traffic patterns, offering a cost-effective alternative for high-throughput messaging.
Creating a Lite Topic
Navigate to Pub/Sub Lite:
Click on Navigation Menu > Pub/Sub > Lite Topics.
Create a Lite Topic:
- Click Create Lite Topic.
- Topic ID: Enter my-lite-topic.
- Location: Choose a zone (e.g., us-east4-a).
- Throughput and Storage: Configure as needed.
- Click Create.
Creating a Lite Subscription
Navigate to Lite Subscriptions:
Click on Lite Subscriptions.
Create a Lite Subscription:
- Click Create Lite Subscription.
- Subscription ID: Enter my-lite-subscription.
- Topic: Select my-lite-topic.
- Delivery Requirement: Choose your preferred option.
- Click Create.
Sending and Receiving Messages with Python
Prerequisites
Python 3 installed.
Pub/Sub Lite client library installed:
pip3 install --upgrade google-cloud-pubsublite
Sending Messages
Create send_messages.py:
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath

# Replace with your project number and zone information
project_number = 'YOUR_PROJECT_NUMBER'
cloud_region = 'us-east4'
zone_id = 'a'
topic_id = 'my-lite-topic'

location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)

with PublisherClient() as publisher_client:
    data = "Hello, Pub/Sub Lite!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    message_id = api_future.result()
    print(f"Published message ID: {message_id}")
Run the script:
python3 send_messages.py
Receiving Messages
Create receive_messages.py:
from concurrent.futures import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath, FlowControlSettings

project_number = 'YOUR_PROJECT_NUMBER'
cloud_region = 'us-east4'
zone_id = 'a'
subscription_id = 'my-lite-subscription'
timeout = 90  # seconds

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

flow_control_settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)


def callback(message):
    print(f"Received message: {message.data.decode('utf-8')}")
    message.ack()


with SubscriberClient() as subscriber_client:
    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=flow_control_settings,
    )
    print(f"Listening for messages on {subscription_path}...")
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()
Run the script:
python3 receive_messages.py
You should see:
Listening for messages on projects/[PROJECT_NUMBER]/locations/us-east4-a/subscriptions/my-lite-subscription...
Received message: Hello, Pub/Sub Lite!
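The two flow control values in receive_messages.py cap how many messages and bytes can be outstanding (delivered but not yet acknowledged) at once. The toy model below illustrates that idea with plain Python; it is a conceptual sketch, not how the client library implements flow control, and the FlowControlGate class is a hypothetical name.

```python
class FlowControlGate:
    """Toy model of flow control: cap unacknowledged messages and bytes."""

    def __init__(self, messages_outstanding, bytes_outstanding):
        self.max_messages = messages_outstanding
        self.max_bytes = bytes_outstanding
        self.messages = 0
        self.bytes = 0

    def try_deliver(self, payload):
        """Admit the payload if both limits still hold; return True if admitted."""
        if (self.messages + 1 > self.max_messages
                or self.bytes + len(payload) > self.max_bytes):
            return False
        self.messages += 1
        self.bytes += len(payload)
        return True

    def ack(self, payload):
        """Release the capacity held by an acknowledged payload."""
        self.messages -= 1
        self.bytes -= len(payload)


gate = FlowControlGate(messages_outstanding=2, bytes_outstanding=10)
print(gate.try_deliver(b"aaaa"))  # True
print(gate.try_deliver(b"bbbb"))  # True
print(gate.try_deliver(b"cc"))    # False: the message limit of 2 is reached
gate.ack(b"aaaa")
print(gate.try_deliver(b"cc"))    # True: the ack freed one slot
```

In this model, a callback that is slow to call ack() holds capacity longer, which is why the limits effectively bound how much work a subscriber has in flight.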
Conclusion
In this guide, we've explored:
- Pub/Sub with Schemas: Defining and enforcing message structures for data integrity.
- Creating Cloud Functions Triggered by Pub/Sub: Responding to real-time events with serverless functions.
- Automating Tasks with Cloud Scheduler: Scheduling jobs to automate workflows.
- Getting Started with Pub/Sub Lite: Leveraging a cost-effective messaging service for predictable workloads.
By integrating these services, you can build robust, scalable, and efficient applications on Google Cloud Platform.
Next Steps:
- Experiment: Try extending the examples, such as adding error handling or processing messages in batch.
- Integrate: Use these services in your applications to enhance functionality.
- Optimize: Monitor performance and costs, adjusting configurations as needed.
References
- Google Cloud Pub/Sub Documentation
- Pub/Sub Schemas Documentation
- Cloud Functions Documentation
- Cloud Scheduler Documentation
- Pub/Sub Lite Documentation
- Google Cloud Client Libraries
Feel free to leave comments or questions below!