I have an input with some transaction data in JSON format (in this case, a file):
{ "timestamp": "2019-01-01 12:00:00", "customer": "customer-1", "page": "product", "product": "product-1" }
{ "timestamp": "2019-01-01 12:02:00", "customer": "customer-1", "page": "basket", "product": "product-1" }
{ "timestamp": "2019-01-01 12:04:00", "customer": "customer-1", "page": "checkout" }
{ "timestamp": "2019-01-01 13:00:00", "customer": "customer-2", "page": "product", "product": "product-2" }
{ "timestamp": "2019-01-01 13:02:00", "customer": "customer-2", "page": "basket", "product": "product-2" }
{ "timestamp": "2019-01-01 14:00:00", "customer": "customer-3", "page": "product", "product": "product-3" }
{ "timestamp": "2019-01-01 14:05:00", "customer": "customer-3", "page": "basket", "product": "product-3" }
{ "timestamp": "2019-01-01 14:10:00", "customer": "customer-3", "page": "product", "product": "product-4" }
{ "timestamp": "2019-01-01 14:16:00", "customer": "customer-3", "page": "product", "product": "product-5" }
{ "timestamp": "2019-01-01 14:20:00", "customer": "customer-3", "page": "basket", "product": "product-4" }
{ "timestamp": "2019-01-01 14:21:00", "customer": "customer-3", "page": "checkout" }
{ "timestamp": "2019-01-01 15:00:00", "customer": "customer-9", "page": "product", "product": "product-3" }
{ "timestamp": "2019-01-01 15:11:00", "customer": "customer-9", "page": "basket", "product": "product-3" }
The idea is to be able to identify abandoned sessions, for example those with more than 10 minutes of idle time. Transactions that were checked out cannot be considered abandoned. The code that processes the input is below:
import argparse
import json
import logging
import os
import arrow
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam import ParDo, Pipeline, Map, GroupByKey, Partition
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
class JsonCoder:
    """Coder for newline-delimited JSON: one JSON document per record."""

    @staticmethod
    def decode(stream):
        """Parse one JSON document and return the resulting Python object."""
        return json.loads(stream)

    @staticmethod
    def encode(stream):
        """Serialize ``stream.as_dict()`` as UTF-8 encoded JSON bytes."""
        payload = json.dumps(stream.as_dict())
        return payload.encode('utf8')
class PageViews:
    """A single page-view event of one customer.

    Attributes:
        timestamp: result of ``arrow.get()`` applied to the given timestamp.
        customer: customer identifier, stored as given.
        page: name of the visited page, stored as given.
        product: product identifier, or ``None`` when the event has none.
    """

    # Format used whenever the timestamp is rendered as text.
    _TIMESTAMP_FORMAT = 'YYYY-MM-DD HH:mm:ss'

    def __init__(self, timestamp, customer, page, product=None):
        """Create a page view; *timestamp* is parsed with ``arrow.get``."""
        self.timestamp = arrow.get(timestamp)
        self.customer = customer
        self.page = page
        self.product = product

    def arrow_to_json(self):
        """Return the timestamp formatted as ``YYYY-MM-DD HH:mm:ss``."""
        return self.timestamp.format(self._TIMESTAMP_FORMAT)

    def as_dict(self):
        """Return a JSON-serializable dict; 'product' is omitted when None."""
        json_dict = {
            'customer': self.customer,
            'page': self.page,
            'timestamp': self.arrow_to_json(),
        }
        if self.product is not None:
            json_dict['product'] = self.product
        return json_dict

    def __str__(self):
        return f'{self.customer};{self.arrow_to_json()};{self.page};{self.product} '

    def __repr__(self):
        # The closing parenthesis was missing from the original format string.
        return (
            f'PageViews(customer={self.customer}, '
            f'timestamp={self.arrow_to_json()}, '
            f'page={self.page}, product={self.product})'
        )
class JsonToPageViews(beam.DoFn):
    """Convert decoded JSON records into ``PageViews`` objects."""

    def process(self, element, **kwargs):
        """Yield one ``PageViews`` built from *element*.

        Args:
            element: mapping with 'timestamp', 'customer' and 'page' keys and
                an optional 'product' key.

        Yields:
            The ``PageViews`` object for the record.

        Raises:
            KeyError: if 'timestamp', 'customer' or 'page' is missing.
        """
        yield PageViews(
            timestamp=element['timestamp'],
            customer=element['customer'],
            page=element['page'],
            # Records without a product get None. Using .get() instead of
            # inserting the key keeps the input element unmodified.
            product=element.get('product'),
        )
