Building production-grade streaming data pipelines using the Publish/Subscribe pattern

Raghu Porumamilla
7 min read · Sep 15, 2020

Overview

In this story I am going to talk about how we can build highly performant and maintainable data pipelines using the Publish/Subscribe pattern. The pattern is not new; it has been used in the industry for a long time.

In the Publish/Subscribe pattern there can be any number of producers of the data and any number of receivers of that data. The topic is the central entity in this pattern, and it decouples the producers from the receivers. One of the advantages of this pattern is that neither side needs to be running continuously in order to exchange data. This is essential for modern applications, because each side can be taken down for maintenance without disrupting the other.
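To make the decoupling concrete, here is a minimal in-memory sketch of a topic with named subscriptions. It is only a toy model of the pattern, not how Pubsub is implemented; the class names InMemoryTopic and InMemoryTopicDemo and the subscription names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory topic: each subscription keeps its own queue of undelivered messages.
final class InMemoryTopic {
    private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

    // Register a subscription; it receives every message published from now on.
    void subscribe(String subscriptionName) {
        subscriptions.putIfAbsent(subscriptionName, new ArrayDeque<>());
    }

    // The producer only knows the topic; every subscription gets its own copy.
    void publish(String message) {
        for (Queue<String> queue : subscriptions.values()) {
            queue.add(message);
        }
    }

    // A receiver pulls whenever it is ready; returns null when nothing is pending.
    String pull(String subscriptionName) {
        Queue<String> queue = subscriptions.get(subscriptionName);
        return queue == null ? null : queue.poll();
    }
}

final class InMemoryTopicDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        topic.subscribe("bigquery-loader"); // hypothetical receiver
        topic.subscribe("alerting");        // hypothetical second receiver

        // The producer publishes while no receiver is running.
        topic.publish("{\"sensor_id\":\"432333\",\"moisture_level\":5}");

        // Later, each receiver pulls its own copy independently.
        System.out.println(topic.pull("alerting"));
        System.out.println(topic.pull("bigquery-loader"));
    }
}
```

Because the messages wait in the subscription's queue, the producer and the receivers never have to be up at the same time, and adding a receiver does not require any change on the producer side.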

Using this pattern I am going to describe how we can scale up and scale out the resources and pipelines to handle vast amounts of streaming data.

I am going to use the following GCP components to implement this pattern:

  • Pubsub : This will be used for creating topic & subscription
  • Publisher pipeline : Responsible for publishing the data to Pubsub
  • Subscriber pipeline : Responsible for reading the data from Pubsub
  • BigQuery : Responsible for storing the data for Streaming analysis
  • Google Cloud Storage : Serves as the staging area for the streaming files
  • Storage notifications : Responsible for sending a notification to Pubsub when a new file arrives in Cloud Storage

Here I imagine a hypothetical farming organization, BetterFarming. It has fields at different locations across the US and does moisture analysis so that it can make better use of water. Each location hosts any number of sensors that report moisture levels; the data is aggregated at each location and then sent to Cloud Storage. We assume that each location sends its data every minute. Each state (i.e., CA, IA, OH etc.) can have multiple locations, so we have to scale up within each state and also scale out across the states.

To scale up the resources we will use Dataflow autoscaling, and to scale out we will re-use Dataflow templates to run the pipelines once per state.

Here is what the sensor data looks like:

{
  "device_id": "1233455",
  "sensor_id": "432333",
  "moisture_level": 5,
  "created_time": "2020-09-22 8:00:00 UTC"
}
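The created_time field arrives as a string, so it can be handy to check that it parses before it goes anywhere near a TIMESTAMP column. The following is a small sketch using only java.time; it assumes every reading uses the "UTC" suffix shown in the sample, and the class name SensorTimestamps is made up for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class SensorTimestamps {
    // Matches values such as "2020-09-22 8:00:00 UTC".
    // A single 'H' accepts both one-digit and two-digit hours.
    // Assumption: the zone suffix is always the literal text "UTC".
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd H:mm:ss 'UTC'");

    // Throws java.time.format.DateTimeParseException when the value does not match.
    static Instant parseCreatedTime(String createdTime) {
        return LocalDateTime.parse(createdTime, FORMAT).toInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(parseCreatedTime("2020-09-22 8:00:00 UTC"));
    }
}
```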

Cloud storage

We assume that the sensor data is loaded into the Google Cloud Storage bucket

better-farming-sensor-streaming

and that this bucket has one subfolder per US state, such as az, ia, oh, ky, ca.

Cloud storage notification

A notification is sent to a state-specific topic, for example ia-better-farming-sensor-topic-file-notifications

The following commands create the storage notifications. The sample notifications are for the subfolders ia and ca.

gsutil notification create -t ia-better-farming-sensor-topic-file-notifications -f json -p ia/ -e OBJECT_FINALIZE gs://better-farming-sensor-streaming
gsutil notification create -t ca-better-farming-sensor-topic-file-notifications -f json -p ca/ -e OBJECT_FINALIZE gs://better-farming-sensor-streaming

Topics & Subscriptions

The following topics hold the sensor file notifications; one topic is created per state:

ia-better-farming-sensor-topic-file-notifications
ca-better-farming-sensor-topic-file-notifications

The following subscriptions are used to pull the sensor file notification messages; one subscription is created per state:

ia-better-farming-sensor-file-notifications-topic-subscription
ca-better-farming-sensor-file-notifications-topic-subscription

The following topics hold the sensor messages; one topic is created per state:

ia-better-farming-sensor-topic
ca-better-farming-sensor-topic

The following subscriptions are used to pull the sensor messages; one subscription is created per state:

ia-better-farming-sensor-topic-subscription
ca-better-farming-sensor-topic-subscription
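Since all four names follow the same convention, they can be derived from the state code alone. Here is a small sketch of that idea; the class StateResources is hypothetical and only encodes the naming scheme listed above.

```java
import java.util.Locale;

// Derives the per-state Pubsub resource names from a state code such as "IA".
final class StateResources {
    final String fileNotificationTopic;
    final String fileNotificationSubscription;
    final String sensorTopic;
    final String sensorSubscription;

    StateResources(String stateCode) {
        String prefix = stateCode.toLowerCase(Locale.ROOT) + "-better-farming-sensor";
        fileNotificationTopic = prefix + "-topic-file-notifications";
        fileNotificationSubscription = prefix + "-file-notifications-topic-subscription";
        sensorTopic = prefix + "-topic";
        sensorSubscription = prefix + "-topic-subscription";
    }

    public static void main(String[] args) {
        // Print the resource names for a state that is not listed above.
        StateResources oh = new StateResources("OH");
        System.out.println(oh.fileNotificationTopic);
        System.out.println(oh.fileNotificationSubscription);
        System.out.println(oh.sensorTopic);
        System.out.println(oh.sensorSubscription);
    }
}
```

Keeping the convention in one place makes it harder to wire a job for one state to a subscription that belongs to another.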

Publisher pipeline (Apache Beam Java)

This pipeline reads the file notifications and then reads the content of each file. Each line contains one sensor message. Once it has read the sensor messages, it publishes them to the state-specific topic.

Transformations:

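package com.betterfarming.data.sensor.tramsform;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileNotificationToFileNameTransform extends DoFn<PubsubMessage, String> {

  // Subset of the Cloud Storage notification payload that we need.
  public static class FileNotification {
    private String bucket;
    private String name;
    // getter & setter methods go here
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(FileNotificationToFileNameTransform.class);

  // The payload carries more fields than bucket and name, so ignore unknown ones.
  private static final ObjectMapper MAPPER = new ObjectMapper()
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

  @ProcessElement
  public void processElement(ProcessContext context) {
    try {
      Map<String, String> attributes = context.element().getAttributeMap();
      String eventType = attributes.get("eventType");
      // Only emit a file name once the object has been fully written.
      if ("OBJECT_FINALIZE".equals(eventType)) {
        FileNotification fileNotif =
            MAPPER.readValue(context.element().getPayload(), FileNotification.class);
        context.output("gs://" + fileNotif.getBucket() + "/" + fileNotif.getName());
      }
    } catch (Exception error) {
      LOG.error("Error transforming Pubsub notification to file name", error);
    }
  }
}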
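The filtering step can also be tried out without Beam or Jackson on the classpath. The sketch below is a plain-Java variant that builds the path from the bucketId and objectId message attributes, which Cloud Storage sets on its Pubsub notifications, instead of parsing the JSON payload as the transform above does. The class name NotificationFilter is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class NotificationFilter {
    // Returns the gs:// path for a finalized object, or empty for any other event.
    static Optional<String> toGcsPath(Map<String, String> attributes) {
        if (!"OBJECT_FINALIZE".equals(attributes.get("eventType"))) {
            return Optional.empty();
        }
        String bucket = attributes.get("bucketId");
        String object = attributes.get("objectId");
        if (bucket == null || object == null) {
            return Optional.empty();
        }
        return Optional.of("gs://" + bucket + "/" + object);
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<>();
        attributes.put("eventType", "OBJECT_FINALIZE");
        attributes.put("bucketId", "better-farming-sensor-streaming");
        attributes.put("objectId", "ia/sensors.json"); // hypothetical object name
        System.out.println(toGcsPath(attributes).orElse("(skipped)"));
    }
}
```

Returning an Optional keeps the "skip this notification" case explicit, which mirrors how the DoFn simply emits nothing for events it does not care about.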

Pipeline:

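package com.betterfarming.data.sensor.pipeline;

import com.betterfarming.data.sensor.tramsform.FileNotificationToFileNameTransform;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class SensorPublisher {
  public static void main(String[] args) {
    // SensorPublisherOptions is the pipeline options interface (not shown here).
    SensorPublisherOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SensorPublisherOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Read the file notifications
    PCollection<PubsubMessage> sensorFileNotifications =
        pipeline.apply("Read file notifications",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(options.getFileNotificationsSubscription()));

    // Transform the Pubsub file notifications to file names
    PCollection<String> sensorFileNames =
        sensorFileNotifications.apply("Transform file",
            ParDo.of(new FileNotificationToFileNameTransform()));

    // Read the file content, one sensor message per line
    PCollection<String> sensorMsgs =
        sensorFileNames
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply("Read the sensor data from files", TextIO.readFiles());

    // Write the sensor messages to the sensor topic
    sensorMsgs.apply("Write to pubsub",
        PubsubIO.writeStrings().to(options.getSensorTopic()));

    pipeline.run();
  }
}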

Subscriber pipeline (Apache Beam Java)

This pipeline reads the sensor messages and then saves them to BigQuery.

Transformations:

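package com.betterfarming.data.sensor.tramsform;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SensorMsgToTableRow extends DoFn<String, TableRow> {
  private static final ObjectMapper MAPPER = new ObjectMapper();
  private static final Logger LOG = LoggerFactory.getLogger(SensorMsgToTableRow.class);

  public static class Sensor {
    private String device_id;
    private String sensor_id;
    private Integer moisture_level;
    private String created_time;

    // getter & setter go here
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    try {
      Sensor sensor = MAPPER.readValue(context.element(), Sensor.class);
      TableRow row = new TableRow();
      row.set("device_id", sensor.getDevice_id());
      row.set("sensor_id", sensor.getSensor_id());
      row.set("moisture_level", sensor.getMoisture_level());
      row.set("created_time", sensor.getCreated_time());
      context.output(row);
    } catch (Exception error) {
      // readValue throws a checked exception; malformed messages are logged and dropped.
      LOG.error("Error transforming sensor message to table row", error);
    }
  }
}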

Pipeline:

package com.betterfarming.data.sensor.pipeline;

import com.betterfarming.data.sensor.tramsform.SensorMsgToTableRow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class SensorSubscriber {
  public static void main(String[] args) {
    // SensorSubscriberOptions is the pipeline options interface (not shown here).
    SensorSubscriberOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SensorSubscriberOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Read the sensor messages as strings
    PCollection<String> sensorMessages =
        pipeline.apply("Read sensor messages",
            PubsubIO.readStrings().fromSubscription(options.getSensorSubscription()));

    // Transform the sensor messages to table rows
    PCollection<TableRow> sensorMsgsAsTableRows =
        sensorMessages.apply("Transform msgs to Table row",
            ParDo.of(new SensorMsgToTableRow()));

    // Save to the BigQuery table
    sensorMsgsAsTableRows.apply("Save to BQ",
        BigQueryIO.writeTableRows()
            .to(options.getDestinationTable())
            .withSchema(getSchema())
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }

  public static TableSchema getSchema() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("device_id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("sensor_id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("moisture_level").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("created_time").setType("TIMESTAMP"));
    return new TableSchema().setFields(fields);
  }
}

Creating the template

The following commands create the templates for the publisher & subscriber pipelines. Once you run them, the templates will be created in gs://{dataflowbucket}/sensor/v1/

Note the flag enableStreamingEngine, which enables Dataflow's Streaming Engine. With it, the job uses the Streaming Engine backend for shuffle instead of the worker VMs' storage. Based on the throughput of the input data, Dataflow autoscales the workers, up to maxNumWorkers. This is the fundamental feature of Dataflow that makes it awesome for processing large amounts of data: as a user you don't have to estimate beforehand how many resources you need. When the throughput drops, it scales the resources back down.

# Creating the template for the Publisher pipeline
./gradlew clean run \
  -PmainClass=com.betterfarming.data.sensor.pipeline.SensorPublisher \
  -Pargs="--enableStreamingEngine \
    --project={GCP project ID goes here} \
    --runner=DataflowRunner \
    --maxNumWorkers=16 \
    --workerMachineType=n1-standard-2 \
    --region=us-central1 \
    --numWorkers=1 \
    --stagingLocation=gs://{dataflowbucket}/sensor/v1/staging \
    --templateLocation=gs://{dataflowbucket}/sensor/v1/SensorPublisher \
    --gcpTempLocation=gs://{dataflowbucket}/sensor/v1/temp"
# Creating the template for the Subscriber pipeline
./gradlew clean run \
  -PmainClass=com.betterfarming.data.sensor.pipeline.SensorSubscriber \
  -Pargs="--enableStreamingEngine \
    --project={GCP project ID goes here} \
    --runner=DataflowRunner \
    --maxNumWorkers=16 \
    --workerMachineType=n1-standard-2 \
    --region=us-central1 \
    --numWorkers=1 \
    --stagingLocation=gs://{dataflowbucket}/sensor/v1/staging \
    --templateLocation=gs://{dataflowbucket}/sensor/v1/SensorSubscriber \
    --gcpTempLocation=gs://{dataflowbucket}/sensor/v1/temp"

Submitting the streaming job using template

The following commands submit jobs from the above templates. In this example we submit the jobs for the CA & IA states. You can scale out to other states by submitting the jobs with different inputs.

# Publisher & subscriber jobs for the CA state
gcloud dataflow jobs run sensor-publisher-ca \
  --gcs-location=gs://{dataflowbucket}/sensor/v1/SensorPublisher \
  --region=us-central1 \
  --project={GCP project ID} \
  --parameters="fileNotificationsSubscription=projects/{project_id}/subscriptions/ca-better-farming-sensor-file-notifications-topic-subscription,sensorTopic=projects/{project_id}/topics/ca-better-farming-sensor-topic"

gcloud dataflow jobs run sensor-subscriber-ca \
  --gcs-location=gs://{dataflowbucket}/sensor/v1/SensorSubscriber \
  --region=us-central1 \
  --project={GCP project ID} \
  --parameters="sensorSubscription=projects/{project_id}/subscriptions/ca-better-farming-sensor-topic-subscription,destinationTable={project_id}:ca_sensors.sensor"
# Publisher & subscriber jobs for the IA state
gcloud dataflow jobs run sensor-publisher-ia \
  --gcs-location=gs://{dataflowbucket}/sensor/v1/SensorPublisher \
  --region=us-central1 \
  --project={GCP project ID} \
  --parameters="fileNotificationsSubscription=projects/{project_id}/subscriptions/ia-better-farming-sensor-file-notifications-topic-subscription,sensorTopic=projects/{project_id}/topics/ia-better-farming-sensor-topic"

gcloud dataflow jobs run sensor-subscriber-ia \
  --gcs-location=gs://{dataflowbucket}/sensor/v1/SensorSubscriber \
  --region=us-central1 \
  --project={GCP project ID} \
  --parameters="sensorSubscription=projects/{project_id}/subscriptions/ia-better-farming-sensor-topic-subscription,destinationTable={project_id}:ia_sensors.sensor"
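Because only the state code changes between these commands, generating them is less error-prone than copying and editing them by hand. The following is a rough sketch that builds the publisher command for a list of states. The class StateJobCommands and the sample project and bucket values are made up for illustration; the flags and parameter names are the ones used in the commands above.

```java
import java.util.Locale;

final class StateJobCommands {
    // Builds the "gcloud dataflow jobs run" command for one state's publisher job.
    static String publisherCommand(String projectId, String dataflowBucket, String stateCode) {
        String state = stateCode.toLowerCase(Locale.ROOT);
        String parameters = String.join(",",
            "fileNotificationsSubscription=projects/" + projectId + "/subscriptions/"
                + state + "-better-farming-sensor-file-notifications-topic-subscription",
            "sensorTopic=projects/" + projectId + "/topics/"
                + state + "-better-farming-sensor-topic");
        return String.join(" ",
            "gcloud dataflow jobs run sensor-publisher-" + state,
            "--gcs-location=gs://" + dataflowBucket + "/sensor/v1/SensorPublisher",
            "--region=us-central1",
            "--project=" + projectId,
            "--parameters=" + parameters);
    }

    public static void main(String[] args) {
        // Hypothetical project and bucket names; replace with your own.
        for (String state : new String[] {"CA", "IA", "OH"}) {
            System.out.println(publisherCommand("my-project", "my-dataflow-bucket", state));
        }
    }
}
```

The subscriber command can be generated the same way by swapping in the sensorSubscription and destinationTable parameters.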

Conclusion

So we have seen how we can use the publish & subscribe pattern to loosely couple the publisher & subscriber pipelines. If any other consumer of your data wants to subscribe, you can just create another subscription on your topic and give it to them.

We also saw how to scale your pipelines up & out using autoscaling & templates.

Also, from a maintenance perspective, if you want to take down the pipelines for one state, you can easily do so without interrupting the pipelines of other states.

One thing I haven't discussed here is error handling. Maybe in a future post I will explain how it can be done in a uniform way across the pipelines.
