# Source code for oss2.xml_utils

# -*- coding: utf-8 -*-

"""
oss2.xml_utils
~~~~~~~~~~~~~~

XML处理相关。

主要包括两类接口:
    - parse_开头的函数:用来解析服务器端返回的XML
    - to_开头的函数:用来生成发往服务器端的XML

"""

import xml.etree.ElementTree as ElementTree

from .models import (SimplifiedObjectInfo,
                     SimplifiedBucketInfo,
                     PartInfo,
                     MultipartUploadInfo,
                     LifecycleRule,
                     LifecycleExpiration,
                     CorsRule,
                     LiveChannelInfoTarget,
                     LiveChannelInfo,
                     LiveRecord,
                     LiveChannelVideoStat,
                     LiveChannelAudioStat,
                     Owner,
                     AccessControlList,
                     AbortMultipartUpload,
                     StorageTransition)

from .compat import urlunquote, to_unicode, to_string
from .utils import iso8601_to_unixtime, date_to_iso8601, iso8601_to_date


def _find_tag(parent, path):
    """Return the text of the child element at `path`, or '' when it has no text.

    :raises RuntimeError: if no element exists at `path` under `parent`.
    """
    node = parent.find(path)
    if node is None:
        raise RuntimeError("parse xml: " + path + " could not be found under " + parent.tag)

    return to_string(node.text) if node.text is not None else ''


def _find_bool(parent, path):
    """Parse the child element at `path` as a boolean ('true'/'false').

    :raises RuntimeError: if the text is neither 'true' nor 'false'.
    """
    value = _find_tag(parent, path)
    if value not in ('true', 'false'):
        raise RuntimeError("parse xml: value of " + path + " is not a boolean under " + parent.tag)
    return value == 'true'


def _find_int(parent, path):
    """Parse the child element at `path` as an integer."""
    text = _find_tag(parent, path)
    return int(text)


def _find_object(parent, path, url_encoded):
    """Return the object name at `path`, URL-unquoting it when `url_encoded` is true."""
    name = _find_tag(parent, path)
    return urlunquote(name) if url_encoded else name


def _find_all_tags(parent, tag):
    """Collect the text of every direct child matching `tag` ('' for empty nodes)."""
    texts = []
    for node in parent.findall(tag):
        texts.append(to_string(node.text) or '')
    return texts


def _is_url_encoding(root):
    """Return True when the listing response declares EncodingType 'url'."""
    node = root.find('EncodingType')
    return node is not None and to_string(node.text) == 'url'


def _node_to_string(root):
    """Serialize the element tree rooted at `root` to UTF-8 encoded bytes."""
    serialized = ElementTree.tostring(root, encoding='utf-8')
    return serialized


def _add_node_list(parent, tag, entries):
    """Append one text child named `tag` under `parent` for every entry."""
    for entry in entries:
        _add_text_child(parent, tag, entry)


def _add_text_child(parent, tag, text):
    """Append a child element named `tag` whose text is `text` (coerced to unicode)."""
    node = ElementTree.SubElement(parent, tag)
    node.text = to_unicode(text)

def _add_node_child(parent, tag):
    """Append and return a new empty child element named `tag`."""
    node = ElementTree.SubElement(parent, tag)
    return node

def parse_list_objects(result, body):
    """Parse a ListObjects (GET Bucket) XML response into `result`.

    Fix: use the `_find_int` helper for Size instead of the inline
    `int(_find_tag(...))`, consistent with the other parse_ functions.

    :param result: result object whose fields are filled in
    :param body: raw XML response body
    :return: `result`
    """
    root = ElementTree.fromstring(body)
    url_encoded = _is_url_encoding(root)

    result.is_truncated = _find_bool(root, 'IsTruncated')
    if result.is_truncated:
        result.next_marker = _find_object(root, 'NextMarker', url_encoded)

    for contents_node in root.findall('Contents'):
        result.object_list.append(SimplifiedObjectInfo(
            _find_object(contents_node, 'Key', url_encoded),
            iso8601_to_unixtime(_find_tag(contents_node, 'LastModified')),
            _find_tag(contents_node, 'ETag').strip('"'),
            _find_tag(contents_node, 'Type'),
            _find_int(contents_node, 'Size'),
            _find_tag(contents_node, 'StorageClass')
        ))

    for prefix_node in root.findall('CommonPrefixes'):
        result.prefix_list.append(_find_object(prefix_node, 'Prefix', url_encoded))

    return result
