
Planet Python

Last update: April 26, 2025 04:42 AM UTC

April 26, 2025


Ed Crewe

Talk about Cloud Prices at PyConLT 2025


Introduction to Cloud Pricing

I am looking forward to speaking at PyConLT 2025
My talk is called Cutting the Price of Scraping Cloud Costs

It's been a while (12 years!) since my last Python conference, EuroPython Florence 2012, where I spoke as a Django web developer, although I did give a Golang talk at KubeCon USA last year.

I work at EDB, the Postgres company, on our Postgres AI product, the cloud version of which runs across the main cloud providers: AWS, Azure and GCP.

The team I am in handles the identity management and billing components of the product. So whilst I am mainly a Golang micro-service developer, I have dipped my toe into Data Science, having rewritten our cloud prices ETL using Python & Airflow - the subject of my talk in Lithuania.

Cloud pricing can be surprisingly complex ... and the price lists are not small.

The full price lists for the 3 CSPs together are almost 5 million prices - known as SKUs (Stock Keeping Unit prices)

csp x service x type x tier x region
3   x 200     x 50   x 3    x 50    = 4.5 million

csp = AWS, Azure and GCP

service = vms, k8s, network, load balancer, storage etc.

type = e.g. storage - general purpose E2, N1 ... accelerated A1, A2  multiplied by various property sizes

tier  = T-shirt size tiers of usage, ie more use = cheaper rate - small, medium, large

region = us-east-1, us-west-2, af-south-1, etc.

We need to gather all the latest service SKUs that our Postgres AI may use and total them up as a cost estimate for when customers are selecting the various options for creating or adding to their installation, applying the additional pricing for our product and any private offer discounts as part of this process.

Therefore we needed to build a data pipeline to gather the SKUs and keep them current.

Previously we used a 3rd party kubecost-based provider's data, however our usage was not sufficient to justify paying for this particular cloud service when its free usage expired.

Hence we needed to rewrite our cloud pricing data pipeline. This pipeline is in Apache Airflow but it could equally be in Dagster or any other data pipeline framework.

My talk deals with the wider points around cloud pricing, refactoring a data pipeline and pipeline framework options. But here I want to provide more detail on the data pipeline's Python code, its use of embedded Postgres and Click, and the benefits for development and testing - some things I didn't have room for in the talk.


Outline of our use of Data Pipelines

Airflow, Dagster, etc. provide many tools for pipeline development, notably a local development mode for running up the pipeline framework locally and doing test runs.
Even with some reloading on edit, it can still be a long process to run up a pipeline and then execute the full set of steps, known as a directed acyclic graph (DAG).

One way to improve the developer experience (DevX) is to encapsulate each DAG step's code as much as possible: removing shared state where that is viable and allowing individual steps to be tested separately and rapidly with fixture data, with fast stand-up and tear-down of temporary embedded storage.

To avoid shared state persistence across the whole pipeline we use extract transform load (ETL) within each step, rather than across the whole pipeline. This enables functional running and testing of individual steps outside the pipeline.
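
As a rough sketch of the idea - the helper function names here are illustrative placeholders rather than the pipeline's real code - a self-contained step looks something like this:

def azure_price_step():
    """One DAG step: extract, transform and load Azure SKUs against its own temporary Postgres."""
    folder, port = start_embedded_pg()        # temporary embedded Postgres for this step only
    try:
        rows = extract_azure_skus()           # Extract: fetch the CSP bulk price JSON
        rows = normalise_units(rows)          # Transform: map units and tiers to a common shape
        load_rows(port, rows)                 # Load: bulk insert into the embedded database
        dump_artifact(port, "azure.sql")      # persist the step's output as a SQL artefact
    finally:
        teardown_embedded_pg(folder)          # no shared state survives the step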


The Scraper Class

We need a standard scraper class to fetch the cloud prices from each CSP, so we use an abstract base class.


from abc import ABC


class BaseScraper(ABC):
    """Abstract base class for Scrapers"""

    batch = 500
    conn = None
    unit_map = {"FAIL": ""}
    root_url = ""

    def map_units(self, entry, key):
        """To standardize naming of units between CSPs"""
        return self.unit_map.get(entry.get(key, "FAIL"), entry[key])

    def scrape_sku(self):
        """Scrapes prices from CSP bulk JSON API - uses CSP specific methods"""
        pass

    def bulk_insert_rows(self, rows):
        """Bulk insert batches of rows - Note that Psycopg >= 3.1 uses pipeline mode"""
        query = """INSERT INTO api_price.infra_price VALUES
        (%(sku_id)s, %(cloud_provider)s, %(region)s, %(sku_name)s, %(end_usage_amount)s)"""
        with self.conn.cursor() as cur:
            cur.executemany(query, rows)


This has 3 common methods:

  1. map_units - maps units to common names across all CSPs
  2. scrape_sku - the top-level scrape method, with CSP-specific differences handled in sub-methods called from it
  3. bulk_insert_rows - the main concrete method used by all scrapers

To bulk insert 500 rows per batch we use Psycopg 3 pipeline mode, so it can send batch after batch of inserts without waiting for a response each time.

The database update against local embedded Postgres is faster than the time to scrape the remote web site SKUs.
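
A minimal sketch of that batching, assuming a local connection string and a prepared list of row dicts (placeholders, not the pipeline's real code):

import psycopg

def insert_batches(dsn, query, rows, batch=500):
    """Insert rows in batches of 500 without waiting for a server response per batch."""
    with psycopg.connect(dsn) as conn:
        with conn.pipeline():                     # explicit pipeline mode (Psycopg >= 3.1)
            with conn.cursor() as cur:
                for i in range(0, len(rows), batch):
                    cur.executemany(query, rows[i:i + batch])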


The largest part of the Extract is done at this point. Rather than loading all 5 million SKUs, as we did with the kubecost data dump, and then querying out the 120 thousand for our product, scraping the sources directly means we only need to ingest those 120k SKUs - which saves handling 97.6% of the data!


So the resultant speed is sufficient, although not as fast as pg_dump loading, which uses COPY.


Unfortunately Python Psycopg is significantly slower when using cursor.copy, and that militated against using zipped-up Postgres dumps. Hence all the data artefact creation and loading simply uses the pg_dump utility wrapped as a Python shell command.

There is no need to use Python here when the tried and tested C-based pg_dump utility ensures compatibility outside our pipeline. A later version of pg_dump can always handle dumps from earlier Postgres versions.
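
A minimal sketch of that wrapping - the schema and database names are assumptions for illustration, and the real command line may differ:

import subprocess

def dump_prices(port, out_file="price.sql"):
    """Dump the price schema from the local embedded Postgres as plain, uncompressed SQL."""
    subprocess.run(
        [
            "pg_dump",
            "--host", "localhost",
            "--port", str(port),
            "--schema", "api_price",      # assumed schema name, matching the insert query above
            "--format", "plain",          # plain SQL with COPY, no compression
            "--file", out_file,
            "prices",                     # assumed database name
        ],
        check=True,
    )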


We don't need to retain a long history of artefacts, since it is public data and never needs to be reverted.

This allows us a low retention level, cleaning out most of the old dumps when a new one is created, so any storage saving from compression is negligible.

Therefore we avoid pg_dump compression, since it can be significantly slower, especially if the data already contains compressed blobs. Plain SQL COPY also allows for data inspection if required - e.g. grep for a SKU when debugging why a price may be missing.


Postgres Embedded wrapped with Go

Unlike MySQL, Postgres doesn't do in-memory databases. The equivalent for a temporary or test-run database lifetime is the embedded version of Postgres, run from an auto-created temp folder of files.
Python doesn't have a maintained wrapper for embedded Postgres; sadly the project https://github.com/Simulmedia/pyembedpg is abandoned 😢

Hence we use the most up-to-date wrapper from Go, running the Go binary via a Python shell command.
It still lags behind by a version of Postgres, so it's on Postgres 16 rather than the latest 17, but for the purposes of embedded use that is irrelevant.
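
A rough sketch of how that can be driven from Python - the binary name and flags below are assumptions for illustration, not the Go wrapper's actual interface:

import socket
import subprocess
import tempfile

def start_embedded_pg():
    """Start a throwaway Postgres in a temp folder on a free random port."""
    with socket.socket() as s:
        s.bind(("localhost", 0))
        port = s.getsockname()[1]         # ask the OS for an unused port
    folder = tempfile.mkdtemp(prefix="pgembed_")
    proc = subprocess.Popen(
        ["embedded-postgres", "--port", str(port), "--data-dir", folder]
    )
    return proc, folder, port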

By using a separate temporary Postgres per step we can save a dumped SQL artefact at the end of each step and need no data dependency between steps, meaning individual step retry in parallel just works.
The performance of a localhost dump to socket is also superior.
By processing everything in the same (if embedded) version of Postgres as our final target database for the Cloud Price Go micro-service, we remove any SQL compatibility issues and ensure full PostgreSQL functionality is available.

The final data artefacts will be loaded to a Postgres cluster price schema micro-service running on CloudNativePG

Use a Click wrapper with Tests

The click package provides all the functionality for our pipeline:

> pscraper -h

Usage: pscraper [OPTIONS] COMMAND [ARGS]...

  price-scraper: python web scraping of CSP prices for api-price

Options:
  -h, --help  Show this message and exit.

Commands:
  awsscrape    Scrape prices from AWS
  azurescrape  Scrape prices from Azure
  delold       Delete old blob storage files, default all over 12 weeks old are deleted
  gcpscrape    Scrape prices from GCP - set env GCP_BILLING_KEY
  pgdump       Dump postgres file and upload to cloud storage - set env STORAGE_KEY
               > pscraper pgdump --port 5377 --file price.sql
  pgembed      Run up local embedded PG on a random port for tests
               > pscraper pgembed
  pgload       Load schema to local embedded postgres for testing
               > pscraper pgload --port 5377 --file price.sql


This caters for developing the step code entirely outside the pipeline for development and debugging.
We can run pgembed to create a local db, pgload to add the price schema, then run individual scrapes from a pipenv pip install -e version of the price scraper package.


For unit testing we can create a mock response object for the data scrapers that returns different fixture payloads based on the query and monkeypatch it in. This allows us to functionally test the whole scrape and data artefact creation ETL cycle as unit functional tests.

Any issues with source data changes can be replicated via a fixture for regression tests.

from unittest import TestCase

import psycopg
import requests

# fixtures, MockConn and ROOT come from the package's own test fixtures.


class MockResponse:
    """Fake to return fixture value of requests.get() for testing scrape parsing"""

    name = "Mock User"
    payload = {}
    content = ""
    status_code = 200
    url = "http://mock_url"

    def __init__(self, payload={}, url="http://mock_url"):
        self.url = url
        self.payload = payload
        self.content = str(payload)

    def json(self):
        return self.payload


def mock_aws_get(url, **kwargs):
    """Return the fixture JSON that matches the URL used"""
    for key, fix in fixtures.items():
        if key in url:
            return MockResponse(payload=fix, url=url)
    return MockResponse()


class TestAWSScrape(TestCase):
    """Tests for the 'pscraper awsscrape' command"""

    @classmethod
    def setUpClass(cls):
        """Simple monkeypatch in mock handlers for all tests in the class"""
        psycopg.connect = MockConn
        requests.get = mock_aws_get
        # confirm that requests is patched, hence returns a short fixture of JSON from the AWS URLs
        result = requests.get("{}/AmazonS3/current/index.json".format(ROOT))
        assert len(result.json().keys()) > 5 and len(result.content) < 2000

A simple DAG with Soda Data validation

The click commands for each DAG are imported at the top - one for the scrape and one for embedded Postgres - so the DAG just becomes a wrapper to run them, adding Soda data validation of the scraped data ...

def scrape_azure():
    """Scrape Azure via API public json web pages"""
    from price_scraper.commands import azurescrape, pgembed
    folder, port = setup_pg_db(PORT)
    error = azurescrape.run_azure_scrape(port, HOST)
    if not error:
        error = csp_dump(port, "azure")
    if error:
        pgembed.teardown_pg_embed(folder)
        notify_slack("azure", error)
        raise AirflowFailException(error)

    data_test = SodaScanOperator(
        dag=dag,
        task_id="data_test",
        data_sources=[
            {
                "data_source_name": "embedpg",
                "soda_config_path": "price-scraper/soda/configuration_azure.yml",
            }
        ],
        soda_cl_path="price-scraper/soda/price_azure_checks.yml",
    )
    data_test.execute(dict())
    pgembed.teardown_pg_embed(folder)
 


We set up a new embedded Postgres (it takes a few seconds) and then scrape directly into it.


We then use the SodaScanOperator to check the data we have scraped; if there is no error we dump to blob storage, otherwise we notify Slack with the error and raise it, ending the DAG.

Our Soda tests check that the number of prices, and the prices themselves, are in the ranges they should be for each service. We also check that we have the number of tiered rates we expect - over 10 starting usage rates and over 3000 specific tiered prices.

If the Soda tests pass, we dump to cloud storage and tear down the temporary Postgres. A final step aggregates each step's data together. We save the money and maintenance of running a persistent database cluster in the cloud for our pipeline.


April 26, 2025 04:37 AM UTC

April 25, 2025


Test and Code

The role of AI in software testing - Anthony Shaw

AI is helping people write code.  
Tests are one of those things that some people don't like to write.   

Can AI play a role in creating automated software tests?  
Well, yes. But it's a nuanced yes.  

Anthony Shaw comes on the show to discuss the topic and try to get AI to write some tests for my very own cards project.

We discuss:

  • The promise of AI writing your tests for you
  • Downsides to not writing tests yourself
  • Bad ways to generate tests
  • Good ways to ask AI for help in writing tests
  • Tricks to get better results while using copilot and other AI tools

Links:

  • The cards project: https://github.com/okken/cards
  • A video version of this discussion: Should AI write tests? https://www.youtube.com/watch?v=a_V-BH_luJ4

Sponsored by:

  • Porkbun - named the #1 domain registrar by USA Today from 2023 to 2025! Get a .app or .dev domain name for only $5.99 first year: https://porkbun.com/TestAndCode25

Learn pytest:

  • The Complete pytest course is now a bundle, with each part available separately:
    • pytest Primary Power teaches the super powers of pytest that you need to learn to use pytest effectively: https://courses.pythontest.com/pytest-primary-power
    • Using pytest with Projects has lots of "when you need it" sections like debugging failed tests, mocking, testing strategy, and CI: https://courses.pythontest.com/using-pytest-with-projects
    • Then pytest Booster Rockets can help with advanced parametrization and building plugins: https://courses.pythontest.com/pytest-booster-rockets
  • Whether you need to get started with pytest today, or want to power up your pytest skills, PythonTest has a course for you: https://courses.pythontest.com/

★ Support this podcast on Patreon ★ https://www.patreon.com/c/testpodcast

April 25, 2025 05:41 PM UTC


Everyday Superpowers

Event Sourcing: Reactivity Without the React Overhead

This is the fourth entry in a five-part series about event sourcing:

  1. Why I Finally Embraced Event Sourcing—And Why You Should Too
  2. What is event sourcing and why you should care
  3. Preventing painful coupling
  4. Reactivity Without the React Overhead (this page)
  5. Get started with event sourcing today

In this post, I’ll share some things I’ve enjoyed about event sourcing since adopting it.

I'll start by saying that one of the ideas I love about some of today’s JavaScript frameworks is that the HTML of the page is a functional result of the data on the page. If that data changes, the JS framework will automatically change the affected HTML.

This feature enables some impressive user experiences, and it’s especially fun to see an edit in one part of the page immediately affect another.

I’m finding this a helpful pattern to remember as I’ve been working with event sourcing. Any event can trigger updates to any number of database tables, caches, and UIs, and it's fun to see that reactivity server-side.

Like React, but for the server

React is (mostly) a front-end framework. Its concern is to update the in-browser HTML after data changes.

In a way, you can say event-driven microservices are similar. One part of the system publishes an event without knowing who will listen, and other parts kick off their process with the data coming in from the event.

One of the things that has caught me by surprise about event sourcing is how I get similar benefits of an event-driven microservice architecture in a monolith, while keeping complexity low.

At one time, the project I'm working on was a microservice architecture with six different Python applications. With a vertically sliced, event-sourced architecture, we could make it one.[one]{It's currently two, since the architect feels better that way, but it can easily be one.}

This project processes files through several stages. It's so fun to see this application work. Like an event-driven microservice, the command creates an event when a document enters the system.

However, instead of going to an external PubSub queue, this event gets saved to the event store and then to an internal message bus. The code that created the event doesn't know or care who's listening, and any number of functions can listen in.

In this case, several functions listen for the document created event. One kicks off the first step of the processing. Another makes a new entry for the status page. A third creates an entry in a table for a slice that helps us keep track of our SLAs.

Once the first step finishes, another event is raised. In turn, a few other functions are run. One updates the status info, and another begins the next processing step.

If something went wrong with the first step, we'll save a different event. In reaction to this event, we have a function that updates the status screen, another that adds info to an admin screen to help diagnose what went wrong, a third that notifies an external team that consumes the result of this workflow, and a fourth that will determine whether to retry the process.
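
A minimal sketch of that kind of in-process bus (the names are illustrative, not my actual implementation):

from collections import defaultdict
from typing import Callable

_subscribers: dict[type, list[Callable]] = defaultdict(list)

def subscribe(event_type: type, handler: Callable) -> None:
    _subscribers[event_type].append(handler)

def publish(event) -> None:
    # The code that created the event doesn't know or care who is listening.
    for handler in _subscribers[type(event)]:
        handler(event)

# e.g. several independent reactions to the same event:
# subscribe(DocumentCreated, start_first_processing_step)
# subscribe(DocumentCreated, add_status_page_entry)
# subscribe(DocumentCreated, track_sla)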

Keeping the complexity in check

This sounds incredibly complicated, and in some ways it is. There are a lot of small moving parts. But they're all visible either through looking at the event modeling diagram or leveraging the IDE to see where a type of event is used.

This is similar to having an event-driven microservice, but it all lives in a decoupled monolith[dcmono]{Decoupled monolith?! Who would have guessed those words would be used together?} and is easily deployable.

The most painful part of creating this app has been debugging issues that span the interactivity between the two services. Adding additional services dramatically increases complexity.

This is not to say that you shouldn't use microservices. I love the idea of implementing slices in different languages to better meet specific slices' needs. But having most of the code in one code base and deploy target is nice.

I'm also thrilled that complexity doesn't grow as the project ages. Because of the decoupled nature of the vertical slices, adding new functionality will not make the code much more complicated. Each slice is isolated, and there are only a few patterns to master.

When it's time to start working on a new piece of functionality, I'll create its folder and examine where my data comes from. Do I need to subscribe to events or pull from a read model? Then, I check to see what events my slice needs to publish. Once those are in place, it's all about implementing the business logic.

Rinse and repeat.

But part of an excellent service is a great user experience, and I love how this reactivity is not just limited to the back end.

Bringing it to the browser

I value a great user experience, so early in the project, I looked for when a live-updating view would greatly benefit the user.

The first one I did was the status view I discussed in previous posts. When a document enters our system, it appears in the table like this:

Document ID          Status    Last Updated    Duration
1542-example-94834   0% done   5 seconds ago   5 seconds

When one step has been finished, the UI looks like this:

Document ID          Status     Last Updated    Duration
1542-example-94834   25% done   0 seconds ago   10 seconds

The way I implemented this is to have a function that subscribes to the events that would change the UI and updates a database table. Something like this:

StatusEvent = typing.Union[
    DocumentCreated,
    Step1Finished,
    Step1Failed,
    ...
]

def on_status_updates(event: StatusEvent):
    if isinstance(event, DocumentCreated):
        ...
    elif isinstance(event, Step1Finished):
        db.document(event.document_id).update({
            'percent_done': 25,
            'last_updated': event.stored_at,
        })
        ...

This project uses Google’s Firestore as its primary database, and it has a feature allowing you to subscribe to database changes.[firebase]{Believe it or not, I'm not using the internal bus to update the UI. That'll wait until the next project.}
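
For reference, a hedged sketch of that subscription with the google-cloud-firestore client - the collection name and the downstream push function are assumptions:

from google.cloud import firestore

db = firestore.Client()

def on_status_change(doc_snapshots, changes, read_time):
    # Called by the client library whenever a watched document changes.
    for doc in doc_snapshots:
        push_row_to_browser(doc.to_dict())   # hypothetical: feed the SSE stream shown below

watch = db.collection("document_status").on_snapshot(on_status_change)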

When a user loads this page, we use HTMX to open a server-sent events connection to code that subscribes to changes in the status database. Something like this[this]{Complexity was removed to improve understandability. I'm working on making this aspect more understandable for a future blog post.}:

def on_database_update(changes):
    now = datetime.now(tz=UTC)
    template = templates.template('document_status_row.jinja')
    return HTTPStream(
        template.render_async(
            context=dict(document=changes, last_updated=now)
        )
    )

With that, any time an entry in the database changes, an updated table row gets sent to the browser as HTML and HTMX either updates an existing row or inserts the new one into the table.[cav]{This isn't unique to HTMX. Frameworks like data-star, unpoly, and fixi can do the same} All this without setting up a JavaScript build pipeline or WebSockets infrastructure.

Reactivity, user experience, and history too

One final aspect of event sourcing I've enjoyed through this project is the ability to decide what to do based on an item's history.

I mentioned above that an external team wants to be notified about specific conditions.

When I was tasked to implement this, the person giving it to me felt a little sorry, as they suspected this had complexity hiding below the surface.

After talking with the external team, they wanted up to two notifications for every document: one notification if that document completed every step, or one notification if the document failed any step twice.

I handled the first case similarly to this:

def document_just_completed_all_steps(event_names: list[str]) -> bool:
    return (
        event_names.count('Step4Finished') == 1 and
        event_names[-1] == 'Step4Finished'
    )

def should_notify(event: DomainEvent, container: svcs.Container) -> bool:
    event_store = container.get(EventStore)
    event_names = [
        event.name for event
        in event_store.get_event_stream(event.entity_id)
    ]
    if document_just_completed_all_steps(event_names):
        return True
    return did_document_fail_retry_for_the_first_time(event_names)

Thankfully, with event sourcing and the excellent svcs framework[hynek]{Thanks, Hynek!}, I have access to every event that happened to that document.

I used that list of events to ensure that there was only one instance of the final event, and that it was the last event in the sequence.

Next up

If this sounds like magic, it’s not. It’s just good design and a new way of thinking about change. In the next post, I’ll show you exactly how to dip your toe into event sourcing.


Read more...

April 25, 2025 04:38 PM UTC


Awesome Python Applications

DollarDollar Bill Y'all

DollarDollar Bill Y'all: Self-hosted money management and expense splitting web service.

Links:

April 25, 2025 10:35 AM UTC

Beaver Habits

Beaver Habits: Self-hosted habit tracking app without "Goals".

Links:

April 25, 2025 10:31 AM UTC

aider

aider: Terminal-based AI pair programming assistant.

Links:

April 25, 2025 10:31 AM UTC


Python GUIs

Building a Currency Converter Application using Tkinter — Convert between currencies with ease

In this tutorial, you'll create a currency converter application with Python and Tkinter. The app will allow the users to select the source currency, choose the target currency, and input the amount to convert. The application will use real-time exchange rates to convert from the source to target currency, providing accurate and up-to-date conversions.

Currency converter demo

Through this project you'll gain hands-on experience working with Tkinter's GUI elements, handling user input, and interacting with an external API. By the end of the tutorial, you'll have a functional currency converter app and have learnt practical skills you can apply to your own Python & Tkinter projects.

Setting Up the Environment

We'll start by setting up a working environment for the project. We are using Tkinter for this project, which is included by default in most Python installations, so we don't need to install that.

If you're on Linux you may need to install it. See the Linux Tkinter installation instructions.

We'll also be using the requests library to make HTTP requests to the Exchange Rate API, which we can install from PyPI using pip. We also want to create a folder to hold our project files, including our Python script and images.

Below are the instructions to create a folder for our project called currency_converter, set up a virtual environment, activate it and install requests into that environment.

sh
$ mkdir currency_converter/
$ cd currency_converter
$ python -m venv venv
$ source venv/bin/activate
(venv)$ pip install requests
cmd
> mkdir currency_converter/
> cd currency_converter
> python -m venv venv
> venv\Scripts\activate.bat
(venv)> pip install requests

We will use the free Exchange Rate API to access real-time exchange rate data. It offers various endpoints that allow users to retrieve exchange rate information for different currency pairs, convert currency amounts from one currency to another, and perform other related operations. You'll have to sign up on the API page to be able to run the app.

Setting Up the Project Structure

Now that we've created the virtual environment and installed the required third-party libraries, we can set up the project structure.

Add a folder named images to the root of your project.

The images/ subfolder is where we will place the app's logo. Our application code will go in a file named currency_converter.py in the root folder.

python
currency_converter/
│
├── images/
│   └── logo.png
│
└── currency_converter.py

Getting Started with our Application

Now we have the project folder set up and the requirements installed, we can start building our app. Create a new file called currency_converter.py at the root of the project folder and open this in your editor.

We'll start by adding the imports we need for our project, and building a basic window which will hold our application UI.

python
import os
import sys
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import messagebox

import requests

class CurrencyConverterApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.geometry("500x450+300+150")
        self.title("Currency Converter")
        self.resizable(width=0, height=0)

if __name__ == "__main__":
    app = CurrencyConverterApp()
    app.mainloop()

In the above code, we import the os and sys modules from the Python standard library. Then, we import the tkinter package as tk. This shorthand is typically used with Tkinter to save repeatedly typing the full name. We also import the tkinter.ttk package which gives us access to Tkinter's themed widgets, which look nicer than the defaults. We also import Tkinter's messagebox module for creating pop up dialogs. Finally, we import the requests library to make HTTP requests to the API so that we can get up-to-date exchange rates and convert currencies.

We start by creating the application's root window, which in Tkinter also works as the application container. To do this we create a class called CurrencyConverterApp, which inherits from the tk.Tk class. On this class we add a custom __init__() method, which is called to initialize the object, where we set up the window's attributes.

To set the window's width, height, and position, we use the geometry() method. For the window's title, we use title(). Finally, we use resizable() with the width and height set to 0 to make the window unresizable.

With the main window class created, we then add the code to instantiate the class -- creating an instance of the CurrencyConverterApp -- and start up the main loop for the application. The main loop is the event loop which handles user input events from the keyboard and mouse and passes them to the widgets.

When you run this code, you see the following window on your screen:

Currency converter's main window

There's not much to see for now: our currency converter doesn't have a functional GUI. It's just a plain window with an appropriate title and size.

Now let's create the app's GUI.

Creating the Currency Converter's GUI

To create the app's GUI, let's begin adding the widgets to the main window. We will add a logo, labels and combo boxes for the source and target currencies, an amount entry field, a result label, and a Convert button.

First, let's add the logo to our application.

We'll be including small snippets of the code as we build it. But the full code is shown below, to make sure you don't get mixed up.

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def build_gui(self):
        self.logo = tk.PhotoImage(file="images/logo.png")
        tk.Label(self, image=self.logo).pack()

In this code snippet, we first create the build_gui() method to define all the widgets we need in our GUI. Inside the method, we load an image using the tk.PhotoImage() class.

Then, we create a label to display the image using the tk.Label() class, which takes self and image as arguments. Finally, we position the label in the main window using the pack() method, which is a geometry manager in Tkinter.

To use the build_gui() method, we need to call it. To do this, add the call to self.build_gui() to the end of the __init__() method. That gives us the following code.

python
import os
import sys
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import messagebox

import requests

class CurrencyConverterApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.geometry("500x450+300+150")
        self.title("Currency Converter")
        self.resizable(width=0, height=0)
        self.build_gui()

    def build_gui(self):
        self.logo = tk.PhotoImage(file="images/logo.png")
        tk.Label(self, image=self.logo).pack()


if __name__ == "__main__":
    app = CurrencyConverterApp()
    app.mainloop()

Go ahead and run the app. You'll get an output like the following:

Currency converter window with logo

Now we can see something! We'll continue adding widgets to build up our UI. First, we will create a frame to position the widgets. Below the logo code we added to the build_gui() method, add a frame:

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def build_gui(self):
        self.logo = tk.PhotoImage(file="images/logo.png")
        tk.Label(self, image=self.logo).pack()
        frame = tk.Frame(self)
        frame.pack()

Here, we create a frame using the tk.Frame class. It takes self as an argument because the current window is the frame's parent. To position the frame inside the main window, we use the pack() method.

With the frame in place, we can add some more widgets. Below is the code for populating the frame:

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def build_gui(self):
        # ...

        from_label = ttk.Label(frame, text="From:")
        from_label.grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)

        to_label = ttk.Label(frame, text="To:")
        to_label.grid(row=0, column=1, padx=5, pady=5, sticky=tk.W)

        self.from_combo = ttk.Combobox(frame)
        self.from_combo.grid(row=1, column=0, padx=5, pady=5)

        self.to_combo = ttk.Combobox(frame)
        self.to_combo.grid(row=1, column=1, padx=5, pady=5)

In this code, we create two labels using the ttk.Label() class. We position them using the grid() method. Then, we create the two combo boxes using the ttk.Combobox() class, and position them both in the frame using the grid() method again.

Note that we've positioned the labels in the first row of the frame while the combo boxes are in the second row. The GUI now will look something like this:

Currency converter's GUI

Great! Your app's GUI now has the widgets for selecting the source and target currencies. Let's now add the last four widgets. Below is the code for this:

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def build_gui(self):
        # ...

        amount_label = ttk.Label(frame, text="Amount:")
        amount_label.grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)

        self.amount_entry = ttk.Entry(frame)
        self.amount_entry.insert(0, "1.00")
        self.amount_entry.grid(
            row=3, column=0, columnspan=2, padx=5, pady=5, sticky=tk.W + tk.E
        )

        self.result_label = ttk.Label(font=("Arial", 20, "bold"))
        self.result_label.pack()

        convert_button = ttk.Button(self, text="Convert", width=20)
        convert_button.pack()

In this code snippet, we add two labels using the ttk.Label() class as usual. Then, we create the entry field using the ttk.Entry() class. Next, we add the Convert button using the ttk.Button() class. The amount label and entry go inside the frame object, while the result label and button are placed directly on the main window.

Note that we've positioned the amount_label and the amount_entry using the grid() method. In contrast, we've used the pack() method to place the result_label and convert_button.

The complete current code is shown below.

python
import os
import sys
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import messagebox

import requests

class CurrencyConverterApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.geometry("500x450+300+150")
        self.title("Currency Converter")
        self.resizable(width=0, height=0)
        self.build_gui()

    def build_gui(self):
        self.logo = tk.PhotoImage(file="images/logo.png")
        tk.Label(self, image=self.logo).pack()
        frame = tk.Frame(self)
        frame.pack()

        from_label = ttk.Label(frame, text="From:")
        from_label.grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)

        to_label = ttk.Label(frame, text="To:")
        to_label.grid(row=0, column=1, padx=5, pady=5, sticky=tk.W)

        self.from_combo = ttk.Combobox(frame)
        self.from_combo.grid(row=1, column=0, padx=5, pady=5)

        self.to_combo = ttk.Combobox(frame)
        self.to_combo.grid(row=1, column=1, padx=5, pady=5)

        amount_label = ttk.Label(frame, text="Amount:")
        amount_label.grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)

        self.amount_entry = ttk.Entry(frame)
        self.amount_entry.insert(0, "1.00")
        self.amount_entry.grid(
            row=3, column=0, columnspan=2, padx=5, pady=5, sticky=tk.W + tk.E
        )

        self.result_label = ttk.Label(font=("Arial", 20, "bold"))
        self.result_label.pack()

        convert_button = ttk.Button(self, text="Convert", width=20)
        convert_button.pack()


if __name__ == "__main__":
    app = CurrencyConverterApp()
    app.mainloop()

Run this and you'll see the following window.

Currency converter's GUI

This is looking good now. The app's GUI is functionally complete. Even though you can't see the label showing the conversion result, this label is there and will be visible once we add something to it.

Implementing the Convert Currency Functionality

As mentioned, we will be using the Exchange Rate API to get our live currency data. To request data from the API we need an API key. You can get one by signing up and creating an account. This is free if you only need daily rates (fine for our app).

Exchange Rate API Sign-up Page

If you accept the terms, you will receive your API key via the email address you provided, or you can go to the dashboard, where you will see your API key as follows:

Exchange Rate API Key

Now that we have the API key, let's implement the currency conversion functionality. First, we will add the API key as an environment variable.

On Windows, open the terminal as an administrator and run the setx command below; on macOS or Linux, use export instead. Note that you must replace the "your_api_key" part with the actual API key:

cmd
> setx API_KEY "your_api_key"
powershell
PS> setx API_KEY "your_api_key"
sh
$ export API_KEY="your_api_key"

Now, get back to your code editor. Below the imports, paste the following code:

python
# ...
import requests

API_KEY = os.getenv("API_KEY")
if API_KEY is None:
    messagebox.showerror(
        "API Key Error", "API_KEY environment variable is not set."
    )
    sys.exit(1)

API_URL = f"https://v6.exchangerate-api.com/v6/{API_KEY}/"

# ...

Here, we retrieve the API key from the environment variable (that we just set) using the os.getenv() function. Using an if statement, we check whether the key was set. If not, we issue an error message and terminate the app's execution using the sys.exit() function. Then, we set up the URL to get the latest currencies.

Run the code now -- in the same shell where you set the environment variable. If you see the error dialog then you know that the environment variable has not been set -- check the instructions again, and make sure the variable is set correctly in the shell where you are running the code. If you see the application as normal, then everything is good!

The error shown when API_KEY is not set correctly in the environment.

Interacting with the API

Now that we have the API_KEY set up correctly, we can move on to interacting with the API itself. To do this we will create a method for getting all the currencies. Let's call it get_currencies(). Below the build_gui() method, add this new method:

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def build_gui(self):
        # ...

    def get_currencies(self):
        response = requests.get(f"{API_URL}/latest/USD")
        data = response.json()
        return list(data["conversion_rates"])

The method above sends a GET request to the given URL. We convert the received JSON response to Python objects using the json() method. Finally, we take the keys of the conversion_rates dictionary in the response - the currency codes - as a Python list.

We can populate the two combo boxes using the get_currencies() method. Add the following code to the bottom of the build_gui method.

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def build_gui(self):
        # ...

        currencies = self.get_currencies()

        self.from_combo["values"] = currencies
        self.from_combo.current(0)

        self.to_combo["values"] = currencies
        self.to_combo.current(0)

This calls the get_currencies method to get the available currencies, and then populates the two combo boxes with the returned list.

If you run the application now, you'll see that the combo-boxes now contain the currencies returned from the API. Note that we're setting the default item to USD, which is the first currency in the list.

Populating the From currency combo box

Populating the To currency combo box

Handling the Currency Conversion

The final step is to implement the actual currency conversion, using the values returned from the API. To do this, we will add a new method called convert() to the bottom of our CurrencyConverterApp class.

python
# ...

class CurrencyConverterApp(tk.Tk):
    # ...

    def convert(self):
        src = self.from_combo.get()
        dest = self.to_combo.get()
        amount = self.amount_entry.get()
        response = requests.get(f"{API_URL}/pair/{src}/{dest}/{amount}").json()
        result = response["conversion_result"]
        self.result_label.config(text=f"{amount} {src} = {result} {dest}")

In the first three lines, we get input data from the from_combo and to_combo combo boxes and the amount_entry field using the get() method of each widget. The from_combo combo box data is named src, the to_combo combo box data is named dest, and the amount_entry field data is named amount.

To get the conversion between currencies, we make a GET request to the API using a URL constructed using the input data. The result returned from the API is again in JSON format, which we convert to a Python dictionary by calling .json(). We take the "conversion_result" from the response and use this to update the result label with the conversion result.

The final step is to hook our convert() method up to a button so we can trigger it. To do this, we will add the command argument to the button's definition. The value for this argument will be assigned the convert method object without the parentheses.

Here's how the button code will look after the update:

python
convert_button = ttk.Button(
    self,
    text="Convert",
    width=20,
    command=self.convert,
)

This code binds the button to the convert() method. Now, when you click the Convert button, this method will run.

That's it! With these final touches, your currency converter application is complete. The full final code is shown below.

python
import os
import sys
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import messagebox

import requests

API_KEY = os.getenv("API_KEY")
if API_KEY is None:
    messagebox.showerror("API Key Error", "API_KEY environment variable is not set.")
    sys.exit(1)

API_URL = f"https://v6.exchangerate-api.com/v6/{API_KEY}/"


class CurrencyConverterApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.geometry("500x450+300+150")
        self.title("Currency Converter")
        self.resizable(width=0, height=0)
        self.build_gui()

    def build_gui(self):
        self.logo = tk.PhotoImage(file="images/logo.png")
        tk.Label(self, image=self.logo).pack()
        frame = tk.Frame(self)
        frame.pack()

        from_label = ttk.Label(frame, text="From:")
        from_label.grid(row=0, column=0, padx=5, pady=5, sticky=tk.W)

        to_label = ttk.Label(frame, text="To:")
        to_label.grid(row=0, column=1, padx=5, pady=5, sticky=tk.W)

        self.from_combo = ttk.Combobox(frame)
        self.from_combo.grid(row=1, column=0, padx=5, pady=5)

        self.to_combo = ttk.Combobox(frame)
        self.to_combo.grid(row=1, column=1, padx=5, pady=5)

        amount_label = ttk.Label(frame, text="Amount:")
        amount_label.grid(row=2, column=0, padx=5, pady=5, sticky=tk.W)

        self.amount_entry = ttk.Entry(frame)
        self.amount_entry.insert(0, "1.00")
        self.amount_entry.grid(
            row=3, column=0, columnspan=2, padx=5, pady=5, sticky=tk.W + tk.E
        )

        self.result_label = ttk.Label(font=("Arial", 20, "bold"))
        self.result_label.pack()

        convert_button = ttk.Button(
            self, text="Convert", width=20, command=self.convert
        )
        convert_button.pack()

        currencies = self.get_currencies()

        self.from_combo["values"] = currencies
        self.from_combo.current(0)

        self.to_combo["values"] = currencies
        self.to_combo.current(0)

    def get_currencies(self):
        response = requests.get(f"{API_URL}/latest/USD")
        data = response.json()
        return list(data["conversion_rates"])

    def convert(self):
        src = self.from_combo.get()
        dest = self.to_combo.get()
        amount = self.amount_entry.get()
        response = requests.get(f"{API_URL}/pair/{src}/{dest}/{amount}").json()
        result = response["conversion_result"]
        self.result_label.config(text=f"{amount} {src} = {result} {dest}")


if __name__ == "__main__":
    app = CurrencyConverterApp()
    app.mainloop()

Run the final code and you will be able to convert amounts between any of the supported currencies. For example, select USD and EUR in the from and to combo boxes and enter a conversion amount of 100. The application will call the API and update the label with the result of the conversion, like follows:

Running currency converter app

Conclusion

Well done! You've built a functional currency converter application with Python & Tkinter. You've learnt the basics of building up a UI using Tkinter's widgets and layouts, how to use APIs to fill widgets with values and perform operations in response to user input.

If you want to take it further, think about some ways that you could improve the usability or extend the functionality of this application:

See if you can add these yourself! If you want to go further with Tkinter, take a look at our complete Tkinter tutorial.

April 25, 2025 06:00 AM UTC

April 24, 2025


Everyday Superpowers

Supercharge Your Enums: Cleaner Code with Hidden Features

I see a lot of articles suggesting you use enums by mostly restating the Python documentation. Unfortunately, I feel this leaves readers without crucial practical advice, which I'd like to pass on here.

This is especially true since most of the projects I've worked on, and developers I've coded with, don't seem to know this, leading to more complex code where values and behavior that are tightly coupled in the business concept end up scattered across separate code files.

First, let's review enum fundamentals:

Enums in a nutshell

If you're not aware of enums, they were added to Python in version 3.4 and represent a way to communicate, among other things, a reduced set of options.

For example, you can communicate the status of tasks:

from enum import Enum

class TaskStatus(Enum):  
    PENDING = 'pending'  
    IN_PROGRESS = 'in_progress'  
    COMPLETED = 'completed'  
    CANCELLED = 'cancelled'

The four lines of code beneath the `TaskStatus` class definition define the enum "members," or the specific options for this class.

This code communicates the fact that there are only four statuses a task can have.

This is very useful information, as it clearly shows the options available, but it is as deep as many developers go with enums. They don't realize how much more enums can do besides holding constant values.

For example, many don't know how easy it can be to select an enum member.

Enums can select themselves

I see a lot of code that complicates selecting enum members, like this:

def change_task_status(task_id: str, status: str):  
    task = database.get_task_by_id(task_id)  
    for member in TaskStatus:  
        if member.value == status:  
            task.status = member  
            database.update_task(task)

Instead, enum classes are smart enough to select members from their values (the things on the right side of the equal sign)[left]{You can also select members by their names (the left side of the equal sign) with square brackets, `TaskStatus['PENDING']`.}:

>>> TaskStatus('pending')
<TaskStatus.PENDING: 'pending'>

This means that you could simplify the code above like this[missing]{Be aware that if the status string does not match one of the enum member's values, it'll raise a `ValueError`.}:

def change_task_status(task_id: str, status: str):  
    task = database.get_task_by_id(task_id)  
    task.status = TaskStatus(status)  
    database.update_task(task)

But enums are not just static values. They can have behavior and data associated with them too.

Enum members are objects too

The thing about enums that many people are missing is that they are objects too.

For example, I recently worked on a project that would have had this after the `TaskStatus` class to connect a description to each enum member:

STATUS_TO_DESCRIPTION_MAP = {  
    TaskStatus.PENDING: "Task is pending",  
    TaskStatus.IN_PROGRESS: "Task is in progress",  
    TaskStatus.COMPLETED: "Task is completed",  
    TaskStatus.CANCELLED: "Task is cancelled"  
}

But here's the thing, you can add it in the enum!

Granted, it takes a little bit of work, but here's how I would do it[init]{If we weren't using the enum's value to select a member, we could make this simpler by editing the `__init__` method instead, like they do in the docs.}:

class TaskStatus(Enum):  
    PENDING = "pending", 'Task is pending'  
    IN_PROGRESS = "in_progress", 'Task is in progress'  
    COMPLETED = "completed", 'Task is completed'  
    CANCELLED = "cancelled", 'Task is cancelled' 
 
    def __new__(cls, value, description):  
        obj = object.__new__(cls)  
        obj._value_ = value  
        obj.description = description  
        return obj

This means that whenever a `TaskStatus` member is created, it keeps its original value but also adds a new attribute, description.

This means that a `TaskStatus` member would behave like this:

>>> completed = TaskStatus.COMPLETED
>>> completed.value
'completed'
>>> completed.description
'Task is completed'
>>> completed.name
'COMPLETED'

On top of that, you can define methods that interact with the enum members.

Let's add what the business expects would be the next status for each member:

class TaskStatus(Enum):  
    PENDING = "pending", "Task is pending"  
    IN_PROGRESS = "in_progress", "Task is in progress"  
    COMPLETED = "completed", "Task is completed"  
    CANCELLED = "cancelled", "Task is cancelled"  
  
    def __new__(cls, value, description):  
        ...
  
    @property  
    def expected_next_status(self):  
        if self == TaskStatus.PENDING:  
            return TaskStatus.IN_PROGRESS  
        elif self == TaskStatus.IN_PROGRESS:  
            return TaskStatus.COMPLETED  
        else:  # Task is completed or cancelled  
            return self

Now, each `TaskStatus` member "knows" what status is expected to be next:

>>> TaskStatus.PENDING.expected_next_status
<TaskStatus.IN_PROGRESS: 'in_progress'>
>>> TaskStatus.CANCELLED.expected_next_status
<TaskStatus.CANCELLED: 'cancelled'>

You could use this in a task detail view:

def task_details(task_id: str):  
    task = database.get_task_by_id(task_id)  
    return {  
        "id": task.id,  
        "title": task.title,  
        "status": task.status.value,  
        "expected_next_status": task.status.expected_next_status.value,  
    }

>>> task_details("task_id_123")
{
  'id': 'task_id_123', 
  'title': 'Sample Task', 
  'status': 'pending', 
  'expected_next_status': 'in_progress'
}

In conclusion

Python enums are more powerful than most developers realize, and I hope you might remember these great options.


Read more...

April 24, 2025 07:38 PM UTC


Python Software Foundation

2025 PSF Board Election Schedule Change

Starting this year, the PSF Board Election will be held a couple of months later in the year than in years prior. The nomination period through the end of the vote will run around the August to September time frame. This is due to several factors:

A detailed election schedule will be published in June.

Consider running for the PSF Board!

In the meantime, we hope that folks in the Python community consider running for a seat on the PSF Board! Wondering who runs for the Board? People who care about the Python community, who want to see it flourish and grow, and also have a few hours a month to attend regular meetings, serve on committees, participate in conversations, and promote the Python community.

Check out our Life as Python Software Foundation Director video to learn more about what being a part of the PSF Board entails. You can also check out our FAQ’s with the PSF Board video on the PSF YouTube Channel. If you’re headed to PyCon US 2025 next month, that’s a great time to connect with current and past Board Members. We also invite you to review our Annual Impact Report for 2023 to learn more about the PSF’s mission and what we do. Last but not least, we welcome you to join the PSF Board Office Hours to connect with Board members about being a part of the PSF Board!

April 24, 2025 03:21 PM UTC


Everyday Superpowers

Finding the root of a project with pathlib

Every now and then I'm writing code deep in some Python project, and I realize that it would be nice to generate a file at the root of a project.

The following is the way I'm currently finding the root folder with pathlib:

if __name__ == '__main__':
    from pathlib import Path
    project_root = next(
        p for p in Path(__file__).parents
        if (p / '.git').exists()
    )
    project_root.joinpath('output.text').write_text(...)

To explain what this does, let's start on line 4:

  • `Path(__file__)` will create an absolute path to the file this Python code is in.
  • `.parents` is a sequence holding the parent folders of this path, so if this path is `Path('/Users/example/projects/money_generator/src/cash/services/models.py')`, it will yield:
    • `Path('/Users/example/projects/money_generator/src/cash/services')` then
    • `Path('/Users/example/projects/money_generator/src/cash')`
    • and so on
  • `next()` given an iterable, it will retrieve the next item in it[next]{It can also return a default value if the iterable runs out of items.[docs]}
  • `p for p in Path(__file__).parents` is a comprehension that will yield each parent of the current file.
  • `(p / '.git').exists()` given a path `p`, it will look to see if the git folder (`.git`) exists in that folder
  • So, `next(p for p in Path(__file__).parents if (p / '.git').exists())` will return the first folder that contains a git repo in the parents of the current Python file.

How do you accomplish this task?


Read more...

April 24, 2025 01:38 PM UTC


Zato Blog

Integrating with Jira APIs

Integrating with Jira APIs

Overview

Continuing in the series of articles about the newest cloud connections in Zato 3.3, this episode covers Atlassian Jira from the perspective of invoking its APIs to build integrations between Jira and other systems.

There are essentially two use modes of integrations with Jira:

  1. Jira reacts to events taking place in your projects and invokes your endpoints accordingly via WebHooks. In this case, it is Jira that explicitly establishes connections with and sends requests to your APIs.
  2. Jira projects are queried periodically or as a consequence of events triggered by Jira using means other than WebHooks.

The first case is usually more straightforward to conceptualize - you create a WebHook in Jira, point it to your endpoint and Jira invokes it when a situation of interest arises, e.g. a new ticket is opened or updated. I will talk about this variant of integrations with Jira in a future instalment as the current one is about the other situation, when it is your systems that establish connections with Jira.

The reason why it is more practical to first speak about the second form is that, even if WebHooks are somewhat easier to reason about, they do come with their own ramifications.

To start off, assuming that you use the cloud-based version of Jira (e.g. https://example.atlassian.net), you need to have a publicly available endpoint for Jira to invoke through WebHooks. Very often, this is undesirable because the systems that you need to integrate with may be internal ones, never meant to be exposed to public networks.

Secondly, your endpoints need to have a TLS certificate signed by a public Certificate Authority and they need to be accessible on port 443. Again, both of these are something that most enterprise systems will not allow at all or it may take months or years to process such a change internally across the various corporate departments involved.

Lastly, even if a WebHook can be used, it is not always a given that the initial information that you receive in the request from a WebHook will already contain everything that you need in your particular integration service. Thus, you will still need a way to issue requests to Jira to look up details of a particular object, such as tickets, in this way reducing WebHooks to the role of initial triggers of an interaction with Jira, e.g. a WebHook invokes your endpoint, you have a ticket ID on input and then you invoke Jira back anyway to obtain all the details that you actually need in your business integration.

The end situation is that, although WebHooks are a useful concept that I will write about in a future article, they may very well not be sufficient for many integration use cases. That is why I start with integration methods that are alternative to WebHooks.

Alternatives to WebHooks

If, in our case, we cannot use WebHooks then what next? Two good approaches are:

  1. Scheduled jobs
  2. Reacting to emails (via IMAP)

Scheduled jobs will let you periodically inquire with Jira about the changes that you have not processed yet. For instance, with a job definition as below:

Now, the service configured for this job will be invoked once per minute to carry out any integration work required. For instance, it can get a list of tickets changed since the last time it ran, process each of them as required in your business context and update a database with information about what has just been done - the database can be based on Redis, MongoDB, SQL or anything else.

Integrations built around scheduled jobs make most sense when you need to make periodic sweeps across large swaths of business data; these are the "Give me everything that changed in the last period" kind of interactions, when you do not know precisely how much data you are going to receive.

In the specific case of Jira tickets, though, an interesting alternative may be to combine scheduled jobs with IMAP connections:

The idea here is that when new tickets are opened, or when updates are made to existing ones, Jira will send out notifications to specific email addresses and we can take advantage of it.

For instance, you can tell Jira to CC or BCC an address such as zato@example.com. Now, Zato will still run a scheduled job but, instead of connecting with Jira directly, that job will look up unread emails in its inbox ("UNSEEN" per the relevant RFC).

Anything that is unread must be new since the last iteration, which means that we can process each such email from the inbox, in this way guaranteeing that we process only the latest updates and dispensing with the need for our own database of tickets already processed. We can extract the ticket ID or other details from the email, look up its details in Jira and then continue as needed.

All the details of how to work with IMAP emails are provided in the documentation but it would boil down to this:

# -*- coding: utf-8 -*-

# Zato
from zato.server.service import Service

class MyService(Service):

    def handle(self):
        conn = self.email.imap.get('My Jira Inbox').conn

        for msg_id, msg in conn.get():

            # Process the message here ..
            process_message(msg.data)

            # .. and mark it as seen in IMAP.
            msg.mark_seen()

The natural question is - how would the "process_message" function extract details of a ticket from an email?

There are several ways:

  1. Each email has a subject of a fixed form - "[JIRA] (ABC-123) Here goes description". In this case, ABC-123 is the ticket ID.
  2. Each email will contain a summary, such as the one below, which can also be parsed:
         Summary: Here goes description
             Key: ABC-123
             URL: https://example.atlassian.net/browse/ABC-123
         Project: My Project
      Issue Type: Improvement
Affects Versions: 1.3.17
     Environment: Production
        Reporter: Reporter Name
        Assignee: Assignee Name
  3. Finally, each email will have an "X-Atl-Mail-Meta" header with interesting metadata that can also be parsed and extracted:
X-Atl-Mail-Meta: user_id="123456:12d80508-dcd0-42a2-a2cd-c07f230030e5",
                 event_type="Issue Created",
                 tenant="https://example.atlassian.net"

The first option is the most straightforward and likely the most convenient one - simply parse out the ticket ID and call Jira with that ID on input for all the other information about the ticket. How to do it exactly is presented in the next chapter.
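Purely as an illustration, parsing the ticket ID out of such a subject line could be done along these lines - the regular expression and the extract_ticket_key helper below are assumptions made for this sketch, not part of Zato's API:

# -*- coding: utf-8 -*-

# stdlib
import re

# Subjects have a fixed form: "[JIRA] (ABC-123) Here goes description".
subject_re = re.compile(r'\[JIRA\] \((?P<key>[A-Z][A-Z0-9]*-\d+)\)')

def extract_ticket_key(subject):
    """ Return the ticket key, e.g. 'ABC-123', or None if it cannot be found.
    """
    match = subject_re.search(subject)
    return match.group('key') if match else None

# extract_ticket_key('[JIRA] (ABC-123) Here goes description') -> 'ABC-123'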

Regardless of how we parse the emails, the important part is that we know that we invoke Jira only when there are new or updated tickets - otherwise there would not have been any new emails to process. Moreover, because it is our side that invokes Jira, we do not expose our internal system to the public network directly.

However, from the perspective of the overall security architecture, email is still part of the attack surface, so we need to make sure that we read and parse emails with that in view. In other words, regardless of whether it is Jira invoking us or us reading emails from Jira, all the usual security precautions regarding API integrations and accepting input from external resources still hold and need to be part of the design of the integration workflow.

Creating Jira connections

The above presented the ways in which we arrive at the point of invoking Jira, and now we are ready to actually do it.

As with other types of connections, Jira connections are created in Zato Dashboard, as below. Note that you use the email address of a user on whose behalf you connect to Jira but the only other credential is that user's API token previously generated in Jira, not the user's password.

Invoking Jira

With a Jira connection in place, we can now create a Python API service. In this case, we accept a ticket ID on input (called "a key" in Jira) and we return a few details about the ticket to our caller.

This is the kind of service that could be invoked from a service that is triggered by a scheduled job. That is, we would separate the tasks: one service would be responsible for opening IMAP inboxes and parsing emails, and the one below would be responsible for communication with Jira.

Thanks to this loose coupling, we make everything much more reusable - that the services can be changed independently is but one part; more importantly, with such separation, both of them can be reused by future services as well, without tying them rigidly to this one integration alone.

# -*- coding: utf-8 -*-

# stdlib
from dataclasses import dataclass

# Zato
from zato.common.typing_ import cast_, dictnone
from zato.server.service import Model, Service

# ###########################################################################

if 0:
    from zato.server.connection.jira_ import JiraClient

# ###########################################################################

@dataclass(init=False)
class GetTicketDetailsRequest(Model):
    key: str

@dataclass(init=False)
class GetTicketDetailsResponse(Model):
    assigned_to: str = ''
    progress_info: dictnone = None

# ###########################################################################

class GetTicketDetails(Service):

    class SimpleIO:
        input  = GetTicketDetailsRequest
        output = GetTicketDetailsResponse

    def handle(self):

        # This is our input data
        input = self.request.input # type: GetTicketDetailsRequest

        # .. create a reference to our connection definition ..
        jira = self.cloud.jira['My Jira Connection']

        # .. obtain a client to Jira ..
        with jira.conn.client() as client:

            # Cast to enable code completion
            client = cast_('JiraClient', client)

            # Get details of a ticket (issue) from Jira
            ticket = client.get_issue(input.key)

        # Observe that ticket may be None (e.g. invalid key), hence this 'if' guard ..
        if ticket:

            # .. build a shortcut reference to all the fields in the ticket ..
            fields = ticket['fields']

            # .. build our response object ..
            response = GetTicketDetailsResponse()
            response.assigned_to = fields['assignee']['emailAddress']
            response.progress_info = fields['progress']

            # .. and return the response to our caller.
            self.response.payload = response

# ###########################################################################

Creating a REST channel and testing it

The last remaining part is a REST channel to invoke our service through. We will provide the ticket ID (key) on input and the service will reply with what was found in Jira for that ticket.

We are now ready for the final step - we invoke the channel, which invokes the service which communicates with Jira, transforming the response from Jira to the output that we need:

$ curl localhost:17010/jira1 -d '{"key":"ABC-123"}'
{
    "assigned_to":"zato@example.com",
    "progress_info": {
        "progress": 10,
        "total": 30
    }
}
$

And this is everything for today - remember that this is just one way of integrating with Jira. The other one, using WebHooks, is something that I will go into in one of the future articles.

More resources

➤ Python API integration tutorials
What is an integration platform?
Python Integration platform as a Service (iPaaS)
What is an Enterprise Service Bus (ESB)? What is SOA?
Open-source iPaaS in Python

April 24, 2025 08:00 AM UTC

April 23, 2025


DataWars.io

Replit Teams for Education Deprecation: All you need to know | DataWars

April 23, 2025 07:41 PM UTC


Real Python

Getting Started With Python IDLE

Python IDLE is the default integrated development environment (IDE) that comes bundled with every Python installation, helping you to start coding right out of the box. In this tutorial, you’ll explore how to interact with Python directly in IDLE, edit and execute Python files, and even customize the environment to suit your preferences.

By the end of this tutorial, you’ll understand that:

  • Python IDLE is completely free and comes packaged with the Python language itself.
  • Python IDLE is an IDE included with Python installations, designed for basic editing, execution, and debugging of Python code.
  • You open IDLE through your system’s application launcher or terminal, depending on your operating system.
  • You can customize IDLE to make it a useful tool for writing Python.

Understanding the basics of Python IDLE will allow you to write, test, and debug Python programs without installing any additional software.

Get Your Cheat Sheet: Click here to download your free cheat sheet that will help you find the best coding font when starting with IDLE.

Open Python’s IDLE for the First Time

Python IDLE is free and comes included in Python installations on Windows and macOS. If you’re a Linux user, then you should be able to find and download Python IDLE using your package manager. Once you’ve installed it, you can then open Python IDLE and use it as an interactive interpreter or as a file editor.

Note: IDLE stands for “Integrated Development and Learning Environment.” It’s a wordplay with IDE, which stands for Integrated Development Environment.

The procedure for opening IDLE depends on how you installed Python and varies from one operating system to another. Select your operating system below and follow the steps to open IDLE:

Windows

Open the Start menu and click All Programs or All Apps. There should be a program icon labeled IDLE (Python 3.x). This will vary slightly between different versions of Windows. The IDLE icon may be in a program group folder named Python 3.x.

You can also find the IDLE program icon by using the Windows search from the Start menu and typing in IDLE. Click on the icon to start the program.

Linux

IDLE is available with the Python distribution but may not have been installed by default. To find out whether it’s installed, open a terminal window.

In the terminal window, type idle3 and press Enter. If you get an error telling you that the command wasn’t found, then IDLE isn’t installed and you’ll need to install it.

The method for installing apps varies from one Linux distribution to the next. For example, with Ubuntu Linux, you can install IDLE using the package manager apt:

Shell
$ sudo apt install idle3

Many Linux distributions have GUI-based application managers that you can use to install apps as well.

Follow whatever procedure is appropriate for your distribution to install IDLE. Then, type idle3 in a terminal window and press Enter to run it. Your installation procedure may have also set up a program icon on the desktop that you can alternatively click to start IDLE.

macOS

Open Spotlight Search and search for IDLE. Alternatively, you can open a terminal window, type idle3, and press Enter.

Once you’ve started IDLE successfully, you should see a window titled IDLE Shell 3.x.x, where 3.x.x corresponds to your version of Python:

Interactive Python interpreter session in IDLE 3.13

The window that you’re seeing is the IDLE shell, which is an interactive interpreter that IDLE opens by default.

Get to Know the Python IDLE Shell

When you open IDLE, the shell is the first thing that you see. The shell is the default mode of operation for Python IDLE. It’s a blank Python interpreter window, which you can use to interact with Python immediately.

Understanding the Interactive Interpreter

The interactive interpreter is a basic Read-Eval-Print Loop (REPL). It reads a Python statement, evaluates the result of that statement, and then prints the result on the screen. Then, it loops back to read the next statement.

Note: For a full guide to the standard Python REPL, check out The Python Standard REPL: Try Out Code and Ideas Quickly.

The IDLE shell is an excellent place to experiment with small code snippets and test short lines of code.

Interacting With the IDLE Shell

When you launch Python’s IDLE, it will immediately start a Python shell for you. Go ahead and write some Python code in the shell:

Hello World program shown in the IDLE python interpreter

Here, you used print() to output the string "Hello, from IDLE!" to your screen. This is the most basic way to interact with Python IDLE. You type in commands one at a time and Python responds with the result of each command.
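In plain text, the interaction from the screenshot above looks like this:

>>> print("Hello, from IDLE!")
Hello, from IDLE!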

Next, take a look at the menu bar. You’ll see a few options for using the shell:

Read the full article at https://realpython.com/python-idle/ »


[ Improve Your Python With 🐍 Python Tricks 💌 – Get a short & sweet Python Trick delivered to your inbox every couple of days. >> Click here to learn more and see examples ]

April 23, 2025 02:00 PM UTC

April 22, 2025


PyCoder’s Weekly

Issue #678: Namespaces, __init__, Sets, and More (April 22, 2025)

#678 – APRIL 22, 2025
View in Browser »

The PyCoder’s Weekly Logo


Namespaces in Python

In this tutorial, you’ll learn about Python namespaces, the structures that store and organize the symbolic names during the execution of a Python program. You’ll learn when namespaces are created, how they’re implemented, and how they support variable scope.
REAL PYTHON

Quiz: Namespaces in Python

REAL PYTHON

Stop Writing __init__ Methods

Glyph recommends using dataclasses in order to avoid the use of __init__. This post shows you how and what additional work you need to do for constructor side effects.
GLYPH LEFKOWITZ

AI Agent Code Walkthrough with Python & Temporal


Join us on May 2nd at 9am PST/12pm EST for a deep dive into using Temporal’s Agentic AI use cases. We’ll begin with a live demo demonstrating how Temporal lets you recover from unexpected issues before transitioning to a live walkthrough with our Solution Architects →
TEMPORAL sponsor

Practical Uses of Sets

Sets are unordered collections of values that are great for removing duplicates, quick containment checks, and set operations.
TREY HUNNER

PEP 770: Improving Measurability of Python Packages With Software Bill-of-Materials (Accepted)

PYTHON.ORG

PEP 736: Shorthand Syntax for Keyword Arguments at Invocation (Rejected)

PYTHON.ORG

PEP 661: Sentinel Values (Deferred)

PYTHON.ORG

Pydantic v2.11 Released

PYDANTIC.DEV

Quiz: Python’s Instance, Class, and Static Methods Demystified

REAL PYTHON

Articles & Tutorials

Elliptical Python Programming

This fun little article shows how certain combinations of punctuation in Python can evaluate to integers, and as a result allow you to create some rather obfuscated code. See also this associated article that breaks down exactly how it all works.
SUSAM PAL

How to Exit Loops Early With the Python Break Keyword

In this tutorial, you’ll explore various ways to use Python’s break statement to exit a loop early. Through practical examples, such as a student test score analysis tool and a number-guessing game, you’ll see how the break statement can improve the efficiency and effectiveness of your code.
REAL PYTHON

YOLO11 for Real-Time Object Detection

Ready-to-deploy, state of the art, open source computer vision apps? Sign me up!
INTEL CORPORATION sponsor

Creating a Python Dice Roll Application

In this step-by-step video course, you’ll build a dice-rolling simulator app with a minimal text-based user interface using Python. The app will simulate the rolling of up to six dice. Each individual die will have six sides.
REAL PYTHON course

Django Simple Deploy and Other DevOps Things

Talk Python interviews Eric Matthes, educator, author, and developer behind Django Simple Deploy. If you’ve ever struggled with taking that final step of getting your Django app onto a live server this tool might be for you.
KENNEDY & MATTHES podcast

Mastering DuckDB: Part 2

This is the second part in a post on how to use DuckDB when you’re used to pandas or Polars. It covers how to translate DataFrame operations into not-so-obvious SQL ones.
QUANSIGHT.ORG • Shared by Marco Gorelli

Process​Thread​Pool​Executor: When I‍/‍O Becomes CPU-bound

Learn how to combine thread and process management into a single executor class. Includes details on when you may be I/O vs CPU bound and what to do about it.
LEMON24

14 Advanced Python Features

Edward has collected a series of Python tricks and patterns that he has used over the years. They include typing overloads, generics, protocols, and more.
EDWARD LI

Python Is an Interpreted Language With a Compiler

Ever wonder about the distinction between compiled and interpreted languages? Python straddles the boundary, and this article explains just that.
NICOLE TIETZ-SOKOLSKAYA

Marimo: Reactive Notebooks for Python

Marimo is a new alternative to Jupyter notebooks. Talk Python interviews Akshay Agrawal and they talk all about this latest data science tool.
KENNEDY, AGRAWAL

Background Tasks in Django Admin With Celery

This tutorial looks at how to run background tasks directly from Django admin using Celery.
TESTDRIVEN.IO • Shared by Michael Herman

Quiz: How to Exit Loops Early With the Python Break Keyword

REAL PYTHON

Projects & Code

VTK: Open Source 3D Graphics and Visualization

Visualization Toolkit
KITWARE.COM

django-action-triggers: Actions in Response to DB Changes

GITHUB.COM/SALAAH01

pytest.nvim: Neovim Plugin for Python Testing

GITHUB.COM/RICHARDHAPB

nbdime: Tools for Diffing and Merging Jupyter Notebooks

GITHUB.COM/JUPYTER

maestro-cli: CLI to Play Songs

GITHUB.COM/PRAJWALVANDANA

Events

Weekly Real Python Office Hours Q&A (Virtual)

April 23, 2025
REALPYTHON.COM

PyCon DE & PyData 2025

April 23 to April 26, 2025
PYCON.DE

DjangoCon Europe 2025

April 23 to April 28, 2025
DJANGOCON.EU

PyCon Lithuania 2025

April 23 to April 26, 2025
PYCON.LT

Django Girls Ho, 2025

April 25, 2025
DJANGOGIRLS.ORG

Django Girls Ho, 2025

April 26, 2025
DJANGOGIRLS.ORG


Happy Pythoning!
This was PyCoder’s Weekly Issue #678.
View in Browser »


[ Subscribe to 🐍 PyCoder’s Weekly 💌 – Get the best Python news, articles, and tutorials delivered to your inbox once a week >> Click here to learn more ]

April 22, 2025 07:30 PM UTC


EuroPython Society

Call for EuroPython 2026 Host Venues

Are you a community builder dreaming of bringing EuroPython to your city? The Call for Venues for EuroPython 2026 is now open! 🎉

EuroPython is the longest-running volunteer-led Python conference in the world, uniting communities across Europe. It’s a place to learn, share, connect, spark new ideas—and have fun along the way.

We aim to keep the conference welcoming and accessible by choosing venues that are affordable, easy to reach, and sustainable. As with the selection process in previous years, we’d love your help in finding the best location for future editions.

If you’d like to propose a location on behalf of your community, please fill out this form:

👉 https://forms.gle/ZGQA7WhTW4gc53MD6

Even if 2026 isn’t the right time, don’t hesitate to get in touch. We’d also like to hear from communities interested in hosting EuroPython in 2027 or later.

Questions, suggestions, or comments? Drop us a line at board@europython.eu—we’ll get back to you!

EuroPython Society Board

April 22, 2025 02:48 PM UTC


Real Python

MySQL Databases and Python

MySQL is one of the most popular database management systems (DBMSs) on the market today. It ranked second only to the Oracle DBMS in this year’s DB-Engines Ranking. As most software applications need to interact with data in some form, programming languages like Python provide tools for storing and accessing these data sources.

Using the techniques discussed in this video course, you’ll be able to efficiently integrate a MySQL database with a Python application. You’ll develop a small MySQL database for a movie rating system and learn how to query it directly from your Python code.

By the end of this video course, you’ll be able to:


[ Improve Your Python With 🐍 Python Tricks 💌 – Get a short & sweet Python Trick delivered to your inbox every couple of days. >> Click here to learn more and see examples ]

April 22, 2025 02:00 PM UTC


Python Software Foundation

PSF Grants Program 2024 Transparency Report

The PSF’s Grants Program is a key plank in our charitable mission to promote, protect, and advance the Python programming language and to support and facilitate the growth of a diverse and international community of Python programmers. After much research, input, and analysis, the PSF is pleased to share the PSF Grants Program 2024 Transparency Report. The report includes context, numbers, analysis, and next steps for the Program.

Similar to our PSF Grants Program 2022 & 2023 Transparency Report, this 2024 report reflects the outcome of a significant amount of work. There are some differences in the position we are in as we approached the development of this report:

The data from 2024 was truly wonderful to see (many WOW’s and 🥳🥳 were shared among PSF Staff) – and we are so happy to share it with the community. The PSF is also excited to share that we achieved many of the goals we listed in the 2022 & 2023 report and believe this is a reflection of the focused work we undertook in 2024. Even with these wins, the PSF recognizes multiple opportunities to continue to improve the Program. We are also aware that our recent sustainability-focused changes to the Program will likely result in the need for additional improvements and adjustments.

The PSF continues to feel it is important to acknowledge that individual Pythonistas, regional communities, and the broader community are behind these statistics and commentaries. This report reflects the outcome of thousands of hours of efforts over 2024, 2023, and 2022 by the Grants Workgroup, the PSF Board, Python organizers, and PSF Staff worldwide. The PSF truly values this opportunity to share information on the success and challenges of our Grants Program, all while honoring the hard work of everyone involved in making the program and our mission possible.

This report was compiled by PSF Staff and reviewed by the PSF Board and Grants Workgroup. If you have questions, comments, or feedback about the Grants Program or this report, please email grants@python.org or attend a session of the Grants Program Office Hours on the PSF Discord.

 

Setting Context

The PSF has been working to improve our Grants Program since receiving a call from our community to address concerns and frustrations in December 2023. Our 2022 & 2023 report listed the actions the PSF has taken since we first received the call, and we are happy to share that since publishing our last report, the PSF Board, Grants Workgroup, and Staff have:


For a high-level idea of the scope of our Grants Program, the PSF is happy to share:


A couple of additional notes:


Again, the growth the PSF Grants Program has seen from 2022 to 2024 (and over the years) is exciting to reflect on because of what it means – you, the community, are wonderfully thriving and active! The PSF can’t wait to see what 2025 has in store for our Grants Program and the Python community.


The numbers (in graph form)

Thanks to Tania Allard for helping improve the readability and accessibility of the graphs provided in this report.

 

Our Analysis 

General Trends, Observations, & Notes

Number of Total Grant Applications by Continent

Percentage of Grants Approved

Percentage of Grant Applications by Continent

Approved & Declined Grant Applications by Continent

Dollar Amount Granted by Continent & Percentage of Money Granted by Continent

Average Amount Granted by Continent

Grant Decision Times in Weeks by Number & Percentage of Applications

Grants Program Average Days to Decide by Continent

 

Next steps and a final note

As the PSF reflects on all the successes of the Grants Program in 2024, we are preparing ourselves for even more adjustments to come with the updated Program and Grants Workgroup Charter. Some of our goals for 2025 include:


The PSF hopes this transparency report will help our community understand the state of our Grants Program in 2024, and the previous two years. Again, the process has been instructive to the Board, the Grants Workgroup, and the PSF Staff who administer the Program to understand where our efforts paid off, and where we can continue to improve. This report will inform our future efforts as we continue to make adjustments and improvements to the program. The PSF looks forward to continuing to serve the Python community with grants in 2025!

If you have any questions, comments, or feedback, please email grants@python.org. We also welcome you to attend a session of the Grants Program Office Hours on the PSF Discord (the next session is Tuesday, May 20th, at 9 AM Eastern, 1 PM UTC!). 

April 22, 2025 08:48 AM UTC


Wingware

Wing Python IDE 11 Beta 2 - April 22, 2025

Wing 11 beta2 is now available. It introduces support for Claude, Grok, Gemini, Perplexity, Mistral, Deepseek, Ollama, and other OpenAI API compatible AI providers.

Wing 11 is a new major release of the Wingware Python IDE, with improved AI assisted development, support for the uv package manager, improved Python code analysis, improved custom key binding assignment user interface, improved diff/merge, a new preference to auto-save files when Wing loses the application focus, updated German, French and Russian localizations (partly using AI), a new experimental AI-driven Spanish localization, and other bug fixes and minor improvements.

You can access early access releases simply by downloading them. We ask only that you keep your feedback and bug reports private by submitting them through Wing's Help menu or by emailing us at support@wingware.com.

Wing 11 Screen Shot

Downloads

IMPORTANT Be sure to Check for Updates from Wing's Help menu after installing so that you have the latest hot fixes.

Wing Pro 11.0.0.3

Wing Personal 11.0.0.3

Wing 101 11.0.0.3

Wing 10 and earlier versions are not affected by installation of Wing 11 and may be installed and used independently. However, project files for Wing 10 and earlier are converted when opened by Wing 11 and should be saved under a new name, since Wing 11 projects cannot be opened by older versions of Wing.

New in Wing 11

Improved AI Assisted Development

Wing 11 improves the user interface for AI assisted development by introducing two separate tools: AI Coder and AI Chat. AI Coder can be used to write, redesign, or extend code in the current editor. AI Chat can be used to ask about code or iterate in creating a design or new code without directly modifying the code in an editor.

Wing 11's AI assisted development features now support not just OpenAI but also Claude, Grok, Gemini, Perplexity, Mistral, Deepseek, and any other OpenAI completions API compatible AI provider.

This release also improves setting up AI request context, so that both automatically and manually selected and described context items may be paired with an AI request. AI request contexts can now be stored, optionally so they are shared by all projects, and may be used independently with different AI features.

AI requests can now also be stored in the current project or shared with all projects, and Wing comes preconfigured with a set of commonly used requests. In addition to changing code in the current editor, stored requests may create a new untitled file or run instead in AI Chat. Wing 11 also introduces options for changing code within an editor, including replacing code, commenting out code, or starting a diff/merge session to either accept or reject changes.

Wing 11 also supports using AI to generate commit messages based on the changes being committed to a revision control system.

You can now also configure multiple AI providers for easier access to different models.

For details see AI Assisted Development under Wing Manual in Wing 11's Help menu.

Package Management with uv

Wing Pro 11 adds support for the uv package manager in the New Project dialog and the Packages tool.

For details see Project Manager > Creating Projects > Creating Python Environments and Package Manager > Package Management with uv under Wing Manual in Wing 11's Help menu.

Improved Python Code Analysis

Wing 11 improves code analysis of literals such as dicts and sets, parametrized type aliases, typing.Self, type variables on the def or class line that declares them, generic classes with [...], and __all__ in *.pyi files.

Updated Localizations

Wing 11 updates the German, French, and Russian localizations, and introduces a new experimental AI-generated Spanish localization. The Spanish localization and the new AI-generated strings in the French and Russian localizations may be accessed with the new User Interface > Include AI Translated Strings preference.

Improved diff/merge

Wing Pro 11 adds floating buttons directly between the editors to make navigating differences and merging easier, allows undoing previously merged changes, and does a better job managing scratch buffers, scroll locking, and sizing of merged ranges.

For details see Difference and Merge under Wing Manual in Wing 11's Help menu.

Other Minor Features and Improvements

Wing 11 also improves the custom key binding assignment user interface, adds a Files > Auto-Save Files When Wing Loses Focus preference, warns immediately when opening a project with an invalid Python Executable configuration, allows clearing recent menus, expands the set of available special environment variables for project configuration, and makes a number of other bug fixes and usability improvements.

Changes and Incompatibilities

Since Wing 11 replaced the AI tool with AI Coder and AI Chat, and AI configuration is completely different than in Wing 10, you will need to reconfigure your AI integration manually in Wing 11. This is done with Manage AI Providers in the AI menu. After adding the first provider configuration, Wing will set that provider as the default. You can switch between providers with Switch to Provider in the AI menu.

If you have questions about any of this, please don't hesitate to contact us at support@wingware.com.

April 22, 2025 01:00 AM UTC


Seth Michael Larson

Quick Mastodon toot templates for event hashtags

I'm a big fan of Mastodon and I plan to cover PyCon US 2025 on Mastodon almost exclusively (and it sounds like I'm not alone).

This was the plan last year, too, but I found typing out all the hashtags every time definitely discouraged me from posting about "in-the-moment" stuff.

The goal is to quickly capture a thought, interaction, or image, share to Mastodon, and then get back to enjoying the conference.


Pre-populated event hashtags!

So I went looking for a solution and found that Mastodon supports share URLs with pre-filled content, like hashtags. What I wanted to do was add a URL to my phone home screen:

https://<instance domain name>/share?text=%0A%0A%23PyCon%20%23PyConUS%20%23PyConUS2025

The above percent-encoded text parameter is:

\n\n#PyCon #PyConUS #PyConUS2025

If we could save this URL direct to the home screen that would be the end of the story. Unfortunately, at least for iOS you can't save a web page that has a query parameter in the URL (like ?text=...), so the solution needs to be slightly more complicated.

What I ended up doing is adding this URL to a GitHub Gist and then saving the Gist webpage to my home screen. On iOS the process is:

Now I'm only ever two clicks away from a ready-to-use toot template for PyCon US. This technique is remixable for any hashtags or other text you're tooting often. Use urllib.parse.quote() in Python to create percent-encoded text.
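For example, a small sketch of building such a share URL with urllib.parse.quote() - the instance domain below is a placeholder, so substitute your own:

from urllib.parse import quote

# Two blank lines followed by the event hashtags, as in the template above.
template = "\n\n#PyCon #PyConUS #PyConUS2025"

# Replace mastodon.example with your own instance's domain name.
share_url = f"https://mastodon.example/share?text={quote(template)}"
print(share_url)
# https://mastodon.example/share?text=%0A%0A%23PyCon%20%23PyConUS%20%23PyConUS2025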

April 22, 2025 12:00 AM UTC

April 21, 2025


death and gravity

Process​Thread​Pool​Executor: when I‍/‍O becomes CPU-bound

So, you're doing some I‍/‍O bound stuff, in parallel.

Maybe you're scraping some websites – a lot of websites.

Maybe you're updating or deleting millions of DynamoDB items.

You've got your ThreadPoolExecutor, you've increased the number of threads and tuned connection limits... but after some point, it's just not getting any faster. You look at your Python process, and you see CPU utilization hovers above 100%.

You could split the work into batches and have a ProcessPoolExecutor run your original code in separate processes. But that requires yet more code, and a bunch of changes, which is no fun. And maybe your input is not that easy to split into batches.

If only we had an executor that worked seamlessly across processes and threads.

Well, you're in luck, since that's exactly what we're building today!

And even better, in a couple years you won't even need it anymore.

Establishing a baseline

To measure things, we'll use a mock that pretends to do mostly I‍/‍O, with a sprinkling of CPU-bound work thrown in – a stand-in for something like a database connection, a Requests session, or a DynamoDB client.

class Client:
    io_time = 0.02
    cpu_time = 0.0008

    def method(self, arg):
        # simulate I/O
        time.sleep(self.io_time)

        # simulate CPU-bound work
        start = time.perf_counter()
        while time.perf_counter() - start < self.cpu_time:
            for i in range(100): i ** i

        return arg

We sleep() for the I/O, and do some math in a loop for the CPU stuff; it doesn't matter exactly how long each takes, as long as I/O time dominates.

Real multi-threaded clients are usually backed by a connection pool; we could simulate one using a semaphore, but it's not relevant here – we're assuming the connection pool is effectively unbounded.

Since we'll use our client from multiple processes, we set up a global instance and a function that uses it; we can then pass init_client() as an executor initializer, which also allows us passing arguments to the client when creating it.

client = None

def init_client(*args):
    global client
    client = Client(*args)

def do_stuff(*args):
    return client.method(*args)

Finally, we make a simple timing context manager:

@contextmanager
def timer():
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"elapsed: {end-start:1.3f}")

...and put everything together in a function that measures how long it takes to do a bunch of work using a concurrent.futures executor:

def benchmark(executor, n=10_000, timer=timer, chunksize=10):
    with executor:
        # make sure all the workers are started,
        # so we don't measure their startup time
        list(executor.map(time.sleep, [0] * 200))

        with timer():
            values = list(executor.map(do_stuff, range(n), chunksize=chunksize))

        assert values == list(range(n)), values

Threads

So, a ThreadPoolExecutor should suffice here, since we're mostly doing I‍/‍O, right?

>>> from concurrent.futures import *
>>> from bench import *
>>> init_client()
>>> benchmark(ThreadPoolExecutor(10))
elapsed: 24.693

More threads!

>>> benchmark(ThreadPoolExecutor(20))
elapsed: 12.405

Twice the threads, twice as fast. More!

>>> benchmark(ThreadPoolExecutor(30))
elapsed: 8.718

Good, it's still scaling linearly. MORE!

>>> benchmark(ThreadPoolExecutor(40))
elapsed: 8.638

confused cat with question marks around its head

...more?

>>> benchmark(ThreadPoolExecutor(50))
elapsed: 8.458
>>> benchmark(ThreadPoolExecutor(60))
elapsed: 8.430
>>> benchmark(ThreadPoolExecutor(70))
elapsed: 8.428

squinting confused cat

Problem: CPU becomes a bottleneck

It's time we take a closer look at what our process is doing.

I'd normally use the top command for this, but since the flags and output vary with the operating system, we'll implement our own using the excellent psutil library.

@contextmanager
def top():
    """Print information about current and child processes.

    RES is the resident set size. USS is the unique set size.
    %CPU is the CPU utilization. nTH is the number of threads.

    """
    process = psutil.Process()
    processes = [process] + process.children(True)
    for p in processes: p.cpu_percent()

    yield

    print(f"{'PID':>7} {'RES':>7} {'USS':>7} {'%CPU':>7} {'nTH':>7}")
    for p in processes:
        try:
            m = p.memory_full_info()
        except psutil.AccessDenied:
            m = p.memory_info()
        rss = m.rss / 2**20
        uss = getattr(m, 'uss', 0) / 2**20
        cpu = p.cpu_percent()
        nth = p.num_threads()
        print(f"{p.pid:>7} {rss:6.1f}m {uss:6.1f}m {cpu:7.1f} {nth:>7}")

And because it's a context manager, we can use it as a timer:

>>> init_client()
>>> benchmark(ThreadPoolExecutor(10), timer=top)
    PID     RES     USS    %CPU     nTH
  51395   35.2m   28.5m    38.7      11

So, what happens if we increase the number of threads?

>>> benchmark(ThreadPoolExecutor(20), timer=top)
    PID     RES     USS    %CPU     nTH
  13912   16.8m   13.2m    70.7      21
>>> benchmark(ThreadPoolExecutor(30), timer=top)
    PID     RES     USS    %CPU     nTH
  13912   17.0m   13.4m    99.1      31
>>> benchmark(ThreadPoolExecutor(40), timer=top)
    PID     RES     USS    %CPU     nTH
  13912   17.3m   13.7m   100.9      41

With more threads, the compute part of our I‍/‍O bound workload increases, eventually becoming high enough to saturate one CPU – and due to the global interpreter lock, one CPU is all we can use, regardless of the number of threads.1

Processes?

I know, let's use a ProcessPoolExecutor instead!

>>> benchmark(ProcessPoolExecutor(20, initializer=init_client))
elapsed: 12.374
>>> benchmark(ProcessPoolExecutor(30, initializer=init_client))
elapsed: 8.330
>>> benchmark(ProcessPoolExecutor(40, initializer=init_client))
elapsed: 6.273

Hmmm... I guess it is a little bit better.

More? More!

>>> benchmark(ProcessPoolExecutor(60, initializer=init_client))
elapsed: 4.751
>>> benchmark(ProcessPoolExecutor(80, initializer=init_client))
elapsed: 3.785
>>> benchmark(ProcessPoolExecutor(100, initializer=init_client))
elapsed: 3.824

OK, it's better, but with diminishing returns – there's no improvement after 80 processes, and even then, it's only 2.2x faster than the best time with threads, when, in theory, it should be able to make full use of all 4 CPUs.

Also, we're not making best use of connection pooling (relevant if the client connects to many different hosts, since we now have 80 pools), nor multiplexing (relevant with protocols like HTTP/2 or newer, since we now have 80 connections).

Problem: more processes, more memory

But it gets worse!

>>> benchmark(ProcessPoolExecutor(80, initializer=init_client), timer=top)
    PID     RES     USS    %CPU     nTH
   2479   21.2m   15.4m    15.0       3
   2480   11.2m    6.3m     0.0       1
   2481   13.8m    8.5m     3.4       1
  ... 78 more lines ...
   2560   13.8m    8.5m     4.4       1

13.8 MiB * 80 ~= 1 GiB ... that is a lot of memory.

Now, there's some nuance to be had here.

First, on most operating systems that have virtual memory, code segment pages are shared between processes – there's no point in having 80 copies of libc or the Python interpreter in memory.

The unique set size is probably a better measurement than the resident set size, since it excludes memory shared between processes.2 So, for the macOS output above,3 the actual usage is more like 8.5 MiB * 80 = 680 MiB.

Second, if you use the fork or forkserver start methods, processes also share memory allocated before the fork() via copy-on-write; for Python, this includes module code and variables. On Linux, the actual usage is 1.7 MiB * 80 = 136 MiB:

>>> benchmark(ProcessPoolExecutor(80, initializer=init_client), timer=top)
    PID     RES     USS    %CPU     nTH
 329801   17.0m    6.6m     5.1       3
 329802   13.3m    1.6m     2.1       1
  ... 78 more lines ...
 329881   13.3m    1.7m     2.0       1

However, it's important to note that's just a lower bound; memory allocated after fork() is not shared, and most real work will unavoidably allocate more memory.

Liking this so far? Here's another article you might like:

Why not both?

One reasonable way of dealing with this would be to split the input into batches, one per CPU, and pass them to a ProcessPoolExecutor, which in turn runs the batch items using a ThreadPoolExecutor.4
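For illustration, a rough sketch of that batching approach might look like the following, reusing do_stuff and init_client from the benchmark setup; the batch sizing and per-process thread count are arbitrary choices for the example:

import math
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def run_batch(batch, max_threads=20):
    # each worker process runs its whole batch on its own thread pool
    with ThreadPoolExecutor(max_threads) as threads:
        return list(threads.map(do_stuff, batch))

def run_in_batches(args, processes=4):
    # split the input into one contiguous batch per process ...
    size = math.ceil(len(args) / processes)
    batches = [args[i:i + size] for i in range(0, len(args), size)]

    # ... and hand each batch to a separate worker process
    results = []
    with ProcessPoolExecutor(processes, initializer=init_client) as procs:
        for batch_results in procs.map(run_batch, batches):
            results.extend(batch_results)
    return results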

But that would mean we need to change our code, and that's no fun.

If only we had an executor that worked seamlessly across processes and threads.

A minimal plausible solution

In keeping with what has become tradition by now, we'll take an iterative, problem-solution approach; since we're not sure what to do yet, we start with the simplest thing that could possibly work.

We know we want a process pool executor that starts one thread pool executor per process, so let's deal with that first.

class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        super().__init__(
            initializer=_init_process,
            initargs=(max_threads, initializer, initargs)
        )

By subclassing ProcessPoolExecutor, we get the map() implementation for free. By going with the default max_workers, we get one process per CPU (which is what we want); we can add more arguments later if needed.

In our custom process initializer, we set up a global thread pool executor, and then call the initializer provided by the user:

_executor = None

def _init_process(max_threads, initializer, initargs):
    global _executor

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    atexit.register(_executor.shutdown)

    if initializer:
        initializer(*initargs)

Likewise, submit() passes the work along to the thread pool executor:

class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    # ...
    def submit(self, fn, *args, **kwargs):
        return super().submit(_submit, fn, *args, **kwargs)
def _submit(fn, *args, **kwargs):
    return _executor.submit(fn, *args, **kwargs).result()

OK, that looks good enough; let's use it and see if it works:

def _do_stuff(n):
    print(f"doing: {n}")
    return n ** 2

if __name__ == '__main__':
    with ProcessThreadPoolExecutor() as e:
        print(list(e.map(_do_stuff, [0, 1, 2])))
 $ python ptpe.py
doing: 0
doing: 1
doing: 2
[0, 1, 4]

Wait, we got it on the first try?!

Let's measure that:

>>> from bench import *
>>> from ptpe import *
>>> benchmark(ProcessThreadPoolExecutor(30, initializer=init_client), n=1000)
elapsed: 6.161

Hmmm... that's unexpectedly slow... almost as if:

>>> multiprocessing.cpu_count()
4
>>> benchmark(ProcessPoolExecutor(4, initializer=init_client), n=1000)
elapsed: 6.067

Ah, because _submit() waits for the result() in the main thread of the worker process, this is just a ProcessPoolExecutor with extra steps.


But what if we send back the future instead?

    def submit(self, fn, *args, **kwargs):
        return super().submit(_submit, fn, *args, **kwargs).result()
def _submit(fn, *args, **kwargs):
    return _executor.submit(fn, *args, **kwargs)

Alas:

$ python ptpe.py
doing: 0
doing: 1
doing: 2
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "concurrent/futures/process.py", line 210, in _sendback_result
    result_queue.put(_ResultItem(work_id, result=result,
  File "multiprocessing/queues.py", line 391, in put
    obj = _ForkingPickler.dumps(obj)
  File "multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ptpe.py", line 42, in <module>
    print(list(e.map(_do_stuff, [0, 1, 2])))
  ...
TypeError: cannot pickle '_thread.RLock' object

It may not seem like it, but this is a partial success: the work happens, we just can't get anything back. Not surprising, to be honest, it couldn't have been that easy.

Getting results

If you look carefully at the traceback, you'll find a hint of how ProcessPoolExecutor gets its own results back from workers – a queue; the module docstring even has a neat data-flow diagram:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Now, we could probably use the same queue somehow, but it would involve touching a lot of (private) internals.5 Instead, let's use a separate queue:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self.__result_queue = multiprocessing.Queue()
        super().__init__(
            initializer=_init_process,
            initargs=(self.__result_queue, max_threads, initializer, initargs)
        )

On the worker side, we make it globally accessible:

_executor = None
_result_queue = None

def _init_process(queue, max_threads, initializer, initargs):
    global _executor, _result_queue

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    atexit.register(_executor.shutdown)

    _result_queue = queue
    atexit.register(_result_queue.close)

    if initializer:
        initializer(*initargs)

...so we can use it from a task callback registered by _submit():

def _submit(fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((False, exception))
    else:
        _result_queue.put((True, task.result()))

Back in the main process, we handle the results in a thread:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        # ...
        self.__result_handler = threading.Thread(target=self.__handle_results)
        self.__result_handler.start()
    def __handle_results(self):
        for ok, result in iter(self.__result_queue.get, None):
            print(f"{'ok' if ok else 'error'}: {result}")

Finally, to stop the handler, we use None as a sentinel on executor shutdown:

    def shutdown(self, wait=True):
        super().shutdown(wait=wait)
        if self.__result_queue:
            self.__result_queue.put(None)
            if wait:
                self.__result_handler.join()
            self.__result_queue.close()
            self.__result_queue = None

Let's see if it works:

$ python ptpe.py
doing: 0
ok: [0]
doing: 1
ok: [1]
doing: 2
ok: [4]
Traceback (most recent call last):
  File "concurrent/futures/_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
AttributeError: 'NoneType' object has no attribute 'result'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  ...
AttributeError: 'NoneType' object has no attribute 'cancel'

Yay, the results are making it to the handler!

The error happens because instead of returning a Future, our submit() returns the result of _submit(), which is always None.

Fine, we'll make our own futures

But submit() must return a future, so we make our own:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        # ...
        self.__tasks = {}
        # ...
    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self.__tasks[task_id] = outer

        outer.set_running_or_notify_cancel()
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)

        return outer

In order to map results to their futures, we can use a unique identifier; the id() of the outer future should do, since it is unique for the object's lifetime.

We pass the id to _submit(), then to _put_result() as an attribute on the future, and finally back in the queue with the result:

def _submit(task_id, fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.task_id = task_id
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((task.task_id, False, exception))
    else:
        _result_queue.put((task.task_id, True, task.result()))

Back in the result handler, we find the matching future, and set the result accordingly:

    def __handle_results(self):
        for task_id, ok, result in iter(self.__result_queue.get, None):
            outer = self.__tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)

And it works:

$ python ptpe.py
doing: 0
doing: 1
doing: 2
[0, 1, 4]

I mean, it really works:

>>> benchmark(ProcessThreadPoolExecutor(10, initializer=init_client))
elapsed: 6.220
>>> benchmark(ProcessThreadPoolExecutor(20, initializer=init_client))
elapsed: 3.397
>>> benchmark(ProcessThreadPoolExecutor(30, initializer=init_client))
elapsed: 2.575
>>> benchmark(ProcessThreadPoolExecutor(40, initializer=init_client))
elapsed: 2.664

3.3x is not quite the 4 CPUs my laptop has, but it's pretty close, and much better than the 2.2x we got from processes alone.

Death becomes a problem

I wonder what happens when a worker process dies.

For example, the initializer can fail:

>>> executor = ProcessPoolExecutor(initializer=divmod, initargs=(0, 0))
>>> executor.submit(int).result()
Exception in initializer:
Traceback (most recent call last):
  ...
ZeroDivisionError: integer division or modulo by zero
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

...or a worker can die some time later, which we can help along with a custom timer:6

@contextmanager
def terminate_child(interval=1):
    threading.Timer(interval, psutil.Process().children()[-1].terminate).start()
    yield
>>> executor = ProcessPoolExecutor(initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
[ one second later ]
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Now let's see our executor:

>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
[ one second later ]
[ ... ]
[ still waiting ]
[ ... ]
[ hello? ]

If the dead worker is not around to send back results, its futures never get completed, and map() keeps waiting until the end of time; the expected behavior is to detect when this happens and fail all pending tasks with BrokenProcessPool.


Before we do that, though, let's address a more specific issue.

If map() hasn't finished submitting tasks when the worker dies, inner fails with BrokenProcessPool, which right now we're ignoring entirely. While we don't need to do anything about it in particular because it gets covered by handling the general case, we should still propagate all errors to the outer task anyway.

    def submit(self, fn, *args, **kwargs):
        # ...
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)
        inner.task_id = task_id
        inner.add_done_callback(self.__handle_inner)

        return outer
    def __handle_inner(self, inner):
        task_id = inner.task_id
        if exception := inner.exception():
            if outer := self.__tasks.pop(task_id, None):
                outer.set_exception(exception)

This fixes the case where a worker dies almost instantly:

>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=lambda: terminate_child(0))
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

For the general case, we need to check if the executor is broken – but how? We've already decided we don't want to depend on internals, so we can't use ProcessPoolExecutor._broken. Maybe we can submit a dummy task and see if it fails instead:

    def __check_broken(self):
        try:
            super().submit(int).cancel()
        except concurrent.futures.BrokenExecutor as e:
            return type(e)(str(e))
        except RuntimeError as e:
            if 'shutdown' not in str(e):
                raise
        return None

Using it is a bit involved, but not completely awful:

    def __handle_results(self):
        last_broken_check = time.monotonic()

        while True:
            now = time.monotonic()
            if now - last_broken_check >= .1:
                if exc := self.__check_broken():
                    break
                last_broken_check = now

            try:
                value = self.__result_queue.get(timeout=.1)
            except queue.Empty:
                continue

            if not value:
                return

            task_id, ok, result = value
            if outer := self.__tasks.pop(task_id, None):
                if ok:
                    outer.set_result(result)
                else:
                    outer.set_exception(result)

        while self.__tasks:
            try:
                _, outer = self.__tasks.popitem()
            except KeyError:
                break
            outer.set_exception(exc)

When there's a steady stream of results coming in, we don't want to check too often, so we enforce a minimum delay between checks. When there are no results coming in, we want to check regularly, so we use the Queue.get() timeout to avoid waiting forever. If the check fails, we break out of the loop and fail the pending tasks. Like so:

>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

cool smoking cat wearing denim jacket and sunglasses


So, yeah, I think we're done. Here's the final executor and benchmark code.

Some features left as an exercise for the reader:

Learned something new today? Share this with others, it really helps!

Want to know when new articles come out? Subscribe here to get new stuff straight to your inbox!

Bonus: free threading

You may have heard people being excited about the experimental free threading support added in Python 3.13, which allows running Python code on multiple CPUs.

And for good reason:

$ python3.13t
Python 3.13.2 experimental free-threading build
>>> from concurrent.futures import *
>>> from bench import *
>>> init_client()
>>> benchmark(ThreadPoolExecutor(30))
elapsed: 8.224
>>> benchmark(ThreadPoolExecutor(40))
elapsed: 6.193
>>> benchmark(ThreadPoolExecutor(120))
elapsed: 2.323

A 3.6x speedup over the GIL version, with none of the shenanigans in this article!
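If you want to double-check which build you're actually running, Python 3.13 can tell you at runtime (a minimal check; note that sys._is_gil_enabled() is a private, provisional API and may change):

>>> import sys, sysconfig
>>> sysconfig.get_config_var("Py_GIL_DISABLED")  # 1 on a free-threading build
1
>>> sys._is_gil_enabled()  # False when the GIL is actually disabled
False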

Alas, packages with extensions need to be updated to support it:

>>> import psutil
zsh: segmentation fault  python3.13t

...but the ecosystem is slowly catching up.


  1. At least, all we can use for pure-Python code. I/O always releases the global interpreter lock, and so do some extension modules.

  2. The psutil documentation for memory_full_info() explains the difference quite nicely and links to further resources, because good libraries educate.

  3. You may have to run Python as root to get the USS of child processes.

  4. And no, asyncio is not a solution, since the event loop runs in a single thread, so you'd still need to run one event loop per CPU in dedicated processes.

  5. Check out nilp0inter/threadedprocess for an idea of what that looks like.

  6. pkill -fn '[Pp]ython' would've done it too, but it gets tedious if you do it a lot, and it's a different command on Windows.

April 21, 2025 04:43 PM UTC


Real Python

Shallow vs Deep Copying of Python Objects

Python’s assignment statements don’t copy objects as they do in some other programming languages. Instead, they create bindings between your variable names and objects. For immutable objects, this distinction usually doesn’t matter. However, when you work with mutable objects or containers of mutable items, you may need to create explicit copies or “clones” of these objects.

By the end of this tutorial, you’ll understand that:

  • Shallow copying creates a new object but references the same nested objects, leading to shared changes.
  • Deep copying recursively duplicates all objects, ensuring full independence from the original.
  • Python’s copy module provides the copy() function for shallow copies and deepcopy() for deep copies.
  • Custom classes can implement .__copy__() and .__deepcopy__() for specific copying behavior.
  • Assignment in Python binds variable names to objects without copying, unlike some lower-level languages.

Explore the nuances of copying objects in Python and learn how to apply these techniques to manage mutable data structures effectively.
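To make the distinction concrete, here's a quick illustration using the copy module on a nested list (a minimal sketch; the variable names are made up for this example):

>>> import copy
>>> matrix = [[1, 2], [3, 4]]
>>> shallow = copy.copy(matrix)   # new outer list, same inner lists
>>> deep = copy.deepcopy(matrix)  # new outer list and new inner lists
>>> matrix[0].append(99)
>>> shallow[0]  # shares the mutated inner list with the original
[1, 2, 99]
>>> deep[0]     # fully independent copy
[1, 2]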



Getting the Big Picture of Object Copying

Copying an object means creating its exact duplicate in memory. While there are many good reasons for doing so, at the end of the day, it allows you to modify the cloned objects independently of each other.

For example, a getter method may return sensitive information like the balance of someone’s bank account. To prevent unauthorized modifications of the bank account’s state, whether accidental or intentional, you’ll typically return a copy of the original data as a defensive programming measure. That way, you’ll have two separate objects safely representing the same piece of information.
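Here's what such a defensive copy might look like (a hypothetical sketch; the Account class and its attributes are invented for illustration):

>>> import copy
>>> class Account:
...     def __init__(self):
...         self._transactions = [("deposit", 100), ("withdrawal", 25)]
...     def get_transactions(self):
...         # hand back a copy so callers can't mutate the account's history
...         return copy.copy(self._transactions)
...
>>> account = Account()
>>> history = account.get_transactions()
>>> history.append(("withdrawal", 1_000_000))
>>> len(account._transactions)  # the original list is untouched
2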

Sometimes, you may need to work with multiple snapshots of the same data. In 3D computer graphics, transformations like rotation and scaling rely on matrix multiplication to update a model’s vertices. Rather than permanently changing the original model, you can duplicate its vertices and apply transformations to the copy. This will allow you to animate the model in a non-destructive way.

The following section provides an overview of the fundamental concepts and challenges associated with object copying in general. If you’d like to jump straight to copying objects in Python, then feel free to skip ahead.

Scalar vs Composite Types

In programming, objects can be classified into two broad categories of data types:

  1. Scalar
  2. Composite

Scalar data types represent simple, indivisible values that can’t be decomposed into smaller parts, much like atoms were once thought to be. Examples of scalars in Python include numbers, dates, and UUID-type identifiers:

>>> from datetime import date
>>> from uuid import uuid4
>>> numbers = 42, 3.14, 3 + 2j
>>> dates = date.today(), date(1991, 2, 20)
>>> ids = uuid4(), uuid4(), uuid4()

Each of these objects holds a single value representing a basic unit of data. By combining these fundamental building blocks, you can create more complex data structures.

Composite data types, on the other hand, are containers made up of other elements. Some of them are merely collections of scalar values, while others contain other composites or both, forming a complex hierarchy of objects:

>>> import array
>>> audio_frames = array.array("h", [2644, 2814, 3001])
>>> audio_data = (
...     ("PCM", 2, 44100, 16),
...     [
...         (15975, 28928),
...         (-86, 15858),
...         (31999, -3),
...     ]
... )

In this case, the "h" argument in the array.array() call specifies that the array will store numbers as two-byte signed integers. As you can see, a Python array aggregates scalar numbers into a flat sequence, whereas a list and tuple can contain deeply nested structures arranged in a particular way.

Note: Python types can sometimes fall into a gray area. For example, strings have a dual nature, as they’re technically sequences of characters. At the same time, they behave like scalars in specific contexts because they don’t allow element-wise operations—you must treat them as a whole.

These two categories of data types are closely related to the concept of object mutability, which you’ll learn more about now.

Mutable vs Immutable Objects

In high-level programming languages like Java and JavaScript, scalar types typically represent read-only values that can’t change over time. Such objects don’t allow in-place state mutation during their lifetime. So, if you want to modify a scalar value, then your only option is to disregard it and create another instance with a different value. In contrast, composite types can be either mutable or immutable, depending on their implementation.

Note: Immutable types have several advantages, including thread safety and improved memory efficiency, as they let you reuse objects without copying. On the other hand, when performance is vital, mutable types can reduce the overhead associated with object creation, especially when you tend to modify your objects frequently.
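In Python, you can observe this difference with id(), which shows whether an operation mutated an object in place or produced a new one (a minimal sketch):

>>> numbers = [1, 2, 3]
>>> before = id(numbers)
>>> numbers.append(4)  # lists are mutable: modified in place
>>> id(numbers) == before
True
>>> count = 3
>>> before = id(count)
>>> count += 1  # ints are immutable: the name is rebound to a new object
>>> id(count) == before
False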

Read the full article at https://realpython.com/python-copy/ »



April 21, 2025 02:00 PM UTC

Quiz: Shallow vs Deep Copying of Python Objects

In this quiz, you’ll test your understanding of Shallow vs Deep Copying of Python Objects.

By working through this quiz, you’ll revisit the concepts of shallow and deep copying, and how they affect mutable objects in Python.



April 21, 2025 12:00 PM UTC


Talk Python to Me

#502: Django Ledger: Accounting with Python

Do you or your company need accounting software? Well, there are plenty of SaaS products out there that you can give your data to. But maybe you also really like Django and would rather have a foundation to build your own accounting system exactly as you need for your company or your product. On this episode, we're diving into Django Ledger, created by Miguel Sanda, which can do just that.

Episode sponsors

  • Auth0: https://talkpython.fm/auth0
  • Talk Python Courses: https://talkpython.fm/training

Links from the show

  • Miguel Sanda on Twitter: https://x.com/elarroba?featured_on=talkpython
  • Miguel on Mastodon: https://fosstodon.org/@elarroba
  • Miguel on GitHub: https://github.com/elarroba?featured_on=talkpython
  • Django Ledger on GitHub: https://github.com/arrobalytics/django-ledger?featured_on=talkpython
  • Django Ledger Discord: https://discord.gg/c7PZcbYgrc?featured_on=talkpython
  • Get Started with Django MongoDB Backend: https://www.mongodb.com/docs/languages/python/django-mongodb/current/get-started/?featured_on=talkpython
  • Wagtail CMS: https://wagtail.org/?featured_on=talkpython
  • Watch this episode on YouTube: https://www.youtube.com/watch?v=eM170jyjbu8
  • Episode transcripts: https://talkpython.fm/episodes/transcript/502/django-ledger-accounting-with-python

Stay in touch with us

  • Subscribe to Talk Python on YouTube: https://talkpython.fm/youtube
  • Talk Python on Bluesky: https://bsky.app/profile/talkpython.fm
  • Talk Python on Mastodon: https://fosstodon.org/web/@talkpython
  • Michael on Bluesky: https://bsky.app/profile/mkennedy.codes?featured_on=talkpython
  • Michael on Mastodon: https://fosstodon.org/web/@mkennedy

April 21, 2025 08:00 AM UTC


Python Bytes

#429 Nitpicking Python

Topics covered in this episode:

  • Huly
  • CVE Foundation formed to take over CVE program from MITRE
  • drawdb
  • 14 Advanced Python Features
  • Extras
  • Joke

Watch on YouTube: https://www.youtube.com/watch?v=ddnRex0fsNw

About the show

Sponsored by Posit Workbench: https://pythonbytes.fm/workbench

Connect with the hosts

  • Michael: @mkennedy@fosstodon.org / @mkennedy.codes (bsky)
  • Brian: @brianokken@fosstodon.org / @brianokken.bsky.social
  • Show: @pythonbytes@fosstodon.org / @pythonbytes.fm (bsky)

Join us on YouTube at https://pythonbytes.fm/stream/live to be part of the audience. Usually Monday at 10am PT. Older video versions available there too.

Finally, if you want an artisanal, hand-crafted digest of every week of the show notes in email form, add your name and email to our friends of the show list (https://pythonbytes.fm/friends-of-the-show); we'll never share it.

Michael #1: Huly (https://github.com/hcengineering/platform?featured_on=pythonbytes)

  • All-in-one project management platform (alternative to Linear, Jira, Slack, Notion, Motion).
  • If you're primarily interested in self-hosting Huly without the intention to modify or contribute to its development, please use huly-selfhost (https://github.com/hcengineering/huly-selfhost?featured_on=pythonbytes).
  • Manage your tasks efficiently with Huly's bidirectional GitHub synchronization. Use Huly as an advanced front-end for GitHub Issues and GitHub Projects.
  • Connect every element of your workflow to build a dynamic knowledge base.
  • Everything you need for productive team work: Team Planner, Project Management, Virtual Office, Chat, Documents, Inbox.
  • Self-hosting as a service: https://elest.io/?featured_on=pythonbytes

Brian #2: CVE Foundation formed to take over CVE program from MITRE (https://www.thecvefoundation.org/?featured_on=pythonbytes)

  • Back story: CVE, global source of cybersecurity info, was hours from being cut by DHS (https://arstechnica.com/security/2025/04/crucial-cve-flaw-tracking-database-narrowly-avoids-closure-to-dhs-cuts/?featured_on=pythonbytes).
      • The 25-year-old CVE program, an essential part of global cybersecurity, is cited in nearly any discussion or response to a computer security issue.
      • CVE was at real risk of closure after its contract was set to expire on April 16.
      • The nonprofit MITRE runs CVE on a contract with the DHS.
      • A letter sent last Tuesday by Yosry Barsoum, vice president of MITRE, gave notice of the potential halt to operations.
      • Another possible victim of the current administration.
  • CVE Foundation launched to secure the future of the CVE program.
      • CVE Board members have spent the past year developing a strategy to transition CVE to a dedicated, non-profit foundation. The new CVE Foundation will focus solely on continuing the mission of delivering high-quality vulnerability identification and maintaining the integrity and availability of CVE data for defenders worldwide.
      • Over the coming days, the Foundation will release more information about its structure, transition planning, and opportunities for involvement from the broader community.

Michael #3: drawdb (https://www.drawdb.app/?featured_on=pythonbytes)

  • Free and open source, simple, and intuitive database design editor, data modeler, and SQL generator.
  • Great drag-and-drop relationship manager.
  • Define your DB visually, export as SQL create scripts.
  • Or import existing SQL to kickstart the diagramming.

Brian #4: 14 Advanced Python Features (https://blog.edward-li.com/tech/advanced-python-features/?featured_on=pythonbytes)

  • By Edward Li. Picking some favorites:
      • 1. Typing Overloads
      • 2. Keyword-only and Positional-only Arguments
      • 9. Python Nitpicks: for-else statements, walrus operator, short-circuit evaluation, operator chaining

Extras

Michael:

  • Thunderbird send / other Firefox things: https://blog.thunderbird.net/2025/04/thundermail-and-thunderbird-pro-services/?featured_on=pythonbytes

Joke: Python Tariffs (https://news.ycombinator.com/item?id=43681752&featured_on=pythonbytes)

  • Thanks wagenrace and Campfire Tales.

April 21, 2025 08:00 AM UTC

April 20, 2025


ListenData

How to Use Gemini API in Python

Integrating Gemini API with Python

In this tutorial, you will learn how to use Google's Gemini AI model through its API in Python.

Update (April 21, 2025): The tutorial has been updated for the latest Gemini models, Gemini 2.5 Flash and Gemini 2.5 Pro, which support real-time search and multimodal generation.

Steps to Access Gemini API

Follow the steps below to access the Gemini API and then use it in Python.

  1. Visit the Google AI Studio website.
  2. Sign in using your Google account.
  3. Create an API key.
  4. Install the Google AI Python library for the Gemini API using the command below (see the sketch after this list for a first request):

    pip install google-genai
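Once you have an API key, a minimal request to a model looks roughly like this (a sketch using the google-genai client; the placeholder API key, the prompt, and the gemini-2.5-flash model name are assumptions based on the update note above, so check the official docs for current values):

    from google import genai

    # create a client with your API key from Google AI Studio
    client = genai.Client(api_key="YOUR_API_KEY")

    # ask a Gemini model for a short completion and print the text
    response = client.models.generate_content(
        model="gemini-2.5-flash",
        contents="Explain how the Gemini API works in one sentence.",
    )
    print(response.text)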
To read this article in full, please visit the ListenData website.

April 20, 2025 10:15 PM UTC