
I have a python generator that yields a sample (np.ndarray, size: batchsize x ndim) of a larger dataset (np.ndarray, size: ndata x ndim).

I coded a decorator that yields a processed output (dict) of the batching generator. Something like:

def decorator(func):
    def wrapper(*args, **kwargs):
        # Pull batches from the source generator and pass each one
        # to the wrapped function via the 'data' keyword argument
        generator = generator_from_large_dataset()
        for batch in generator:
            kwargs['data'] = batch
            yield func(*args, **kwargs)

    return wrapper
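
For context, a minimal sketch of how the decorator is meant to be applied (generator_from_large_dataset and process_batch here are placeholder implementations, not my real code):

import numpy as np

def generator_from_large_dataset(batchsize=4, ndim=3, ndata=20):
    # Placeholder: yields batchsize x ndim samples of a larger dataset
    data = np.random.rand(ndata, ndim)
    for start in range(0, ndata, batchsize):
        yield data[start:start + batchsize]

@decorator
def process_batch(data=None):
    # Placeholder: builds the output dict from one batch
    return {'param0': 'somevalue',
            'param1': {'someotherparams': 'somevalue'},
            'fromdata': data.sum(axis=1).tolist()}

results = process_batch()  # a generator yielding one dict per batch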

The output of func() is a dict with nested dicts and a single nested list. Something like:

out = {'param0': 'somevalue',
       'param1': {'someotherparams': 'somevalue'},
       ...,
       'fromdata': [val1, val2, val3...]} # a list of length batchsize

Next, I want to save out in JSON format, appending the value of out['fromdata'] for each output of the generator. Ideally, I need to dump into the JSON file without reloading the previous generator outputs, because of memory issues.

I'm a newbie with generators and with libraries like ijson or json_stream. How can I save the output?

  • "Ideally, I need to dump into the json file without reloading the last generator output because of memory issues." So it must be JSON? There is no easy way to write a JSON file iteratively (assuming you want a JSON array of arrays). You could use newline-delimited JSON, where each line is a valid JSON document; that format is very amenable to streaming (see the sketch after these comments). Commented Aug 20 at 23:17
  • Did you consider using pickle instead? stackoverflow.com/a/11218504/31291070 Commented Aug 21 at 3:04
  • Better to create minimal working code which uses this decorator, with some fake data, and show the expected result for that data. Commented Aug 21 at 13:42
  • I think the decorator actually has nothing to do with the question. It's honestly best to remove it, since it's just going to distract people. The question, IIUC, is how to write JSON to a file incrementally. You need to provide more details about exactly which constraints you are dealing with. Commented Aug 21 at 17:37
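
Following up on the newline-delimited JSON suggestion in the first comment, here is a minimal sketch of that approach using only the standard json module; results stands in for the generator of out dicts from the question, and the file name is arbitrary:

import json

def save_ndjson(results, path):
    # Write one JSON document per line; only the current batch's
    # output is ever held in memory.
    with open(path, 'w') as f:
        for out in results:
            f.write(json.dumps(out['fromdata']) + '\n')

# Hypothetical usage, with results as the generator from the question:
# save_ndjson(results, 'fromdata.ndjson')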

1 Answer


The Python library json-stream likely does exactly what you are looking for. The question is a little ambiguous about how you expect the final product to look, and the decorator parts likely add complexity that is not required to accomplish what you describe.

To stream your data to a file, you need to rewrite the function that produces the output so that it returns json_stream.streamable_list or json_stream.streamable_dict objects; everything else then works without much alteration.
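
As a minimal sketch of that pattern before the full solutions below (following the writing example in the json-stream documentation; the generator and file name here are placeholders), the standard json.dump pulls items from the wrapped generator one at a time, so the full list never has to exist in memory:

import json
import json_stream

def values():
    # Placeholder generator; in your case, yield results batch by batch
    for i in range(1_000_000):
        yield i

with open('out.json', 'w') as f:
    json.dump(json_stream.streamable_list(values()), f)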

Depending on what "appending the value of out['fromdata'] for each output of the generator" means, I have three solutions for you:

  • If you mean that literally (the JSON object is created, then each subsequent out['fromdata'] is appended to the previous fromdata field), you get a heterogeneous list containing some bare values and some lists of values; this is shown by the function streamable_func_literal.
  • If you mean that each out['fromdata'] list is appended to a list, you get a list of lists, one per generator output; this is shown by the function streamable_func_adjusted_append.
  • If you mean that each out['fromdata'] is extended onto the first one's values, you get a single flat list with all the values from every fromdata; this is shown by the function streamable_func_adjusted_extend.

The func function below is a direct copy of the example JSON you provided. If the real data is very different from that, you may need to adjust the appropriate streamable_func_* to match it. All the logic here is taken almost directly from the json-stream documentation's examples on PyPI.

import json
import json_stream

def func():
    for i in range(10):
        out = {
            'param0': 'somevalue',
            'param1': {'someotherparams': 'somevalue'},
            'fromdata': [0, 1, 2, i]
        }
        yield out

def streamable_func_literal():
    # Define a function to stream the 'fromdata' field
    def _fromdata_only():
        func_generator = func()
        for item_num, item in enumerate(func_generator):
            # For the first call, create a list with each value in it
            if item_num == 0:
                for value in item['fromdata']:
                    yield value
            # For each subsequent call, append the whole return value to the first one
            else:
                yield item['fromdata']
    # Retrieve only the first entry, subsequent calls will be handled by the subfunction
    entry = next(func())
    # Replace the list in the 'fromdata' block with the streaming version defined above
    entry['fromdata'] = json_stream.streamable_list(_fromdata_only())
    # Yield each component of the parent dict
    for key, value in entry.items():
        yield key, value

def streamable_func_adjusted_append():
    # Define a function to stream the 'fromdata' field
    def _fromdata_only():
        func_generator = func()
        for item in func_generator:
            # For each call, a list is returned creating a list of lists
            yield item['fromdata']
    # Retrieve only the first entry, subsequent calls will be handled by the subfunction
    entry = next(func())
    # Replace the list in the 'fromdata' block with the streaming version defined above
    entry['fromdata'] = json_stream.streamable_list(_fromdata_only())
    # Yield each component of the parent dict
    for key, value in entry.items():
        yield key, value

def streamable_func_adjusted_extend():
    # Define a function to stream the 'fromdata' field
    def _fromdata_only():
        func_generator = func()
        for item in func_generator:
            # For each call, the values are appended to the list, creating one large list with all values
            for value in item['fromdata']:
                yield value
    # Retrieve only the first entry, subsequent calls will be handled by the subfunction
    entry = next(func())
    # Replace the list in the 'fromdata' block with the streaming version defined above
    entry['fromdata'] = json_stream.streamable_list(_fromdata_only())
    # Yield each component of the parent dict
    for key, value in entry.items():
        yield key, value

for streamable_func in [
        streamable_func_literal,
        streamable_func_adjusted_append,
        streamable_func_adjusted_extend
]:
    data = json_stream.streamable_dict(streamable_func())
    print(f'{streamable_func.__name__:<35}', json.dumps(data))

Which produces the following console output:

streamable_func_literal             {"param0": "somevalue", "param1": {"someotherparams": "somevalue"}, "fromdata": [0, 1, 2, 0, [0, 1, 2, 1], [0, 1, 2, 2], [0, 1, 2, 3], [0, 1, 2, 4], [0, 1, 2, 5], [0, 1, 2, 6], [0, 1, 2, 7], [0, 1, 2, 8], [0, 1, 2, 9]]}
streamable_func_adjusted_append     {"param0": "somevalue", "param1": {"someotherparams": "somevalue"}, "fromdata": [[0, 1, 2, 0], [0, 1, 2, 1], [0, 1, 2, 2], [0, 1, 2, 3], [0, 1, 2, 4], [0, 1, 2, 5], [0, 1, 2, 6], [0, 1, 2, 7], [0, 1, 2, 8], [0, 1, 2, 9]]}
streamable_func_adjusted_extend     {"param0": "somevalue", "param1": {"someotherparams": "somevalue"}, "fromdata": [0, 1, 2, 0, 0, 1, 2, 1, 0, 1, 2, 2, 0, 1, 2, 3, 0, 1, 2, 4, 0, 1, 2, 5, 0, 1, 2, 6, 0, 1, 2, 7, 0, 1, 2, 8, 0, 1, 2, 9]}

Let me know if you have any questions, or if the outputs don't quite match what you are expecting, and I'll be glad to update the answer if I can.
