I wrote this preprocessor just to add a couple of features to Python (mostly pipes). Is there any way I could improve it?

import tokenize, io, types

PREFIX = """
# ITHON START
import sys as _INTERNAL_SYS
class _INTERNAL_LazyIterable:
    __slots__ = ('x','y')
    def __init__(self, x, y) -> None:
        self.x = iter(x)
        self.y = iter(y)
    def __iter__(self):
        return _INTERNAL_LazyIterator(self)
class _INTERNAL_LazyIterator:
    __slots__ = ('iterable',)
    def __init__(self, iterable) -> None:
        self.iterable = iterable
    def __next__(self):
        try:
            return next(self.iterable.x)
        except StopIteration:
            try:
                return next(self.iterable.y)
            except StopIteration:
                raise StopIteration
class _INTERNAL_LAZYMERGE:
    def __init__(self, val):
        self.val = val
    def __rrshift__(self, other):
        return _INTERNAL_LAZYMERGE(other)
    def __lshift__(self, other):
        return _INTERNAL_LazyIterable(self.val, other)
class _INTERNAL_LPIPE:
    def __init__(self, val):
        self.val = val
    def __rrshift__(self, other):
        return _INTERNAL_LPIPE(other)
    def __lshift__(self, other):
        return other(self.val)
class _INTERNAL_RPIPE:
    def __init__(self, action):
        self.action = action
    def __rrshift__(self, other):
        return _INTERNAL_RPIPE(other)
    def __lshift__(self, other):
        return self.action(other)

_INTERNAL_lpipe = _INTERNAL_LPIPE(None)
_INTERNAL_rpipe = _INTERNAL_RPIPE(None)

_INTERNAL_lazymerge = _INTERNAL_LAZYMERGE(None)

# ITHON END
"""





def translate(file: io.StringIO):
    patched_file = io.StringIO(PREFIX + file.read())
    skip_token = False
    tokens = list(tokenize.generate_tokens(patched_file.readline)) # Precalculate tokens
    print(tokens)
    for n, i in enumerate(tokens):
        type, name,_,_,_ = i
        try:
            next_type, next_name,_,_,_ = tokens[n + 1]
        except IndexError:
            next_type, next_name = (None, None)
        print(type, name)
        if skip_token:
            skip_token = False
            continue
        if type == tokenize.OP and next_type == tokenize.OP:
            # Most likely special operation
            if name == "|" and next_name == ">": # left pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_INTERNAL_lpipe"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == "<" and next_name == "|": # right pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_INTERNAL_rpipe"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == ":" and next_name == ":": # lazy merge
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_INTERNAL_lazymerge"
                yield tokenize.OP, "<<"
                skip_token = True
            else:
                yield type,name
        elif type == tokenize.OP:
            if name == "<>": # barry is flufl
                yield tokenize.OP, "!="
            else:
                yield type, name
        else:
            yield type,name

script = """
def x():
    print(1)
a = "Hello, World!"
b = print
a |> b

b <| a

for i in [1,2,3] :: [4,5,6] :: [7,8,9]:
    print(i)
if a <> b:
    print("Barry is FLUFL!")

"Hello, World!" |> print
"""
code = tokenize.untokenize(translate(io.StringIO(script)))
print(code)
exec(code)

Example code output:

# ITHON START
import sys as _INTERNAL_SYS 
class _INTERNAL_LazyIterable :
    __slots__ =('x','y')
    def __init__ (self ,x ,y )->None :
        self .x =iter (x )
        self .y =iter (y )
    def __iter__ (self ):
        return _INTERNAL_LazyIterator (self )
class _INTERNAL_LazyIterator :
    __slots__ =('iterable',)
    def __init__ (self ,iterable )->None :
        self .iterable =iterable 
    def __next__ (self ):
        try :
            return next (self .iterable .x )
        except StopIteration :
            try :
                return next (self .iterable .y )
            except StopIteration :
                raise StopIteration 
class _INTERNAL_LAZYMERGE :
    def __init__ (self ,val ):
        self .val =val 
    def __rrshift__ (self ,other ):
        return _INTERNAL_LAZYMERGE (other )
    def __lshift__ (self ,other ):
        return _INTERNAL_LazyIterable (self .val ,other )
class _INTERNAL_LPIPE :
    def __init__ (self ,val ):
        self .val =val 
    def __rrshift__ (self ,other ):
        return _INTERNAL_LPIPE (other )
    def __lshift__ (self ,other ):
        return other (self .val )
class _INTERNAL_RPIPE :
    def __init__ (self ,action ):
        self .action =action 
    def __rrshift__ (self ,other ):
        return _INTERNAL_RPIPE (other )
    def __lshift__ (self ,other ):
        return self .action (other )

_INTERNAL_lpipe =_INTERNAL_LPIPE (None )
_INTERNAL_rpipe =_INTERNAL_RPIPE (None )

_INTERNAL_lazymerge =_INTERNAL_LAZYMERGE (None )

# ITHON END

def x ():
    print (1 )
a ="Hello, World!"
b =print 
a >>_INTERNAL_lpipe <<b 

b >>_INTERNAL_rpipe <<a 

for i in [1 ,2 ,3 ]>>_INTERNAL_lazymerge <<[4 ,5 ,6 ]>>_INTERNAL_lazymerge <<[7 ,8 ,9 ]:
    print (i )
if a !=b :
    print ("Barry is FLUFL!")

"Hello, World!">>_INTERNAL_lpipe <<print

  • So this is a code generator (though I didn't find a tag for that, so I used the closest relevant one I could find). Can you please show some example output? Commented Sep 5, 2024 at 12:33
  • @Reinderien Just added some example output. It is very bad-looking, but that's what tokenize.untokenize outputs. Commented Sep 5, 2024 at 19:39

1 Answer

Unused

import sys as _INTERNAL_SYS is unused. You can delete this.

slots

Two of your classes define __slots__, whereas the other three do not. You should probably add __slots__ to those classes as well.
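
For example, a minimal sketch of what that could look like for _INTERNAL_LPIPE (the other two, _INTERNAL_RPIPE and _INTERNAL_LAZYMERGE, would follow the same pattern):

class _INTERNAL_LPIPE:
    __slots__ = ('val',)   # the only attribute this class ever assigns

    def __init__(self, val):
        self.val = val

    def __rrshift__(self, other):
        return _INTERNAL_LPIPE(other)

    def __lshift__(self, other):
        return other(self.val)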

Iterating the hard way

The _INTERNAL_LazyIterator class is entirely unnecessary. Instead, you can use yield from in the __iter__ method to create the necessary iterator.

class _INTERNAL_LazyIterable:
    __slots__ = ('x','y')
    
    def __init__(self, x, y) -> None:
        self.x = iter(x)
        self.y = iter(y)
        
    def __iter__(self):
        yield from self.x
        yield from self.y
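
A quick sanity check that the behaviour is unchanged (just an illustration, not part of the generated prefix):

merged = _INTERNAL_LazyIterable([1, 2, 3], [4, 5, 6])
print(list(merged))   # [1, 2, 3, 4, 5, 6]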

Redundant Code

This code is repeated over and over, every time you define a new operator.

class _INTERNAL_XXX:
    def __rrshift__(self, other):
        return _INTERNAL_XXX(other)

Moreover, you create a singleton for each class by passing in None for the stored value (or action). This smells.

To reduce the duplicate code, you could have a common base class which defines the __rrshift__ operation and creates a new instance of the current class, but the None initialization still smells.

Instead, you could have one class for your singleton tokens, and pass an action function into it. When lhs >> TOKEN is evaluated, it would return a second class containing the action function and the "lhs" value. This second class would then respond to (lhs >> TOKEN) << rhs.

class _Token:
    __slots__ = ('action',)
    
    def __init__(self, action):
        self.action = action

    def __rrshift__(self, lhs):
        return _Operation(self.action, lhs)

class _Operation:
    __slots__ = ('action', 'lhs')

    def __init__(self, action, lhs):
        self.action = action
        self.lhs = lhs

    def __lshift__(self, rhs):
        return self.action(self.lhs, rhs)

Now, you just need to create the required tokens:

_INTERNAL_lazymerge = _Token(lambda lhs, rhs: _INTERNAL_LazyIterable(lhs, rhs))

No additional classes should be needed for the pipes. Just the tokens, and the appropriate lambda functions:

_INTERNAL_lpipe = _Token(lambda lhs, rhs: rhs(lhs))
_INTERNAL_rpipe = _Token(lambda lhs, rhs: lhs(rhs))
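
As a sanity check, here is how the rewritten operators would then evaluate (a sketch reusing the tokens defined above; the comments show the source syntax each line came from):

# a |> b becomes a >> _INTERNAL_lpipe << b
"Hello, World!" >> _INTERNAL_lpipe << print     # prints Hello, World!
# b <| a becomes b >> _INTERNAL_rpipe << a
print >> _INTERNAL_rpipe << "Hello, World!"     # prints Hello, World!
# x :: y becomes x >> _INTERNAL_lazymerge << y
print(list([1, 2] >> _INTERNAL_lazymerge << [3, 4]))   # [1, 2, 3, 4]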

Precalculating Tokens

tokens = list(tokenize.generate_tokens(patched_file.readline))

tokenize.generate_tokens(...), as its name implies, is a generator. It doesn't tokenize the entire file up front and then return the tokens one at a time; it reads the file one line at a time and emits the tokens as it encounters them. It does this for efficiency: it doesn't need to store millions of tokens in memory when processing a huge file.

list(...) undoes all those savings, by realizing all of the tokens and storing them all in one big list. You're doing this just so you can access "the next token" ... tokens[n + 1] along with the current token.

The itertools.pairwise(...) function will take an iterable a, b, c, d, e, ... and return (a, b), (b, c), (c, d), (d, e), .... In short, using this would convert your stream of tokens into a stream of pairs of tokens. No need to realize and store them all in a list!

The only issue with pairwise is that, given n tokens, it will yield n-1 pairs of tokens. You need to append a dummy token to the stream in order to get n token pairs, with the last real token paired with the dummy. We can do this using itertools.chain.
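
To make that concrete, here is a tiny illustration on a toy list rather than the real token stream (note that pairwise requires Python 3.10+):

from itertools import chain, pairwise

toy_tokens = ['a', 'b', 'c']
print(list(pairwise(chain(toy_tokens, [None]))))
# [('a', 'b'), ('b', 'c'), ('c', None)]  three tokens, three pairs

Applied to the real token stream, it looks like this: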

from itertools import chain, pairwise

...

def translate(file: io.StringIO):
    patched_file = io.StringIO(PREFIX + file.read())
    tokens = tokenize.generate_tokens(patched_file.readline)
    token_pairs = pairwise(chain(tokens, [(None,)*5]))

    skip_token = False
    for first_token, second_token in token_pairs:
        type, name, _, _, _ = first_token
        next_type, next_name, _, _, _ = second_token

        ...

LazyIterable: revisited

The itertools.chain function does effectively the same thing as our _INTERNAL_LazyIterable class. We can remove that class and just use itertools.chain, renamed to avoid any collisions with the given source.

from itertools import chain as _INTERNAL_LazyIterable
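
A quick check that the substitution behaves the same way (using chain directly, outside the generated prefix):

from itertools import chain

print(list(chain([1, 2, 3], [4, 5, 6])))   # [1, 2, 3, 4, 5, 6]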

Revised code

(The _INTERNAL prefix was shortened to a single underscore for brevity.)

import tokenize, io

from itertools import chain, pairwise


PREFIX = """
# ITHON START
from itertools import chain as _LazyIterable

class _Token:
    __slots__ = ('action',)
    
    def __init__(self, action):
        self.action = action

    def __rrshift__(self, lhs):
        return _Operation(self.action, lhs)

class _Operation:
    __slots__ = ('action', 'lhs')

    def __init__(self, action, lhs):
        self.action = action
        self.lhs = lhs

    def __lshift__(self, rhs):
        return self.action(self.lhs, rhs)        

_LAZY_MERGE = _Token(lambda lhs, rhs: _LazyIterable(lhs, rhs))
_LPIPE = _Token(lambda lhs, rhs: rhs(lhs))
_RPIPE = _Token(lambda lhs, rhs: lhs(rhs))

# ITHON END
"""

def translate(file: io.StringIO):
    patched_file = io.StringIO(PREFIX + file.read())
    tokens = tokenize.generate_tokens(patched_file.readline)
    token_pairs = pairwise(chain(tokens, [(None,)*5]))

    skip_token = False
    for first, second in token_pairs:
        type, name, _, _, _ = first
        next_type, next_name, _, _, _ = second

        if skip_token:
            skip_token = False
            continue
        
        if type == tokenize.OP and next_type == tokenize.OP:
            # Most likely special operation
            if name == "|" and next_name == ">": # left pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_LPIPE"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == "<" and next_name == "|": # right pipe
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_RPIPE"
                yield tokenize.OP, "<<"
                skip_token = True
            elif name == ":" and next_name == ":": # lazy merge
                yield tokenize.OP, ">>"
                yield tokenize.NAME, "_LAZY_MERGE"
                yield tokenize.OP, "<<"
                skip_token = True
            else:
                yield type,name
                
        elif type == tokenize.OP:
            if name == "<>":
                yield tokenize.OP, "!="
            else:
                yield type, name
                
        else:
            yield type, name

script = """
def x():
    print(1)

a = "Hello, World!"
b = print
a |> b

b <| a

for i in [1,2,3] :: [4,5,6] :: [7,8,9]:
    print(i)
if a <> b:
    print("Barry is FLUFL!")

"Hello, World!" |> print
"""

code = tokenize.untokenize(translate(io.StringIO(script)))
exec(code)

  • Thanks, but pairwise definitely won't work for me. I don't want to have a pipe operation split in half. Commented Sep 7, 2024 at 20:51
  • @Xandaaah I'm sorry, what do you mean? Your original code is processing the token stream in a pairwise fashion, e.g. tokens[n] & tokens[n+1], for n=0, then n=1, then n=2, and so on. The pairwise operation does exactly this, and my revised code produces the same output yours did. Commented Sep 9, 2024 at 0:00
  • I thought you meant that it would be n=0, then n=2, and like that. I'm going to add new operators that are longer anyway. Commented Sep 15, 2024 at 19:06
  • You certainly can add longer, complex operations; just not in lambda functions. Simply add def _long_complex_operation(lhs, rhs): ... and _LONG_COMPLEX_OPERATION = _Token(_long_complex_operation), then just add the appropriate syntax parsing in your translate() function. Commented Sep 17, 2024 at 22:19
  • I meant more than 2 characters, not complex as in does complex operations. Commented Sep 21, 2024 at 2:52