def parse_list_buckets(result, body):
    """Parse a ListBuckets (GET Service) XML response into `result`.

    Fix: now returns `result` like every other parse_ function in this
    module (previously fell off the end and implicitly returned None).

    :param result: result object whose fields are filled in
    :param body: raw XML response body
    :return: `result`
    """
    root = ElementTree.fromstring(body)

    # Some responses omit IsTruncated entirely; treat that as "not truncated".
    if root.find('IsTruncated') is None:
        result.is_truncated = False
    else:
        result.is_truncated = _find_bool(root, 'IsTruncated')

    if result.is_truncated:
        result.next_marker = _find_tag(root, 'NextMarker')

    for bucket_node in root.findall('Buckets/Bucket'):
        result.buckets.append(SimplifiedBucketInfo(
            _find_tag(bucket_node, 'Name'),
            _find_tag(bucket_node, 'Location'),
            iso8601_to_unixtime(_find_tag(bucket_node, 'CreationDate'))
        ))

    return result
def parse_init_multipart_upload(result, body):
    """Parse an InitiateMultipartUpload XML response; fills `result.upload_id`."""
    root = ElementTree.fromstring(body)
    result.upload_id = _find_tag(root, 'UploadId')
    return result
def parse_list_multipart_uploads(result, body):
    """Parse a ListMultipartUploads XML response into `result`."""
    root = ElementTree.fromstring(body)
    url_encoded = _is_url_encoding(root)

    result.is_truncated = _find_bool(root, 'IsTruncated')
    result.next_key_marker = _find_object(root, 'NextKeyMarker', url_encoded)
    result.next_upload_id_marker = _find_tag(root, 'NextUploadIdMarker')

    for upload_node in root.findall('Upload'):
        result.upload_list.append(MultipartUploadInfo(
            _find_object(upload_node, 'Key', url_encoded),
            _find_tag(upload_node, 'UploadId'),
            iso8601_to_unixtime(_find_tag(upload_node, 'Initiated'))
        ))

    for prefix_node in root.findall('CommonPrefixes'):
        result.prefix_list.append(_find_object(prefix_node, 'Prefix', url_encoded))

    return result
def parse_list_parts(result, body):
    """Parse a ListParts XML response into `result`."""
    root = ElementTree.fromstring(body)

    result.is_truncated = _find_bool(root, 'IsTruncated')
    result.next_marker = _find_tag(root, 'NextPartNumberMarker')

    for part_node in root.findall('Part'):
        result.parts.append(PartInfo(
            _find_int(part_node, 'PartNumber'),
            _find_tag(part_node, 'ETag').strip('"'),
            size=_find_int(part_node, 'Size'),
            last_modified=iso8601_to_unixtime(_find_tag(part_node, 'LastModified'))
        ))

    return result
def parse_batch_delete_objects(result, body):
    """Parse a DeleteMultipleObjects XML response into `result`.

    A quiet-mode delete returns an empty body; `result` is returned untouched.
    """
    if not body:
        return result

    root = ElementTree.fromstring(body)
    url_encoded = _is_url_encoding(root)

    for deleted_node in root.findall('Deleted'):
        result.deleted_keys.append(_find_object(deleted_node, 'Key', url_encoded))

    return result
def parse_get_bucket_acl(result, body):
    """Parse a GetBucketAcl XML response; fills `result.acl`."""
    root = ElementTree.fromstring(body)
    result.acl = _find_tag(root, 'AccessControlList/Grant')
    return result
# GetObjectAcl responses share the same XML schema as GetBucketAcl.
parse_get_object_acl = parse_get_bucket_acl
def parse_get_bucket_location(result, body):
    """Parse a GetBucketLocation XML response; fills `result.location`."""
    # The whole document is a single <LocationConstraint> element.
    root = ElementTree.fromstring(body)
    result.location = to_string(root.text)
    return result
