您的当前位置:首页>全部文章>文章详情

工单的技术代码操作你知道吗?原来这么简单!

发表于:2023-01-25 16:13:18浏览:577次TAG: #工单系统 #App

很多人以为,“提交工单”只是把一句话存进系统里。
比如用户在网站、App、外卖系统或客服平台里提交:
“reset password”
“payment gateway timeout error”
“cannot update profile picture”
“urgent login failure after update”
“investigate database timeout issue”
看起来只是几行普通文本,但在真实后台系统里,一条工单并不是简单地被放进列表里。
一个完整的 Ticket System(工单系统),通常需要完成很多隐藏操作。
它要判断工单标题是否符合管理员设置的筛选规则;
它要保护用户隐私,不能让任何人随便查看工单内容;
它要管理 access token 和 verification PIN;
它要防止旧 token 和旧 PIN 被重复使用;
它要在用户多次输错 PIN 后锁定访问会话;
它还要决定工单怎么展示、怎么调度、怎么分类、怎么记录日志、怎么生成统计报表。
所以,一个看似简单的“提交问题”,背后其实藏着很多数据结构和算法。
这篇内容就用一个简化版 智能工单系统 Ticket System,带你看看真实后台功能可以怎样一步步用代码实现。

这篇科普贴会讲什么?
我们会把一个工单系统拆成几个功能模块:
Part A:高级工单标题规则匹配器
Part B:工单系统中的安全访问会话管理器
Part C:具体工单 verification PIN 设置与读取
Part D:工单任务展示顺序优化器
Part E:系统功能测试代码
Part F:工单优先级调度与超时重试
Part G:工单状态流转管理
Part H:工单分类与标签系统
Part I:工单处理日志与审计记录
Part J:工单统计报表功能
Part K:新增功能测试代码
它不是单纯考某一个算法,而是把很多经典数据结构和算法放进真实业务场景中:
动态规划
滚动数组
有序表查找
集合判重
链表插入
状态管理
优先级调度
异常处理
迭代器设计
assert 测试
这样写出来的代码,不只是“能运行”,而是更接近真实软件系统中的功能模块。

项目背景:一个智能工单系统是怎么工作的?
假设你正在开发一个客服后台系统。
用户提交了一条工单:
“urgent login failure after update”
系统后台可能会立刻思考这些问题:
这是不是紧急问题?
标题里有没有 login?
是否符合管理员设置的筛选规则?
这条工单应该优先处理吗?
用户访问这条工单时是否需要 PIN?
这个用户的访问 session 有没有被锁定?
这条工单应该显示在列表的前面还是后面?
它现在是 new、assigned、in_progress,还是 closed?
它属于 billing、technical_support,还是 account_security?
有没有操作日志可以追踪它的处理过程?
这些问题最后都会变成代码逻辑。
例如:
标题规则匹配可以用动态规划解决;
PIN 是否重复可以用集合判断;
工单展示顺序可以用链表插入;
session 查找可以用有序表;
优先级调度可以用排序规则;
状态流转可以用状态管理思想。

Part A:工单标题怎么被系统自动识别?
高级工单标题规则匹配器
在真实工单系统中,管理员经常需要快速筛选某一类工单。
比如管理员想查找所有紧急失败类工单,可以输入规则:
“urgent * failure”
这个规则可以匹配:
“urgent login failure”
“urgent payment gateway failure”
“urgent account security failure”
其中:

  • 可以匹配零个或多个连续单词
    ? 可以匹配任意一个单词
    同时,真实系统中的标题匹配不能只是简单字符串比较,还需要支持:
    ignore_case=True # 忽略大小写
    ignore_punctuation=True # 忽略标点
    stop_words=[“the”, “a”] # 删除无意义词
    must_contain=[“login”] # 标题必须包含某些关键词
    min_words=2 # 标题至少包含多少个词

参考答案代码
def matches_ticket_rule(
self,
pattern: str,
ignore_case=True,
ignore_punctuation=True,
wildcard=”*”,
single_wildcard=”?”,
stop_words=None,
must_contain=None,
min_words=1
) -> bool:
“””
判断 self.ticket_title_words 是否匹配一个工单过滤规则。

该方法用于工单系统中,管理员可以通过灵活的标题模式来筛选工单。

模式规则:
- 普通 token 必须完全匹配。
- wildcard 表示匹配 0 个或多个连续单词。
- single_wildcard 表示匹配恰好 1 个单词。
- 整个标题必须从头到尾完全匹配。

额外过滤条件:
- ignore_case 控制是否忽略大小写。
- ignore_punctuation 用于去除简单标点符号。
- stop_words 用于移除无意义词。
- must_contain 表示标题中必须包含的词。
- min_words 表示标题最少单词数量要求。

动态规划定义:
dp[i][j] = True 当且仅当标题前 i 个单词
           可以匹配模式前 j 个 token。

