2
\$\begingroup\$

I have an input with some transaction data in JSON format (in this case, a file with one JSON object per line):

{ "timestamp": "2019-01-01 12:00:00", "customer": "customer-1", "page": "product", "product": "product-1" }
{ "timestamp": "2019-01-01 12:02:00", "customer": "customer-1", "page": "basket", "product": "product-1" }
{ "timestamp": "2019-01-01 12:04:00", "customer": "customer-1", "page": "checkout" }
{ "timestamp": "2019-01-01 13:00:00", "customer": "customer-2", "page": "product", "product": "product-2" }
{ "timestamp": "2019-01-01 13:02:00", "customer": "customer-2", "page": "basket", "product": "product-2" }
{ "timestamp": "2019-01-01 14:00:00", "customer": "customer-3", "page": "product", "product": "product-3" }
{ "timestamp": "2019-01-01 14:05:00", "customer": "customer-3", "page": "basket", "product": "product-3" }
{ "timestamp": "2019-01-01 14:10:00", "customer": "customer-3", "page": "product", "product": "product-4" }
{ "timestamp": "2019-01-01 14:16:00", "customer": "customer-3", "page": "product", "product": "product-5" }
{ "timestamp": "2019-01-01 14:20:00", "customer": "customer-3", "page": "basket", "product": "product-4" }
{ "timestamp": "2019-01-01 14:21:00", "customer": "customer-3", "page": "checkout" }
{ "timestamp": "2019-01-01 15:00:00", "customer": "customer-9", "page": "product", "product": "product-3" }
{ "timestamp": "2019-01-01 15:11:00", "customer": "customer-9", "page": "basket", "product": "product-3" }

The idea is to be able to identify abandoned sessions, for example those with more than 10 minutes of idle time. Transactions that were checked out cannot be considered abandoned. The code that processes the input is below:

import argparse
import json
import logging
import os

import arrow

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam import ParDo, Pipeline, Map, GroupByKey, Partition
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions



class JsonCoder:
    """Minimal coder that maps records to and from JSON-lines text.

    ``encode`` expects an object exposing ``as_dict()`` and produces UTF-8
    bytes; ``decode`` parses one JSON document into plain Python objects.
    """

    @staticmethod
    def decode(stream):
        """Parse a single JSON document and return the resulting object."""
        parsed = json.loads(stream)
        return parsed

    @staticmethod
    def encode(stream):
        """Serialize ``stream.as_dict()`` to JSON and return it as UTF-8 bytes."""
        payload = stream.as_dict()
        text = json.dumps(payload)
        return text.encode('utf8')


class PageViews:
    """A single page-view event: a customer, a page, a time and an optional product.

    Attributes:
        timestamp: the value returned by ``arrow.get(timestamp)``.
        customer: customer identifier, stored as given.
        page: page name, stored as given.
        product: product identifier, or ``None`` when the event has none.
    """

    def __init__(self, timestamp, customer, page, product=None):
        """Store the event fields, parsing ``timestamp`` with ``arrow.get``."""
        self.timestamp = arrow.get(timestamp)
        self.customer = customer
        self.page = page
        self.product = product

    def arrow_to_json(self):
        """Return the timestamp formatted as ``YYYY-MM-DD HH:mm:ss``."""
        return self.timestamp.format('YYYY-MM-DD HH:mm:ss')

    def as_dict(self):
        """Return the event as a dict suitable for ``json.dumps``.

        The ``product`` key is only present when a product is set, so the
        output has the same shape as the input records.
        """
        json_dict = {'customer': self.customer,
                     'page': self.page,
                     'timestamp': self.arrow_to_json()}

        if self.product is not None:
            json_dict['product'] = self.product

        return json_dict

    def __str__(self):
        # The trailing space is part of the existing output format and is
        # kept so that anything consuming this text sees no change.
        return f'{self.customer};{self.arrow_to_json()};{self.page};{self.product} '

    def __repr__(self):
        # The closing parenthesis was missing from the original format string.
        return (f'PageViews(customer={self.customer}, '
                f'timestamp={self.arrow_to_json()}, '
                f'page={self.page}, product={self.product})')


class JsonToPageViews(beam.DoFn):
    """Convert one decoded JSON record (a dict) into a ``PageViews`` object."""

    def process(self, element, **kwargs):
        """Yield a ``PageViews`` built from ``element``.

        Args:
            element: mapping with ``timestamp``, ``customer`` and ``page``
                keys and an optional ``product`` key.

        Raises:
            KeyError: if one of the three mandatory keys is missing.
        """
        # ``product`` is optional in the input. Reading it with ``.get()``
        # yields None when absent without writing to ``element``; Beam
        # requires that a DoFn never modifies its input element.
        yield PageViews(timestamp=element['timestamp'],
                        customer=element['customer'],
                        page=element['page'],
                        product=element.get('product'))


