mediatimestamp
This library provides a class Timestamp which stores a signed time difference value with nanosecond precision.
The Timestamp can represent a time offset since the epoch (ie. 1970-01-01T00:00:00.000000000Z)
These data types are of use in a number of situations, but particularly for code that will handle PTP timestamps, which are normally stored in this fashion.
1# Copyright 2017 British Broadcasting Corporation 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15"""This library provides a class Timestamp which stores a signed time difference value with nanosecond precision. 16 17The Timestamp can represent a time offset since the epoch (ie. 1970-01-01T00:00:00.000000000Z) 18 19These data types are of use in a number of situations, but particularly for code that will handle PTP timestamps, which 20are normally stored in this fashion. 21""" 22 23from .exceptions import TsValueError 24from .immutable import ( 25 TimeOffset, SupportsMediaTimeOffset, mediatimeoffset, 26 Timestamp, SupportsMediaTimestamp, mediatimestamp, 27 TimeRange, SupportsMediaTimeRange, mediatimerange) 28 29from .count_range import CountRange 30from .time_value import TimeValue, TimeValueConstructTypes 31from .time_value_range import TimeValueRange, RangeConstructionTypes, RangeTypes 32 33__all__ = [ 34 "TsValueError", 35 "TimeOffset", "SupportsMediaTimeOffset", "mediatimeoffset", 36 "Timestamp", "SupportsMediaTimestamp", "mediatimestamp", 37 "TimeRange", "SupportsMediaTimeRange", "mediatimerange", 38 "CountRange", 39 "TimeValue", "TimeValueConstructTypes", 40 "TimeValueRange", "RangeConstructionTypes", "RangeTypes"]
20class TsValueError(Exception): 21 """ Raised when the timestamp input is invalid """ 22 def __init__(self, msg: str): 23 super().__init__(msg) 24 self.msg = msg
Raised when the timestamp input is invalid
Inherited Members
- builtins.BaseException
- with_traceback
- add_note
- args
66def mediatimestamp(v: TimestampConstructionType) -> "Timestamp": 67 """This method can be called on any object which supports the __mediatimestamp__ magic method 68 and also on a Timestamp, an int or a float. 69 It will always return a Timestamp or raise a ValueError. 70 """ 71 if isinstance(v, Timestamp): 72 return v 73 elif isinstance(v, int): 74 return Timestamp(v) 75 elif isinstance(v, float): 76 return Timestamp.from_float(v) 77 elif hasattr(v, "__mediatimestamp__"): 78 return v.__mediatimestamp__() 79 else: 80 raise ValueError("{!r} cannot be converted to a mediatimestamp.Timestamp".format(v))
This method can be called on any object which supports the __mediatimestamp__ magic method and also on a Timestamp, an int or a float. It will always return a Timestamp or raise a ValueError.
83class Timestamp(object): 84 """A nanosecond precision immutable timestamp. 85 86 Note that the canonical representation of a Timestamp is seconds:nanoseconds, e.g. "4:500000000". 87 Timestamp in seconds.fractions format (e.g. "4.5") can be parsed, but should not be used for serialization or 88 storage due to difficulty disambiguating them from floats. 89 """ 90 class Rounding (int): 91 pass 92 93 ROUND_DOWN = Rounding(0) 94 ROUND_NEAREST = Rounding(1) 95 ROUND_UP = Rounding(2) 96 97 MAX_NANOSEC = MAX_NANOSEC 98 MAX_SECONDS = MAX_SECONDS 99 100 def __init__(self, sec: int = 0, ns: int = 0, sign: int = 1): 101 if sign < 0: 102 sign = -1 103 else: 104 sign = 1 105 value = sign * int(sec * self.MAX_NANOSEC + ns) 106 107 value_limit = self.MAX_SECONDS * self.MAX_NANOSEC - 1 108 value = max(-value_limit, min(value_limit, value)) 109 110 self._value: int 111 112 self.__dict__['_value'] = value 113 114 @property 115 def sec(self) -> int: 116 """Returns the whole number of seconds""" 117 return int(abs(self._value) // self.MAX_NANOSEC) 118 119 @property 120 def ns(self) -> int: 121 """Returns the nanoseconds remainder after subtrating the whole number of seconds""" 122 return abs(self._value) - self.sec * self.MAX_NANOSEC 123 124 @property 125 def sign(self) -> int: 126 """Returns 1 if the timeoffset is positive, -1 if negative""" 127 if self._value < 0: 128 return -1 129 else: 130 return 1 131 132 def __setattr__(self, name: str, value: object) -> None: 133 raise TsValueError("Cannot assign to an immutable Timestamp") 134 135 def __mediatimestamp__(self) -> "Timestamp": 136 return self 137 138 @deprecated(version="4.0.0", 139 reason="This method is deprecated. TimeOffset has been merged into Timestamp.") 140 def __mediatimeoffset__(self) -> "Timestamp": 141 """Legacy method for getting a TimeOffset""" 142 return self 143 144 def __mediatimerange__(self) -> "TimeRange": 145 from .timerange import TimeRange # noqa: F811 146 147 return TimeRange.from_single_timestamp(self) 148 149 @classmethod 150 def get_time(cls) -> "Timestamp": 151 unix_time = time.time() 152 abs_unix_time = abs(unix_time) 153 unix_sec = int(abs_unix_time) 154 unix_ns = int(abs_unix_time*cls.MAX_NANOSEC) - int(abs_unix_time)*cls.MAX_NANOSEC 155 unix_sign = 1 if unix_time >= 0 else -1 156 157 return cls.from_unix(unix_sec, unix_ns, unix_sign=unix_sign) 158 159 @classmethod 160 @deprecated(version="4.0.0", 161 reason="This method is deprecated. TimeOffset has been merged into Timestamp.") 162 def from_timeoffset(cls, toff: TimestampConstructionType) -> "Timestamp": 163 """Legacy method that converted a TimeOffset to a Timestamp""" 164 toff = mediatimestamp(toff) 165 return cls(sec=toff.sec, ns=toff.ns, sign=toff.sign) 166 167 @classmethod 168 def get_interval_fraction(cls, 169 rate_num: RationalTypes, 170 rate_den: RationalTypes = 1, 171 factor: int = 1) -> "Timestamp": 172 if rate_num <= 0 or rate_den <= 0: 173 raise TsValueError("invalid rate") 174 if factor < 1: 175 raise TsValueError("invalid interval factor") 176 177 rate = Fraction(rate_num, rate_den) 178 ns = int((cls.MAX_NANOSEC * rate.denominator) // (rate.numerator * factor)) 179 return cls(ns=ns) 180 181 @classmethod 182 def from_millisec(cls, millisec: int) -> "Timestamp": 183 ns = millisec * 1000**2 184 return cls(ns=ns) 185 186 @classmethod 187 def from_microsec(cls, microsec: int) -> "Timestamp": 188 ns = microsec * 1000 189 return cls(ns=ns) 190 191 @classmethod 192 def from_nanosec(cls, nanosec: int) -> "Timestamp": 193 return cls(ns=nanosec) 194 195 @classmethod 196 def from_sec_frac(cls, toff_str: str) -> "Timestamp": 197 sec_frac = toff_str.split(".") 198 if len(sec_frac) != 1 and len(sec_frac) != 2: 199 raise TsValueError("invalid second.fraction format") 200 sec = int(sec_frac[0]) 201 sign = 1 202 if sec_frac[0].startswith("-"): 203 sign = -1 204 sec = abs(sec) 205 ns = 0 206 if len(sec_frac) > 1: 207 ns = _parse_seconds_fraction(sec_frac[1]) 208 return cls(sec=sec, ns=ns, sign=sign) 209 210 @classmethod 211 def from_tai_sec_frac(cls, ts_str: str) -> "Timestamp": 212 return cls.from_sec_frac(ts_str) 213 214 @classmethod 215 def from_sec_nsec(cls, toff_str: str) -> "Timestamp": 216 sec_frac = toff_str.split(":") 217 if len(sec_frac) != 1 and len(sec_frac) != 2: 218 raise TsValueError("invalid second:nanosecond format") 219 sec = int(sec_frac[0]) 220 sign = 1 221 if sec_frac[0].startswith("-"): 222 sign = -1 223 sec = abs(sec) 224 ns = 0 225 if len(sec_frac) > 1: 226 ns = int(sec_frac[1]) 227 return cls(sec=sec, ns=ns, sign=sign) 228 229 @classmethod 230 def from_tai_sec_nsec(cls, ts_str: str) -> "Timestamp": 231 return cls.from_sec_nsec(ts_str) 232 233 @classmethod 234 def from_float(cls, toff_float: float) -> "Timestamp": 235 """Parse a float as a Timestamp 236 """ 237 sign = 1 238 if toff_float < 0: 239 sign = -1 240 ns = int(abs(toff_float) * cls.MAX_NANOSEC) 241 return cls(ns=ns, sign=sign) 242 243 @classmethod 244 def from_datetime(cls, dt: datetime) -> "Timestamp": 245 minTs = datetime.fromtimestamp(0, tz.gettz('UTC')) 246 utcdt = dt.astimezone(tz.gettz('UTC')) 247 seconds = abs(int((utcdt - minTs).total_seconds())) 248 nanoseconds = utcdt.microsecond * 1000 249 if utcdt < minTs: 250 sign = -1 251 if nanoseconds > 0: 252 # The microseconds was for a positive date-time. In a negative 253 # unix time it needs to be flipped. 254 nanoseconds = cls.MAX_NANOSEC - nanoseconds 255 else: 256 sign = 1 257 258 return cls.from_unix(unix_sec=seconds, unix_ns=nanoseconds, unix_sign=sign, is_leap=False) 259 260 @classmethod 261 def from_iso8601_utc(cls, iso8601utc: str) -> "Timestamp": 262 if not iso8601utc.endswith('Z'): 263 raise TsValueError("missing 'Z' at end of ISO 8601 UTC format") 264 year, month, day, hour, minute, second, ns = _parse_iso8601(iso8601utc[:-1]) 265 gmtuple = (year, month, day, hour, minute, second - (second == 60)) 266 secs_since_epoch = calendar.timegm(gmtuple) 267 if secs_since_epoch < 0: 268 sign = -1 269 secs_since_epoch = abs(secs_since_epoch) 270 if ns > 0: 271 # The ns parsed from the timestamp was for a positive ISO 8601 date-time. In a negative 272 # unix time it needs to be flipped. 273 ns = cls.MAX_NANOSEC - ns 274 secs_since_epoch -= 1 275 else: 276 sign = 1 277 278 return cls.from_unix(unix_sec=secs_since_epoch, unix_ns=ns, unix_sign=sign, is_leap=(second == 60)) 279 280 @classmethod 281 def from_smpte_timelabel(cls, timelabel: str) -> "Timestamp": 282 r = re.compile(r'(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)F(\d+) (\d+)/(\d+) UTC([-\+])(\d+):(\d+) TAI([-\+])(\d+)') 283 m = r.match(timelabel) 284 if m is None: 285 raise TsValueError("invalid SMPTE Time Label string format") 286 groups = m.groups() 287 leap_sec = int(int(groups[5]) == 60) 288 local_tm_sec = calendar.timegm(time.struct_time((int(groups[0]), int(groups[1]), int(groups[2]), 289 int(groups[3]), int(groups[4]), int(groups[5]) - leap_sec, 290 0, 0, 0))) 291 rate_num = int(groups[7]) 292 rate_den = int(groups[8]) 293 utc_sign = 1 294 if groups[9] == '-': 295 utc_sign = -1 296 tai_sign = 1 297 if groups[12] == '-': 298 tai_sign = -1 299 tai_seconds = (local_tm_sec + 300 leap_sec - 301 utc_sign*(int(groups[10])*60*60 + int(groups[11])*60) - 302 tai_sign*int(groups[13])) 303 count = Timestamp(tai_seconds, 0).to_count(rate_num, rate_den, cls.ROUND_UP) 304 count += int(groups[6]) 305 return cls.from_count(count, rate_num, rate_den) 306 307 @classmethod 308 def from_str(cls, ts_str: str,) -> "Timestamp": 309 """Parse a string as a TimeStamp 310 311 Accepts: 312 * SMPTE timelabel 313 * ISO 8601 UTC 314 * second:nanosecond 315 * second.fraction formats 316 * "now" to mean the current time. 317 """ 318 if 'F' in ts_str: 319 return cls.from_smpte_timelabel(ts_str) 320 elif 'T' in ts_str: 321 return cls.from_iso8601_utc(ts_str) 322 elif ts_str.strip() == 'now': 323 return cls.get_time() 324 elif '.' in ts_str: 325 return cls.from_sec_frac(ts_str) 326 else: 327 return cls.from_sec_nsec(ts_str) 328 329 @classmethod 330 def from_count(cls, count: int, rate_num: RationalTypes, rate_den: RationalTypes = 1) -> "Timestamp": 331 """Returns a new Timestamp derived from a count and a particular rate. 332 333 :param count: The sample count 334 :param rate_num: The numerator of the rate, in Hz 335 :param rate_den: The denominator of the rate in Hz 336 """ 337 if rate_num <= 0 or rate_den <= 0: 338 raise TsValueError("invalid rate") 339 sign = 1 340 if count < 0: 341 sign = -1 342 rate = Fraction(rate_num, rate_den) 343 ns = (cls.MAX_NANOSEC * abs(count) * rate.denominator) // rate.numerator 344 return cls(ns=ns, sign=sign) 345 346 @classmethod 347 def from_unix(cls, unix_sec: int, unix_ns: int, unix_sign: int = 1, is_leap: bool = False) -> "Timestamp": 348 leap_sec = 0 349 if unix_sign >= 0: 350 for tbl_sec, tbl_tai_sec_minus_1 in UTC_LEAP: 351 if unix_sec + is_leap >= tbl_sec: 352 leap_sec = (tbl_tai_sec_minus_1 + 1) - tbl_sec 353 break 354 else: 355 is_leap = False 356 return cls(sec=unix_sec+leap_sec, ns=unix_ns, sign=unix_sign) 357 358 def is_null(self) -> bool: 359 return self._value == 0 360 361 def get_leap_seconds(self) -> int: 362 """ Get the UTC leaps seconds. 363 Returns the number of leap seconds that the timestamp is adjusted by when 364 converting to UTC. 365 """ 366 leap_sec = 0 367 for unix_sec, tai_sec_minus_1 in UTC_LEAP: 368 if self.sec >= tai_sec_minus_1: 369 leap_sec = (tai_sec_minus_1 + 1) - unix_sec 370 break 371 372 return leap_sec 373 374 def to_millisec(self, rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> int: 375 use_rounding = rounding 376 if self.sign < 0: 377 if use_rounding == self.ROUND_UP: 378 use_rounding = self.ROUND_DOWN 379 elif use_rounding == self.ROUND_DOWN: 380 use_rounding = self.ROUND_UP 381 round_ns = 0 382 if use_rounding == self.ROUND_NEAREST: 383 round_ns = 1000**2 // 2 384 elif use_rounding == self.ROUND_UP: 385 round_ns = 1000**2 - 1 386 return int(self.sign * ((abs(self._value) + round_ns) // 1000**2)) 387 388 def to_microsec(self, rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> int: 389 use_rounding = rounding 390 if self.sign < 0: 391 if use_rounding == self.ROUND_UP: 392 use_rounding = self.ROUND_DOWN 393 elif use_rounding == self.ROUND_DOWN: 394 use_rounding = self.ROUND_UP 395 round_ns = 0 396 if use_rounding == self.ROUND_NEAREST: 397 round_ns = 1000 // 2 398 elif use_rounding == self.ROUND_UP: 399 round_ns = 1000 - 1 400 return int(self.sign * ((abs(self._value) + round_ns) // 1000)) 401 402 def to_nanosec(self) -> int: 403 return self._value 404 405 def to_sec_nsec(self) -> str: 406 """ Convert to <seconds>:<nanoseconds> 407 """ 408 strSign = "" 409 if self.sign < 0: 410 strSign = "-" 411 return u"{}{}:{}".format(strSign, self.sec, self.ns) 412 413 def to_sec_frac(self, fixed_size: bool = False) -> str: 414 """ Convert to <seconds>.<fraction> 415 """ 416 strSign = "" 417 if self.sign < 0: 418 strSign = "-" 419 return u"{}{}.{}".format( 420 strSign, 421 self.sec, 422 self._get_fractional_seconds(fixed_size=fixed_size)) 423 424 def to_tai_sec_nsec(self) -> str: 425 return self.to_sec_nsec() 426 427 def to_tai_sec_frac(self, fixed_size: bool = False) -> str: 428 return self.to_sec_frac(fixed_size=fixed_size) 429 430 def to_float(self) -> float: 431 """ Convert to a floating point number of seconds 432 """ 433 return self._value / Timestamp.MAX_NANOSEC 434 435 def to_datetime(self) -> datetime: 436 sec, nsec, sign, leap = self.to_unix() 437 microsecond = int(round(nsec/1000)) 438 if microsecond > 999999: 439 sec += 1 440 microsecond = 0 441 if sign < 0 and microsecond > 0: 442 # The microseconds is for a negative unix time. In a positive date-time 443 # it needs to be flipped. 444 microsecond = 1000000 - microsecond 445 sec += 1 446 dt = datetime.fromtimestamp(sign * sec, tz.gettz('UTC')) 447 dt = dt.replace(microsecond=microsecond) 448 449 return dt 450 451 def to_unix(self) -> Tuple[int, int, int, bool]: 452 """ Convert to unix seconds. 453 Returns a tuple of (seconds, nanoseconds, is_leap), where `is_leap` is 454 `True` when the input time corresponds exactly to a UTC leap second. 455 Note that this deliberately returns a tuple, to try and avoid confusion. 456 """ 457 if self._value < 0: 458 return (self.sec, self.ns, self.sign, False) 459 else: 460 leap_sec = 0 461 is_leap = False 462 for unix_sec, tai_sec_minus_1 in UTC_LEAP: 463 if self.sec >= tai_sec_minus_1: 464 leap_sec = (tai_sec_minus_1 + 1) - unix_sec 465 is_leap = self.sec == tai_sec_minus_1 466 break 467 468 return (self.sec - leap_sec, self.ns, self.sign, is_leap) 469 470 def to_unix_float(self) -> float: 471 """ Convert to unix seconds since the epoch as a floating point number 472 """ 473 (sec, ns, sign, _) = self.to_unix() 474 return sign * (sec + ns / Timestamp.MAX_NANOSEC) 475 476 def to_iso8601_utc(self) -> str: 477 """ Get printed representation in ISO8601 format (UTC) 478 YYYY-MM-DDThh:mm:ss.s 479 where `s` is fractional seconds at nanosecond precision (always 9-chars wide) 480 """ 481 unix_s, unix_ns, unix_sign, is_leap = self.to_unix() 482 if unix_sign < 0 and unix_ns > 0: 483 # The nanoseconds is for a negative unix time. In a positive ISO 8601 date-time 484 # it needs to be flipped. 485 unix_ns = Timestamp.MAX_NANOSEC - unix_ns 486 unix_s += 1 487 utc_bd = time.gmtime(unix_sign * unix_s) 488 frac_sec = Timestamp(ns=unix_ns)._get_fractional_seconds(fixed_size=True) 489 leap_sec = int(is_leap) 490 491 return '%04d-%02d-%02dT%02d:%02d:%02d.%sZ' % (utc_bd.tm_year, 492 utc_bd.tm_mon, 493 utc_bd.tm_mday, 494 utc_bd.tm_hour, 495 utc_bd.tm_min, 496 utc_bd.tm_sec + leap_sec, 497 frac_sec) 498 499 def to_smpte_timelabel(self, 500 rate_num: RationalTypes, 501 rate_den: RationalTypes = 1, 502 utc_offset: Optional[int] = None) -> str: 503 if rate_num <= 0 or rate_den <= 0: 504 raise TsValueError("invalid rate") 505 rate = Fraction(rate_num, rate_den) 506 count = self.to_count(rate) 507 normalised_ts = Timestamp.from_count(count, rate) 508 tai_seconds = normalised_ts.sec 509 count_on_or_after_second = Timestamp(tai_seconds, 0).to_count(rate, rounding=self.ROUND_UP) 510 count_within_second = count - count_on_or_after_second 511 512 unix_sec, unix_ns, unix_sign, is_leap = normalised_ts.to_unix() 513 leap_sec = int(is_leap) 514 515 if utc_offset is None: 516 # calculate local time offset 517 utc_offset_sec = time.timezone 518 lt = time.localtime(unix_sec) 519 if lt.tm_isdst > 0: 520 utc_offset_sec += 60*60 521 else: 522 utc_offset_sec = utc_offset 523 utc_offset_sec_abs = abs(utc_offset_sec) 524 utc_offset_hour = utc_offset_sec_abs // (60*60) 525 utc_offset_min = (utc_offset_sec_abs % (60*60)) // 60 526 utc_sign_char = '+' 527 if utc_offset_sec < 0: 528 utc_sign_char = '-' 529 530 utc_bd = time.gmtime(unix_sec + utc_offset_sec) 531 532 tai_offset = unix_sec + leap_sec - tai_seconds 533 tai_sign_char = '+' 534 if tai_offset < 0: 535 tai_sign_char = '-' 536 537 return '%04d-%02d-%02dT%02d:%02d:%02dF%02u %u/%u UTC%c%02u:%02u TAI%c%u' % ( 538 utc_bd.tm_year, utc_bd.tm_mon, utc_bd.tm_mday, 539 utc_bd.tm_hour, utc_bd.tm_min, utc_bd.tm_sec + leap_sec, 540 count_within_second, 541 rate.numerator, rate.denominator, 542 utc_sign_char, utc_offset_hour, utc_offset_min, 543 tai_sign_char, abs(tai_offset)) 544 545 def to_count(self, rate_num: RationalTypes, rate_den: RationalTypes = 1, 546 rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> int: 547 """Returns an integer such that if this Timestamp is equal to an exact number of samples at the given rate 548 then this is equal, and otherwise the value is rounded as indicated by the rounding parameter. 549 550 :param rate_num: numerator of rate 551 :param rate_den: denominator of rate 552 :param rounding: One of Timestamp.ROUND_NEAREST, Timestamp.ROUND_UP, or Timestamp.ROUND_DOWN 553 """ 554 if rate_num <= 0 or rate_den <= 0: 555 raise TsValueError("invalid rate") 556 557 rate = Fraction(rate_num, rate_den) 558 use_rounding = rounding 559 if self.sign < 0: 560 if use_rounding == self.ROUND_UP: 561 use_rounding = self.ROUND_DOWN 562 elif use_rounding == self.ROUND_DOWN: 563 use_rounding = self.ROUND_UP 564 if use_rounding == self.ROUND_NEAREST: 565 round_ns = Timestamp.get_interval_fraction(rate, factor=2).to_nanosec() 566 elif use_rounding == self.ROUND_UP: 567 round_ns = Timestamp.get_interval_fraction(rate, factor=1).to_nanosec() - 1 568 else: 569 round_ns = 0 570 571 return int(self.sign * ( 572 ((abs(self._value) + round_ns) * rate.numerator) // ( 573 self.MAX_NANOSEC * rate.denominator))) 574 575 def to_phase_offset(self, rate_num: RationalTypes, rate_den: RationalTypes = 1) -> "Timestamp": 576 """Return the smallest positive Timestamp such that abs(self - returnval) represents an integer number of 577 samples at the given rate""" 578 return self - self.normalise(rate_num, rate_den, rounding=Timestamp.ROUND_DOWN) 579 580 def normalise(self, 581 rate_num: RationalTypes, 582 rate_den: RationalTypes = 1, 583 rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> "Timestamp": 584 """Return the nearest Timestamp to self which represents an integer number of samples at the given rate. 585 586 :param rate_num: Rate numerator 587 :param rate_den: Rate denominator 588 :param rounding: How to round, if set to Timestamp.ROUND_DOWN (resp. Timestamp.ROUND_UP) this method will only 589 return a Timestamp less than or equal to this one (resp. greater than or equal to). 590 """ 591 return self.from_count(self.to_count(rate_num, rate_den, rounding), rate_num, rate_den) 592 593 def compare(self, other_in: TimestampConstructionType) -> int: 594 other = mediatimestamp(other_in) 595 if self._value > other._value: 596 return 1 597 elif self._value < other._value: 598 return -1 599 else: 600 return 0 601 602 def __str__(self) -> str: 603 return self.to_sec_nsec() 604 605 def __repr__(self) -> str: 606 return "{}.from_sec_nsec({!r})".format("mediatimestamp.immutable." + type(self).__name__, self.to_sec_nsec()) 607 608 def __abs__(self) -> "Timestamp": 609 return Timestamp(self.sec, self.ns, 1) 610 611 def __hash__(self) -> int: 612 return self.to_nanosec() 613 614 def __eq__(self, other: object) -> bool: 615 return isinstance(other, (int, float, Timestamp)) and self.compare(other) == 0 616 617 def __ne__(self, other: object) -> bool: 618 return not (self == other) 619 620 def __lt__(self, other: TimestampConstructionType) -> bool: 621 return self.compare(other) < 0 622 623 def __le__(self, other: TimestampConstructionType) -> bool: 624 return self.compare(other) <= 0 625 626 def __gt__(self, other: TimestampConstructionType) -> bool: 627 return self.compare(other) > 0 628 629 def __ge__(self, other: TimestampConstructionType) -> bool: 630 return self.compare(other) >= 0 631 632 def __add__(self, other_in: TimestampConstructionType) -> "Timestamp": 633 other = mediatimestamp(other_in) 634 ns = self._value + other._value 635 return Timestamp(ns=ns) 636 637 def __sub__(self, other_in: TimestampConstructionType) -> "Timestamp": 638 other = mediatimestamp(other_in) 639 ns = self._value - other._value 640 return Timestamp(ns=ns) 641 642 def __iadd__(self, other_in: TimestampConstructionType) -> "Timestamp": 643 other = mediatimestamp(other_in) 644 tmp = self + other 645 return self.__class__(ns=tmp._value) 646 647 def __isub__(self, other_in: TimestampConstructionType) -> "Timestamp": 648 other = mediatimestamp(other_in) 649 tmp = self - other 650 return self.__class__(ns=tmp._value) 651 652 def __mul__(self, anint: int) -> "Timestamp": 653 ns = self._value * anint 654 return Timestamp(ns=ns) 655 656 def __rmul__(self, anint: int) -> "Timestamp": 657 return (self * anint) 658 659 def __div__(self, anint: int) -> "Timestamp": 660 return (self // anint) 661 662 def __truediv__(self, anint: int) -> "Timestamp": 663 return (self // anint) 664 665 def __floordiv__(self, anint: int) -> "Timestamp": 666 ns = self._value // anint 667 return Timestamp(ns=ns) 668 669 def _get_fractional_seconds(self, fixed_size: bool = False) -> str: 670 div = self.MAX_NANOSEC // 10 671 rem = self.ns 672 sec_frac = "" 673 674 for i in range(0, 9): 675 if not fixed_size and i > 0 and rem == 0: 676 break 677 678 sec_frac += '%i' % (rem / div) 679 rem %= div 680 div //= 10 681 682 return sec_frac
A nanosecond precision immutable timestamp.
Note that the canonical representation of a Timestamp is seconds:nanoseconds, e.g. "4:500000000". Timestamp in seconds.fractions format (e.g. "4.5") can be parsed, but should not be used for serialization or storage due to difficulty disambiguating them from floats.
100 def __init__(self, sec: int = 0, ns: int = 0, sign: int = 1): 101 if sign < 0: 102 sign = -1 103 else: 104 sign = 1 105 value = sign * int(sec * self.MAX_NANOSEC + ns) 106 107 value_limit = self.MAX_SECONDS * self.MAX_NANOSEC - 1 108 value = max(-value_limit, min(value_limit, value)) 109 110 self._value: int 111 112 self.__dict__['_value'] = value
114 @property 115 def sec(self) -> int: 116 """Returns the whole number of seconds""" 117 return int(abs(self._value) // self.MAX_NANOSEC)
Returns the whole number of seconds
119 @property 120 def ns(self) -> int: 121 """Returns the nanoseconds remainder after subtrating the whole number of seconds""" 122 return abs(self._value) - self.sec * self.MAX_NANOSEC
Returns the nanoseconds remainder after subtrating the whole number of seconds
124 @property 125 def sign(self) -> int: 126 """Returns 1 if the timeoffset is positive, -1 if negative""" 127 if self._value < 0: 128 return -1 129 else: 130 return 1
Returns 1 if the timeoffset is positive, -1 if negative
149 @classmethod 150 def get_time(cls) -> "Timestamp": 151 unix_time = time.time() 152 abs_unix_time = abs(unix_time) 153 unix_sec = int(abs_unix_time) 154 unix_ns = int(abs_unix_time*cls.MAX_NANOSEC) - int(abs_unix_time)*cls.MAX_NANOSEC 155 unix_sign = 1 if unix_time >= 0 else -1 156 157 return cls.from_unix(unix_sec, unix_ns, unix_sign=unix_sign)
159 @classmethod 160 @deprecated(version="4.0.0", 161 reason="This method is deprecated. TimeOffset has been merged into Timestamp.") 162 def from_timeoffset(cls, toff: TimestampConstructionType) -> "Timestamp": 163 """Legacy method that converted a TimeOffset to a Timestamp""" 164 toff = mediatimestamp(toff) 165 return cls(sec=toff.sec, ns=toff.ns, sign=toff.sign)
Legacy method that converted a TimeOffset to a Timestamp
167 @classmethod 168 def get_interval_fraction(cls, 169 rate_num: RationalTypes, 170 rate_den: RationalTypes = 1, 171 factor: int = 1) -> "Timestamp": 172 if rate_num <= 0 or rate_den <= 0: 173 raise TsValueError("invalid rate") 174 if factor < 1: 175 raise TsValueError("invalid interval factor") 176 177 rate = Fraction(rate_num, rate_den) 178 ns = int((cls.MAX_NANOSEC * rate.denominator) // (rate.numerator * factor)) 179 return cls(ns=ns)
195 @classmethod 196 def from_sec_frac(cls, toff_str: str) -> "Timestamp": 197 sec_frac = toff_str.split(".") 198 if len(sec_frac) != 1 and len(sec_frac) != 2: 199 raise TsValueError("invalid second.fraction format") 200 sec = int(sec_frac[0]) 201 sign = 1 202 if sec_frac[0].startswith("-"): 203 sign = -1 204 sec = abs(sec) 205 ns = 0 206 if len(sec_frac) > 1: 207 ns = _parse_seconds_fraction(sec_frac[1]) 208 return cls(sec=sec, ns=ns, sign=sign)
214 @classmethod 215 def from_sec_nsec(cls, toff_str: str) -> "Timestamp": 216 sec_frac = toff_str.split(":") 217 if len(sec_frac) != 1 and len(sec_frac) != 2: 218 raise TsValueError("invalid second:nanosecond format") 219 sec = int(sec_frac[0]) 220 sign = 1 221 if sec_frac[0].startswith("-"): 222 sign = -1 223 sec = abs(sec) 224 ns = 0 225 if len(sec_frac) > 1: 226 ns = int(sec_frac[1]) 227 return cls(sec=sec, ns=ns, sign=sign)
233 @classmethod 234 def from_float(cls, toff_float: float) -> "Timestamp": 235 """Parse a float as a Timestamp 236 """ 237 sign = 1 238 if toff_float < 0: 239 sign = -1 240 ns = int(abs(toff_float) * cls.MAX_NANOSEC) 241 return cls(ns=ns, sign=sign)
Parse a float as a Timestamp
243 @classmethod 244 def from_datetime(cls, dt: datetime) -> "Timestamp": 245 minTs = datetime.fromtimestamp(0, tz.gettz('UTC')) 246 utcdt = dt.astimezone(tz.gettz('UTC')) 247 seconds = abs(int((utcdt - minTs).total_seconds())) 248 nanoseconds = utcdt.microsecond * 1000 249 if utcdt < minTs: 250 sign = -1 251 if nanoseconds > 0: 252 # The microseconds was for a positive date-time. In a negative 253 # unix time it needs to be flipped. 254 nanoseconds = cls.MAX_NANOSEC - nanoseconds 255 else: 256 sign = 1 257 258 return cls.from_unix(unix_sec=seconds, unix_ns=nanoseconds, unix_sign=sign, is_leap=False)
260 @classmethod 261 def from_iso8601_utc(cls, iso8601utc: str) -> "Timestamp": 262 if not iso8601utc.endswith('Z'): 263 raise TsValueError("missing 'Z' at end of ISO 8601 UTC format") 264 year, month, day, hour, minute, second, ns = _parse_iso8601(iso8601utc[:-1]) 265 gmtuple = (year, month, day, hour, minute, second - (second == 60)) 266 secs_since_epoch = calendar.timegm(gmtuple) 267 if secs_since_epoch < 0: 268 sign = -1 269 secs_since_epoch = abs(secs_since_epoch) 270 if ns > 0: 271 # The ns parsed from the timestamp was for a positive ISO 8601 date-time. In a negative 272 # unix time it needs to be flipped. 273 ns = cls.MAX_NANOSEC - ns 274 secs_since_epoch -= 1 275 else: 276 sign = 1 277 278 return cls.from_unix(unix_sec=secs_since_epoch, unix_ns=ns, unix_sign=sign, is_leap=(second == 60))
280 @classmethod 281 def from_smpte_timelabel(cls, timelabel: str) -> "Timestamp": 282 r = re.compile(r'(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)F(\d+) (\d+)/(\d+) UTC([-\+])(\d+):(\d+) TAI([-\+])(\d+)') 283 m = r.match(timelabel) 284 if m is None: 285 raise TsValueError("invalid SMPTE Time Label string format") 286 groups = m.groups() 287 leap_sec = int(int(groups[5]) == 60) 288 local_tm_sec = calendar.timegm(time.struct_time((int(groups[0]), int(groups[1]), int(groups[2]), 289 int(groups[3]), int(groups[4]), int(groups[5]) - leap_sec, 290 0, 0, 0))) 291 rate_num = int(groups[7]) 292 rate_den = int(groups[8]) 293 utc_sign = 1 294 if groups[9] == '-': 295 utc_sign = -1 296 tai_sign = 1 297 if groups[12] == '-': 298 tai_sign = -1 299 tai_seconds = (local_tm_sec + 300 leap_sec - 301 utc_sign*(int(groups[10])*60*60 + int(groups[11])*60) - 302 tai_sign*int(groups[13])) 303 count = Timestamp(tai_seconds, 0).to_count(rate_num, rate_den, cls.ROUND_UP) 304 count += int(groups[6]) 305 return cls.from_count(count, rate_num, rate_den)
307 @classmethod 308 def from_str(cls, ts_str: str,) -> "Timestamp": 309 """Parse a string as a TimeStamp 310 311 Accepts: 312 * SMPTE timelabel 313 * ISO 8601 UTC 314 * second:nanosecond 315 * second.fraction formats 316 * "now" to mean the current time. 317 """ 318 if 'F' in ts_str: 319 return cls.from_smpte_timelabel(ts_str) 320 elif 'T' in ts_str: 321 return cls.from_iso8601_utc(ts_str) 322 elif ts_str.strip() == 'now': 323 return cls.get_time() 324 elif '.' in ts_str: 325 return cls.from_sec_frac(ts_str) 326 else: 327 return cls.from_sec_nsec(ts_str)
Parse a string as a TimeStamp
Accepts:
- SMPTE timelabel
- ISO 8601 UTC
- second:nanosecond
- second.fraction formats
- "now" to mean the current time.
329 @classmethod 330 def from_count(cls, count: int, rate_num: RationalTypes, rate_den: RationalTypes = 1) -> "Timestamp": 331 """Returns a new Timestamp derived from a count and a particular rate. 332 333 :param count: The sample count 334 :param rate_num: The numerator of the rate, in Hz 335 :param rate_den: The denominator of the rate in Hz 336 """ 337 if rate_num <= 0 or rate_den <= 0: 338 raise TsValueError("invalid rate") 339 sign = 1 340 if count < 0: 341 sign = -1 342 rate = Fraction(rate_num, rate_den) 343 ns = (cls.MAX_NANOSEC * abs(count) * rate.denominator) // rate.numerator 344 return cls(ns=ns, sign=sign)
Returns a new Timestamp derived from a count and a particular rate.
Parameters
- count: The sample count
- rate_num: The numerator of the rate, in Hz
- rate_den: The denominator of the rate in Hz
346 @classmethod 347 def from_unix(cls, unix_sec: int, unix_ns: int, unix_sign: int = 1, is_leap: bool = False) -> "Timestamp": 348 leap_sec = 0 349 if unix_sign >= 0: 350 for tbl_sec, tbl_tai_sec_minus_1 in UTC_LEAP: 351 if unix_sec + is_leap >= tbl_sec: 352 leap_sec = (tbl_tai_sec_minus_1 + 1) - tbl_sec 353 break 354 else: 355 is_leap = False 356 return cls(sec=unix_sec+leap_sec, ns=unix_ns, sign=unix_sign)
361 def get_leap_seconds(self) -> int: 362 """ Get the UTC leaps seconds. 363 Returns the number of leap seconds that the timestamp is adjusted by when 364 converting to UTC. 365 """ 366 leap_sec = 0 367 for unix_sec, tai_sec_minus_1 in UTC_LEAP: 368 if self.sec >= tai_sec_minus_1: 369 leap_sec = (tai_sec_minus_1 + 1) - unix_sec 370 break 371 372 return leap_sec
Get the UTC leaps seconds. Returns the number of leap seconds that the timestamp is adjusted by when converting to UTC.
374 def to_millisec(self, rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> int: 375 use_rounding = rounding 376 if self.sign < 0: 377 if use_rounding == self.ROUND_UP: 378 use_rounding = self.ROUND_DOWN 379 elif use_rounding == self.ROUND_DOWN: 380 use_rounding = self.ROUND_UP 381 round_ns = 0 382 if use_rounding == self.ROUND_NEAREST: 383 round_ns = 1000**2 // 2 384 elif use_rounding == self.ROUND_UP: 385 round_ns = 1000**2 - 1 386 return int(self.sign * ((abs(self._value) + round_ns) // 1000**2))
388 def to_microsec(self, rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> int: 389 use_rounding = rounding 390 if self.sign < 0: 391 if use_rounding == self.ROUND_UP: 392 use_rounding = self.ROUND_DOWN 393 elif use_rounding == self.ROUND_DOWN: 394 use_rounding = self.ROUND_UP 395 round_ns = 0 396 if use_rounding == self.ROUND_NEAREST: 397 round_ns = 1000 // 2 398 elif use_rounding == self.ROUND_UP: 399 round_ns = 1000 - 1 400 return int(self.sign * ((abs(self._value) + round_ns) // 1000))
405 def to_sec_nsec(self) -> str: 406 """ Convert to <seconds>:<nanoseconds> 407 """ 408 strSign = "" 409 if self.sign < 0: 410 strSign = "-" 411 return u"{}{}:{}".format(strSign, self.sec, self.ns)
Convert to
413 def to_sec_frac(self, fixed_size: bool = False) -> str: 414 """ Convert to <seconds>.<fraction> 415 """ 416 strSign = "" 417 if self.sign < 0: 418 strSign = "-" 419 return u"{}{}.{}".format( 420 strSign, 421 self.sec, 422 self._get_fractional_seconds(fixed_size=fixed_size))
Convert to
430 def to_float(self) -> float: 431 """ Convert to a floating point number of seconds 432 """ 433 return self._value / Timestamp.MAX_NANOSEC
Convert to a floating point number of seconds
435 def to_datetime(self) -> datetime: 436 sec, nsec, sign, leap = self.to_unix() 437 microsecond = int(round(nsec/1000)) 438 if microsecond > 999999: 439 sec += 1 440 microsecond = 0 441 if sign < 0 and microsecond > 0: 442 # The microseconds is for a negative unix time. In a positive date-time 443 # it needs to be flipped. 444 microsecond = 1000000 - microsecond 445 sec += 1 446 dt = datetime.fromtimestamp(sign * sec, tz.gettz('UTC')) 447 dt = dt.replace(microsecond=microsecond) 448 449 return dt
451 def to_unix(self) -> Tuple[int, int, int, bool]: 452 """ Convert to unix seconds. 453 Returns a tuple of (seconds, nanoseconds, is_leap), where `is_leap` is 454 `True` when the input time corresponds exactly to a UTC leap second. 455 Note that this deliberately returns a tuple, to try and avoid confusion. 456 """ 457 if self._value < 0: 458 return (self.sec, self.ns, self.sign, False) 459 else: 460 leap_sec = 0 461 is_leap = False 462 for unix_sec, tai_sec_minus_1 in UTC_LEAP: 463 if self.sec >= tai_sec_minus_1: 464 leap_sec = (tai_sec_minus_1 + 1) - unix_sec 465 is_leap = self.sec == tai_sec_minus_1 466 break 467 468 return (self.sec - leap_sec, self.ns, self.sign, is_leap)
Convert to unix seconds.
Returns a tuple of (seconds, nanoseconds, is_leap), where is_leap
is
True
when the input time corresponds exactly to a UTC leap second.
Note that this deliberately returns a tuple, to try and avoid confusion.
470 def to_unix_float(self) -> float: 471 """ Convert to unix seconds since the epoch as a floating point number 472 """ 473 (sec, ns, sign, _) = self.to_unix() 474 return sign * (sec + ns / Timestamp.MAX_NANOSEC)
Convert to unix seconds since the epoch as a floating point number
476 def to_iso8601_utc(self) -> str: 477 """ Get printed representation in ISO8601 format (UTC) 478 YYYY-MM-DDThh:mm:ss.s 479 where `s` is fractional seconds at nanosecond precision (always 9-chars wide) 480 """ 481 unix_s, unix_ns, unix_sign, is_leap = self.to_unix() 482 if unix_sign < 0 and unix_ns > 0: 483 # The nanoseconds is for a negative unix time. In a positive ISO 8601 date-time 484 # it needs to be flipped. 485 unix_ns = Timestamp.MAX_NANOSEC - unix_ns 486 unix_s += 1 487 utc_bd = time.gmtime(unix_sign * unix_s) 488 frac_sec = Timestamp(ns=unix_ns)._get_fractional_seconds(fixed_size=True) 489 leap_sec = int(is_leap) 490 491 return '%04d-%02d-%02dT%02d:%02d:%02d.%sZ' % (utc_bd.tm_year, 492 utc_bd.tm_mon, 493 utc_bd.tm_mday, 494 utc_bd.tm_hour, 495 utc_bd.tm_min, 496 utc_bd.tm_sec + leap_sec, 497 frac_sec)
Get printed representation in ISO8601 format (UTC)
YYYY-MM-DDThh:mm:ss.s
where s
is fractional seconds at nanosecond precision (always 9-chars wide)
499 def to_smpte_timelabel(self, 500 rate_num: RationalTypes, 501 rate_den: RationalTypes = 1, 502 utc_offset: Optional[int] = None) -> str: 503 if rate_num <= 0 or rate_den <= 0: 504 raise TsValueError("invalid rate") 505 rate = Fraction(rate_num, rate_den) 506 count = self.to_count(rate) 507 normalised_ts = Timestamp.from_count(count, rate) 508 tai_seconds = normalised_ts.sec 509 count_on_or_after_second = Timestamp(tai_seconds, 0).to_count(rate, rounding=self.ROUND_UP) 510 count_within_second = count - count_on_or_after_second 511 512 unix_sec, unix_ns, unix_sign, is_leap = normalised_ts.to_unix() 513 leap_sec = int(is_leap) 514 515 if utc_offset is None: 516 # calculate local time offset 517 utc_offset_sec = time.timezone 518 lt = time.localtime(unix_sec) 519 if lt.tm_isdst > 0: 520 utc_offset_sec += 60*60 521 else: 522 utc_offset_sec = utc_offset 523 utc_offset_sec_abs = abs(utc_offset_sec) 524 utc_offset_hour = utc_offset_sec_abs // (60*60) 525 utc_offset_min = (utc_offset_sec_abs % (60*60)) // 60 526 utc_sign_char = '+' 527 if utc_offset_sec < 0: 528 utc_sign_char = '-' 529 530 utc_bd = time.gmtime(unix_sec + utc_offset_sec) 531 532 tai_offset = unix_sec + leap_sec - tai_seconds 533 tai_sign_char = '+' 534 if tai_offset < 0: 535 tai_sign_char = '-' 536 537 return '%04d-%02d-%02dT%02d:%02d:%02dF%02u %u/%u UTC%c%02u:%02u TAI%c%u' % ( 538 utc_bd.tm_year, utc_bd.tm_mon, utc_bd.tm_mday, 539 utc_bd.tm_hour, utc_bd.tm_min, utc_bd.tm_sec + leap_sec, 540 count_within_second, 541 rate.numerator, rate.denominator, 542 utc_sign_char, utc_offset_hour, utc_offset_min, 543 tai_sign_char, abs(tai_offset))
545 def to_count(self, rate_num: RationalTypes, rate_den: RationalTypes = 1, 546 rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> int: 547 """Returns an integer such that if this Timestamp is equal to an exact number of samples at the given rate 548 then this is equal, and otherwise the value is rounded as indicated by the rounding parameter. 549 550 :param rate_num: numerator of rate 551 :param rate_den: denominator of rate 552 :param rounding: One of Timestamp.ROUND_NEAREST, Timestamp.ROUND_UP, or Timestamp.ROUND_DOWN 553 """ 554 if rate_num <= 0 or rate_den <= 0: 555 raise TsValueError("invalid rate") 556 557 rate = Fraction(rate_num, rate_den) 558 use_rounding = rounding 559 if self.sign < 0: 560 if use_rounding == self.ROUND_UP: 561 use_rounding = self.ROUND_DOWN 562 elif use_rounding == self.ROUND_DOWN: 563 use_rounding = self.ROUND_UP 564 if use_rounding == self.ROUND_NEAREST: 565 round_ns = Timestamp.get_interval_fraction(rate, factor=2).to_nanosec() 566 elif use_rounding == self.ROUND_UP: 567 round_ns = Timestamp.get_interval_fraction(rate, factor=1).to_nanosec() - 1 568 else: 569 round_ns = 0 570 571 return int(self.sign * ( 572 ((abs(self._value) + round_ns) * rate.numerator) // ( 573 self.MAX_NANOSEC * rate.denominator)))
Returns an integer such that if this Timestamp is equal to an exact number of samples at the given rate then this is equal, and otherwise the value is rounded as indicated by the rounding parameter.
Parameters
- rate_num: numerator of rate
- rate_den: denominator of rate
- rounding: One of Timestamp.ROUND_NEAREST, Timestamp.ROUND_UP, or Timestamp.ROUND_DOWN
575 def to_phase_offset(self, rate_num: RationalTypes, rate_den: RationalTypes = 1) -> "Timestamp": 576 """Return the smallest positive Timestamp such that abs(self - returnval) represents an integer number of 577 samples at the given rate""" 578 return self - self.normalise(rate_num, rate_den, rounding=Timestamp.ROUND_DOWN)
Return the smallest positive Timestamp such that abs(self - returnval) represents an integer number of samples at the given rate
580 def normalise(self, 581 rate_num: RationalTypes, 582 rate_den: RationalTypes = 1, 583 rounding: "Timestamp.Rounding" = ROUND_NEAREST) -> "Timestamp": 584 """Return the nearest Timestamp to self which represents an integer number of samples at the given rate. 585 586 :param rate_num: Rate numerator 587 :param rate_den: Rate denominator 588 :param rounding: How to round, if set to Timestamp.ROUND_DOWN (resp. Timestamp.ROUND_UP) this method will only 589 return a Timestamp less than or equal to this one (resp. greater than or equal to). 590 """ 591 return self.from_count(self.to_count(rate_num, rate_den, rounding), rate_num, rate_den)
Return the nearest Timestamp to self which represents an integer number of samples at the given rate.
Parameters
- rate_num: Rate numerator
- rate_den: Rate denominator
- rounding: How to round, if set to Timestamp.ROUND_DOWN (resp. Timestamp.ROUND_UP) this method will only return a Timestamp less than or equal to this one (resp. greater than or equal to).
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
Inherited Members
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
43 @runtime_checkable 44 class SupportsMediaTimestamp (Protocol): 45 def __mediatimestamp__(self) -> "Timestamp": 46 ...
This is an abstract base class for any class that can be automagically converted into a Timestamp.
To implement this simply implement the __mediatimestamp__ magic method. No need to inherit from this class explicitly.
66def mediatimestamp(v: TimestampConstructionType) -> "Timestamp": 67 """This method can be called on any object which supports the __mediatimestamp__ magic method 68 and also on a Timestamp, an int or a float. 69 It will always return a Timestamp or raise a ValueError. 70 """ 71 if isinstance(v, Timestamp): 72 return v 73 elif isinstance(v, int): 74 return Timestamp(v) 75 elif isinstance(v, float): 76 return Timestamp.from_float(v) 77 elif hasattr(v, "__mediatimestamp__"): 78 return v.__mediatimestamp__() 79 else: 80 raise ValueError("{!r} cannot be converted to a mediatimestamp.Timestamp".format(v))
This method can be called on any object which supports the __mediatimestamp__ magic method and also on a Timestamp, an int or a float. It will always return a Timestamp or raise a ValueError.
72class TimeRange (object): 73 """A nanosecond immutable precision time range object""" 74 75 class Inclusivity (int): 76 def __and__(self, other: int) -> "TimeRange.Inclusivity": 77 return TimeRange.Inclusivity(int(self) & int(other) & 0x3) 78 79 def __or__(self, other: int) -> "TimeRange.Inclusivity": 80 return TimeRange.Inclusivity((int(self) | int(other)) & 0x3) 81 82 def __xor__(self, other: int) -> "TimeRange.Inclusivity": 83 return TimeRange.Inclusivity((int(self) ^ int(other)) & 0x3) 84 85 def __invert__(self) -> "TimeRange.Inclusivity": 86 return TimeRange.Inclusivity((~int(self)) & 0x3) 87 88 EXCLUSIVE = Inclusivity(0x0) 89 INCLUDE_START = Inclusivity(0x1) 90 INCLUDE_END = Inclusivity(0x2) 91 INCLUSIVE = Inclusivity(0x3) 92 93 class Rounding(int): 94 pass 95 96 ROUND_DOWN = Rounding(0) 97 ROUND_NEAREST = Rounding(1) 98 ROUND_UP = Rounding(2) 99 ROUND_IN = Rounding(3) 100 ROUND_OUT = Rounding(4) 101 ROUND_START = Rounding(5) 102 ROUND_END = Rounding(6) 103 104 def __init__(self, 105 start: Optional[SupportsMediaTimestamp], 106 end: Optional[SupportsMediaTimestamp], 107 inclusivity: "TimeRange.Inclusivity" = INCLUSIVE): 108 """Construct a time range starting at start and ending at end 109 110 :param start: A Timestamp or None 111 :param end: A Timestamp or None 112 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 113 super().__init__() 114 self.start: Optional[Timestamp] 115 self.end: Optional[Timestamp] 116 self.inclusivity: TimeRange.Inclusivity 117 118 # Convert convertible inputs 119 if start is not None: 120 start = mediatimestamp(start) 121 if end is not None: 122 end = mediatimestamp(end) 123 124 # Normalise the 'never' cases 125 if start is not None and end is not None: 126 if start > end or (start == end and inclusivity != TimeRange.INCLUSIVE): 127 start = Timestamp() 128 end = Timestamp() 129 inclusivity = TimeRange.EXCLUSIVE 130 131 # Normalise the 'eternity' cases 132 if start is None and end is None: 133 inclusivity = TimeRange.INCLUSIVE 134 135 self.__dict__['start'] = start 136 self.__dict__['end'] = end 137 self.__dict__['inclusivity'] = inclusivity 138 139 def __setattr__(self, name: str, value: object) -> None: 140 raise TsValueError("Cannot assign to an immutable TimeRange") 141 142 def __mediatimerange__(self) -> "TimeRange": 143 return self 144 145 def __iter__(self) -> Iterator[Timestamp]: 146 return self.at_rate(MAX_NANOSEC) 147 148 def __reversed__(self) -> Iterator[Timestamp]: 149 return self.reversed_at_rate(MAX_NANOSEC) 150 151 def __hash__(self) -> int: 152 return hash(self.to_sec_nsec_range()) 153 154 def at_rate(self, 155 numerator: RationalTypes, 156 denominator: RationalTypes = 1, 157 phase_offset: SupportsMediaTimestamp = Timestamp()) -> Iterator[Timestamp]: 158 """Returns an iterable which yields Timestamp objects at the specified rate within the 159 range starting at the beginning and moving later. 160 161 :param numerator: The numerator for the rate in Hz (or the exact rate as a Fraction or float) 162 :param denominator: The denominator for the rate in Hz 163 :param phase_offset: A Timestamp object which sets the phase offset of the first timestamp 164 drawn from the iterable. 165 166 :raises: ValueError If a phase_offset is specified which is larger than the reciprocal of the rate 167 168 :returns: an iterable that yields Timestamp objects 169 """ 170 rate = Fraction(numerator, denominator) 171 phase_offset = mediatimestamp(phase_offset) 172 if phase_offset >= Timestamp.from_count(1, rate): 173 raise ValueError("phase_offset of {} is too large for rate {}".format(phase_offset, rate)) 174 175 if self.start is None: 176 raise ValueError("Cannot iterate over a timerange with no start") 177 178 count = (self.start - phase_offset).to_count(rate) 179 180 while True: 181 ts = Timestamp.from_count(count, rate) + phase_offset 182 count += 1 183 184 if ts < self.start or ((self.inclusivity & TimeRange.INCLUDE_START) == 0 and ts == self.start): 185 continue 186 elif (self.end is not None and 187 (ts > self.end or ((self.inclusivity & TimeRange.INCLUDE_END) == 0 and ts == self.end))): 188 break 189 else: 190 yield ts 191 192 def reversed_at_rate(self, 193 numerator: RationalTypes, 194 denominator: RationalTypes = 1, 195 phase_offset: SupportsMediaTimestamp = Timestamp()) -> Iterator[Timestamp]: 196 """Returns an iterable which yields Timestamp objects at the specified rate within the 197 range starting at the end and moving earlier. 198 199 :param numerator: The numerator for the rate in Hz (or the exact rate as a Fraction or float) 200 :param denominator: The denominator for the rate in Hz 201 :param phase_offset: A Timestamp object which sets the phase offset of the first timestamp 202 drawn from the iterable. 203 204 :raises: ValueError If a phase_offset is specified which is larger than the reciprocal of the rate 205 206 :returns: an iterable that yields Timestamp objects 207 """ 208 phase_offset = mediatimestamp(phase_offset) 209 rate = Fraction(numerator, denominator) 210 if phase_offset >= Timestamp.from_count(1, rate): 211 raise ValueError("phase_offset of {} is too large for rate {}".format(phase_offset, rate)) 212 213 if self.end is None: 214 raise ValueError("Cannot reverse iterate over a timerange with no end") 215 216 count = (self.end - phase_offset).to_count(rate) 217 218 while True: 219 ts = Timestamp.from_count(count, rate) + phase_offset 220 count -= 1 221 222 if ts > self.end or ((self.inclusivity & TimeRange.INCLUDE_END) == 0 and ts == self.end): 223 continue 224 elif (self.start is not None and 225 (ts < self.start or ((self.inclusivity & TimeRange.INCLUDE_START) == 0 and ts == self.start))): 226 break 227 else: 228 yield ts 229 230 @classmethod 231 def from_timerange(cls, other: SupportsMediaTimeRange) -> "TimeRange": 232 """Construct an immutable timerange from another timerange""" 233 other = mediatimerange(other) 234 return TimeRange(other.start, 235 other.end, 236 other.inclusivity) 237 238 @classmethod 239 def from_start(cls, start: SupportsMediaTimestamp, inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 240 """Construct a time range starting at start with no end 241 242 :param start: A Timestamp 243 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 244 return cls(start, None, inclusivity) 245 246 @classmethod 247 def from_end(cls, end: SupportsMediaTimestamp, inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 248 """Construct a time range ending at end with no start 249 250 :param end: A Timestamp 251 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 252 return cls(None, end, inclusivity) 253 254 @classmethod 255 def from_start_length(cls, 256 start: SupportsMediaTimestamp, 257 length: SupportsMediaTimestamp, 258 inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 259 """Construct a time range starting at start and ending at (start + length) 260 261 :param start: A Timestamp 262 :param length: A Timestamp, which must be non-negative 263 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 264 265 :raises: TsValueError if the length is negative""" 266 length = mediatimestamp(length) 267 start = mediatimestamp(start) 268 if length < Timestamp(): 269 raise TsValueError("Length must be non-negative") 270 return cls(start, start + length, inclusivity) 271 272 @classmethod 273 def eternity(cls) -> "TimeRange": 274 """Return an unbounded time range covering all time""" 275 return cls(None, None) 276 277 @classmethod 278 def never(cls) -> "TimeRange": 279 """Return a time range covering no time""" 280 return cls(Timestamp(), Timestamp(), TimeRange.EXCLUSIVE) 281 282 @classmethod 283 def from_single_timestamp(cls, ts: SupportsMediaTimestamp) -> "TimeRange": 284 """Construct a time range containing only a single timestamp 285 286 :param ts: A Timestamp""" 287 ts = mediatimestamp(ts) 288 return cls(ts, ts, TimeRange.INCLUSIVE) 289 290 @classmethod 291 def from_str(cls, s: str, inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 292 """Convert a string to a time range. 293 294 Valid ranges are: 295 [<ts>_<ts>] 296 [<ts>_<ts>) 297 (<ts>_<ts>] 298 (<ts>_<ts>) 299 [<ts>] 300 <ts>_<ts> 301 <ts> 302 () 303 304 where <ts> is any valid string format for Timestamp.from_str() or an empty string. 305 306 The meaning of these is relatively simple: [ indicates including the start time, 307 ( indicates excluding it, ] indicates including the end time, and ) indicates excludint it. 308 If brackets are ommitted entirely then this is taken as an inclusive range at both ends. 309 Omitting a timestamp indicates that there is no bound on that end (ie. the range goes on forever), 310 including only a single timestamp by itself indicates a range containing exactly that one timestamp. 311 Finally the string "()" represents the empty range. 312 313 :param s: The string to process 314 """ 315 m = re.match(r'(\[|\()?([^_\)\]]+)?(_([^_\)\]]+)?)?(\]|\))?', s) 316 317 if m is None: 318 raise ValueError("{!r} is not a valid TimeRange".format(s)) 319 320 inc = TimeRange.INCLUSIVE 321 if m.group(1) == "(": 322 inc &= ~TimeRange.INCLUDE_START 323 if m.group(5) == ")": 324 inc &= ~TimeRange.INCLUDE_END 325 326 start_str = m.group(2) 327 end_str = m.group(4) 328 329 start: Optional[Timestamp] = None 330 end: Optional[Timestamp] = None 331 if start_str is not None: 332 start = Timestamp.from_str(start_str) 333 if end_str is not None: 334 end = Timestamp.from_str(end_str) 335 336 if start is None and end is None: 337 # Ie. we have no first or second timestamp 338 if m.group(3) is not None: 339 # ie. we have a '_' character 340 return cls.eternity() 341 else: 342 # We have no '_' character, so the whole range is empty 343 return cls.never() 344 elif start is not None and end is None and m.group(3) is None: 345 # timestamp of form <ts> 346 return cls.from_single_timestamp(start) 347 else: 348 return cls(start, end, inc) 349 350 @property 351 def length(self) -> Union[Timestamp, float]: 352 if self.end is None or self.start is None: 353 return float("inf") 354 return self.end - self.start 355 356 def bounded_before(self) -> bool: 357 return self.start is not None 358 359 def bounded_after(self) -> bool: 360 return self.end is not None 361 362 def unbounded(self) -> bool: 363 return self.start is None and self.end is None 364 365 def includes_start(self) -> bool: 366 return (self.inclusivity & TimeRange.INCLUDE_START) != 0 367 368 def includes_end(self) -> bool: 369 return (self.inclusivity & TimeRange.INCLUDE_END) != 0 370 371 def finite(self) -> bool: 372 return (self.start is not None and self.end is not None) 373 374 def __contains__(self, ts: object) -> bool: 375 """Returns true if the timestamp is within this range.""" 376 return ((isinstance(ts, SupportsMediaTimestamp)) and 377 (self.start is None or mediatimestamp(ts) >= self.start) and 378 (self.end is None or mediatimestamp(ts) <= self.end) and 379 (not ((self.start is not None) and 380 (mediatimestamp(ts) == self.start) and 381 (self.inclusivity & TimeRange.INCLUDE_START == 0))) and 382 (not ((self.end is not None) and 383 (mediatimestamp(ts) == self.end) and 384 (self.inclusivity & TimeRange.INCLUDE_END == 0)))) 385 386 def __eq__(self, other: object) -> bool: 387 if not isinstance(other, SupportsMediaTimeRange): 388 return False 389 390 other = mediatimerange(other) 391 return (isinstance(other, SupportsMediaTimeRange) and 392 ((self.is_empty() and other.is_empty()) or 393 (((self.start is None and other.start is None) or 394 (self.start == other.start and 395 (self.inclusivity & TimeRange.INCLUDE_START) == (other.inclusivity & TimeRange.INCLUDE_START))) and 396 ((self.end is None and other.end is None) or 397 (self.end == other.end and 398 (self.inclusivity & TimeRange.INCLUDE_END) == (other.inclusivity & TimeRange.INCLUDE_END)))))) 399 400 def __repr__(self) -> str: 401 return "{}.{}.from_str('{}')".format("mediatimestamp.immutable", type(self).__name__, self.to_sec_nsec_range()) 402 403 def __str__(self) -> str: 404 return self.to_sec_nsec_range() 405 406 def contains_subrange(self, tr: SupportsMediaTimeRange) -> bool: 407 """Returns True if the timerange supplied lies entirely inside this timerange""" 408 tr = mediatimerange(tr) 409 return ((not self.is_empty()) and 410 (tr.is_empty() or 411 (self.start is None or (tr.start is not None and self.start <= tr.start)) and 412 (self.end is None or (tr.end is not None and self.end >= tr.end)) and 413 (not ((self.start is not None) and 414 (tr.start is not None) and 415 (self.start == tr.start) and 416 (self.inclusivity & TimeRange.INCLUDE_START == 0) and 417 (tr.inclusivity & TimeRange.INCLUDE_START != 0))) and 418 (not ((self.end is not None) and 419 (tr.end is not None) and 420 (self.end == tr.end) and 421 (self.inclusivity & TimeRange.INCLUDE_END == 0) and 422 (tr.inclusivity & TimeRange.INCLUDE_END != 0))))) 423 424 def to_sec_nsec_range(self, with_inclusivity_markers: bool = True) -> str: 425 """Convert to [<seconds>:<nanoseconds>_<seconds>:<nanoseconds>] format, 426 usually the opening and closing delimiters are set to [ or ] for inclusive and ( or ) for exclusive ranges. 427 Unbounded ranges have no marker attached to them. 428 429 :param with_inclusivity_markers: if set to False do not include parentheses/brackets""" 430 if self.is_empty(): 431 if with_inclusivity_markers: 432 return "()" 433 else: 434 return "" 435 elif self.start is not None and self.end is not None and self.start == self.end: 436 if with_inclusivity_markers: 437 return "[" + self.start.to_tai_sec_nsec() + "]" 438 else: 439 return self.start.to_tai_sec_nsec() 440 441 if with_inclusivity_markers: 442 brackets = [("(", ")"), ("[", ")"), ("(", "]"), ("[", "]")][self.inclusivity] 443 else: 444 brackets = ("", "") 445 446 return '_'.join([ 447 (brackets[0] + self.start.to_tai_sec_nsec()) if self.start is not None else '', 448 (self.end.to_tai_sec_nsec() + brackets[1]) if self.end is not None else '' 449 ]) 450 451 def intersect_with(self, tr: SupportsMediaTimeRange) -> "TimeRange": 452 """Return a range which represents the intersection of this range with another""" 453 tr = mediatimerange(tr) 454 if self.is_empty() or tr.is_empty(): 455 return TimeRange.never() 456 457 start = self.start 458 if tr.start is not None and (self.start is None or self.start < tr.start): 459 start = tr.start 460 end = self.end 461 if tr.end is not None and (self.end is None or self.end > tr.end): 462 end = tr.end 463 464 inclusivity = TimeRange.EXCLUSIVE 465 if start is None or (start in self and start in tr): 466 inclusivity |= TimeRange.INCLUDE_START 467 if end is None or (end in self and end in tr): 468 inclusivity |= TimeRange.INCLUDE_END 469 470 if start is not None and end is not None and start > end: 471 return TimeRange.never() 472 473 return TimeRange(start, end, inclusivity) 474 475 def starts_inside_timerange(self, other: SupportsMediaTimeRange) -> bool: 476 """Returns true if the start of this timerange is located inside the other.""" 477 other = mediatimerange(other) 478 return (not self.is_empty() and 479 not other.is_empty() and 480 ((self.bounded_before() and self.start in other and 481 (not (other.bounded_after() and self.start == other.end and not self.includes_start()))) or 482 (self.bounded_before() and other.bounded_before() and self.start == other.start and 483 (not (self.includes_start() and not other.includes_start()))) or 484 (not self.bounded_before() and not other.bounded_before()))) 485 486 def ends_inside_timerange(self, other: SupportsMediaTimeRange) -> bool: 487 """Returns true if the end of this timerange is located inside the other.""" 488 other = mediatimerange(other) 489 return (not self.is_empty() and 490 not other.is_empty() and 491 ((self.bounded_after() and self.end in other and 492 (not (other.bounded_before() and self.end == other.start and not self.includes_end()))) or 493 (self.bounded_after() and other.bounded_after() and self.end == other.end and 494 (not (self.includes_end() and not other.includes_end()))) or 495 (not self.bounded_after() and not other.bounded_after()))) 496 497 def is_earlier_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 498 """Returns true if this timerange ends earlier than the start of the other.""" 499 other = mediatimerange(other) 500 return (not self.is_empty() and 501 not other.is_empty() and 502 other.bounded_before() and 503 self.bounded_after() and 504 (cast(Timestamp, self.end) < cast(Timestamp, other.start) or 505 (cast(Timestamp, self.end) == cast(Timestamp, other.start) and 506 not (self.includes_end() and other.includes_start())))) 507 508 def is_later_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 509 """Returns true if this timerange starts later than the end of the other.""" 510 other = mediatimerange(other) 511 return (not self.is_empty() and 512 not other.is_empty() and 513 other.bounded_after() and 514 self.bounded_before() and 515 (cast(Timestamp, self.start) > cast(Timestamp, other.end) or 516 (cast(Timestamp, self.start) == cast(Timestamp, other.end) and 517 not (self.includes_start() and other.includes_end())))) 518 519 def starts_earlier_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 520 """Returns true if this timerange starts earlier than the start of the other.""" 521 other = mediatimerange(other) 522 return (not self.is_empty() and 523 not other.is_empty() and 524 other.bounded_before() and 525 (not self.bounded_before() or 526 (cast(Timestamp, self.start) < cast(Timestamp, other.start) or 527 (cast(Timestamp, self.start) == cast(Timestamp, other.start) and 528 self.includes_start() and 529 not other.includes_start())))) 530 531 def starts_later_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 532 """Returns true if this timerange starts later than the start of the other.""" 533 other = mediatimerange(other) 534 return (not self.is_empty() and 535 not other.is_empty() and 536 self.bounded_before() and 537 (not other.bounded_before() or 538 (cast(Timestamp, self.start) > cast(Timestamp, other.start) or 539 (cast(Timestamp, self.start) == cast(Timestamp, other.start) and 540 (not self.includes_start() and other.includes_start()))))) 541 542 def ends_earlier_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 543 """Returns true if this timerange ends earlier than the end of the other.""" 544 other = mediatimerange(other) 545 return (not self.is_empty() and 546 not other.is_empty() and 547 self.bounded_after() and 548 (not other.bounded_after() or 549 (cast(Timestamp, self.end) < cast(Timestamp, other.end) or 550 (cast(Timestamp, self.end) == cast(Timestamp, other.end) and 551 (not self.includes_end() and other.includes_end()))))) 552 553 def ends_later_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 554 """Returns true if this timerange ends later than the end of the other.""" 555 other = mediatimerange(other) 556 return (not self.is_empty() and 557 not other.is_empty() and 558 other.bounded_after() and 559 (not self.bounded_after() or 560 (cast(Timestamp, self.end) > cast(Timestamp, other.end) or 561 (cast(Timestamp, self.end) == cast(Timestamp, other.end) and 562 self.includes_end() and 563 not other.includes_end())))) 564 565 def overlaps_with_timerange(self, other: SupportsMediaTimeRange) -> bool: 566 """Returns true if this timerange and the other overlap.""" 567 other = mediatimerange(other) 568 return (not self.is_earlier_than_timerange(other) and not self.is_later_than_timerange(other)) 569 570 def is_contiguous_with_timerange(self, other: SupportsMediaTimeRange) -> bool: 571 """Returns true if the union of this timerange and the other would be a valid timerange""" 572 other = mediatimerange(other) 573 return (self.overlaps_with_timerange(other) or 574 (self.is_earlier_than_timerange(other) and 575 self.end == other.start and 576 (self.includes_end() or other.includes_start())) or 577 (self.is_later_than_timerange(other) and 578 self.start == other.end and 579 (self.includes_start() or other.includes_end()))) 580 581 def union_with_timerange(self, other: SupportsMediaTimeRange) -> "TimeRange": 582 """Returns the union of this timerange and the other. 583 :raises: ValueError if the ranges are not contiguous.""" 584 other = mediatimerange(other) 585 if not self.is_contiguous_with_timerange(other): 586 raise ValueError("Timeranges {} and {} are not contiguous, so cannot take the union.".format(self, other)) 587 588 return self.extend_to_encompass_timerange(other) 589 590 def extend_to_encompass_timerange(self, other: SupportsMediaTimeRange) -> "TimeRange": 591 """Returns the timerange that encompasses this and the other timerange.""" 592 other = mediatimerange(other) 593 if self.is_empty(): 594 return other 595 596 if other.is_empty(): 597 return self 598 599 inclusivity = TimeRange.EXCLUSIVE 600 if self.start == other.start: 601 start = self.start 602 inclusivity |= ((self.inclusivity | other.inclusivity) & TimeRange.INCLUDE_START) 603 elif self.starts_earlier_than_timerange(other): 604 start = self.start 605 inclusivity |= (self.inclusivity & TimeRange.INCLUDE_START) 606 else: 607 start = other.start 608 inclusivity |= (other.inclusivity & TimeRange.INCLUDE_START) 609 610 if self.end == other.end: 611 end = self.end 612 inclusivity |= ((self.inclusivity | other.inclusivity) & TimeRange.INCLUDE_END) 613 elif self.ends_later_than_timerange(other): 614 end = self.end 615 inclusivity |= (self.inclusivity & TimeRange.INCLUDE_END) 616 else: 617 end = other.end 618 inclusivity |= (other.inclusivity & TimeRange.INCLUDE_END) 619 620 return TimeRange(start, end, inclusivity) 621 622 def split_at(self, timestamp: SupportsMediaTimestamp) -> Tuple["TimeRange", "TimeRange"]: 623 """Splits a timerange at a specified timestamp. 624 625 It is guaranteed that the splitting point will be in the *second* TimeRange returned, and not in the first. 626 627 :param timestamp: the timestamp to split at 628 :returns: A pair of TimeRange objects 629 :raises: ValueError if timestamp not in self""" 630 631 timestamp = mediatimestamp(timestamp) 632 633 if timestamp not in self: 634 raise ValueError("Cannot split range {} at {}".format(self, timestamp)) 635 636 return (TimeRange(self.start, timestamp, (self.inclusivity & TimeRange.INCLUDE_START)), 637 TimeRange(timestamp, self.end, TimeRange.INCLUDE_START | (self.inclusivity & TimeRange.INCLUDE_END))) 638 639 def timerange_between(self, other: SupportsMediaTimeRange) -> "TimeRange": 640 """Returns the time range between the end of the earlier timerange and the start of the later one""" 641 other = mediatimerange(other) 642 643 if self.is_contiguous_with_timerange(other): 644 return TimeRange.never() 645 elif self.is_earlier_than_timerange(other): 646 inclusivity = TimeRange.EXCLUSIVE 647 if not self.includes_end(): 648 inclusivity |= TimeRange.INCLUDE_START 649 if not other.includes_start(): 650 inclusivity |= TimeRange.INCLUDE_END 651 return TimeRange(self.end, other.start, inclusivity) 652 else: 653 inclusivity = TimeRange.EXCLUSIVE 654 if not self.includes_start(): 655 inclusivity |= TimeRange.INCLUDE_END 656 if not other.includes_end(): 657 inclusivity |= TimeRange.INCLUDE_START 658 return TimeRange(other.end, self.start, inclusivity) 659 660 def timerange_before(self) -> "TimeRange": 661 """Returns the time range before the start of the this one""" 662 if self.start is None: 663 return TimeRange.never() 664 665 if self.includes_start(): 666 inclusivity = TimeRange.EXCLUSIVE 667 else: 668 inclusivity = TimeRange.INCLUDE_END 669 670 return TimeRange.from_end(self.start, inclusivity=inclusivity) 671 672 def timerange_after(self) -> "TimeRange": 673 """Returns the time range after the end of the this one""" 674 if self.end is None: 675 return TimeRange.never() 676 677 if self.includes_end(): 678 inclusivity = TimeRange.EXCLUSIVE 679 else: 680 inclusivity = TimeRange.INCLUDE_START 681 682 return TimeRange.from_start(self.end, inclusivity=inclusivity) 683 684 def is_empty(self) -> bool: 685 """Returns true on any empty range.""" 686 return (self.start is not None and 687 self.end is not None and 688 self.start == self.end and 689 self.inclusivity != TimeRange.INCLUSIVE) 690 691 def is_normalised(self, 692 rate_num: RationalTypes, 693 rate_den: RationalTypes = 1, 694 rounding: Rounding = ROUND_NEAREST) -> bool: 695 """Checks if timerange is normalised""" 696 normalised_timerange = self.normalise(rate_num, rate_den, rounding) 697 if normalised_timerange == self: 698 return True 699 else: 700 return False 701 702 def normalise(self, 703 rate_num: RationalTypes, 704 rate_den: RationalTypes = 1, 705 rounding: Rounding = ROUND_NEAREST) -> "TimeRange": 706 """Returns a normalised half-open TimeRange based on this timerange. 707 708 The returned TimeRange will always have INCLUDE_START inclusivity. 709 710 If the original TimeRange was inclusive of its start then the returned TimeRange will 711 start at the normalised timestamp closest to that start point (respecting rounding). 712 713 If the original TimeRange was exclusive of its start then the returned TimeRange will 714 start at the next normalised timestamp after the normalised timestamp closest to that 715 start point (respecting rounding). 716 717 If the original TimeRange was exclusive of its end then the returned TimeRange will 718 end just before the normalised timestamp closest to that end point (respecting rounding). 719 720 If the original TimeRange was inclusive of its end then the returned TimeRange will 721 end just before the next normalised timestamp after the normalised timestamp closest to that 722 end point (respecting rounding). 723 724 The rounding options are: 725 * ROUND_NEAREST -- each end of the range independently rounds to the nearest normalised timestamp 726 * ROUND_UP -- both ends of the range round up 727 * ROUND_DOWN -- both ends of the range round down 728 * ROUND_IN -- The start of the range rounds up, the end rounds down 729 * ROUND_OUT -- The start of the range rounds down, the end rounds up 730 * ROUND_START -- The start rounds to the nearest normalised timestamp, the end rounds in the same direction 731 as the start 732 * ROUND_END -- The end rounds to the nearest normalised timestamp, the start rounds in the same direction 733 as the end 734 """ 735 if rounding == TimeRange.ROUND_OUT: 736 start_rounding = Timestamp.ROUND_DOWN 737 end_rounding = Timestamp.ROUND_UP 738 elif rounding == TimeRange.ROUND_IN: 739 start_rounding = Timestamp.ROUND_UP 740 end_rounding = Timestamp.ROUND_DOWN 741 elif rounding in [TimeRange.ROUND_START, TimeRange.ROUND_END]: 742 start_rounding = Timestamp.ROUND_NEAREST 743 end_rounding = Timestamp.ROUND_NEAREST 744 else: 745 start_rounding = Timestamp.Rounding(rounding) 746 end_rounding = Timestamp.Rounding(rounding) 747 748 rate = Fraction(rate_num, rate_den) 749 750 start: Optional[int] 751 if self.bounded_before(): 752 start = cast(Timestamp, self.start).to_count(rate, rounding=start_rounding) 753 else: 754 start = None 755 756 end: Optional[int] 757 if self.bounded_after(): 758 end = cast(Timestamp, self.end).to_count(rate, rounding=end_rounding) 759 else: 760 end = None 761 762 if rounding == TimeRange.ROUND_START and self.bounded_before() and self.bounded_after(): 763 if start == cast(Timestamp, self.start).to_count(rate, rounding=Timestamp.ROUND_UP): 764 end = cast(Timestamp, self.end).to_count(rate, rounding=Timestamp.ROUND_UP) 765 else: 766 end = cast(Timestamp, self.end).to_count(rate, rounding=Timestamp.ROUND_DOWN) 767 elif rounding == TimeRange.ROUND_END and self.bounded_before() and self.bounded_after(): 768 if end == cast(Timestamp, self.end).to_count(rate, rounding=Timestamp.ROUND_UP): 769 start = cast(Timestamp, self.start).to_count(rate, rounding=Timestamp.ROUND_UP) 770 else: 771 start = cast(Timestamp, self.start).to_count(rate, rounding=Timestamp.ROUND_DOWN) 772 773 if start is not None and not self.includes_start(): 774 start += 1 775 if end is not None and self.includes_end(): 776 end += 1 777 778 start_ts: Optional[Timestamp] = None 779 end_ts: Optional[Timestamp] = None 780 if start is not None: 781 start_ts = Timestamp.from_count(start, rate) 782 if end is not None: 783 end_ts = Timestamp.from_count(end, rate) 784 785 return TimeRange(start_ts, 786 end_ts, 787 TimeRange.INCLUDE_START) 788 789 def into_chunks( 790 self, 791 time_duration: Timestamp 792 ) -> Generator["TimeRange", None, None]: 793 """Returns a generator of TimeRanges of the length specified in time_duration based on this TimeRange. 794 795 If this TimeRange has an infinite length, then this will generate chunks of time_duration infinitely. 796 797 The first chunk will have the start inclusivity of this TimeRange, and will have an exclusive end. 798 The last chunk (if this TimeRange is not infinite) will have the end inclusivity of this TimeRange, and an 799 inclusive start. 800 All other chunks will have an inclusive start and exclusive end. 801 802 If the time_duration does not divide exactly into the length of this TimeRange, the last chunk returned will 803 be of the 'remainder' length. 804 805 If the time_duration is the same length or longer than this TimeRange, then a copy of this TimeRange will be 806 returned. 807 808 :param time_duration: A Timestamp representing the requested chunk length 809 :returns: A TimeRange generator that generates chunks. 810 """ 811 if not self.bounded_before(): 812 raise ValueError("The timerange to be chunked has no start time!") 813 remainder = self # this is a timerange from the end of the last one to the end of self 814 while not remainder.is_empty(): 815 next_timerange = TimeRange.from_start_length( 816 cast(Timestamp, remainder.start), 817 time_duration, 818 TimeRange.INCLUDE_START) 819 next_timerange = next_timerange.intersect_with(remainder) 820 821 if next_timerange.end == remainder.end and remainder.includes_end(): 822 next_timerange = TimeRange.from_start_length( 823 cast(Timestamp, next_timerange.start), 824 cast(Timestamp, next_timerange.length), 825 next_timerange.inclusivity | TimeRange.INCLUDE_END 826 ) 827 828 yield next_timerange 829 # If we have just yielded the final chunk, then this will produce an empty timerange. 830 remainder = remainder.intersect_with(next_timerange.timerange_after()) 831 return
A nanosecond immutable precision time range object
104 def __init__(self, 105 start: Optional[SupportsMediaTimestamp], 106 end: Optional[SupportsMediaTimestamp], 107 inclusivity: "TimeRange.Inclusivity" = INCLUSIVE): 108 """Construct a time range starting at start and ending at end 109 110 :param start: A Timestamp or None 111 :param end: A Timestamp or None 112 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 113 super().__init__() 114 self.start: Optional[Timestamp] 115 self.end: Optional[Timestamp] 116 self.inclusivity: TimeRange.Inclusivity 117 118 # Convert convertible inputs 119 if start is not None: 120 start = mediatimestamp(start) 121 if end is not None: 122 end = mediatimestamp(end) 123 124 # Normalise the 'never' cases 125 if start is not None and end is not None: 126 if start > end or (start == end and inclusivity != TimeRange.INCLUSIVE): 127 start = Timestamp() 128 end = Timestamp() 129 inclusivity = TimeRange.EXCLUSIVE 130 131 # Normalise the 'eternity' cases 132 if start is None and end is None: 133 inclusivity = TimeRange.INCLUSIVE 134 135 self.__dict__['start'] = start 136 self.__dict__['end'] = end 137 self.__dict__['inclusivity'] = inclusivity
Construct a time range starting at start and ending at end
Parameters
- start: A Timestamp or None
- end: A Timestamp or None
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
154 def at_rate(self, 155 numerator: RationalTypes, 156 denominator: RationalTypes = 1, 157 phase_offset: SupportsMediaTimestamp = Timestamp()) -> Iterator[Timestamp]: 158 """Returns an iterable which yields Timestamp objects at the specified rate within the 159 range starting at the beginning and moving later. 160 161 :param numerator: The numerator for the rate in Hz (or the exact rate as a Fraction or float) 162 :param denominator: The denominator for the rate in Hz 163 :param phase_offset: A Timestamp object which sets the phase offset of the first timestamp 164 drawn from the iterable. 165 166 :raises: ValueError If a phase_offset is specified which is larger than the reciprocal of the rate 167 168 :returns: an iterable that yields Timestamp objects 169 """ 170 rate = Fraction(numerator, denominator) 171 phase_offset = mediatimestamp(phase_offset) 172 if phase_offset >= Timestamp.from_count(1, rate): 173 raise ValueError("phase_offset of {} is too large for rate {}".format(phase_offset, rate)) 174 175 if self.start is None: 176 raise ValueError("Cannot iterate over a timerange with no start") 177 178 count = (self.start - phase_offset).to_count(rate) 179 180 while True: 181 ts = Timestamp.from_count(count, rate) + phase_offset 182 count += 1 183 184 if ts < self.start or ((self.inclusivity & TimeRange.INCLUDE_START) == 0 and ts == self.start): 185 continue 186 elif (self.end is not None and 187 (ts > self.end or ((self.inclusivity & TimeRange.INCLUDE_END) == 0 and ts == self.end))): 188 break 189 else: 190 yield ts
Returns an iterable which yields Timestamp objects at the specified rate within the range starting at the beginning and moving later.
Parameters
- numerator: The numerator for the rate in Hz (or the exact rate as a Fraction or float)
- denominator: The denominator for the rate in Hz
- phase_offset: A Timestamp object which sets the phase offset of the first timestamp drawn from the iterable.
Raises
- ValueError If a phase_offset is specified which is larger than the reciprocal of the rate
:returns: an iterable that yields Timestamp objects
192 def reversed_at_rate(self, 193 numerator: RationalTypes, 194 denominator: RationalTypes = 1, 195 phase_offset: SupportsMediaTimestamp = Timestamp()) -> Iterator[Timestamp]: 196 """Returns an iterable which yields Timestamp objects at the specified rate within the 197 range starting at the end and moving earlier. 198 199 :param numerator: The numerator for the rate in Hz (or the exact rate as a Fraction or float) 200 :param denominator: The denominator for the rate in Hz 201 :param phase_offset: A Timestamp object which sets the phase offset of the first timestamp 202 drawn from the iterable. 203 204 :raises: ValueError If a phase_offset is specified which is larger than the reciprocal of the rate 205 206 :returns: an iterable that yields Timestamp objects 207 """ 208 phase_offset = mediatimestamp(phase_offset) 209 rate = Fraction(numerator, denominator) 210 if phase_offset >= Timestamp.from_count(1, rate): 211 raise ValueError("phase_offset of {} is too large for rate {}".format(phase_offset, rate)) 212 213 if self.end is None: 214 raise ValueError("Cannot reverse iterate over a timerange with no end") 215 216 count = (self.end - phase_offset).to_count(rate) 217 218 while True: 219 ts = Timestamp.from_count(count, rate) + phase_offset 220 count -= 1 221 222 if ts > self.end or ((self.inclusivity & TimeRange.INCLUDE_END) == 0 and ts == self.end): 223 continue 224 elif (self.start is not None and 225 (ts < self.start or ((self.inclusivity & TimeRange.INCLUDE_START) == 0 and ts == self.start))): 226 break 227 else: 228 yield ts
Returns an iterable which yields Timestamp objects at the specified rate within the range starting at the end and moving earlier.
Parameters
- numerator: The numerator for the rate in Hz (or the exact rate as a Fraction or float)
- denominator: The denominator for the rate in Hz
- phase_offset: A Timestamp object which sets the phase offset of the first timestamp drawn from the iterable.
Raises
- ValueError If a phase_offset is specified which is larger than the reciprocal of the rate
:returns: an iterable that yields Timestamp objects
230 @classmethod 231 def from_timerange(cls, other: SupportsMediaTimeRange) -> "TimeRange": 232 """Construct an immutable timerange from another timerange""" 233 other = mediatimerange(other) 234 return TimeRange(other.start, 235 other.end, 236 other.inclusivity)
Construct an immutable timerange from another timerange
238 @classmethod 239 def from_start(cls, start: SupportsMediaTimestamp, inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 240 """Construct a time range starting at start with no end 241 242 :param start: A Timestamp 243 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 244 return cls(start, None, inclusivity)
Construct a time range starting at start with no end
Parameters
- start: A Timestamp
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
246 @classmethod 247 def from_end(cls, end: SupportsMediaTimestamp, inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 248 """Construct a time range ending at end with no start 249 250 :param end: A Timestamp 251 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 252 return cls(None, end, inclusivity)
Construct a time range ending at end with no start
Parameters
- end: A Timestamp
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
254 @classmethod 255 def from_start_length(cls, 256 start: SupportsMediaTimestamp, 257 length: SupportsMediaTimestamp, 258 inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 259 """Construct a time range starting at start and ending at (start + length) 260 261 :param start: A Timestamp 262 :param length: A Timestamp, which must be non-negative 263 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 264 265 :raises: TsValueError if the length is negative""" 266 length = mediatimestamp(length) 267 start = mediatimestamp(start) 268 if length < Timestamp(): 269 raise TsValueError("Length must be non-negative") 270 return cls(start, start + length, inclusivity)
Construct a time range starting at start and ending at (start + length)
Parameters
- start: A Timestamp
- length: A Timestamp, which must be non-negative
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
Raises
- TsValueError if the length is negative
272 @classmethod 273 def eternity(cls) -> "TimeRange": 274 """Return an unbounded time range covering all time""" 275 return cls(None, None)
Return an unbounded time range covering all time
277 @classmethod 278 def never(cls) -> "TimeRange": 279 """Return a time range covering no time""" 280 return cls(Timestamp(), Timestamp(), TimeRange.EXCLUSIVE)
Return a time range covering no time
282 @classmethod 283 def from_single_timestamp(cls, ts: SupportsMediaTimestamp) -> "TimeRange": 284 """Construct a time range containing only a single timestamp 285 286 :param ts: A Timestamp""" 287 ts = mediatimestamp(ts) 288 return cls(ts, ts, TimeRange.INCLUSIVE)
Construct a time range containing only a single timestamp
Parameters
- ts: A Timestamp
290 @classmethod 291 def from_str(cls, s: str, inclusivity: "TimeRange.Inclusivity" = INCLUSIVE) -> "TimeRange": 292 """Convert a string to a time range. 293 294 Valid ranges are: 295 [<ts>_<ts>] 296 [<ts>_<ts>) 297 (<ts>_<ts>] 298 (<ts>_<ts>) 299 [<ts>] 300 <ts>_<ts> 301 <ts> 302 () 303 304 where <ts> is any valid string format for Timestamp.from_str() or an empty string. 305 306 The meaning of these is relatively simple: [ indicates including the start time, 307 ( indicates excluding it, ] indicates including the end time, and ) indicates excludint it. 308 If brackets are ommitted entirely then this is taken as an inclusive range at both ends. 309 Omitting a timestamp indicates that there is no bound on that end (ie. the range goes on forever), 310 including only a single timestamp by itself indicates a range containing exactly that one timestamp. 311 Finally the string "()" represents the empty range. 312 313 :param s: The string to process 314 """ 315 m = re.match(r'(\[|\()?([^_\)\]]+)?(_([^_\)\]]+)?)?(\]|\))?', s) 316 317 if m is None: 318 raise ValueError("{!r} is not a valid TimeRange".format(s)) 319 320 inc = TimeRange.INCLUSIVE 321 if m.group(1) == "(": 322 inc &= ~TimeRange.INCLUDE_START 323 if m.group(5) == ")": 324 inc &= ~TimeRange.INCLUDE_END 325 326 start_str = m.group(2) 327 end_str = m.group(4) 328 329 start: Optional[Timestamp] = None 330 end: Optional[Timestamp] = None 331 if start_str is not None: 332 start = Timestamp.from_str(start_str) 333 if end_str is not None: 334 end = Timestamp.from_str(end_str) 335 336 if start is None and end is None: 337 # Ie. we have no first or second timestamp 338 if m.group(3) is not None: 339 # ie. we have a '_' character 340 return cls.eternity() 341 else: 342 # We have no '_' character, so the whole range is empty 343 return cls.never() 344 elif start is not None and end is None and m.group(3) is None: 345 # timestamp of form <ts> 346 return cls.from_single_timestamp(start) 347 else: 348 return cls(start, end, inc)
Convert a string to a time range.
Valid ranges are:
[
where
The meaning of these is relatively simple: [ indicates including the start time, ( indicates excluding it, ] indicates including the end time, and ) indicates excludint it. If brackets are ommitted entirely then this is taken as an inclusive range at both ends. Omitting a timestamp indicates that there is no bound on that end (ie. the range goes on forever), including only a single timestamp by itself indicates a range containing exactly that one timestamp. Finally the string "()" represents the empty range.
Parameters
- s: The string to process
406 def contains_subrange(self, tr: SupportsMediaTimeRange) -> bool: 407 """Returns True if the timerange supplied lies entirely inside this timerange""" 408 tr = mediatimerange(tr) 409 return ((not self.is_empty()) and 410 (tr.is_empty() or 411 (self.start is None or (tr.start is not None and self.start <= tr.start)) and 412 (self.end is None or (tr.end is not None and self.end >= tr.end)) and 413 (not ((self.start is not None) and 414 (tr.start is not None) and 415 (self.start == tr.start) and 416 (self.inclusivity & TimeRange.INCLUDE_START == 0) and 417 (tr.inclusivity & TimeRange.INCLUDE_START != 0))) and 418 (not ((self.end is not None) and 419 (tr.end is not None) and 420 (self.end == tr.end) and 421 (self.inclusivity & TimeRange.INCLUDE_END == 0) and 422 (tr.inclusivity & TimeRange.INCLUDE_END != 0)))))
Returns True if the timerange supplied lies entirely inside this timerange
424 def to_sec_nsec_range(self, with_inclusivity_markers: bool = True) -> str: 425 """Convert to [<seconds>:<nanoseconds>_<seconds>:<nanoseconds>] format, 426 usually the opening and closing delimiters are set to [ or ] for inclusive and ( or ) for exclusive ranges. 427 Unbounded ranges have no marker attached to them. 428 429 :param with_inclusivity_markers: if set to False do not include parentheses/brackets""" 430 if self.is_empty(): 431 if with_inclusivity_markers: 432 return "()" 433 else: 434 return "" 435 elif self.start is not None and self.end is not None and self.start == self.end: 436 if with_inclusivity_markers: 437 return "[" + self.start.to_tai_sec_nsec() + "]" 438 else: 439 return self.start.to_tai_sec_nsec() 440 441 if with_inclusivity_markers: 442 brackets = [("(", ")"), ("[", ")"), ("(", "]"), ("[", "]")][self.inclusivity] 443 else: 444 brackets = ("", "") 445 446 return '_'.join([ 447 (brackets[0] + self.start.to_tai_sec_nsec()) if self.start is not None else '', 448 (self.end.to_tai_sec_nsec() + brackets[1]) if self.end is not None else '' 449 ])
Convert to [
Parameters
- with_inclusivity_markers: if set to False do not include parentheses/brackets
451 def intersect_with(self, tr: SupportsMediaTimeRange) -> "TimeRange": 452 """Return a range which represents the intersection of this range with another""" 453 tr = mediatimerange(tr) 454 if self.is_empty() or tr.is_empty(): 455 return TimeRange.never() 456 457 start = self.start 458 if tr.start is not None and (self.start is None or self.start < tr.start): 459 start = tr.start 460 end = self.end 461 if tr.end is not None and (self.end is None or self.end > tr.end): 462 end = tr.end 463 464 inclusivity = TimeRange.EXCLUSIVE 465 if start is None or (start in self and start in tr): 466 inclusivity |= TimeRange.INCLUDE_START 467 if end is None or (end in self and end in tr): 468 inclusivity |= TimeRange.INCLUDE_END 469 470 if start is not None and end is not None and start > end: 471 return TimeRange.never() 472 473 return TimeRange(start, end, inclusivity)
Return a range which represents the intersection of this range with another
475 def starts_inside_timerange(self, other: SupportsMediaTimeRange) -> bool: 476 """Returns true if the start of this timerange is located inside the other.""" 477 other = mediatimerange(other) 478 return (not self.is_empty() and 479 not other.is_empty() and 480 ((self.bounded_before() and self.start in other and 481 (not (other.bounded_after() and self.start == other.end and not self.includes_start()))) or 482 (self.bounded_before() and other.bounded_before() and self.start == other.start and 483 (not (self.includes_start() and not other.includes_start()))) or 484 (not self.bounded_before() and not other.bounded_before())))
Returns true if the start of this timerange is located inside the other.
486 def ends_inside_timerange(self, other: SupportsMediaTimeRange) -> bool: 487 """Returns true if the end of this timerange is located inside the other.""" 488 other = mediatimerange(other) 489 return (not self.is_empty() and 490 not other.is_empty() and 491 ((self.bounded_after() and self.end in other and 492 (not (other.bounded_before() and self.end == other.start and not self.includes_end()))) or 493 (self.bounded_after() and other.bounded_after() and self.end == other.end and 494 (not (self.includes_end() and not other.includes_end()))) or 495 (not self.bounded_after() and not other.bounded_after())))
Returns true if the end of this timerange is located inside the other.
497 def is_earlier_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 498 """Returns true if this timerange ends earlier than the start of the other.""" 499 other = mediatimerange(other) 500 return (not self.is_empty() and 501 not other.is_empty() and 502 other.bounded_before() and 503 self.bounded_after() and 504 (cast(Timestamp, self.end) < cast(Timestamp, other.start) or 505 (cast(Timestamp, self.end) == cast(Timestamp, other.start) and 506 not (self.includes_end() and other.includes_start()))))
Returns true if this timerange ends earlier than the start of the other.
508 def is_later_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 509 """Returns true if this timerange starts later than the end of the other.""" 510 other = mediatimerange(other) 511 return (not self.is_empty() and 512 not other.is_empty() and 513 other.bounded_after() and 514 self.bounded_before() and 515 (cast(Timestamp, self.start) > cast(Timestamp, other.end) or 516 (cast(Timestamp, self.start) == cast(Timestamp, other.end) and 517 not (self.includes_start() and other.includes_end()))))
Returns true if this timerange starts later than the end of the other.
519 def starts_earlier_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 520 """Returns true if this timerange starts earlier than the start of the other.""" 521 other = mediatimerange(other) 522 return (not self.is_empty() and 523 not other.is_empty() and 524 other.bounded_before() and 525 (not self.bounded_before() or 526 (cast(Timestamp, self.start) < cast(Timestamp, other.start) or 527 (cast(Timestamp, self.start) == cast(Timestamp, other.start) and 528 self.includes_start() and 529 not other.includes_start()))))
Returns true if this timerange starts earlier than the start of the other.
531 def starts_later_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 532 """Returns true if this timerange starts later than the start of the other.""" 533 other = mediatimerange(other) 534 return (not self.is_empty() and 535 not other.is_empty() and 536 self.bounded_before() and 537 (not other.bounded_before() or 538 (cast(Timestamp, self.start) > cast(Timestamp, other.start) or 539 (cast(Timestamp, self.start) == cast(Timestamp, other.start) and 540 (not self.includes_start() and other.includes_start())))))
Returns true if this timerange starts later than the start of the other.
542 def ends_earlier_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 543 """Returns true if this timerange ends earlier than the end of the other.""" 544 other = mediatimerange(other) 545 return (not self.is_empty() and 546 not other.is_empty() and 547 self.bounded_after() and 548 (not other.bounded_after() or 549 (cast(Timestamp, self.end) < cast(Timestamp, other.end) or 550 (cast(Timestamp, self.end) == cast(Timestamp, other.end) and 551 (not self.includes_end() and other.includes_end())))))
Returns true if this timerange ends earlier than the end of the other.
553 def ends_later_than_timerange(self, other: SupportsMediaTimeRange) -> bool: 554 """Returns true if this timerange ends later than the end of the other.""" 555 other = mediatimerange(other) 556 return (not self.is_empty() and 557 not other.is_empty() and 558 other.bounded_after() and 559 (not self.bounded_after() or 560 (cast(Timestamp, self.end) > cast(Timestamp, other.end) or 561 (cast(Timestamp, self.end) == cast(Timestamp, other.end) and 562 self.includes_end() and 563 not other.includes_end()))))
Returns true if this timerange ends later than the end of the other.
565 def overlaps_with_timerange(self, other: SupportsMediaTimeRange) -> bool: 566 """Returns true if this timerange and the other overlap.""" 567 other = mediatimerange(other) 568 return (not self.is_earlier_than_timerange(other) and not self.is_later_than_timerange(other))
Returns true if this timerange and the other overlap.
570 def is_contiguous_with_timerange(self, other: SupportsMediaTimeRange) -> bool: 571 """Returns true if the union of this timerange and the other would be a valid timerange""" 572 other = mediatimerange(other) 573 return (self.overlaps_with_timerange(other) or 574 (self.is_earlier_than_timerange(other) and 575 self.end == other.start and 576 (self.includes_end() or other.includes_start())) or 577 (self.is_later_than_timerange(other) and 578 self.start == other.end and 579 (self.includes_start() or other.includes_end())))
Returns true if the union of this timerange and the other would be a valid timerange
581 def union_with_timerange(self, other: SupportsMediaTimeRange) -> "TimeRange": 582 """Returns the union of this timerange and the other. 583 :raises: ValueError if the ranges are not contiguous.""" 584 other = mediatimerange(other) 585 if not self.is_contiguous_with_timerange(other): 586 raise ValueError("Timeranges {} and {} are not contiguous, so cannot take the union.".format(self, other)) 587 588 return self.extend_to_encompass_timerange(other)
Returns the union of this timerange and the other.
Raises
- ValueError if the ranges are not contiguous.
590 def extend_to_encompass_timerange(self, other: SupportsMediaTimeRange) -> "TimeRange": 591 """Returns the timerange that encompasses this and the other timerange.""" 592 other = mediatimerange(other) 593 if self.is_empty(): 594 return other 595 596 if other.is_empty(): 597 return self 598 599 inclusivity = TimeRange.EXCLUSIVE 600 if self.start == other.start: 601 start = self.start 602 inclusivity |= ((self.inclusivity | other.inclusivity) & TimeRange.INCLUDE_START) 603 elif self.starts_earlier_than_timerange(other): 604 start = self.start 605 inclusivity |= (self.inclusivity & TimeRange.INCLUDE_START) 606 else: 607 start = other.start 608 inclusivity |= (other.inclusivity & TimeRange.INCLUDE_START) 609 610 if self.end == other.end: 611 end = self.end 612 inclusivity |= ((self.inclusivity | other.inclusivity) & TimeRange.INCLUDE_END) 613 elif self.ends_later_than_timerange(other): 614 end = self.end 615 inclusivity |= (self.inclusivity & TimeRange.INCLUDE_END) 616 else: 617 end = other.end 618 inclusivity |= (other.inclusivity & TimeRange.INCLUDE_END) 619 620 return TimeRange(start, end, inclusivity)
Returns the timerange that encompasses this and the other timerange.
622 def split_at(self, timestamp: SupportsMediaTimestamp) -> Tuple["TimeRange", "TimeRange"]: 623 """Splits a timerange at a specified timestamp. 624 625 It is guaranteed that the splitting point will be in the *second* TimeRange returned, and not in the first. 626 627 :param timestamp: the timestamp to split at 628 :returns: A pair of TimeRange objects 629 :raises: ValueError if timestamp not in self""" 630 631 timestamp = mediatimestamp(timestamp) 632 633 if timestamp not in self: 634 raise ValueError("Cannot split range {} at {}".format(self, timestamp)) 635 636 return (TimeRange(self.start, timestamp, (self.inclusivity & TimeRange.INCLUDE_START)), 637 TimeRange(timestamp, self.end, TimeRange.INCLUDE_START | (self.inclusivity & TimeRange.INCLUDE_END)))
Splits a timerange at a specified timestamp.
It is guaranteed that the splitting point will be in the second TimeRange returned, and not in the first.
Parameters
- timestamp: the timestamp to split at :returns: A pair of TimeRange objects
Raises
- ValueError if timestamp not in self
639 def timerange_between(self, other: SupportsMediaTimeRange) -> "TimeRange": 640 """Returns the time range between the end of the earlier timerange and the start of the later one""" 641 other = mediatimerange(other) 642 643 if self.is_contiguous_with_timerange(other): 644 return TimeRange.never() 645 elif self.is_earlier_than_timerange(other): 646 inclusivity = TimeRange.EXCLUSIVE 647 if not self.includes_end(): 648 inclusivity |= TimeRange.INCLUDE_START 649 if not other.includes_start(): 650 inclusivity |= TimeRange.INCLUDE_END 651 return TimeRange(self.end, other.start, inclusivity) 652 else: 653 inclusivity = TimeRange.EXCLUSIVE 654 if not self.includes_start(): 655 inclusivity |= TimeRange.INCLUDE_END 656 if not other.includes_end(): 657 inclusivity |= TimeRange.INCLUDE_START 658 return TimeRange(other.end, self.start, inclusivity)
Returns the time range between the end of the earlier timerange and the start of the later one
660 def timerange_before(self) -> "TimeRange": 661 """Returns the time range before the start of the this one""" 662 if self.start is None: 663 return TimeRange.never() 664 665 if self.includes_start(): 666 inclusivity = TimeRange.EXCLUSIVE 667 else: 668 inclusivity = TimeRange.INCLUDE_END 669 670 return TimeRange.from_end(self.start, inclusivity=inclusivity)
Returns the time range before the start of the this one
672 def timerange_after(self) -> "TimeRange": 673 """Returns the time range after the end of the this one""" 674 if self.end is None: 675 return TimeRange.never() 676 677 if self.includes_end(): 678 inclusivity = TimeRange.EXCLUSIVE 679 else: 680 inclusivity = TimeRange.INCLUDE_START 681 682 return TimeRange.from_start(self.end, inclusivity=inclusivity)
Returns the time range after the end of the this one
684 def is_empty(self) -> bool: 685 """Returns true on any empty range.""" 686 return (self.start is not None and 687 self.end is not None and 688 self.start == self.end and 689 self.inclusivity != TimeRange.INCLUSIVE)
Returns true on any empty range.
691 def is_normalised(self, 692 rate_num: RationalTypes, 693 rate_den: RationalTypes = 1, 694 rounding: Rounding = ROUND_NEAREST) -> bool: 695 """Checks if timerange is normalised""" 696 normalised_timerange = self.normalise(rate_num, rate_den, rounding) 697 if normalised_timerange == self: 698 return True 699 else: 700 return False
Checks if timerange is normalised
702 def normalise(self, 703 rate_num: RationalTypes, 704 rate_den: RationalTypes = 1, 705 rounding: Rounding = ROUND_NEAREST) -> "TimeRange": 706 """Returns a normalised half-open TimeRange based on this timerange. 707 708 The returned TimeRange will always have INCLUDE_START inclusivity. 709 710 If the original TimeRange was inclusive of its start then the returned TimeRange will 711 start at the normalised timestamp closest to that start point (respecting rounding). 712 713 If the original TimeRange was exclusive of its start then the returned TimeRange will 714 start at the next normalised timestamp after the normalised timestamp closest to that 715 start point (respecting rounding). 716 717 If the original TimeRange was exclusive of its end then the returned TimeRange will 718 end just before the normalised timestamp closest to that end point (respecting rounding). 719 720 If the original TimeRange was inclusive of its end then the returned TimeRange will 721 end just before the next normalised timestamp after the normalised timestamp closest to that 722 end point (respecting rounding). 723 724 The rounding options are: 725 * ROUND_NEAREST -- each end of the range independently rounds to the nearest normalised timestamp 726 * ROUND_UP -- both ends of the range round up 727 * ROUND_DOWN -- both ends of the range round down 728 * ROUND_IN -- The start of the range rounds up, the end rounds down 729 * ROUND_OUT -- The start of the range rounds down, the end rounds up 730 * ROUND_START -- The start rounds to the nearest normalised timestamp, the end rounds in the same direction 731 as the start 732 * ROUND_END -- The end rounds to the nearest normalised timestamp, the start rounds in the same direction 733 as the end 734 """ 735 if rounding == TimeRange.ROUND_OUT: 736 start_rounding = Timestamp.ROUND_DOWN 737 end_rounding = Timestamp.ROUND_UP 738 elif rounding == TimeRange.ROUND_IN: 739 start_rounding = Timestamp.ROUND_UP 740 end_rounding = Timestamp.ROUND_DOWN 741 elif rounding in [TimeRange.ROUND_START, TimeRange.ROUND_END]: 742 start_rounding = Timestamp.ROUND_NEAREST 743 end_rounding = Timestamp.ROUND_NEAREST 744 else: 745 start_rounding = Timestamp.Rounding(rounding) 746 end_rounding = Timestamp.Rounding(rounding) 747 748 rate = Fraction(rate_num, rate_den) 749 750 start: Optional[int] 751 if self.bounded_before(): 752 start = cast(Timestamp, self.start).to_count(rate, rounding=start_rounding) 753 else: 754 start = None 755 756 end: Optional[int] 757 if self.bounded_after(): 758 end = cast(Timestamp, self.end).to_count(rate, rounding=end_rounding) 759 else: 760 end = None 761 762 if rounding == TimeRange.ROUND_START and self.bounded_before() and self.bounded_after(): 763 if start == cast(Timestamp, self.start).to_count(rate, rounding=Timestamp.ROUND_UP): 764 end = cast(Timestamp, self.end).to_count(rate, rounding=Timestamp.ROUND_UP) 765 else: 766 end = cast(Timestamp, self.end).to_count(rate, rounding=Timestamp.ROUND_DOWN) 767 elif rounding == TimeRange.ROUND_END and self.bounded_before() and self.bounded_after(): 768 if end == cast(Timestamp, self.end).to_count(rate, rounding=Timestamp.ROUND_UP): 769 start = cast(Timestamp, self.start).to_count(rate, rounding=Timestamp.ROUND_UP) 770 else: 771 start = cast(Timestamp, self.start).to_count(rate, rounding=Timestamp.ROUND_DOWN) 772 773 if start is not None and not self.includes_start(): 774 start += 1 775 if end is not None and self.includes_end(): 776 end += 1 777 778 start_ts: Optional[Timestamp] = None 779 end_ts: Optional[Timestamp] = None 780 if start is not None: 781 start_ts = Timestamp.from_count(start, rate) 782 if end is not None: 783 end_ts = Timestamp.from_count(end, rate) 784 785 return TimeRange(start_ts, 786 end_ts, 787 TimeRange.INCLUDE_START)
Returns a normalised half-open TimeRange based on this timerange.
The returned TimeRange will always have INCLUDE_START inclusivity.
If the original TimeRange was inclusive of its start then the returned TimeRange will start at the normalised timestamp closest to that start point (respecting rounding).
If the original TimeRange was exclusive of its start then the returned TimeRange will start at the next normalised timestamp after the normalised timestamp closest to that start point (respecting rounding).
If the original TimeRange was exclusive of its end then the returned TimeRange will end just before the normalised timestamp closest to that end point (respecting rounding).
If the original TimeRange was inclusive of its end then the returned TimeRange will end just before the next normalised timestamp after the normalised timestamp closest to that end point (respecting rounding).
The rounding options are:
- ROUND_NEAREST -- each end of the range independently rounds to the nearest normalised timestamp
- ROUND_UP -- both ends of the range round up
- ROUND_DOWN -- both ends of the range round down
- ROUND_IN -- The start of the range rounds up, the end rounds down
- ROUND_OUT -- The start of the range rounds down, the end rounds up
- ROUND_START -- The start rounds to the nearest normalised timestamp, the end rounds in the same direction as the start
- ROUND_END -- The end rounds to the nearest normalised timestamp, the start rounds in the same direction as the end
789 def into_chunks( 790 self, 791 time_duration: Timestamp 792 ) -> Generator["TimeRange", None, None]: 793 """Returns a generator of TimeRanges of the length specified in time_duration based on this TimeRange. 794 795 If this TimeRange has an infinite length, then this will generate chunks of time_duration infinitely. 796 797 The first chunk will have the start inclusivity of this TimeRange, and will have an exclusive end. 798 The last chunk (if this TimeRange is not infinite) will have the end inclusivity of this TimeRange, and an 799 inclusive start. 800 All other chunks will have an inclusive start and exclusive end. 801 802 If the time_duration does not divide exactly into the length of this TimeRange, the last chunk returned will 803 be of the 'remainder' length. 804 805 If the time_duration is the same length or longer than this TimeRange, then a copy of this TimeRange will be 806 returned. 807 808 :param time_duration: A Timestamp representing the requested chunk length 809 :returns: A TimeRange generator that generates chunks. 810 """ 811 if not self.bounded_before(): 812 raise ValueError("The timerange to be chunked has no start time!") 813 remainder = self # this is a timerange from the end of the last one to the end of self 814 while not remainder.is_empty(): 815 next_timerange = TimeRange.from_start_length( 816 cast(Timestamp, remainder.start), 817 time_duration, 818 TimeRange.INCLUDE_START) 819 next_timerange = next_timerange.intersect_with(remainder) 820 821 if next_timerange.end == remainder.end and remainder.includes_end(): 822 next_timerange = TimeRange.from_start_length( 823 cast(Timestamp, next_timerange.start), 824 cast(Timestamp, next_timerange.length), 825 next_timerange.inclusivity | TimeRange.INCLUDE_END 826 ) 827 828 yield next_timerange 829 # If we have just yielded the final chunk, then this will produce an empty timerange. 830 remainder = remainder.intersect_with(next_timerange.timerange_after()) 831 return
Returns a generator of TimeRanges of the length specified in time_duration based on this TimeRange.
If this TimeRange has an infinite length, then this will generate chunks of time_duration infinitely.
The first chunk will have the start inclusivity of this TimeRange, and will have an exclusive end. The last chunk (if this TimeRange is not infinite) will have the end inclusivity of this TimeRange, and an inclusive start. All other chunks will have an inclusive start and exclusive end.
If the time_duration does not divide exactly into the length of this TimeRange, the last chunk returned will be of the 'remainder' length.
If the time_duration is the same length or longer than this TimeRange, then a copy of this TimeRange will be returned.
Parameters
- time_duration: A Timestamp representing the requested chunk length :returns: A TimeRange generator that generates chunks.
75 class Inclusivity (int): 76 def __and__(self, other: int) -> "TimeRange.Inclusivity": 77 return TimeRange.Inclusivity(int(self) & int(other) & 0x3) 78 79 def __or__(self, other: int) -> "TimeRange.Inclusivity": 80 return TimeRange.Inclusivity((int(self) | int(other)) & 0x3) 81 82 def __xor__(self, other: int) -> "TimeRange.Inclusivity": 83 return TimeRange.Inclusivity((int(self) ^ int(other)) & 0x3) 84 85 def __invert__(self) -> "TimeRange.Inclusivity": 86 return TimeRange.Inclusivity((~int(self)) & 0x3)
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
Inherited Members
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
int([x]) -> integer int(x, base=10) -> integer
Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.__int__(). For floating point numbers, this truncates towards zero.
If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.
>>> int('0b100', base=0)
4
Inherited Members
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
31 @runtime_checkable 32 class SupportsMediaTimeRange (Protocol): 33 def __mediatimerange__(self) -> "TimeRange": 34 ...
This is an abstract base class for any class that can be automagically converted into a TimeRange.
To implement this simply implement the __mediatimerange__ magic method. No need to inherit from this class explicitly.
58def mediatimerange(v: SupportsMediaTimeRange) -> "TimeRange": 59 """This method can be called on any object which supports the __mediatimerange__ magic method 60 and also on a TimeRange. It will always return a TimeRange or raise a ValueError. 61 """ 62 if isinstance(v, TimeRange): 63 return v 64 elif hasattr(v, "__mediatimerange__"): 65 return v.__mediatimerange__() 66 elif isinstance(v, SupportsMediaTimestamp): 67 return mediatimerange(mediatimestamp(v)) 68 else: 69 raise ValueError("{!r} cannot be converted to a mediatimestamp.TimeRange".format(v))
This method can be called on any object which supports the __mediatimerange__ magic method and also on a TimeRange. It will always return a TimeRange or raise a ValueError.
12class CountRange(object): 13 """Represents a range of integer media unit counts. 14 15 The implementation matches mediatimestamp.immutable.TimeRange, but 16 with Timestamps replaced with integer counts. 17 """ 18 19 # These inclusivity and rounding values must match the TimeRange values 20 21 EXCLUSIVE = 0x0 22 INCLUDE_START = 0x1 23 INCLUDE_END = 0x2 24 INCLUSIVE = 0x3 25 26 ROUND_DOWN = 0 27 ROUND_NEAREST = 1 28 ROUND_UP = 2 29 ROUND_IN = 3 30 ROUND_OUT = 4 31 ROUND_START = 5 32 ROUND_END = 6 33 34 def __init__(self, start: Optional[int], 35 end: Optional[int] = None, 36 inclusivity: int = INCLUSIVE): 37 """Construct a count range starting at start and ending at end 38 39 :param start: An integer media unit count or None 40 :param end: An integer media unit count or None 41 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 42 43 # normalise the representation to always have an exclusive end if bounded 44 if end is not None and (inclusivity & CountRange.INCLUDE_END): 45 end = end + 1 46 inclusivity &= ~CountRange.INCLUDE_END 47 48 # normalise the representation to always have an inclusive start if bounded 49 if start is not None and not (inclusivity & CountRange.INCLUDE_START): 50 start = start + 1 51 inclusivity |= CountRange.INCLUDE_START 52 53 # normalise 'never' cases 54 if start is not None and end is not None: 55 if start > end or (start == end and inclusivity != CountRange.INCLUSIVE): 56 start = 0 57 end = 0 58 inclusivity = CountRange.EXCLUSIVE 59 60 # Normalise the 'eternity' cases 61 if start is None and end is None: 62 inclusivity = CountRange.INCLUSIVE 63 64 # set attributes using dict to workaround immutability 65 self.__dict__['start'] = start 66 self.__dict__['end'] = end 67 self.__dict__['inclusivity'] = inclusivity 68 69 # provide attribute type info given that attributes are not set directly 70 self.start: int 71 self.end: int 72 self.inclusivity: int 73 74 @classmethod 75 def from_start(cls, start: int, inclusivity: int = INCLUSIVE) -> "CountRange": 76 """Construct a range starting at start with no end 77 78 :param start: An integer media unit count 79 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 80 return cls(start, None, inclusivity) 81 82 @classmethod 83 def from_end(cls, end: int, inclusivity: int = INCLUSIVE) -> "CountRange": 84 """Construct a range ending at end with no start 85 86 :param end: An integer media unit count 87 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 88 return cls(None, end, inclusivity) 89 90 @classmethod 91 def from_start_length(cls, start: int, length: int, inclusivity: int = INCLUSIVE) -> "CountRange": 92 """Construct a range starting at start and ending at (start + length) 93 94 :param start: An integer media unit count 95 :param length: An integer media unit offset, which must be non-negative 96 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 97 98 :raises: ValueError if the length is negative""" 99 if length < 0: 100 raise ValueError("Length must be non-negative") 101 return cls(start, start + length, inclusivity) 102 103 @classmethod 104 def eternity(cls) -> "CountRange": 105 """Return an unbounded range covering all time""" 106 return cls(None, None) 107 108 @classmethod 109 def never(cls) -> "CountRange": 110 """Return a range covering no time""" 111 return cls(0, 0, CountRange.EXCLUSIVE) 112 113 @classmethod 114 def from_single_count(cls, count: int) -> "CountRange": 115 """Construct a range containing only a single count 116 117 :param count: An integer media unit count""" 118 return cls(count, count, CountRange.INCLUSIVE) 119 120 @classmethod 121 def from_str(cls, s: str, inclusivity: int = INCLUSIVE) -> "CountRange": 122 """Convert a string to a range. 123 124 Valid ranges are: 125 [<count>_<count>] 126 [<count>_<count>) 127 (<count>_<count>] 128 (<count>_<count>) 129 [<count>] 130 <count>_<count> 131 <count> 132 () 133 134 where <count> is an integer or an empty string. 135 136 The meaning of these is relatively simple: [ indicates including the start count, 137 ( indicates excluding it, ] indicates including the end count, and ) indicates excluding it. 138 If brackets are ommitted entirely then this is taken as an inclusive range at both ends. 139 Omitting a count indicates that there is no bound on that end (ie. the range goes on forever), 140 including only a single count by itself indicates a range containing exactly that one count. 141 Finally the string "()" represents the empty range. 142 143 :param s: The string to process 144 """ 145 m = re.match(r'(\[|\()?([^_\)\]]+)?(_([^_\)\]]+)?)?(\]|\))?', s) 146 147 if m is None: 148 raise ValueError("Invalid CountRange string") 149 150 inc = CountRange.INCLUSIVE 151 if m.group(1) == "(": 152 inc &= ~CountRange.INCLUDE_START 153 if m.group(5) == ")": 154 inc &= ~CountRange.INCLUDE_END 155 156 start_str = m.group(2) 157 end_str = m.group(4) 158 159 if start_str is not None: 160 start = int(start_str) 161 else: 162 start = None 163 if end_str is not None: 164 end = int(end_str) 165 else: 166 end = None 167 168 if start is None and end is None: 169 # Ie. we have no first or second count 170 if m.group(3) is not None: 171 # ie. we have a '_' character 172 return cls.eternity() 173 else: 174 # We have no '_' character, so the whole range is empty 175 return cls.never() 176 elif start is not None and end is None and m.group(3) is None: 177 return cls.from_single_count(start) 178 else: 179 return cls(start, end, inc) 180 181 @property 182 def length(self) -> Union[int, float]: 183 """Return the range length as a media unit count""" 184 if self.end is None or self.start is None: 185 return float("inf") # there is no int("inf") in python 186 return self.end - self.start 187 188 def bounded_before(self) -> bool: 189 """Return true if the start of the range is bounded""" 190 return self.start is not None 191 192 def bounded_after(self) -> bool: 193 """Return true if the end of the range is bounded""" 194 return self.end is not None 195 196 def unbounded(self) -> bool: 197 """Return true if neither the start or end of the range is bounded""" 198 return self.start is None and self.end is None 199 200 def includes_start(self) -> bool: 201 """Return true if the start is inclusive""" 202 return (self.inclusivity & CountRange.INCLUDE_START) != 0 203 204 def includes_end(self) -> bool: 205 """Return true if the end is inclusive""" 206 return (self.inclusivity & CountRange.INCLUDE_END) != 0 207 208 def finite(self) -> bool: 209 """Return true if the range is finite""" 210 return (self.start is not None and self.end is not None) 211 212 def contains_subrange(self, other: "CountRange") -> bool: 213 """Returns true if the range supplied lies entirely inside this range""" 214 return ((not self.is_empty()) and 215 (other.is_empty() or 216 (self.start is None or (other.start is not None and self.start <= other.start)) and 217 (self.end is None or (other.end is not None and self.end >= other.end)) and 218 (not ((self.start is not None) and 219 (other.start is not None) and 220 (self.start == other.start) and 221 (self.inclusivity & CountRange.INCLUDE_START == 0) and 222 (other.inclusivity & CountRange.INCLUDE_START != 0))) and 223 (not ((self.end is not None) and 224 (other.end is not None) and 225 (self.end == other.end) and 226 (self.inclusivity & CountRange.INCLUDE_END == 0) and 227 (other.inclusivity & CountRange.INCLUDE_END != 0))))) 228 229 def to_str(self, with_inclusivity_markers: bool = True) -> str: 230 """Convert to [<count>_<count>] format, 231 usually the opening and closing delimiters are set to [ or ] for inclusive and ( or ) for exclusive ranges. 232 Unbounded ranges have no marker attached to them. 233 234 :param with_inclusivity_markers: if set to False do not include parentheses/brackecount""" 235 if self.is_empty(): 236 if with_inclusivity_markers: 237 return "()" 238 else: 239 return "" 240 elif self.start is not None and self.end is not None and self.start == self.end: 241 if with_inclusivity_markers: 242 return "[" + str(self.start) + "]" 243 else: 244 return str(self.start) 245 246 if with_inclusivity_markers: 247 brackets = [("(", ")"), ("[", ")"), ("(", "]"), ("[", "]")][self.inclusivity] 248 else: 249 brackets = ("", "") 250 251 return '_'.join([ 252 (brackets[0] + str(self.start)) if self.start is not None else '', 253 (str(self.end) + brackets[1]) if self.end is not None else '' 254 ]) 255 256 def intersect_with(self, other: "CountRange") -> "CountRange": 257 """Return a range which represents the intersection of this range with another""" 258 if self.is_empty() or other.is_empty(): 259 return CountRange.never() 260 261 start = self.start 262 if other.start is not None and (self.start is None or self.start < other.start): 263 start = other.start 264 end = self.end 265 if other.end is not None and (self.end is None or self.end > other.end): 266 end = other.end 267 268 inclusivity = CountRange.EXCLUSIVE 269 if start is None or (start in self and start in other): 270 inclusivity |= CountRange.INCLUDE_START 271 if end is None or (end in self and end in other): 272 inclusivity |= CountRange.INCLUDE_END 273 274 if start is not None and end is not None and start > end: 275 return CountRange.never() 276 277 return CountRange(start, end, inclusivity) 278 279 def starts_inside_range(self, other: "CountRange") -> bool: 280 """Returns true if the start of this range is located inside the other.""" 281 return (not self.is_empty() and 282 not other.is_empty() and 283 ((self.bounded_before() and self.start in other and 284 (not (other.bounded_after() and self.start == other.end and not self.includes_start()))) or 285 (self.bounded_before() and other.bounded_before() and self.start == other.start and 286 (not (self.includes_start() and not other.includes_start()))) or 287 (not self.bounded_before() and not other.bounded_before()))) 288 289 def ends_inside_range(self, other: "CountRange") -> bool: 290 """Returns true if the end of this range is located inside the other.""" 291 return (not self.is_empty() and 292 not other.is_empty() and 293 ((self.bounded_after() and self.end in other and 294 (not (other.bounded_before() and self.end == other.start and not self.includes_end()))) or 295 (self.bounded_after() and other.bounded_after() and self.end == other.end and 296 (not (self.includes_end() and not other.includes_end()))) or 297 (not self.bounded_after() and not other.bounded_after()))) 298 299 def is_earlier_than_range(self, other: "CountRange") -> bool: 300 """Returns true if this range ends earlier than the start of the other.""" 301 return (not self.is_empty() and 302 not other.is_empty() and 303 other.bounded_before() and 304 self.bounded_after() and 305 (self.end < other.start or 306 (self.end == other.start and 307 not (self.includes_end() and other.includes_start())))) 308 309 def is_later_than_range(self, other: "CountRange") -> bool: 310 """Returns true if this range starts later than the end of the other.""" 311 return (not self.is_empty() and 312 not other.is_empty() and 313 other.bounded_after() and 314 self.bounded_before() and 315 (self.start > other.end or 316 (self.start == other.end and 317 not (self.includes_start() and other.includes_end())))) 318 319 def starts_earlier_than_range(self, other: "CountRange") -> bool: 320 """Returns true if this range starts earlier than the start of the other.""" 321 return (not self.is_empty() and 322 not other.is_empty() and 323 other.bounded_before() and 324 (not self.bounded_before() or 325 (self.start < other.start or 326 (self.start == other.start and 327 self.includes_start() and 328 not other.includes_start())))) 329 330 def starts_later_than_range(self, other: "CountRange") -> bool: 331 """Returns true if this range starts later than the start of the other.""" 332 return (not self.is_empty() and 333 not other.is_empty() and 334 self.bounded_before() and 335 (not other.bounded_before() or 336 (self.start > other.start or 337 (self.start == other.start and 338 (not self.includes_start() and other.includes_start()))))) 339 340 def ends_earlier_than_range(self, other: "CountRange") -> bool: 341 """Returns true if this range ends earlier than the end of the other.""" 342 return (not self.is_empty() and 343 not other.is_empty() and 344 self.bounded_after() and 345 (not other.bounded_after() or 346 (self.end < other.end or 347 (self.end == other.end and 348 (not self.includes_end() and other.includes_end()))))) 349 350 def ends_later_than_range(self, other: "CountRange") -> bool: 351 """Returns true if this range ends later than the end of the other.""" 352 return (not self.is_empty() and 353 not other.is_empty() and 354 other.bounded_after() and 355 (not self.bounded_after() or 356 (self.end > other.end or 357 (self.end == other.end and 358 self.includes_end() and 359 not other.includes_end())))) 360 361 def overlaps_with_range(self, other: "CountRange") -> bool: 362 """Returns true if this range and the other overlap.""" 363 return (not self.is_earlier_than_range(other) and not self.is_later_than_range(other)) 364 365 def is_contiguous_with_range(self, other: "CountRange") -> bool: 366 """Returns true if the union of this range and the other would be a valid range""" 367 return (self.overlaps_with_range(other) or 368 (self.is_earlier_than_range(other) and 369 self.end == other.start and 370 (self.includes_end() or other.includes_start())) or 371 (self.is_later_than_range(other) and 372 self.start == other.end and 373 (self.includes_start() or other.includes_end()))) 374 375 def union_with_range(self, other: "CountRange") -> "CountRange": 376 """Returns the union of this range and the other. 377 :raises: ValueError if the ranges are not contiguous.""" 378 if not self.is_contiguous_with_range(other): 379 raise ValueError("CountRanges {} and {} are not contiguous, so cannot take the union.".format(self, other)) 380 381 return self.extend_to_encompass_range(other) 382 383 def extend_to_encompass_range(self, other: "CountRange") -> "CountRange": 384 """Returns the range that encompasses this and the other range.""" 385 if self.is_empty(): 386 return other 387 388 if other.is_empty(): 389 return self 390 391 inclusivity = CountRange.EXCLUSIVE 392 if self.start == other.start: 393 start = self.start 394 inclusivity |= ((self.inclusivity | other.inclusivity) & CountRange.INCLUDE_START) 395 elif self.starts_earlier_than_range(other): 396 start = self.start 397 inclusivity |= (self.inclusivity & CountRange.INCLUDE_START) 398 else: 399 start = other.start 400 inclusivity |= (other.inclusivity & CountRange.INCLUDE_START) 401 402 if self.end == other.end: 403 end = self.end 404 inclusivity |= ((self.inclusivity | other.inclusivity) & CountRange.INCLUDE_END) 405 elif self.ends_later_than_range(other): 406 end = self.end 407 inclusivity |= (self.inclusivity & CountRange.INCLUDE_END) 408 else: 409 end = other.end 410 inclusivity |= (other.inclusivity & CountRange.INCLUDE_END) 411 412 return CountRange(start, end, inclusivity) 413 414 def split_at(self, count: int) -> Tuple["CountRange", "CountRange"]: 415 """Splits a range at a specified count. 416 417 It is guaranteed that the splitting point will be in the *second* CountRange returned, and not in the first. 418 419 :param count: the count to split at 420 :returns: A pair of CountRange objects 421 :raises: ValueError if count not in self""" 422 423 if count not in self: 424 raise ValueError("Cannot split range {} at {}".format(self, count)) 425 426 return (CountRange(self.start, count, (self.inclusivity & CountRange.INCLUDE_START)), 427 CountRange(count, self.end, CountRange.INCLUDE_START | (self.inclusivity & CountRange.INCLUDE_END))) 428 429 def range_between(self, other: "CountRange") -> "CountRange": 430 """Returns the range between the end of the earlier range and the start of the later one""" 431 if self.is_contiguous_with_range(other): 432 return CountRange.never() 433 elif self.is_earlier_than_range(other): 434 inclusivity = CountRange.EXCLUSIVE 435 if not self.includes_end(): 436 inclusivity |= CountRange.INCLUDE_START 437 if not other.includes_start(): 438 inclusivity |= CountRange.INCLUDE_END 439 return CountRange(self.end, other.start, inclusivity) 440 else: 441 inclusivity = CountRange.EXCLUSIVE 442 if not self.includes_start(): 443 inclusivity |= CountRange.INCLUDE_END 444 if not other.includes_end(): 445 inclusivity |= CountRange.INCLUDE_START 446 return CountRange(other.end, self.start, inclusivity) 447 448 def is_empty(self) -> bool: 449 """Returns true on any empty range.""" 450 return (self.start is not None and 451 self.end is not None and 452 self.start == self.end and 453 self.inclusivity != CountRange.INCLUSIVE) 454 455 def __setattr__(self, name: str, value: Any) -> None: 456 """Raises a ValueError if attempt to set an attribute on the immutable CountRange""" 457 raise ValueError("Cannot assign to an immutable CountRange") 458 459 def __contains__(self, count: int) -> bool: 460 """Returns true if the count is within this range.""" 461 return ((self.start is None or count >= self.start) and 462 (self.end is None or count <= self.end) and 463 (not ((self.start is not None) and 464 (count == self.start) and 465 (self.inclusivity & CountRange.INCLUDE_START == 0))) and 466 (not ((self.end is not None) and 467 (count == self.end) and 468 (self.inclusivity & CountRange.INCLUDE_END == 0)))) 469 470 def __eq__(self, other: Any) -> bool: 471 """Return true if the ranges are equal""" 472 return (isinstance(other, CountRange) and 473 ((self.is_empty() and other.is_empty()) or 474 (((self.start is None and other.start is None) or 475 (self.start == other.start and 476 (self.inclusivity & CountRange.INCLUDE_START) == (other.inclusivity & CountRange.INCLUDE_START))) and 477 ((self.end is None and other.end is None) or 478 (self.end == other.end and 479 (self.inclusivity & CountRange.INCLUDE_END) == (other.inclusivity & CountRange.INCLUDE_END)))))) 480 481 def __str__(self) -> str: 482 return self.to_str() 483 484 def __repr__(self) -> str: 485 return "{}.{}.from_str('{}')".format(type(self).__module__, type(self).__name__, self.to_str())
Represents a range of integer media unit counts.
The implementation matches mediatimestamp.immutable.TimeRange, but with Timestamps replaced with integer counts.
34 def __init__(self, start: Optional[int], 35 end: Optional[int] = None, 36 inclusivity: int = INCLUSIVE): 37 """Construct a count range starting at start and ending at end 38 39 :param start: An integer media unit count or None 40 :param end: An integer media unit count or None 41 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 42 43 # normalise the representation to always have an exclusive end if bounded 44 if end is not None and (inclusivity & CountRange.INCLUDE_END): 45 end = end + 1 46 inclusivity &= ~CountRange.INCLUDE_END 47 48 # normalise the representation to always have an inclusive start if bounded 49 if start is not None and not (inclusivity & CountRange.INCLUDE_START): 50 start = start + 1 51 inclusivity |= CountRange.INCLUDE_START 52 53 # normalise 'never' cases 54 if start is not None and end is not None: 55 if start > end or (start == end and inclusivity != CountRange.INCLUSIVE): 56 start = 0 57 end = 0 58 inclusivity = CountRange.EXCLUSIVE 59 60 # Normalise the 'eternity' cases 61 if start is None and end is None: 62 inclusivity = CountRange.INCLUSIVE 63 64 # set attributes using dict to workaround immutability 65 self.__dict__['start'] = start 66 self.__dict__['end'] = end 67 self.__dict__['inclusivity'] = inclusivity 68 69 # provide attribute type info given that attributes are not set directly 70 self.start: int 71 self.end: int 72 self.inclusivity: int
Construct a count range starting at start and ending at end
Parameters
- start: An integer media unit count or None
- end: An integer media unit count or None
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
74 @classmethod 75 def from_start(cls, start: int, inclusivity: int = INCLUSIVE) -> "CountRange": 76 """Construct a range starting at start with no end 77 78 :param start: An integer media unit count 79 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 80 return cls(start, None, inclusivity)
Construct a range starting at start with no end
Parameters
- start: An integer media unit count
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
82 @classmethod 83 def from_end(cls, end: int, inclusivity: int = INCLUSIVE) -> "CountRange": 84 """Construct a range ending at end with no start 85 86 :param end: An integer media unit count 87 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END""" 88 return cls(None, end, inclusivity)
Construct a range ending at end with no start
Parameters
- end: An integer media unit count
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
90 @classmethod 91 def from_start_length(cls, start: int, length: int, inclusivity: int = INCLUSIVE) -> "CountRange": 92 """Construct a range starting at start and ending at (start + length) 93 94 :param start: An integer media unit count 95 :param length: An integer media unit offset, which must be non-negative 96 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 97 98 :raises: ValueError if the length is negative""" 99 if length < 0: 100 raise ValueError("Length must be non-negative") 101 return cls(start, start + length, inclusivity)
Construct a range starting at start and ending at (start + length)
Parameters
- start: An integer media unit count
- length: An integer media unit offset, which must be non-negative
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
Raises
- ValueError if the length is negative
103 @classmethod 104 def eternity(cls) -> "CountRange": 105 """Return an unbounded range covering all time""" 106 return cls(None, None)
Return an unbounded range covering all time
108 @classmethod 109 def never(cls) -> "CountRange": 110 """Return a range covering no time""" 111 return cls(0, 0, CountRange.EXCLUSIVE)
Return a range covering no time
113 @classmethod 114 def from_single_count(cls, count: int) -> "CountRange": 115 """Construct a range containing only a single count 116 117 :param count: An integer media unit count""" 118 return cls(count, count, CountRange.INCLUSIVE)
Construct a range containing only a single count
Parameters
- count: An integer media unit count
120 @classmethod 121 def from_str(cls, s: str, inclusivity: int = INCLUSIVE) -> "CountRange": 122 """Convert a string to a range. 123 124 Valid ranges are: 125 [<count>_<count>] 126 [<count>_<count>) 127 (<count>_<count>] 128 (<count>_<count>) 129 [<count>] 130 <count>_<count> 131 <count> 132 () 133 134 where <count> is an integer or an empty string. 135 136 The meaning of these is relatively simple: [ indicates including the start count, 137 ( indicates excluding it, ] indicates including the end count, and ) indicates excluding it. 138 If brackets are ommitted entirely then this is taken as an inclusive range at both ends. 139 Omitting a count indicates that there is no bound on that end (ie. the range goes on forever), 140 including only a single count by itself indicates a range containing exactly that one count. 141 Finally the string "()" represents the empty range. 142 143 :param s: The string to process 144 """ 145 m = re.match(r'(\[|\()?([^_\)\]]+)?(_([^_\)\]]+)?)?(\]|\))?', s) 146 147 if m is None: 148 raise ValueError("Invalid CountRange string") 149 150 inc = CountRange.INCLUSIVE 151 if m.group(1) == "(": 152 inc &= ~CountRange.INCLUDE_START 153 if m.group(5) == ")": 154 inc &= ~CountRange.INCLUDE_END 155 156 start_str = m.group(2) 157 end_str = m.group(4) 158 159 if start_str is not None: 160 start = int(start_str) 161 else: 162 start = None 163 if end_str is not None: 164 end = int(end_str) 165 else: 166 end = None 167 168 if start is None and end is None: 169 # Ie. we have no first or second count 170 if m.group(3) is not None: 171 # ie. we have a '_' character 172 return cls.eternity() 173 else: 174 # We have no '_' character, so the whole range is empty 175 return cls.never() 176 elif start is not None and end is None and m.group(3) is None: 177 return cls.from_single_count(start) 178 else: 179 return cls(start, end, inc)
Convert a string to a range.
Valid ranges are:
[
where
The meaning of these is relatively simple: [ indicates including the start count, ( indicates excluding it, ] indicates including the end count, and ) indicates excluding it. If brackets are ommitted entirely then this is taken as an inclusive range at both ends. Omitting a count indicates that there is no bound on that end (ie. the range goes on forever), including only a single count by itself indicates a range containing exactly that one count. Finally the string "()" represents the empty range.
Parameters
- s: The string to process
181 @property 182 def length(self) -> Union[int, float]: 183 """Return the range length as a media unit count""" 184 if self.end is None or self.start is None: 185 return float("inf") # there is no int("inf") in python 186 return self.end - self.start
Return the range length as a media unit count
188 def bounded_before(self) -> bool: 189 """Return true if the start of the range is bounded""" 190 return self.start is not None
Return true if the start of the range is bounded
192 def bounded_after(self) -> bool: 193 """Return true if the end of the range is bounded""" 194 return self.end is not None
Return true if the end of the range is bounded
196 def unbounded(self) -> bool: 197 """Return true if neither the start or end of the range is bounded""" 198 return self.start is None and self.end is None
Return true if neither the start or end of the range is bounded
200 def includes_start(self) -> bool: 201 """Return true if the start is inclusive""" 202 return (self.inclusivity & CountRange.INCLUDE_START) != 0
Return true if the start is inclusive
204 def includes_end(self) -> bool: 205 """Return true if the end is inclusive""" 206 return (self.inclusivity & CountRange.INCLUDE_END) != 0
Return true if the end is inclusive
208 def finite(self) -> bool: 209 """Return true if the range is finite""" 210 return (self.start is not None and self.end is not None)
Return true if the range is finite
212 def contains_subrange(self, other: "CountRange") -> bool: 213 """Returns true if the range supplied lies entirely inside this range""" 214 return ((not self.is_empty()) and 215 (other.is_empty() or 216 (self.start is None or (other.start is not None and self.start <= other.start)) and 217 (self.end is None or (other.end is not None and self.end >= other.end)) and 218 (not ((self.start is not None) and 219 (other.start is not None) and 220 (self.start == other.start) and 221 (self.inclusivity & CountRange.INCLUDE_START == 0) and 222 (other.inclusivity & CountRange.INCLUDE_START != 0))) and 223 (not ((self.end is not None) and 224 (other.end is not None) and 225 (self.end == other.end) and 226 (self.inclusivity & CountRange.INCLUDE_END == 0) and 227 (other.inclusivity & CountRange.INCLUDE_END != 0)))))
Returns true if the range supplied lies entirely inside this range
229 def to_str(self, with_inclusivity_markers: bool = True) -> str: 230 """Convert to [<count>_<count>] format, 231 usually the opening and closing delimiters are set to [ or ] for inclusive and ( or ) for exclusive ranges. 232 Unbounded ranges have no marker attached to them. 233 234 :param with_inclusivity_markers: if set to False do not include parentheses/brackecount""" 235 if self.is_empty(): 236 if with_inclusivity_markers: 237 return "()" 238 else: 239 return "" 240 elif self.start is not None and self.end is not None and self.start == self.end: 241 if with_inclusivity_markers: 242 return "[" + str(self.start) + "]" 243 else: 244 return str(self.start) 245 246 if with_inclusivity_markers: 247 brackets = [("(", ")"), ("[", ")"), ("(", "]"), ("[", "]")][self.inclusivity] 248 else: 249 brackets = ("", "") 250 251 return '_'.join([ 252 (brackets[0] + str(self.start)) if self.start is not None else '', 253 (str(self.end) + brackets[1]) if self.end is not None else '' 254 ])
Convert to [
Parameters
- with_inclusivity_markers: if set to False do not include parentheses/brackecount
256 def intersect_with(self, other: "CountRange") -> "CountRange": 257 """Return a range which represents the intersection of this range with another""" 258 if self.is_empty() or other.is_empty(): 259 return CountRange.never() 260 261 start = self.start 262 if other.start is not None and (self.start is None or self.start < other.start): 263 start = other.start 264 end = self.end 265 if other.end is not None and (self.end is None or self.end > other.end): 266 end = other.end 267 268 inclusivity = CountRange.EXCLUSIVE 269 if start is None or (start in self and start in other): 270 inclusivity |= CountRange.INCLUDE_START 271 if end is None or (end in self and end in other): 272 inclusivity |= CountRange.INCLUDE_END 273 274 if start is not None and end is not None and start > end: 275 return CountRange.never() 276 277 return CountRange(start, end, inclusivity)
Return a range which represents the intersection of this range with another
279 def starts_inside_range(self, other: "CountRange") -> bool: 280 """Returns true if the start of this range is located inside the other.""" 281 return (not self.is_empty() and 282 not other.is_empty() and 283 ((self.bounded_before() and self.start in other and 284 (not (other.bounded_after() and self.start == other.end and not self.includes_start()))) or 285 (self.bounded_before() and other.bounded_before() and self.start == other.start and 286 (not (self.includes_start() and not other.includes_start()))) or 287 (not self.bounded_before() and not other.bounded_before())))
Returns true if the start of this range is located inside the other.
289 def ends_inside_range(self, other: "CountRange") -> bool: 290 """Returns true if the end of this range is located inside the other.""" 291 return (not self.is_empty() and 292 not other.is_empty() and 293 ((self.bounded_after() and self.end in other and 294 (not (other.bounded_before() and self.end == other.start and not self.includes_end()))) or 295 (self.bounded_after() and other.bounded_after() and self.end == other.end and 296 (not (self.includes_end() and not other.includes_end()))) or 297 (not self.bounded_after() and not other.bounded_after())))
Returns true if the end of this range is located inside the other.
299 def is_earlier_than_range(self, other: "CountRange") -> bool: 300 """Returns true if this range ends earlier than the start of the other.""" 301 return (not self.is_empty() and 302 not other.is_empty() and 303 other.bounded_before() and 304 self.bounded_after() and 305 (self.end < other.start or 306 (self.end == other.start and 307 not (self.includes_end() and other.includes_start()))))
Returns true if this range ends earlier than the start of the other.
309 def is_later_than_range(self, other: "CountRange") -> bool: 310 """Returns true if this range starts later than the end of the other.""" 311 return (not self.is_empty() and 312 not other.is_empty() and 313 other.bounded_after() and 314 self.bounded_before() and 315 (self.start > other.end or 316 (self.start == other.end and 317 not (self.includes_start() and other.includes_end()))))
Returns true if this range starts later than the end of the other.
319 def starts_earlier_than_range(self, other: "CountRange") -> bool: 320 """Returns true if this range starts earlier than the start of the other.""" 321 return (not self.is_empty() and 322 not other.is_empty() and 323 other.bounded_before() and 324 (not self.bounded_before() or 325 (self.start < other.start or 326 (self.start == other.start and 327 self.includes_start() and 328 not other.includes_start()))))
Returns true if this range starts earlier than the start of the other.
330 def starts_later_than_range(self, other: "CountRange") -> bool: 331 """Returns true if this range starts later than the start of the other.""" 332 return (not self.is_empty() and 333 not other.is_empty() and 334 self.bounded_before() and 335 (not other.bounded_before() or 336 (self.start > other.start or 337 (self.start == other.start and 338 (not self.includes_start() and other.includes_start())))))
Returns true if this range starts later than the start of the other.
340 def ends_earlier_than_range(self, other: "CountRange") -> bool: 341 """Returns true if this range ends earlier than the end of the other.""" 342 return (not self.is_empty() and 343 not other.is_empty() and 344 self.bounded_after() and 345 (not other.bounded_after() or 346 (self.end < other.end or 347 (self.end == other.end and 348 (not self.includes_end() and other.includes_end())))))
Returns true if this range ends earlier than the end of the other.
350 def ends_later_than_range(self, other: "CountRange") -> bool: 351 """Returns true if this range ends later than the end of the other.""" 352 return (not self.is_empty() and 353 not other.is_empty() and 354 other.bounded_after() and 355 (not self.bounded_after() or 356 (self.end > other.end or 357 (self.end == other.end and 358 self.includes_end() and 359 not other.includes_end()))))
Returns true if this range ends later than the end of the other.
361 def overlaps_with_range(self, other: "CountRange") -> bool: 362 """Returns true if this range and the other overlap.""" 363 return (not self.is_earlier_than_range(other) and not self.is_later_than_range(other))
Returns true if this range and the other overlap.
365 def is_contiguous_with_range(self, other: "CountRange") -> bool: 366 """Returns true if the union of this range and the other would be a valid range""" 367 return (self.overlaps_with_range(other) or 368 (self.is_earlier_than_range(other) and 369 self.end == other.start and 370 (self.includes_end() or other.includes_start())) or 371 (self.is_later_than_range(other) and 372 self.start == other.end and 373 (self.includes_start() or other.includes_end())))
Returns true if the union of this range and the other would be a valid range
375 def union_with_range(self, other: "CountRange") -> "CountRange": 376 """Returns the union of this range and the other. 377 :raises: ValueError if the ranges are not contiguous.""" 378 if not self.is_contiguous_with_range(other): 379 raise ValueError("CountRanges {} and {} are not contiguous, so cannot take the union.".format(self, other)) 380 381 return self.extend_to_encompass_range(other)
Returns the union of this range and the other.
Raises
- ValueError if the ranges are not contiguous.
383 def extend_to_encompass_range(self, other: "CountRange") -> "CountRange": 384 """Returns the range that encompasses this and the other range.""" 385 if self.is_empty(): 386 return other 387 388 if other.is_empty(): 389 return self 390 391 inclusivity = CountRange.EXCLUSIVE 392 if self.start == other.start: 393 start = self.start 394 inclusivity |= ((self.inclusivity | other.inclusivity) & CountRange.INCLUDE_START) 395 elif self.starts_earlier_than_range(other): 396 start = self.start 397 inclusivity |= (self.inclusivity & CountRange.INCLUDE_START) 398 else: 399 start = other.start 400 inclusivity |= (other.inclusivity & CountRange.INCLUDE_START) 401 402 if self.end == other.end: 403 end = self.end 404 inclusivity |= ((self.inclusivity | other.inclusivity) & CountRange.INCLUDE_END) 405 elif self.ends_later_than_range(other): 406 end = self.end 407 inclusivity |= (self.inclusivity & CountRange.INCLUDE_END) 408 else: 409 end = other.end 410 inclusivity |= (other.inclusivity & CountRange.INCLUDE_END) 411 412 return CountRange(start, end, inclusivity)
Returns the range that encompasses this and the other range.
414 def split_at(self, count: int) -> Tuple["CountRange", "CountRange"]: 415 """Splits a range at a specified count. 416 417 It is guaranteed that the splitting point will be in the *second* CountRange returned, and not in the first. 418 419 :param count: the count to split at 420 :returns: A pair of CountRange objects 421 :raises: ValueError if count not in self""" 422 423 if count not in self: 424 raise ValueError("Cannot split range {} at {}".format(self, count)) 425 426 return (CountRange(self.start, count, (self.inclusivity & CountRange.INCLUDE_START)), 427 CountRange(count, self.end, CountRange.INCLUDE_START | (self.inclusivity & CountRange.INCLUDE_END)))
Splits a range at a specified count.
It is guaranteed that the splitting point will be in the second CountRange returned, and not in the first.
Parameters
- count: the count to split at :returns: A pair of CountRange objects
Raises
- ValueError if count not in self
429 def range_between(self, other: "CountRange") -> "CountRange": 430 """Returns the range between the end of the earlier range and the start of the later one""" 431 if self.is_contiguous_with_range(other): 432 return CountRange.never() 433 elif self.is_earlier_than_range(other): 434 inclusivity = CountRange.EXCLUSIVE 435 if not self.includes_end(): 436 inclusivity |= CountRange.INCLUDE_START 437 if not other.includes_start(): 438 inclusivity |= CountRange.INCLUDE_END 439 return CountRange(self.end, other.start, inclusivity) 440 else: 441 inclusivity = CountRange.EXCLUSIVE 442 if not self.includes_start(): 443 inclusivity |= CountRange.INCLUDE_END 444 if not other.includes_end(): 445 inclusivity |= CountRange.INCLUDE_START 446 return CountRange(other.end, self.start, inclusivity)
Returns the range between the end of the earlier range and the start of the later one
34class TimeValue(object): 35 """Represents a media unit time value on a timeline (e.g. Flow). 36 37 Supports one of the following input value representations: 38 * Timestamp 39 * int (integer media unit count) 40 * TimeValue 41 * Anything that implements the __mediatimestamp__ magic method 42 43 An optional rate can be set and is required and checked when there is a 44 need to convert between representations. 45 46 Timestamps are converted internally to ints if a rate is provided. 47 48 The time value can be converted to a Timestamp or int using 49 the as_*() methods. 50 """ 51 52 def __init__(self, value: TimeValueConstructTypes, rate: Optional[Fraction] = None): 53 """ 54 :param value: A TimeValue, TimeStamp or int. 55 :param rate: The media unit rate. 56 """ 57 self_rate: Optional[Fraction] 58 self_value: TimeValueRepTypes 59 60 value = _perform_all_conversions(value) 61 62 if isinstance(value, TimeValue): 63 if rate and value._rate != rate: 64 # A rate conversion is required. Convert to a timeoffset here and the 65 # conversion at the end using the new rate 66 try: 67 self_value = value.as_timestamp() 68 except ValueError: 69 # the representation is a count and so we assume it is as the given rate 70 self_value = value._value 71 self_rate = rate 72 else: 73 self_value = value._value 74 self_rate = value._rate 75 elif isinstance(value, (Timestamp, int)): 76 self_value = value 77 self_rate = rate 78 else: 79 raise TypeError("Unsupported value type {!r}".format(value)) 80 81 # Convert to an int if the value is a Timestamp and a rate is 82 # provided. This allows for more efficient calculations and no 83 # normalisation is required. 84 if isinstance(self_value, Timestamp) and self_rate: 85 self_value = self_value.to_count(self_rate.numerator, self_rate.denominator) 86 87 # set attributes using dict to workaround immutability 88 self.__dict__['_value'] = self_value 89 self.__dict__['_rate'] = self_rate 90 91 # provide attribute type info given that attributes are not set directly 92 self._value: TimeValueRepTypes 93 self._rate: Optional[Fraction] 94 95 @classmethod 96 def from_str(cls, s: str, rate: Optional[Fraction] = None) -> "TimeValue": 97 """Parse a time value string 98 99 :param s: The string to convert from. 100 :param rate: The default media unit rate. 101 """ 102 parts = s.split("@") 103 if len(parts) == 2: 104 s_val = parts[0] 105 rate = Fraction(parts[1]) 106 elif len(parts) == 1: 107 s_val = s 108 else: 109 raise ValueError("Multiple '@' in TimeValue string") 110 111 if s_val.isdigit() or ( 112 len(s_val) > 0 and s_val[0] in ['+', '-'] and s_val[1:].isdigit()): 113 return cls(int(s_val), rate=rate) 114 else: 115 return cls(Timestamp.from_str(s_val), rate=rate) 116 117 @classmethod 118 def from_float(cls, f: float, rate: Optional[Fraction] = None) -> "TimeValue": 119 """Parse a time value from a float 120 121 :param f: The float to convert from. 122 :param rate: The default media unit rate. 123 """ 124 return cls(Timestamp.from_float(f), rate=rate) 125 126 @deprecated(version="4.0.0", 127 reason="This method is deprecated. TimeOffset has been merged into Timestamp. " 128 "Use as_timestamp() instead") 129 def as_timeoffset(self) -> TimeOffset: 130 """Legacy method that returned a TimeOffset.""" 131 return self.as_timestamp() 132 133 def as_timestamp(self) -> Timestamp: 134 """Returns a Timestamp representation.""" 135 if isinstance(self._value, Timestamp): 136 return self._value 137 else: 138 rate = self._require_rate() 139 return Timestamp.from_count(self._value, rate.numerator, rate.denominator) 140 141 def __mediatimestamp__(self) -> Timestamp: 142 return self.as_timestamp() 143 144 def __mediatimerange__(self) -> TimeRange: 145 return TimeRange.from_single_timestamp(self.as_timestamp()) 146 147 def as_count(self) -> int: 148 """Returns an integer media unit count representation.""" 149 if isinstance(self._value, Timestamp): 150 rate = self._require_rate() 151 return self._value.to_count(rate.numerator, rate.denominator) 152 else: 153 return self._value 154 155 @property 156 def rate(self) -> Optional[Fraction]: 157 return self._rate 158 159 @property 160 def value(self) -> TimeValueRepTypes: 161 return self._value 162 163 def compare(self, other: TimeValueConstructTypes) -> int: 164 """Compare time values and return an integer to indicate the difference""" 165 other = _perform_all_conversions(other) 166 other_value = self._match_value_type(other) 167 if isinstance(self._value, Timestamp): 168 return self._value.compare(other_value) 169 else: 170 # The logic here follows the Timestamp implementation 171 this_sign = 1 if self >= 0 else -1 172 other_sign = 1 if other >= 0 else -1 173 if this_sign != other_sign: 174 return this_sign 175 elif self < other: 176 return -this_sign 177 elif self > other: 178 return this_sign 179 else: 180 return 0 181 182 def to_str(self, include_rate: bool = True) -> str: 183 """Convert to a string""" 184 result = str(self._value) 185 if self._rate and include_rate: 186 result += "@{}".format(self._rate) 187 return result 188 189 def __setattr__(self, name: str, value: Any) -> None: 190 """Raises a ValueError if attempt to set an attribute on the immutable TimeValue""" 191 raise ValueError("Cannot assign to an immutable TimeValue") 192 193 def __str__(self) -> str: 194 return self.to_str() 195 196 def __repr__(self) -> str: 197 return "{}.{}.from_str('{}')".format(type(self).__module__, type(self).__name__, self.to_str()) 198 199 def __abs__(self) -> "TimeValue": 200 """Return the absolute TimeValue""" 201 return TimeValue(self._value.__abs__(), self._rate) 202 203 def __eq__(self, other: object) -> bool: 204 """"Return true if the TimeValues are equal""" 205 if not isinstance(other, (SupportsMediaTimestamp, int, TimeValue)): 206 return False 207 other_value = self._match_value_type(other) 208 return self._value.__eq__(other_value) 209 210 def __ne__(self, other: object) -> bool: 211 """"Return true if the TimeValues are not equal""" 212 if not isinstance(other, (SupportsMediaTimestamp, int, TimeValue)): 213 return True 214 other_value = self._match_value_type(other) 215 return self._value.__ne__(other_value) 216 217 def __lt__(self, other: TimeValueConstructTypes) -> bool: 218 """"Return true if this TimeValue is less than the other TimeValue""" 219 other_value = self._match_value_type(other) 220 return self._value.__lt__(other_value) # type: ignore 221 222 def __le__(self, other: TimeValueConstructTypes) -> bool: 223 """"Return true if this TimeValue is less than or equal to the other TimeValue""" 224 other_value = self._match_value_type(other) 225 return self._value.__le__(other_value) # type: ignore 226 227 def __gt__(self, other: TimeValueConstructTypes) -> bool: 228 """"Return true if this TimeValue is greater than the other TimeValue""" 229 other_value = self._match_value_type(other) 230 return self._value.__gt__(other_value) # type: ignore 231 232 def __ge__(self, other: TimeValueConstructTypes) -> bool: 233 """"Return true if this TimeValue is greater than or equal to the other TimeValue""" 234 other_value = self._match_value_type(other) 235 return self._value.__ge__(other_value) # type: ignore 236 237 def __add__(self, other: TimeValueConstructTypes) -> "TimeValue": 238 """"Return a TimeValue that is the sum of this and the other TimeValue""" 239 other_value = self._match_value_type(other) 240 return TimeValue(self._value.__add__(other_value), self._rate) # type: ignore 241 242 def __sub__(self, other: TimeValueConstructTypes) -> "TimeValue": 243 """"Return a TimeValue that is the difference between this and the other TimeValue""" 244 other_value = self._match_value_type(other) 245 return TimeValue(self._value.__sub__(other_value), self._rate) # type: ignore 246 247 def __mul__(self, anint: int) -> "TimeValue": 248 """"Return this TimeValue multiplied by an integer""" 249 self._check_is_int(anint) 250 return TimeValue(self._value.__mul__(anint), self._rate) 251 252 def __rmul__(self, anint: int) -> "TimeValue": 253 """"Return this TimeValue multiplied by an integer""" 254 self._check_is_int(anint) 255 return (self * anint) 256 257 def __div__(self, anint: int) -> "TimeValue": 258 """"Return this TimeValue divided by an integer, rounded down to -inf""" 259 self._check_is_int(anint) 260 return (self // anint) 261 262 def __truediv__(self, anint: int) -> "TimeValue": 263 """"Return this TimeValue divided by an integer, rounded down to -inf""" 264 self._check_is_int(anint) 265 return (self // anint) 266 267 def __floordiv__(self, anint: int) -> "TimeValue": 268 """"Return this TimeValue divided by an integer, rounded down to -inf""" 269 self._check_is_int(anint) 270 return TimeValue(self._value.__floordiv__(anint), self._rate) 271 272 def __hash__(self) -> int: 273 return hash(repr(self)) 274 275 def _match_value_type(self, other: TimeValueConstructTypes) -> TimeValueRepTypes: 276 """Converts the other value type to self's value type. 277 278 A rate conversion is done if `other` is a TimeValue and the rate 279 differs from self._rate. 280 281 :param other: A TimeValue, Timestamp or int. 282 """ 283 other_tv = TimeValue(other, rate=self._rate) 284 if isinstance(self._value, Timestamp): 285 return other_tv.as_timestamp() 286 else: 287 return other_tv.as_count() 288 289 def _require_rate(self) -> Fraction: 290 """Raise an exception if the self._rate is not set or zero. 291 """ 292 if not self._rate: 293 raise ValueError("A non-zero TimeValue rate is required for conversion") 294 295 return self._rate 296 297 def _check_is_int(self, anint: Any) -> None: 298 """Raise an exception if the parameter is not an integer. 299 300 :param anint: Parameter to check. 301 """ 302 if not isinstance(anint, int): 303 raise TypeError("TimeValue operator parameter {!r} is not an 'int'".format(anint))
Represents a media unit time value on a timeline (e.g. Flow).
Supports one of the following input value representations:
- Timestamp
- int (integer media unit count)
- TimeValue
- Anything that implements the __mediatimestamp__ magic method
An optional rate can be set and is required and checked when there is a need to convert between representations.
Timestamps are converted internally to ints if a rate is provided.
The time value can be converted to a Timestamp or int using the as_*() methods.
52 def __init__(self, value: TimeValueConstructTypes, rate: Optional[Fraction] = None): 53 """ 54 :param value: A TimeValue, TimeStamp or int. 55 :param rate: The media unit rate. 56 """ 57 self_rate: Optional[Fraction] 58 self_value: TimeValueRepTypes 59 60 value = _perform_all_conversions(value) 61 62 if isinstance(value, TimeValue): 63 if rate and value._rate != rate: 64 # A rate conversion is required. Convert to a timeoffset here and the 65 # conversion at the end using the new rate 66 try: 67 self_value = value.as_timestamp() 68 except ValueError: 69 # the representation is a count and so we assume it is as the given rate 70 self_value = value._value 71 self_rate = rate 72 else: 73 self_value = value._value 74 self_rate = value._rate 75 elif isinstance(value, (Timestamp, int)): 76 self_value = value 77 self_rate = rate 78 else: 79 raise TypeError("Unsupported value type {!r}".format(value)) 80 81 # Convert to an int if the value is a Timestamp and a rate is 82 # provided. This allows for more efficient calculations and no 83 # normalisation is required. 84 if isinstance(self_value, Timestamp) and self_rate: 85 self_value = self_value.to_count(self_rate.numerator, self_rate.denominator) 86 87 # set attributes using dict to workaround immutability 88 self.__dict__['_value'] = self_value 89 self.__dict__['_rate'] = self_rate 90 91 # provide attribute type info given that attributes are not set directly 92 self._value: TimeValueRepTypes 93 self._rate: Optional[Fraction]
Parameters
- value: A TimeValue, TimeStamp or int.
- rate: The media unit rate.
95 @classmethod 96 def from_str(cls, s: str, rate: Optional[Fraction] = None) -> "TimeValue": 97 """Parse a time value string 98 99 :param s: The string to convert from. 100 :param rate: The default media unit rate. 101 """ 102 parts = s.split("@") 103 if len(parts) == 2: 104 s_val = parts[0] 105 rate = Fraction(parts[1]) 106 elif len(parts) == 1: 107 s_val = s 108 else: 109 raise ValueError("Multiple '@' in TimeValue string") 110 111 if s_val.isdigit() or ( 112 len(s_val) > 0 and s_val[0] in ['+', '-'] and s_val[1:].isdigit()): 113 return cls(int(s_val), rate=rate) 114 else: 115 return cls(Timestamp.from_str(s_val), rate=rate)
Parse a time value string
Parameters
- s: The string to convert from.
- rate: The default media unit rate.
117 @classmethod 118 def from_float(cls, f: float, rate: Optional[Fraction] = None) -> "TimeValue": 119 """Parse a time value from a float 120 121 :param f: The float to convert from. 122 :param rate: The default media unit rate. 123 """ 124 return cls(Timestamp.from_float(f), rate=rate)
Parse a time value from a float
Parameters
- f: The float to convert from.
- rate: The default media unit rate.
126 @deprecated(version="4.0.0", 127 reason="This method is deprecated. TimeOffset has been merged into Timestamp. " 128 "Use as_timestamp() instead") 129 def as_timeoffset(self) -> TimeOffset: 130 """Legacy method that returned a TimeOffset.""" 131 return self.as_timestamp()
Legacy method that returned a TimeOffset.
133 def as_timestamp(self) -> Timestamp: 134 """Returns a Timestamp representation.""" 135 if isinstance(self._value, Timestamp): 136 return self._value 137 else: 138 rate = self._require_rate() 139 return Timestamp.from_count(self._value, rate.numerator, rate.denominator)
Returns a Timestamp representation.
147 def as_count(self) -> int: 148 """Returns an integer media unit count representation.""" 149 if isinstance(self._value, Timestamp): 150 rate = self._require_rate() 151 return self._value.to_count(rate.numerator, rate.denominator) 152 else: 153 return self._value
Returns an integer media unit count representation.
163 def compare(self, other: TimeValueConstructTypes) -> int: 164 """Compare time values and return an integer to indicate the difference""" 165 other = _perform_all_conversions(other) 166 other_value = self._match_value_type(other) 167 if isinstance(self._value, Timestamp): 168 return self._value.compare(other_value) 169 else: 170 # The logic here follows the Timestamp implementation 171 this_sign = 1 if self >= 0 else -1 172 other_sign = 1 if other >= 0 else -1 173 if this_sign != other_sign: 174 return this_sign 175 elif self < other: 176 return -this_sign 177 elif self > other: 178 return this_sign 179 else: 180 return 0
Compare time values and return an integer to indicate the difference
28class TimeValueRange(Reversible[TimeValue]): 29 """Represents a range of media unit time values on a timeline (e.g. Flow). 30 31 Supports these range types if start_or_value is a range value: 32 * TimeRange 33 * CountRange 34 * TimeValueRange 35 * Anything that implements the __mediatimerange__ magic method, but does not implement the __mediatimestamp__ 36 magic methods. 37 38 Supports one of the following time value constructor representations if start_or_value is a range start: 39 * Timestamp 40 * int (integer media unit count) 41 * TimeValue 42 * Anything that implements the __mediatimestamp__ magic method 43 44 If state_or_value could be interpreted as a range value without 45 46 An optional rate can be set and is required and checked when there is a 47 need to convert between representations. 48 49 The time value range can be converted to a TimeRange or CountRange using 50 the as_*() methods. 51 52 The time value range implements the __mediatimerange__ magic method. 53 """ 54 55 # These inclusivity and rounding values must match the TimeRange values 56 57 EXCLUSIVE = 0x0 58 INCLUDE_START = 0x1 59 INCLUDE_END = 0x2 60 INCLUSIVE = 0x3 61 62 ROUND_DOWN = 0 63 ROUND_NEAREST = 1 64 ROUND_UP = 2 65 ROUND_IN = 3 66 ROUND_OUT = 4 67 ROUND_START = 5 68 ROUND_END = 6 69 70 def __init__(self, start_or_value: Optional[Union[TimeValueConstructTypes, RangeConstructionTypes]], 71 end: Optional[TimeValueConstructTypes] = None, 72 inclusivity: Optional[int] = None, 73 rate: Optional[Fraction] = None, 74 *, 75 start: Optional[TimeValueConstructTypes] = None, 76 value: Optional[RangeConstructionTypes] = None): 77 """Construct a time value range 78 79 :param start_or_value: The start of the range, a range or None 80 :param end: The end of the range or None 81 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 82 :param rate: The media unit rate. 83 """ 84 self_start: Optional[TimeValue] 85 self_end: Optional[TimeValue] 86 self_rate: Optional[Fraction] = rate 87 88 if value is None and ( 89 isinstance(start_or_value, (TimeValueRange, TimeRange, CountRange)) or 90 ( 91 not isinstance(start_or_value, SupportsMediaTimestamp) and 92 isinstance(start_or_value, SupportsMediaTimeRange) 93 ) 94 ): 95 value = start_or_value 96 97 if isinstance(value, (TimeValueRange, TimeRange, CountRange)): 98 start = value.start 99 end = value.end 100 self_inclusivity = inclusivity if inclusivity is not None else value.inclusivity 101 elif ( 102 not isinstance(range, SupportsMediaTimestamp) and 103 isinstance(range, SupportsMediaTimeRange) 104 ): 105 value = mediatimerange(value) 106 start = value.start 107 end = value.end 108 self_inclusivity = inclusivity if inclusivity is not None else value.inclusivity 109 else: 110 if start_or_value is not None: 111 if not isinstance(start_or_value, (SupportsMediaTimestamp, int, TimeValue)): 112 raise ValueError(f"Unsupported type for start: {start_or_value!r}") 113 if start is not None: 114 raise ValueError("Cannot specify start or value as a positional and a keyword parameter!") 115 start = start_or_value 116 self_inclusivity = inclusivity if inclusivity is not None else TimeValueRange.INCLUSIVE 117 118 if start is not None: 119 self_start = TimeValue(start, rate=self_rate) 120 self_rate = self_start._rate 121 else: 122 self_start = None 123 124 if end is not None: 125 self_end = TimeValue(end, rate=self_rate) 126 self_rate = self_end._rate 127 else: 128 self_end = None 129 130 # Add a rate to the start if it was available in end 131 if self_rate and self_start is not None: 132 self_start = TimeValue(self_start, rate=self_rate) 133 134 # normalise the representation to always have an inclusive start if bounded 135 if self_start is not None and ( 136 (self_rate or isinstance(self_start.value, int)) and 137 not (self_inclusivity & TimeValueRange.INCLUDE_START)): 138 self_start = self_start + 1 139 self_inclusivity |= TimeValueRange.INCLUDE_START 140 141 # normalise the representation to always have an exclusive end if bounded 142 if self_end is not None and ( 143 (self_rate or isinstance(self_end.value, int)) and 144 (self_inclusivity & TimeValueRange.INCLUDE_END)): 145 self_end = self_end + 1 146 self_inclusivity &= ~TimeValueRange.INCLUDE_END 147 148 # Normalise the 'never' cases 149 if self_start is not None and self_end is not None: 150 if self_start > self_end or (self_start == self_end and self_inclusivity != TimeValueRange.INCLUSIVE): 151 self_start = TimeValue(0, self_rate) 152 self_end = TimeValue(0, self_rate) 153 self_inclusivity = TimeValueRange.EXCLUSIVE 154 155 # Normalise the 'eternity' cases 156 if self_start is None and self_end is None: 157 self_inclusivity = TimeValueRange.INCLUSIVE 158 159 # set attributes using dict to workaround immutability 160 self.__dict__['_start'] = self_start 161 self.__dict__['_end'] = self_end 162 self.__dict__['_inclusivity'] = self_inclusivity 163 self.__dict__['_rate'] = self_rate 164 165 # provide attribute type info given that attributes are not set directly 166 self._start: Optional[TimeValue] 167 self._end: Optional[TimeValue] 168 self._inclusivity: int 169 self._rate: Optional[Fraction] 170 171 @classmethod 172 def from_start(cls, start: TimeValueConstructTypes, 173 inclusivity: int = INCLUSIVE, 174 rate: Optional[Fraction] = None) -> "TimeValueRange": 175 """Construct a range starting at start with no end 176 177 :param start: A time value or type supported by TimeValue 178 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 179 :param rate: The media unit rate.""" 180 return cls(start, None, inclusivity=inclusivity, rate=rate) 181 182 @classmethod 183 def from_end(cls, end: TimeValueConstructTypes, 184 inclusivity: int = INCLUSIVE, 185 rate: Optional[Fraction] = None) -> "TimeValueRange": 186 """Construct a range ending at end with no start 187 188 :param end: A time value or type supported by TimeValue 189 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 190 :param rate: The media unit rate.""" 191 return cls(None, end, inclusivity=inclusivity, rate=rate) 192 193 @classmethod 194 def from_start_length(cls, start: TimeValueConstructTypes, 195 length: TimeValueConstructTypes, 196 inclusivity: int = INCLUSIVE, 197 rate: Optional[Fraction] = None) -> "TimeValueRange": 198 """Construct a range starting at start and ending at (start + length) 199 200 :param start: A time value or type supported by TimeValue 201 :param length: A time value or type supported by TimeValue, which must be non-negative 202 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 203 :param rate: The media unit rate. 204 205 :raises: ValueError if the length is negative""" 206 length = _perform_all_conversions(length) 207 if length < 0: 208 raise ValueError("Length must be non-negative") 209 210 end = TimeValue(start, rate=rate) + length 211 return cls(start, end, inclusivity=inclusivity, rate=rate) 212 213 @classmethod 214 def eternity(cls, rate: Optional[Fraction] = None) -> "TimeValueRange": 215 """Return an unbounded range covering all time""" 216 return cls(None, None, rate=rate) 217 218 @classmethod 219 def never(cls, rate: Optional[Fraction] = None) -> "TimeValueRange": 220 """Return a range covering no time 221 222 :param rate: The media unit rate.""" 223 return cls(TimeValue(0), TimeValue(0), inclusivity=TimeValueRange.EXCLUSIVE, rate=rate) 224 225 @classmethod 226 def from_single_value(cls, value: TimeValueConstructTypes, 227 rate: Optional[Fraction] = None) -> "TimeValueRange": 228 """Construct a range containing only a single time value 229 230 :param value: A time value or type supported by TimeValue 231 :param rate: The media unit rate.""" 232 return cls(value, value, inclusivity=TimeValueRange.INCLUSIVE, rate=rate) 233 234 @classmethod 235 def from_str(cls, s: str, rate: Optional[Fraction] = None) -> "TimeValueRange": 236 """Convert a string to a range. 237 238 Valid ranges are: 239 [<tv>_<tv>] 240 [<tv>_<tv>) 241 (<tv>_<tv>] 242 (<tv>_<tv>) 243 [<tv>] 244 <tv>_<tv> 245 <tv> 246 () 247 248 where <tv> is an integer or an empty string. 249 250 The meaning of these is relatively simple: [ indicates including the start time value, 251 ( indicates excluding it, ] indicates including the end time value, and ) indicates excluding it. 252 If brackets are ommitted entirely then this is taken as an inclusive range at both ends. 253 Omitting a time value indicates that there is no bound on that end (ie. the range goes on forever), 254 including only a single time value by itself indicates a range containing exactly that one time value. 255 Finally the string "()" represents the empty range. 256 257 :param s: The string to process 258 :param rate: The media unit rate. 259 """ 260 m = re.match(r'(\[|\()?([^_\)\]]+)?(_([^_\)\]]+)?)?(\]|\))?(@([^\/]+(\/.+)?))?', s) 261 262 if m is None: 263 raise ValueError("Invalid TimeValueRange string") 264 265 inc = TimeValueRange.INCLUSIVE 266 if m.group(1) == "(": 267 inc &= ~TimeValueRange.INCLUDE_START 268 if m.group(5) == ")": 269 inc &= ~TimeValueRange.INCLUDE_END 270 271 start_str = m.group(2) 272 end_str = m.group(4) 273 rate_str = m.group(7) 274 275 if rate is None and rate_str is not None: 276 rate = Fraction(rate_str) 277 278 if start_str is not None: 279 start = TimeValue.from_str(start_str, rate=rate) 280 else: 281 start = None 282 if end_str is not None: 283 end = TimeValue.from_str(end_str, rate=rate) 284 else: 285 end = None 286 287 if start is None and end is None: 288 # Ie. we have no first or second value 289 if m.group(3) is not None: 290 # ie. we have a '_' character 291 return cls.eternity() 292 else: 293 # We have no '_' character, so the whole range is empty 294 return cls.never() 295 elif start is not None and end is None and m.group(3) is None: 296 return cls.from_single_value(start) 297 else: 298 return cls(start, end, inclusivity=inc, rate=rate) 299 300 def as_timerange(self) -> TimeRange: 301 """Returns a TimeRange representation.""" 302 start = self._start.as_timestamp() if self._start is not None else None 303 end = self._end.as_timestamp() if self._end is not None else None 304 inclusivity = self._inclusivity 305 return TimeRange(start, end, inclusivity=TimeRange.Inclusivity(inclusivity)) 306 307 def __mediatimerange__(self) -> TimeRange: 308 return self.as_timerange() 309 310 def as_count_range(self) -> CountRange: 311 """Returns a CountRange representation.""" 312 start = self._start.as_count() if self._start is not None else None 313 end = self._end.as_count() if self._end is not None else None 314 inclusivity = self._inclusivity 315 return CountRange(start, end, inclusivity=inclusivity) 316 317 @property 318 def start(self) -> Optional[TimeValue]: 319 return self._start 320 321 @property 322 def end(self) -> Optional[TimeValue]: 323 return self._end 324 325 @property 326 def inclusivity(self) -> int: 327 return self._inclusivity 328 329 @property 330 def rate(self) -> Optional[Fraction]: 331 return self._rate 332 333 def length_as_timestamp(self) -> Union[Timestamp, float]: 334 """Returns the range length as a Timestamp or the float value infinity""" 335 return self.as_timerange().length 336 337 @deprecated(version="4.0.0", 338 reason="This method is deprecated. TimeOffset has been merged into Timestamp. " 339 "Use length_as_timestamp() instead") 340 def length_as_timeoffset(self) -> Union[TimeOffset, float]: 341 """Legacy method that returns the range length as a TimeOffset or the float value infinity""" 342 return self.length_as_timestamp() 343 344 def length_as_count(self) -> Union[int, float]: 345 """Returns the range length as an media unit count""" 346 return self.as_count_range().length 347 348 def bounded_before(self) -> bool: 349 """Return true if the start of the range is bounded""" 350 return self._start is not None 351 352 def bounded_after(self) -> bool: 353 """Return true if the end of the range is bounded""" 354 return self._end is not None 355 356 def unbounded(self) -> bool: 357 """Return true if neither the start or end of the range is bounded""" 358 return self._start is None and self._end is None 359 360 def includes_start(self) -> bool: 361 """Return true if the start is inclusive""" 362 return (self._inclusivity & TimeValueRange.INCLUDE_START) != 0 363 364 def includes_end(self) -> bool: 365 """Return true if the end is inclusive""" 366 return (self._inclusivity & TimeValueRange.INCLUDE_END) != 0 367 368 def finite(self) -> bool: 369 """Return true if the range is finite""" 370 return (self._start is not None and self._end is not None) 371 372 def contains_subrange(self, other: RangeConstructionTypes) -> bool: 373 """Returns true if the range supplied lies entirely inside this range""" 374 other = self._as_time_value_range(other) 375 376 return ((not self.is_empty()) and 377 (other.is_empty() or 378 (self._start is None or (other._start is not None and self._start <= other._start)) and 379 (self._end is None or (other._end is not None and self._end >= other._end)) and 380 (not ((self._start is not None) and 381 (other._start is not None) and 382 (self._start == other._start) and 383 (self._inclusivity & TimeValueRange.INCLUDE_START == 0) and 384 (other._inclusivity & TimeValueRange.INCLUDE_START != 0))) and 385 (not ((self._end is not None) and 386 (other._end is not None) and 387 (self._end == other._end) and 388 (self._inclusivity & TimeValueRange.INCLUDE_END == 0) and 389 (other._inclusivity & TimeValueRange.INCLUDE_END != 0))))) 390 391 def to_str(self, with_inclusivity_markers: bool = True, 392 include_rate: bool = True) -> str: 393 """Convert to [<value>_<value>] format, 394 usually the opening and closing delimiters are set to [ or ] for inclusive and ( or ) for exclusive ranges. 395 Unbounded ranges have no marker attached to them. 396 397 :param with_inclusivity_markers: if set to False do not include parentheses/brackecount 398 :param include_rate: If True and there is a non-zero media rate then include the media rate suffix string 399 """ 400 if self.is_empty(): 401 if with_inclusivity_markers: 402 return "()" 403 else: 404 return "" 405 elif self._start is not None and self._end is not None and self._start == self._end: 406 if with_inclusivity_markers: 407 return "[" + self._start.to_str(False) + "]" 408 else: 409 return self._start.to_str(False) 410 411 if with_inclusivity_markers: 412 brackets = [("(", ")"), ("[", ")"), ("(", "]"), ("[", "]")][self._inclusivity] 413 else: 414 brackets = ("", "") 415 416 result = '_'.join([ 417 (brackets[0] + self._start.to_str(False)) if self._start is not None else '', 418 (self._end.to_str(False) + brackets[1]) if self._end is not None else '' 419 ]) 420 if include_rate and self._rate: 421 result += "@{}".format(self._rate) 422 return result 423 424 def intersect_with(self, other: RangeConstructionTypes) -> "TimeValueRange": 425 """Return a range which represents the intersection of this range with another""" 426 other = self._as_time_value_range(other) 427 428 if self.is_empty() or other.is_empty(): 429 return TimeValueRange.never() 430 431 start = self._start 432 if other._start is not None and (self._start is None or self._start < other._start): 433 start = other._start 434 end = self._end 435 if other._end is not None and (self._end is None or self._end > other._end): 436 end = other._end 437 438 inclusivity = TimeValueRange.EXCLUSIVE 439 if start is None or (start in self and start in other): 440 inclusivity |= TimeValueRange.INCLUDE_START 441 if end is None or (end in self and end in other): 442 inclusivity |= TimeValueRange.INCLUDE_END 443 444 if start is not None and end is not None and start > end: 445 return TimeValueRange.never() 446 447 return TimeValueRange(start, end, inclusivity) 448 449 def starts_inside_range(self, other: RangeConstructionTypes) -> bool: 450 """Returns true if the start of this range is located inside the other.""" 451 other = self._as_time_value_range(other) 452 453 return (not self.is_empty() and 454 not other.is_empty() and 455 ((self.bounded_before() and cast(TimeValue, self._start) in other and 456 (not (other.bounded_after() and self._start == other._end and not self.includes_start()))) or 457 (self.bounded_before() and other.bounded_before() and self._start == other._start and 458 (not (self.includes_start() and not other.includes_start()))) or 459 (not self.bounded_before() and not other.bounded_before()))) 460 461 def ends_inside_range(self, other: RangeConstructionTypes) -> bool: 462 """Returns true if the end of this range is located inside the other.""" 463 other = self._as_time_value_range(other) 464 465 return (not self.is_empty() and 466 not other.is_empty() and 467 ((self.bounded_after() and cast(TimeValue, self._end) in other and 468 (not (other.bounded_before() and self._end == other._start and not self.includes_end()))) or 469 (self.bounded_after() and other.bounded_after() and self._end == other._end and 470 (not (self.includes_end() and not other.includes_end()))) or 471 (not self.bounded_after() and not other.bounded_after()))) 472 473 def is_earlier_than_range(self, other: RangeConstructionTypes) -> bool: 474 """Returns true if this range ends earlier than the start of the other.""" 475 other = self._as_time_value_range(other) 476 477 return (not self.is_empty() and 478 not other.is_empty() and 479 (other.bounded_before() and 480 other._start is not None) and # redundant but is for type checking 481 (self.bounded_after() and 482 self._end is not None) and # redundant but is for type checking 483 (self._end < other._start or 484 (self._end == other._start and 485 not (self.includes_end() and other.includes_start())))) 486 487 def is_later_than_range(self, other: RangeConstructionTypes) -> bool: 488 """Returns true if this range starts later than the end of the other.""" 489 other = self._as_time_value_range(other) 490 491 return (not self.is_empty() and 492 not other.is_empty() and 493 (other.bounded_after() and 494 other._end is not None) and # redundant but is for type checking 495 (self.bounded_before() and 496 self._start is not None) and # redundant but is for type checking 497 (self._start > other._end or 498 (self._start == other._end and 499 not (self.includes_start() and other.includes_end())))) 500 501 def starts_earlier_than_range(self, other: RangeConstructionTypes) -> bool: 502 """Returns true if this range starts earlier than the start of the other.""" 503 other = self._as_time_value_range(other) 504 505 return (not self.is_empty() and 506 not other.is_empty() and 507 (other.bounded_before() and 508 other._start is not None) and # redundant but is for type checking 509 ((not self.bounded_before() or 510 self._start is None) or # redundant but is for type checking 511 (self._start < other._start or 512 (self._start == other._start and 513 self.includes_start() and 514 not other.includes_start())))) 515 516 def starts_later_than_range(self, other: RangeConstructionTypes) -> bool: 517 """Returns true if this range starts later than the start of the other.""" 518 other = self._as_time_value_range(other) 519 520 return (not self.is_empty() and 521 not other.is_empty() and 522 (self.bounded_before() and 523 self._start is not None) and # redundant but is for type checking 524 ((not other.bounded_before() or 525 other._start is None) or # redundant but is for type checking 526 (self._start > other._start or 527 (self._start == other._start and 528 (not self.includes_start() and other.includes_start()))))) 529 530 def ends_earlier_than_range(self, other: RangeConstructionTypes) -> bool: 531 """Returns true if this range ends earlier than the end of the other.""" 532 other = self._as_time_value_range(other) 533 534 return (not self.is_empty() and 535 not other.is_empty() and 536 (self.bounded_after() and 537 self._end is not None) and # redundant but is for type checking 538 ((not other.bounded_after() or 539 other._end is None) or # redundant but is for type checking 540 (self._end < other._end or 541 (self._end == other._end and 542 (not self.includes_end() and other.includes_end()))))) 543 544 def ends_later_than_range(self, other: RangeConstructionTypes) -> bool: 545 """Returns true if this range ends later than the end of the other.""" 546 other = self._as_time_value_range(other) 547 548 return (not self.is_empty() and 549 not other.is_empty() and 550 (other.bounded_after() and 551 other._end is not None) and # redundant but is for type checking 552 ((not self.bounded_after() or 553 self._end is None) or # redundant but is for type checking 554 (self._end > other._end or 555 (self._end == other._end and 556 self.includes_end() and 557 not other.includes_end())))) 558 559 def overlaps_with_range(self, other: RangeConstructionTypes) -> bool: 560 """Returns true if this range and the other overlap.""" 561 other = self._as_time_value_range(other) 562 563 return (not self.is_earlier_than_range(other) and not self.is_later_than_range(other)) 564 565 def is_contiguous_with_range(self, other: RangeConstructionTypes) -> bool: 566 """Returns true if the union of this range and the other would be a valid range""" 567 other = self._as_time_value_range(other) 568 569 return (self.overlaps_with_range(other) or 570 (self.is_earlier_than_range(other) and 571 self._end == other._start and 572 (self.includes_end() or other.includes_start())) or 573 (self.is_later_than_range(other) and 574 self._start == other._end and 575 (self.includes_start() or other.includes_end()))) 576 577 def union_with_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 578 """Returns the union of this range and the other. 579 :raises: ValueError if the ranges are not contiguous.""" 580 other = self._as_time_value_range(other) 581 582 if not self.is_contiguous_with_range(other): 583 raise ValueError("TimeValueRanges {} and {} are not contiguous, so cannot take the union.".format( 584 self, other)) 585 586 return self.extend_to_encompass_range(other) 587 588 def extend_to_encompass_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 589 """Returns the range that encompasses this and the other range.""" 590 other = self._as_time_value_range(other) 591 592 if self.is_empty(): 593 return other 594 595 if other.is_empty(): 596 return self 597 598 inclusivity = TimeValueRange.EXCLUSIVE 599 if self._start == other._start: 600 start = self._start 601 inclusivity |= ((self._inclusivity | other._inclusivity) & TimeValueRange.INCLUDE_START) 602 elif self.starts_earlier_than_range(other): 603 start = self._start 604 inclusivity |= (self._inclusivity & TimeValueRange.INCLUDE_START) 605 else: 606 start = other._start 607 inclusivity |= (other._inclusivity & TimeValueRange.INCLUDE_START) 608 609 if self._end == other._end: 610 end = self._end 611 inclusivity |= ((self._inclusivity | other._inclusivity) & TimeValueRange.INCLUDE_END) 612 elif self.ends_later_than_range(other): 613 end = self._end 614 inclusivity |= (self._inclusivity & TimeValueRange.INCLUDE_END) 615 else: 616 end = other._end 617 inclusivity |= (other._inclusivity & TimeValueRange.INCLUDE_END) 618 619 return TimeValueRange(start, end, inclusivity) 620 621 def split_at(self, value: TimeValueConstructTypes) -> Tuple["TimeValueRange", "TimeValueRange"]: 622 """Splits a range at a specified value. 623 624 It is guaranteed that the splitting point will be in the *second* TimeValueRange returned, and not in the first. 625 626 :param value: the time value to split at 627 :returns: A pair of TimeValueRange objects 628 :raises: ValueError if value not in self""" 629 value = self._as_time_value(value) 630 631 if value not in self: 632 raise ValueError("Cannot split range {} at {}".format(self, value)) 633 634 return (TimeValueRange(self._start, 635 value, 636 (self._inclusivity & TimeValueRange.INCLUDE_START)), 637 TimeValueRange(value, 638 self._end, 639 TimeValueRange.INCLUDE_START | (self._inclusivity & TimeValueRange.INCLUDE_END))) 640 641 def split_after(self, value: TimeValueConstructTypes) -> Tuple["TimeValueRange", "TimeValueRange"]: 642 """Splits a range after a specified value. 643 644 It is guaranteed that the splitting point will be in the *first* TimeValueRange returned, and not in the second. 645 646 :param value: the time value to split at 647 :returns: A pair of TimeValueRange objects 648 :raises: ValueError if value not in self""" 649 value = self._as_time_value(value) 650 651 if value not in self: 652 raise ValueError("Cannot split range {} at {}".format(self, value)) 653 654 return (TimeValueRange(self._start, 655 value, 656 TimeValueRange.INCLUDE_END | (self._inclusivity & TimeValueRange.INCLUDE_START)), 657 TimeValueRange(value, 658 self._end, 659 (self._inclusivity & TimeValueRange.INCLUDE_END))) 660 661 def excluding_up_to_end_of_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 662 """Returns the portion of this timerange which is not earlier than or contained in the 663 given timerange. 664 """ 665 other = self._as_time_value_range(other) 666 667 if other.is_empty() or other.is_earlier_than_range(self): 668 return self 669 elif not other.bounded_after() or cast(TimeValue, other.end) not in self: 670 return TimeValueRange.never() 671 else: 672 if other.includes_end(): 673 return self.split_after(cast(TimeValue, other.end))[1] 674 else: 675 return self.split_at(cast(TimeValue, other.end))[1] 676 677 def excluding_before_start_of_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 678 """Returns the portion of this timerange which is not later than or contained in the 679 given timerange. 680 """ 681 other = self._as_time_value_range(other) 682 683 if other.is_empty() or other.is_later_than_range(self): 684 return self 685 elif not other.bounded_before() or cast(TimeValue, other.start) not in self: 686 return TimeValueRange.never() 687 else: 688 assert (isinstance(other.start, TimeValue)) 689 if other.includes_start(): 690 return self.split_at(other.start)[0] 691 else: 692 return self.split_after(other.start)[0] 693 694 def range_between(self, other: RangeConstructionTypes) -> "TimeValueRange": 695 """Returns the range between the end of the earlier range and the start of the later one""" 696 other = self._as_time_value_range(other) 697 698 if self.is_contiguous_with_range(other): 699 return TimeValueRange.never() 700 elif self.is_earlier_than_range(other): 701 inclusivity = TimeValueRange.EXCLUSIVE 702 if not self.includes_end(): 703 inclusivity |= TimeValueRange.INCLUDE_START 704 if not other.includes_start(): 705 inclusivity |= TimeValueRange.INCLUDE_END 706 return TimeValueRange(self._end, other._start, inclusivity) 707 else: 708 inclusivity = TimeValueRange.EXCLUSIVE 709 if not self.includes_start(): 710 inclusivity |= TimeValueRange.INCLUDE_END 711 if not other.includes_end(): 712 inclusivity |= TimeValueRange.INCLUDE_START 713 return TimeValueRange(other._end, self._start, inclusivity) 714 715 def is_empty(self) -> bool: 716 """Returns true on any empty range.""" 717 return (self._start is not None and 718 self._end is not None and 719 self._start == self._end and 720 self._inclusivity != TimeValueRange.INCLUSIVE) 721 722 def __setattr__(self, name: str, value: Any) -> None: 723 """Raises a ValueError if attempt to set an attribute on the immutable TimeValueRange""" 724 raise ValueError("Cannot assign to an immutable TimeValueRange") 725 726 def __contains__(self, value: TimeValueConstructTypes) -> bool: 727 """Returns true if the time value is within this range.""" 728 value = self._as_time_value(value) 729 730 return ((self._start is None or value >= self._start) and 731 (self._end is None or value <= self._end) and 732 (not ((self._start is not None) and 733 (value == self._start) and 734 (self._inclusivity & TimeValueRange.INCLUDE_START == 0))) and 735 (not ((self._end is not None) and 736 (value == self._end) and 737 (self._inclusivity & TimeValueRange.INCLUDE_END == 0)))) 738 739 def __eq__(self, other: object) -> bool: 740 """Return true if the ranges are equal""" 741 try: 742 other = self._as_time_value_range(cast(RangeConstructionTypes, other)) 743 except Exception: 744 return False 745 746 return (((self.is_empty() and other.is_empty()) or 747 (((self._start is None and other._start is None) or 748 (self._start == other._start and 749 (self._inclusivity & TimeValueRange.INCLUDE_START) == (other._inclusivity & TimeValueRange.INCLUDE_START))) and # noqa: E501 750 ((self._end is None and other._end is None) or 751 (self._end == other._end and 752 (self._inclusivity & TimeValueRange.INCLUDE_END) == (other._inclusivity & TimeValueRange.INCLUDE_END)))))) # noqa: E501 753 754 def __str__(self) -> str: 755 return self.to_str() 756 757 def __repr__(self) -> str: 758 return "{}.{}.from_str('{}')".format(type(self).__module__, type(self).__name__, self.to_str()) 759 760 def _as_time_value(self, other: TimeValueConstructTypes) -> TimeValue: 761 """Returns a TimeValue from `other`. 762 763 A rate conversion is done if `other` is a TimeValue and the rate 764 differs from self._rate. 765 766 :param other: A TimeValue, TimeStamp or int. 767 """ 768 return TimeValue(other, rate=self._rate) 769 770 def _as_time_value_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 771 """Returns a TimeValueRange from `other`. 772 773 A rate conversion is done if `other` is a TimeValueRange and the rate 774 differs from self._rate. 775 776 :param other: A TimeValueRange, TimeRange or CountRange. 777 """ 778 return TimeValueRange(other, rate=self._rate) 779 780 def subranges(self, rate: Optional[Fraction] = None) -> Iterator["TimeValueRange"]: 781 """Divide this range naturally into subranges. 782 783 (nb. Ranges with a rate set are automatically normalised to half-open, which may lead 784 to unexpected results when using this method with some ranges that have a rate but were 785 not originally defined as half-open) 786 787 If the rate parameter is specified then the boundaries between subranges will occur 788 at that frequency through the range. If it is None then the rate of this range will 789 be used instead if it has one. If this range has no rate then the returned iterable 790 will yield only a single timerange, which is equal to this one. 791 792 If this range is unbounded before then the iterable yields a single range equal to 793 this one. 794 795 Provided that this range is bounded before and a rate has been provided somehow, the 796 first subrange yielded will have the same start clusivity as this range. If this range 797 is bounded after then the last time range yielded will have the same end clusivity as 798 this range. Any other timeranges yielded will be half-open (ie. INCLUDE_START). The 799 yielded timeranges will be contiguous and non-overlapping. Every portion of this range 800 will be covered by exactly one of the yielded ranges and all of the yielded ranges will 801 be entirely contained within this range. 802 803 :param rate: Optional rate. 804 """ 805 _rate: Fraction 806 if rate is None: 807 if self.rate is None: 808 return iter([self]) 809 else: 810 _rate = self.rate 811 else: 812 _rate = rate 813 814 if self.is_empty() or not self.bounded_before(): 815 return iter([self]) 816 817 def __inner(): 818 start: TimeValue = cast(TimeValue, self.start) 819 include_start = self.includes_start() 820 821 for tv in TimeValueRange(self, rate=_rate): 822 if tv == start: 823 continue 824 elif tv == self.end: 825 break 826 else: 827 yield TimeValueRange( 828 start.as_timestamp(), 829 tv.as_timestamp(), 830 rate=self.rate, 831 inclusivity=TimeValueRange.INCLUDE_START if include_start else TimeValueRange.EXCLUSIVE) 832 include_start = True 833 start = tv 834 835 inclusivity = TimeValueRange.EXCLUSIVE 836 if include_start: 837 inclusivity |= TimeValueRange.INCLUDE_START 838 if self.includes_end(): 839 inclusivity |= TimeValueRange.INCLUDE_END 840 841 yield TimeValueRange( 842 start.as_timestamp(), 843 self.end.as_timestamp(), 844 rate=self.rate, 845 inclusivity=inclusivity) 846 847 return __inner() 848 849 def __iter__(self) -> Iterator[TimeValue]: 850 """If this range has no rate set or is unbounded before then this will raise a ValueError. 851 852 If this range has a rate then this returns an iterator which yields TimeValues contained 853 within this range starting at the start of the range and moving forward at the range's rate. 854 """ 855 if self.is_empty(): 856 return iter([]) 857 858 if not self.bounded_before() or self.rate is None: 859 raise ValueError("{!r} is not iterable".format(self)) 860 861 first: TimeValue 862 if self.includes_start(): 863 first = cast(TimeValue, self.start) 864 else: 865 first = cast(TimeValue, self.start) + 1 866 867 last: Optional[TimeValue] 868 if not self.bounded_after(): 869 last = None 870 elif self.includes_end(): 871 last = cast(TimeValue, self.end) 872 else: 873 last = cast(TimeValue, self.end) - 1 874 875 def __inner(first: TimeValue, last: Optional[TimeValue]) -> Iterator[TimeValue]: 876 cur = first 877 while last is None or cur <= last: 878 yield cur 879 cur = cur + 1 880 881 return __inner(first, last) 882 883 def __reversed__(self) -> Iterator[TimeValue]: 884 """If this range has no rate set or is unbounded after then this will raise a ValueError. 885 886 If this range has a rate then this returns an iterator which yields TimeValues contained 887 within this range starting at the end of the range and moving backward at the range's rate. 888 """ 889 if self.is_empty(): 890 return iter([]) 891 892 if not self.bounded_after() or self.rate is None: 893 raise ValueError("reversed({!r}) is not iterable".format(self)) 894 895 first: TimeValue 896 if self.includes_end(): 897 first = cast(TimeValue, self.end) 898 else: 899 first = cast(TimeValue, self.end) - 1 900 901 last: Optional[TimeValue] 902 if not self.bounded_before(): 903 last = None 904 elif self.includes_start(): 905 last = cast(TimeValue, self.start) 906 else: 907 last = cast(TimeValue, self.start) + 1 908 909 def __inner(first: TimeValue, last: Optional[TimeValue]) -> Iterator[TimeValue]: 910 cur = first 911 while last is None or cur >= last: 912 yield cur 913 cur = cur - 1 914 915 return __inner(first, last) 916 917 def __hash__(self) -> int: 918 return hash(repr(self)) 919 920 def merge_into_ordered_ranges(self, ranges: Iterable["TimeValueRange"]) -> Iterable["TimeValueRange"]: 921 """Merge this range into an ordered list of non-overlapping non-contiguous timeranges, returning the unique 922 list of non-overlapping non-contiguous timeranges which covers the union of the ranges in the original list and 923 also this timerange. 924 925 :param ranges: An iterable yielding non-overlapping non-contiguous timeranges in chronological order 926 :returns: An iterable yielding non-overlapping non-contiguous timeranges in chronological order""" 927 new_range = self 928 for existing_range in ranges: 929 if ((existing_range.is_contiguous_with_range(new_range) or 930 existing_range.overlaps_with_range(new_range))): 931 # This means that the new and old range can be combined together 932 new_range = new_range.extend_to_encompass_range(existing_range) 933 elif existing_range.is_earlier_than_range(new_range): 934 # In this case the exiting range is entirely earlier than the new range and so won't interfere 935 # with it 936 yield existing_range 937 elif existing_range.is_later_than_range(new_range): 938 # The new_range is entirely located earlier than the existing range, we can simply add the 939 # new_range 940 yield new_range 941 new_range = existing_range 942 yield new_range 943 944 def complement_of_ordered_subranges(self, ranges: Iterable["TimeValueRange"]) -> Iterable["TimeValueRange"]: 945 """Given an iterable that yields chronologically ordered disjoint ranges yield chronologically ordered disjoint 946 subranges of this range which cover every part of this range that is *not* covered by the input ranges. 947 948 :param ranges: Iterable yielding chronologically ordered disjoint TimeValueRanges 949 :returns: Iterable yielding chronologically ordered disjoint non-empty TimeValueRanges 950 """ 951 current_timerange = self 952 953 for existing_range in ranges: 954 if not existing_range.is_empty(): 955 range_before = current_timerange.excluding_before_start_of_range(existing_range) 956 if not range_before.is_empty(): 957 yield range_before 958 current_timerange = current_timerange.excluding_up_to_end_of_range(existing_range) 959 960 if not current_timerange.is_empty(): 961 yield current_timerange
Represents a range of media unit time values on a timeline (e.g. Flow).
Supports these range types if start_or_value is a range value:
- TimeRange
- CountRange
- TimeValueRange
- Anything that implements the __mediatimerange__ magic method, but does not implement the __mediatimestamp__ magic methods.
Supports one of the following time value constructor representations if start_or_value is a range start:
- Timestamp
- int (integer media unit count)
- TimeValue
- Anything that implements the __mediatimestamp__ magic method
If state_or_value could be interpreted as a range value without
An optional rate can be set and is required and checked when there is a need to convert between representations.
The time value range can be converted to a TimeRange or CountRange using the as_*() methods.
The time value range implements the __mediatimerange__ magic method.
70 def __init__(self, start_or_value: Optional[Union[TimeValueConstructTypes, RangeConstructionTypes]], 71 end: Optional[TimeValueConstructTypes] = None, 72 inclusivity: Optional[int] = None, 73 rate: Optional[Fraction] = None, 74 *, 75 start: Optional[TimeValueConstructTypes] = None, 76 value: Optional[RangeConstructionTypes] = None): 77 """Construct a time value range 78 79 :param start_or_value: The start of the range, a range or None 80 :param end: The end of the range or None 81 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 82 :param rate: The media unit rate. 83 """ 84 self_start: Optional[TimeValue] 85 self_end: Optional[TimeValue] 86 self_rate: Optional[Fraction] = rate 87 88 if value is None and ( 89 isinstance(start_or_value, (TimeValueRange, TimeRange, CountRange)) or 90 ( 91 not isinstance(start_or_value, SupportsMediaTimestamp) and 92 isinstance(start_or_value, SupportsMediaTimeRange) 93 ) 94 ): 95 value = start_or_value 96 97 if isinstance(value, (TimeValueRange, TimeRange, CountRange)): 98 start = value.start 99 end = value.end 100 self_inclusivity = inclusivity if inclusivity is not None else value.inclusivity 101 elif ( 102 not isinstance(range, SupportsMediaTimestamp) and 103 isinstance(range, SupportsMediaTimeRange) 104 ): 105 value = mediatimerange(value) 106 start = value.start 107 end = value.end 108 self_inclusivity = inclusivity if inclusivity is not None else value.inclusivity 109 else: 110 if start_or_value is not None: 111 if not isinstance(start_or_value, (SupportsMediaTimestamp, int, TimeValue)): 112 raise ValueError(f"Unsupported type for start: {start_or_value!r}") 113 if start is not None: 114 raise ValueError("Cannot specify start or value as a positional and a keyword parameter!") 115 start = start_or_value 116 self_inclusivity = inclusivity if inclusivity is not None else TimeValueRange.INCLUSIVE 117 118 if start is not None: 119 self_start = TimeValue(start, rate=self_rate) 120 self_rate = self_start._rate 121 else: 122 self_start = None 123 124 if end is not None: 125 self_end = TimeValue(end, rate=self_rate) 126 self_rate = self_end._rate 127 else: 128 self_end = None 129 130 # Add a rate to the start if it was available in end 131 if self_rate and self_start is not None: 132 self_start = TimeValue(self_start, rate=self_rate) 133 134 # normalise the representation to always have an inclusive start if bounded 135 if self_start is not None and ( 136 (self_rate or isinstance(self_start.value, int)) and 137 not (self_inclusivity & TimeValueRange.INCLUDE_START)): 138 self_start = self_start + 1 139 self_inclusivity |= TimeValueRange.INCLUDE_START 140 141 # normalise the representation to always have an exclusive end if bounded 142 if self_end is not None and ( 143 (self_rate or isinstance(self_end.value, int)) and 144 (self_inclusivity & TimeValueRange.INCLUDE_END)): 145 self_end = self_end + 1 146 self_inclusivity &= ~TimeValueRange.INCLUDE_END 147 148 # Normalise the 'never' cases 149 if self_start is not None and self_end is not None: 150 if self_start > self_end or (self_start == self_end and self_inclusivity != TimeValueRange.INCLUSIVE): 151 self_start = TimeValue(0, self_rate) 152 self_end = TimeValue(0, self_rate) 153 self_inclusivity = TimeValueRange.EXCLUSIVE 154 155 # Normalise the 'eternity' cases 156 if self_start is None and self_end is None: 157 self_inclusivity = TimeValueRange.INCLUSIVE 158 159 # set attributes using dict to workaround immutability 160 self.__dict__['_start'] = self_start 161 self.__dict__['_end'] = self_end 162 self.__dict__['_inclusivity'] = self_inclusivity 163 self.__dict__['_rate'] = self_rate 164 165 # provide attribute type info given that attributes are not set directly 166 self._start: Optional[TimeValue] 167 self._end: Optional[TimeValue] 168 self._inclusivity: int 169 self._rate: Optional[Fraction]
Construct a time value range
Parameters
- start_or_value: The start of the range, a range or None
- end: The end of the range or None
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
- rate: The media unit rate.
171 @classmethod 172 def from_start(cls, start: TimeValueConstructTypes, 173 inclusivity: int = INCLUSIVE, 174 rate: Optional[Fraction] = None) -> "TimeValueRange": 175 """Construct a range starting at start with no end 176 177 :param start: A time value or type supported by TimeValue 178 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 179 :param rate: The media unit rate.""" 180 return cls(start, None, inclusivity=inclusivity, rate=rate)
Construct a range starting at start with no end
Parameters
- start: A time value or type supported by TimeValue
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
- rate: The media unit rate.
182 @classmethod 183 def from_end(cls, end: TimeValueConstructTypes, 184 inclusivity: int = INCLUSIVE, 185 rate: Optional[Fraction] = None) -> "TimeValueRange": 186 """Construct a range ending at end with no start 187 188 :param end: A time value or type supported by TimeValue 189 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 190 :param rate: The media unit rate.""" 191 return cls(None, end, inclusivity=inclusivity, rate=rate)
Construct a range ending at end with no start
Parameters
- end: A time value or type supported by TimeValue
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
- rate: The media unit rate.
193 @classmethod 194 def from_start_length(cls, start: TimeValueConstructTypes, 195 length: TimeValueConstructTypes, 196 inclusivity: int = INCLUSIVE, 197 rate: Optional[Fraction] = None) -> "TimeValueRange": 198 """Construct a range starting at start and ending at (start + length) 199 200 :param start: A time value or type supported by TimeValue 201 :param length: A time value or type supported by TimeValue, which must be non-negative 202 :param inclusivity: a combination of flags INCLUDE_START and INCLUDE_END 203 :param rate: The media unit rate. 204 205 :raises: ValueError if the length is negative""" 206 length = _perform_all_conversions(length) 207 if length < 0: 208 raise ValueError("Length must be non-negative") 209 210 end = TimeValue(start, rate=rate) + length 211 return cls(start, end, inclusivity=inclusivity, rate=rate)
Construct a range starting at start and ending at (start + length)
Parameters
- start: A time value or type supported by TimeValue
- length: A time value or type supported by TimeValue, which must be non-negative
- inclusivity: a combination of flags INCLUDE_START and INCLUDE_END
- rate: The media unit rate.
Raises
- ValueError if the length is negative
213 @classmethod 214 def eternity(cls, rate: Optional[Fraction] = None) -> "TimeValueRange": 215 """Return an unbounded range covering all time""" 216 return cls(None, None, rate=rate)
Return an unbounded range covering all time
218 @classmethod 219 def never(cls, rate: Optional[Fraction] = None) -> "TimeValueRange": 220 """Return a range covering no time 221 222 :param rate: The media unit rate.""" 223 return cls(TimeValue(0), TimeValue(0), inclusivity=TimeValueRange.EXCLUSIVE, rate=rate)
Return a range covering no time
Parameters
- rate: The media unit rate.
225 @classmethod 226 def from_single_value(cls, value: TimeValueConstructTypes, 227 rate: Optional[Fraction] = None) -> "TimeValueRange": 228 """Construct a range containing only a single time value 229 230 :param value: A time value or type supported by TimeValue 231 :param rate: The media unit rate.""" 232 return cls(value, value, inclusivity=TimeValueRange.INCLUSIVE, rate=rate)
Construct a range containing only a single time value
Parameters
- value: A time value or type supported by TimeValue
- rate: The media unit rate.
234 @classmethod 235 def from_str(cls, s: str, rate: Optional[Fraction] = None) -> "TimeValueRange": 236 """Convert a string to a range. 237 238 Valid ranges are: 239 [<tv>_<tv>] 240 [<tv>_<tv>) 241 (<tv>_<tv>] 242 (<tv>_<tv>) 243 [<tv>] 244 <tv>_<tv> 245 <tv> 246 () 247 248 where <tv> is an integer or an empty string. 249 250 The meaning of these is relatively simple: [ indicates including the start time value, 251 ( indicates excluding it, ] indicates including the end time value, and ) indicates excluding it. 252 If brackets are ommitted entirely then this is taken as an inclusive range at both ends. 253 Omitting a time value indicates that there is no bound on that end (ie. the range goes on forever), 254 including only a single time value by itself indicates a range containing exactly that one time value. 255 Finally the string "()" represents the empty range. 256 257 :param s: The string to process 258 :param rate: The media unit rate. 259 """ 260 m = re.match(r'(\[|\()?([^_\)\]]+)?(_([^_\)\]]+)?)?(\]|\))?(@([^\/]+(\/.+)?))?', s) 261 262 if m is None: 263 raise ValueError("Invalid TimeValueRange string") 264 265 inc = TimeValueRange.INCLUSIVE 266 if m.group(1) == "(": 267 inc &= ~TimeValueRange.INCLUDE_START 268 if m.group(5) == ")": 269 inc &= ~TimeValueRange.INCLUDE_END 270 271 start_str = m.group(2) 272 end_str = m.group(4) 273 rate_str = m.group(7) 274 275 if rate is None and rate_str is not None: 276 rate = Fraction(rate_str) 277 278 if start_str is not None: 279 start = TimeValue.from_str(start_str, rate=rate) 280 else: 281 start = None 282 if end_str is not None: 283 end = TimeValue.from_str(end_str, rate=rate) 284 else: 285 end = None 286 287 if start is None and end is None: 288 # Ie. we have no first or second value 289 if m.group(3) is not None: 290 # ie. we have a '_' character 291 return cls.eternity() 292 else: 293 # We have no '_' character, so the whole range is empty 294 return cls.never() 295 elif start is not None and end is None and m.group(3) is None: 296 return cls.from_single_value(start) 297 else: 298 return cls(start, end, inclusivity=inc, rate=rate)
Convert a string to a range.
Valid ranges are:
[
where
The meaning of these is relatively simple: [ indicates including the start time value, ( indicates excluding it, ] indicates including the end time value, and ) indicates excluding it. If brackets are ommitted entirely then this is taken as an inclusive range at both ends. Omitting a time value indicates that there is no bound on that end (ie. the range goes on forever), including only a single time value by itself indicates a range containing exactly that one time value. Finally the string "()" represents the empty range.
Parameters
- s: The string to process
- rate: The media unit rate.
300 def as_timerange(self) -> TimeRange: 301 """Returns a TimeRange representation.""" 302 start = self._start.as_timestamp() if self._start is not None else None 303 end = self._end.as_timestamp() if self._end is not None else None 304 inclusivity = self._inclusivity 305 return TimeRange(start, end, inclusivity=TimeRange.Inclusivity(inclusivity))
Returns a TimeRange representation.
310 def as_count_range(self) -> CountRange: 311 """Returns a CountRange representation.""" 312 start = self._start.as_count() if self._start is not None else None 313 end = self._end.as_count() if self._end is not None else None 314 inclusivity = self._inclusivity 315 return CountRange(start, end, inclusivity=inclusivity)
Returns a CountRange representation.
333 def length_as_timestamp(self) -> Union[Timestamp, float]: 334 """Returns the range length as a Timestamp or the float value infinity""" 335 return self.as_timerange().length
Returns the range length as a Timestamp or the float value infinity
337 @deprecated(version="4.0.0", 338 reason="This method is deprecated. TimeOffset has been merged into Timestamp. " 339 "Use length_as_timestamp() instead") 340 def length_as_timeoffset(self) -> Union[TimeOffset, float]: 341 """Legacy method that returns the range length as a TimeOffset or the float value infinity""" 342 return self.length_as_timestamp()
Legacy method that returns the range length as a TimeOffset or the float value infinity
344 def length_as_count(self) -> Union[int, float]: 345 """Returns the range length as an media unit count""" 346 return self.as_count_range().length
Returns the range length as an media unit count
348 def bounded_before(self) -> bool: 349 """Return true if the start of the range is bounded""" 350 return self._start is not None
Return true if the start of the range is bounded
352 def bounded_after(self) -> bool: 353 """Return true if the end of the range is bounded""" 354 return self._end is not None
Return true if the end of the range is bounded
356 def unbounded(self) -> bool: 357 """Return true if neither the start or end of the range is bounded""" 358 return self._start is None and self._end is None
Return true if neither the start or end of the range is bounded
360 def includes_start(self) -> bool: 361 """Return true if the start is inclusive""" 362 return (self._inclusivity & TimeValueRange.INCLUDE_START) != 0
Return true if the start is inclusive
364 def includes_end(self) -> bool: 365 """Return true if the end is inclusive""" 366 return (self._inclusivity & TimeValueRange.INCLUDE_END) != 0
Return true if the end is inclusive
368 def finite(self) -> bool: 369 """Return true if the range is finite""" 370 return (self._start is not None and self._end is not None)
Return true if the range is finite
372 def contains_subrange(self, other: RangeConstructionTypes) -> bool: 373 """Returns true if the range supplied lies entirely inside this range""" 374 other = self._as_time_value_range(other) 375 376 return ((not self.is_empty()) and 377 (other.is_empty() or 378 (self._start is None or (other._start is not None and self._start <= other._start)) and 379 (self._end is None or (other._end is not None and self._end >= other._end)) and 380 (not ((self._start is not None) and 381 (other._start is not None) and 382 (self._start == other._start) and 383 (self._inclusivity & TimeValueRange.INCLUDE_START == 0) and 384 (other._inclusivity & TimeValueRange.INCLUDE_START != 0))) and 385 (not ((self._end is not None) and 386 (other._end is not None) and 387 (self._end == other._end) and 388 (self._inclusivity & TimeValueRange.INCLUDE_END == 0) and 389 (other._inclusivity & TimeValueRange.INCLUDE_END != 0)))))
Returns true if the range supplied lies entirely inside this range
391 def to_str(self, with_inclusivity_markers: bool = True, 392 include_rate: bool = True) -> str: 393 """Convert to [<value>_<value>] format, 394 usually the opening and closing delimiters are set to [ or ] for inclusive and ( or ) for exclusive ranges. 395 Unbounded ranges have no marker attached to them. 396 397 :param with_inclusivity_markers: if set to False do not include parentheses/brackecount 398 :param include_rate: If True and there is a non-zero media rate then include the media rate suffix string 399 """ 400 if self.is_empty(): 401 if with_inclusivity_markers: 402 return "()" 403 else: 404 return "" 405 elif self._start is not None and self._end is not None and self._start == self._end: 406 if with_inclusivity_markers: 407 return "[" + self._start.to_str(False) + "]" 408 else: 409 return self._start.to_str(False) 410 411 if with_inclusivity_markers: 412 brackets = [("(", ")"), ("[", ")"), ("(", "]"), ("[", "]")][self._inclusivity] 413 else: 414 brackets = ("", "") 415 416 result = '_'.join([ 417 (brackets[0] + self._start.to_str(False)) if self._start is not None else '', 418 (self._end.to_str(False) + brackets[1]) if self._end is not None else '' 419 ]) 420 if include_rate and self._rate: 421 result += "@{}".format(self._rate) 422 return result
Convert to [
Parameters
- with_inclusivity_markers: if set to False do not include parentheses/brackecount
- include_rate: If True and there is a non-zero media rate then include the media rate suffix string
424 def intersect_with(self, other: RangeConstructionTypes) -> "TimeValueRange": 425 """Return a range which represents the intersection of this range with another""" 426 other = self._as_time_value_range(other) 427 428 if self.is_empty() or other.is_empty(): 429 return TimeValueRange.never() 430 431 start = self._start 432 if other._start is not None and (self._start is None or self._start < other._start): 433 start = other._start 434 end = self._end 435 if other._end is not None and (self._end is None or self._end > other._end): 436 end = other._end 437 438 inclusivity = TimeValueRange.EXCLUSIVE 439 if start is None or (start in self and start in other): 440 inclusivity |= TimeValueRange.INCLUDE_START 441 if end is None or (end in self and end in other): 442 inclusivity |= TimeValueRange.INCLUDE_END 443 444 if start is not None and end is not None and start > end: 445 return TimeValueRange.never() 446 447 return TimeValueRange(start, end, inclusivity)
Return a range which represents the intersection of this range with another
449 def starts_inside_range(self, other: RangeConstructionTypes) -> bool: 450 """Returns true if the start of this range is located inside the other.""" 451 other = self._as_time_value_range(other) 452 453 return (not self.is_empty() and 454 not other.is_empty() and 455 ((self.bounded_before() and cast(TimeValue, self._start) in other and 456 (not (other.bounded_after() and self._start == other._end and not self.includes_start()))) or 457 (self.bounded_before() and other.bounded_before() and self._start == other._start and 458 (not (self.includes_start() and not other.includes_start()))) or 459 (not self.bounded_before() and not other.bounded_before())))
Returns true if the start of this range is located inside the other.
461 def ends_inside_range(self, other: RangeConstructionTypes) -> bool: 462 """Returns true if the end of this range is located inside the other.""" 463 other = self._as_time_value_range(other) 464 465 return (not self.is_empty() and 466 not other.is_empty() and 467 ((self.bounded_after() and cast(TimeValue, self._end) in other and 468 (not (other.bounded_before() and self._end == other._start and not self.includes_end()))) or 469 (self.bounded_after() and other.bounded_after() and self._end == other._end and 470 (not (self.includes_end() and not other.includes_end()))) or 471 (not self.bounded_after() and not other.bounded_after())))
Returns true if the end of this range is located inside the other.
473 def is_earlier_than_range(self, other: RangeConstructionTypes) -> bool: 474 """Returns true if this range ends earlier than the start of the other.""" 475 other = self._as_time_value_range(other) 476 477 return (not self.is_empty() and 478 not other.is_empty() and 479 (other.bounded_before() and 480 other._start is not None) and # redundant but is for type checking 481 (self.bounded_after() and 482 self._end is not None) and # redundant but is for type checking 483 (self._end < other._start or 484 (self._end == other._start and 485 not (self.includes_end() and other.includes_start()))))
Returns true if this range ends earlier than the start of the other.
487 def is_later_than_range(self, other: RangeConstructionTypes) -> bool: 488 """Returns true if this range starts later than the end of the other.""" 489 other = self._as_time_value_range(other) 490 491 return (not self.is_empty() and 492 not other.is_empty() and 493 (other.bounded_after() and 494 other._end is not None) and # redundant but is for type checking 495 (self.bounded_before() and 496 self._start is not None) and # redundant but is for type checking 497 (self._start > other._end or 498 (self._start == other._end and 499 not (self.includes_start() and other.includes_end()))))
Returns true if this range starts later than the end of the other.
501 def starts_earlier_than_range(self, other: RangeConstructionTypes) -> bool: 502 """Returns true if this range starts earlier than the start of the other.""" 503 other = self._as_time_value_range(other) 504 505 return (not self.is_empty() and 506 not other.is_empty() and 507 (other.bounded_before() and 508 other._start is not None) and # redundant but is for type checking 509 ((not self.bounded_before() or 510 self._start is None) or # redundant but is for type checking 511 (self._start < other._start or 512 (self._start == other._start and 513 self.includes_start() and 514 not other.includes_start()))))
Returns true if this range starts earlier than the start of the other.
516 def starts_later_than_range(self, other: RangeConstructionTypes) -> bool: 517 """Returns true if this range starts later than the start of the other.""" 518 other = self._as_time_value_range(other) 519 520 return (not self.is_empty() and 521 not other.is_empty() and 522 (self.bounded_before() and 523 self._start is not None) and # redundant but is for type checking 524 ((not other.bounded_before() or 525 other._start is None) or # redundant but is for type checking 526 (self._start > other._start or 527 (self._start == other._start and 528 (not self.includes_start() and other.includes_start())))))
Returns true if this range starts later than the start of the other.
530 def ends_earlier_than_range(self, other: RangeConstructionTypes) -> bool: 531 """Returns true if this range ends earlier than the end of the other.""" 532 other = self._as_time_value_range(other) 533 534 return (not self.is_empty() and 535 not other.is_empty() and 536 (self.bounded_after() and 537 self._end is not None) and # redundant but is for type checking 538 ((not other.bounded_after() or 539 other._end is None) or # redundant but is for type checking 540 (self._end < other._end or 541 (self._end == other._end and 542 (not self.includes_end() and other.includes_end())))))
Returns true if this range ends earlier than the end of the other.
544 def ends_later_than_range(self, other: RangeConstructionTypes) -> bool: 545 """Returns true if this range ends later than the end of the other.""" 546 other = self._as_time_value_range(other) 547 548 return (not self.is_empty() and 549 not other.is_empty() and 550 (other.bounded_after() and 551 other._end is not None) and # redundant but is for type checking 552 ((not self.bounded_after() or 553 self._end is None) or # redundant but is for type checking 554 (self._end > other._end or 555 (self._end == other._end and 556 self.includes_end() and 557 not other.includes_end()))))
Returns true if this range ends later than the end of the other.
559 def overlaps_with_range(self, other: RangeConstructionTypes) -> bool: 560 """Returns true if this range and the other overlap.""" 561 other = self._as_time_value_range(other) 562 563 return (not self.is_earlier_than_range(other) and not self.is_later_than_range(other))
Returns true if this range and the other overlap.
565 def is_contiguous_with_range(self, other: RangeConstructionTypes) -> bool: 566 """Returns true if the union of this range and the other would be a valid range""" 567 other = self._as_time_value_range(other) 568 569 return (self.overlaps_with_range(other) or 570 (self.is_earlier_than_range(other) and 571 self._end == other._start and 572 (self.includes_end() or other.includes_start())) or 573 (self.is_later_than_range(other) and 574 self._start == other._end and 575 (self.includes_start() or other.includes_end())))
Returns true if the union of this range and the other would be a valid range
577 def union_with_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 578 """Returns the union of this range and the other. 579 :raises: ValueError if the ranges are not contiguous.""" 580 other = self._as_time_value_range(other) 581 582 if not self.is_contiguous_with_range(other): 583 raise ValueError("TimeValueRanges {} and {} are not contiguous, so cannot take the union.".format( 584 self, other)) 585 586 return self.extend_to_encompass_range(other)
Returns the union of this range and the other.
Raises
- ValueError if the ranges are not contiguous.
588 def extend_to_encompass_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 589 """Returns the range that encompasses this and the other range.""" 590 other = self._as_time_value_range(other) 591 592 if self.is_empty(): 593 return other 594 595 if other.is_empty(): 596 return self 597 598 inclusivity = TimeValueRange.EXCLUSIVE 599 if self._start == other._start: 600 start = self._start 601 inclusivity |= ((self._inclusivity | other._inclusivity) & TimeValueRange.INCLUDE_START) 602 elif self.starts_earlier_than_range(other): 603 start = self._start 604 inclusivity |= (self._inclusivity & TimeValueRange.INCLUDE_START) 605 else: 606 start = other._start 607 inclusivity |= (other._inclusivity & TimeValueRange.INCLUDE_START) 608 609 if self._end == other._end: 610 end = self._end 611 inclusivity |= ((self._inclusivity | other._inclusivity) & TimeValueRange.INCLUDE_END) 612 elif self.ends_later_than_range(other): 613 end = self._end 614 inclusivity |= (self._inclusivity & TimeValueRange.INCLUDE_END) 615 else: 616 end = other._end 617 inclusivity |= (other._inclusivity & TimeValueRange.INCLUDE_END) 618 619 return TimeValueRange(start, end, inclusivity)
Returns the range that encompasses this and the other range.
621 def split_at(self, value: TimeValueConstructTypes) -> Tuple["TimeValueRange", "TimeValueRange"]: 622 """Splits a range at a specified value. 623 624 It is guaranteed that the splitting point will be in the *second* TimeValueRange returned, and not in the first. 625 626 :param value: the time value to split at 627 :returns: A pair of TimeValueRange objects 628 :raises: ValueError if value not in self""" 629 value = self._as_time_value(value) 630 631 if value not in self: 632 raise ValueError("Cannot split range {} at {}".format(self, value)) 633 634 return (TimeValueRange(self._start, 635 value, 636 (self._inclusivity & TimeValueRange.INCLUDE_START)), 637 TimeValueRange(value, 638 self._end, 639 TimeValueRange.INCLUDE_START | (self._inclusivity & TimeValueRange.INCLUDE_END)))
Splits a range at a specified value.
It is guaranteed that the splitting point will be in the second TimeValueRange returned, and not in the first.
Parameters
- value: the time value to split at :returns: A pair of TimeValueRange objects
Raises
- ValueError if value not in self
641 def split_after(self, value: TimeValueConstructTypes) -> Tuple["TimeValueRange", "TimeValueRange"]: 642 """Splits a range after a specified value. 643 644 It is guaranteed that the splitting point will be in the *first* TimeValueRange returned, and not in the second. 645 646 :param value: the time value to split at 647 :returns: A pair of TimeValueRange objects 648 :raises: ValueError if value not in self""" 649 value = self._as_time_value(value) 650 651 if value not in self: 652 raise ValueError("Cannot split range {} at {}".format(self, value)) 653 654 return (TimeValueRange(self._start, 655 value, 656 TimeValueRange.INCLUDE_END | (self._inclusivity & TimeValueRange.INCLUDE_START)), 657 TimeValueRange(value, 658 self._end, 659 (self._inclusivity & TimeValueRange.INCLUDE_END)))
Splits a range after a specified value.
It is guaranteed that the splitting point will be in the first TimeValueRange returned, and not in the second.
Parameters
- value: the time value to split at :returns: A pair of TimeValueRange objects
Raises
- ValueError if value not in self
661 def excluding_up_to_end_of_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 662 """Returns the portion of this timerange which is not earlier than or contained in the 663 given timerange. 664 """ 665 other = self._as_time_value_range(other) 666 667 if other.is_empty() or other.is_earlier_than_range(self): 668 return self 669 elif not other.bounded_after() or cast(TimeValue, other.end) not in self: 670 return TimeValueRange.never() 671 else: 672 if other.includes_end(): 673 return self.split_after(cast(TimeValue, other.end))[1] 674 else: 675 return self.split_at(cast(TimeValue, other.end))[1]
Returns the portion of this timerange which is not earlier than or contained in the given timerange.
677 def excluding_before_start_of_range(self, other: RangeConstructionTypes) -> "TimeValueRange": 678 """Returns the portion of this timerange which is not later than or contained in the 679 given timerange. 680 """ 681 other = self._as_time_value_range(other) 682 683 if other.is_empty() or other.is_later_than_range(self): 684 return self 685 elif not other.bounded_before() or cast(TimeValue, other.start) not in self: 686 return TimeValueRange.never() 687 else: 688 assert (isinstance(other.start, TimeValue)) 689 if other.includes_start(): 690 return self.split_at(other.start)[0] 691 else: 692 return self.split_after(other.start)[0]
Returns the portion of this timerange which is not later than or contained in the given timerange.
694 def range_between(self, other: RangeConstructionTypes) -> "TimeValueRange": 695 """Returns the range between the end of the earlier range and the start of the later one""" 696 other = self._as_time_value_range(other) 697 698 if self.is_contiguous_with_range(other): 699 return TimeValueRange.never() 700 elif self.is_earlier_than_range(other): 701 inclusivity = TimeValueRange.EXCLUSIVE 702 if not self.includes_end(): 703 inclusivity |= TimeValueRange.INCLUDE_START 704 if not other.includes_start(): 705 inclusivity |= TimeValueRange.INCLUDE_END 706 return TimeValueRange(self._end, other._start, inclusivity) 707 else: 708 inclusivity = TimeValueRange.EXCLUSIVE 709 if not self.includes_start(): 710 inclusivity |= TimeValueRange.INCLUDE_END 711 if not other.includes_end(): 712 inclusivity |= TimeValueRange.INCLUDE_START 713 return TimeValueRange(other._end, self._start, inclusivity)
Returns the range between the end of the earlier range and the start of the later one
715 def is_empty(self) -> bool: 716 """Returns true on any empty range.""" 717 return (self._start is not None and 718 self._end is not None and 719 self._start == self._end and 720 self._inclusivity != TimeValueRange.INCLUSIVE)
Returns true on any empty range.
780 def subranges(self, rate: Optional[Fraction] = None) -> Iterator["TimeValueRange"]: 781 """Divide this range naturally into subranges. 782 783 (nb. Ranges with a rate set are automatically normalised to half-open, which may lead 784 to unexpected results when using this method with some ranges that have a rate but were 785 not originally defined as half-open) 786 787 If the rate parameter is specified then the boundaries between subranges will occur 788 at that frequency through the range. If it is None then the rate of this range will 789 be used instead if it has one. If this range has no rate then the returned iterable 790 will yield only a single timerange, which is equal to this one. 791 792 If this range is unbounded before then the iterable yields a single range equal to 793 this one. 794 795 Provided that this range is bounded before and a rate has been provided somehow, the 796 first subrange yielded will have the same start clusivity as this range. If this range 797 is bounded after then the last time range yielded will have the same end clusivity as 798 this range. Any other timeranges yielded will be half-open (ie. INCLUDE_START). The 799 yielded timeranges will be contiguous and non-overlapping. Every portion of this range 800 will be covered by exactly one of the yielded ranges and all of the yielded ranges will 801 be entirely contained within this range. 802 803 :param rate: Optional rate. 804 """ 805 _rate: Fraction 806 if rate is None: 807 if self.rate is None: 808 return iter([self]) 809 else: 810 _rate = self.rate 811 else: 812 _rate = rate 813 814 if self.is_empty() or not self.bounded_before(): 815 return iter([self]) 816 817 def __inner(): 818 start: TimeValue = cast(TimeValue, self.start) 819 include_start = self.includes_start() 820 821 for tv in TimeValueRange(self, rate=_rate): 822 if tv == start: 823 continue 824 elif tv == self.end: 825 break 826 else: 827 yield TimeValueRange( 828 start.as_timestamp(), 829 tv.as_timestamp(), 830 rate=self.rate, 831 inclusivity=TimeValueRange.INCLUDE_START if include_start else TimeValueRange.EXCLUSIVE) 832 include_start = True 833 start = tv 834 835 inclusivity = TimeValueRange.EXCLUSIVE 836 if include_start: 837 inclusivity |= TimeValueRange.INCLUDE_START 838 if self.includes_end(): 839 inclusivity |= TimeValueRange.INCLUDE_END 840 841 yield TimeValueRange( 842 start.as_timestamp(), 843 self.end.as_timestamp(), 844 rate=self.rate, 845 inclusivity=inclusivity) 846 847 return __inner()
Divide this range naturally into subranges.
(nb. Ranges with a rate set are automatically normalised to half-open, which may lead to unexpected results when using this method with some ranges that have a rate but were not originally defined as half-open)
If the rate parameter is specified then the boundaries between subranges will occur at that frequency through the range. If it is None then the rate of this range will be used instead if it has one. If this range has no rate then the returned iterable will yield only a single timerange, which is equal to this one.
If this range is unbounded before then the iterable yields a single range equal to this one.
Provided that this range is bounded before and a rate has been provided somehow, the first subrange yielded will have the same start clusivity as this range. If this range is bounded after then the last time range yielded will have the same end clusivity as this range. Any other timeranges yielded will be half-open (ie. INCLUDE_START). The yielded timeranges will be contiguous and non-overlapping. Every portion of this range will be covered by exactly one of the yielded ranges and all of the yielded ranges will be entirely contained within this range.
Parameters
- rate: Optional rate.
920 def merge_into_ordered_ranges(self, ranges: Iterable["TimeValueRange"]) -> Iterable["TimeValueRange"]: 921 """Merge this range into an ordered list of non-overlapping non-contiguous timeranges, returning the unique 922 list of non-overlapping non-contiguous timeranges which covers the union of the ranges in the original list and 923 also this timerange. 924 925 :param ranges: An iterable yielding non-overlapping non-contiguous timeranges in chronological order 926 :returns: An iterable yielding non-overlapping non-contiguous timeranges in chronological order""" 927 new_range = self 928 for existing_range in ranges: 929 if ((existing_range.is_contiguous_with_range(new_range) or 930 existing_range.overlaps_with_range(new_range))): 931 # This means that the new and old range can be combined together 932 new_range = new_range.extend_to_encompass_range(existing_range) 933 elif existing_range.is_earlier_than_range(new_range): 934 # In this case the exiting range is entirely earlier than the new range and so won't interfere 935 # with it 936 yield existing_range 937 elif existing_range.is_later_than_range(new_range): 938 # The new_range is entirely located earlier than the existing range, we can simply add the 939 # new_range 940 yield new_range 941 new_range = existing_range 942 yield new_range
Merge this range into an ordered list of non-overlapping non-contiguous timeranges, returning the unique list of non-overlapping non-contiguous timeranges which covers the union of the ranges in the original list and also this timerange.
Parameters
- ranges: An iterable yielding non-overlapping non-contiguous timeranges in chronological order :returns: An iterable yielding non-overlapping non-contiguous timeranges in chronological order
944 def complement_of_ordered_subranges(self, ranges: Iterable["TimeValueRange"]) -> Iterable["TimeValueRange"]: 945 """Given an iterable that yields chronologically ordered disjoint ranges yield chronologically ordered disjoint 946 subranges of this range which cover every part of this range that is *not* covered by the input ranges. 947 948 :param ranges: Iterable yielding chronologically ordered disjoint TimeValueRanges 949 :returns: Iterable yielding chronologically ordered disjoint non-empty TimeValueRanges 950 """ 951 current_timerange = self 952 953 for existing_range in ranges: 954 if not existing_range.is_empty(): 955 range_before = current_timerange.excluding_before_start_of_range(existing_range) 956 if not range_before.is_empty(): 957 yield range_before 958 current_timerange = current_timerange.excluding_up_to_end_of_range(existing_range) 959 960 if not current_timerange.is_empty(): 961 yield current_timerange
Given an iterable that yields chronologically ordered disjoint ranges yield chronologically ordered disjoint subranges of this range which cover every part of this range that is not covered by the input ranges.
Parameters
- ranges: Iterable yielding chronologically ordered disjoint TimeValueRanges :returns: Iterable yielding chronologically ordered disjoint non-empty TimeValueRanges