2008-12-16 9 views
17

Ich entwickle ein Programm, das kontinuierlich einen Datenstrom im Hintergrund sendet, und ich möchte dem Benutzer erlauben, eine Obergrenze für das Upload- und Download-Limit festzulegen.Bandbreitenbeschränkung in C#

Ich habe auf den token bucket und leaky bucket algorhithms nachgelesen, und scheinbar der letztere scheint die Beschreibung zu passen, da dies nicht eine Frage der Maximierung der Netzwerkbandbreite ist, sondern eher so unauffällig wie möglich.

Ich bin jedoch ein wenig unsicher, wie ich das umsetzen würde. Ein natürlicher Ansatz besteht darin, die abstrakte Stream-Klasse zu erweitern, um das Erweitern des vorhandenen Datenverkehrs zu vereinfachen, aber würde dies nicht die Einbeziehung zusätzlicher Threads erfordern, um die Daten zu senden, während sie gleichzeitig empfangen (leaky bucket)? Irgendwelche Hinweise auf andere Implementierungen, die dasselbe tun, würden geschätzt werden.

Auch, obwohl ich ändern kann, wie viele Daten das Programm empfängt, wie gut funktioniert die Bandbreitenbeschränkung auf der Ebene C#? Wird der Computer die Daten noch erhalten und einfach speichern, so dass der Drosselungseffekt effektiv aufgehoben wird, oder wartet er, bis ich weitere Informationen erhalte?

EDIT: Ich bin daran interessiert, eingehende und ausgehende Daten zu drosseln, wo ich keine Kontrolle über das andere Ende des Streams habe.

Antwort

1

Ich habe eine andere Implementierung der von Arul erwähnten ThrottledStream-Klasse entwickelt. Meine Version verwendet einen Waithandle und einen Timer mit einem 1s-Intervall:

public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
{ 
    MaxBytesPerSecond = maxBytesPerSecond; 
    parent = parentStream; 
    processed = 0; 
    resettimer = new System.Timers.Timer(); 
    resettimer.Interval = 1000; 
    resettimer.Elapsed += resettimer_Elapsed; 
    resettimer.Start();   
} 

protected void Throttle(int bytes) 
{ 
    try 
    { 
     processed += bytes; 
     if (processed >= maxBytesPerSecond) 
      wh.WaitOne(); 
    } 
    catch 
    { 
    } 
} 

private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    processed = 0; 
    wh.Set(); 
} 

Jedes Mal, wenn die Bandbreite-Limit den Thread überschreitet wird tief, bis in der nächsten Sekunde beginnt. Keine Notwendigkeit, die optimale Schlafdauer zu berechnen.

Vollständige Umsetzung:

public class ThrottledStream : Stream 
{ 
    #region Properties 

    private int maxBytesPerSecond; 
    /// <summary> 
    /// Number of Bytes that are allowed per second 
    /// </summary> 
    public int MaxBytesPerSecond 
    { 
     get { return maxBytesPerSecond; } 
     set 
     { 
      if (value < 1) 
       throw new ArgumentException("MaxBytesPerSecond has to be >0"); 

      maxBytesPerSecond = value; 
     } 
    } 

    #endregion 


    #region Private Members 

    private int processed; 
    System.Timers.Timer resettimer; 
    AutoResetEvent wh = new AutoResetEvent(true); 
    private Stream parent; 

    #endregion 

    /// <summary> 
    /// Creates a new Stream with Databandwith cap 
    /// </summary> 
    /// <param name="parentStream"></param> 
    /// <param name="maxBytesPerSecond"></param> 
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
    { 
     MaxBytesPerSecond = maxBytesPerSecond; 
     parent = parentStream; 
     processed = 0; 
     resettimer = new System.Timers.Timer(); 
     resettimer.Interval = 1000; 
     resettimer.Elapsed += resettimer_Elapsed; 
     resettimer.Start();   
    } 

    protected void Throttle(int bytes) 
    { 
     try 
     { 
      processed += bytes; 
      if (processed >= maxBytesPerSecond) 
       wh.WaitOne(); 
     } 
     catch 
     { 
     } 
    } 

    private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
    { 
     processed = 0; 
     wh.Set(); 
    } 

    #region Stream-Overrides 

    public override void Close() 
    { 
     resettimer.Stop(); 
     resettimer.Close(); 
     base.Close(); 
    } 
    protected override void Dispose(bool disposing) 
    { 
     resettimer.Dispose(); 
     base.Dispose(disposing); 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     return parent.Read(buffer, offset, count); 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 

    #endregion 


} 
+0

Es wird genauer, wenn Sie "processed" nicht auf "0" setzen, wenn der Timer tickt, aber subtrahiere "maxBytesPerSecond" davon. –

1

Basierend auf @ 0xDEADBEEF Lösung habe ich die folgende (testbar) Lösung auf Basis von Rx Disponenten:

public class ThrottledStream : Stream 
{ 
    private readonly Stream parent; 
    private readonly int maxBytesPerSecond; 
    private readonly IScheduler scheduler; 
    private readonly IStopwatch stopwatch; 

    private long processed; 

    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler) 
    { 
     this.maxBytesPerSecond = maxBytesPerSecond; 
     this.parent = parent; 
     this.scheduler = scheduler; 
     stopwatch = scheduler.StartStopwatch(); 
     processed = 0; 
    } 

    public ThrottledStream(Stream parent, int maxBytesPerSecond) 
     : this (parent, maxBytesPerSecond, Scheduler.Immediate) 
    { 
    } 

    protected void Throttle(int bytes) 
    { 
     processed += bytes; 
     var targetTime = TimeSpan.FromSeconds((double)processed/maxBytesPerSecond); 
     var actualTime = stopwatch.Elapsed; 
     var sleep = targetTime - actualTime; 
     if (sleep > TimeSpan.Zero) 
     { 
      using (var waitHandle = new AutoResetEvent(initialState: false)) 
      { 
       scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set()); 
       waitHandle.WaitOne(); 
      } 
     } 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     var read = parent.Read(buffer, offset, count); 
     Throttle(read); 
     return read; 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 
} 

und einige Tests, die nur einige Millisekunden dauern:

[TestMethod] 
public void ShouldThrottleReading() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new ThrottledStream(new MemoryStream(content), content.Length/8, scheduler); 
    var target = new MemoryStream(); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
} 

[TestMethod] 
public void ShouldThrottleWriting() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new MemoryStream(content); 
    var target = new ThrottledStream(new MemoryStream(), content.Length/8, scheduler); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
}