I am new to Reactive Extensions. I have a requirement to consume PairCollection<IPointCloud> objects, which are fed into a service that performs a very expensive multi-threaded process with limited concurrency (which I base on the number of machine cores). Producing the PairCollection<IPointCloud> objects is itself expensive, as there are many of them, but it can be done in parallel with no thread restriction, so I have decided to use Reactive Extensions for this.
I have set up a LINQPad example that simulates what I want; however, there are a few aspects of what I have done that do not feel right. I have started a Task to ensure the observable subscriptions do not block, but I would like to do this using another Observable subscription, one that is asynchronous. I would like some advice on the following:
- Is what I have done so far correct?
- How can I replace the Task.Run in the Main method with some form of asynchronous IObservable that does not block? (See the sketch after this list for roughly what I have in mind.)
- Is the method I use for cancellation support correct?
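To make the last two points concrete, this is roughly the direction I had in mind. It is a sketch only, not my current code; I have not verified that SubscribeOn with TaskPoolScheduler, or the async overload of Observable.Create, are the right tools here, which is partly what I am asking.

    // Sketch only - not my current code. Namespaces assumed: System.Reactive.Linq,
    // System.Reactive.Concurrency, System.Threading, System.Threading.Tasks.
    void Main()
    {
        var cts = new CancellationTokenSource();
        int index = 0;

        // Instead of wrapping the subscription in Task.Run, push the producer's work
        // onto a scheduler so that Main is not blocked.
        IDisposable subscription = PairCollectionProducer(cts.Token)
            .SubscribeOn(TaskPoolScheduler.Default)
            .Subscribe(
                cp => Console.WriteLine($"PairCollection count = {cp.Count:N0}, Index = {index++:N0}"),
                ex => Console.WriteLine("ERROR"),
                () => Console.WriteLine("COMPLETED"));

        Console.WriteLine("LAST LINE");
        Console.ReadLine();

        cts.Cancel();            // Is this the right way to cancel the pipeline?
        subscription.Dispose();
    }

    // Alternatively, Observable.Create has an overload taking an async delegate and a
    // CancellationToken that is signalled when the subscription is disposed - would
    // that be preferable to passing my own token around?
    public IObservable<RawDepthMap> RawDepthMapsProducer(string filePath)
    {
        return Observable.Create<RawDepthMap>(async (observer, token) =>
        {
            for (int i = 0; i < 10 && !token.IsCancellationRequested; ++i)
            {
                await Task.Delay(50, token);
                observer.OnNext(new RawDepthMap() { Height = i, Width = 1 });
            }
            observer.OnCompleted();
        });
    }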
The pseudo data files that I read from "C:\Data" are available here: https://1drv.ms/u/s!AuCd_PcRnNWpn3qhr0MskrbhEoC2?e=M6d1zo.
void Main()
{
    Task.Run(() =>
    {
        int index = 0;
        var pairCollectionProducer = PairCollectionProducer(CancellationToken.None);
        pairCollectionProducer.Subscribe(cp =>
        {
            // Now run expensive process using pair collection.
            // _pointCloudMergingService.Merge(cp);
            Console.WriteLine($"PairCollection count = {cp.Count:N0}, Index = {index++:N0}");
        },
        ex =>
        {
            Console.WriteLine("ERROR");
        },
        () =>
        {
            Console.WriteLine("COMPLETED");
        });
    });
    Console.WriteLine("LAST LINE");
    Console.ReadLine();
}
public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(CancellationToken token)
{
    return Observable.Create<PairCollection<IPointCloud>>(
        observer =>
        {
            Parallel.ForEach(
                Utils.GetFileBatches(@"C:\Data"),
                (fileBatch) =>
                {
                    Console.WriteLine($"{fileBatch[0]} file loop");
                    if (fileBatch[0] == @"C:\Data\bb.0000.exr")
                        Thread.Sleep(200); // Artificially delay this particular batch.
                    var producer = RawDepthMapsProducer(fileBatch[0], token);
                    int index = 0;
                    ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();
                    IDisposable subscriber = producer.Subscribe(rawDepthMap =>
                    {
                        Thread.Sleep(50); // Simulate converting a depth map into a point cloud.
                        bag.Add(new PointCloud() { Index = index++ });
                    },
                    ex => { },
                    () =>
                    {
                        PointCloudPartitioningService ps = new PointCloudPartitioningService();
                        observer.OnNext(ps.Partition(bag.ToList()));
                    });
                });
            observer.OnCompleted();
            return () => { };
        });
}

public IObservable<RawDepthMap> RawDepthMapsProducer(string filePath, CancellationToken token)
{
    return Observable.Create<RawDepthMap>(
        observer =>
        {
            for (int i = 0; i < 10; ++i)
            {
                Thread.Sleep(50); // Simulate reading a depth map from the file.
                Console.WriteLine($"filePath = {filePath}, i = {i:N0}");
                observer.OnNext(new RawDepthMap() { Height = i, Width = 1 });
            }
            observer.OnCompleted();
            return () => { };
        });
}
public interface IPointCloud
{
    Vertex[] Vertices { get; set; }
    bool ContainsNormals { get; }
    int? Index { get; set; }
}

public class PointCloud : IPointCloud
{
    public PointCloud() { }

    public PointCloud(Vertex[] vertices)
    {
        Vertices = vertices;
    }

    public Vertex[] Vertices { get; set; }

    public bool ContainsNormals => Vertices.Any(v =>
        v.Normal.X != 0.0f ||
        v.Normal.Y != 0.0f ||
        v.Normal.Z != 0.0f);

    public int? Index { get; set; }
}

public struct Vertex
{
    public Vertex(Vector3 point, Vector3 normal)
    {
        Point = point;
        Normal = normal;
    }

    public Vertex(Vector3 point)
        : this(point, new Vector3()) { }

    public Vector3 Point;
    public Vector3 Normal;
}

public class PairCollection<T> where T : class
{
    public PairCollection()
    {
        Partitions = new List<IndexedPair<T>>();
        Last = null;
    }

    public List<T> Flatten()
    {
        List<T> l = new List<T>();
        foreach (var it in Partitions)
        {
            l.Add(it.Pair.Item1);
            l.Add(it.Pair.Item2);
        }
        if (Last != null)
            l.Add(Last);
        return l;
    }

    public IList<IndexedPair<T>> Partitions { get; set; }

    public T Last { get; set; }

    public int Count { get { return Partitions.Count * 2 + (Last != null ? 1 : 0); } }
}

public class IndexedPair<T>
{
    public IndexedPair(int index, T item1, T item2)
        : this(index, new Tuple<T, T>(item1, item2)) { }

    public IndexedPair(int index, Tuple<T, T> tuple)
    {
        Index = index;
        Pair = tuple;
    }

    public int Index { get; set; }

    public Tuple<T, T> Pair { get; set; }
}
public class RawDepthMap
{
    public string Source { get; set; }
    public List<double> DepthMapArray { get; set; }
    public int Height { get; set; }
    public int Width { get; set; }

    public override string ToString()
    {
        string source = !String.IsNullOrEmpty(Source) ? Source : "N/A";
        StringBuilder builder = new StringBuilder($"RawDepthMap: Source \"{source}\"");
        if (DepthMapArray != null)
            builder.Append($", Array size {DepthMapArray.Count:N0}");
        builder.Append($", Width {Width:N0}, Height {Height:N0}");
        return builder.ToString();
    }
}
public interface IPartitioningService<T> where T : class
{
    PairCollection<T> Partition(IList<T> itemList);
}

public class PointCloudPartitioningService : PartitioningServiceBase<IPointCloud>
{
    public override PairCollection<IPointCloud> Partition(IList<IPointCloud> pointCloudCollection)
    {
        if (pointCloudCollection == null || pointCloudCollection.Count < 2)
            throw new ArgumentException("Point cloud collection cannot be null or contain fewer than 2 items");
        if (pointCloudCollection.Any(pc => pc.Index == null))
            throw new ArgumentException("All point clouds must be indexed");
        List<IPointCloud> orderedPointClouds = pointCloudCollection
            .OrderBy(f => (int)f.Index)
            .ToList();
        return PartitioningCore(orderedPointClouds);
    }
}
public abstract class PartitioningServiceBase<T> : IPartitioningService<T> where T : class
{
    public abstract PairCollection<T> Partition(IList<T> itemList);

    protected PairCollection<T> PartitioningCore(IList<T> itemList)
    {
        int index = 0;
        PairCollection<T> partitionedCollection = new PairCollection<T>();
        for (int i = 0; i < itemList.Count; i += 2)
        {
            if (i + 1 >= itemList.Count)
                break;
            IndexedPair<T> it = new IndexedPair<T>(
                index++, itemList[i], itemList[i + 1]);
            partitionedCollection.Partitions.Add(it);
        }
        if (itemList.Count % 2 != 0)
            partitionedCollection.Last = itemList.Last();
        return partitionedCollection;
    }
}

public static class Utils
{
    public static List<List<string>> GetFileBatches(string directory)
    {
        if (!Directory.Exists(directory))
            throw new IOException($"Directory \"{directory}\" does not exist");
        var fileInfoCollection = Directory.GetFiles(directory).Select(f => new FileInfo(f));
        if (fileInfoCollection.Any(fi => !DepthMapSourceMappings.SupportedDepthMapSourceTypes.ContainsKey(Path.GetExtension(fi.Name))))
        {
            string tmp = String.Join(", ", DepthMapSourceMappings.SupportedDepthMapSourceTypes.Select(kvp => kvp.Key));
            throw new IOException($"Only source files of type ({tmp.Trim()}) are currently supported");
        }
        Regex regex = new Regex(@"^\w+");
        return fileInfoCollection
            .GroupBy(fi => regex.Match(fi.Name).ToString())
            .Select(group => group.Select(fi => fi.FullName).ToList())
            .ToList();
    }
}

public enum DepthMapSourceType
{
    Exr
};

public static class DepthMapSourceMappings
{
    public static Dictionary<string, DepthMapSourceType> SupportedDepthMapSourceTypes { get; } =
        new Dictionary<string, DepthMapSourceType>()
        {
            { ".exr", DepthMapSourceType.Exr }
        };
}
Producing the IPointCloud objects is relatively inexpensive, and I want that to be done with maximum parallelisation; likewise with the RawDepthMap production. However, the processing in the Main method will be very expensive, and its threading is throttled using a limited-concurrency TaskScheduler. I think this is a good candidate for the use of Rx. I have decided I need to await the Parallel.ForEach work so that it does not fire OnCompleted right away, so I am going to swap it out for an array of Tasks and await all of them, roughly as sketched below.
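A minimal sketch of that change to PairCollectionProducer follows. I have not verified it; like my current code, it relies on RawDepthMapsProducer subscribing synchronously, so each Task only completes after its batch has produced its pair collection.

    // Sketch only: replace Parallel.ForEach with one Task per file batch and await
    // them all, so OnCompleted is not fired until every batch has been processed.
    public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(CancellationToken token)
    {
        return Observable.Create<PairCollection<IPointCloud>>(async observer =>
        {
            Task[] tasks = Utils.GetFileBatches(@"C:\Data")
                .Select(fileBatch => Task.Run(() =>
                {
                    var producer = RawDepthMapsProducer(fileBatch[0], token);
                    int index = 0;
                    var bag = new ConcurrentBag<IPointCloud>();
                    producer.Subscribe(
                        rawDepthMap => bag.Add(new PointCloud() { Index = index++ }),
                        ex => { },
                        () =>
                        {
                            var ps = new PointCloudPartitioningService();
                            observer.OnNext(ps.Partition(bag.ToList()));
                        });
                }, token))
                .ToArray();

            await Task.WhenAll(tasks);  // Only complete once every file batch is done.
            observer.OnCompleted();
        });
    }

The intent is to keep the unrestricted parallelism for loading and partitioning, while the throttling of the expensive merge stays on the consumer side.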