时间复杂度:O(N * M)
空间复杂度:O(M)
"""

if stop_words is None:
    stop_words = []

if must_contain is None:
    must_contain = []

def clean_word(word):
    if ignore_case:
        word = word.lower()

    if ignore_punctuation:
        cleaned = ""

        for char in word:
            if char.isalnum():
                cleaned += char

        word = cleaned

    return word

title_words = []

for word in self.ticket_title_words:
    cleaned_word = clean_word(word)

    if cleaned_word != "" and cleaned_word not in stop_words:
        title_words.append(cleaned_word)

pattern_tokens = []

for token in pattern.split(" "):
    cleaned_token = clean_word(token)

    if cleaned_token != "" and cleaned_token not in stop_words:
        pattern_tokens.append(cleaned_token)

if len(title_words) < min_words:
    return False

for required_word in must_contain:
    cleaned_required_word = clean_word(required_word)

    if cleaned_required_word not in title_words:
        return False

n = len(title_words)
m = len(pattern_tokens)

previous_row = ArrayR(m + 1)
previous_row[0] = True

for j in range(1, m + 1):
    if pattern_tokens[j - 1] == wildcard:
        prev = previous_row[j - 1]
        previous_row[j] = prev if prev is not None else False
    else:
        previous_row[j] = False

for i in range(1, n + 1):
    current_row = ArrayR(m + 1)
    current_row[0] = False

    for j in range(1, m + 1):
        token = pattern_tokens[j - 1]

        if token == wildcard:
            match_zero = current_row[j - 1]
            match_zero = match_zero if match_zero is not None else False

            match_more = previous_row[j]
            match_more = match_more if match_more is not None else False

            current_row[j] = match_zero or match_more

        elif token == single_wildcard:
            prev_val = previous_row[j - 1]
            prev_val = prev_val if prev_val is not None else False

            current_row[j] = prev_val

        else:
            prev_val = previous_row[j - 1]
            prev_val = prev_val if prev_val is not None else False

            current_row[j] = prev_val and title_words[i - 1] == token

    previous_row = current_row

final_result = previous_row[m]
return final_result if final_result is not None else False

科普式解释
这个方法的核心是:先清洗,再匹配。
系统会先统一大小写、删除标点、去掉停用词,然后再检查标题是否满足最低词数和必含关键词要求。
完成这些前置过滤后,代码使用动态规划判断标题和规则是否完整匹配。
普通 token 必须逐词精确匹配;

  • 可以匹配零个或多个连续单词;
    ? 可以匹配恰好一个单词。
    为了节省空间,代码没有建立完整二维 DP 表,而是只保留 previous_row 和 current_row 两行数组。
    这就是一个典型的“把动态规划放进真实业务场景”的例子。

Part B:用户访问工单为什么需要 session?
工单系统中的安全访问会话管理器
用户访问工单时,系统不能让任何人随便查看内容。
例如,账单工单、账户安全工单、退款工单都可能包含敏感信息。因此,每个用户在不同工单类别下都需要一个独立的访问会话。
例如:
category = “billing”
user_id = “user_1”
表示用户 user_1 正在访问账单类工单。
一个 session 需要保存:
active_token
token_records
active_pin
used_pins
is_locked
failed_pin_attempts
is_session_open

参考答案代码
from data_structures import ArrayList
from data_structures import ArraySortedList
from data_structures import BitVectorSet

class TicketAccessSession:
“””
存储一个用户在某个工单类别下的安全访问信息。
“””

def __init__(self, category, user_id, max_token_records):
    self.category = category
    self.user_id = user_id
    self.session_key = category + "\0" + user_id

    self.active_token = None
    self.token_records = ArrayList(max_token_records)

    self.active_pin = None
    self.used_pins = BitVectorSet()

    self.is_locked = False
    self.failed_pin_attempts = 0
    self.is_session_open = True

def __eq__(self, other):
    return self.session_key == other.session_key

def __lt__(self, other):
    return self.session_key < other.session_key

def __gt__(self, other):
    return self.session_key > other.session_key

def __le__(self, other):
    return self.session_key <= other.session_key

def __ge__(self, other):
    return self.session_key >= other.session_key

class TicketAccessManager:
“””
管理工单系统中的访问 token 和 PIN。
“””

def __init__(self, max_token_records_per_session: int, max_failed_pin_attempts: int = 3):
    self.max_token_records = max_token_records_per_session
    self.max_failed_pin_attempts = max_failed_pin_attempts
    self.sessions = ArraySortedList()

def _build_dummy_session(self, category, user_id):
    return TicketAccessSession(category, user_id, 1)

def _find_session(self, category, user_id):
    dummy = self._build_dummy_session(category, user_id)

    try:
        index = self.sessions.index(dummy)
        return self.sessions[index]
    except ValueError:
        return None

def create_session(self, category, user_id):
    existing_session = self._find_session(category, user_id)

    if existing_session is not None:
        raise ValueError(
            f"Ticket access session already exists for user '{user_id}' "
            f"in category '{category}'."
        )

    new_session = TicketAccessSession(
        category,
        user_id,
        self.max_token_records
    )

    self.sessions.add(new_session)

def set_access_token(self, category, user_id, access_token):
    session = self._find_session(category, user_id)

    if session is None:
        session = TicketAccessSession(
            category,
            user_id,
            self.max_token_records
        )
        self.sessions.add(session)

    if session.is_locked:
        raise ValueError(
            f"Session for user '{user_id}' in category '{category}' is locked."
        )

    for i in range(len(session.token_records)):
        if session.token_records[i] == access_token:
            raise ValueError(
                f"Access token '{access_token}' has already been used "
                f"for this ticket session."
            )

    session.active_token = access_token
    session.token_records.append(access_token)
    session.is_session_open = True

def get_access_token(self, category, user_id):
    session = self._find_session(category, user_id)

    if session is None:
        raise ValueError(
            f"No ticket access session found for user '{user_id}' "
            f"in category '{category}'."
        )

    if not session.is_session_open:
        raise ValueError(
            f"Ticket access session for user '{user_id}' "
            f"in category '{category}' is closed."
        )

    if session.active_token is None:
        raise ValueError(
            f"No active access token found for user '{user_id}' "
            f"in category '{category}'."
        )

    return session.active_token

def is_token_valid(self, category, user_id, access_token):
    session = self._find_session(category, user_id)

    if session is None:
        return False

    if session.is_locked or not session.is_session_open:
        return False

    return session.active_token == access_token

def set_verification_pin(self, category, user_id, pin: int):
    session = self._find_session(category, user_id)

    if session is None:
        session = TicketAccessSession(
            category,
            user_id,
            self.max_token_records
        )
        self.sessions.add(session)

    if session.is_locked:
        raise ValueError(
            f"Session for user '{user_id}' in category '{category}' is locked."
        )

    if pin in session.used_pins:
        raise ValueError(
            f"PIN '{pin}' has already been used for this ticket session."
        )

    session.active_pin = pin
    session.used_pins.add(pin)
    session.failed_pin_attempts = 0

def verify_pin(self, category, user_id, pin: int):
    session = self._find_session(category, user_id)

    if session is None:
        raise ValueError(
            f"No ticket access session found for user '{user_id}' "
            f"in category '{category}'."
        )

    if session.is_locked:
        raise ValueError(
            f"Session for user '{user_id}' in category '{category}' is locked."
        )

    if session.active_pin is None:
        raise ValueError(
            f"No verification PIN has been set for user '{user_id}' "
            f"in category '{category}'."
        )

    if session.active_pin == pin:
        session.failed_pin_attempts = 0
        return True

    session.failed_pin_attempts += 1

    if session.failed_pin_attempts >= self.max_failed_pin_attempts:
        session.is_locked = True

    return False

def close_session(self, category, user_id):
    session = self._find_session(category, user_id)

    if session is None:
        raise ValueError(
            f"No ticket access session found for user '{user_id}' "
            f"in category '{category}'."
        )

    session.is_session_open = False

def reopen_session(self, category, user_id):
    session = self._find_session(category, user_id)

    if session is None:
        raise ValueError(
            f"No ticket access session found for user '{user_id}' "
            f"in category '{category}'."
        )

    if session.is_locked:
        raise ValueError(
            f"Locked session for user '{user_id}' cannot be reopened."
        )

    session.is_session_open = True

def unlock_session(self, category, user_id):
    session = self._find_session(category, user_id)

    if session is None:
        raise ValueError(
            f"No ticket access session found for user '{user_id}' "
            f"in category '{category}'."
        )

    session.is_locked = False
    session.failed_pin_attempts = 0

def count_token_history(self, category, user_id):
    session = self._find_session(category, user_id)

    if session is None:
        raise ValueError(
            f"No ticket access session found for user '{user_id}' "
            f"in category '{category}'."
        )

    return len(session.token_records)

def has_used_pin(self, category, user_id, pin: int):
    session = self._find_session(category, user_id)

    if session is None:
        return False

    return pin in session.used_pins

def get_session_status(self, category, user_id):
    session = self._find_session(category, user_id)

    if session is None:
        return "not_found"

    if session.is_locked:
        return "locked"

    if not session.is_session_open:
        return "closed"

    return "open"

科普式解释
这个模块解决的是“谁能访问工单”的问题。
每个用户在不同工单类别下都有一个独立 session。系统通过:
category + “\0” + user_id
生成唯一排序键,然后把所有 session 存入 ArraySortedList。
设置 access token 时,系统会检查旧 token 是否已经用过。
设置 PIN 时,系统会检查旧 PIN 是否已经用过。
验证 PIN 时,如果输错次数过多,session 会自动锁定。
session 被锁定后,就不能继续验证或访问。
这就是一个简化版的后台安全访问机制。

Part C:为什么旧 PIN 不能重复使用?
具体工单 verification PIN 设置与读取
上一个模块是按照:
category + user_id
管理 session。
但在更细的业务场景中,系统可能需要针对某一个具体工单设置 PIN。
例如:
category = “billing”
user_id = “user_1”
ticket_id = “T20260001”
这表示用户 user_1 正在访问一个具体账单工单。

参考答案代码
def set_verification_pin(self, category, user_id, ticket_id, pin: int):
“””
为指定工单访问会话设置验证 PIN。
“””

session = self._find_session(category, user_id, ticket_id)

if session is None:
    session = TicketAccessSession(
        category,
        user_id,
        ticket_id,
        self.max_token_records
    )

    session.active_pin = pin
    session.previous_pins.add(pin)

    self.sessions.add(session)

else:
    if session.is_locked():
        raise ValueError(
            f"工单 '{ticket_id}' 的会话已被锁定。"
        )

    if pin in session.previous_pins:
        raise ValueError(
            f"PIN '{pin}' 已在该工单会话中使用过。"
        )

    session.active_pin = pin
    session.previous_pins.add(pin)

    session.failed_attempts = 0
    session.last_action = "pin_updated"

def get_verification_pin(self, category, user_id, ticket_id):
“””
返回指定工单会话的当前验证 PIN。
“””

session = self._find_session(category, user_id, ticket_id)

if session is None:
    raise ValueError(
        f"未找到工单 '{ticket_id}' 的访问会话。"
    )

if session.is_locked():
    raise ValueError(
        f"工单 '{ticket_id}' 的会话已被锁定。"
    )

if session.active_pin is None:
    raise ValueError(
        f"工单 '{ticket_id}' 尚未设置验证 PIN。"
    )

return session.active_pin

科普式解释
这个功能进一步细化了访问控制。
它不是只判断某个用户能不能访问某个类别,而是精确到:
category + user_id + ticket_id
也就是说,同一个用户访问不同工单时,可以有不同 PIN 记录。
这对应真实系统中的一个安全原则:
敏感信息访问不应该只控制到用户层面,还应该进一步控制到具体资源层面。

Part D:工单列表为什么不是简单排序?
工单任务展示顺序优化器
后台工单列表中可能会出现很多不同复杂度的任务:
“reset password”
“fix payment gateway error”
“update user profile page”
“investigate database timeout issue”
短标题通常比较简单,长标题可能代表更复杂的问题。
如果直接按照提交顺序展示,列表可能缺乏规律;如果完全按照复杂度排序,又显得太机械。
所以这里设计一个 semi-sorted ticket display algorithm,也就是“半有序工单展示算法”。

参考答案代码
def balanced_ticket_iterator(
self,
tickets=None,
min_complexity=5,
normalize=True,
deduplicate=True,
metric=”length”
):
“””
为工单系统构建一个半有序的展示序列。
“””

if tickets is None:
    items = list(self.default_tickets)
else:
    items = list(tickets)

processed = []

for ticket in items:
    if normalize:
        ticket = ticket.strip().lower()

    if len(ticket) >= min_complexity:
        processed.append(ticket)

if deduplicate:
    seen = set()
    unique_items = []

    for ticket in processed:
        if ticket not in seen:
            seen.add(ticket)
            unique_items.append(ticket)

    processed = unique_items

result = LinkedList()

if len(processed) == 0:
    return iter(result)

if len(processed) == 1:
    result.append(processed[0])
    return iter(result)

def get_complexity(ticket):
    if metric == "length":
        return len(ticket)

    if metric == "words":
        return len(ticket.split())

    raise ValueError("metric must be either 'length' or 'words'")

first = processed[0]
second = processed[1]

if get_complexity(first) <= get_complexity(second):
    result.append(first)
    result.append(second)
else:
    result.append(second)
    result.append(first)

for i in range(2, len(processed)):
    ticket = processed[i]

    ticket_score = get_complexity(ticket)
    front_score = get_complexity(result[0])
    rear_score = get_complexity(result[-1])

    diff_front = abs(ticket_score - front_score)
    diff_rear = abs(ticket_score - rear_score)

    if diff_front < diff_rear:
        result.insert(0, ticket)
    else:
        result.append(ticket)

return iter(result)

科普式解释
这个方法先清洗工单数据,包括去掉首尾空格、统一小写、过滤过短工单和删除重复工单。
然后系统根据 metric 计算复杂度。
如果:
metric = “length”
复杂度就是字符串长度。
如果:
metric = “words”
复杂度就是单词数量。
排序时,它不是把所有工单完全排序,而是使用半有序插入策略:
先把前两个工单按复杂度放入链表;
之后每来一个新工单,只比较它和链表头部、尾部工单的复杂度差距;
如果更接近头部,就插入最前面;
否则放到最后面。
最终得到的是一种“有规律但不死板”的展示顺序。

Part E:怎么证明这些功能真的能用?
系统功能测试代码
代码写完以后,不能只看起来对,还要测试它是否真的符合业务逻辑。
下面这部分测试主要验证 access token、verification PIN、PIN 自动锁定、session 关闭与重启功能。
PIN 测试中的数字使用:
2468
975310
8642

测试代码
if name == “main“:

print("=== Ticket Access Token Tests ===")

access_manager = TicketAccessManager(max_token_records_per_session=10)

access_manager.set_access_token("billing", "user_1", "token_abcd")
access_manager.set_access_token("billing", "user_2", "token_1234")

try:
    access_manager.set_access_token("billing", "user_1", "token_abcd")
    print("ERROR: should have raised ValueError")
except ValueError as e:
    print(f"Correctly rejected: {e}")

access_manager.set_access_token("technical_support", "user_1", "token_abcd")

assert access_manager.get_access_token("billing", "user_1") == "token_abcd"
assert access_manager.get_access_token("billing", "user_2") == "token_1234"
assert access_manager.get_access_token("technical_support", "user_1") == "token_abcd"

try:
    access_manager.get_access_token("refund", "user_1")
    print("ERROR: should have raised ValueError")
except ValueError as e:
    print(f"Correctly raised: {e}")

print("All access token tests passed!")


print("=== Ticket Verification PIN Tests ===")

access_manager = TicketAccessManager(max_token_records_per_session=10)

access_manager.set_verification_pin("billing", "user_1", 2468)
access_manager.set_verification_pin("billing", "user_2", 975310)

try:
    access_manager.set_verification_pin("billing", "user_1", 2468)
    print("ERROR: should have raised ValueError")
except ValueError as e:
    print(f"Correctly rejected: {e}")

access_manager.set_verification_pin("technical_support", "user_1", 2468)

assert access_manager.has_used_pin("billing", "user_1", 2468) is True
assert access_manager.has_used_pin("billing", "user_2", 975310) is True
assert access_manager.has_used_pin("technical_support", "user_1", 2468) is True

assert access_manager.has_used_pin("refund", "user_1", 2468) is False

print("All verification PIN tests passed!")


print("=== Ticket PIN Verification and Lock Tests ===")

access_manager = TicketAccessManager(
    max_token_records_per_session=10,
    max_failed_pin_attempts=3
)

access_manager.set_verification_pin("account_security", "user_3", 8642)

assert access_manager.verify_pin("account_security", "user_3", 8642) is True

assert access_manager.verify_pin("account_security", "user_3", 111111) is False
assert access_manager.verify_pin("account_security", "user_3", 222222) is False
assert access_manager.verify_pin("account_security", "user_3", 333333) is False

assert access_manager.get_session_status("account_security", "user_3") == "locked"

try:
    access_manager.verify_pin("account_security", "user_3", 8642)
    print("ERROR: should have raised ValueError")
except ValueError as e:
    print(f"Correctly locked: {e}")

access_manager.unlock_session("account_security", "user_3")

assert access_manager.get_session_status("account_security", "user_3") == "open"

print("All PIN verification and lock tests passed!")


print("=== Ticket Session Open and Close Tests ===")

access_manager = TicketAccessManager(max_token_records_per_session=10)

access_manager.set_access_token("technical_support", "user_4", "token_helpdesk")

access_manager.close_session("technical_support", "user_4")

assert access_manager.get_session_status("technical_support", "user_4") == "closed"

try:
    access_manager.get_access_token("technical_support", "user_4")
    print("ERROR: should have raised ValueError")
except ValueError as e:
    print(f"Correctly rejected closed session: {e}")

access_manager.reopen_session("technical_support", "user_4")

assert access_manager.get_session_status("technical_support", "user_4") == "open"
assert access_manager.get_access_token("technical_support", "user_4") == "token_helpdesk"

print("All session open and close tests passed!")

print("\n=== All ticket system tests passed! ===")

Part F:紧急工单为什么会被优先处理?
工单优先级调度与超时重试
在真实工单系统中,不同工单的重要程度不同。
例如:
“password reset” # 普通工单
“payment gateway failure” # 高优先级工单
“account hacked” # 紧急工单
系统不能简单按照提交顺序处理所有工单,而是应该让高优先级工单优先进入处理队列。
同时,如果某个工单已经被分配出去,但长时间没有完成,系统应该认为它可能处理超时,并让它重新进入调度队列。

参考答案代码
PRIORITY_LOW = 1
PRIORITY_NORMAL = 2
PRIORITY_HIGH = 3
PRIORITY_URGENT = 4

class TicketDispatchRecord:
“””
存储一个待处理工单的调度信息。
“””

def __init__(self, ticket_id, title, priority, order):
    self.ticket_id = ticket_id
    self.title = title
    self.priority = priority
    self.order = order
    self.status = "waiting"
    self.retry_count = 0

def __lt__(self, other):
    if self.retry_count != other.retry_count:
        return self.retry_count > other.retry_count

    if self.priority != other.priority:
        return self.priority > other.priority

    return self.order < other.order

class TicketDispatchManager:
“””
管理工单调度顺序。
“””

def __init__(self):
    self.waiting_tickets = []
    self.all_tickets = {}
    self.order_counter = 0

def ticket_submitted(self, ticket_id, title, priority):
    if ticket_id in self.all_tickets:
        raise ValueError(
            f"工单 '{ticket_id}' 已经存在,不能重复提交。"
        )

    record = TicketDispatchRecord(
        ticket_id,
        title,
        priority,
        self.order_counter
    )

    self.order_counter += 1

    self.all_tickets[ticket_id] = record
    self.waiting_tickets.append(record)

def get_next_ticket(self):
    available = []

    for ticket in self.waiting_tickets:
        if ticket.status == "waiting":
            available.append(ticket)

    if len(available) == 0:
        return None

    available.sort()

    next_ticket = available[0]
    next_ticket.status = "in_progress"

    self.waiting_tickets.remove(next_ticket)

    return (
        next_ticket.ticket_id,
        next_ticket.title,
        next_ticket.priority
    )

def ticket_completed(self, ticket_id):
    if ticket_id not in self.all_tickets:
        raise ValueError(
            f"工单 '{ticket_id}' 不存在。"
        )

    ticket = self.all_tickets[ticket_id]

    if ticket.status == "completed":
        raise ValueError(
            f"工单 '{ticket_id}' 已经完成,不能重复完成。"
        )

    ticket.status = "completed"

def retry_ticket(self, ticket_id):
    if ticket_id not in self.all_tickets:
        raise ValueError(
            f"工单 '{ticket_id}' 不存在。"
        )

    ticket = self.all_tickets[ticket_id]

    if ticket.status == "completed":
        raise ValueError(
            f"工单 '{ticket_id}' 已经完成,不能重试。"
        )

    ticket.retry_count += 1
    ticket.status = "waiting"

    if ticket not in self.waiting_tickets:
        self.waiting_tickets.append(ticket)

def get_ticket_status(self, ticket_id):
    if ticket_id not in self.all_tickets:
        return "not_found"

    return self.all_tickets[ticket_id].status

科普式解释
这个模块让系统知道“哪个工单应该先处理”。
排序规则不是只看优先级,而是综合考虑:
retry_count
priority
order
也就是说:
已经超时重试的旧工单优先;
重试次数一样时,高优先级工单优先;
优先级也一样时,先提交的工单优先。
这样可以避免一个旧工单因为不断被新工单插队而长期积压。

Part G:一条工单从提交到关闭经历了什么?
工单状态流转管理
一个工单从提交到关闭,通常会经历多个阶段:
new
assigned
in_progress
waiting_user
resolved
closed
如果没有状态管理,系统就很难判断工单到底是在等待客服处理、等待用户回复,还是已经解决。

参考答案代码
class TicketStatusRecord:
“””
存储一个工单的基本状态信息。
“””

def __init__(self, ticket_id, title):
    self.ticket_id = ticket_id
    self.title = title
    self.status = "new"
    self.agent_id = None

class TicketStatusManager:
“””
管理工单状态流转。
“””

def __init__(self):
    self.tickets = {}

def create_ticket(self, ticket_id, title):
    if ticket_id in self.tickets:
        raise ValueError(
            f"工单 '{ticket_id}' 已经存在。"
        )

    self.tickets[ticket_id] = TicketStatusRecord(ticket_id, title)

def assign_ticket(self, ticket_id, agent_id):
    ticket = self._get_ticket(ticket_id)

    if ticket.status == "closed":
        raise ValueError(
            f"工单 '{ticket_id}' 已关闭,不能重新分配。"
        )

    ticket.agent_id = agent_id
    ticket.status = "assigned"

def start_ticket(self, ticket_id):
    ticket = self._get_ticket(ticket_id)

    if ticket.status == "closed":
        raise ValueError(
            f"工单 '{ticket_id}' 已关闭,不能开始处理。"
        )

    if ticket.agent_id is None:
        raise ValueError(
            f"工单 '{ticket_id}' 尚未分配客服,不能开始处理。"
        )

    ticket.status = "in_progress"

def wait_for_user(self, ticket_id):
    ticket = self._get_ticket(ticket_id)

    if ticket.status != "in_progress":
        raise ValueError(
            f"只有处理中工单才能进入等待用户状态。"
        )

    ticket.status = "waiting_user"

def resolve_ticket(self, ticket_id):
    ticket = self._get_ticket(ticket_id)

    if ticket.status == "closed":
        raise ValueError(
            f"工单 '{ticket_id}' 已关闭,不能重复解决。"
        )

    ticket.status = "resolved"

def close_ticket(self, ticket_id):
    ticket = self._get_ticket(ticket_id)

    if ticket.status != "resolved":
        raise ValueError(
            f"只有已解决工单才能关闭。"
        )

    ticket.status = "closed"

def get_status(self, ticket_id):
    ticket = self._get_ticket(ticket_id)
    return ticket.status

def _get_ticket(self, ticket_id):
    if ticket_id not in self.tickets:
        raise ValueError(
            f"工单 '{ticket_id}' 不存在。"
        )

    return self.tickets[ticket_id]

科普式解释
这个模块让工单系统有了完整生命周期。
它的重点不是简单修改字符串,而是控制状态能否合理流转。
例如:
未分配客服的工单不能直接开始处理;
只有处理中工单才能进入等待用户状态;
只有已解决工单才能关闭;
已关闭工单不能重新开始处理。
这样可以避免系统出现不合理状态。

Part H:工单为什么需要分类和标签?
工单分类与标签系统
工单系统中,很多工单需要被分类和打标签。
例如:
category = “billing”
tags = [“payment”, “urgent”, “refund”]
分类通常表示工单属于哪个业务模块,标签则用于更细粒度的筛选。
一个工单可以只有一个分类,但可以有多个标签。

参考答案代码
class TicketTagManager:
“””
管理工单分类和标签。
“””

def __init__(self):
    self.ticket_categories = {}
    self.ticket_tags = {}

def set_category(self, ticket_id, category):
    self.ticket_categories[ticket_id] = category

    if ticket_id not in self.ticket_tags:
        self.ticket_tags[ticket_id] = set()

def add_tag(self, ticket_id, tag):
    if ticket_id not in self.ticket_tags:
        self.ticket_tags[ticket_id] = set()

    self.ticket_tags[ticket_id].add(tag)

def remove_tag(self, ticket_id, tag):
    if ticket_id not in self.ticket_tags:
        raise ValueError(
            f"工单 '{ticket_id}' 没有标签记录。"
        )

    if tag not in self.ticket_tags[ticket_id]:
        raise ValueError(
            f"工单 '{ticket_id}' 不包含标签 '{tag}'。"
        )

    self.ticket_tags[ticket_id].remove(tag)

def has_tag(self, ticket_id, tag):
    if ticket_id not in self.ticket_tags:
        return False

    return tag in self.ticket_tags[ticket_id]

def get_tags(self, ticket_id):
    if ticket_id not in self.ticket_tags:
        return []

    return list(self.ticket_tags[ticket_id])

def find_by_tag(self, tag):
    result = []

    for ticket_id in self.ticket_tags:
        if tag in self.ticket_tags[ticket_id]:
            result.append(ticket_id)

    return result

def find_by_category(self, category):
    result = []

    for ticket_id in self.ticket_categories:
        if self.ticket_categories[ticket_id] == category:
            result.append(ticket_id)

    return result

科普式解释
分类适合表示大的业务方向,例如:
“billing”
“technical_support”
“refund”
“account_security”
标签适合表示更细的特征,例如:
“urgent”
“payment”
“login”
“database”
这里用 dict 存储分类,用 set 存储标签。
set 的好处是天然去重,所以重复添加同一个标签不会产生重复记录。

Part I:为什么系统要记录每一次操作?
工单处理日志与审计记录
真实系统中,工单的每一次操作都应该被记录下来。
例如:
user_1 created ticket T001
agent_2 assigned ticket T001
agent_2 changed status to in_progress
agent_2 resolved ticket T001
这些日志可以用于:
追踪处理过程
排查系统问题
审计客服操作
还原工单历史

参考答案代码
class TicketAuditRecord:
“””
存储一条工单操作日志。
“””

def __init__(self, ticket_id, actor_id, action):
    self.ticket_id = ticket_id
    self.actor_id = actor_id
    self.action = action

class TicketAuditLogger:
“””
管理工单操作日志。
“””

def __init__(self):
    self.logs = {}

def log_action(self, ticket_id, actor_id, action):
    if ticket_id not in self.logs:
        self.logs[ticket_id] = []

    record = TicketAuditRecord(ticket_id, actor_id, action)

    self.logs[ticket_id].append(record)

def get_logs(self, ticket_id):
    if ticket_id not in self.logs:
        return []

    return self.logs[ticket_id]

def get_latest_action(self, ticket_id):
    if ticket_id not in self.logs:
        return None

    if len(self.logs[ticket_id]) == 0:
        return None

    return self.logs[ticket_id][-1].action

def count_actions(self, ticket_id):
    if ticket_id not in self.logs:
        return 0

    return len(self.logs[ticket_id])

科普式解释
日志不是用来决定业务状态的,而是用来追踪历史过程的。
例如某个工单为什么被关闭、是谁处理的、什么时候进入等待用户状态,都可以通过日志还原。
在真实系统中,审计日志非常重要,因为它可以帮助系统管理员追责、排错和分析处理效率。

Part J:管理员怎么知道系统运行情况?
工单统计报表功能
除了处理单个工单,系统还需要从整体角度了解运行情况。
例如:
一共有多少工单?
多少工单已经完成?
多少工单仍在处理中?
哪类工单最多?
某个标签出现了多少次?
这些统计数据可以帮助管理员判断客服压力、用户问题集中在哪些领域,以及系统是否存在积压。

参考答案代码
class TicketReportManager:
“””
生成工单系统的简单统计报表。
“””

def count_by_status(self, status_manager):
    result = {}

    for ticket_id in status_manager.tickets:
        status = status_manager.tickets[ticket_id].status

        if status not in result:
            result[status] = 0

        result[status] += 1

    return result

def count_by_category(self, tag_manager):
    result = {}

    for ticket_id in tag_manager.ticket_categories:
        category = tag_manager.ticket_categories[ticket_id]

        if category not in result:
            result[category] = 0

        result[category] += 1

    return result

def count_by_tag(self, tag_manager):
    result = {}

    for ticket_id in tag_manager.ticket_tags:
        for tag in tag_manager.ticket_tags[ticket_id]:
            if tag not in result:
                result[tag] = 0

            result[tag] += 1

    return result

def get_total_tickets(self, status_manager):
    return len(status_manager.tickets)

科普式解释
这个模块让工单系统具备基础报表能力。
它不直接修改工单,而是从已有的状态管理器和标签管理器中读取数据,然后生成统计结果。
例如:
{
“new”: 3,
“in_progress”: 5,
“resolved”: 8,
“closed”: 10
}
这样的结果可以帮助管理员快速判断系统运行情况。

Part K:扩展功能测试代码
下面的测试可以继续追加到原来的 if name == “main“: 后面。
print(“=== Ticket Dispatch and Retry Tests ===”)

dispatch_manager = TicketDispatchManager()

dispatch_manager.ticket_submitted(
    "T001",
    "reset password",
    PRIORITY_LOW
)

dispatch_manager.ticket_submitted(
    "T002",
    "urgent payment gateway failure",
    PRIORITY_URGENT
)

next_ticket = dispatch_manager.get_next_ticket()

assert next_ticket[0] == "T002"
assert next_ticket[1] == "urgent payment gateway failure"

dispatch_manager.ticket_submitted(
    "T003",
    "cannot update profile picture",
    PRIORITY_NORMAL
)

dispatch_manager.retry_ticket("T001")

retry_ticket = dispatch_manager.get_next_ticket()

assert retry_ticket[0] == "T001"

dispatch_manager.ticket_completed("T001")

assert dispatch_manager.get_ticket_status("T001") == "completed"

print("All ticket dispatch and retry tests passed!")


print("=== Ticket Status Flow Tests ===")

status_manager = TicketStatusManager()

status_manager.create_ticket(
    "T100",
    "login failure after update"
)

assert status_manager.get_status("T100") == "new"

status_manager.assign_ticket("T100", "agent_1")
assert status_manager.get_status("T100") == "assigned"

status_manager.start_ticket("T100")
assert status_manager.get_status("T100") == "in_progress"

status_manager.wait_for_user("T100")
assert status_manager.get_status("T100") == "waiting_user"

status_manager.resolve_ticket("T100")
assert status_manager.get_status("T100") == "resolved"

status_manager.close_ticket("T100")
assert status_manager.get_status("T100") == "closed"

print("All ticket status flow tests passed!")


print("=== Ticket Category and Tag Tests ===")

tag_manager = TicketTagManager()

tag_manager.set_category("T200", "billing")
tag_manager.add_tag("T200", "payment")
tag_manager.add_tag("T200", "urgent")
tag_manager.add_tag("T200", "payment")

assert tag_manager.has_tag("T200", "payment") is True
assert tag_manager.has_tag("T200", "urgent") is True

assert len(tag_manager.get_tags("T200")) == 2

tag_manager.set_category("T201", "technical_support")
tag_manager.add_tag("T201", "login")

assert tag_manager.find_by_category("billing") == ["T200"]
assert tag_manager.find_by_tag("login") == ["T201"]

tag_manager.remove_tag("T200", "urgent")

assert tag_manager.has_tag("T200", "urgent") is False

print("All ticket category and tag tests passed!")


print("=== Ticket Audit Log Tests ===")

audit_logger = TicketAuditLogger()

audit_logger.log_action("T300", "user_1", "created ticket")
audit_logger.log_action("T300", "agent_1", "assigned ticket")
audit_logger.log_action("T300", "agent_1", "resolved ticket")

assert audit_logger.count_actions("T300") == 3
assert audit_logger.get_latest_action("T300") == "resolved ticket"

logs = audit_logger.get_logs("T300")

assert logs[0].action == "created ticket"
assert logs[1].action == "assigned ticket"
assert logs[2].action == "resolved ticket"

print("All ticket audit log tests passed!")


print("=== Ticket Report Tests ===")

status_manager = TicketStatusManager()

status_manager.create_ticket("T400", "reset password")
status_manager.create_ticket("T401", "payment error")
status_manager.create_ticket("T402", "database timeout")

status_manager.assign_ticket("T400", "agent_1")
status_manager.start_ticket("T400")

status_manager.assign_ticket("T401", "agent_2")
status_manager.start_ticket("T401")
status_manager.resolve_ticket("T401")

tag_manager = TicketTagManager()

tag_manager.set_category("T400", "account_security")
tag_manager.add_tag("T400", "login")

tag_manager.set_category("T401", "billing")
tag_manager.add_tag("T401", "payment")
tag_manager.add_tag("T401", "urgent")

tag_manager.set_category("T402", "technical_support")
tag_manager.add_tag("T402", "database")

report_manager = TicketReportManager()

status_report = report_manager.count_by_status(status_manager)
category_report = report_manager.count_by_category(tag_manager)
tag_report = report_manager.count_by_tag(tag_manager)

assert report_manager.get_total_tickets(status_manager) == 3
assert status_report["in_progress"] == 1
assert status_report["resolved"] == 1
assert status_report["new"] == 1

assert category_report["billing"] == 1
assert category_report["account_security"] == 1
assert category_report["technical_support"] == 1

assert tag_report["payment"] == 1
assert tag_report["urgent"] == 1
assert tag_report["database"] == 1

print("All ticket report tests passed!")

print("\n=== All extended ticket system tests passed! ===")

最后总结:看似简单的工单,背后其实是一个小型系统
这个代码系统表面上是在写几个函数和类,但真正训练的是系统化编程能力。
它覆盖了:
标题规则匹配
动态规划
滚动数组
有序表查找
组合 key 设计
access token 管理
verification PIN 管理
PIN 防重复
错误次数锁定
session 开启与关闭
链表半有序插入
迭代器返回
优先级调度
超时重试
状态流转
分类标签
审计日志
统计报表
异常处理
assert 测试
所以,一个“提交工单”的动作,背后并不只是保存一句话。
它可能会经过标题识别、安全验证、权限判断、任务排序、优先级调度、状态流转、日志记录和数据统计。
这就是为什么工单系统非常适合做代码综合题。
它可以让我们把数据结构、算法和真实业务场景结合起来,写出更接近实际开发的代码。

0.062346s