def parse_get_bucket_logging(result, body):
    """Parse a GetBucketLogging XML response into `result`.

    Both fields are optional; they are only set when present in the response.
    """
    root = ElementTree.fromstring(body)

    if root.find('LoggingEnabled/TargetBucket') is not None:
        result.target_bucket = _find_tag(root, 'LoggingEnabled/TargetBucket')

    if root.find('LoggingEnabled/TargetPrefix') is not None:
        result.target_prefix = _find_tag(root, 'LoggingEnabled/TargetPrefix')

    return result
def parse_get_bucket_stat(result, body):
    """Parse a GetBucketStat XML response into `result`."""
    root = ElementTree.fromstring(body)

    result.storage_size_in_bytes = _find_int(root, 'Storage')
    result.object_count = _find_int(root, 'ObjectCount')
    result.multi_part_upload_count = _find_int(root, 'MultipartUploadCount')

    return result
def parse_get_bucket_info(result, body):
    """Parse a GetBucketInfo XML response into `result`."""
    root = ElementTree.fromstring(body)

    result.name = _find_tag(root, 'Bucket/Name')
    result.creation_date = _find_tag(root, 'Bucket/CreationDate')
    result.storage_class = _find_tag(root, 'Bucket/StorageClass')
    result.extranet_endpoint = _find_tag(root, 'Bucket/ExtranetEndpoint')
    result.intranet_endpoint = _find_tag(root, 'Bucket/IntranetEndpoint')
    result.location = _find_tag(root, 'Bucket/Location')
    result.owner = Owner(_find_tag(root, 'Bucket/Owner/DisplayName'),
                         _find_tag(root, 'Bucket/Owner/ID'))
    result.acl = AccessControlList(_find_tag(root, 'Bucket/AccessControlList/Grant'))

    return result
def parse_get_bucket_referer(result, body):
    """Parse a GetBucketReferer XML response into `result`."""
    root = ElementTree.fromstring(body)

    result.allow_empty_referer = _find_bool(root, 'AllowEmptyReferer')
    result.referers = _find_all_tags(root, 'RefererList/Referer')

    return result
def parse_get_bucket_websiste(result, body):
    """Parse a GetBucketWebsite XML response into `result`.

    NOTE: the function name misspells "website", but it is public API
    and must stay unchanged for backward compatibility.
    """
    root = ElementTree.fromstring(body)

    result.index_file = _find_tag(root, 'IndexDocument/Suffix')
    result.error_file = _find_tag(root, 'ErrorDocument/Key')

    return result
def parse_create_live_channel(result, body):
    """Parse a CreateLiveChannel XML response; fills play/publish URLs."""
    root = ElementTree.fromstring(body)

    result.play_url = _find_tag(root, 'PlayUrls/Url')
    result.publish_url = _find_tag(root, 'PublishUrls/Url')

    return result
def parse_get_live_channel(result, body):
    """Parse a GetLiveChannel XML response into `result`."""
    root = ElementTree.fromstring(body)

    result.status = _find_tag(root, 'Status')
    result.description = _find_tag(root, 'Description')

    target = LiveChannelInfoTarget()
    target.type = _find_tag(root, 'Target/Type')
    target.frag_duration = _find_tag(root, 'Target/FragDuration')
    target.frag_count = _find_tag(root, 'Target/FragCount')
    target.playlist_name = _find_tag(root, 'Target/PlaylistName')
    result.target = target

    return result
def parse_list_live_channel(result, body):
    """Parse a ListLiveChannel XML response into `result`."""
    root = ElementTree.fromstring(body)

    result.prefix = _find_tag(root, 'Prefix')
    result.marker = _find_tag(root, 'Marker')
    result.max_keys = _find_int(root, 'MaxKeys')
    result.is_truncated = _find_bool(root, 'IsTruncated')
    if result.is_truncated:
        result.next_marker = _find_tag(root, 'NextMarker')

    for channel_node in root.findall('LiveChannel'):
        info = LiveChannelInfo()
        info.name = _find_tag(channel_node, 'Name')
        info.description = _find_tag(channel_node, 'Description')
        info.status = _find_tag(channel_node, 'Status')
        info.last_modified = iso8601_to_unixtime(_find_tag(channel_node, 'LastModified'))
        info.play_url = _find_tag(channel_node, 'PlayUrls/Url')
        info.publish_url = _find_tag(channel_node, 'PublishUrls/Url')
        result.channels.append(info)

    return result
