f31ee84bfdc6dc2325c8890412a965e509074d0a,streaming/python/runtime/task.py,InputStreamTask,run,#InputStreamTask#,132

Before Change


    def run(self):
        """Poll the reader and hand each decoded message to the processor.

        Loops for as long as ``self.running`` is true. Every iteration reads
        one item from ``self.reader`` using ``self.read_timeout_millis``; a
        ``None`` result is skipped (presumably a read timeout - confirm
        against the reader implementation). For any other item the leading
        one-element slice of its body selects the serializer, the rest of
        the body is deserialized, and the result is passed to
        ``self.processor.process``. ``self.stopped`` is set to ``True`` once
        the loop exits normally.
        """
        while self.running:
            item = self.reader.read(self.read_timeout_millis)
            if item is None:
                # Nothing was read this round; poll again.
                continue

            payload = item.body()
            # The first slice of the payload is a type tag; everything after
            # it is the serialized message.
            if payload[:1] == serialization._PYTHON_TYPE_ID:
                serializer = self.python_serializer
            else:
                serializer = self.cross_lang_serializer
            decoded = serializer.deserialize(payload[1:])
            self.processor.process(decoded)
        self.stopped = True

    def cancel_task(self):
        """Clear ``self.running`` so that ``run`` leaves its loop.

        ``run`` checks the flag at the top of each iteration, so the loop
        ends only after the read and processing already in progress finish.
        """
        self.running = False

After Change



    def run(self):
        """Drive the input loop: read items, process data, handle barriers.

        While ``self.running`` is true, one item is read from ``self.reader``
        with the worker's ``initial_state_lock`` held, and
        ``self.is_initial_state`` is cleared after every read that returns.
        ``None`` reads are skipped. A ``DataMessage`` is deserialized (the
        first element of its body picks the serializer) and passed to
        ``self.processor.process``; a ``CheckpointBarrier`` triggers
        ``self.do_checkpoint``; any other item raises ``RuntimeError``.

        ``ChannelInterruptException`` ends the loop with an info log. Every
        other exception, including the ``RuntimeError`` above, is logged
        together with ``self.last_checkpoint_id`` and forwarded to
        ``self.request_rollback``. ``self.stopped`` is set to ``True``
        afterwards, provided the handlers themselves did not raise.
        """
        logger.info("Input task thread start.")
        try:
            while self.running:
                # The lock is held only for the duration of the read and the
                # initial-state flag update.
                self.worker.initial_state_lock.acquire()
                try:
                    received = self.reader.read(self.read_timeout_millis)
                    self.is_initial_state = False
                finally:
                    self.worker.initial_state_lock.release()

                if received is None:
                    continue

                if isinstance(received, DataMessage):
                    payload = received.body
                    # First element is the type tag; the rest is the
                    # serialized message.
                    if payload[0] == serialization.PYTHON_TYPE_ID:
                        serializer = self.python_serializer
                    else:
                        serializer = self.cross_lang_serializer
                    record = serializer.deserialize(payload[1:])
                    self.processor.process(record)
                    continue

                if not isinstance(received, CheckpointBarrier):
                    raise RuntimeError(
                        "Unknown item type! item={}".format(received))

                logger.info("Got barrier:{}".format(received))
                logger.info("Start to do checkpoint {}.".format(
                    received.checkpoint_id))

                barrier_inputs = received.get_input_checkpoints()

                self.do_checkpoint(received.checkpoint_id, barrier_inputs)
                logger.info("Do checkpoint {} success.".format(
                    received.checkpoint_id))

        except ChannelInterruptException:
            logger.info("queue has stopped.")
        except BaseException as err:
            # Any failure is reported upstream so a rollback can be requested.
            logger.exception(
                "Last success checkpointId={}, now occur error.".format(
                    self.last_checkpoint_id))
            self.request_rollback(str(err))

        logger.info("Source fetcher thread exit.")
        self.stopped = True

    def cancel_task(self):
Italian Trulli
In pattern: SUPERPATTERN

Frequency: 3

Non-data size: 7

Instances


Project Name: ray-project/ray
Commit Name: f31ee84bfdc6dc2325c8890412a965e509074d0a
Time: 2020-09-05
Author: wlx65005@gmail.com
File Name: streaming/python/runtime/task.py
Class Name: InputStreamTask
Method Name: run


Project Name: thenetcircle/dino
Commit Name: 3233032f5961762af0085ba0234c4cac91bc5e67
Time: 2018-08-26
Author: oscar.eriks@gmail.com
File Name: dino/hooks/message.py
Class Name: OnMessageHooks
Method Name: do_process


Project Name: ray-project/ray
Commit Name: 1b1466748f1db72835a594d73d502e9787e080a9
Time: 2020-09-04
Author: wlx65005@gmail.com
File Name: streaming/python/runtime/task.py
Class Name: InputStreamTask
Method Name: run