class OnlyExpired(beam.DoFn):
    """Emit the last page view of each customer whose visit counts as expired.

    A customer is reported when two consecutive page views are more than
    ``idle_seconds`` apart, or when none of the views is a ``'checkout'`` page.

    NOTE(review): a customer who idles past the limit and checks out afterwards
    is still reported. Confirm whether checked-out customers should be
    excluded unconditionally.
    """

    def __init__(self, idle_seconds=60 * 10):
        """Store the idle limit.

        Args:
            idle_seconds: largest gap, in seconds, tolerated between two
                consecutive page views (default: 10 minutes).
        """
        super().__init__()
        self.idle_seconds = idle_seconds

    @staticmethod
    def _epoch_seconds(entry):
        """Return the entry's time as seconds since the epoch."""
        stamp = entry.timestamp.timestamp
        # Works whether the arrow object exposes `timestamp` as a property
        # or as a method (presumably this depends on the arrow version).
        return stamp() if callable(stamp) else stamp

    def process(self, element, **kwargs):
        """Yield the chronologically last entry of an expired customer.

        Args:
            element: ``(customer, entries)`` pair; ``entries`` is an iterable
                of objects with ``timestamp`` and ``page`` attributes.

        Yields:
            The latest entry, at most once per element.
        """
        _customer, entries = element
        # Grouped values are only guaranteed to be an iterable and carry no
        # ordering guarantee, so materialize and sort them first.
        ordered = sorted(entries, key=self._epoch_seconds)
        if not ordered:
            return

        stamps = [self._epoch_seconds(entry) for entry in ordered]
        idle_too_long = any(
            later - earlier > self.idle_seconds
            for earlier, later in zip(stamps, stamps[1:])
        )
        checked_out = any(entry.page == 'checkout' for entry in ordered)

        if idle_too_long or not checked_out:
            yield ordered[-1]
class GetTimestamp(beam.DoFn):
    """Debugging aid to check that timestamps were assigned to the elements."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam, **kwargs):
        """Yield '<timestamp as UTC datetime> - <element>' as one string."""
        event_time = timestamp.to_utc_datetime()
        yield f'{event_time} - {element}'
def partition_fn(pageview, num_partitions=1):
    """Choose a partition index from the number embedded in the customer name.

    The customer name is split on '-' and its second piece is read as an
    integer; the result is that integer modulo ``num_partitions``.
    """
    name_pieces = pageview.customer.split('-')
    numeric_id = int(name_pieces[1])
    bucket = numeric_id % num_partitions
    return bucket
def run(argv=None, save_main_session=True):
    """Build and run the abandoned-cart pipeline, waiting until it finishes.

    Args:
        argv: command-line arguments; ``--input`` and ``--output`` are consumed
            here and the rest is handed to ``PipelineOptions``. ``None`` lets
            argparse read ``sys.argv``.
        save_main_session: value stored in ``SetupOptions.save_main_session``.
    """
    # Processing command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', help='Input files', default='input/page-views.json')
    parser.add_argument('--output', help='Output dir', default='output')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    num_partitions = 2

    def event_time(entry):
        """Return the entry's time as seconds since the epoch."""
        stamp = entry.timestamp.timestamp
        # Works whether the arrow object exposes `timestamp` as a property
        # or as a method (presumably this depends on the arrow version).
        return stamp() if callable(stamp) else stamp

    # Leaving the `with` block runs the pipeline and waits for completion,
    # so the function does not return before the output files are written.
    with Pipeline(options=pipeline_options) as p:
        partitions = (
            p
            | 'Read JSON files' >> ReadFromText(known_args.input, coder=JsonCoder())
            | 'Convert JSON to Pageview' >> ParDo(JsonToPageViews())
            | 'Add Timestamps' >> Map(
                lambda entry: beam.window.TimestampedValue(entry, event_time(entry)))
            | 'Create Group Key' >> Map(lambda entry: (entry.customer, entry))
            | GroupByKey()
            | 'Only Expired' >> ParDo(OnlyExpired())
            | 'Partition' >> Partition(partition_fn, num_partitions)
        )

        # One output file prefix per partition; every step label must be unique.
        for index, partition in enumerate(partitions):
            _ = partition | f'Save Part{index}' >> WriteToText(
                os.path.join(known_args.output, f'abandoned-carts-part{index}'),
                file_name_suffix='.json',
                coder=JsonCoder(),
            )
if __name__ == '__main__':
    # Only warnings and more severe records pass the root logger.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.WARNING)
    run()
I personally don't like the way I have handled the idle sessions with the custom class OnlyExpired, since I need to keep session state in variables. I think it would be a bottleneck in the case of streaming. How can I improve this code?
I also added a partition step to split the data when needed (in this case, based on the customer name).
Tags: apache-beam