def parse_stat_video(video_node, video):
    """Fill `video` stats from a <Video> element of a live-channel stat response."""
    video.width = _find_int(video_node, 'Width')
    video.height = _find_int(video_node, 'Height')
    video.frame_rate = _find_int(video_node, 'FrameRate')
    video.bandwidth = _find_int(video_node, 'Bandwidth')
    video.codec = _find_tag(video_node, 'Codec')
def parse_stat_audio(audio_node, audio):
    """Fill `audio` stats from an <Audio> element of a live-channel stat response."""
    audio.bandwidth = _find_int(audio_node, 'Bandwidth')
    audio.sample_rate = _find_int(audio_node, 'SampleRate')
    audio.codec = _find_tag(audio_node, 'Codec')
def parse_live_channel_stat(result, body):
    """Parse a GetLiveChannelStat XML response into `result`.

    RemoteAddr, ConnectedTime, Video and Audio are all optional and only
    set when present (e.g. when the channel is actually live).
    """
    root = ElementTree.fromstring(body)

    result.status = _find_tag(root, 'Status')

    if root.find('RemoteAddr') is not None:
        result.remote_addr = _find_tag(root, 'RemoteAddr')

    if root.find('ConnectedTime') is not None:
        result.connected_time = iso8601_to_unixtime(_find_tag(root, 'ConnectedTime'))

    video_node = root.find('Video')
    audio_node = root.find('Audio')

    if video_node is not None:
        result.video = LiveChannelVideoStat()
        parse_stat_video(video_node, result.video)
    if audio_node is not None:
        result.audio = LiveChannelAudioStat()
        parse_stat_audio(audio_node, result.audio)

    return result
def parse_live_channel_history(result, body):
    """Parse a GetLiveChannelHistory XML response into `result.records`."""
    root = ElementTree.fromstring(body)

    for record_node in root.findall('LiveRecord'):
        record = LiveRecord()
        record.start_time = iso8601_to_unixtime(_find_tag(record_node, 'StartTime'))
        record.end_time = iso8601_to_unixtime(_find_tag(record_node, 'EndTime'))
        record.remote_addr = _find_tag(record_node, 'RemoteAddr')
        result.records.append(record)

    return result
def parse_lifecycle_expiration(expiration_node):
    """Build a LifecycleExpiration from an <Expiration> element (or None)."""
    if expiration_node is None:
        return None

    expiration = LifecycleExpiration()

    # Days and Date are mutually exclusive in the schema; Days wins if both appear.
    if expiration_node.find('Days') is not None:
        expiration.days = _find_int(expiration_node, 'Days')
    elif expiration_node.find('Date') is not None:
        expiration.date = iso8601_to_date(_find_tag(expiration_node, 'Date'))

    return expiration
def parse_lifecycle_abort_multipart_upload(abort_multipart_upload_node):
    """Build an AbortMultipartUpload from an <AbortMultipartUpload> element (or None)."""
    if abort_multipart_upload_node is None:
        return None

    abort_multipart_upload = AbortMultipartUpload()

    # Days and CreatedBeforeDate are mutually exclusive; Days wins if both appear.
    if abort_multipart_upload_node.find('Days') is not None:
        abort_multipart_upload.days = _find_int(abort_multipart_upload_node, 'Days')
    elif abort_multipart_upload_node.find('CreatedBeforeDate') is not None:
        abort_multipart_upload.created_before_date = iso8601_to_date(
            _find_tag(abort_multipart_upload_node, 'CreatedBeforeDate'))

    return abort_multipart_upload
def parse_lifecycle_storage_transitions(storage_transition_nodes):
    """Build a list of StorageTransition objects from <Transition> elements."""
    transitions = []
    for node in storage_transition_nodes:
        transition = StorageTransition(storage_class=_find_tag(node, 'StorageClass'))
        # Days and CreatedBeforeDate are mutually exclusive; Days wins if both appear.
        if node.find('Days') is not None:
            transition.days = _find_int(node, 'Days')
        elif node.find('CreatedBeforeDate') is not None:
            transition.created_before_date = iso8601_to_date(_find_tag(node, 'CreatedBeforeDate'))
        transitions.append(transition)

    return transitions