class OnlyExpired(beam.DoFn):
    """Emit the most recent page view of each customer considered abandoned.

    A customer's grouped page views are emitted (as their latest entry) when
    either of these holds:

    * two consecutive page views are more than ``max_idle_seconds`` apart, or
    * none of the page views is a ``checkout``.

    NOTE(review): because of the first rule, a customer who idles past the
    threshold and later checks out is still emitted. That looks at odds with
    "checked-out transactions are never abandoned"; the rule is kept as it
    was, so confirm the intended semantics before relying on it.
    """

    def __init__(self, max_idle_seconds=60 * 10):
        """
        Args:
            max_idle_seconds: largest allowed gap, in seconds, between two
                consecutive page views. Defaults to 10 minutes.
        """
        super().__init__()
        self.max_idle_seconds = max_idle_seconds

    def process(self, element, **kwargs):
        """Yield the latest page view of ``element`` if it counts as abandoned.

        Args:
            element: a ``(customer, entries)`` pair as produced by
                ``GroupByKey``; each entry exposes ``timestamp`` and ``page``.
        """
        _customer, entries = element

        # The grouped values are only guaranteed to be iterable, and their
        # order is not guaranteed, so materialize and sort them before
        # comparing neighbours.
        events = sorted(entries, key=lambda entry: entry.timestamp)
        if not events:
            return

        # Subtracting the timestamps (rather than reading ``.timestamp`` on
        # them) avoids depending on whether that attribute is a property or
        # a method in the installed arrow version.
        had_idle_gap = any(
            (current.timestamp - previous.timestamp).total_seconds()
            > self.max_idle_seconds
            for previous, current in zip(events, events[1:])
        )
        checked_out = any(entry.page == 'checkout' for entry in events)

        if had_idle_gap or not checked_out:
            yield events[-1]


class GetTimestamp(beam.DoFn):
    """Debugging helper to check that timestamps were correctly assigned.

    Each element is rendered as ``"<UTC datetime> - <element>"``.
    """

    def process(self, element, timestamp=beam.DoFn.TimestampParam, **kwargs):
        event_time = timestamp.to_utc_datetime()
        yield f'{event_time} - {element}'


def partition_fn(pageview, num_partitions=1):
    """Choose a partition index from the numeric part of the customer name.

    The customer name is split on ``-`` and the second piece is read as an
    integer; the result is that integer modulo ``num_partitions``.
    """
    name_parts = pageview.customer.split('-')
    numeric_id = int(name_parts[1])
    bucket = numeric_id % num_partitions
    return bucket


def run(argv=None, save_main_session=True):
    """Build and execute the abandoned-cart pipeline.

    Args:
        argv: command-line arguments; ``--input`` and ``--output`` are
            consumed here and everything else is handed to
            ``PipelineOptions``. ``None`` lets argparse read ``sys.argv``.
        save_main_session: value stored on ``SetupOptions.save_main_session``.
    """
    # Single source of truth for the number of output partitions; it drives
    # both the Partition transform and the number of sinks below.
    num_partitions = 2

    # Processing command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', help='Input files', default='input/page-views.json')
    parser.add_argument('--output', help='Output dir', default='output')
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    # Used as a context manager, the pipeline is run on exit and waited on
    # until it finishes, so the function does not return while it is running.
    with Pipeline(options=pipeline_options) as p:
        partitions = (
            p
            | 'Read JSON files' >> ReadFromText(known_args.input, coder=JsonCoder())
            | 'Convert JSON to Pageview' >> ParDo(JsonToPageViews())
            # ``float_timestamp`` is used instead of ``timestamp`` because
            # the latter is a property in arrow < 1.0 and a method afterwards.
            | 'Add Timestamps' >> Map(
                lambda entry: beam.window.TimestampedValue(
                    entry, entry.timestamp.float_timestamp))
            | 'Create Group Key' >> Map(lambda entry: (entry.customer, entry))
            | GroupByKey()
            | 'Only Expired' >> ParDo(OnlyExpired())
            | 'Partition' >> Partition(partition_fn, num_partitions)
        )

        # One sink per partition: abandoned-carts-part0, abandoned-carts-part1, ...
        for index in range(num_partitions):
            partitions[index] | f'Save Part{index}' >> WriteToText(
                os.path.join(known_args.output, f'abandoned-carts-part{index}'),
                file_name_suffix='.json',
                coder=JsonCoder())


if __name__ == '__main__':
    # Only show warnings and errors from the root logger while the pipeline runs.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.WARNING)
    run()

I personally don't like the way I have handled the idle sessions with a custom class, OnlyExpired, since I need to keep per-session state in it. I think it would be a bottleneck in a streaming scenario. How can I improve this code?

I also added a partition step to split the data when needed (in this case, by customer name).

\$\endgroup\$
1
  • \$\begingroup\$ Could a moderator please create and assign the tag apache-beam? \$\endgroup\$ Commented Mar 8, 2020 at 9:58

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.