Skip to content

graph, store: Make sure vid batching works with large vids #5970

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions graph/src/util/ogive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{internal_error, prelude::StoreError};
/// more fun to say.
pub struct Ogive {
/// The breakpoints of the piecewise linear function
points: Vec<f64>,
points: Vec<i64>,
/// The size of each bin; the linear piece from `points[i]` to
/// `points[i+1]` rises by this much
bin_size: f64,
Expand All @@ -46,7 +46,6 @@ impl Ogive {
let bins = points.len() - 1;
let bin_size = total as f64 / bins as f64;
let range = points[0]..=points[bins];
let points = points.into_iter().map(|p| p as f64).collect();
Ok(Self {
points,
bin_size,
Expand Down Expand Up @@ -90,7 +89,6 @@ impl Ogive {
fn interval_start(&self, point: i64) -> Result<usize, StoreError> {
self.check_in_range(point)?;

let point = point as f64;
let idx = self
.points
.iter()
Expand All @@ -102,35 +100,61 @@ impl Ogive {

/// Return the value of the ogive at `point`, i.e., `f(point)`. It is an
/// error if `point` is outside the range of points of this ogive.
///
/// If `i` is such that
/// `points[i] <= point < points[i+1]`, then
/// ```text
/// f(point) = i * bin_size + (point - points[i]) / (points[i+1] - points[i]) * bin_size
/// ```
// See the comment on `inverse` for numerical considerations
fn value(&self, point: i64) -> Result<i64, StoreError> {
if self.points.len() == 1 {
return Ok(*self.range.end());
}

let idx = self.interval_start(point)?;
let bin_size = self.bin_size as f64;
let (a, b) = (self.points[idx], self.points[idx + 1]);
let point = point as f64;
let value = (idx as f64 + (point - a) / (b - a)) * bin_size;
let offset = (point - a) as f64 / (b - a) as f64;
let value = (idx as f64 + offset) * self.bin_size;
Ok(value as i64)
}

/// Return the value of the inverse ogive at `value`, i.e., `g(value)`.
/// It is an error if `value` is negative. If `value` is greater than
/// the total count of the ogive, the maximum point of the ogive is
/// returned.
///
/// For `points[j] <= v < points[j+1]`, the value of `g(v)` is
/// ```text
/// g(v) = (1-lambda)*points[j] + lambda * points[j+1]
/// ```
/// where `lambda = (v - j * bin_size) / bin_size`
///
// Note that in the definition of `lambda`, the numerator is
// `v.rem_euclid(bin_size)`
//
// Numerical consideration: in these calculations, we need to be careful
// to never convert one of the points directly to f64 since they can be
// so large that the conversion from i64 to f64 loses precision. That
// loss of precision can cause the convex combination of `points[j]` and
// `points[j+1]` above to lie outside of that interval when `(points[j]
// as f64) as i64 < points[j]`
//
// We therefore try to only convert differences between points to f64
// which are much smaller.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the assumption is that differences of the subsequent points is never more than 2'000'000 blocks, as we have only 21 bits of the mantissa available (53 of f64 - 32 of the shift)? If that's not absolutely guaranteed I would suggest to convert all the calculations to work with i128. The bins_size could be represented as a ratio of two integers. The only drawback are somewhat slow divisions...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a really good point. I'll leave this as-is for now, but if I ever need to touch it, I'll adopt your suggestion.

fn inverse(&self, value: i64) -> Result<i64, StoreError> {
let value = value as f64;
if value < 0.0 {
if value < 0 {
return Err(internal_error!("value {} can not be negative", value));
}
let idx = (value / self.bin_size) as usize;
if idx >= self.points.len() - 1 {
let j = (value / self.bin_size as i64) as usize;
if j >= self.points.len() - 1 {
return Ok(*self.range.end());
}
let (a, b) = (self.points[idx] as f64, self.points[idx + 1] as f64);
let lambda = (value - idx as f64 * self.bin_size) / self.bin_size;
let x = (1.0 - lambda) * a + lambda * b;
let (a, b) = (self.points[j], self.points[j + 1]);
// This is the same calculation as in the comment above, but
// rewritten to be more friendly to lossy calculations with f64
let offset = (value as f64).rem_euclid(self.bin_size) * (b - a) as f64;
let x = a + (offset / self.bin_size) as i64;
Ok(x as i64)
}

Expand Down
101 changes: 100 additions & 1 deletion store/postgres/src/vid_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl VidBatcher {
}
}

#[derive(Copy, Clone, QueryableByName)]
#[derive(Debug, Copy, Clone, QueryableByName)]
pub(crate) struct VidRange {
#[diesel(sql_type = BigInt, column_name = "min_vid")]
pub min: i64,
Expand Down Expand Up @@ -470,4 +470,103 @@ mod tests {
assert_eq!(1, ogive.start());
assert_eq!(100_000, ogive.end());
}

#[test]
fn vid_batcher_handles_large_vid() {
// An example with very large `vid` values which come from the new
// schema of setting the `vid` to `block_num << 32 + sequence_num`.
// These values are taken from an actual example subgraph and cuased
// errors because of numerical roundoff issues
const MIN: i64 = 186155521970012263;
const MAX: i64 = 187989601854423140;
const BOUNDS: &[i64] = &[
186155521970012263,
186155552034783334,
186166744719556711,
187571594162339943,
187571628522078310,
187576619274076263,
187576649338847334,
187580570643988583,
187590242910339175,
187590268680142950,
187963647367053415,
187970828552372324,
187986749996138596,
187989601854423140,
];

// The start, end, and batch size we expect when we run through the
// `vid_batcher` we set up below with `MIN`, `MAX` and `BOUNDS`
const STEPS: &[(i64, i64, i64)] = &[
(186155521970012263, 186155521970012265, 2),
(186155521970012266, 186155521970012269, 3),
(186155521970012270, 186155521970012276, 6),
(186155521970012277, 186155521970012289, 12),
(186155521970012290, 186155521970012312, 22),
(186155521970012313, 186155521970012353, 40),
(186155521970012354, 186155521970012426, 72),
(186155521970012427, 186155521970012557, 130),
(186155521970012558, 186155521970012792, 234),
(186155521970012793, 186155521970013215, 422),
(186155521970013216, 186155521970013976, 760),
(186155521970013977, 186155521970015346, 1369),
(186155521970015347, 186155521970017812, 2465),
(186155521970017813, 186155521970022250, 4437),
(186155521970022251, 186155521970030238, 7987),
(186155521970030239, 186155521970044616, 14377),
(186155521970044617, 186155521970070495, 25878),
(186155521970070496, 186155521970117077, 46581),
(186155521970117078, 186155521970200925, 83847),
(186155521970200926, 186155521970351851, 150925),
(186155521970351852, 186155521970623517, 271665),
(186155521970623518, 186155521971112515, 488997),
(186155521971112516, 186155521971992710, 880194),
(186155521971992711, 186155521973577061, 1584350),
(186155521973577062, 186155521976428893, 2851831),
(186155521976428894, 186155521981562190, 5133296),
(186155521981562191, 186155521990802124, 9239933),
(186155521990802125, 186155522007434004, 16631879),
(186155522007434005, 186155522037371388, 29937383),
(186155522037371389, 186155522091258678, 53887289),
(186155522091258679, 186155522188255800, 96997121),
(186155522188255801, 186155522362850619, 174594818),
(186155522362850620, 186155522677121292, 314270672),
(186155522677121293, 186155523242808503, 565687210),
(186155523242808504, 186155524261045483, 1018236979),
(186155524261045484, 186155526093872046, 1832826562),
(186155526093872047, 186155529392959859, 3299087812),
(186155529392959860, 186155535331317922, 5938358062),
(186155535331317923, 186155546020362436, 10689044513),
(186155546020362437, 186160475833232786, 4929812870349),
(186160475833232787, 186998193536485260, 837717703252473),
(186998193536485261, 187574948946679478, 576755410194217),
(187574948946679479, 187590253155585376, 15304208905897),
(187590253155585377, 187989601854423140, 399348698837763),
];

let vid_range = VidRange::new(MIN, MAX);
let batch_size = AdaptiveBatchSize {
size: 10000,
target: Duration::from_secs(180),
};

let mut vid_batcher = VidBatcher::new(BOUNDS.to_vec(), vid_range, batch_size).unwrap();
vid_batcher.step_timer.set(Duration::from_secs(100));

// Run through the entire `vid_batcher`, collecting start and end in
// `steps`
let steps = std::iter::from_fn(|| {
vid_batcher
.step(|start, end| Ok((start, end, end - start)))
.unwrap()
.1
})
.fold(Vec::new(), |mut steps, (start, end, step)| {
steps.push((start, end, step));
steps
});

assert_eq!(STEPS, &steps);
}
}
Loading