def parse_get_bucket_lifecycle(result, body):
    """Parse a GetBucketLifecycle XML response into `result.rules`."""
    root = ElementTree.fromstring(body)

    for rule_node in root.findall('Rule'):
        expiration = parse_lifecycle_expiration(rule_node.find('Expiration'))
        abort_multipart_upload = parse_lifecycle_abort_multipart_upload(
            rule_node.find('AbortMultipartUpload'))
        storage_transitions = parse_lifecycle_storage_transitions(
            rule_node.findall('Transition'))
        rule = LifecycleRule(
            _find_tag(rule_node, 'ID'),
            _find_tag(rule_node, 'Prefix'),
            status=_find_tag(rule_node, 'Status'),
            expiration=expiration,
            abort_multipart_upload=abort_multipart_upload,
            storage_transitions=storage_transitions
        )
        result.rules.append(rule)

    return result
def parse_get_bucket_cors(result, body):
    """Parse a GetBucketCors XML response into `result.rules`."""
    root = ElementTree.fromstring(body)

    for rule_node in root.findall('CORSRule'):
        rule = CorsRule()
        rule.allowed_origins = _find_all_tags(rule_node, 'AllowedOrigin')
        rule.allowed_methods = _find_all_tags(rule_node, 'AllowedMethod')
        rule.allowed_headers = _find_all_tags(rule_node, 'AllowedHeader')
        rule.expose_headers = _find_all_tags(rule_node, 'ExposeHeader')

        # MaxAgeSeconds is optional.
        max_age_node = rule_node.find('MaxAgeSeconds')
        if max_age_node is not None:
            rule.max_age_seconds = int(max_age_node.text)

        result.rules.append(rule)

    return result
def to_complete_upload_request(parts):
    """Serialize a CompleteMultipartUpload request body from `parts`.

    :param parts: iterable of part objects with `part_number` and `etag`
    :return: UTF-8 encoded XML bytes
    """
    root = ElementTree.Element('CompleteMultipartUpload')
    for part in parts:
        part_node = ElementTree.SubElement(root, "Part")
        _add_text_child(part_node, 'PartNumber', str(part.part_number))
        # The server expects ETags quoted.
        _add_text_child(part_node, 'ETag', '"{0}"'.format(part.etag))

    return _node_to_string(root)
def to_batch_delete_objects_request(keys, quiet):
    """Serialize a DeleteMultipleObjects request body.

    :param keys: iterable of object keys to delete
    :param quiet: bool; True suppresses per-key results in the response
    :return: UTF-8 encoded XML bytes
    """
    root_node = ElementTree.Element('Delete')
    _add_text_child(root_node, 'Quiet', str(quiet).lower())

    for key in keys:
        object_node = ElementTree.SubElement(root_node, 'Object')
        _add_text_child(object_node, 'Key', key)

    return _node_to_string(root_node)
def to_put_bucket_config(bucket_config):
    """Serialize a PutBucket (CreateBucketConfiguration) request body."""
    root = ElementTree.Element('CreateBucketConfiguration')
    _add_text_child(root, 'StorageClass', str(bucket_config.storage_class))
    return _node_to_string(root)
def to_put_bucket_logging(bucket_logging):
    """Serialize a PutBucketLogging request body.

    An empty/falsy target_bucket produces a bare <BucketLoggingStatus/>,
    which disables logging.
    """
    root = ElementTree.Element('BucketLoggingStatus')

    if bucket_logging.target_bucket:
        logging_node = ElementTree.SubElement(root, 'LoggingEnabled')
        _add_text_child(logging_node, 'TargetBucket', bucket_logging.target_bucket)
        _add_text_child(logging_node, 'TargetPrefix', bucket_logging.target_prefix)

    return _node_to_string(root)
def to_put_bucket_referer(bucket_referer):
    """Serialize a PutBucketReferer request body."""
    root = ElementTree.Element('RefererConfiguration')
    _add_text_child(root, 'AllowEmptyReferer', str(bucket_referer.allow_empty_referer).lower())

    list_node = ElementTree.SubElement(root, 'RefererList')
    for referer in bucket_referer.referers:
        _add_text_child(list_node, 'Referer', referer)

    return _node_to_string(root)
def to_put_bucket_website(bucket_websiste):
    """Serialize a PutBucketWebsite request body."""
    root = ElementTree.Element('WebsiteConfiguration')

    index_node = ElementTree.SubElement(root, 'IndexDocument')
    _add_text_child(index_node, 'Suffix', bucket_websiste.index_file)

    error_node = ElementTree.SubElement(root, 'ErrorDocument')
    _add_text_child(error_node, 'Key', bucket_websiste.error_file)

    return _node_to_string(root)
def to_put_bucket_lifecycle(bucket_lifecycle):
    """Serialize a PutBucketLifecycle request body from `bucket_lifecycle.rules`.

    Each rule may optionally carry an expiration, an abort-multipart-upload
    policy, and any number of storage-class transitions; within each, the
    day-count variant takes precedence over the date variant.
    """
    root = ElementTree.Element('LifecycleConfiguration')

    for rule in bucket_lifecycle.rules:
        rule_node = ElementTree.SubElement(root, 'Rule')
        _add_text_child(rule_node, 'ID', rule.id)
        _add_text_child(rule_node, 'Prefix', rule.prefix)
        _add_text_child(rule_node, 'Status', rule.status)

        expiration = rule.expiration
        if expiration:
            expiration_node = ElementTree.SubElement(rule_node, 'Expiration')
            if expiration.days is not None:
                _add_text_child(expiration_node, 'Days', str(expiration.days))
            elif expiration.date is not None:
                _add_text_child(expiration_node, 'Date', date_to_iso8601(expiration.date))
            elif expiration.created_before_date is not None:
                _add_text_child(expiration_node, 'CreatedBeforeDate',
                                date_to_iso8601(expiration.created_before_date))

        abort_multipart_upload = rule.abort_multipart_upload
        if abort_multipart_upload:
            abort_node = ElementTree.SubElement(rule_node, 'AbortMultipartUpload')
            if abort_multipart_upload.days is not None:
                _add_text_child(abort_node, 'Days', str(abort_multipart_upload.days))
            elif abort_multipart_upload.created_before_date is not None:
                _add_text_child(abort_node, 'CreatedBeforeDate',
                                date_to_iso8601(abort_multipart_upload.created_before_date))

        storage_transitions = rule.storage_transitions
        if storage_transitions:
            for transition in storage_transitions:
                transition_node = ElementTree.SubElement(rule_node, 'Transition')
                _add_text_child(transition_node, 'StorageClass', str(transition.storage_class))
                if transition.days is not None:
                    _add_text_child(transition_node, 'Days', str(transition.days))
                elif transition.created_before_date is not None:
                    _add_text_child(transition_node, 'CreatedBeforeDate',
                                    date_to_iso8601(transition.created_before_date))

    return _node_to_string(root)
def to_put_bucket_cors(bucket_cors):
    """Serialize a PutBucketCors request body from `bucket_cors.rules`."""
    root = ElementTree.Element('CORSConfiguration')

    for rule in bucket_cors.rules:
        rule_node = ElementTree.SubElement(root, 'CORSRule')
        _add_node_list(rule_node, 'AllowedOrigin', rule.allowed_origins)
        _add_node_list(rule_node, 'AllowedMethod', rule.allowed_methods)
        _add_node_list(rule_node, 'AllowedHeader', rule.allowed_headers)
        _add_node_list(rule_node, 'ExposeHeader', rule.expose_headers)

        # MaxAgeSeconds is optional.
        if rule.max_age_seconds is not None:
            _add_text_child(rule_node, 'MaxAgeSeconds', str(rule.max_age_seconds))

    return _node_to_string(root)
def to_create_live_channel(live_channel):
    """Serialize a CreateLiveChannel request body from `live_channel`."""
    root = ElementTree.Element('LiveChannelConfiguration')

    _add_text_child(root, 'Description', live_channel.description)
    _add_text_child(root, 'Status', live_channel.status)

    target_node = _add_node_child(root, 'Target')
    _add_text_child(target_node, 'Type', live_channel.target.type)
    _add_text_child(target_node, 'FragDuration', str(live_channel.target.frag_duration))
    _add_text_child(target_node, 'FragCount', str(live_channel.target.frag_count))
    _add_text_child(target_node, 'PlaylistName', str(live_channel.target.playlist_name))

    return _node_to_string(root)